可将 airflow 的命令行包装成 REST-ful 风格 API 的插件,以扩展 Airflow 官方 API 的能力。
该插件可用于 Airflow 2.0 以上版本,且易于扩展,你可以根据需要修改代码,将任意 CLI 命令封装成 API。
当前支持使用以下命令:
airflow dags backfill
airflow tasks run
airflow tasks clear
- 通过 Pip 安装
pip install airflow-extended-api
-
重启 Airflow WebServer
-
打开 Airflow 界面中的
Docs - Extended API OpenAPI
或http://localhost:8080/
来查看 API 细节。
curl -X POST --user "airflow:airflow" https://localhost:8080/api/extended/clear -H "Content-Type: application/json" -d '{"dagName": "string","downstream": true,"endDate": "2019-08-24T14:15:22Z","jobName": "string","startDate": "2019-08-24T14:15:22Z","username": "Extended API"}'
{
"executed_command": "string",
"exit_code": 0,
"output_info": [
"string"
],
"error_info": [
"string"
]
}
请以--user "{username}:{password}"
的样式提供 airflow 账户信息,否则将鉴权失败。
curl -X POST http://127.0.0.1:8080/api/extended/clear -H "Content-Type: application/json" -d '{"dagName": "string","downstream": true,"endDate": "2019-08-24T14:15:22Z","jobName": "string","startDate": "2019-08-24T14:15:22Z","username": "Extended API"}'
{
"detail": null,
"status": 401,
"title": "Unauthorized",
"type": "https://airflow.apache.org/docs/apache-airflow/2.2.5/stable-rest-api-ref.html#section/Errors/Unauthenticated"
}
curl -X POST --user "airflow:airflow" http://127.0.0.1:8080/api/extended/clear -H "Content-Type: application/json" -d '{"dagName": "string","downstream": true,"endDate": "2019-08-24T14:15:22Z","jobName": "string","startDate": "2019-08-24T14:15:22Z","username": "Extended API"}'
{
"error_info": [
"Traceback (most recent call last):",
" File \"/home/airflow/.local/bin/airflow\", line 8, in <module>",
" sys.exit(main())",
" File \"/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py\", line 48, in main",
" args.func(args)",
" File \"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py\", line 48, in command",
" return func(*args, **kwargs)",
" File \"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py\", line 92, in wrapper",
" return f(*args, **kwargs)",
" File \"/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py\", line 506, in task_clear",
" dags = get_dags(args.subdir, args.dag_id, use_regex=args.dag_regex)",
" File \"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py\", line 203, in get_dags",
" return [get_dag(subdir, dag_id)]",
" File \"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py\", line 193, in get_dag",
" f\"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse.\"",
"airflow.exceptions.AirflowException: Dag 'string' could not be found; either it does not exist or it failed to parse.",
""
],
"executed_command": "airflow tasks clear string -e 2019-08-24T14:15:22+00:00 -s 2019-08-24T14:15:22+00:00 -t string -y -d",
"exit_code": 1,
"output_info": [
"[\u001b[34m2022-04-22 10:05:50,538\u001b[0m] {\u001b[34mdagbag.py:\u001b[0m500} INFO\u001b[0m - Filling up the DagBag from /opt/airflow/dags\u001b[0m",
""
]
}
- 支持
backfill
命令 - support custom configuration
- Airflow 配置文档
- Airflow 命令行工具
- 开发过程中参考了以下项目,在此表示感谢
- 联系邮箱 Eric Cao
[email protected]