欢迎来到PyTorch深度学习实战的世界博客主页卿云阁欢迎关注点赞收藏⭐️留言首发时间2026年5月21日✉️希望可以和大家一起完成进阶之路作者水平很有限如果发现错误请留言轰炸哦万分感谢任务介绍数据处理旧区总曝气量OldZone_Aeration OldZone_Aeration1 OldZone_Aeration2新区总曝气量NewZone_Aeration NewZone_Aeration1 NewZone_Aeration2然后发现这两列中还是有很多0值不能直接用于建模。所以我们把这两列数据继续相加作为新的总曝气量。LSTM对总曝气量进行建模数据预处理scaler MinMaxScaler() # 创建归一化工具 scaler.fit(data[:train_end]) # 只在【训练集】上学习最大最小值 data_s scaler.transform(data) # 用训练集学到的规则给所有数据归一化整理数据# 滑窗: X[t-12,...,t-1], y第(tPRED_HORIZON-1)小时的值 def make_seq(arr, seq_len, horizon): X, y [], [] for i in range(len(arr) - seq_len - horizon 1): X.append(arr[i : iseq_len]) y.append(arr[iseq_lenhorizon-1]) return np.array(X), np.array(y)arr [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] len(arr)16seq_len 3 # 用过去3小时horizon 2 # 预测未来第2小时i12代表总共可以构造12组数据i 0Xarr[0:3] → [0, 1, 2]yarr[032-1] arr[4] → 4→ 用 0,1,2 预测 第 2 小时4最后X [ [0,1,2], [1,2,3], ... [11,12,13] ] y [4,5,6,7,8,9,10,11,12,13,14,15]模型的前向传播输入 x (64, 12, 1) ↓ 通过 LSTM out (64, 12, 64) ↓ 取最后时刻 out[:, -1, :] (64, 64) ↓ 全连接层 输出 (64, 1)整体代码import numpy as np import pandas as pd import torch import torch.nn as nn from torch.utils.data import DataLoader, TensorDataset from sklearn.preprocessing import MinMaxScaler from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score import matplotlib.pyplot as plt # 1. 配置 SEQ_LEN 12 # 用过去12小时 PRED_HORIZON 1 # 预测未来第 t 小时, 改这里即可: 1, 2, 3... BATCH_SIZE 64 EPOCHS 30 LR 1e-3 device torch.device(cuda if torch.cuda.is_available() else cpu) # 2. 数据准备 df pd.read_csv(aeration.csv, parse_dates[Date]) data df[Aeration].values.reshape(-1, 1) n len(data) train_end, val_end int(n*0.7), int(n*0.85) # 归一化(只用训练集 fit, 防止数据泄漏) scaler MinMaxScaler() scaler.fit(data[:train_end]) data_s scaler.transform(data) # 滑窗: X[t-12,...,t-1], y第(tPRED_HORIZON-1)小时的值 def make_seq(arr, seq_len, horizon): X, y [], [] for i in range(len(arr) - seq_len - horizon 1): X.append(arr[i : iseq_len]) y.append(arr[iseq_lenhorizon-1]) return np.array(X), np.array(y) X, y make_seq(data_s, SEQ_LEN, PRED_HORIZON) n_samples len(X) train_size int(n_samples * 0.7) val_size int(n_samples * 0.15) X_tr, y_tr X[:train_size], y[:train_size] X_va, y_va X[train_size : train_sizeval_size], y[train_size : train_sizeval_size] X_te, y_te X[train_sizeval_size :], y[train_sizeval_size :] def to_loader(X, y, shuffleFalse): ds TensorDataset(torch.FloatTensor(X), torch.FloatTensor(y)) return DataLoader(ds, batch_sizeBATCH_SIZE, shuffleshuffle) train_loader to_loader(X_tr, y_tr, shuffleTrue) val_loader to_loader(X_va, y_va) test_loader to_loader(X_te, y_te) # 3. LSTM 模型 class LSTMModel(nn.Module): def __init__(self, hidden64, layers2): super().__init__() self.lstm nn.LSTM(1, hidden, layers, batch_firstTrue, dropout0.1) self.fc nn.Linear(hidden, 1) def forward(self, x): out, _ self.lstm(x) return self.fc(out[:, -1, :]) model LSTMModel().to(device) criterion nn.MSELoss() optimizer torch.optim.Adam(model.parameters(), lrLR) # 4. 训练 train_losses, val_losses [], [] best_val, best_state float(inf), None for ep in range(1, EPOCHS1): model.train() tl 0 for xb, yb in train_loader: xb, yb xb.to(device), yb.to(device) optimizer.zero_grad() loss criterion(model(xb), yb) loss.backward() optimizer.step() tl loss.item() * xb.size(0) tl / len(train_loader.dataset) model.eval() vl 0 with torch.no_grad(): for xb, yb in val_loader: xb, yb xb.to(device), yb.to(device) vl criterion(model(xb), yb).item() * xb.size(0) vl / len(val_loader.dataset) train_losses.append(tl); val_losses.append(vl) if vl best_val: best_val, best_state vl, {k: v.clone() for k, v in model.state_dict().items()} print(fEpoch {ep:02d} | train {tl:.5f} | val {vl:.5f}) model.load_state_dict(best_state) # 5. 测试评估 model.eval() preds [] with torch.no_grad(): for xb, yb in test_loader: preds.append(model(xb.to(device)).cpu().numpy()) preds np.concatenate(preds) y_pred scaler.inverse_transform(preds) y_true scaler.inverse_transform(y_te) rmse np.sqrt(mean_squared_error(y_true, y_pred)) mae mean_absolute_error(y_true, y_pred) r2 r2_score(y_true, y_pred) print(f\n 测试集 (预测未来第 {PRED_HORIZON} 小时) ) print(fRMSE {rmse:.3f}) print(fMAE {mae:.3f}) print(fR² {r2:.4f})模型保存文件作用必须保存吗model.pt模型权重✅ 必须scaler.pkl归一化参数✅ 必须没它模型废掉config.json模型结构 元信息✅ 必须推理时构造模型用model.load_state_dict(best_state) # 保存部署所需文件 import joblib from pathlib import Path deploy_dir Path(fdeploy_t{PRED_HORIZON}h) deploy_dir.mkdir(exist_okTrue) torch.save(model.state_dict(), deploy_dir / model.pt) joblib.dump(scaler, deploy_dir / scaler.pkl) print(f已保存: {deploy_dir}/model.pt, {deploy_dir}/scaler.pkl)import torch import torch.nn as nn import joblib import numpy as np # 1. 模型结构(必须和训练时完全一致) class LSTMModel(nn.Module): def __init__(self, hidden64, layers2): super().__init__() self.lstm nn.LSTM(1, hidden, layers, batch_firstTrue, dropout0.1) self.fc nn.Linear(hidden, 1) def forward(self, x): out, _ self.lstm(x) return self.fc(out[:, -1, :]) # 2. 加载模型和 scaler device torch.device(cuda if torch.cuda.is_available() else cpu) model LSTMModel(hidden64, layers2).to(device) model.load_state_dict(torch.load(deploy_t1h/model.pt, map_locationdevice)) model.eval() # 切换到推理模式, 关闭 dropout scaler joblib.load(deploy_t1h/scaler.pkl) print(模型加载完成) # 3. 准备一个样本 (过去 12 小时的 Aeration 值) history [ 561.1596221923829, 562.870346069335, 566.306594848633, 565.813049316406, 565.824310302735, 565.37336730957, 493.188201904297, 379.360862731933, 379.547348047461, 377.33290863037, 530.632003784179, 557.160324096679 ] # 转成 numpy, reshape 成 [12, 1] x np.array(history, dtypenp.float32).reshape(-1, 1) # 用训练时的 scaler 归一化 x_scaled scaler.transform(x) # 转成 tensor, 加 batch 维度: [1, 12, 1] x_tensor torch.from_numpy(x_scaled).unsqueeze(0).to(device) # 4. 推理 with torch.no_grad(): pred_scaled model(x_tensor).cpu().numpy() # 反归一化得到真实数值 pred scaler.inverse_transform(pred_scaled) print(f\n输入: 过去 12 小时的 Aeration {history}) print(f预测: 下一小时的 Aeration {pred[0][0]:.2f})输入: 过去 12 小时的 Aeration [561.1596221923829, 562.870346069335, 566.306594848633, 565.813049316406, 565.824310302735, 565.37336730957, 493.188201904297, 379.360862731933, 379.547348047461, 377.33290863037, 530.632003784179, 557.160324096679]预测: 下一小时的 Aeration 567.72模型预测567.72真实值557.32是否需要转换模型维度你的场景是否需要 ONNX调用频率小时级每小时 1 次❌ 不需要延迟要求几秒内出结果即可❌ 不需要并发量可能就 1 个 SCADA / 1 个调用方❌ 不需要部署语言纯 Python❌ 不需要部署环境厂里的 Linux 服务器不是边缘设备❌ 不需要模型大小LSTM 很小几十 KB❌ 不需要如何变成api的调用from fastapi import FastAPI from pydantic import BaseModel from typing import List from predictor import AerationPredictor app FastAPI() predictor AerationPredictor() class PredictRequest(BaseModel): history: List[float] app.get(/) def root(): return {message: Aeration API is running. Visit /docs for documentation.} app.post(/predict) def predict(req: PredictRequest): pred predictor.predict(req.history) return {prediction: pred}第 1 部分导入工具from fastapi import FastAPI from pydantic import BaseModel from typing import List from predictor import AerationPredictor第 2 部分创建应用 加载模型app FastAPI() predictor AerationPredictor()创建一个 FastAPI 应用实例 创建一个预测器实例模型在这里被加载到内存第 3 部分定义请求数据的形状class PredictRequest(BaseModel): history: List[float]# 客户端发的 JSON { history: [520.1, 525.3, 530.0, ...] } # 对应的 Python 类 class PredictRequest(BaseModel): history: List[float] # ← 字段名要和 JSON 的 key 一致函数体业务逻辑def predict(req: PredictRequest): pred predictor.predict(req.history) return {prediction: pred}# 客户端发的 JSON: {history: [520.1, 525.3, ...]} # 在函数里访问: req.history # 等于 [520.1, 525.3, ...] # 客户端收到的响应: {prediction: 562.34}调用api的代码import requests response requests.post( http://127.0.0.1:8000/predict, json{history: [520.1, 525.3, 530.0, 528.4, 535.2, 540.1, 545.6, 550.0, 548.3, 552.1, 555.7, 560.0]} ) print(response.json()) # 输出: {prediction: 554.8828735351562}┌─────────────────────────────────────────────────────────┐ │ 客户端 (Python requests) │ │ requests.post( │ │ http://127.0.0.1:8000/predict, │ │ json{history: [520.1, 525.3, ..., 560.0]} │ │ ) │ └────────────────────────┬────────────────────────────────┘ │ │ HTTP POST 请求 │ Body (JSON): {history: [...]} ▼ ┌─────────────────────────────────────────────────────────┐ │ FastAPI 接收到请求 │ │ │ │ Step 1: 看路径是 /predict, 方法是 POST │ │ ↓ 找到对应的处理函数 predict() │ │ │ │ Step 2: 读取请求体, 解析 JSON │ │ ↓ {history: [520.1, ..., 560.0]} │ │ │ │ Step 3: 用 PredictRequest 校验 │ │ ✓ history 字段存在 │ │ ✓ 是 list 类型 │ │ ✓ 内部元素都是 float │ │ ↓ 构造 PredictRequest 实例 │ │ │ │ Step 4: 调用 predict(req该实例) │ └────────────────────────┬────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────┐ │ 你的代码执行: │ │ │ │ pred predictor.predict(req.history) │ │ │ │ │ ├─ 归一化输入 │ │ ├─ LSTM 推理 │ │ └─ 反归一化输出 │ │ ↓ │ │ pred 562.34 │ │ │ │ return {prediction: 562.34} │ └────────────────────────┬────────────────────────────────┘ │ │ FastAPI 把字典转成 JSON ▼ ┌─────────────────────────────────────────────────────────┐ │ HTTP 响应 │ │ Status: 200 OK │ │ Body: {prediction: 562.34} │ └────────────────────────┬────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────┐ │ 客户端收到响应 │ │ response.json() {prediction: 562.34} │ └─────────────────────────────────────────────────────────┘