How to set the state of a past TaskInstance to success and continue the pipeline inside an Airflow DAG?
Question
I'm trying to make a DAG with params so that it can be triggered with a dag_id/task_id. The goal of this DAG is to set the state of the last executed task to "success" and to continue the pipeline from that point.
Example pipeline:
[![DAG pipeline example][1]][1]
[1]: https://i.stack.imgur.com/HZc7q.png
In my DAG I want to be able to set "run_that" to success and have "run_them" run automatically as a result of the state change.
Here is what I have done so far:
```py
import airflow
from airflow.models import DagRun, TaskInstance, DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.state import State
from airflow.operators.python_operator import PythonOperator
import pendulum
from wrapper import ScriptLauncher, handleErrorSlack, handleErrorMail
from datetime import timedelta, datetime

default_args = {
    'owner': 'tozzi',
    'depends_on_past': False,
    'start_date': pendulum.datetime(2022, 12, 19, tz='Europe/Paris'),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'xcom_push': True,
    'catchup': False,
    'params': {
        'dag_id': 'my_dag',
        'task_id': 'run_that',
    }
}


def last_exec(dag_id, task_id, session):
    # Return the most recently executed TaskInstance for the given dag_id/task_id,
    # or None if no instance exists.
    task_instances = (
        session.query(TaskInstance)
        .filter(TaskInstance.dag_id == dag_id, TaskInstance.task_id == task_id)
        .all()
    )
    task_instances.sort(key=lambda x: x.execution_date, reverse=True)
    if task_instances:
        return task_instances[0]
    return None


def set_last_task_success(**kwargs):
    # Mark the latest instance of the given task as successful and try to run it
    # so that its downstream tasks get scheduled again.
    dag_id = kwargs['dag_id']
    task_id = kwargs['task_id']
    session = airflow.settings.Session()
    task_instance = last_exec(dag_id, task_id, session)
    if task_instance is not None:
        task_instance.state = 'success'
        # task_instance = TaskInstance(task_id=task_id, execution_date=last_task_instance.execution_date)
        task_instance.run(session=session, ignore_ti_state=True, ignore_task_deps=True)
    session.commit()
    session.close()


doc_md = f"""## Set the given task_id of the given dag_id to success"""

# launched remotely
launcher = ScriptLauncher(default_args, "@once", 'set_task_to_success', ['airflow'], doc_md)
dag = launcher.dag

set_to_success = PythonOperator(
    task_id='set_to_success',
    provide_context=True,
    python_callable=set_last_task_success,
    dag=dag,
    op_kwargs={
        'dag_id': '{{ params.dag_id }}',
        'task_id': '{{ params.task_id }}',
    }
)
```
The task_instance.run(...) call fails here with the error "AttributeError: 'TaskInstance' object has no attribute 'task'", although the state change itself works. What should I change so that the "run_them" task is rerun when I change the state of the "run_that" task?
Answer 1
Score: 2
As of Airflow 2.5.0 you can update the state of any existing task via the Airflow REST API.
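For reference, here is a minimal sketch of what that REST call could look like with `requests`, assuming Airflow 2.5+ with the stable `/api/v1` API enabled behind basic auth; the host, credentials and run_id below are placeholders, not values from the original answer:
```py
# Minimal sketch: mark an existing task instance as "success" via the stable REST API.
# Assumes Airflow >= 2.5.0 with basic-auth API access; replace the placeholder values.
import requests

AIRFLOW_HOST = "http://localhost:8080"          # assumption: webserver URL
DAG_ID = "my_dag"
RUN_ID = "manual__2022-12-19T00:00:00+00:00"    # assumption: an existing dag_run_id
TASK_ID = "run_that"

response = requests.patch(
    f"{AIRFLOW_HOST}/api/v1/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}",
    json={"dry_run": False, "new_state": "success"},
    auth=("airflow", "airflow"),                # assumption: basic-auth API backend
)
response.raise_for_status()
print(response.json())
```
Depending on the state the downstream tasks ended up in (for example upstream_failed), they may additionally need to be cleared so the scheduler picks them up again.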
Answer 2
Score: 1
Using @TJaniF's answer, I made this small reusable failure-handler task, on_failure_send_force_success_mail. It sends an email containing a link to a custom API that issues the "patch task instance" request via a GET, and it works as expected:
```py
def on_failure_send_force_success_mail(task_id):
    def send_mail(**context):
        result = requests.post('mailing_api_url', json={
            "from": "foobar@mail.com",
            "to": "tozzi@mail.com",
            "subject": "Airflow task failed",
            "html": f'Airflow task test failed: <a href="custom_api_url/script/{context["task_instance"].dag_id}/dag_runs/{context["dag_run"].run_id}/task_instances/{task_id}?new_state=success">click here</a>',
        })
    return send_mail

def run_this_func_failed(**context):
    raise Exception('FAILED')

def run_this_func(**context):
    print('COMPLETED')

run_that = PythonOperator(
    task_id='run_that',
    provide_context=True,
    python_callable=run_this_func_failed,
    dag=dag,
)

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    trigger_rule="all_success",
    dag=dag,
)

on_failure = PythonOperator(
    task_id='on_failure',
    provide_context=True,
    python_callable=on_failure_send_force_success_mail('run_that'),
    trigger_rule="all_failed",
    dag=dag,
)

run_that >> on_failure
run_that >> run_this
```
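The "custom API" behind the email link is not shown in the answer. As a rough, hypothetical sketch only, such a relay could be a small Flask app that accepts the GET request from the email and forwards it as the corresponding PATCH call to the Airflow REST API; the Flask framework choice, host names and credentials below are assumptions, not part of the original answer:
```py
# Hypothetical relay: turn the GET link from the email into the Airflow REST API PATCH call.
import requests
from flask import Flask, request

app = Flask(__name__)

AIRFLOW_HOST = "http://localhost:8080"   # assumption: Airflow webserver URL
AIRFLOW_AUTH = ("airflow", "airflow")    # assumption: basic-auth API user

# Matches the path used in the email link:
# /script/<dag_id>/dag_runs/<run_id>/task_instances/<task_id>?new_state=success
@app.route("/script/<dag_id>/dag_runs/<run_id>/task_instances/<task_id>")
def force_state(dag_id, run_id, task_id):
    new_state = request.args.get("new_state", "success")
    resp = requests.patch(
        f"{AIRFLOW_HOST}/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}",
        json={"dry_run": False, "new_state": new_state},
        auth=AIRFLOW_AUTH,
    )
    return resp.json(), resp.status_code
```
The GET-to-PATCH indirection exists only because a plain link in an email can issue nothing but a GET request.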