Airflow 被 Airbnb 内部用来创建、监控和调整数据管道。任何工作流都可以在这个使用 Python 编写的平台上运行(目前加入 Apache 基金会孵化器)。
Airflow 允许工作流开发人员轻松创建、维护和周期性地调度运行工作流(即有向无环图或成为DAGs)的工具。在Airbnb中,这些工作流包括了如数据存储、增长分析、Email发送、A/B测试等等这些跨越多部门的用例。这个平台拥有和 Hive、Presto、MySQL、HDFS、Postgres和S3交互的能力,并且提供了钩子使得系统拥有很好地扩展性。除了一个命令行界面,该工具还提供了一个 基于Web的用户界面让您可以可视化管道的依赖关系、监控进度、触发任务等。
Airflow 包含如下组件:
一个元数据库(MySQL或Postgres)
一组Airflow工作节点
一个调节器(Redis或RabbitMQ)
一个Airflow Web服务器
截图:
管道定义示例:
""" Code that goes along with the Airflow tutorial located at: https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py """ from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2015, 6, 1), 'email': ['airflow@airflow.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } dag = DAG('tutorial', default_args=default_args) # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag) templated_command = """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """ t3 = BashOperator( task_id='templated', bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, dag=dag) t2.set_upstream(t1) t3.set_upstream(t1)