"""Airflow DAG scheduling and dependency management in practice.

Defines a three-step ETL pipeline (extract -> transform -> load) that runs
on an hourly cron schedule with backfilling of missed runs disabled.
"""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract() -> None:
    """Run the extract step (placeholder that only prints its name)."""
    print("extract")


def transform() -> None:
    """Run the transform step (placeholder that only prints its name)."""
    print("transform")


def load() -> None:
    """Run the load step (placeholder that only prints its name)."""
    print("load")


# Defaults applied to every task in the DAG: a failed task is retried twice,
# waiting five minutes between attempts.
default_args = {
    "owner": "data",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

# Scheduling and dependencies
# - `schedule_interval` sets the timed schedule (here: minute 0 of every hour).
# - `catchup=False` turns off backfilling of runs missed since `start_date`.
# NOTE: Airflow 2.4+ prefers the `schedule` parameter; `schedule_interval` is
# kept here for compatibility with older releases.
with DAG(
    dag_id="etl_pipeline",
    start_date=datetime(2025, 1, 1),
    schedule_interval="0 * * * *",
    catchup=False,
    default_args=default_args,
) as dag:
    t1 = PythonOperator(task_id="extract", python_callable=extract)
    t2 = PythonOperator(task_id="transform", python_callable=transform)
    t3 = PythonOperator(task_id="load", python_callable=load)

    # The bit-shift operator `>>` manages the task topology:
    # extract runs first, then transform, then load.
    t1 >> t2 >> t3

# Summary: explicit scheduling and dependency configuration improves the
# stability and maintainability of the pipeline.

发表评论 取消回复