How to set the state of a past TaskInstance to success and continue the pipeline inside an Airflow DAG?

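Question

I'm trying to build a parameterized DAG that can be triggered with a dag_id/task_id pair. The goal of this DAG is to set the state of the last executed instance of that task to "success" and to continue the pipeline from that point.

Example pipeline:

[![DAG pipeline example][1]][1]


  [1]: https://i.stack.imgur.com/HZc7q.png

In my DAG, I want to be able to set "run_that" to success and have "run_them" run automatically as a result of the new state change.

Here is what I have so far: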

```py
import airflow
from airflow.models import DagRun, TaskInstance, DagBag

from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.state import State

from airflow.operators.python_operator import PythonOperator

import pendulum

from wrapper import ScriptLauncher, handleErrorSlack, handleErrorMail

from datetime import timedelta, datetime

default_args = {
  'owner': 'tozzi',
  'depends_on_past': False,
  'start_date': pendulum.datetime(2022, 12, 19, tz='Europe/Paris'),
  'retries': 0,
  'retry_delay': timedelta(minutes=5),
  'xcom_push': True,
  'catchup': False,
  'params': {
    'dag_id': 'my_dag',
    'task_id': 'run_that',
  }
}

def last_exec(dag_id, task_id, session):
    task_instances = (
        session.query(TaskInstance)
        .filter(TaskInstance.dag_id == dag_id, TaskInstance.task_id == task_id)
        .all()
    )
    task_instances.sort(key=lambda x: x.execution_date, reverse=True)
    if task_instances:
        return task_instances[0]
    return None

def set_last_task_success(**kwargs):
    dag_id = kwargs['dag_id']
    task_id = kwargs['task_id']

    session = airflow.settings.Session()
    task_instance = last_exec(dag_id, task_id, session)
    
    if (task_instance is not None):
      task_instance.state = 'success'
      # task_instance = TaskInstance(task_id=task_id, execution_date=last_task_instance.execution_date)
      task_instance.run(session=session, ignore_ti_state=True, ignore_task_deps=True)
      session.commit()
      session.close()


doc_md = """## Set the given task_id of the given dag_id to success"""

# launched remotely
launcher = ScriptLauncher(default_args, "@once", 'set_task_to_success', ['airflow'], doc_md)

dag = launcher.dag

set_to_success = PythonOperator(
    task_id='set_to_success',
    provide_context=True,
    python_callable=set_last_task_success,
    dag=dag,
    op_kwargs={
        'dag_id': '{{ params.dag_id }}',
        'task_id': '{{ params.task_id }}',
    }
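)
```

The `task_instance.run(...)` call fails here with this error: "AttributeError: 'TaskInstance' object has no attribute 'task'", although the state change itself works correctly. What should I change so that the "run_them" task is rerun when I change the state of the "run_that" task?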


Answer 1

Score: 2

As of Airflow 2.5.0 you can update the state of any existing task via the Airflow REST API.
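
As a rough illustration of what such a call might look like, the sketch below builds a PATCH request against the task-instance endpoint of the stable REST API using only the Python standard library. The base URL, the `admin`/`admin` credentials, the use of basic authentication (which depends on the API auth backend configured on your webserver), and the function names are all assumptions for this example, not something stated in the answer.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_set_state_request(base_url, dag_id, run_id, task_id,
                            new_state="success", user="admin", password="admin"):
    """Build (but do not send) a PATCH request that sets a task instance's state."""
    # Run ids such as "manual__2023-02-06T12:00:00+00:00" contain characters
    # that must be percent-encoded before being placed in the URL path.
    path = "/api/v1/dags/{}/dagRuns/{}/taskInstances/{}".format(
        *(urllib.parse.quote(part, safe="") for part in (dag_id, run_id, task_id))
    )
    # dry_run is set explicitly so the change is actually applied.
    body = json.dumps({"dry_run": False, "new_state": new_state}).encode("utf-8")
    # Assumption: the webserver accepts HTTP basic authentication.
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        data=body,
        method="PATCH",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


def set_task_state(*args, **kwargs):
    """Send the request and return the decoded JSON response."""
    with urllib.request.urlopen(build_set_state_request(*args, **kwargs)) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Calling `set_task_state("http://localhost:8080", "my_dag", run_id, "run_that")` would then send the request. The `run_id` has to be looked up first, for example from the UI or from the DAG-run listing endpoint.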

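Answer 2

Score: 1

Building on @TJaniF's answer, I wrote this small reusable failure-handling task factory, `on_failure_send_force_success_mail`. The task it creates sends an email containing a link to a custom API, which wraps the "patch task instance" request behind a GET so that it can be triggered by clicking the link. It works as expected: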

```py
import requests

# `dag` and PythonOperator are defined/imported as in the question.
# 'mailing_api_url' and 'custom_api_url' are placeholders for your own services.

def on_failure_send_force_success_mail(task_id):
    """Return a callable that emails a "force success" link for task_id."""
    def send_mail(**context):
        requests.post('mailing_api_url', json={
            "from": "foobar@mail.com",
            "to": "tozzi@mail.com",
            "subject": "Airflow task failed",
            "html": f'Airflow task test failed: <a href="custom_api_url/script/{context["task_instance"].dag_id}/dag_runs/{context["dag_run"].run_id}/task_instances/{task_id}?new_state=success">click here</a>',
        })
    return send_mail

def run_this_func_failed(**context):
    # Always fails, to exercise the failure path.
    raise Exception('FAILED')

def run_this_func(**context):
    print('COMPLETED')

run_that = PythonOperator(
    task_id='run_that',
    provide_context=True,
    python_callable=run_this_func_failed,
    dag=dag,
)

# Runs only once run_that has succeeded (or has been forced to success).
run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    trigger_rule="all_success",
    dag=dag,
)

# Runs only when run_that has failed, and sends the email.
on_failure = PythonOperator(
    task_id='on_failure',
    provide_context=True,
    python_callable=on_failure_send_force_success_mail('run_that'),
    trigger_rule="all_failed",
    dag=dag,
)

run_that >> on_failure
run_that >> run_this
```

Posted by huangapple on February 6, 2023, 21:18:16
Please keep this link when reposting: https://go.coder-hub.com/75361826.html