| | from .configuration_transformer import TransformerConfig |
| | import torch |
| | from torch import nn |
| | from torch.nn import init |
| | import torch.nn.functional as F |
| | from torch.nn.parameter import Parameter |
| | from torch import nn |
| | from typing import Tuple, List |
| | from itertools import chain |
| | from transformers.modeling_outputs import ( |
| | BaseModelOutput, |
| | MaskedLMOutput, |
| | CausalLMOutput, |
| | ) |
| | from torch.utils.checkpoint import checkpoint |
| | from transformers.modeling_utils import PreTrainedModel |
| | import math |
| | try: |
| | from flash_attn import flash_attn_varlen_func, flash_attn_func |
| | except ImportError: |
| | pass |
| |
|
| |
|
# Additive bias written into float attention masks at positions that must not
# be attended to (see create_4d_mask). The "NUMER" spelling is kept as-is
# because other code in this file references the name.
A_LARGE_NEGATIVE_NUMER = -1e10
| |
|
| |
|
def create_4d_mask(attn_mask, return_type="bool", x=None, causal=False):
    """Expand a 2D per-token mask into a 4D pairwise attention mask.

    Position ``i`` may attend to position ``j`` when both carry the same value
    in ``attn_mask`` (so tokens sharing an id see each other, and nothing
    else). With ``causal=True`` the result is additionally restricted to
    ``j <= i``.

    Args:
        attn_mask: 2D tensor of shape ``(B, L)``.
        return_type: ``"bool"`` for a boolean mask (True = may attend) or
            ``"float"`` for an additive mask (0 where allowed,
            ``A_LARGE_NEGATIVE_NUMER`` where blocked).
        x: Tensor whose dtype the float mask should take. Required when
            ``return_type == "float"``.
        causal: Whether to also apply a lower-triangular constraint.

    Returns:
        Tensor of shape ``(B, 1, L, L)``.

    Raises:
        ValueError: If ``return_type`` is not recognised, or if a float mask
            is requested without ``x``.
    """
    if return_type not in ("bool", "float"):
        raise ValueError(
            f"return_type must be 'bool' or 'float', got {return_type!r}"
        )
    if return_type == "float" and x is None:
        raise ValueError("x must be provided when return_type is 'float'")

    _, seq_len = attn_mask.shape
    mask_4d = torch.eq(attn_mask[:, None, :, None], attn_mask[:, None, None, :])
    if causal:
        # The triangular mask must be boolean: `bool & float` is not a valid
        # bitwise operation in PyTorch.
        causal_mask = torch.tril(
            torch.ones(seq_len, seq_len, dtype=torch.bool, device=attn_mask.device)
        )
        mask_4d = mask_4d & causal_mask[None, None, :, :]

    if return_type == "bool":
        return mask_4d.to(torch.bool)

    mask_4d = mask_4d.to(x.dtype)
    return mask_4d * 0 + (1 - mask_4d) * A_LARGE_NEGATIVE_NUMER
| |
|
| |
|
def rotate_half(x):
    """Swap the two halves of the last dimension, negating the second half.

    For a last axis split as ``[a, b]`` (with ``a`` holding the first
    ``size // 2`` entries) the result is ``[-b, a]``.
    """
    half = x.shape[-1] // 2
    front = x[..., :half]
    back = x[..., half:]
    return torch.cat((-back, front), dim=-1)
| |
|
| |
|
def apply_rotary_pos_emb(q, k, cos, sin, unsqueeze_dim=1):
    """Apply rotary position embeddings to a query/key pair.

    ``cos`` and ``sin`` get a singleton axis inserted at ``unsqueeze_dim`` so
    that they broadcast against ``q`` and ``k``; each tensor is then mapped to
    ``t * cos + rotate_half(t) * sin``.

    Returns:
        Tuple ``(q_embed, k_embed)``.
    """
    cos_b = cos.unsqueeze(unsqueeze_dim)
    sin_b = sin.unsqueeze(unsqueeze_dim)

    def _rotate(t):
        return (t * cos_b) + (rotate_half(t) * sin_b)

    return _rotate(q), _rotate(k)
