Shortcuts

Source code for torchtune.modules.model_fusion._fusion_layers

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import torch
from torch import nn


[docs]class FusionLayer(nn.Module): """Fusion layer as introduced in `Flamingo: a Visual Language Model for Few-Shot Learning <https://arxiv.org/abs/2204.14198>`_. Deep Fusion model architectures combine pretrained encoder models with pretrained language models by infusing the encoder outputs into the middle layers of the LLM. This allows the language model to interpret the enocder outputs as text and "understand" any modality for which you can train an encoder. To enable the language model to adapt to the encoder outputs, the FusionLayer fuses a new learnable layer to an existing decoder (language model) layer. This additional layer can take the encoder embeddings and learn to combine them with the token embeddings from the decoder. The module supports fusing the new layer before or after the original, in Flamingo the new layer is fused before the original. The original layer is wrapped in FusionLayer such that it maintains its original state_dict key and the pre-trained checkpoint isn't broken. The new layer parameters are available through ``fusion_params`` to separately control if they're trainable or not. Example: >>> # Original decoder style transformer >>> layer = nn.TransformerSelfAttentionLayer(...) >>> model = TransformerDecoder(layers=layer, num_layers=32, ...) >>> >>> # Fuse a cross attention layer to each self attention layer to adapt for the encoder >>> fusion_layer = nn.TransformerCrossAttentionLayer(...) >>> fused_layer = FusionLayer(layer, fusion_layer) >>> model = TransformerDecoder(layers=fused_layer, num_layers=32, ...) >>> >>> # Original decoder state_dict still works >>> model.load_state_dict(..., strict=False) Args: layer (nn.Module): original decoder layer fusion_layer (nn.Module): new fusion layer fusion_first (bool): boolean to insert fusion layer before or after the decoder layer. """ def __init__( self, layer: nn.Module, fusion_layer: nn.Module, fusion_first: bool = True ): super().__init__() self.layer = layer self.fusion_layer = fusion_layer self.fusion_first = fusion_first # Keep FusionLayer wrappings out of the state_dict self._register_state_dict_hook(FusionLayer._state_dict_hook) self._register_load_state_dict_pre_hook( FusionLayer._load_state_dict_hook, with_module=True ) # TODO: Switch to register_load_state_dict_pre_hook and # register_state_dict_pre_hook after PyTorch v2.5 def _state_dict_hook(self, state_dict, prefix, *args, **kwargs): """Remove "layer" from the original layer in the state_dict name. This keeps the orginal state dict name for the layer from before fusing with the FusionLayer. [!Note] This update changes the order of the OrderedDict """ keys = list(state_dict.keys()) for key in keys: local_key = key[len(prefix) :] if local_key.startswith("layer"): new_key = prefix + local_key.replace("layer.", "") state_dict[new_key] = state_dict[key] del state_dict[key] def _load_state_dict_hook(self, state_dict, prefix, *args, **kwargs): """Apply extra "layer" prefix to the state_dict key to account for the FusionLayer wrapping. """ keys = list(state_dict.keys()) for key in keys: local_key = key[len(prefix) :] if not local_key.startswith("fusion_layer"): new_key = prefix + "layer." + local_key state_dict[new_key] = state_dict[key] del state_dict[key]
[docs] def setup_caches( self, batch_size: int, dtype: torch.dtype, *, encoder_max_seq_len: int, decoder_max_seq_len: int, ) -> None: """Setup key value cache for both layers. Args: batch_size (int): batch size for the caches. dtype (torch.dtype): dtype for the caches. encoder_max_seq_len (int): maximum cache sequence length for cross-attention layer. decoder_max_seq_len (int): maximum cache sequence length for self-attention layer. """ self.layer.setup_caches( batch_size, dtype, encoder_max_seq_len=encoder_max_seq_len, decoder_max_seq_len=decoder_max_seq_len, ) self.fusion_layer.setup_caches( batch_size, dtype, encoder_max_seq_len=encoder_max_seq_len, decoder_max_seq_len=decoder_max_seq_len, )
[docs] def caches_are_setup(self) -> bool: """ Check if the key value caches are setup on ``self.layer``. See :func:~torchtune.modules.TransformerDecoder.caches_are_setup`. """ return self.layer.caches_are_setup()
[docs] def caches_are_enabled(self) -> bool: """ Checks if the key value caches on ``self.layer`` are enabled. See :func:~torchtune.modules.TransformerDecoder.caches_are_enabled`. """ return self.layer.caches_are_enabled()
[docs] def reset_cache(self): """Reset both layers' key value caches.""" self.layer.reset_cache() self.fusion_layer.reset_cache()
[docs] def fusion_params(self) -> list[str]: """ Return parameters of fusion layer. """ fusion_params = [ f"fusion_layer.{k}" for k, v in self.fusion_layer.named_parameters() ] return fusion_params
[docs] def forward(self, x: torch.Tensor, **kwargs: dict) -> torch.Tensor: """ Args: x (torch.Tensor): input tensor with shape [batch_size x seq_length x embed_dim] **kwargs (dict): all additional layer args Returns: Tensor: output tensor with same shape as input [batch_size x seq_length x embed_dim]` """ if self.fusion_first: x = self.fusion_layer(x, **kwargs) x = self.layer(x, **kwargs) else: x = self.layer(x, **kwargs) x = self.fusion_layer(x, **kwargs) return x
[docs]class FusionEmbedding(nn.Module): """Fusion embedding supports training additional special tokens while keeping the original embedding frozen. When fusing new models with a language model, there may be some additional tokens needed to support the fused language model. For example, adding a vision encoder might necessitate additional tokens like ``<|image|>`` to indicate an images position in text and require learning an embedding for this token. The FusionEmbedding keeps the original embeddings frozen while learning a much smaller second embedding for the additional tokens. During forward this module routes the tokens to the appropriate embedding table. Use this as a drop-in replacement for :class:`torch.nn.Embedding` in your model. Example: >>> embedding = FusionEmbedding(vocab_size=100, fusion_vocab_size=10, embed_dim=128) >>> model = TransformerDecoder(tok_embeddings=embedding, ...) >>> >>> # Original model state_dict still works >>> model.load_state_dict(..., strict=False) .. note:: This module assumes all tokens in the range [0, vocab_size) are part of the original embedding table and all new tokens in the range [vocab_size, vocab_size + fusion_vocab_size) Args: vocab_size (int): language model vocab size fusion_vocab_size (int): additional tokens for the fused model embed_dim (int): embedding dimension of the two embedding tables """ def __init__(self, vocab_size: int, fusion_vocab_size: int, embed_dim: int) -> None: super().__init__() self.embedding = nn.Embedding(vocab_size, embed_dim) self.fusion_embedding = nn.Embedding(fusion_vocab_size, embed_dim) self.dim = embed_dim self.num_embeddings = vocab_size + fusion_vocab_size # TODO: Support merging the embeddings after finetuning # Keep FusionLayer wrappings out of the state_dict self._register_state_dict_hook(FusionEmbedding._state_dict_hook) self._register_load_state_dict_pre_hook( FusionEmbedding._load_state_dict_hook, with_module=True ) # TODO: Switch to register_load_state_dict_pre_hook and # register_state_dict_pre_hook after PyTorch v2.5 def _state_dict_hook(self, destination, prefix, keep_vars): """Remove "embedding" from the original embedding in the state_dict name. This keeps the orginal state dict name for the embedding from before fusing with the FusionEmbedding. [!Note] This update changes the order of the OrderedDict """ key = prefix + "embedding.weight" new_key = prefix + "weight" destination[new_key] = destination[key] del destination[key] def _load_state_dict_hook(self, state_dict, prefix, *args, **kwargs): """Apply extra "embedding" prefix to the state_dict key to account for the FusionEmbedding wrapping. """ if state_dict: key = prefix + "weight" new_key = prefix + "embedding.weight" state_dict[new_key] = state_dict[key] del state_dict[key]
[docs] def fusion_params(self) -> list[str]: """ Return fusion embedding parameters. """ fusion_params = ["fusion_embedding.weight"] return fusion_params
def _fused_embed(self, bs, seq_len): """ Return an empty tensor the shape of the combined embedding. """ device = self.embedding.weight.device dtype = self.embedding.weight.dtype return torch.empty(bs, seq_len, self.dim, device=device, dtype=dtype)
[docs] def forward(self, input: torch.Tensor) -> torch.Tensor: """ Args: input (torch.Tensor): input integer tensor with shape [batch_size x seq_length] Returns: Tensor: output tensor embedding with shape [batch_size x seq_length x embed_dim]` """ bs, seq_len = input.size() vocab_size = self.embedding.num_embeddings mask = input < vocab_size # num_tokens = (input < vocab_size).sum() tokens = torch.masked_select(input, mask) # num_fusion_tokens = (input >= vocab_size).sum() fusion_tokens = torch.masked_select(input, ~mask) - vocab_size # [batch_size * num_tokens, embed_dim] embeds = self.embedding(tokens) # [batch_size * num_fusion_tokens, embed_dim] fusion_embeds = self.fusion_embedding(fusion_tokens) # [batch_size x seq_length x embed_dim] out = self._fused_embed(bs, seq_len) mask = mask.unsqueeze(-1).expand(bs, seq_len, self.dim) out = out.masked_scatter(mask, embeds) out = out.masked_scatter(~mask, fusion_embeds) return out

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources