Airflow AttributeError: 'coroutine' object has no attribute 'update_relative'
Question
I have my dag as such:
from datetime import timedelta
import pendulum
from airflow.decorators import dag
from stage import stage_data
from table_async_pg import main
@dag(
    dag_id="data-sync",
    schedule_interval="1 * * * *",
    start_date=pendulum.datetime(2023, 3, 8, tz="Asia/Hong_Kong"),
    catchup=False,
    dagrun_timeout=timedelta(minutes=20),
)
def Pipeline():
    a = stage_data()
    b = main()
    a >> b

pipeline = Pipeline()
When I leave out b = main() and a >> b, everything works fine. When I include them, I get this error:
/opt/airflow/dags/airflow_sched.py | Traceback (most recent call last):
| File "/opt/.venv/lib/python3.9/site-packages/airflow/models/taskmixin.py", line 230, in set_downstream
| self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
| File "/opt/.venv/lib/python3.9/site-packages/airflow/models/taskmixin.py", line 175, in _set_relatives
| task_object.update_relative(self, not upstream)
| AttributeError: 'coroutine' object has no attribute 'update_relative'
Answer 1
Score: 1
The main() function you're calling in your DAG is an asyncio coroutine function, so main() returns a coroutine object. That object doesn't have the update_relative method that Airflow expects when setting task dependencies with the >> operator.
To fix this, wrap the call to main() in a regular (non-coroutine) function, run that function from an operator, and use the >> operator with the operator instead of with the coroutine:

import asyncio
from airflow.operators.python import PythonOperator

def Pipeline():
    def run_main():
        # PythonOperator does not await coroutines, so drive main() with asyncio.run
        asyncio.run(main())

    a = stage_data()
    b = PythonOperator(
        task_id="run_main",
        python_callable=run_main,
    )
    a >> b

pipeline = Pipeline()

See also [How to provide an async function in PythonOperator's python_callable in Airflow?](https://stackoverflow.com/questions/57423101/how-to-provide-an-async-function-in-pythonoperators-python-callable-in-airflow)
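The wrapper idea can be checked without Airflow. The sketch below is a minimal illustration, assuming a stand-in coroutine named main (the real table_async_pg.main is not shown in the question, so its body and return value here are hypothetical). It shows that a plain function calling asyncio.run drives the coroutine to completion, which is the kind of synchronous callable that python_callable can invoke.

```python
import asyncio
import inspect


async def main():
    # Hypothetical stand-in for table_async_pg.main; the real body is unknown.
    await asyncio.sleep(0)
    return "synced"


def run_main():
    # Synchronous wrapper: starts an event loop, runs main() to completion,
    # and returns its result.
    return asyncio.run(main())


# main is a coroutine function; the wrapper is an ordinary function.
print(inspect.iscoroutinefunction(main))      # True
print(inspect.iscoroutinefunction(run_main))  # False

result = run_main()
print(result)  # synced
```

Passing run_main (the function itself, not run_main()) to an operator means the coroutine is created and awaited only when the task runs, not while the DAG file is being parsed.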
Answer 2
Score: 1
You're using table_async_pg.main as if it were an Airflow task, but what it returns is not an instance of a subclass of Airflow's BaseOperator. You can assign any value to b, but the DAG fails to load once you try to register b as an Airflow task by setting a dependency (a >> b). Airflow registers the dependency by calling the method update_relative, which doesn't exist on the values returned by stage_data and/or main because they aren't BaseOperator instances, resulting in the error. In the traceback above, the offending object is the coroutine returned by main().
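The failure can be reproduced in plain Python. This is a minimal sketch, not Airflow code: main is a hypothetical stand-in for table_async_pg.main, and the attribute access only mimics the lookup that fails in the traceback.

```python
import asyncio
import inspect


async def main():
    # Hypothetical stand-in for table_async_pg.main.
    await asyncio.sleep(0)


# Calling an async function does not run it; it returns a coroutine object.
b = main()

print(inspect.iscoroutine(b))         # True
print(hasattr(b, "update_relative"))  # False

message = ""
try:
    b.update_relative  # the same attribute lookup that fails in the traceback
except AttributeError as exc:
    message = str(exc)
    print(message)

# Close the never-awaited coroutine to avoid a RuntimeWarning at exit.
b.close()
```

Anything placed on either side of >> has to be a task-like object that Airflow created (an operator instance or the result of a decorated task call), which a bare coroutine is not.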