| |
|
| |
|
def apply_rotary_pos_emb_1(x, cos, sin, unsqueeze_dim=1):
    """Apply rotary position embeddings to a single tensor.

    Single-tensor counterpart of ``apply_rotary_pos_emb``: ``cos``/``sin`` are
    unsqueezed at ``unsqueeze_dim`` and the result is
    ``x * cos + rotate_half(x) * sin``.
    """
    cos, sin = (table.unsqueeze(unsqueeze_dim) for table in (cos, sin))
    rotated = rotate_half(x)
    return (x * cos) + (rotated * sin)
| |
|
| |
|
class RMSNorm(nn.Module):
    """Root-mean-square normalisation over the last axis with a learnable gain."""

    def __init__(self, dim: int, eps: float = 1e-5):
        """Create the layer.

        Args:
            dim: Size of the normalised (last) dimension.
            eps: Constant added to the mean square before the reciprocal
                square root.
        """
        super().__init__()
        self.eps = eps
        # The gain is allocated in float32 and then filled with ones.
        self.weight = Parameter(torch.empty(dim, dtype=torch.float32))
        init.ones_(self.weight)

    def _norm(self, x):
        """Scale ``x`` by the reciprocal RMS of its last axis."""
        mean_square = x.pow(2).mean(-1, keepdim=True)
        return x * torch.rsqrt(mean_square + self.eps)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Normalise in float32, cast back to ``x``'s dtype, then apply the gain."""
        normed = self._norm(x.float())
        return normed.type_as(x) * self.weight
| |
|
| |
|
class TokenEmbedding(nn.Module):
    """Token-id embedding lookup with optional gradient shrinking and normalisation."""

    def __init__(self, config: "TransformerConfig"):
        """Build the embedding table and, if enabled, its normalisation layer.

        Args:
            config: Model configuration. The fields read here are
                ``vocab_size``, ``hidden_size``, ``embedding_layer_norm``,
                ``layernorm_type``, ``layer_norm_eps``, ``hidden_dropout_prob``,
                ``position_embedding_type``, ``pad_token_id``,
                ``token_dropout`` and ``mask_token_id``.

        Raises:
            ValueError: If ``embedding_layer_norm`` is enabled but
                ``layernorm_type`` is neither ``"layernorm"`` nor ``"rmsnorm"``.
        """
        super().__init__()
        self.config = config
        self.word_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        if config.embedding_layer_norm:
            # The attribute is called ``rms_norm`` for both variants so that
            # parameter names do not depend on the normalisation type.
            if config.layernorm_type == "layernorm":
                self.rms_norm = nn.LayerNorm(
                    config.hidden_size, eps=config.layer_norm_eps, bias=False
                )
            elif config.layernorm_type == "rmsnorm":
                self.rms_norm = RMSNorm(config.hidden_size, eps=config.layer_norm_eps)
            else:
                # Fail at construction time instead of leaving ``rms_norm``
                # undefined and hitting an AttributeError in ``forward``.
                raise ValueError(
                    f"Unsupported layernorm_type: {config.layernorm_type!r}"
                )
        # NOTE(review): this dropout module is created but never applied in
        # ``forward`` — confirm whether that is intentional.
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.position_embedding_type = config.position_embedding_type
        self.padding_idx = config.pad_token_id
        self.token_dropout = config.token_dropout
        self.mask_token_id = config.mask_token_id

    def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
        """Look up embeddings for ``input_ids`` and optionally normalise them."""
        embeddings = self.word_embeddings(input_ids)
        if self.config.embedding_shrinking and self.training:
            # Forward values are unchanged (0.1 + 0.9 = 1), but only the
            # non-detached 10% carries gradient back to the embedding table.
            embeddings = embeddings * 0.1 + embeddings.detach() * 0.9
        if self.config.embedding_layer_norm:
            embeddings = self.rms_norm(embeddings)
        return embeddings
