Airflow task XComArg result not found

Question

I have a DAG which is built like this:

...
default_args = {
    'owner': 'airflow',
}


@dag(default_args=default_args, start_date=datetime.datetime(2021, 1, 1), schedule_interval=None, tags=['mimir'])
def data_comparism_psql():
    @task(task_id="pull_release_data_from_blob")
    ....
    @task(task_id="deviation_calculation_metrik_calls_mean_execution_time_std_dev")
    def compare_release_files(release_data_export: List[Dict], querytype: Querytype):
        ...
    ....
    release_data_calls = get_files_from_blob(querytype=Querytype.CALLS)
    report_calls = compare_release_files(release_data_calls, querytype=Querytype.CALLS)["significant_differences"]
    report_calls_std_dev = compare_release_files(release_data_calls, querytype=Querytype.STD_DEV)["significant_differences"]
    release_data_mean_exec_time = get_files_from_blob(querytype=Querytype.MEAN_EXEC_TIME)
    report_mean_exec_time = compare_release_files(release_data_mean_exec_time, querytype=Querytype.MEAN_EXEC_TIME)["significant_differences"]
    report_mean_exec_time_std_dev = compare_release_files(release_data_mean_exec_time, querytype=Querytype.STD_DEV)["significant_differences"]
    create_release_report([report_calls, report_mean_exec_time, report_calls_std_dev, report_mean_exec_time_std_dev], "Queries")


data_comparism_dag = data_comparism_psql()

The create_release_report task then fails with:

...
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom_arg.py", line 342, in resolve
    raise XComNotFound(ti.dag_id, task_id, self.key)
airflow.exceptions.XComNotFound: XComArg result from deviation_calculation_metrik_calls_mean_execution_time_std_dev at data_comparism_psql with key="significant_differences" is not found!

In the Airflow UI every step is green, and when I inspect the XCom of the referenced task I can see significant_differences there.

Any tip would be helpful. I tried several approaches, including task dependencies, but none of them worked.

Answer 1

Score: 1

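If you need each key of the returned dictionary to be available as its own XCom, set multiple_outputs=True in the task decorator of compare_release_files.

Otherwise the task pushes a single XCom with the key "return_value" whose value is the whole dictionary. That is likely why significant_differences shows up in the UI (inside return_value) while the lookup by key="significant_differences" fails.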
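
To make the difference concrete, here is a small toy model in plain Python. It is not Airflow code: push_return_value, the plain dict standing in for the XCom table, and the sample values are all hypothetical. It only mimics the behaviour described in this answer, on the assumption that the full return value is always stored under "return_value" and that the per-key entries are added only when multiple_outputs is set.

```python
# Toy model of the XCom behaviour described above. This is NOT Airflow code:
# push_return_value and the dict used as a store are hypothetical stand-ins.
from typing import Any, Dict


def push_return_value(value: Any, multiple_outputs: bool) -> Dict[str, Any]:
    """Return the XCom entries a task's return value would produce in this model."""
    # The full return value is always stored under the key "return_value".
    store: Dict[str, Any] = {"return_value": value}
    # With multiple_outputs=True, each key of a returned dict also gets its own entry.
    if multiple_outputs and isinstance(value, dict):
        store.update(value)
    return store


# Hypothetical return value of compare_release_files.
result = {"significant_differences": [1, 2]}

without_flag = push_return_value(result, multiple_outputs=False)
with_flag = push_return_value(result, multiple_outputs=True)

# Without the flag only "return_value" exists, so a lookup by
# key="significant_differences" has nothing to resolve.
print(sorted(without_flag))  # ['return_value']

# With the flag the key is stored separately and can be looked up directly.
print(sorted(with_flag))  # ['return_value', 'significant_differences']
```

In the DAG from the question, the corresponding change would be to write the decorator as @task(task_id="deviation_calculation_metrik_calls_mean_execution_time_std_dev", multiple_outputs=True), leaving the ["significant_differences"] lookups as they are.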

huangapple
  • Posted on July 27, 2023 at 21:54:20
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76780466.html