Airflow AttributeError: 'coroutine' object has no attribute 'update_relative'


Question

My DAG looks like this:

from datetime import timedelta
import pendulum
from airflow.decorators import dag
from stage import stage_data
from table_async_pg import main

@dag(
    dag_id="data-sync",
    schedule_interval="1 * * * *",
    start_date=pendulum.datetime(2023, 3, 8, tz="Asia/Hong_Kong"),
    catchup=False,
    dagrun_timeout=timedelta(minutes=20),
)
def Pipeline():
    a = stage_data()
    b = main()

    a >> b

pipeline = Pipeline()

When I leave out b = main() and a >> b, everything works fine. When I include them, I get this error:

/opt/airflow/dags/airflow_sched.py | Traceback (most recent call last):                                                                      
                                   |   File "/opt/.venv/lib/python3.9/site-packages/airflow/models/taskmixin.py", line 230, in set_downstream
                                   |     self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)                 
                                   |   File "/opt/.venv/lib/python3.9/site-packages/airflow/models/taskmixin.py", line 175, in _set_relatives
                                   |     task_object.update_relative(self, not upstream)                                                     
                                   | AttributeError: 'coroutine' object has no attribute 'update_relative'         

Answer 1

Score: 1

The main() function you're calling in your DAG is an asyncio coroutine function, so calling main() returns a coroutine object. A coroutine object doesn't have the update_relative method that Airflow calls when you set task dependencies with the >> operator.

To fix this:

Run main() inside an Airflow task, for example a PythonOperator, so that the >> operator receives a task instead of a coroutine:
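The behaviour described here can be checked without Airflow. The snippet below is a minimal sketch: `main` is a hypothetical stand-in for `table_async_pg.main`, whose real body is not shown in the question. It only illustrates that calling an `async def` function produces a coroutine object, and that this object has no `update_relative` attribute.

```python
import asyncio
import inspect


async def main():
    # Hypothetical stand-in for table_async_pg.main; the real body is unknown.
    await asyncio.sleep(0)
    return "synced"


# Calling an async function does not run its body; it builds a coroutine.
result = main()

is_coroutine = inspect.iscoroutine(result)
has_update_relative = hasattr(result, "update_relative")
print(type(result).__name__, is_coroutine, has_update_relative)

# Discard the unawaited coroutine to avoid a "never awaited" RuntimeWarning.
result.close()
```

This is why `b = main()` inside the DAG function leaves `b` holding a coroutine rather than anything Airflow can wire into the graph.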

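import asyncio

from airflow.operators.python import PythonOperator

# Decorate with the same @dag(...) call as in the question.
def Pipeline():
    def run_main():
        # PythonOperator calls this function synchronously, so run the
        # coroutine to completion here instead of returning it unawaited.
        asyncio.run(main())

    a = stage_data()
    b = PythonOperator(
        task_id="run_main",
        python_callable=run_main,
    )

    a >> b

pipeline = Pipeline()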

See also: [How to provide an async function in PythonOperator's python_callable in Airflow?](https://stackoverflow.com/questions/57423101/how-to-provide-an-async-function-in-pythonoperators-python-callable-in-airflow)
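One detail worth verifying when wrapping an async function for PythonOperator is whether the wrapper itself is `async`. The sketch below runs without Airflow and uses a hypothetical stand-in for `main`. It compares an `async def` wrapper, which only builds a coroutine when called, with a plain wrapper that uses `asyncio.run` to execute the coroutine to completion.

```python
import asyncio

calls = []  # records whether the coroutine body actually executed


async def main():
    # Hypothetical stand-in for table_async_pg.main.
    await asyncio.sleep(0)
    calls.append("main ran")
    return "done"


async def async_wrapper():
    # Calling this only builds a coroutine; nothing awaits it.
    return await main()


def sync_wrapper():
    # asyncio.run creates an event loop and runs main() to completion.
    return asyncio.run(main())


pending = async_wrapper()
ran_after_async_call = list(calls)  # snapshot taken before anything is awaited
pending.close()  # discard the unawaited coroutine

sync_result = sync_wrapper()
ran_after_sync_call = list(calls)
```

Since PythonOperator invokes its `python_callable` as an ordinary function, the `sync_wrapper` shape is the one that makes the async work actually happen inside the task.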

Answer 2

Score: 1

You're using table_async_pg.main as if it's an Airflow task, but it's not a subclass of Airflow's BaseOperator.

While you can assign any value to b, this fails once you try to register b as an Airflow task by setting a dependency (a >> b). Airflow registers the dependency by calling update_relative on the right-hand object. Per the traceback, that object is the coroutine returned by main(), which has no such method because it is not an Airflow task object, hence the AttributeError.
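The mechanism can be modelled in a few lines of plain Python. This is a simplified sketch, not Airflow's actual implementation: `FakeTask` is a hypothetical class that mimics only the call visible in the traceback (`task_object.update_relative(self, not upstream)`), and `main` is a stand-in coroutine function.

```python
class FakeTask:
    """Simplified stand-in for an Airflow task; not Airflow's real class."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []
        self.downstream = []

    def update_relative(self, other, upstream):
        # Record the edge on this side of the relationship.
        if upstream:
            self.upstream.append(other)
        else:
            self.downstream.append(other)

    def __rshift__(self, other):
        # The right-hand object must provide update_relative, whatever its
        # type; a coroutine does not, so this line raises AttributeError.
        other.update_relative(self, upstream=True)
        self.downstream.append(other)
        return other


async def main():
    # Hypothetical stand-in for table_async_pg.main.
    return None


a = FakeTask("stage_data")
b = FakeTask("run_main")
a >> b  # both sides are task-like, so the edge is recorded

coro = main()
try:
    a >> coro  # right-hand side is a coroutine, not a task
except AttributeError as exc:
    error_message = str(exc)
finally:
    coro.close()  # discard the unawaited coroutine

print(error_message)
```

The failure depends only on what the right-hand object can do, which is why wrapping `main()` in a real task object resolves it.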

huangapple
  • Published on March 9, 2023 at 15:42:43
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75681677.html