# Python Chain of Responsibility pattern (职责链模式)
# Python Chain of Responsibility pattern.
# Topics: Handler interface, successor chain, request-processing pipeline,
# middleware.
# The chain of responsibility passes a request along a chain of handlers; a
# typical application is a logging / authentication / validation middleware
# pipeline.
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Request:
    """An incoming request travelling down the handler chain."""

    path: str
    method: str
    authenticated: bool = False
    body: str = ""


@dataclass
class Response:
    """The result produced by some handler in the chain."""

    status: int = 200
    body: str = ""


# Handler base class
class Handler(ABC):
    """Abstract link in the chain; holds a reference to its successor."""

    def __init__(self):
        self._next: Optional["Handler"] = None

    def set_next(self, handler: "Handler") -> "Handler":
        """Attach *handler* as the successor of this link.

        Returns:
            The handler that was passed in (not ``self``), so calls can be
            chained: ``a.set_next(b).set_next(c)`` builds ``a -> b -> c``.
        """
        self._next = handler
        return handler

    @abstractmethod
    def handle(self, request: Request) -> Response:
        """Process *request* or delegate it further via :meth:`next`."""
        ...

    def next(self, request: Request) -> Response:
        """Forward *request* to the successor.

        Returns:
            The successor's response, or a 404 response when this link is
            the last one in the chain.
        """
        if self._next is None:
            return Response(404, "未找到处理器")
        return self._next.handle(request)


# Concrete handlers
class LoggingHandler(Handler):
    """Print the request method and path, then pass the request on."""

    def handle(self, request: Request) -> Response:
        print(f"[日志] {request.method} {request.path}")
        return self.next(request)


class AuthHandler(Handler):
    """Stop unauthenticated requests with a 401 response."""

    def handle(self, request: Request) -> Response:
        if not request.authenticated:
            return Response(401, "未认证")
        return self.next(request)


class ValidationHandler(Handler):
    """Reject POST/PUT requests that carry an empty body with a 400."""

    def handle(self, request: Request) -> Response:
        if request.method in ("POST", "PUT") and not request.body:
            return Response(400, "请求体为空")
        return self.next(request)


class CacheHandler(Handler):
    """Cache successful GET responses, keyed by request path.

    Only responses with status 200 are stored. Entries are never evicted
    or invalidated by this class.
    """

    def __init__(self):
        super().__init__()
        self._cache: dict[str, Response] = {}

    def handle(self, request: Request) -> Response:
        # Non-GET requests bypass the cache entirely.
        if request.method != "GET":
            return self.next(request)

        cached = self._cache.get(request.path)
        if cached is not None:
            return cached

        resp = self.next(request)
        if resp.status == 200:
            self._cache[request.path] = resp
        return resp


class Router(Handler):
    """Terminal-style handler mapping ``(method, path)`` to a response body."""

    def __init__(self):
        super().__init__()
        self._routes: dict[tuple[str, str], str] = {}

    def register(self, m: str, p: str, body: str) -> None:
        """Register *body* as the response for method *m* on path *p*."""
        self._routes[(m, p)] = body

    def handle(self, request: Request) -> Response:
        body = self._routes.get((request.method, request.path))
        # Compare against None rather than truthiness so that a route
        # registered with an empty body still counts as a match.
        if body is not None:
            return Response(200, body)
        return self.next(request)


if __name__ == "__main__":
    router = Router()
    router.register("GET", "/ping", "pong")

    # Pipeline: logging -> auth -> validation -> cache -> router
    chain = LoggingHandler()
    (
        chain.set_next(AuthHandler())
        .set_next(ValidationHandler())
        .set_next(CacheHandler())
        .set_next(router)
    )
    print(chain.handle(Request("/ping", "GET", authenticated=True)))