如何提取和操作Airflow DAG执行日期的小时作为变量?

huangapple go评论59阅读模式
英文:

How to extract and manipulate the hour of an Airflow DAG execution date as a variable?

问题

我可以提取Airflow DAG执行日期的小时作为一个变量,并对其进行操作(比如乘以一个因子或从一个值中减去),同时确保可以使用不同的执行日期重新运行DAG。我目前编写如下代码来提取小时作为一个变量:

EXEC_HOUR = '{{ execution_date.strftime("%H") }}'

要执行数学操作,你可以使用to_int过滤器将其转换为整数:

NEW_VAR = EXEC_HOUR | to_int * -1

这将使你能够将EXEC_HOUR的值乘以-1并存储在NEW_VAR中。

英文:

How can I extract the hour of an Airflow DAG execution date as a variable and perform manipulations on it (such as multiplication by a factor or subtracting from a value) while ensuring that the DAG can be rerun with a different execution date? I currently write

EXEC_HOUR = '{{ execution_date.strftime("%H") }}'

to extract the hour as a variable, but I cannot perform any logic with it.

so for example I can not do:
NEW_VAR = EXEC_HOUR * -1

答案1

得分: 0

我找到的方法是将逻辑放在一个方法中,然后在PythonOperator中使用它,像这样:

my_logic_task = PythonOperator(
    task_id='logic_task',
    provide_context=True,
    python_callable=logic_method,
    dag=dag
)

当方法类似于:

def logic_method(**kwargs):
    execution_date = kwargs['execution_date']
    return -execution_date.hour if execution_date.hour < 12 else 24 - execution_date.hour

之后,您可以像这样创建一个变量 MY_ANSWER

MY_ANSWER = "{{ task_instance.xcom_pull(task_ids='logic_task') }}"

并将其传递给您的 KubernetesPodOperator 参数。

如果不使用 PythonOperator,参数将如下所示:

[<function logic_method at 0xffff937ee830>]
英文:

the way that i found to do it is to put the logic in a method use it in a PythonOperator like that:

    my_logic_task = PythonOperator(
        task_id=&#39;logic_task&#39;,
        provide_context=True,
        python_callable=logic_method,
        dag=dag
    )

when the method is something like that:

def logic_method(**kwargs):
    execution_date = kwargs[&#39;execution_date&#39;]
    return -execution_date.hour if execution_date.hour &lt; 12 else 24 - execution_date.hour

after that you can create a variable MY_ANSWER like that:

MY_ANSWER = &quot;{{ task_instance.xcom_pull(task_ids=&#39;logic_task&#39;) }}&quot;

and pass it in your KubernetesPodOperator arguments.

Without using PythonOperator the argument it will look like this:

[&lt;function logic_method at 0xffff937ee830&gt;]

huangapple
  • 本文由 发表于 2023年4月7日 05:05:21
  • 转载请务必保留本文链接:https://go.coder-hub.com/75953781.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定