| |
|
| |
|
class RotaryEmbedding(nn.Module):
    """Produce the cosine/sine tables used for rotary position embeddings."""

    def __init__(self, dim: int, b: int = 10000):
        """Precompute inverse frequencies ``1 / b ** (2i / dim)`` for even ``2i < dim``.

        Args:
            dim: Width of the rotary table (the per-head size at the call
                site in this file).
            b: Base of the geometric frequency progression.
        """
        super().__init__()
        exponents = torch.arange(0, dim, 2, dtype=torch.int64).float() / dim
        inv_freq = 1.0 / (b ** exponents)
        # Non-persistent: the buffer is rebuilt from ``dim``/``b`` and is not
        # written to the state dict.
        self.register_buffer("inv_freq", inv_freq, persistent=False)

    @torch.no_grad()
    def forward(self, x: torch.Tensor, position_ids: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(cos, sin)`` for ``position_ids``, cast to ``x``'s dtype.

        ``x`` is only consulted for its device type and dtype. Each returned
        table has the position axes of ``position_ids`` followed by an axis of
        twice the number of inverse frequencies.
        """
        batch = position_ids.shape[0]
        inv_freq = self.inv_freq[None, :, None].float().expand(batch, -1, 1)
        positions = position_ids[:, None, :].float()
        # Autocast is disabled so the angles are computed in float32.
        with torch.autocast(device_type=x.device.type, enabled=False):
            angles = (inv_freq.float() @ positions.float()).transpose(1, 2)
            table = torch.cat((angles, angles), dim=-1)
            cos_table = table.cos()
            sin_table = table.sin()
        return cos_table.to(x.dtype), sin_table.to(x.dtype)
| |
|
| |
|
| | class SelfAttention(nn.Module): |
| | |
| | def __init__(self, config: TransformerConfig, causal: bool=False): |
| | super().__init__() |
| | if config.hidden_size % config.num_attention_heads != 0: |
| | raise ValueError( |
| | f"The hidden size ({config.hidden_size}) is not a multiple of the number of attention " |
| | f"heads ({config.num_attention_heads})" |
| | ) |
| | self.config = config |
| | self.num_attention_heads = config.num_attention_heads |
| | self.attention_head_size = config.hidden_size // config.num_attention_heads |
| | self.all_head_size = self.num_attention_heads * self.attention_head_size |
| | self.query = nn.Linear(config.hidden_size, self.all_head_size, bias=False) |
| | self.key = nn.Linear(config.hidden_size, self.all_head_size, bias=False) |
| | self.value = nn.Linear(config.hidden_size, self.all_head_size, bias=False) |
| | self.dropout = nn.Dropout(config.attention_probs_dropout_prob) |
| | self.output = nn.Linear(config.hidden_size, self.all_head_size, bias=False) |
| | self.output_dropout = nn.Dropout(config.hidden_dropout_prob) |
| | self.config = config |
| | self.causal = causal |
| |
|
| | def transpose_for_scores(self, x: torch.Tensor) -> torch.Tensor: |
| | new_x_shape = x.size()[:-1] + ( |
| | self.num_attention_heads, |
| | self.attention_head_size, |
| | ) |
| | x = x.view(new_x_shape) |
| | return x.permute( |
| | 0, 2, 1, 3 |
| | ) |
| |
|
| | def naive_forward( |
| | self, |
| | hidden_states: torch.Tensor, |
| | attention_mask: torch.Tensor = None, |
| | rotary_embeddings: torch.Tensor = None, |
| | output_attentions: bool = False, |
| | ) -> Tuple[torch.Tensor]: |
| | B, L, D = hidden_states.size() |
| | query_states = self.query(hidden_states) |
| | key_states = self.key(hidden_states) |
| | value_states = self.value(hidden_states) |
| | key_states = self.transpose_for_scores(key_states).contiguous() |
| | query_states = self.transpose_for_scores(query_states).contiguous() |
| | value_states = self.transpose_for_scores(value_states).contiguous() |
| | if rotary_embeddings is not None: |
| | cos, sin = rotary_embeddings |
| | query_states, key_states = apply_rotary_pos_emb( |
| | query_states, key_states, cos, sin |
| | ) |
| | scale_factor = self.attention_head_size**-0.5 |
| | attention_scores = torch.matmul( |
| | query_states, key_states.transpose(-1, -2) |
| | ) |
| | attention_scores = attention_scores * scale_factor |
| | if attention_mask is not None: |
| | attention_scores = attention_scores + attention_mask |
| | attention_probs = nn.functional.softmax( |
| | attention_scores, dim=-1, dtype=torch.float32 |
| | ).to(query_states.dtype) |
| | attention_probs = self.dropout(attention_probs) |
| | context = torch.matmul( |
| | attention_probs, value_states |
| | ) |
| | context = context.permute( |
| | 0, 2, 1, 3 |
| | ).contiguous() |
| | context = context.view(B, L, D) |
| | context = self.output(context) |
| | context = self.output_dropout(context) |
| | return_attention_probs = attention_probs.detach() if output_attentions else None |
| | return context, return_attention_probs |
| |
|
| | def sdpa_forward( |
| | self, |
| | hidden_states: torch.Tensor, |
| | attention_mask: torch.Tensor = None, |
| | rotary_embeddings: torch.Tensor = None, |
| | ) -> Tuple[torch.Tensor]: |
| | B, L, D = hidden_states.size() |
| | query_states = self.query(hidden_states) |
| | key_states = self.key(hidden_states) |
| | value_states = self.value(hidden_states) |
| | key_states = self.transpose_for_scores(key_states).contiguous() |
| | query_states = self.transpose_for_scores(query_states).contiguous() |
| | value_states = self.transpose_for_scores(value_states).contiguous() |
| | if rotary_embeddings is not None: |
| | cos, sin = rotary_embeddings |
| | query_states, key_states = apply_rotary_pos_emb( |
| | query_states, key_states, cos, sin |
| | ) |
| | scale_factor = self.attention_head_size**-0.5 |
| | dropout_p = self.config.attention_probs_dropout_prob if self.training else 0 |
| | context = F.scaled_dot_product_attention( |
| | query=query_states, |
| | key=key_states, |
| | value=value_states, |
| | attn_mask=attention_mask, |
| | dropout_p=dropout_p, |
| | scale=scale_factor, |
| | ) |
| | context = context.permute( |
| | 0, 2, 1, 3 |
| | ).contiguous() |
| | context = context.view(B, L, D) |
| | context = self.output(context) |
| | context = self.output_dropout(context) |
| | return_attention_probs = None |
| | return context, return_attention_probs |
| |
|
| | def flash_attn_forward( |
| | self, |
| | hidden_states: torch.Tensor, |
| | rotary_embeddings: torch.Tensor = None, |
| | lengths: List[List[int]] = None, |
| | ) -> Tuple[torch.Tensor]: |
| | B, L, D = hidden_states.size() |
| | NH = self.num_attention_heads |
| | H = self.attention_head_size |
| |
|
| | scale_factor = self.attention_head_size**-0.5 |
| | query_states = self.query(hidden_states) |
| | key_states = self.key(hidden_states) |
| | value_states = self.value(hidden_states) |
| |
|
| | if lengths is not None: |
| | |
| | query_states = query_states.view(B * L, NH, H).contiguous() |
| | key_states = key_states.view(B * L, NH, H).contiguous() |
| | value_states = value_states.view(B * L, NH, H).contiguous() |
| | if rotary_embeddings is not None: |
| | cos, sin = rotary_embeddings |
| | cos = cos.view(B * L, 1, H) |
| | sin = sin.view(B * L, 1, H) |
| | query_states = (query_states * cos) + (rotate_half(query_states) * sin) |
| | key_states = (key_states * cos) + (rotate_half(key_states) * sin) |
| | lengths = [0, ] + list(chain(*lengths)) |
| | lengths = torch.tensor(lengths, dtype=torch.int, device=query_states.device) |
| | max_seqlen = torch.max(lengths) |
| | cum_seqlen = torch.cumsum(lengths, dim=0, dtype=torch.int) |
| | context = flash_attn_varlen_func( |
| | q=query_states, |
| | k=key_states, |
| | v=value_states, |
| | cu_seqlens_q=cum_seqlen, |
| | cu_seqlens_k=cum_seqlen, |
| | max_seqlen_q=max_seqlen, |
| | max_seqlen_k=max_seqlen, |
| | causal=self.causal, |
| | return_attn_probs=False, |
| | softmax_scale=scale_factor, |
| | ) |
| | else: |
| | query_states = query_states.view(B, L, NH, H).contiguous() |
| | key_states = key_states.view(B, L, NH, H).contiguous() |
| | value_states = value_states.view(B, L, NH, H).contiguous() |
| | if rotary_embeddings is not None: |
| | cos, sin = rotary_embeddings |
| | query_states, key_states = apply_rotary_pos_emb( |
| | query_states, key_states, cos, sin, unsqueeze_dim=2 |
| | ) |
| | context = flash_attn_func( |
| | q=query_states, |
| | k=key_states, |
| | v=value_states, |
| | softmax_scale=scale_factor, |
| | causal=self.causal, |
| | ) |
| | context = context.view(B, L, D).contiguous() |
| | context = self.output(context) |
| | context = self.output_dropout(context) |
| | return_attention_probs = None |
| | return context, return_attention_probs |
| |
|
| | def forward( |
| | self, |
| | hidden_states: torch.Tensor, |
| | attention_mask: torch.Tensor = None, |
| | lengths: List[torch.Tensor] = None, |
| | rotary_embeddings: torch.Tensor = None, |
| | output_attentions: bool = False, |
| | ): |
| | if self.config.attn_impl == "naive": |
| | return self.naive_forward( |
| | hidden_states=hidden_states, |
| | attention_mask=attention_mask, |
| | rotary_embeddings=rotary_embeddings, |
| | output_attentions=output_attentions, |
| | ) |
| | elif self.config.attn_impl == "sdpa": |
| | return self.sdpa_forward( |
| | hidden_states=hidden_states, |
| | attention_mask=attention_mask, |
| | rotary_embeddings=rotary_embeddings, |
| | ) |
| | elif self.config.attn_impl == "flash_attn": |
| | return self.flash_attn_forward( |
| | hidden_states=hidden_states, |
| | rotary_embeddings=rotary_embeddings, |
| | lengths=lengths, |
| | ) |
| |
|
| |
|
class FeedForwardNetwork(nn.Module):
    """Two-layer position-wise MLP: ``w2(act_fn(w1(x)))`` without biases."""

    def __init__(self, config: "TransformerConfig"):
        """Build the two projections and the activation.

        Args:
            config: Model configuration; reads ``hidden_size``,
                ``intermediate_size`` and ``act_fn`` (``"gelu"`` or
                ``"silu"``).

        Raises:
            ValueError: If ``config.act_fn`` is not a supported activation.
        """
        super().__init__()
        activations = {"gelu": nn.GELU, "silu": nn.SiLU}
        if config.act_fn not in activations:
            # Fail at construction time; otherwise ``self.act_fn`` would be
            # missing and the first forward pass would raise AttributeError.
            raise ValueError(
                f"Unsupported act_fn {config.act_fn!r}; expected one of {sorted(activations)}"
            )
        self.w1 = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.w2 = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)
        self.act_fn = activations[config.act_fn]()

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Expand to the intermediate width, apply the activation, project back."""
        return self.w2(self.act_fn(self.w1(hidden_states)))
| |
|
| |
|
class TransFormerLayer(nn.Module):
    """One pre-norm transformer block: self-attention then feed-forward, each with a residual."""

    def __init__(self, config: TransformerConfig, causal=False):
        """Create the attention, feed-forward and two normalisation sub-modules.

        ``config.layernorm_type == "layernorm"`` selects ``nn.LayerNorm``
        (without bias); any other value selects ``RMSNorm``.
        """
        super().__init__()
        self.config = config
        self.causal = causal
        self.attention = SelfAttention(config, causal=causal)
        self.ffn = FeedForwardNetwork(config)

        def build_norm():
            if config.layernorm_type == "layernorm":
                return nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps, bias=False)
            return RMSNorm(config.hidden_size, eps=config.layer_norm_eps)

        self.pre_norm = build_norm()
        self.post_norm = build_norm()

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor = None,
        lengths: List[torch.Tensor] = None,
        rotary_embeddings: torch.Tensor = None,
        output_attentions: bool = False,
    ):
        """Run the block.

        Returns:
            ``(hidden_states, attn_probs)`` where ``attn_probs`` is whatever
            the attention module returned as its second element.
        """
        # Attention sub-block: normalise, attend, add back to the input.
        attn_out, attn_probs = self.attention(
            hidden_states=self.pre_norm(hidden_states),
            attention_mask=attention_mask,
            lengths=lengths,
            rotary_embeddings=rotary_embeddings,
            output_attentions=output_attentions,
        )
        after_attn = hidden_states + attn_out

        # Feed-forward sub-block with its own residual connection.
        ffn_out = self.ffn(self.post_norm(after_attn))
        return (after_attn + ffn_out, attn_probs)
