DAGs triggered by TriggerDagRunOperator are stuck in queue
Question
example_trigger_controller_dag.py
import pendulum

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="example_trigger_controller_dag",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    schedule="@once",
    tags=["example"],
) as dag:
    trigger = TriggerDagRunOperator(
        task_id="test_trigger_dagrun",
        trigger_dag_id="example_trigger_target_dag",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
example_trigger_target_dag.py
import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator


@task(task_id="run_this")
def run_this_func(dag_run=None):
    """
    Print the payload "message" passed to the DagRun conf attribute.

    :param dag_run: The DagRun object
    """
    print(f"Remotely received value of {dag_run.conf.get('message')} for key=message")


with DAG(
    dag_id="example_trigger_target_dag",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example"],
) as dag:
    run_this = run_this_func()

    bash_task = BashOperator(
        task_id="bash_task",
        bash_command='echo "Here is the message: $message"',
        env={"message": '{{ dag_run.conf.get("message") }}'},
    )
The task in the controller DAG finished successfully, but the task in the target DAG is stuck in the queue. Any ideas on how to solve this problem?
Answer 1
Score: 1
I ran your DAGs (with both of them unpaused) and they work fine in a completely new environment (Airflow 2.5.0, Astro CLI Runtime 7.1.0), so the issue is most likely not with your DAG code.

Tasks stuck in the queue are often a scheduler issue, mostly with older Airflow versions. I suggest you:

- Make sure both DAGs are unpaused when the first DAG runs (see the CLI sketch after this list).
- Make sure all start_dates are in the past (though in that case the tasks usually don't even get queued).
- Restart your scheduler/Airflow environment.
- Try running the DAGs while no other DAGs are running, to check whether the parallelism limit has been reached. (With the Kubernetes executor you should also check worker_pods_creation_batch_size; with the Celery executor, worker_concurrency and stalled_task_timeout.)
- Take a look at your scheduler logs (at $AIRFLOW_HOME/logs/scheduler).
- Upgrade Airflow if you are running an older version.
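For example, the first and fourth points can be checked from the command line. This is only a minimal sketch of that kind of check with the Airflow 2.x CLI; adjust the dag_id and the executor-specific keys to your setup:

airflow dags list                                   # the "paused" column should show False for both DAGs
airflow dags unpause example_trigger_target_dag     # unpause the target DAG if it is still paused
airflow config get-value core parallelism           # overall limit on concurrently running tasks
airflow config get-value celery worker_concurrency  # only relevant when using the Celery executor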