英文:
How to extract and manipulate the hour of an Airflow DAG execution date as a variable?
问题
我可以提取Airflow DAG执行日期的小时作为一个变量,并对其进行操作(比如乘以一个因子或从一个值中减去),同时确保可以使用不同的执行日期重新运行DAG。我目前编写如下代码来提取小时作为一个变量:
EXEC_HOUR = '{{ execution_date.strftime("%H") }}'
要执行数学操作,你可以使用to_int
过滤器将其转换为整数:
NEW_VAR = EXEC_HOUR | to_int * -1
这将使你能够将EXEC_HOUR
的值乘以-1并存储在NEW_VAR
中。
英文:
How can I extract the hour of an Airflow DAG execution date as a variable and perform manipulations on it (such as multiplication by a factor or subtracting from a value) while ensuring that the DAG can be rerun with a different execution date? I currently write
EXEC_HOUR = '{{ execution_date.strftime("%H") }}'
to extract the hour as a variable, but I cannot perform any logic with it.
so for example I can not do:
NEW_VAR = EXEC_HOUR * -1
答案1
得分: 0
我找到的方法是将逻辑放在一个方法中,然后在PythonOperator中使用它,像这样:
my_logic_task = PythonOperator(
task_id='logic_task',
provide_context=True,
python_callable=logic_method,
dag=dag
)
当方法类似于:
def logic_method(**kwargs):
execution_date = kwargs['execution_date']
return -execution_date.hour if execution_date.hour < 12 else 24 - execution_date.hour
之后,您可以像这样创建一个变量 MY_ANSWER
:
MY_ANSWER = "{{ task_instance.xcom_pull(task_ids='logic_task') }}"
并将其传递给您的 KubernetesPodOperator 参数。
如果不使用 PythonOperator,参数将如下所示:
[<function logic_method at 0xffff937ee830>]
英文:
the way that i found to do it is to put the logic in a method use it in a PythonOperator like that:
my_logic_task = PythonOperator(
task_id='logic_task',
provide_context=True,
python_callable=logic_method,
dag=dag
)
when the method is something like that:
def logic_method(**kwargs):
execution_date = kwargs['execution_date']
return -execution_date.hour if execution_date.hour < 12 else 24 - execution_date.hour
after that you can create a variable MY_ANSWER
like that:
MY_ANSWER = "{{ task_instance.xcom_pull(task_ids='logic_task') }}"
and pass it in your KubernetesPodOperator arguments.
Without using PythonOperator the argument it will look like this:
[<function logic_method at 0xffff937ee830>]
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论