Airflow: how to force reparse of DAG definition file

Question


I'm using Airflow 2.3.4 and primarily interact with it through the web UI.

Two questions:

  • Is there a way to force a reparse of a DAG definition file in Airflow? How about all DAG definition files?
  • Is there a way to check the last time a DAG file was parsed?

Answer 1 (score: 1)

> Is there a way to force a reparse of a DAG definition file in Airflow? How about all DAG definition files?

First, you can use the dag-processor command to manually parse all DAG files, the files in a subfolder, or a specific DAG file:

  $ airflow dag-processor --help
  usage: airflow dag-processor [-h] [-D] [-p] [-l LOG_FILE] [-n NUM_RUNS]
                               [--pid [PID]] [--stderr STDERR] [--stdout STDOUT]
                               [-S SUBDIR] [-v]

  Start a standalone Dag Processor instance

  optional arguments:
    -h, --help            show this help message and exit
    -D, --daemon          Daemonize instead of running in the foreground
    -p, --do-pickle       Attempt to pickle the DAG object to send over to the
                          workers, instead of letting workers run their version
                          of the code
    -l LOG_FILE, --log-file LOG_FILE
                          Location of the log file
    -n NUM_RUNS, --num-runs NUM_RUNS
                          Set the number of runs to execute before exiting
    --pid [PID]           PID file location
    --stderr STDERR       Redirect stderr to this file
    --stdout STDOUT       Redirect stdout to this file
    -S SUBDIR, --subdir SUBDIR
                          File location or directory from which to look for the
                          dag. Defaults to '[AIRFLOW_HOME]/dags' where
                          [AIRFLOW_HOME] is the value you set for 'AIRFLOW_HOME'
                          config you set in 'airflow.cfg'
    -v, --verbose         Make logging output more verbose

This command requires the standalone DAG processor to be activated by setting AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR to True.

By default, the DagFileProcessor skips files that were last parsed less than AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL seconds ago, so you can set it to 0 to force the file processing.

To parse all the DAGs:

  AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=True AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=0 airflow dag-processor -n 1

And to parse a specific DAG file:

  AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=True AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=0 airflow dag-processor -n 1 -S /path/to/dag/file

You can also do this with Python, by loading the DagBag and calling the sync_to_db method, as in the sketch below.
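
A minimal sketch of that approach, assuming it runs on a host that can reach the Airflow metadata database (the dags folder path is a placeholder):

  # Parse the DAG files and write the results to the metadata database.
  from airflow.models import DagBag

  # Parsing happens while the DagBag is being built; include_examples=False
  # keeps Airflow's bundled example DAGs out of the result.
  dag_bag = DagBag(dag_folder="/path/to/dags", include_examples=False)

  # Persist the freshly parsed DAGs so the scheduler and UI pick them up.
  dag_bag.sync_to_db()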

> Is there a way to check the last time a DAG file was parsed?

This timestamp is available in the webserver's code endpoint, and you can use the dags details command to get the information:

  airflow dags details <your dag id> -o json | jq '.[0].last_parsed_time'
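
If you would rather read that timestamp programmatically, a hedged alternative (not part of the original answer) is to query the DagModel ORM class; "my_dag" is a placeholder DAG id, and this again assumes access to the metadata database:

  # Sketch: read last_parsed_time for one DAG from the metadata database.
  from airflow.models import DagModel

  dag_model = DagModel.get_dagmodel("my_dag")  # returns None if the DAG is unknown
  if dag_model is not None:
      print(dag_model.last_parsed_time)  # UTC datetime of the last parse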

Posted by huangapple on 2023-07-11 03:42:59. Please keep this link when reposting: https://go.coder-hub.com/76656846.html