dag import error: AttributeError: '_TaskDecorator' object has no attribute 'update_relative'


Question

I'm facing an issue where my DAG cannot be imported, but I cannot figure out why:

```python
from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task, dag


@dag(
    dag_id="database_monitor",
    schedule_interval='*/10 * * * *',
    start_date=pendulum.datetime(2023, 7, 16, 21, 0, tz="UTC"),
    catchup=False,
)
def Pipeline():
    check_db_alive = SqlSensor(
        task_id="check_db_alive",
        conn_id="evergreen",
        sql="SELECT pg_is_in_recovery()",
        success=lambda x: x == False,
        poke_interval=60,
        # timeout = 60 * 2,
        mode="reschedule",
    )

    @task()
    def alert_of_db_inrecovery():
        import requests
        # result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"
        data = {"@key": "kkll",
                "@version": "alertapi-0.1",
                "@type": "ALERT",
                "object": "Testobject",
                "severity": "MINOR",
                "text": str("Former primary instance is in recovery")
                }
        requests.post('https://httpevents.systems/api/sendAlert', verify=False, data=data)

    check_db_alive >> alert_of_db_inrecovery


dag = Pipeline()
```

I get this error:

> AttributeError: '_TaskDecorator' object has no attribute 'update_relative'


Answer 1

Score: 4

You need to call the TaskFlow-decorated task, i.e. change `check_db_alive >> alert_of_db_inrecovery` to `check_db_alive >> alert_of_db_inrecovery()`.

The corrected code:

```python
from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task, dag


@dag(
    dag_id="database_monitor",
    schedule_interval='*/10 * * * *',
    start_date=pendulum.datetime(2023, 7, 16, 21, 0, tz="UTC"),
    catchup=False,
)
def Pipeline():
    check_db_alive = SqlSensor(
        task_id="check_db_alive",
        conn_id="evergreen",
        sql="SELECT pg_is_in_recovery()",
        success=lambda x: x == False,
        poke_interval=60,
        # timeout = 60 * 2,
        mode="reschedule",
    )

    @task
    def alert_of_db_inrecovery():
        import requests
        # result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"
        data = {"@key": "kkll",
                "@version": "alertapi-0.1",
                "@type": "ALERT",
                "object": "Testobject",
                "severity": "MINOR",
                "text": str("Former primary instance is in recovery")
                }
        requests.post('https://httpevents.systems/api/sendAlert', verify=False, data=data)

    check_db_alive >> alert_of_db_inrecovery()


dag = Pipeline()
```

Ref: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
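The root cause can be illustrated without Airflow: `@task` wraps the function in a decorator object, and only *calling* that object yields something that participates in `>>` dependency wiring. The sketch below mimics that mechanism in plain Python; the class names and the `set_upstream` hook are illustrative stand-ins, not Airflow's real internals:

```python
# Illustrative sketch (not Airflow internals): why a bare @task
# object fails on the right-hand side of >> while a called one works.

class TaskInstanceLike:
    """What calling a @task-decorated function returns: supports wiring."""
    def __init__(self, name):
        self.name = name
        self.upstream = []

    def set_upstream(self, other):
        self.upstream.append(other)


class TaskDecoratorLike:
    """What @task itself returns: a wrapper with NO wiring hooks."""
    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args, **kwargs):
        return TaskInstanceLike(self.fn.__name__)


def task(fn):
    return TaskDecoratorLike(fn)


class SensorLike:
    def __rshift__(self, downstream):
        # >> asks the right-hand side to record the dependency;
        # a bare decorator object lacks the hook, hence AttributeError.
        downstream.set_upstream(self)
        return downstream


@task
def alert_of_db_inrecovery():
    pass


sensor = SensorLike()
sensor >> alert_of_db_inrecovery()      # works: call the task first
try:
    sensor >> alert_of_db_inrecovery    # fails, analogous to the question
except AttributeError as e:
    print(e)  # 'TaskDecoratorLike' object has no attribute 'set_upstream'
```

In real Airflow the missing hook is `update_relative`, which explains the exact error message in the question.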


huangapple
  • Published 2023-07-18 15:56:04
  • Please keep this link when reposting: https://go.coder-hub.com/76710614.html