Airflow AttributeError: 'coroutine' object has no attribute 'update_relative'

Question

My DAG looks like this:

    from datetime import timedelta

    import pendulum
    from airflow.decorators import dag

    from stage import stage_data
    from table_async_pg import main

    @dag(
        dag_id="data-sync",
        schedule_interval="1 * * * *",
        start_date=pendulum.datetime(2023, 3, 8, tz="Asia/Hong_Kong"),
        catchup=False,
        dagrun_timeout=timedelta(minutes=20),
    )
    def Pipeline():
        a = stage_data()
        b = main()
        a >> b

    pipeline = Pipeline()

When I leave out b = main() and a >> b, everything works fine. When I include them, I get this error:

    /opt/airflow/dags/airflow_sched.py | Traceback (most recent call last):
     | File "/opt/.venv/lib/python3.9/site-packages/airflow/models/taskmixin.py", line 230, in set_downstream
     |   self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
     | File "/opt/.venv/lib/python3.9/site-packages/airflow/models/taskmixin.py", line 175, in _set_relatives
     |   task_object.update_relative(self, not upstream)
     | AttributeError: 'coroutine' object has no attribute 'update_relative'

Answer 1

Score: 1

The main() function you're calling in your DAG is an asyncio coroutine function. Calling it returns a coroutine object, which doesn't have the update_relative method that Airflow expects when you set task dependencies with the >> operator.

To fix this:

Wrap main() in a regular (non-async) function and run that function as a task, so that the >> operator receives an Airflow operator instead of a coroutine:

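    import asyncio

    # Import path for Airflow 2.x.
    from airflow.operators.python import PythonOperator

    def Pipeline():
        def run_main():
            # PythonOperator calls a plain function, so drive the
            # coroutine to completion here instead of passing an async def.
            asyncio.run(main())

        a = stage_data()
        b = PythonOperator(
            task_id="run_main",
            python_callable=run_main,
        )
        a >> b

    pipeline = Pipeline()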

See also: [How to provide an async function in PythonOperator's python_callable in Airflow?](https://stackoverflow.com/questions/57423101/how-to-provide-an-async-function-in-pythonoperators-python-callable-in-airflow)
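
The same wrapping idea can be written as a small reusable helper. This is only a sketch using the standard library: the `sync` decorator and the stand-in `main` coroutine below are hypothetical names for illustration, not part of Airflow or of the asker's `table_async_pg` module. It assumes no event loop is already running in the process that calls the wrapper, since `asyncio.run` refuses to start inside a running loop.

```python
import asyncio
import functools


def sync(async_fn):
    """Return a plain function that runs `async_fn` to completion.

    Hypothetical helper: it lets an async function be used where a
    regular callable is expected.
    """
    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        # asyncio.run creates an event loop, awaits the coroutine,
        # closes the loop, and returns the coroutine's result.
        return asyncio.run(async_fn(*args, **kwargs))
    return wrapper


async def main(rows=3):
    """Stand-in for table_async_pg.main (an assumption, for demonstration)."""
    await asyncio.sleep(0)
    return f"synced {rows} rows"


run_main = sync(main)
result = run_main(rows=5)
print(result)
```

A wrapper built this way, such as `run_main`, is an ordinary function and could be passed as `python_callable` to a `PythonOperator` in place of the hand-written `run_main` above.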

Answer 2

Score: 1

You're using table_async_pg.main as if it were an Airflow task, but it isn't one: it is not a subclass of Airflow's BaseOperator.

You can assign any value to b, but the DAG fails to load as soon as you try to register b as an Airflow task by setting a dependency (a >> b). Airflow registers the dependency by calling the method update_relative on the right-hand object. That method doesn't exist on stage_data and/or main when they don't subclass BaseOperator; in your traceback the object is the coroutine returned by main(), which results in the error.
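
The point about `b` can be checked without Airflow at all. The sketch below uses a hypothetical stand-in for `main` (the real `table_async_pg.main` is not shown in the question) to illustrate that calling an async function only creates a coroutine object, and that this object has no `update_relative` attribute.

```python
import asyncio
import inspect


async def main():
    """Stand-in for table_async_pg.main (an assumption, for demonstration)."""
    await asyncio.sleep(0)
    return "synced"


# Calling an async function does not run its body; it returns a coroutine.
coro = main()
is_coroutine = inspect.iscoroutine(coro)
has_update_relative = hasattr(coro, "update_relative")
coro.close()  # close the unused coroutine to avoid a "never awaited" warning

# The body only runs when the coroutine is awaited or driven by an event loop.
result = asyncio.run(main())

print(is_coroutine, has_update_relative, result)
```

This is why `b = main()` on its own appears harmless while `a >> b` fails: the assignment just stores the coroutine, and the error only surfaces when Airflow tries to treat it as a task.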

huangapple
  • Posted on March 9, 2023, 15:42:43
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75681677.html