Hyperf 对接报表:当帆软报表的生成任务出现超时或失败时,如何借助 Hyperf 的异步队列和任务重试机制,设计一套可靠的报表任务调度系统?请说明死信队列的处理策略。
# Hyperf 报表任务调度 + 死信队列系统

选型:hyperf/async-queue(Redis 驱动,`retry_seconds` 退避重试)+ 死信队列 + 钉钉告警。

> hyperf/retry 只适合包裹导出器内部毫秒级的瞬时调用;不要把 `#[Retry]` 叠加在整个 Job 的 `handle()` 上,否则会与队列驱动的重试次数相乘,并且让单次执行时间超过 `handle_timeout`。

---

## 架构总览

```
提交任务
  └─ AsyncQueue (default)              # 主队列
       └─ ReportJob                    # 执行报表生成
            ├─ 成功 → 标记完成
            ├─ 失败 → 队列驱动按 retry_seconds 退避重试(10s / 30s / 60s)
            ├─ 超时 → TimeoutScanner 兜底:记一次失败并重新入队
            └─ 首次执行 + 3 次重试均失败 → 死信队列 (failed)
                 └─ DeadLetterJob(死信消费者)
                      ├─ 人工干预通知(钉钉)
                      ├─ 可手动重放
                      └─ 超期归档
```

---

## 一、队列配置

```php
<?php

declare(strict_types=1);

// config/autoload/async_queue.php
return [
    // Main queue.
    'default' => [
        'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
        'channel' => 'report:queue',
        // Seconds the consumer blocks while popping a message. This is NOT the
        // per-job execution limit; keep it short.
        'timeout' => 2,
        // Delay before the 1st / 2nd / 3rd retry.
        'retry_seconds' => [10, 30, 60],
        // A reserved message older than this many seconds is treated as timed out.
        'handle_timeout' => 360,
        'processes' => 4,
        'concurrent' => ['limit' => 8],
    ],
    // Dead-letter queue.
    'failed' => [
        'driver' => Hyperf\AsyncQueue\Driver\RedisDriver::class,
        'channel' => 'report:failed',
        'timeout' => 2,
        // Retries are switched off by DeadLetterJob::$maxAttempts = 0,
        // not by leaving this list empty.
        'retry_seconds' => [],
        'handle_timeout' => 120,
        'processes' => 1,
        'concurrent' => ['limit' => 2],
    ],
];
```

注意:`failed` 队列需要单独注册一个继承 `Hyperf\AsyncQueue\Process\ConsumerProcess`、且 `$queue` 属性为 `failed` 的消费进程,否则死信 Job 不会被消费。

---

## 二、报表 Job — 超时 + 重试

```php
<?php

declare(strict_types=1);

// app/Job/ReportJob.php
namespace App\Job;

use App\Export\ReportExporter;
use App\Service\ReportTaskService;
use Hyperf\AsyncQueue\Job;

/**
 * Queue job that renders one report and records its progress in report_tasks.
 *
 * Retrying is left entirely to the queue driver (retry_seconds in
 * async_queue.php); the job only keeps the attempt counter and the task row
 * up to date.
 */
class ReportJob extends Job
{
    /**
     * Number of retries granted by the driver after a failure, so a task is
     * executed at most $maxAttempts + 1 times. Must match report_tasks.max_retry.
     */
    public int $maxAttempts = 3;

    /**
     * @param string $taskId  primary key of the report_tasks row
     * @param array  $params  exporter parameters as submitted by the client
     * @param int    $attempt executions already performed (0 for a fresh task)
     */
    public function __construct(
        private readonly string $taskId,
        private readonly array $params,
        public int $attempt = 0,
    ) {
    }

    /**
     * Runs the exporter once and records the outcome.
     *
     * @throws \Throwable rethrown so the driver schedules the next retry
     */
    public function handle(): void
    {
        // 1-based execution counter. This relies on the driver re-serializing
        // the message (including this job) when it schedules a retry.
        ++$this->attempt;

        $svc = make(ReportTaskService::class);
        $svc->markRunning($this->taskId, $this->attempt);

        try {
            $path = (new ReportExporter($this->params))->run();
            $svc->markDone($this->taskId, $path);
        } catch (\InvalidArgumentException $e) {
            // Invalid parameters cannot succeed on retry: dead-letter right
            // away and return normally so the driver acks the message.
            $svc->markFailed($this->taskId, $e->getMessage(), PHP_INT_MAX);
        } catch (\Throwable $e) {
            $svc->markFailed($this->taskId, $e->getMessage(), $this->attempt);

            // Rethrow so the driver counts the failure and applies the backoff.
            throw $e;
        }
    }
}
```