| |
|
| |
|
class TransformerCore(nn.Module):
    """Stack of ``TransFormerLayer`` blocks followed by a final normalisation."""

    def __init__(self, config: TransformerConfig, causal=False):
        """Create ``config.num_hidden_layers`` blocks and the final norm.

        ``config.layernorm_type == "layernorm"`` selects ``nn.LayerNorm``
        (without bias); any other value selects ``RMSNorm``.
        """
        super().__init__()
        self.config = config
        self.layer = nn.ModuleList(
            [TransFormerLayer(config, causal=causal) for _ in range(config.num_hidden_layers)]
        )
        if self.config.layernorm_type == "layernorm":
            self.norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps, bias=False)
        else:
            self.norm = RMSNorm(config.hidden_size, eps=config.layer_norm_eps)
        # Toggled externally; when True, layers are run through activation
        # checkpointing whenever gradients are being tracked.
        self.gradient_checkpointing = False

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor = None,
        lengths: List[torch.Tensor] = None,
        rotary_embeddings: torch.Tensor = None,
        output_attentions: bool = False,
        output_hidden_states=False,
    ):
        """Run all layers and the final normalisation.

        Args:
            hidden_states: Input to the first layer.
            attention_mask: Forwarded unchanged to every layer.
            lengths: Forwarded unchanged to every layer.
            rotary_embeddings: Forwarded unchanged to every layer.
            output_attentions: Collect each layer's attention probabilities
                (``None`` entries for layers that return none).
            output_hidden_states: Collect the input of every layer plus the
                final normalised output.

        Returns:
            ``BaseModelOutput`` whose ``hidden_states`` and ``attentions`` are
            lists (empty when not requested). Collected tensors are detached
            and moved to CPU.
        """
        all_hidden_states = []
        all_self_attentions = []
        for layer_module in self.layer:
            if output_hidden_states:
                all_hidden_states.append(hidden_states.detach().cpu())
            use_checkpoint = (
                torch.is_grad_enabled()
                and hidden_states.requires_grad
                and self.gradient_checkpointing
            )
            layer_kwargs = dict(
                attention_mask=attention_mask,
                lengths=lengths,
                rotary_embeddings=rotary_embeddings,
                output_attentions=output_attentions,
            )
            if use_checkpoint:
                # Arguments are passed by keyword: the layer's positional
                # order is (attention_mask, lengths, rotary_embeddings, ...),
                # so a positional call can silently swap lengths and
                # rotary_embeddings. Keyword arguments are supported by the
                # non-reentrant checkpoint implementation.
                hidden_states, attn_probs = checkpoint(
                    layer_module,
                    hidden_states,
                    use_reentrant=False,
                    **layer_kwargs,
                )
            else:
                hidden_states, attn_probs = layer_module(
                    hidden_states=hidden_states,
                    **layer_kwargs,
                )
            if output_attentions:
                all_self_attentions.append(
                    attn_probs.detach().cpu() if attn_probs is not None else None
                )
        hidden_states = self.norm(hidden_states)
        if output_hidden_states:
            all_hidden_states.append(hidden_states.detach().cpu())
        return BaseModelOutput(
            last_hidden_state=hidden_states,
            hidden_states=all_hidden_states,
            attentions=all_self_attentions,
        )
