Airflow
Airflow is a platform to programmatically author, schedule and monitor workflows.
Use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.
Airflow is used to process data, but has the opinion that tasks should ideally be idempotent (i.e. results of the task will be the same, and will not create duplicated data in a destination system), and should not pass large quantities of data from one task to the next (though tasks can pass metadata using Airflow's Xcom feature).
Airflow is not a streaming solution, but it is often used to process real-time data, pulling data off streams in batches.
Installation
Standalone trial
Trial environment: SQLite + SequentialExecutor. The commented commands below are the manual equivalent of airflow standalone.
airflow standalone
# airflow db init
# airflow users create \
# --username admin \
# --firstname Peter \
# --lastname Parker \
# --role Admin \
# --email spiderman@superhero.org
# airflow webserver --port 8080
# airflow scheduler
Deploying Airflow
Deploying with Docker
Fetch the official example docker-compose.yaml file and customize it for the production environment.
VERSION="2.2.5"
curl -LfO "https://airflow.apache.org/docs/apache-airflow/$VERSION/docker-compose.yaml"
- airflow-scheduler - The scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.
- airflow-webserver - The webserver is available at http://localhost:8080.
- airflow-worker - The worker that executes the tasks given by the scheduler.
- airflow-init - The initialization service (exits normally after it completes).
- flower - The flower app for monitoring the environment. It is available at http://localhost:5555.
- postgres - The database.
- redis - The redis broker that forwards messages from scheduler to worker.
In the .env file, set the Airflow version to use (otherwise the default specified in the docker-compose.yaml file applies), as well as the admin username and password.
COMPOSE_PROJECT_NAME=airflow2
# AIRFLOW_IMAGE_NAME="apache/airflow:2.2.5"
_AIRFLOW_WWW_USER_USERNAME="gary"
_AIRFLOW_WWW_USER_PASSWORD="gang2019"
AIRFLOW_UID=50000 # run services under this UID inside the containers
If host directories are mapped into the containers, set AIRFLOW_UID=$(id -u) to prevent files from being created as root in the data directories. With named volumes, the default value is fine.
In addition, extra plugins and their configuration parameters can be set here; the plugins are installed during the initialization phase. Installing plugins this way is not recommended in production; build a custom Docker image instead.
_PIP_ADDITIONAL_REQUIREMENTS="airflow-code-editor black"
AIRFLOW__CODE_EDITOR__ROOT_DIRECTORY="/opt/airflow/dags"
AIRFLOW__CODE_EDITOR__LINE_LENGTH=90
AIRFLOW__CODE_EDITOR__STRING_NORMALIZATION=False
AIRFLOW__CODE_EDITOR__GIT_ENABLED=False
Initialize the database and create the first user (a successful initialization exits with status 0).
docker-compose up airflow-init
docker-compose down --volumes --remove-orphans # on errors: clean the environment, check the config files, and rerun initialization
Start the Airflow service cluster (docker-compose up -d) and check service status (docker-compose ps).
Data storage
In the compose file above, all services share the same Airflow working directory (including dags, logs, and plugins).
Custom images
https://airflow.apache.org/docs/docker-stack/build.html
Installing plugins
Plugins are distributed as Python modules.
- Docker: list extra plugins in _PIP_ADDITIONAL_REQUIREMENTS, or install them when building a custom image (see the sketch below).
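As a rough sketch of what such a module looks like, a plugin exposes a subclass of AirflowPlugin (the names below are illustrative, not from any real package):
from airflow.plugins_manager import AirflowPlugin

class MyPlugin(AirflowPlugin):
    # the name reported by `airflow plugins`
    name = "my_plugin"
    # extension points such as macros, flask_blueprints, or appbuilder_views
    # are declared as class attributes; an empty list is a valid placeholder
    macros = []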
Installing providers
http://airflow.apache.org/docs/apache-airflow-providers/index.html
Ecosystem
Elyra Documentation — Elyra 3.2.2 documentation
Airflow: Creating a DAG in airflow via UI - Stack Overflow
Configuration
export AIRFLOW_HOME=/opt/airflow/ # default working directory: contains dags, logs, and other subdirectories
$AIRFLOW_HOME/airflow.cfg: the configuration file.
Concepts
DAG
A workflow definition (a directed acyclic graph), composed of one or more tasks.
airflow dags list
 
airflow dags list-jobs # list jobs
airflow dags list-runs
airflow dags test dag_id run_date
The default location is $AIRFLOW_HOME/dags; DAG files created there are scanned and loaded automatically by Airflow (after a short delay).
task
[Tasks — Airflow Documentation (apache.org)](https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html?highlight=create%20task)
A single unit of work within a DAG.
airflow tasks run dag_id task_id date
jobs
Currently running jobs.
airflow jobs check
Using Airflow
CLI
Open a terminal on any service node and run airflow info to view Airflow's configuration and system environment information.
airflow [-h] GROUP_OR_COMMAND ...
positional arguments:
GROUP_OR_COMMAND
Groups:
celery Celery components
config View configuration
connections Manage connections
dags Manage DAGs
db Database operations
jobs Manage jobs
kubernetes Tools to help run the KubernetesExecutor
pools Manage pools
providers Display providers
roles Manage roles
tasks Manage tasks
users Manage users
variables Manage variables
Commands:
cheat-sheet Display cheat sheet
info Show information about current Airflow and environment
kerberos Start a kerberos ticket renewer
plugins Dump information about loaded plugins
rotate-fernet-key
Rotate encrypted connection credentials and variables
scheduler Start a scheduler instance
standalone Run an all-in-one copy of Airflow
sync-perm Update permissions for existing roles and optionally DAGs
triggerer Start a triggerer instance
version Show the version
webserver Start an Airflow webserver instance
dags
usage: airflow dags [-h] COMMAND ...
Manage DAGs
positional arguments:
COMMAND
backfill Run subsections of a DAG for a specified date range
delete Delete all DB records related to the specified DAG
list List all the DAGs
list-jobs List the jobs
list-runs List DAG runs given a DAG id
next-execution
Get the next execution datetimes of a DAG
pause Pause a DAG
report Show DagBag loading report
show Displays DAG's tasks with their dependencies
state Get the status of a dag run
test Execute one single DagRun
trigger Trigger a DAG run
unpause Resume a paused DAG
tasks
airflow tasks test $DAG_ID $TASK_ID $DATE # test-run a single task instance
Using the web UI
Access the HTTP port exposed by the airflow-webserver service (http://localhost:8080 by default; the username and password are the ones configured earlier via environment variables).
Using the REST API
Access the web service through the REST API.
ENDPOINT_URL="http://localhost:8080"
curl -X GET --user "USER:PASSWD" "${ENDPOINT_URL}/api/v1/pools"
Airflow also provides a Python client that wraps this API (apache/airflow-client-python: Apache Airflow - OpenApi Client for Python (github.com)).
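A minimal sketch of the same pools request from Python with requests, assuming the basic-auth API backend enabled in the official compose file (credentials are the placeholders from the curl example above):
import requests

resp = requests.get(
    "http://localhost:8080/api/v1/pools",
    auth=("USER", "PASSWD"),  # HTTP basic auth
)
resp.raise_for_status()
print(resp.json())  # list of pools plus a total_entries count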
Task orchestration
Add tags to a DAG to make related workflows easy to filter and find.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="first_airflow_dag",
         schedule_interval='* * * * *',  # crontab syntax, or @once/@hourly/@daily/@weekly/@monthly/@yearly
         start_date=datetime(year=2022, month=2, day=1),
         tags=["team1", "sql"]) as dag:
    task_get_datetime = BashOperator(task_id='get_datetime', bash_command='date')
    # placeholder downstream tasks so the dependency chain below is runnable
    task_process_datetime = BashOperator(task_id='process_datetime', bash_command='echo process')
    task_save_datetime = BashOperator(task_id='save_datetime', bash_command='echo save')
    task_get_datetime >> task_process_datetime >> task_save_datetime
DAG files are saved in the dags/ directory under the working directory; from the web UI they can be triggered or paused (from either the DAG list or the DAG detail page).
Task return values are stored internally by Airflow and can be retrieved through the XCom interface.
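A minimal XCom sketch, assuming it is placed inside a DAG context like the one above (task names are illustrative): a callable's return value is pushed automatically, and a downstream task pulls it by task_id.
from airflow.operators.python import PythonOperator

def push_value():
    return "2022-02-01"  # return values are saved to XCom automatically

def pull_value(ti):
    print(ti.xcom_pull(task_ids="push_value"))  # fetch the pushed value

push = PythonOperator(task_id="push_value", python_callable=push_value)
pull = PythonOperator(task_id="pull_value", python_callable=pull_value)
push >> pull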
Connect tasks with >> or set_downstream() to guarantee the correct execution order, as in the sketch below.
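For example, the chain from the DAG above could equivalently be written with set_downstream():
# equivalent to: task_get_datetime >> task_process_datetime >> task_save_datetime
task_get_datetime.set_downstream(task_process_datetime)
task_process_datetime.set_downstream(task_save_datetime)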

Airflow detects changes to DAG files dynamically and applies the latest scheduling configuration.
Tutorial — Airflow Documentation (apache.org)
Best Practices — Airflow Documentation (apache.org)
Built-in operators
BashOperator(task_id, bash_command)
PythonOperator(task_id, python_callable=pyfunc)
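A small usage sketch for PythonOperator, passing arguments to the callable via op_kwargs (the function and values are illustrative):
from airflow.operators.python import PythonOperator

def greet(name):
    print(f"hello {name}")

task_greet = PythonOperator(
    task_id="greet",
    python_callable=greet,
    op_kwargs={"name": "airflow"},  # keyword arguments forwarded to the callable
)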
Variables
In the Airflow web UI, go to Admin → Variables to create variables (Key, Value, Description). Variables can then be referenced in DAG code.
from airflow.models import Variable

var_name = Variable.get('var_name')
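Variable.get also accepts a fallback value and can deserialize JSON-valued variables; a small sketch (the variable names are illustrative):
# avoid an exception when the variable is undefined
timeout = Variable.get("job_timeout", default_var="60")
# deserialize a JSON value into a Python object
config = Variable.get("job_config", deserialize_json=True, default_var={})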
Issues
Airflow not loading dags in /usr/local/airflow/dags - Stack Overflow
python - DAG not visible in Web-UI - Stack Overflow
Workflow orchestration tool comparison
The best workflow orchestration tools: Airflow vs Luigi vs Argo vs MLFlow - a Zhihu article by 数据黑客: https://zhuanlan.zhihu.com/p/321231718
Getting Started — Luigi 2.8.13 documentation