Airflow: how to force reparse of DAG definition file

Question
I'm using Airflow 2.3.4 and primarily interact with it through the web UI.
Two questions:
- Is there a way to force a reparse of a DAG definition file in Airflow? How about all DAG definition files?
- Is there a way to check the last time a DAG file was parsed?
Answer 1
Score: 1
> Is there a way to force a reparse of a DAG definition file in Airflow? How about all DAG definition files?

First, you can use the `dag-processor` command to manually parse all of your DAG files, the files in a subfolder, or a specific DAG file:
```
$ airflow dag-processor --help
usage: airflow dag-processor [-h] [-D] [-p] [-l LOG_FILE] [-n NUM_RUNS]
                             [--pid [PID]] [--stderr STDERR] [--stdout STDOUT]
                             [-S SUBDIR] [-v]

Start a standalone Dag Processor instance

optional arguments:
  -h, --help            show this help message and exit
  -D, --daemon          Daemonize instead of running in the foreground
  -p, --do-pickle       Attempt to pickle the DAG object to send over to the workers, instead of letting workers run their version of the code
  -l LOG_FILE, --log-file LOG_FILE
                        Location of the log file
  -n NUM_RUNS, --num-runs NUM_RUNS
                        Set the number of runs to execute before exiting
  --pid [PID]           PID file location
  --stderr STDERR       Redirect stderr to this file
  --stdout STDOUT       Redirect stdout to this file
  -S SUBDIR, --subdir SUBDIR
                        File location or directory from which to look for the dag. Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg'
  -v, --verbose         Make logging output more verbose
```
This command requires the standalone DAG processor to be activated by setting `AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR` to `True`.
By default, the DagFileProcessor skips files that were last parsed less than `AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL` seconds ago, so you can set that interval to 0 to force the files to be processed.
To parse all the DAGs:

```
AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=True AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=0 airflow dag-processor -n 1
```
And to parse a specific DAG file:

```
AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=True AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=0 airflow dag-processor -n 1 -S /path/to/dag/file
```
You can also do that with Python, by loading a DagBag and calling its `sync_to_db` method, as sketched below.
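For illustration, here is a minimal sketch of that approach (not from the original answer): the DAG folder path is a placeholder, `include_examples=False` is an assumption to avoid loading the bundled example DAGs, and the script must run where Airflow's metadata database is reachable.

```python
# Minimal sketch: reparse a DAG folder by loading a DagBag and syncing
# the parsed DAGs to the metadata database.
from airflow.models import DagBag

# The dag_folder path is a placeholder for your own DAGs directory.
dag_bag = DagBag(dag_folder="/path/to/dags", include_examples=False)

# Surface any files that failed to import during parsing.
if dag_bag.import_errors:
    print("Import errors:", dag_bag.import_errors)

# Persist the freshly parsed DAGs to the metadata database.
dag_bag.sync_to_db()
```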
> Is there a way to check the last time a DAG file was parsed?

This time is available on the webserver in the `code` endpoint, and you can use the `dags details` command to get the information:

```
airflow dags details <your dag id> -o json | jq '.[0].last_parsed_time'
```
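As a hedged alternative (not from the original answer), the same `last_parsed_time` field is exposed by Airflow's stable REST API, assuming the API is enabled on your webserver; the base URL and credentials below are placeholders.

```python
# Sketch: read last_parsed_time for one DAG via the stable REST API.
import requests

# Placeholder base URL and basic-auth credentials for your deployment.
resp = requests.get(
    "http://localhost:8080/api/v1/dags/<your dag id>",
    auth=("admin", "admin"),
)
resp.raise_for_status()
print(resp.json()["last_parsed_time"])
```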