"""
GPT training and inference implemented in the most atomic way, in pure Python.
This file contains the complete algorithm; all the industrial-grade libraries
(such as PyTorch) exist only to make it efficient.
"""
import os      # file path handling
import math    # basic math ops such as log and exp
import random  # random initialization and data shuffling
# Fix the random seed so runs are reproducible
random.seed(42)
# --- 1. Dataset ---
# Download and read names.txt, a dataset of thousands of first names
if not os.path.exists('input.txt'):
    import urllib.request
    names_url = 'https://raw.githubusercontent.com/karpathy/makemore/refs/heads/master/names.txt'
    urllib.request.urlretrieve(names_url, 'input.txt')
# Turn the dataset into a list of strings; each name is one document
docs = [l.strip() for l in open('input.txt').read().strip().split('\n') if l.strip()]
random.shuffle(docs)
print(f"num docs: {len(docs)}")
# --- 2. Tokenizer ---
# Build a character-level vocabulary. <BOS> (Beginning of Sequence) is the start placeholder
chars = ['<BOS>'] + sorted(set(''.join(docs)))
vocab_size = len(chars)
# Lookup tables: character <-> index (stoi: string to index, itos: index to string)
stoi = { ch:i for i, ch in enumerate(chars) }
itos = { i:ch for i, ch in enumerate(chars) }
BOS = stoi['<BOS>']
print(f"vocab size: {vocab_size}")
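# Illustrative only (not part of the original script): a quick round trip through
# the character-level tokenizer. 'emma' is just an assumed example name whose
# characters appear in the dataset.
_name = 'emma'
_ids = [BOS] + [stoi[ch] for ch in _name]           # encode: prepend <BOS>, map chars to indices
assert ''.join(itos[i] for i in _ids[1:]) == _name  # decode: map indices back to chars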
# --- 3. Autograd engine ---
# A micro version of micrograd. It records every scalar operation so that
# gradients can later be computed via the chain rule.
class Value:
    """ Stores a single scalar value and its gradient """
    def __init__(self, data, _children=(), _op=''):
        self.data = data
        self.grad = 0                   # partial derivative of the loss w.r.t. this value
        self._backward = lambda: None   # closure that runs this node's local backward step
        self._prev = set(_children)     # the nodes that produced this node (used to walk the graph backwards)
        self._op = _op
    def __add__(self, other):
        other = other if isinstance(other, Value) else Value(other)
        out = Value(self.data + other.data, (self, other), '+')
        def _backward():
            self.grad += out.grad   # addition gate: the gradient flows unchanged to both operands
            other.grad += out.grad
        out._backward = _backward
        return out
    def __mul__(self, other):
        other = other if isinstance(other, Value) else Value(other)
        out = Value(self.data * other.data, (self, other), '*')
        def _backward():
            # multiplication gate: each operand's gradient = the other operand's value * output gradient
            self.grad += other.data * out.grad
            other.grad += self.data * out.grad
        out._backward = _backward
        return out
    def __pow__(self, other):
        assert isinstance(other, (int, float))
        out = Value(self.data**other, (self,), f'**{other}')
        def _backward():
            # power rule: d(x^n)/dx = n * x^(n-1)
            self.grad += (other * self.data**(other-1)) * out.grad
        out._backward = _backward
        return out
    def log(self):
        out = Value(math.log(self.data), (self,), 'log')
        def _backward():
            # derivative of log(x) is 1/x
            self.grad += (1 / self.data) * out.grad
        out._backward = _backward
        return out
    def exp(self):
        out = Value(math.exp(self.data), (self,), 'exp')
        def _backward():
            # derivative of exp(x) is exp(x)
            self.grad += out.data * out.grad
        out._backward = _backward
        return out
    def relu(self):
        out = Value(0 if self.data < 0 else self.data, (self,), 'ReLU')
        def _backward():
            # derivative of ReLU: 1 where the input is positive, 0 otherwise
            self.grad += (out.data > 0) * out.grad
        out._backward = _backward
        return out
    def backward(self):
        # Topological sort: visiting nodes in reverse topological order guarantees that
        # a node's gradient is fully accumulated before it is propagated to its inputs.
        topo = []
        visited = set()
        def build_topo(v):
            if v not in visited:
                visited.add(v)
                for child in v._prev:
                    build_topo(child)
                topo.append(v)
        build_topo(self)
        self.grad = 1  # the gradient of the loss with respect to itself is 1
        for v in reversed(topo):
            v._backward()
    # Operator overloads so that Value behaves like a regular float in expressions
    def __neg__(self): return self * -1
    def __radd__(self, other): return self + other
    def __sub__(self, other): return self + (-other)
    def __rsub__(self, other): return other + (-self)
    def __rmul__(self, other): return self * other
    def __truediv__(self, other): return self * other**-1
    def __rtruediv__(self, other): return other * self**-1
    def __repr__(self): return f"Value(data={self.data}, grad={self.grad})"
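# Tiny autograd sanity check (an illustrative addition, not part of the original script):
# for f(x, y) = x*y + x we expect df/dx = y + 1 and df/dy = x, and backward() should
# accumulate the gradient of x across both places where it is used.
_x, _y = Value(2.0), Value(3.0)
_f = _x * _y + _x
_f.backward()
assert abs(_x.grad - 4.0) < 1e-9 and abs(_y.grad - 2.0) < 1e-9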
# --- 4. Model hyperparameters and parameter initialization ---
n_embd = 16      # embedding dimension per character
n_head = 4       # number of attention heads
n_layer = 1      # number of Transformer layers
block_size = 8   # maximum context window length
head_dim = n_embd // n_head  # dimension of each attention head
# Initialization helper: build a matrix of randomly initialized Values
matrix = lambda nout, nin, std=0.02: [[Value(random.gauss(0, std)) for _ in range(nin)] for _ in range(nout)]
# State dict: holds the model weights
state_dict = {
    'wte': matrix(vocab_size, n_embd),     # token embedding matrix
    'wpe': matrix(block_size, n_embd),     # positional embedding matrix
    'lm_head': matrix(vocab_size, n_embd)  # output projection (language model head)
}
for i in range(n_layer):
    state_dict[f'layer{i}.attn_wq'] = matrix(n_embd, n_embd)             # query weights
    state_dict[f'layer{i}.attn_wk'] = matrix(n_embd, n_embd)             # key weights
    state_dict[f'layer{i}.attn_wv'] = matrix(n_embd, n_embd)             # value weights
    state_dict[f'layer{i}.attn_wo'] = matrix(n_embd, n_embd, std=0)      # attention output projection
    state_dict[f'layer{i}.mlp_fc1'] = matrix(4 * n_embd, n_embd)         # first FFN layer (4x expansion)
    state_dict[f'layer{i}.mlp_fc2'] = matrix(n_embd, 4 * n_embd, std=0)  # second FFN layer
# Flatten every Value into a single list so the optimizer can iterate over them
params = [p for mat in state_dict.values() for row in mat for p in row]
print(f"num params: {len(params)}")
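# Back-of-the-envelope check of the printed count (illustrative; assumes names.txt
# yields the 26 lowercase letters, so vocab_size = 27):
#   wte 27*16 + wpe 8*16 + lm_head 27*16           = 432 + 128 + 432 = 992
#   per layer: 4*(16*16) attention + 2*(64*16) MLP = 1024 + 2048     = 3072
#   total with n_layer = 1                         = 992 + 3072      = 4064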
# --- 5. GPT architecture ---
def linear(x, w):
    """ Linear layer: matrix-vector product w @ x """
    return [sum(wi * xi for wi, xi in zip(wo, x)) for wo in w]
def softmax(logits):
    """ Normalization: turn raw scores into a probability distribution """
    max_val = max(val.data for val in logits)  # subtract the max to avoid overflow
    exps = [(val - max_val).exp() for val in logits]
    total = sum(exps)
    return [e / total for e in exps]
def rmsnorm(x):
    """ Root mean square normalization: stabilizes training """
    ms = sum(xi * xi for xi in x) / len(x)
    scale = (ms + 1e-5) ** -0.5
    return [xi * scale for xi in x]
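# Quick numerical sanity checks (illustrative additions, not in the original script):
# a softmax output should sum to ~1, and an rmsnorm output should have (near) unit
# mean square thanks to the 1/sqrt(mean(x^2)) scaling.
_p = softmax([Value(0.0), Value(1.0), Value(2.0)])
assert abs(sum(pi.data for pi in _p) - 1.0) < 1e-6
_r = rmsnorm([Value(3.0), Value(4.0)])
assert abs(sum(ri.data * ri.data for ri in _r) / len(_r) - 1.0) < 1e-3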
def gpt(token_id, pos_id, keys, values):
    """ One forward step of the GPT (a single token at a single position) """
    tok_emb = state_dict['wte'][token_id]          # character embedding
    pos_emb = state_dict['wpe'][pos_id]            # positional embedding
    x = [t + p for t, p in zip(tok_emb, pos_emb)]  # combine semantic and positional information
    x = rmsnorm(x)
    for li in range(n_layer):
        # 1) Multi-head self-attention
        x_residual = x  # residual connection
        x = rmsnorm(x)
        q = linear(x, state_dict[f'layer{li}.attn_wq'])
        k = linear(x, state_dict[f'layer{li}.attn_wk'])
        v = linear(x, state_dict[f'layer{li}.attn_wv'])
        # Cache the K and V vectors (KV cache, used for autoregressive generation)
        keys[li].append(k)
        values[li].append(v)
        x_attn = []
        for h in range(n_head):
            hs = h * head_dim
            q_h = q[hs:hs+head_dim]
            k_h = [ki[hs:hs+head_dim] for ki in keys[li]]
            v_h = [vi[hs:hs+head_dim] for vi in values[li]]
            # attention scores: dot(Q, K) / sqrt(head_dim)
            attn_logits = [sum(q_h[j] * k_h[t][j] for j in range(head_dim)) / head_dim**0.5 for t in range(len(k_h))]
            attn_weights = softmax(attn_logits)
            # weighted sum of the values: sum_t(weight_t * V_t)
            head_out = [sum(attn_weights[t] * v_h[t][j] for t in range(len(v_h))) for j in range(head_dim)]
            x_attn.extend(head_out)
        x = linear(x_attn, state_dict[f'layer{li}.attn_wo'])
        x = [a + b for a, b in zip(x, x_residual)]  # add the residual back
        # 2) Feed-forward network (MLP block)
        x_residual = x
        x = rmsnorm(x)
        x = linear(x, state_dict[f'layer{li}.mlp_fc1'])
        x = [xi.relu() ** 2 for xi in x]  # ReLU^2 activation
        x = linear(x, state_dict[f'layer{li}.mlp_fc2'])
        x = [a + b for a, b in zip(x, x_residual)]
    # Project the final hidden state to vocabulary-sized logits
    logits = linear(x, state_dict['lm_head'])
    return logits
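# Illustrative usage sketch (not in the original script): the caller owns the KV cache,
# one list per layer, and feeds one token/position at a time; each call appends that
# step's K/V vectors and returns vocab_size logits.
_keys, _values = [[] for _ in range(n_layer)], [[] for _ in range(n_layer)]
_logits = gpt(BOS, 0, _keys, _values)
assert len(_logits) == vocab_size and len(_keys[0]) == 1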
# --- 6. Training loop ---
# Adam optimizer hyperparameters
learning_rate, beta1, beta2, eps_adam = 1e-2, 0.9, 0.95, 1e-8
m = [0.0] * len(params)  # first-moment (momentum) buffer
v = [0.0] * len(params)  # second-moment buffer
num_steps = 500
for step in range(num_steps):
    # Take the next name as the training example (docs were shuffled earlier) and wrap it in <BOS>
    doc = docs[step % len(docs)]
    tokens = [BOS] + [stoi[ch] for ch in doc] + [BOS]
    n = min(block_size, len(tokens) - 1)
    # Forward pass: compute the loss
    keys, values = [[] for _ in range(n_layer)], [[] for _ in range(n_layer)]
    losses = []
    for pos_id in range(n):
        token_id, target_id = tokens[pos_id], tokens[pos_id + 1]
        logits = gpt(token_id, pos_id, keys, values)
        probs = softmax(logits)
        # cross-entropy loss: -log(probability assigned to the correct token)
        loss_t = -probs[target_id].log()
        losses.append(loss_t)
    loss = (1 / n) * sum(losses)  # average loss over the whole sequence
    # Backward pass: compute the gradient of every parameter
    loss.backward()
    # Adam optimizer parameter update
    lr_t = learning_rate * (1 - step / num_steps)  # linear learning rate decay
    for i, p in enumerate(params):
        m[i] = beta1 * m[i] + (1 - beta1) * p.grad
        v[i] = beta2 * v[i] + (1 - beta2) * p.grad ** 2
        m_hat = m[i] / (1 - beta1 ** (step + 1))
        v_hat = v[i] / (1 - beta2 ** (step + 1))
        p.data -= lr_t * m_hat / (v_hat ** 0.5 + eps_adam)
        p.grad = 0  # IMPORTANT: zero the gradient after the update, otherwise it accumulates into the next step
    print(f"step {step+1:4d} / {num_steps:4d} | loss {loss.data:.4f}")
# --- 7. Inference (sampling) ---
temperature = 0.6  # controls sampling randomness; lower values give more conservative output
print("\n--- samples ---")
for sample_idx in range(20):
    keys, values = [[] for _ in range(n_layer)], [[] for _ in range(n_layer)]
    token_id = BOS
    print(f"sample {sample_idx+1}: ", end="")
    for pos_id in range(block_size):
        logits = gpt(token_id, pos_id, keys, values)
        # apply the temperature and recompute the probability distribution
        probs = softmax([l / temperature for l in logits])
        # sample the next character according to the probabilities
        token_id = random.choices(range(vocab_size), weights=[p.data for p in probs])[0]
        if token_id == BOS:  # stop this sample once the end-of-sequence token is produced
            break
        print(itos[token_id], end="")
    print()