---

## 三、任务状态服务

```php
<?php

declare(strict_types=1);

// app/Service/ReportTaskService.php
namespace App\Service;

use App\Job\DeadLetterJob;
use App\Job\ReportJob;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\DbConnection\Db;

/**
 * Owns the report_tasks state machine:
 * pending -> running -> done | retrying | dead.
 */
class ReportTaskService
{
    public function __construct(private readonly DriverFactory $queue)
    {
    }

    /**
     * Persists a new task and pushes it onto the main queue.
     *
     * @param array $params exporter parameters
     * @return string the generated task id
     */
    public function submit(array $params): string
    {
        // Task ids appear in URLs, so use an unpredictable id rather than
        // the time-based uniqid().
        $id = 'rpt_' . bin2hex(random_bytes(12));

        Db::table('report_tasks')->insert([
            'id' => $id,
            'status' => 'pending',
            'params' => json_encode($params),
            'max_retry' => 3,
            'attempt' => 0,
            'created_at' => time(),
        ]);

        $this->queue->get('default')->push(new ReportJob($id, $params));

        return $id;
    }

    /**
     * Marks the task as running and stores the current attempt number.
     */
    public function markRunning(string $id, int $attempt): void
    {
        Db::table('report_tasks')->where('id', $id)->update([
            'status' => 'running',
            'attempt' => $attempt,
            'started_at' => time(),
        ]);
    }

    /**
     * Marks the task as done and stores where the report was written.
     */
    public function markDone(string $id, string $path): void
    {
        Db::table('report_tasks')->where('id', $id)->update([
            'status' => 'done',
            'output_path' => $path,
            'finished_at' => time(),
        ]);
    }

    /**
     * Records a failed attempt. Once $attempt exceeds the task's max_retry the
     * task becomes `dead` and a DeadLetterJob is pushed; otherwise it becomes
     * `retrying`.
     *
     * @param int $attempt 1-based number of the execution that just failed
     */
    public function markFailed(string $id, string $error, int $attempt): void
    {
        $task = Db::table('report_tasks')->where('id', $id)->first();
        if ($task === null) {
            // The row is gone, so there is nothing left to track.
            return;
        }

        if ($attempt > (int) $task->max_retry) {
            // Retry budget exhausted: hand over to the dead-letter queue.
            Db::table('report_tasks')->where('id', $id)->update([
                'status' => 'dead',
                'last_error' => $error,
                'finished_at' => time(),
            ]);
            $this->queue->get('failed')->push(
                new DeadLetterJob($id, json_decode($task->params, true), $error)
            );

            return;
        }

        Db::table('report_tasks')->where('id', $id)->update([
            'status' => 'retrying',
            'last_error' => $error,
            'attempt' => $attempt,
        ]);
    }

    /**
     * Puts a task back on the main queue, continuing its attempt counter.
     * Used by the timeout scanner, where no worker is left to retry the message.
     *
     * @param int $attempt executions already performed
     */
    public function requeue(string $id, array $params, int $attempt): void
    {
        $this->queue->get('default')->push(new ReportJob($id, $params, $attempt));
    }

    /**
     * Manually replays a dead-lettered task from attempt 0.
     *
     * @throws \RuntimeException when the task does not exist or is not `dead`
     */
    public function replay(string $taskId): void
    {
        $task = Db::table('report_tasks')->where('id', $taskId)->first();
        if ($task === null) {
            throw new \RuntimeException("report task {$taskId} not found");
        }

        // Conditional update: only one caller can move the task out of `dead`,
        // so a double click cannot enqueue the report twice.
        $claimed = Db::table('report_tasks')
            ->where('id', $taskId)
            ->where('status', 'dead')
            ->update(['status' => 'pending', 'attempt' => 0]);
        if ($claimed === 0) {
            throw new \RuntimeException("report task {$taskId} is not dead-lettered");
        }

        $this->queue->get('default')->push(
            new ReportJob($taskId, json_decode($task->params, true))
        );
    }
}
```

