大数据-252 离线数仓 - Airflow + Crontab 入门实战:定时调度、DAG 编排与常见报错排查
TL;DR场景需要从 Linux Crontab 过渡到 Airflow完成离线任务的定时调度与依赖编排。结论Crontab 适合单点定时Airflow 适合可视化、可编排、可追踪的任务流管理但示例需区分版本差异。产出给出 Crontab 基础、Airflow 核心概念、DAG 入门示例以及错误速查卡用于快速排障。Crontab简介基础介绍Linux系统是由croncrond系统服务来控制的Linux系统上原本那就有非常多的计划性工作因此这个系统服务是默认启动的。Linux系统也提供了Linux用户控制计划任务的命令crontab命令日志文件ll /var/log/cron*编辑文件: vim /etc/crontab进程: ps -ef | grep crond /etc/init.d/crond restart作用任务命令定时调度 定时备份等格式说明以上各个字段中还可以使用以下特殊字符代表所有的取值范围内的数字如月份字段为则表示1到12个月/代表每一定时间间隔的意思如分钟字段为*/10表示每10分钟执行1次-代表从某个区间范围是闭区间如2-5表示2,3,4,5小时字段中0-23/2表示在0~23点范围内每两小时执行一次分散的数字不连续如1,2,3,6,8,9由于各个地方每周的第一天不一样因此Sunday0第一天或Sunday7最后一天配置实例# 每一分钟执行一次command因cron默认每1分钟扫描一次因此全为*即可* * * * *command# 每小时的第3和第15分钟执行command3,15* * * *command# 每天上午8-11点的第3和15分钟执行command3,158-11 * * *command# 每隔2天的上午8-11点的第3和15分钟执行command3,158-11 */2 * *command# 每个星期一的上午8点到11点的第3和第15分钟执行command3,158-11 * *1command# 每晚的21:30执行command3021* * *command# 每月1、10、22日的4:45执行command4541,10,22 * *command# 每周六、周日的1 : 10执行command101* *6,0command# 每小时执行command0*/1 * * *command# 晚上11点到早上7点之间每隔一小时执行command*23-7/1 * * *command任务集成部署Airflow核心概念DAGs有向无环图Directed Acyclic Graph将所有需要运行的tasks按照依赖关系组织起来描述的是所有tasks执行的顺序OperatorsAirflow内置了很多OperatorsBashOperator 执行一个Bash命令PythonOperator 调用任意的Python函数EmailOperator 用于发送邮件HTTPOperator 用于发送HTTP请求SqlOperator 用于执行SQL命令自定义 OperatorTaskTaskTask是Operator的一个实例Task InstanceTask Instance由于Task会被重复调度每次Tasks的运行就是不同的Task InstanceTask Instance 有自己的状态包括 success、running、failed、skipped、up_for_rechedule、up_for_retry、queued、no_status等Task RelationshipsTask RelationshipsDAGs中的不同Tasks之间可以有依赖关系ExecutorExecutor在Airflow中支持的执行器就有四种SequentialExecutor单进程顺序执行任务默认执行器通常只用于测试LocalExecutor多进程本地执行任务CeleryExecutor分布式调度生产常用Celery是一个分布式调度框架其本身没有队列功能需要使用第三方组件如RabbitMQDaskExecutor动态任务调度主要用于数据分析执行器的修改修改 $AIRFLOW_HOME/airflow.cfg 中executor LocalExecutor这里关于执行器的修改修改如下所示入门案例编写脚本mkdir$AIRFLOW_HOME/dagsvim$AIRFLOW_HOME/dags/helloworld.py我们需要写入的内容如下fromdatetimeimportdatetime,timedeltafromairflowimportDAGfromairflow.utilsimportdatesfromairflow.utils.helpersimportchainfromairflow.operators.bash_operatorimportBashOperatorfromairflow.operators.python_operatorimportPythonOperator# 定义默认参数defdefault_options():default_args{owner:airflow,# 拥有者名称start_date:dates.days_ago(1),# 第一次开始执行的时间retries:1,# 失败重试次数retry_delay:timedelta(seconds5)# 失败重试间隔}returndefault_args# 定义Bash任务deftask1(dag):tpwdtaskBashOperator(task_idMyTask1,# task_idbash_commandt,# 指定要执行的命令dagdag# 指定归属的dag)returntask# Python任务函数defhello_world():current_timestr(datetime.today())print(hello world at {}.format(current_time))# 定义Python任务deftask2(dag):taskPythonOperator(task_idMyTask2,python_callablehello_world,# 指定要执行的函数dagdag)returntask# 定义另一个Bash任务deftask3(dag):tdatetaskBashOperator(task_idMyTask3,bash_commandt,dagdag)returntask# 定义DAGwithDAG(HelloWorldDag,# dag_iddefault_argsdefault_options(),# 指定默认参数schedule_interval*/2 * * * *# 执行周期每分钟2次)asd:task1task1(d)task2task2(d)task3task3(d)chain(task1,task2,task3)# 指定执行顺序写入的内容如下所示测试运行# 执行命令检查脚本是否有错误。如果命令行没有报错就表示没问题python$AIRFLOW_HOME/dags/helloworld.py执行的结果如下图所示查看生效的 dags# 查看生效的 dagsairflow dags list--subdir$AIRFLOW_HOME/dags执行结果如下图所示查看指定dag中的taskairflow tasks list HelloWorldDag执行的结果如下图所示测试dag中的taskairflow taskstestHelloWorldDag MyTask22020-08-01执行的结果如下所示错误速查症状根因定位修复ModuleNotFoundError: airflow.operators.bash_operator使用了 Airflow 1.x 导包路径但实际环境是 Airflow 2.x直接执行 DAG 文件或 airflow dags list 时出现按实际版本修改导包路径统一代码与环境版本DAG 文件无报错但 airflow dags list 看不到文件可被 Python 执行不代表被 Airflow 成功注册也可能 DAG 对象未正确生成查看 Scheduler 日志、执行 airflow dags list检查 DAG 对象是否在全局可见、导包是否兼容、Scheduler 是否正常运行schedule_interval“*/2 * * * *” 被解释错误文字描述写错把“每 2 分钟一次”写成“每分钟 2 次”对照 cron 表达式含义改正文案不改表达式仅修正文字描述* 23-7/1 * * * command 不按预期执行小时字段跨天范围写法存在歧义不同实现下可读性差手工拆分表达式验证改成两个明确规则或拆成 23,0-7 这类更直白表达修改 airflow.cfg 后执行器未生效改了配置但未重启相关进程检查 Webserver/Scheduler/Worker 进程状态修改配置后重启 Airflow 相关服务airflow tasks test 失败提示找不到 DAG 或 Taskdag_id、task_id、执行日期不匹配或 DAG 未被加载先执行 airflow dags list、airflow tasks list HelloWorldDag先确认 DAG 可见再核对 Task 名称与测试日期Bash 任务执行失败bash_command 所依赖命令不存在或环境变量不一致查看 task log使用绝对路径、补齐环境变量、先在同一运行用户下手工执行Python 任务能导入但运行失败python_callable 逻辑报错或运行环境依赖缺失查看 task log 的 traceback补齐依赖、缩小函数副作用、先本地独立验证函数任务一直排队不执行Executor 与资源配置不匹配Scheduler/Worker 未正常工作查 Airflow UI 状态、Scheduler/Worker 日志检查执行器类型、并发参数、Worker 进程与消息队列状态文章示例能跑但读者环境跑不通正文混用了不同 Linux 发行版习惯和不同 Airflow 大版本写法对比系统版本与 Airflow 版本在文首明确“系统环境 Airflow 版本”前置条件其他系列 AI篇持续更新中长期更新AI炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究持续打造实用AI工具指南AI研究-132 Java 生态前沿 2025Spring、Quarkus、GraalVM、CRaC 与云原生落地 AI模块直达链接 Java篇持续更新中长期更新Java-218 RocketMQ Java API 实战同步/异步 Producer 与 Pull/Push ConsumerMyBatis 已完结Spring 已完结Nginx已完结Tomcat已完结分布式服务已完结Dubbo已完结MySQL已完结MongoDB已完结Neo4j已完结FastDFS 已完结OSS已完结GuavaCache已完结EVCache已完结RabbitMQ已完结RocketMQ正在更新… 深入浅出助你打牢基础 Java模块直达链接 大数据板块已完成多项干货更新300篇包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件覆盖离线实时数仓全栈大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT案例 详解 大数据模块直达链接