| |
|
| |
|
class BaseTransformerModel(PreTrainedModel):
    """Common ``PreTrainedModel`` base that binds the models in this file to ``TransformerConfig``."""

    config_class = TransformerConfig
    # NOTE(review): the task wrappers in this file store their backbone under
    # ``self.model`` rather than ``self.transformer`` — confirm this prefix is
    # the intended one.
    base_model_prefix = "transformer"
| |
|
| |
|
class TransformerModel(BaseTransformerModel):
    """Backbone: token embedding, rotary position tables and the transformer stack."""

    def __init__(self, config: TransformerConfig, causal=False):
        """Build the backbone; the rotary table width is the per-head size."""
        super().__init__(config)
        self.config = config
        head_dim = config.hidden_size // config.num_attention_heads
        self.rotary_embedding = RotaryEmbedding(dim=head_dim)
        self.token_embedding = TokenEmbedding(config)
        self.transformer = TransformerCore(config, causal=causal)
        self.causal = causal

    def enable_gradient_checkpointing(self):
        """Turn on activation checkpointing in the transformer stack."""
        self.transformer.gradient_checkpointing = True

    def disable_gradient_checkpointing(self):
        """Turn off activation checkpointing in the transformer stack."""
        self.transformer.gradient_checkpointing = False

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor=None,
        lengths: torch.Tensor=None,
        position_ids: torch.Tensor=None,
        output_attentions=False,
        output_hidden_states=False,
    ) -> BaseModelOutput:
        """Encode ``input_ids``.

        Args:
            input_ids: Token ids.
            attention_mask: Optional 2D mask; expanded to a 4D float mask via
                ``create_4d_mask``. Not allowed with the ``"flash_attn"``
                backend.
            lengths: Forwarded to the transformer stack.
            position_ids: Optional positions with the same shape as
                ``input_ids``; defaults to ``0..L-1`` for every row.
            output_attentions: Forwarded to the transformer stack.
            output_hidden_states: Forwarded to the transformer stack.

        Raises:
            ValueError: If ``position_ids`` and ``input_ids`` differ in shape,
                or if a mask is given together with the flash-attention
                backend.
        """
        embeddings = self.token_embedding(input_ids)

        if position_ids is None:
            seq_positions = torch.arange(input_ids.size(1)).to(input_ids.device)
            position_ids = seq_positions.unsqueeze(0).expand_as(input_ids)
        if position_ids.shape != input_ids.shape:
            raise ValueError("Position IDs must have the same shape as input_ids")
        rotary_embeddings = self.rotary_embedding(embeddings, position_ids)

        has_mask = attention_mask is not None
        if has_mask and self.config.attn_impl == "flash_attn":
            raise ValueError("Flash attention does not support specifying attention mask")
        if has_mask:
            attention_mask = create_4d_mask(
                attention_mask,
                return_type="float",
                x=embeddings,
                causal=self.causal,
            )

        core_out = self.transformer(
            hidden_states=embeddings,
            attention_mask=attention_mask,
            lengths=lengths,
            rotary_embeddings=rotary_embeddings,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
        )

        return BaseModelOutput(
            last_hidden_state=core_out.last_hidden_state,
            hidden_states=core_out.hidden_states,
            attentions=core_out.attentions,
        )