---

## 四、死信 Job — 告警 + 归档

```php
<?php

declare(strict_types=1);

// app/Job/DeadLetterJob.php
namespace App\Job;

use Hyperf\AsyncQueue\Job;
use Hyperf\DbConnection\Db;

/**
 * Persists a dead-lettered report task and alerts the operators.
 */
class DeadLetterJob extends Job
{
    /**
     * 0 = never retried. Any higher value would re-run handle() and insert a
     * duplicate dead-letter row plus a duplicate alert.
     */
    public int $maxAttempts = 0;

    public function __construct(
        private readonly string $taskId,
        private readonly array $params,
        private readonly string $error,
    ) {
    }

    public function handle(): void
    {
        // 1. Persist the dead-letter record; it is kept for 7 days.
        Db::table('dead_letter_tasks')->insert([
            'task_id' => $this->taskId,
            'params' => json_encode($this->params),
            'error' => $this->error,
            'created_at' => time(),
            'expire_at' => time() + 86400 * 7,
        ]);

        // 2. Alert immediately.
        $this->alert();
    }

    /**
     * Posts a text message to the DingTalk webhook named by DINGTALK_WEBHOOK.
     */
    private function alert(): void
    {
        $webhook = env('DINGTALK_WEBHOOK');
        if (empty($webhook)) {
            // Alerting is not configured; the record is already persisted.
            return;
        }

        $msg = "[报表死信告警]\n"
            . "任务ID: {$this->taskId}\n"
            . "错误: {$this->error}\n"
            . '时间: ' . date('Y-m-d H:i:s') . "\n"
            . "操作: POST /report/task/{$this->taskId}/replay";

        make(\GuzzleHttp\Client::class)->post($webhook, [
            'json' => ['msgtype' => 'text', 'text' => ['content' => $msg]],
        ]);
    }
}
```

---

## 五、定时扫描 — 超时任务兜底

```php
<?php

declare(strict_types=1);

// app/Crontab/TimeoutScanner.php
namespace App\Crontab;

use App\Service\ReportTaskService;
use Hyperf\Crontab\Annotation\Crontab;
use Hyperf\DbConnection\Db;

/**
 * Safety net for tasks whose worker died: nothing is left to throw, so the
 * queue driver never retries them on its own.
 */
#[Crontab(name: 'TimeoutScanner', rule: '* * * * *', memo: '扫描超时报表任务')]
class TimeoutScanner
{
    public function __construct(private readonly ReportTaskService $svc)
    {
    }

    public function execute(): void
    {
        // A task that has been `running` for more than 6 minutes is timed out.
        $timeouts = Db::table('report_tasks')
            ->where('status', 'running')
            ->where('started_at', '<', time() - 360)
            ->get(['id', 'params', 'attempt', 'max_retry']);

        foreach ($timeouts as $task) {
            $attempt = (int) $task->attempt;
            $this->svc->markFailed($task->id, 'timeout: exceeded 360s', $attempt);

            // markFailed() dead-letters the task once the budget is used up;
            // otherwise the task must be re-enqueued here or it would stay in
            // `retrying` forever.
            // NOTE(review): if the original worker is merely slow rather than
            // dead, this runs the report twice. Confirm the exporter is idempotent.
            if ($attempt <= (int) $task->max_retry) {
                $this->svc->requeue($task->id, json_decode($task->params, true), $attempt);
            }
        }
    }
}
```

---

## 六、Controller — 任务管理

