013、WebSocket实战从握手失败到高并发聊天室的踩坑笔记昨天深夜调试一个设备实时数据看板前端同事跑过来说“你这接口怎么老是断” 我盯着日志里的1006异常码突然意识到问题出在哪——WebSocket的心跳机制没对齐。这种生产环境里冒出来的问题教科书上可不会告诉你。WebSocket不是“升级版HTTP”很多人把WebSocket想简单了以为就是个长连接。实际上它是独立的协议握手阶段走HTTP之后就是全双工二进制帧通信。刚入门时我犯过一个错误试图用HTTP的中间件处理WebSocket连接结果拦截器把Upgrade头给弄丢了。# 错误示范别这样写app.middleware(http)asyncdefadd_process_time_header(request,call_next):# 这里会把WebSocket握手请求也拦截了responseawaitcall_next(request)returnresponse# 正确姿势WebSocket单独处理app.websocket(/ws)asyncdefwebsocket_endpoint(websocket:WebSocket):awaitwebsocket.accept()# 业务逻辑连接管理这个坑我踩过三次第一次做聊天室时我用了个简单的列表存连接。用户量上到500就内存泄漏因为断开连接没清理。# 初期方案问题很大active_connections[]app.websocket(/ws)asyncdefwebsocket_endpoint(websocket:WebSocket):awaitwebsocket.accept()active_connections.append(websocket)# 这里埋雷了try:whileTrue:dataawaitwebsocket.receive_text()# 广播逻辑except:# 断开时没remove内存泄漏pass# 现在用的方案classConnectionManager:def__init__(self):# 用WeakRef避免循环引用self.active_connections:List[WebSocket][]asyncdefconnect(self,websocket:WebSocket):awaitwebsocket.accept()self.active_connections.append(websocket)defdisconnect(self,websocket:WebSocket):# 这里一定要清理干净ifwebsocketinself.active_connections:self.active_connections.remove(websocket)asyncdefbroadcast(self,message:str):# 广播时要处理已断开连接dead_connections[]forconnectioninself.active_connections:try:awaitconnection.send_text(message)except:dead_connections.append(connection)# 清理僵尸连接fordeadindead_connections:self.disconnect(dead)心跳机制别依赖TCP Keepalive物联网项目里遇到过设备频繁重连最后发现是运营商NAT超时。TCP的Keepalive默认两小时等它检测连接早断了。# 自己实现心跳asyncdefwebsocket_with_heartbeat(websocket:WebSocket):awaitwebsocket.accept()# 开个任务发心跳asyncdefsend_ping():whileTrue:awaitasyncio.sleep(30)# 30秒一次try:awaitwebsocket.send_json({type:ping})except:breakping_taskasyncio.create_task(send_ping())try:whileTrue:dataawaitwebsocket.receive_json()ifdata.get(type)pong:continue# 心跳响应# 处理业务数据exceptWebSocketDisconnect:ping_task.cancel()# 记得取消任务广播优化千人聊天室不卡顿早期版本我直接遍历连接列表发消息用户量上来后广播延迟飙升。后来改成按房间分组再加个发布订阅。# 优化后的广播逻辑classRoomManager:def__init__(self):# 房间字典room_id - 连接集合self.rooms:Dict[str,Set[WebSocket]]defaultdict(set)# 连接映射websocket - room_idself.connection_room:Dict[WebSocket,str]{}asyncdefbroadcast_to_room(self,room_id:str,message:str):# 只发给特定房间ifroom_idnotinself.rooms:returndead_connections[]forconnectioninself.rooms[room_id]:try:awaitconnection.send_text(message)except:dead_connections.append(connection)# 异步清理不阻塞广播asyncio.create_task(self._clean_dead(dead_connections))生产环境必须加的三层防护第一层连接数限制。防止单个IP恶意连接耗尽资源。fromslowapiimportLimiterfromslowapi.utilimportget_remote_address limiterLimiter(key_funcget_remote_address)app.websocket(/ws)limiter.limit(50 per minute)# 每分钟最多50个连接asyncdefwebsocket_endpoint(websocket:WebSocket):# 实现略第二层消息大小限制。防止内存攻击。app.websocket(/ws)asyncdefwebsocket_endpoint(websocket:WebSocket):awaitwebsocket.accept()# 限制单条消息10KBwebsocket.max_message_size10*1024第三层业务层频控。比如聊天室防刷屏。classRateLimiter:def__init__(self,max_per_second:int):self.max_per_secondmax_per_second self.user_timestamps:Dict[str,List[float]]{}asyncdefcheck(self,user_id:str)-bool:nowtime.time()timestampsself.user_timestamps.get(user_id,[])# 清理1秒前的记录timestamps[tfortintimestampsifnow-t1]iflen(timestamps)self.max_per_second:returnFalsetimestamps.append(now)self.user_timestamps[user_id]timestampsreturnTrue调试技巧看懂那些错误码1006连接异常关闭。通常是心跳没对齐或网络问题1001主动断开。前端调了close()1008协议错误。消息格式不对1011服务端内部错误。你代码抛异常了建议在accept之后加个try-except把异常日志打详细点try:awaitwebsocket.accept()whileTrue:dataawaitwebsocket.receive()exceptWebSocketDisconnectase:logger.info(f客户端断开: code{e.code}, reason{e.reason})exceptExceptionase:logger.error(fWebSocket异常:{e},exc_infoTrue)个人经验包WebSocket看着简单真用到生产环境全是细节。三个建议第一连接管理一定要用WeakRef或者定期清理不然内存泄漏查到你头疼第二心跳间隔要比NAT超时时间短移动网络按30秒设置比较安全第三广播消息一定要异步化同步发送会阻塞整个事件循环。调试时多关注1006错误码八成是心跳问题。还有那个max_message_size别忘了设真有人会发几个G的文件上来测试。最后记住WebSocket适合实时性要求高的场景如果只是定时拉数据用SSE可能更简单。