| |
|
| |
|
class TransformerForMaskedLM(BaseTransformerModel):
    """Bidirectional backbone with a linear vocabulary head for masked-token prediction."""

    def __init__(self, config: TransformerConfig):
        """Build a non-causal backbone plus a bias-free LM head."""
        super().__init__(config)
        self.model = TransformerModel(config, causal=False)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
        self.post_init()

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor=None,
        lengths: torch.Tensor=None,
        position_ids: torch.Tensor=None,
        labels: torch.Tensor=None,
        output_attentions=False,
        output_hidden_states=False,
    ) -> MaskedLMOutput:
        """Compute vocabulary logits and, when ``labels`` is given, the cross-entropy loss.

        The loss uses ``nn.CrossEntropyLoss`` with its default settings on
        float32 logits flattened to ``(-1, vocab_size)``.
        """
        backbone_out = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            lengths=lengths,
            position_ids=position_ids,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
        )
        sequence_output = backbone_out.last_hidden_state
        prediction_scores = self.lm_head(sequence_output)

        if labels is None:
            loss = None
        else:
            flat_logits = prediction_scores.view(-1, self.config.vocab_size).float()
            flat_labels = labels.to(prediction_scores.device).view(-1)
            loss = nn.CrossEntropyLoss()(flat_logits, flat_labels)

        # NOTE(review): ``hidden_states`` carries the final hidden state here,
        # not the per-layer list collected by the backbone — confirm callers
        # rely on this before changing it.
        return MaskedLMOutput(
            loss=loss,
            logits=prediction_scores,
            hidden_states=sequence_output,
            attentions=backbone_out.attentions,
        )