```php
<?php

declare(strict_types=1);

// app/Controller/ReportTaskController.php
namespace App\Controller;

use App\Service\ReportTaskService;
use Hyperf\DbConnection\Db;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\GetMapping;
use Hyperf\HttpServer\Annotation\PostMapping;
use Hyperf\HttpServer\Contract\RequestInterface;

/**
 * HTTP endpoints for submitting, inspecting and replaying report tasks.
 */
#[Controller(prefix: '/report/task')]
class ReportTaskController
{
    public function __construct(private readonly ReportTaskService $svc)
    {
    }

    /** Creates a task from the request payload and returns its id. */
    #[PostMapping(path: '/submit')]
    public function submit(RequestInterface $req): array
    {
        return ['task_id' => $this->svc->submit($req->all())];
    }

    /** Returns the task's status fields, or an empty array when it does not exist. */
    #[GetMapping(path: '/{id}/status')]
    public function status(string $id): array
    {
        return (array) Db::table('report_tasks')
            ->where('id', $id)
            ->first(['id', 'status', 'attempt', 'last_error', 'output_path', 'finished_at']);
    }

    /** Re-enqueues a dead-lettered task. */
    #[PostMapping(path: '/{id}/replay')]
    public function replay(string $id): array
    {
        $this->svc->replay($id);

        return ['ok' => true];
    }

    /** Lists the 50 most recent dead letters that have not expired yet. */
    #[GetMapping(path: '/dead-letters')]
    public function deadLetters(): array
    {
        return Db::table('dead_letter_tasks')
            ->where('expire_at', '>', time())
            ->orderByDesc('created_at')
            ->limit(50)
            ->get()
            ->toArray();
    }
}
```

---

## 七、任务状态机 + 死信流转

```
submit
  │
  ▼
pending ──── 入主队列 ────► running
                              │
                   ┌──────────┴──────────┐
                  成功                失败/超时
                   │                     │
                   ▼            attempt ≤ max_retry ?
                  done                   │
                                    ┌────┴────┐
                                    是        否
                                    │         │
                                retrying     dead ──► 死信队列
                                    │         │
                           退避后重新入队    告警 + 持久化
                           [10s, 30s, 60s]    │
                                          7 天后归档
                                          可手动 replay
```

重试退避时间(attempt 为第几次执行,从 1 开始):

- attempt 1 失败 → 等待 10s
- attempt 2 失败 → 等待 30s
- attempt 3 失败 → 等待 60s
- attempt 4 仍失败 → dead

---

## 八、数据库表

```sql
CREATE TABLE report_tasks (
    id          VARCHAR(40)  PRIMARY KEY,
    status      VARCHAR(16)  NOT NULL DEFAULT 'pending',
    params      JSON,
    attempt     TINYINT      NOT NULL DEFAULT 0,
    max_retry   TINYINT      NOT NULL DEFAULT 3,
    last_error  TEXT,
    output_path VARCHAR(512),
    started_at  INT,
    finished_at INT,
    created_at  INT          NOT NULL,
    INDEX idx_status (status, created_at)
);

CREATE TABLE dead_letter_tasks (
    id         INT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    task_id    VARCHAR(40) NOT NULL,
    params     JSON,
    error      TEXT,
    created_at INT NOT NULL,
    expire_at  INT NOT NULL,
    INDEX idx_expire (expire_at)
);
```

---

## 九、关键设计决策

| 问题 | 决策 | 原因 |
| --- | --- | --- |
| 重试间隔 | 退避 [10, 30, 60]s | 避免瞬时故障下的惊群效应 |
| 超时兜底 | 每分钟 Crontab 扫描并重新入队 | 进程崩溃时队列驱动无法感知超时 |
| 死信不重试 | `maxAttempts = 0` | 死信需人工确认原因,再决定是否重放 |
| 死信 7 天留存 | `expire_at` 字段 | 给运维足够时间处理,到期自动清理 |
| 状态持久化 | MySQL | Redis 重启会丢失数据,任务状态必须落库 |