PHP消息中间件与事件驱动架构事件驱动架构让系统组件之间松耦合通信。事件发布者不需要知道谁在消费事件消费者也不需要知道谁发布了事件。今天说说PHP中事件驱动架构的实现。事件总线的核心是管理事件的发布和订阅。phpclass Event{public function __construct(public readonly string $name,public readonly mixed $data null,public readonly array $metadata [],public bool $propagationStopped false) {}public function stopPropagation(): void{$this-propagationStopped true;}}interface EventListener{public function handle(Event $event): void;}class EventBus{private array $listeners [];private array $sorted [];private array $middlewares [];public function addListener(string $eventName, callable $listener, int $priority 0): void{$this-listeners[$eventName][$priority][] $listener;unset($this-sorted[$eventName]);}public function addMiddleware(callable $middleware): void{$this-middlewares[] $middleware;}public function dispatch(Event $event): void{$listeners $this-getListeners($event-name);$handler function (Event $event) use ($listeners, $handler) {if (empty($listeners) || $event-propagationStopped) return;$listener array_shift($listeners);$listener($event);if (!$event-propagationStopped) {// 下一个监听器}};// 应用中间件$pipeline $handler;foreach (array_reverse($this-middlewares) as $middleware) {$next $pipeline;$pipeline function (Event $event) use ($middleware, $next) {return $middleware($event, $next);};}foreach ($listeners as $listener) {if ($event-propagationStopped) break;$listener($event);}}public function asyncDispatch(Event $event): void{// 异步派发将事件写入队列$payload serialize([name $event-name,data $event-data,metadata $event-metadata,]);$redis new Redis();$redis-connect(127.0.0.1, 6379);$redis-lPush(events, $payload);}private function getListeners(string $eventName): array{if (!isset($this-sorted[$eventName])) {$this-sortListeners($eventName);}return $this-sorted[$eventName] ?? [];}private function sortListeners(string $eventName): void{if (!isset($this-listeners[$eventName])) {$this-sorted[$eventName] [];return;}krsort($this-listeners[$eventName]);$this-sorted[$eventName] array_merge(...$this-listeners[$eventName]);}}?// 具体事件class OrderPlacedEvent extends Event{public function __construct(public readonly string $orderId,public readonly int $userId,public readonly float $amount,public readonly array $items) {parent::__construct(order.placed, [order_id $orderId,user_id $userId,amount $amount,items $items,]);}}class UserRegisteredEvent extends Event{public function __construct(public readonly int $userId,public readonly string $name,public readonly string $email) {parent::__construct(user.registered, [user_id $userId,name $name,email $email,]);}}// 事件处理器$bus new EventBus();$bus-addListener(order.placed, function (Event $event) {echo 发送订单确认邮件: {$event-data[order_id]}\n;}, 10);$bus-addListener(order.placed, function (Event $event) {echo 扣减库存\n;}, 8);$bus-addListener(order.placed, function (Event $event) {echo 通知配送系统\n;}, 5);$bus-addListener(user.registered, function (Event $event) {echo 发送欢迎邮件至: {$event-data[email]}\n;}, 10);$bus-addListener(user.registered, function (Event $event) {echo 创建用户默认配置\n;}, 5);// 触发事件echo 订单事件 \n;$bus-dispatch(new OrderPlacedEvent(ORD-001, 123, 299.00, [商品A, 商品B]));echo \n 用户事件 \n;$bus-dispatch(new UserRegisteredEvent(456, 张三, zhangsantest.com));?基于消息队列的事件驱动更适合跨进程的异步通信phpclass AsyncEventWorker{private Redis $redis;private array $handlers [];public function __construct(Redis $redis){$this-redis $redis;}public function registerHandler(string $event, callable $handler): void{$this-handlers[$event][] $handler;}public function work(): void{echo 事件消费者启动\n;while (true) {$payload $this-redis-brPop([events], 3);if ($payload null) continue;$event unserialize($payload[1]);$this-processEvent($event);}}private function processEvent(Event $event): void{$handlers $this-handlers[$event-name] ?? [];foreach ($handlers as $handler) {try {$handler($event);} catch (Exception $e) {error_log(事件处理失败: {$event-name} - {$e-getMessage()});}}}}?事件驱动架构的核心优势是解耦和扩展性。新的业务逻辑可以通过添加新的事件处理器来实现不需要修改现有代码。配合消息队列可以实现异步事件处理提高系统的响应速度。事件风暴是设计事件驱动架构的常用方法通过识别业务事件来构建系统。