用Python代码拆解DDPM从数学公式到可运行的扩散模型当我在第一次接触扩散模型时那些复杂的数学推导让我望而却步。直到有一天我决定用代码把每个公式跑出来看着中间变量在屏幕上跳动突然一切都变得清晰起来。这就是本文想带给你的体验——我们不用死记硬背那些概率密度函数而是用Python代码让DDPM的每个计算步骤变得可见、可验证。1. 环境准备与核心概念可视化在开始前让我们先准备好实验环境。这个极简实现只需要PyTorch和Matplotlibimport torch import matplotlib.pyplot as plt from torch import nn import math device torch.device(cuda if torch.cuda.is_available() else cpu) print(fUsing device: {device})扩散模型的核心思想其实非常直观想象一杯清水被一滴墨水逐渐染黑的过程前向扩散然后我们尝试用录像机记录这个过程。如果能够倒放录像反向去噪就能从墨水中还原出最初的清水。DDPM正是模拟了这个过程前向过程逐步向数据添加高斯噪声扩散反向过程学习逐步去除噪声去噪让我们用代码定义一个简单的1D数据集来可视化这个过程def generate_1d_data(num_points100): 生成由两个高斯分布混合的1D数据 data1 torch.normal(mean-2.0, std0.5, size(num_points//2,)) data2 torch.normal(mean2.0, std1.0, size(num_points//2,)) return torch.cat([data1, data2]) original_data generate_1d_data() plt.hist(original_data.numpy(), bins50) plt.title(Original Data Distribution) plt.show()2. 前向扩散过程的代码实现前向过程的核心公式是$$ x_t \sqrt{1-\beta_t} \epsilon_t \sqrt{\beta_t} x_{t-1} $$其中$\beta_t$是噪声调度参数。让我们先实现这个噪声调度def linear_beta_schedule(timesteps, start0.0001, end0.02): 线性噪声调度 return torch.linspace(start, end, timesteps) timesteps 200 betas linear_beta_schedule(timesteps) alphas 1. - betas alphas_cumprod torch.cumprod(alphas, axis0)现在我们可以实现单步前向扩散def q_sample(x_start, t, noiseNone): 根据调度参数扩散数据 if noise is None: noise torch.randn_like(x_start) sqrt_alphas_cumprod_t torch.sqrt(alphas_cumprod[t]) sqrt_one_minus_alphas_cumprod_t torch.sqrt(1. - alphas_cumprod[t]) return sqrt_alphas_cumprod_t * x_start sqrt_one_minus_alphas_cumprod_t * noise让我们可视化不同时间步的扩散效果def plot_diffusion_process(data, num_steps5): plt.figure(figsize(12, 6)) plt.hist(data.numpy(), bins50, alpha0.5, labelOriginal) for step in [0, timesteps//4, timesteps//2, 3*timesteps//4, timesteps-1]: noisy_data q_sample(data, torch.tensor([step])) plt.hist(noisy_data.numpy(), bins50, alpha0.5, labelfStep {step}) plt.legend() plt.title(Forward Diffusion Process) plt.show() plot_diffusion_process(original_data)你会看到数据如何从原始分布逐渐变成纯噪声——这正是我们需要的扩散过程。3. 构建UNet噪声预测模型反向过程需要一个模型来预测噪声。我们实现一个极简的1D UNetclass SimpleUNet1D(nn.Module): def __init__(self): super().__init__() # 下采样 self.down1 nn.Sequential( nn.Linear(1, 32), nn.SiLU() ) self.down2 nn.Sequential( nn.Linear(32, 64), nn.SiLU() ) # 中间层 self.mid nn.Sequential( nn.Linear(64, 64), nn.SiLU() ) # 上采样 self.up1 nn.Sequential( nn.Linear(64, 32), nn.SiLU() ) self.up2 nn.Sequential( nn.Linear(32, 1), ) def forward(self, x, t): # 添加时间嵌入 t_emb self.get_time_embedding(t) x torch.cat([x, t_emb], dim-1) # 下采样路径 h1 self.down1(x) h2 self.down2(h1) # 中间层 h self.mid(h2) # 上采样路径 h self.up1(h h2) h self.up2(h h1) return h def get_time_embedding(self, timestep): # 简单的时间嵌入 freqs torch.arange(0, 32, dtypetorch.float32, devicedevice) args timestep.float() * freqs embedding torch.cat([torch.sin(args), torch.cos(args)], dim-1) return embedding.unsqueeze(0)这个模型虽然简单但包含了UNet的核心思想下采样捕获上下文上采样恢复细节。4. 训练噪声预测模型训练过程的关键是让模型学会预测添加到数据中的噪声model SimpleUNet1D().to(device) optimizer torch.optim.Adam(model.parameters(), lr1e-3) def train_step(model, x_start, t): # 生成随机噪声 noise torch.randn_like(x_start) # 前向扩散过程 x_noisy q_sample(x_start, t, noise) # 预测噪声 predicted_noise model(x_noisy.unsqueeze(-1), t) # 计算损失 loss nn.functional.mse_loss(predicted_noise, noise.unsqueeze(-1)) # 反向传播 optimizer.zero_grad() loss.backward() optimizer.step() return loss # 训练循环 for epoch in range(1000): # 随机采样时间步 t torch.randint(0, timesteps, (1,), devicedevice).long() # 获取一批数据 x_start generate_1d_data(128).float().to(device) loss train_step(model, x_start, t) if epoch % 100 0: print(fEpoch {epoch} | Loss: {loss.item():.4f})训练完成后我们的模型已经学会了如何根据噪声数据和当前时间步预测噪声。5. 反向去噪过程实现反向过程的核心公式是$$ x_{t-1} \frac{1}{\sqrt{\alpha_t}}(x_t - \frac{1-\alpha_t}{\sqrt{1-\bar{\alpha}t}}\epsilon\theta) \sigma_t z $$让我们用代码实现这个采样过程torch.no_grad() def p_sample(model, x, t, t_index): betas_t betas[t].to(device) sqrt_one_minus_alphas_cumprod_t torch.sqrt(1. - alphas_cumprod[t]).to(device) sqrt_recip_alphas_t torch.sqrt(1.0 / alphas[t]).to(device) # 预测噪声 pred_noise model(x, t) # 计算均值 model_mean sqrt_recip_alphas_t * (x - betas_t * pred_noise / sqrt_one_minus_alphas_cumprod_t) if t_index 0: return model_mean else: posterior_variance_t betas[t] * (1. - alphas_cumprod[t-1]) / (1. - alphas_cumprod[t]) noise torch.randn_like(x) return model_mean torch.sqrt(posterior_variance_t) * noise torch.no_grad() def p_sample_loop(model, shape): # 从纯噪声开始 img torch.randn(shape, devicedevice) imgs [] for i in reversed(range(0, timesteps)): t torch.full((1,), i, devicedevice, dtypetorch.long) img p_sample(model, img, t, i) imgs.append(img.cpu().numpy()) return imgs现在让我们生成一些样本# 生成样本 samples p_sample_loop(model, shape(100, 1)) final_samples samples[-1] # 可视化结果 plt.hist(final_samples.squeeze(), bins50) plt.title(Generated Samples) plt.show()你会看到生成的样本分布与原始数据分布非常相似——我们的模型成功学会了去噪过程6. 关键细节与常见问题解答在实现过程中有几个关键点需要特别注意噪声调度策略的选择会显著影响模型性能。除了线性调度还可以尝试def cosine_beta_schedule(timesteps, s0.008): 余弦噪声调度 steps timesteps 1 x torch.linspace(0, timesteps, steps) alphas_cumprod torch.cos(((x / timesteps) s) / (1 s) * math.pi * 0.5) ** 2 alphas_cumprod alphas_cumprod / alphas_cumprod[0] betas 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1]) return torch.clip(betas, 0, 0.999)训练技巧方面我发现以下几点特别重要学习率需要精细调整太大容易不稳定太小收敛慢批量大小影响训练稳定性建议从128开始尝试时间步的随机采样策略会影响收敛速度常见问题解答Q: 为什么我的生成结果总是噪声 A: 检查模型是否真的学会了预测噪声。可以在训练过程中可视化预测噪声与真实噪声的对比。Q: 如何判断模型是否收敛 A: 观察训练损失曲线好的训练过程应该呈现稳定下降趋势。Q: 为什么需要时间嵌入 A: 时间嵌入帮助模型区分不同时间步的噪声模式对性能至关重要。这个实现虽然简单但包含了DDPM的所有核心要素。在实际项目中你可以在此基础上扩展更复杂的网络结构、尝试不同的噪声调度策略或者添加条件生成能力。