| |
|
| |
|
class TransformerForCausalLM(BaseTransformerModel):
    """Causal backbone with a linear vocabulary head for next-token style modelling."""

    def __init__(self, config):
        """Build a causal backbone plus a bias-free LM head."""
        super().__init__(config)
        self.model = TransformerModel(config, causal=True)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
        # ``post_init`` is the documented end-of-constructor hook (it performs
        # weight initialisation plus the library's other finalisation steps)
        # and matches the masked-LM wrapper in this file.
        self.post_init()

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor=None,
        lengths: torch.Tensor=None,
        position_ids: torch.Tensor=None,
        labels: torch.Tensor=None,
        output_attentions=False,
        output_hidden_states=False,
        reduction="mean",
    ) -> CausalLMOutput:
        """Compute vocabulary logits and, when ``labels`` is given, the cross-entropy loss.

        Args:
            input_ids: Token ids.
            attention_mask: Forwarded to the backbone.
            lengths: Forwarded to the backbone.
            position_ids: Forwarded to the backbone.
            labels: Optional targets, compared position-by-position with the
                logits. No shift is applied here.
                NOTE(review): presumably the caller supplies already-shifted
                labels — confirm.
            output_attentions: Forwarded to the backbone.
            output_hidden_states: Forwarded to the backbone.
            reduction: ``reduction`` argument of ``nn.CrossEntropyLoss``.

        Returns:
            ``CausalLMOutput`` with ``loss`` (or ``None``), ``logits`` and the
            backbone's ``hidden_states`` / ``attentions``.
        """
        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            lengths=lengths,
            position_ids=position_ids,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
        )
        sequence_output = outputs.last_hidden_state
        prediction_scores = self.lm_head(sequence_output)
        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss(reduction=reduction)
            labels = labels.to(prediction_scores.device)
            # The loss is computed on float32 logits regardless of model dtype.
            loss = loss_fct(
                prediction_scores.view(-1, self.config.vocab_size).to(torch.float32),
                labels.view(-1),
            )
        return CausalLMOutput(
            loss=loss,
            logits=prediction_scores,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
| | |
# Register the masked-LM wrapper under the "AutoModelForMaskedLM" auto class.
# NOTE(review): no matching registration for TransformerForCausalLM is visible
# in this file — confirm whether one is expected.
TransformerForMaskedLM.register_for_auto_class("AutoModelForMaskedLM")