英文:
dag import error : AttributeError: '_TaskDecorator' object has no attribute 'update_relative'
问题
I'm facing an issue where my DAG cannot be imported, but I cannot figure out why:
from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task,dag
@dag(
dag_id = "database_monitor",
schedule_interval = '*/10 * * * *',
start_date=pendulum.datetime(2023, 7, 16, 21,0,tz="UTC"),
catchup=False,)
def Pipeline():
check_db_alive = SqlSensor(
task_id="check_db_alive",
conn_id="evergreen",
sql="SELECT pg_is_in_recovery()",
success= lambda x: x == False,
poke_interval= 60,
#timeout = 60 * 2,
mode = "reschedule",
)
@task()
def alert_of_db_inrecovery():
import requests
# result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"
data = {"@key":"kkll",
"@version" : "alertapi-0.1",
"@type":"ALERT",
"object" : "Testobject",
"severity" : "MINOR",
"text" : str("Former primary instance is in recovery")
}
requests.post('https://httpevents.systems/api/sendAlert',verify=False,data=data)
check_db_alive >> alert_of_db_inrecovery
dag = Pipeline()
I get this error:
AttributeError: '_TaskDecorator' object has no attribute 'update_relative'
英文:
I'm facing an issue where my DAG cannot be imported, but I cannot figure out why:
from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task,dag
@dag(
dag_id = "database_monitor",
schedule_interval = '*/10 * * * *',
start_date=pendulum.datetime(2023, 7, 16, 21,0,tz="UTC"),
catchup=False,)
def Pipeline():
check_db_alive = SqlSensor(
task_id="check_db_alive",
conn_id="evergreen",
sql="SELECT pg_is_in_recovery()",
success= lambda x: x == False,
poke_interval= 60,
#timeout = 60 * 2,
mode = "reschedule",
)
@task()
def alert_of_db_inrecovery():
import requests
# result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"
data = {"@key":"kkll",
"@version" : "alertapi-0.1",
"@type":"ALERT",
"object" : "Testobject",
"severity" : "MINOR",
"text" : str("Former primary instance is in recovery")
}
requests.post('https://httpevents.systems/api/sendAlert',verify=False,data=data)
check_db_alive >> alert_of_db_inrecovery
dag = Pipeline()
I get this error:
> AttributeError: '_TaskDecorator' object has no attribute 'update_relative'
答案1
得分: 4
你需要调用 Python TaskFlow 装饰的任务,即将 check_db_alive >> alert_of_db_inrecovery
更改为 check_db_alive >> alert_of_db_inrecovery()
检查正确的代码如下:
from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task, dag
@dag(
dag_id="database_monitor",
schedule_interval='*/10 * * * *',
start_date=pendulum.datetime(2023, 7, 16, 21, 0, tz="UTC"),
catchup=False,
)
def Pipeline():
check_db_alive = SqlSensor(
task_id="check_db_alive",
conn_id="evergreen",
sql="SELECT pg_is_in_recovery()",
success=lambda x: x == False,
poke_interval=60,
# timeout = 60 * 2,
mode="reschedule",
)
@task
def alert_of_db_inrecovery():
import requests
# result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"
data = {"@key": "kkll",
"@version": "alertapi-0.1",
"@type": "ALERT",
"object": "Testobject",
"severity": "MINOR",
"text": str("Former primary instance is in recovery")
}
requests.post('https://httpevents.systems/api/sendAlert', verify=False, data=data)
check_db_alive >> alert_of_db_inrecovery()
dag = Pipeline()
参考链接:https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
英文:
You need to call the Python TaskFlow-decorated task, i.e. change
check_db_alive >> alert_of_db_inrecovery
to check_db_alive >> alert_of_db_inrecovery()
The corrected code is below:
from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task, dag
@dag(
dag_id="database_monitor",
schedule_interval='*/10 * * * *',
start_date=pendulum.datetime(2023, 7, 16, 21, 0, tz="UTC"),
catchup=False,
)
def Pipeline():
check_db_alive = SqlSensor(
task_id="check_db_alive",
conn_id="evergreen",
sql="SELECT pg_is_in_recovery()",
success=lambda x: x == False,
poke_interval=60,
# timeout = 60 * 2,
mode="reschedule",
)
@task
def alert_of_db_inrecovery():
import requests
# result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"
data = {"@key": "kkll",
"@version": "alertapi-0.1",
"@type": "ALERT",
"object": "Testobject",
"severity": "MINOR",
"text": str("Former primary instance is in recovery")
}
requests.post('https://httpevents.systems/api/sendAlert', verify=False, data=data)
check_db_alive >> alert_of_db_inrecovery()
dag = Pipeline()
Ref: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论