MoEForward
from transformers.activations import ACT2FN
class FeedForward(nn.Module):
def __init__(self, config: MiniMindConfig):
super().__init__()
if config.intermediate_size is None:
intermediate_size = int(config.hidden_size * 8 / 3)
config.intermediate_size = 64 * ((intermediate_size + 64 - 1) // 64)
self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)
self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
self.dropout = nn.Dropout(config.dropout)
self.act_fn = ACT2FN[config.hidden_act]
def forward(self, x):
return self.dropout(self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x)))
上面是一个前馈神经网络的代码,其中 ACT2FN
是一个字典,用于选择激活函数。很容易得出,这个前馈神经网络仅仅只是几个线性层。
class MOEFeedForward(nn.Module):
def __init__(self, config: MiniMindConfig):
super().__init__()
self.config = config
self.experts = nn.ModuleList([
FeedForward(config)
for _ in range(config.n_routed_experts)
])
self.gate = MoEGate(config)
if config.n_shared_experts > 0:
self.shared_experts = nn.ModuleList([
FeedForward(config)
for _ in range(config.n_shared_experts)
])
MoEFeedForward 的定义代码如上,有专家和共享专家 ModuleList
(保存着专家以及共享专家,每一个专家都是一个前馈神经网络)。
def forward(self, x):
identity = x
orig_shape = x.shape
bsz, seq_len, _ = x.shape
# 使用门控机制选择专家
topk_idx, topk_weight, aux_loss = self.gate(x)
x = x.view(-1, x.shape[-1])
flat_topk_idx = topk_idx.view(-1)
if self.training:
x = x.repeat_interleave(self.config.num_experts_per_tok, dim=0)
y = torch.empty_like(x, dtype=torch.float16)
for i, expert in enumerate(self.experts):
y[flat_topk_idx == i] = expert(x[flat_topk_idx == i]).to(y.dtype) # 确保类型一致
y = (y.view(*topk_weight.shape, -1) * topk_weight.unsqueeze(-1)).sum(dim=1)
y = y.view(*orig_shape)
else:
y = self.moe_infer(x, flat_topk_idx, topk_weight.view(-1, 1)).view(*orig_shape)
if self.config.n_shared_experts > 0:
for expert in self.shared_experts:
y = y + expert(identity)
self.aux_loss = aux_loss
return y
以上是前向传播的代码,通过门控机制选择专家,topk_idx, topk_weight, aux_loss = self.gate(x)
输出的 topk_idx, topk_weight 形状为 (bsz, seq len, num k) 代表每个 token 对应 k 个专家的索引和权重。
如果是训练过程的话,x 首先 repeat 为 (bsz * seq len * k, -1);flat_topk_idx 的形状为 (bsz * seq len * k, -1),记录了每个 token 选择 k 个专家的索引。x[flat_topk_idx == i]
被专家 i 处理的 token,交由 专家 i 处理后保存到 y 中;随后,加权求和。
如果是推理过程的话,调用 moe_infer 函数,主要目的是将相同专家处理的 token 分配在一起,从而可以批量运算。
如果有共享专家的话,还要加上共享专家的结果。
@torch.no_grad()
def moe_infer(self, x, flat_expert_indices, flat_expert_weights):
expert_cache = torch.zeros_like(x)
idxs = flat_expert_indices.argsort()
tokens_per_expert = flat_expert_indices.bincount().cpu().numpy().cumsum(0)
token_idxs = idxs // self.config.num_experts_per_tok
# 当tokens_per_expert = [6, 15, 20, 26],tokens_per_expert.shape[0]即为专家数量(此时为4)
# 且token_idxs = [3, 7, 19, 21, 24, 25, 4, 5, 6, 10, 11, 12...] 时
# 意味token_idxs[:6] -> [3, 7, 19, 21, 24, 25]这6个位置属于专家0处理的token(每个token有可能被多个专家处理,这取决于num_experts_per_tok)
# 接下来9个位置token_idxs[6:15] -> [4, 5, 6, 10, 11, 12...]属于专家1处理的token...依此类推
for i, end_idx in enumerate(tokens_per_expert):
start_idx = 0 if i == 0 else tokens_per_expert[i - 1]
if start_idx == end_idx:
continue
expert = self.experts[i]
exp_token_idx = token_idxs[start_idx:end_idx]
expert_tokens = x[exp_token_idx]
expert_out = expert(expert_tokens).to(expert_cache.dtype)
expert_out.mul_(flat_expert_weights[idxs[start_idx:end_idx]])
expert_cache.scatter_add_(0, exp_token_idx.view(-1, 1).repeat(1, x.shape[-1]), expert_out)
return expert_cache
moe_infer 将相同专家的 token 分配在一起,从而可以批量处理,提升效率。
RMSNorm
class RMSNorm(torch.nn.Module):
def __init__(self, dim: int, eps: float = 1e-5):
super().__init__()
self.eps = eps
self.weight = nn.Parameter(torch.ones(dim))
def _norm(self, x):
return x * torch.rsqrt(x.pow(2).mean(-1, keepdim=True) + self.eps)
def forward(self, x):
return self.weight * self._norm(x.float()).type_as(x)
Minimind 使用的是 RMSNorm,公式为 $\bar{a_i} = \frac{a_i}{\sqrt{\frac{1}{n}\sum a_i^2}} g_i$。其实就是将 token 向量归一化后,再乘上一个缩放系数。缩放系数通过神经网络在数据中进行学习。
MinimindBlock
class MiniMindBlock(nn.Module):
def __init__(self, layer_id: int, config: MiniMindConfig):
super().__init__()
self.num_attention_heads = config.num_attention_heads
self.hidden_size = config.hidden_size
self.head_dim = config.hidden_size // config.num_attention_heads
self.self_attn = Attention(config)
self.layer_id = layer_id
self.input_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.post_attention_layernorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.mlp = FeedForward(config) if not config.use_moe else MOEFeedForward(config)
以上是 MinimindBlock 的代码,有注意力层,Norm层,mlp层。mlp 层可以通过设置 config 决定是否启用混合专家模型,Norm 层采用 RMSNorm,在大语言模型中 RMSNorm 由于其简洁性,运算快、效率高,成为流行选择。而且,有论文也发现,RMSNorm 的效果并不会更差。
def forward(self, hidden_states, position_embeddings, past_key_value=None, use_cache=False, attention_mask=None):
residual = hidden_states
hidden_states, present_key_value = self.self_attn(
self.input_layernorm(hidden_states), position_embeddings,
past_key_value, use_cache, attention_mask
)
hidden_states += residual
hidden_states = hidden_states + self.mlp(self.post_attention_layernorm(hidden_states))
return hidden_states, present_key_value
MiniMindBlock 中,首先进行自注意力操作,随后残差连接;再将进行 Norm 后的结果通过多层感知机,辅以残差连接,还算清晰自然。
class MiniMindModel(nn.Module):
def __init__(self, config: MiniMindConfig):
super().__init__()
self.config = config
self.vocab_size, self.num_hidden_layers = config.vocab_size, config.num_hidden_layers
self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)
self.dropout = nn.Dropout(config.dropout)
self.layers = nn.ModuleList([MiniMindBlock(l, config) for l in range(self.num_hidden_layers)])
self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
freqs_cos, freqs_sin = precompute_freqs_cis(dim=config.hidden_size // config.num_attention_heads,
end=config.max_position_embeddings, theta=config.rope_theta)
self.register_buffer("freqs_cos", freqs_cos, persistent=False)
self.register_buffer("freqs_sin", freqs_sin, persistent=False)
上为 MiniMindModel 代码,给出了 dropout 层、norm 层、嵌入层以及由 MiniMindBlock 构成的 layers。
def forward(self,
input_ids: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
past_key_values: Optional[List[Tuple[torch.Tensor, torch.Tensor]]] = None,
use_cache: bool = False,
**kwargs):
batch_size, seq_length = input_ids.shape
past_key_values = past_key_values or [None] * len(self.layers)
start_pos = past_key_values[0][0].shape[1] if past_key_values[0] is not None else 0
hidden_states = self.dropout(self.embed_tokens(input_ids))
position_embeddings = (
self.freqs_cos[start_pos:start_pos + seq_length],
self.freqs_sin[start_pos:start_pos + seq_length]
)
presents = []
for layer_idx, (layer, past_key_value) in enumerate(zip(self.layers, past_key_values)):
hidden_states, present = layer(
hidden_states,
position_embeddings,
past_key_value=past_key_value,
use_cache=use_cache,
attention_mask=attention_mask
)
presents.append(present)
hidden_states = self.norm(hidden_states)
aux_loss = sum(
layer.mlp.aux_loss
for layer in self.layers
if isinstance(layer.mlp, MOEFeedForward)
)
return hidden_states, presents, aux_loss
首先将输入 Token 嵌入后经过 dropout,随后生成 cos sin 向量用于计算位置编码;紧接着通过若干个 MiniMindBlock 层,随后进行 normalization。aux_loss 是将所有层的 loss 求和。
总结
本系列的第二篇文章完整呈现了 MiniMind 的模型代码,其中比较工程化的设计有:
- RMSNorm 降低计算量、提升效率
- 旋转位置编码采用的是 transformer 库提供的实现,与原设计等价(我并没有亲自证明),代码更加简洁、流畅
- 推理时将专家分组、批量处理,从而提升推理时的速度