"DAG import error: AttributeError: '_TaskDecorator' object has no attribute 'update_relative'"

huangapple go评论99阅读模式
英文:

DAG import error: AttributeError: '_TaskDecorator' object has no attribute 'update_relative'

问题

I'm facing an issue where my DAG cannot be imported, and I cannot figure out why:

from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task,dag

# Declares the "database_monitor" DAG: cron schedule '*/10 * * * *', start date
# 2023-07-16 21:00 UTC, catchup disabled.
@dag(
dag_id = "database_monitor",
schedule_interval = '*/10 * * * *',
start_date=pendulum.datetime(2023, 7, 16, 21,0,tz="UTC"),
catchup=False,)
def Pipeline():

    # Sensor that runs the query below on the "evergreen" connection. The
    # `success` callable accepts the result only when it equals False.
    # NOTE(review): the alert task below says the instance *is* in recovery,
    # while this sensor passes when the query returns False -- confirm the
    # intended condition.
    check_db_alive = SqlSensor(
        task_id="check_db_alive",
        conn_id="evergreen",
        sql="SELECT pg_is_in_recovery()",
        success= lambda x: x == False,
        poke_interval= 60,
        #timeout = 60 * 2,
        mode = "reschedule",
    )


    # TaskFlow task that posts a fixed alert payload to the HTTP alert API.
    @task()
    def alert_of_db_inrecovery():
        # Imported inside the task body rather than at module level.
        import requests
        # result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"

        # Form fields sent with the alert request.
        data = {"@key":"kkll",
                "@version" : "alertapi-0.1",
                "@type":"ALERT",
                "object" : "Testobject",
                "severity" : "MINOR",
                "text" : str("Former primary instance is in recovery")
            }
        # NOTE(review): verify=False turns off TLS certificate verification,
        # and no timeout is passed to the request.
        requests.post('https://httpevents.systems/api/sendAlert',verify=False,data=data)


    # NOTE: `alert_of_db_inrecovery` is referenced here WITHOUT being called,
    # so the right-hand operand is the @task decorator object, not a task.
    # This is the line behind the reported
    # "AttributeError: '_TaskDecorator' object has no attribute 'update_relative'".
    # Writing `alert_of_db_inrecovery()` creates the task and fixes the import.
    check_db_alive >> alert_of_db_inrecovery


dag = Pipeline()

I get this error:

AttributeError: '_TaskDecorator' object has no attribute 'update_relative'

英文:

I'm facing an issue where my DAG cannot be imported, and I cannot figure out why:

from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task,dag

# Declares the "database_monitor" DAG: cron schedule '*/10 * * * *', start date
# 2023-07-16 21:00 UTC, catchup disabled.
@dag(
dag_id = "database_monitor",
schedule_interval = '*/10 * * * *',
start_date=pendulum.datetime(2023, 7, 16, 21,0,tz="UTC"),
catchup=False,)
def Pipeline():

	# Sensor that runs the query below on the "evergreen" connection. The
	# `success` callable accepts the result only when it equals False.
	# NOTE(review): the alert task below says the instance *is* in recovery,
	# while this sensor passes when the query returns False -- confirm the
	# intended condition.
	check_db_alive = SqlSensor(
		task_id="check_db_alive",
		conn_id="evergreen",
		sql="SELECT pg_is_in_recovery()",
		success= lambda x: x == False,
		poke_interval= 60,
		#timeout = 60 * 2,
		mode = "reschedule",
	)


	# TaskFlow task that posts a fixed alert payload to the HTTP alert API.
	@task()
	def alert_of_db_inrecovery():
		# Imported inside the task body rather than at module level.
		import requests
		# result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"

		# Form fields sent with the alert request.
		data = {"@key":"kkll",
				"@version" : "alertapi-0.1",
				"@type":"ALERT",
				"object" : "Testobject",
				"severity" : "MINOR",
				"text" : str("Former primary instance is in recovery")
			}
		# NOTE(review): verify=False turns off TLS certificate verification,
		# and no timeout is passed to the request.
		requests.post('https://httpevents.systems/api/sendAlert',verify=False,data=data)


	# NOTE: `alert_of_db_inrecovery` is referenced here WITHOUT being called,
	# so the right-hand operand is the @task decorator object, not a task.
	# This is the line behind the reported
	# "AttributeError: '_TaskDecorator' object has no attribute 'update_relative'".
	# Writing `alert_of_db_inrecovery()` creates the task and fixes the import.
	check_db_alive >> alert_of_db_inrecovery


dag = Pipeline()

I get this error:

> AttributeError: '_TaskDecorator' object has no attribute 'update_relative'

答案1

得分: 4

你需要调用Python任务流操作符,即将check_db_alive >> alert_of_db_inrecovery更改为check_db_alive >> alert_of_db_inrecovery()

正确的代码如下:

from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task, dag

