Airflow

Airflow is a platform to programmatically author, schedule and monitor workflows.

Use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.

Airflow is commonly used to process data, but has the opinion that tasks should ideally be idempotent (i.e. results of the task will be the same, and will not create duplicated data in a destination system), and should not pass large quantities of data from one task to the next (though tasks can pass metadata using Airflow's XCom feature).

Airflow is not a streaming solution, but it is often used to process real-time data, pulling data off streams in batches.

Installation

Standalone trial

Trial environment configuration: SQLite + SequentialExecutor.

airflow standalone
# Or run the individual initialization steps manually:
# airflow db init
# airflow users create \
#     --username admin \
#     --firstname Peter \
#     --lastname Parker \
#     --role Admin \
#     --email spiderman@superhero.org
# airflow webserver --port 8080
# airflow scheduler

Deploying Airflow

Deploying with Docker

Fetch the example docker-compose.yaml file from the official documentation and customize it for the production environment.

VERSION="2.2.5"
curl -LfO "https://airflow.apache.org/docs/apache-airflow/$VERSION/docker-compose.yaml"
  • airflow-scheduler - The scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.
  • airflow-webserver - The webserver is available at http://localhost:8080.
  • airflow-worker - The worker that executes the tasks given by the scheduler.
  • airflow-init - The initialization service (exits normally after it completes).
  • flower - The flower app for monitoring the environment. It is available at http://localhost:5555.
  • postgres - The database.
  • redis - The redis broker that forwards messages from scheduler to worker.

The .env file sets the Airflow version to use (it defaults to the version specified in docker-compose.yaml) as well as the web UI username and password.

COMPOSE_PROJECT_NAME=airflow2
# AIRFLOW_IMAGE_NAME="apache/airflow:2.2.5"
_AIRFLOW_WWW_USER_USERNAME="gary"
_AIRFLOW_WWW_USER_PASSWORD="gang2019"
AIRFLOW_UID=50000  # run services inside the containers as the user with this UID

If you bind-mount host directories into the containers, set AIRFLOW_UID=$(id -u) to prevent files in the data directories from being created as root. If you use named volumes, the default value is fine.

In addition, you can configure extra plugins and their configuration parameters; these plugins are installed during the initialization phase. Installing plugins this way is not recommended in production, where you should build a custom Docker image instead.

_PIP_ADDITIONAL_REQUIREMENTS="airflow-code-editor black"
AIRFLOW__CODE_EDITOR__ROOT_DIRECTORY="/opt/airflow/dags"
AIRFLOW__CODE_EDITOR__LINE_LENGTH=90
AIRFLOW__CODE_EDITOR__STRING_NORMALIZATION=False
AIRFLOW__CODE_EDITOR__GIT_ENABLED=False

Run the database initialization and create the first user (a successful initialization exits with status 0).

docker-compose up airflow-init
docker-compose down --volumes --remove-orphans  # on error, clean up the environment, check the configuration files, and rerun the initialization

Start the Airflow service cluster (docker-compose up -d) and check the service status (docker-compose ps).

Data storage

In the compose file above, all related services share the same Airflow working directory (including dags, logs, and plugins).

Custom images

https://airflow.apache.org/docs/docker-stack/build.html

Installing plugins

Plugins are distributed as Python modules (a minimal sketch follows the list below).

  • Docker: specify extra plugins via _PIP_ADDITIONAL_REQUIREMENTS, or install them while building a custom image.
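
As a rough sketch (the class and plugin names here are invented for illustration), a plugin is simply a Python module that subclasses AirflowPlugin:

from airflow.plugins_manager import AirflowPlugin

class ExamplePlugin(AirflowPlugin):
    # the name under which Airflow registers the plugin
    name = "example_plugin"
    # a plugin may also contribute macros, views, menu items, etc.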

Installing providers

http://airflow.apache.org/docs/apache-airflow-providers/index.html

Ecosystem

andreax79/airflow-code-editor: A plugin for Apache Airflow that allows you to edit DAGs in browser (github.com)

Ecosystem | Apache Airflow

Elyra Documentation — Elyra 3.2.2 documentation

Airflow: Creating a DAG in airflow via UI - Stack Overflow

Configuration

export AIRFLOW_HOME=/opt/airflow/  # default working directory: contains the dags, logs and other subdirectories

$AIRFLOW_HOME/airflow.cfg: the configuration file.

Concepts

DAG

A job definition in the form of a directed acyclic graph, composed of one or more tasks.

airflow dags list                   # list all DAGs
airflow dags list-jobs              # list jobs
airflow dags list-runs              # list DAG runs for a given DAG id
airflow dags test dag_id run_date   # execute a single DagRun for the given date

The default location is $AIRFLOW_HOME/dags; DAGs created in this directory are scanned and loaded automatically by Airflow (with some delay).
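
For example, a minimal DAG file dropped into that directory would be picked up automatically (the file name and dag_id are illustrative):

# $AIRFLOW_HOME/dags/hello_dag.py
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="hello_dag",
         start_date=datetime(2022, 1, 1),
         schedule_interval="@daily",
         catchup=False) as dag:
    BashOperator(task_id="say_hello", bash_command="echo hello")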

task

[Tasks — Airflow Documentation (apache.org)](https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html)

A task is the basic unit of execution in a DAG.

airflow tasks run dag_id task_id date   # run a single task instance (was `airflow run` in Airflow 1.x)

jobs

Currently running jobs.

airflow jobs check

使用Airflow

CLI

Open a terminal on any service node and run airflow info to inspect Airflow's configuration and system environment.

airflow [-h] GROUP_OR_COMMAND ...

positional arguments:
  GROUP_OR_COMMAND

    Groups:
      celery         Celery components
      config         View configuration
      connections    Manage connections
      dags           Manage DAGs
      db             Database operations
      jobs           Manage jobs
      kubernetes     Tools to help run the KubernetesExecutor
      pools          Manage pools
      providers      Display providers
      roles          Manage roles
      tasks          Manage tasks
      users          Manage users
      variables      Manage variables

    Commands:
      cheat-sheet    Display cheat sheet
      info           Show information about current Airflow and environment
      kerberos       Start a kerberos ticket renewer
      plugins        Dump information about loaded plugins
      rotate-fernet-key
                     Rotate encrypted connection credentials and variables
      scheduler      Start a scheduler instance
      standalone     Run an all-in-one copy of Airflow
      sync-perm      Update permissions for existing roles and optionally DAGs
      triggerer      Start a triggerer instance
      version        Show the version
      webserver      Start an Airflow webserver instance

usage: airflow dags [-h] COMMAND ...

Manage DAGs

positional arguments:
  COMMAND
    backfill      Run subsections of a DAG for a specified date range
    delete        Delete all DB records related to the specified DAG
    list          List all the DAGs
    list-jobs     List the jobs
    list-runs     List DAG runs given a DAG id
    next-execution
                  Get the next execution datetimes of a DAG
    pause         Pause a DAG
    report        Show DagBag loading report
    show          Displays DAG's tasks with their dependencies
    state         Get the status of a dag run
    test          Execute one single DagRun
    trigger       Trigger a DAG run
    unpause       Resume a paused DAG

dags

tasks

airflow tasks test $DAG_ID $TASK_ID $DATE   # execute a task for the given date (test mode)

Using the web UI

Visit the HTTP port exposed by the airflow-webserver service (http://localhost:8080 by default; the default username and password are the ones configured earlier via environment variables).

Using the REST API

Access the web service through the REST API.

ENDPOINT_URL="http://localhost:8080"
curl -X GET --user "USER:PASSWD" "${ENDPOINT_URL}/api/v1/pools"

Airflow also provides a wrapped Python API client for the web service (apache/airflow-client-python: Apache Airflow - OpenApi Client for Python (github.com)).
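
As a minimal sketch, the same pools query can also be issued from plain Python with requests and basic auth (the endpoint and credentials are the placeholder values used above):

import requests

# list pools via the stable REST API (Airflow 2.x with basic-auth enabled)
resp = requests.get("http://localhost:8080/api/v1/pools", auth=("USER", "PASSWD"))
resp.raise_for_status()
for pool in resp.json()["pools"]:
    print(pool["name"], pool["slots"])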

Task orchestration

Add tags to a DAG to make related workflows easy to look up.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="first_airflow_dag",
         schedule_interval='* * * * *',  # crontab syntax, or @once/@hourly/@daily/@weekly/@monthly/@yearly
         start_date=datetime(year=2022, month=2, day=1),
         tags=["team1", "sql"]) as dag:
  task_get_datetime = BashOperator(task_id='get_datetime', bash_command='date')
  # placeholder downstream tasks so the dependency chain below is runnable
  task_process_datetime = BashOperator(task_id='process_datetime', bash_command='echo process')
  task_save_datetime = BashOperator(task_id='save_datetime', bash_command='echo save')
  task_get_datetime >> task_process_datetime >> task_save_datetime

DAG files are saved under the dags/ directory of the working directory; workflows can then be triggered or paused from the web UI (either from the DAG list or from the DAG's own page).


The return value of a task is stored internally by Airflow and can be retrieved through the XCom interface.
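
A minimal sketch of pushing and pulling a value explicitly (the task ids and key are illustrative, and the operators are assumed to live inside a with DAG(...) block like the one above):

from airflow.operators.python import PythonOperator

def _push(ti):
    # a callable's plain return value is also stored as an XCom under the key "return_value"
    ti.xcom_push(key="current_date", value="2022-04-06")

def _pull(ti):
    print(ti.xcom_pull(task_ids="push_date", key="current_date"))

push = PythonOperator(task_id="push_date", python_callable=_push)
pull = PythonOperator(task_id="pull_date", python_callable=_pull)
push >> pull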

Connect tasks with >> or set_downstream() to guarantee they execute in the correct order.

Task execution flow

The task tree and the corresponding execution states.

Airflow detects changes to DAG files dynamically and applies the latest scheduling configuration.

Tutorial — Airflow Documentation (apache.org)

How-to Guides

Best Practices — Airflow Documentation (apache.org)

Built-in operators

BashOperator(task_id, bash_command)

PythonOperator(task_id, python_callable=pyfunc)
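
A hedged example wiring the two together (the function and task names are made up, and the snippet is assumed to run inside a with DAG(...) block):

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def greet(name):
    print(f"hello {name}")

say_date = BashOperator(task_id="say_date", bash_command="date")
say_hello = PythonOperator(task_id="say_hello", python_callable=greet,
                           op_kwargs={"name": "airflow"})  # op_kwargs are passed to the callable
say_date >> say_hello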

Variables

In the Airflow web UI, go to Admin → Variables to create variables (Key, Value, Description). Variables can then be referenced in DAG code.

from airflow.models import Variable

var_name = Variable.get('var_name')
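
Variable.get also supports defaults and JSON deserialization, and templated operator fields can reference variables directly (the variable names here are illustrative):

from airflow.models import Variable

threshold = Variable.get("threshold", default_var="10")                      # string with fallback
config = Variable.get("job_config", deserialize_json=True, default_var={})   # JSON value
# in templated fields, variables are available as {{ var.value.<key> }}, e.g.:
# bash_command='echo {{ var.value.var_name }}'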

Issues

Airflow not loading dags in /usr/local/airflow/dags - Stack Overflow

python - DAG not visible in Web-UI - Stack Overflow

Comparison of workflow orchestration tools

The best workflow orchestration tools: Airflow vs Luigi vs Argo vs MLFlow (Zhihu article by 数据黑客): https://zhuanlan.zhihu.com/p/321231718

Getting Started — Luigi 2.8.13 documentation
