Airflow KubernetesPodOperator XCom return value is "None" when accessed in PythonOperator

Question

I am running a Kubernetes pod with Airflow's KubernetesPodOperator. The pod executes a JAR file and writes its output to /airflow/xcom/return.json. In the Airflow UI, the task's XCom shows return_value with the content of the return.json file. But when I try to pull the return value inside a PythonOperator callable, it returns None. Any idea why this is happening and how to retrieve the return value inside the Python callable? Thanks. Code and screenshots below.
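The container side of this setup can be sketched in Python, even though the pod in the question runs a JAR. As far as I know, with do_xcom_push=True the operator reads /airflow/xcom/return.json from the pod and expects it to hold valid JSON. The helper name `write_xcom_result` and the sample payload are hypothetical; the path is a parameter so the sketch also runs outside a pod.

```python
import json
import os
import tempfile


def write_xcom_result(value, path="/airflow/xcom/return.json"):
    """Serialize `value` as JSON into the file that is read back as the XCom result."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(value, f)


# Outside a pod, write to a temporary directory instead of /airflow/xcom.
demo_path = os.path.join(tempfile.mkdtemp(), "xcom", "return.json")
write_xcom_result({"message": "Hello Mr.CocaCola"}, path=demo_path)  # made-up payload

# Read the file back to confirm it parses as JSON.
with open(demo_path, encoding="utf-8") as f:
    loaded = json.load(f)
```

Whatever language writes the file, the content has to parse as JSON; plain unquoted text would not.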


from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
from kubernetes.client import models as k8s

def set_input_f(**context):
    print("!!!-Pushing input-!!!")
    context['ti'].xcom_push(key='input', value='Mr.CocaCola')
    print(context['ti'].xcom_pull(key="input"))

def get_output_f(**context):
    print("!!!-Getting Output-!!!")
    print(context)
    pod_logs = context['task_instance'].xcom_pull(task_ids='run_java_pod_xcom')
    #output = pod_logs.split('\n')[-2]
    print('Output:', pod_logs)
    ti = context['ti']
    pod_logs1 = ti.xcom_pull(task_ids='run_java_pod_xcom')
    print('Output1:', pod_logs1)

with DAG(dag_id="kubernetes_java_xcom",
         start_date=datetime(2021, 1, 1),
         schedule_interval="@monthly",
         catchup=False) as dag:
    set_input = PythonOperator(
        task_id='set_input',
        python_callable=set_input_f
    )
    get_output = PythonOperator(
        task_id='get_output',
        python_callable=get_output_f,
        provide_context=True
    )
    k = KubernetesPodOperator(
        kubernetes_conn_id="<kubernetes connection>",
        namespace="default",
        image="<private image location>",
        image_pull_secrets=[k8s.V1LocalObjectReference("xxxxx")],
        name="run_java_pod_xcom",
        cmds=["bash", "-cx"],
        env_vars={"INPUT_XCOM": "{{ ti.xcom_pull(task_ids='set_input', key='input') }}"},
        arguments=["java -jar /deployments/app.jar"],
        labels={"foo": "bar"},
        task_id="run_java_code",
        do_xcom_push=True
    )

    set_input >> k >> get_output

Screenshots (images not available): the set_input task log, the KubernetesPodOperator return_value in the UI, and the get_output task's log.

Answer 1

Score: 0


The task_ids value passed to xcom_pull in the PythonOperator callable should be "run_java_code" (the task_id of k), not "run_java_pod_xcom", which is only the pod's name.

def get_output_f(**context):
    print("!!!-Getting Output-!!!")
    print(context)
    # Pull by the operator's task_id, not by the pod name
    pod_logs = context['task_instance'].xcom_pull(task_ids='run_java_code')
    print('Output:', pod_logs)
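To see why the original pull came back empty, here is a plain-Python stand-in for the lookup. It is a toy model, not Airflow's implementation, and it assumes only what the question shows: the value is stored under the operator's task_id, and a pull for a task_id with no stored value yields None. The `FakeXComStore` class and the sample payload are hypothetical.

```python
from typing import Any, Dict, Optional, Tuple


class FakeXComStore:
    """Toy model of XCom storage keyed by (task_id, key). Not Airflow code."""

    def __init__(self) -> None:
        self._values: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, value: Any, key: str = "return_value") -> None:
        self._values[(task_id, key)] = value

    def pull(self, task_ids: str, key: str = "return_value") -> Optional[Any]:
        # A missing entry yields None, matching what the question observed.
        return self._values.get((task_ids, key))


store = FakeXComStore()
# The operator's task_id is "run_java_code"; "run_java_pod_xcom" is only the pod name.
store.push("run_java_code", {"message": "Hello Mr.CocaCola"})  # made-up payload

wrong = store.pull(task_ids="run_java_pod_xcom")  # looked up by pod name: no entry
right = store.pull(task_ids="run_java_code")      # looked up by task_id: entry found
print(wrong, right)
```

In the real DAG, one way to avoid the mismatch is to use the same string for name and task_id, or to reference k.task_id instead of retyping the literal.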

huangapple
  • Published on May 11, 2023, 00:27:33
  • Source link: https://go.coder-hub.com/76220700.html