Airflow: how to force reparse of DAG definition file

Question

I'm using Airflow 2.3.4 and primarily interact with it through the web UI.

Two questions:

  • Is there a way to force a reparse of a DAG definition file in Airflow? How about all DAG definition files?
  • Is there a way to check the last time a DAG file was parsed?

Answer 1

Score: 1

> Is there a way to force a reparse of a DAG definition file in Airflow? How about all DAG definition files?

First, you can use the dag-processor command to manually parse all files, the files in a subfolder, or a specific DAG file:

$ airflow dag-processor --help
usage: airflow dag-processor [-h] [-D] [-p] [-l LOG_FILE] [-n NUM_RUNS]
                             [--pid [PID]] [--stderr STDERR] [--stdout STDOUT]
                             [-S SUBDIR] [-v]

Start a standalone Dag Processor instance

optional arguments:
  -h, --help            show this help message and exit
  -D, --daemon          Daemonize instead of running in the foreground
  -p, --do-pickle       Attempt to pickle the DAG object to send over to the workers, instead of letting workers run their version of the code
  -l LOG_FILE, --log-file LOG_FILE
                        Location of the log file
  -n NUM_RUNS, --num-runs NUM_RUNS
                        Set the number of runs to execute before exiting
  --pid [PID]           PID file location
  --stderr STDERR       Redirect stderr to this file
  --stdout STDOUT       Redirect stdout to this file
  -S SUBDIR, --subdir SUBDIR
                        File location or directory from which to look for the dag. Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg'
  -v, --verbose         Make logging output more verbose

This command requires the standalone DAG processor to be activated by setting AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR to True.

By default, the DagFileProcessor skips files that were last parsed less than AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL seconds ago, so you can set it to 0 to force the files to be processed.
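
If you prefer configuration files over environment variables, the same two settings live in the [scheduler] section of airflow.cfg; Airflow's AIRFLOW__SECTION__KEY environment variables map onto [section] key entries. A sketch of the equivalent config:

[scheduler]
# Same as AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=True
standalone_dag_processor = True
# 0 disables the minimum re-parse interval, so files are processed on every run
min_file_process_interval = 0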

To parse all the DAGs:

AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=True AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=0 airflow dag-processor -n 1

And to parse a specific DAG file:

AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=True AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=0 airflow dag-processor -n 1 -S /path/to/dag/file

You can also do this from Python, by loading a DagBag and calling its sync_to_db method.
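
A minimal sketch of that approach (assuming Airflow 2.3.x is importable and the metadata database is reachable from where the script runs; the DAG folder path below is a placeholder):

from airflow.models import DagBag

# Parse a directory (or a single file) of DAG definitions from scratch.
dag_bag = DagBag(dag_folder="/path/to/dags", include_examples=False)

# Surface anything that failed to import before syncing.
for filename, error in dag_bag.import_errors.items():
    print(f"Failed to parse {filename}: {error}")

# Write the freshly parsed DAGs to the metadata database, which is
# what makes the scheduler and webserver see the new versions.
dag_bag.sync_to_db()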

> Is there a way to check the last time a DAG file was parsed?

This time is shown in the webserver on each DAG's code view, and you can use the dags details command to get the same information:

airflow dags details <your dag id> -o json | jq '.[0].last_parsed_time'
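
The same timestamp can also be read programmatically. A sketch assuming the metadata database is reachable and "your_dag_id" is a placeholder for a real DAG id:

from airflow.models import DagModel

# DagModel rows carry the last_parsed_time column maintained by the DAG processor.
dag_model = DagModel.get_dagmodel("your_dag_id")
if dag_model is not None:
    print(dag_model.last_parsed_time)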
