Airflow DAG Scheduling and Dependency Management in Practice
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract():
    print("extract")


def transform():
    print("transform")


def load():
    print("load")


# Defaults applied to every task in the DAG.
default_args = {
    "owner": "data",
    "retries": 2,  # retry a failed task up to two times
    "retry_delay": timedelta(minutes=5),  # wait five minutes between retries
}

with DAG(
    dag_id="etl_pipeline",
    start_date=datetime(2025, 1, 1),
    schedule_interval="0 * * * *",  # cron: at minute 0 of every hour
    catchup=False,  # do not backfill runs missed since start_date
    default_args=default_args,
) as dag:
    t1 = PythonOperator(task_id="extract", python_callable=extract)
    t2 = PythonOperator(task_id="transform", python_callable=transform)
    t3 = PythonOperator(task_id="load", python_callable=load)

    # extract runs first, then transform, then load
    t1 >> t2 >> t3
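The chain t1 >> t2 >> t3 at the end of the DAG definition relies on Python operator overloading: each >> records the right-hand task as downstream of the left-hand one and returns the right-hand task, so the expression can continue. The sketch below is a toy model of that idea in plain Python. The Task class and the run_order helper are hypothetical names used only for illustration; this is not Airflow's implementation and it handles linear chains only.

```python
class Task:
    """Toy stand-in for an Airflow operator (hypothetical, illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that must run after this one

    def __rshift__(self, other):
        # `a >> b` records b as downstream of a and returns b,
        # so `a >> b >> c` evaluates as `(a >> b) >> c`.
        self.downstream.append(other)
        return other


def run_order(start):
    """Follow a linear chain from `start` and return task ids in run order."""
    order = []
    current = start
    while current is not None:
        order.append(current.task_id)
        current = current.downstream[0] if current.downstream else None
    return order


t1, t2, t3 = Task("extract"), Task("transform"), Task("load")
t1 >> t2 >> t3
print(run_order(t1))  # ['extract', 'transform', 'load']
```

Returning the right-hand operand from __rshift__ is what makes the chained form read left to right in execution order.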
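Scheduling and Dependencies
- Set the schedule with schedule_interval (deprecated in favor of schedule since Airflow 2.4); catchup=False turns off backfilling of missed runs
- Use the bit-shift operator >> to define the task topology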
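To make the effect of catchup concrete, here is a minimal sketch in plain Python of which hourly intervals a scheduler would pick up. It assumes a simplified model in which an interval becomes runnable once it has fully elapsed; the function scheduled_intervals and the "current time" value are hypothetical and are not part of Airflow.

```python
from datetime import datetime, timedelta


def scheduled_intervals(start_date, now, interval, catchup):
    """Return start times of the intervals to run (simplified model).

    An interval is treated as runnable only after it has fully elapsed.
    """
    starts = []
    current = start_date
    while current + interval <= now:
        starts.append(current)
        current += interval
    if not catchup:
        # Without catch-up, keep only the most recent completed interval.
        starts = starts[-1:]
    return starts


start = datetime(2025, 1, 1)
now = datetime(2025, 1, 1, 5, 30)  # hypothetical current time
hourly = timedelta(hours=1)

print(len(scheduled_intervals(start, now, hourly, catchup=True)))  # 5
print(scheduled_intervals(start, now, hourly, catchup=False))
# [datetime.datetime(2025, 1, 1, 4, 0)]
```

In this model, catchup=True would queue one run per elapsed hour since start_date, while catchup=False queues only the latest one, which is usually what you want when enabling a DAG whose start_date lies far in the past.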
Summary
Explicit scheduling and dependency configuration makes a pipeline more stable and easier to maintain.