@dag(
    dag_id="database_monitor",
    schedule_interval='*/10 * * * *',
    start_date=pendulum.datetime(2023, 7, 16, 21, 0, tz="UTC"),
    catchup=False,
)
def Pipeline():
    """Run a database check and post an alert once the check passes.

    The DAG uses the cron schedule ``*/10 * * * *``, starts from
    2023-07-16 21:00 UTC with catch-up disabled, and chains two tasks:
    the ``check_db_alive`` sensor followed by the ``alert_of_db_inrecovery``
    TaskFlow task.
    """
    # The sensor runs the query on the "evergreen" connection and passes only
    # when the `success` callable returns True, i.e. when the result == False.
    # NOTE(review): the downstream alert text says the instance *is* in
    # recovery, while this sensor passes when the query returns False --
    # confirm the intended condition.
    check_db_alive = SqlSensor(
        task_id="check_db_alive",
        conn_id="evergreen",
        sql="SELECT pg_is_in_recovery()",
        success=lambda x: x == False,
        poke_interval=60,
        # timeout = 60 * 2,
        mode="reschedule",
    )

    @task
    def alert_of_db_inrecovery():
        """Post a MINOR alert to the HTTP alert API.

        Raises:
            requests.RequestException: if the request times out, the
                connection fails, or the API answers with an HTTP error
                status.
        """
        import requests
        # result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"

        data = {
            "@key": "kkll",
            "@version": "alertapi-0.1",
            "@type": "ALERT",
            "object": "Testobject",
            "severity": "MINOR",
            "text": "Former primary instance is in recovery",
        }
        # NOTE(review): verify=False disables TLS certificate verification.
        # It is kept as in the original; prefer pointing `verify` at a CA
        # bundle if the endpoint's certificate can be validated.
        response = requests.post(
            'https://httpevents.systems/api/sendAlert',
            verify=False,
            data=data,
            # Without a timeout the request (and thus the task) can block
            # indefinitely if the endpoint never answers.
            timeout=10,
        )
        # Fail the task on 4xx/5xx so an undelivered alert is not silent.
        response.raise_for_status()

    # The decorated function must be CALLED: the call creates the task, whereas
    # the bare name is the decorator object and cannot be used with `>>`.
    check_db_alive >> alert_of_db_inrecovery()

dag = Pipeline()

参考链接:https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html

英文:

You need to call the Python TaskFlow-decorated function so that it actually creates a task, i.e.

change `check_db_alive >> alert_of_db_inrecovery` to `check_db_alive >> alert_of_db_inrecovery()`.

Here is the corrected code:

from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task, dag


@dag(
    dag_id="database_monitor",
    schedule_interval='*/10 * * * *',
    start_date=pendulum.datetime(2023, 7, 16, 21, 0, tz="UTC"),
    catchup=False,
)
def Pipeline():
    """Run a database check and post an alert once the check passes.

    The DAG uses the cron schedule ``*/10 * * * *``, starts from
    2023-07-16 21:00 UTC with catch-up disabled, and chains two tasks:
    the ``check_db_alive`` sensor followed by the ``alert_of_db_inrecovery``
    TaskFlow task.
    """
    # The sensor runs the query on the "evergreen" connection and passes only
    # when the `success` callable returns True, i.e. when the result == False.
    # NOTE(review): the downstream alert text says the instance *is* in
    # recovery, while this sensor passes when the query returns False --
    # confirm the intended condition.
    check_db_alive = SqlSensor(
        task_id="check_db_alive",
        conn_id="evergreen",
        sql="SELECT pg_is_in_recovery()",
        success=lambda x: x == False,
        poke_interval=60,
        # timeout = 60 * 2,
        mode="reschedule",
    )

    @task
    def alert_of_db_inrecovery():
        """Post a MINOR alert to the HTTP alert API.

        Raises:
            requests.RequestException: if the request times out, the
                connection fails, or the API answers with an HTTP error
                status.
        """
        import requests
        # result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"

        data = {
            "@key": "kkll",
            "@version": "alertapi-0.1",
            "@type": "ALERT",
            "object": "Testobject",
            "severity": "MINOR",
            "text": "Former primary instance is in recovery",
        }
        # NOTE(review): verify=False disables TLS certificate verification.
        # It is kept as in the original; prefer pointing `verify` at a CA
        # bundle if the endpoint's certificate can be validated.
        response = requests.post(
            'https://httpevents.systems/api/sendAlert',
            verify=False,
            data=data,
            # Without a timeout the request (and thus the task) can block
            # indefinitely if the endpoint never answers.
            timeout=10,
        )
        # Fail the task on 4xx/5xx so an undelivered alert is not silent.
        response.raise_for_status()

    # The decorated function must be CALLED: the call creates the task, whereas
    # the bare name is the decorator object and cannot be used with `>>`.
    check_db_alive >> alert_of_db_inrecovery()


dag = Pipeline()

Ref: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html

huangapple
  • 本文由 发表于 2023年7月18日 15:56:04
  • 转载请务必保留本文链接:https://go.coder-hub.com/76710614.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定