How to use the pod_template_file parameter with KubernetesPodOperator in Airflow 2?

Question

I am running Airflow via MWAA on AWS, and the worker nodes run on k8s. The pods are scheduled just fine, but when I try to use pod_template_file with KubernetesPodOperator, I get inconsistent behavior.

My template file, stored in S3:

apiVersion: v1
kind: Pod
metadata:
  name: app1
  namespace: app1
spec:
  containers:
  - name: base
    image: "alpine:latest"
    command: ["/bin/sh"]
    args: ["-c", "while true; do echo hi>> /data/app.log; sleep 5; done"]
    volumeMounts:
    - name: persistent-storage
      mountPath: /data
  volumes:
  - name: persistent-storage
    persistentVolumeClaim:
      claimName: claim1

My DAG file:

t_1 = KubernetesPodOperator(
    task_id=job_name_1,
    namespace="app",
    name=job_name_1,
    get_logs=True,
    is_delete_operator_pod=False,
    in_cluster=False,
    config_file=kube_config_path,
    startup_timeout_seconds=240,
    cluster_context='test',
    pod_template_file="/opt/airflow/pod_template_file_example-1.yaml",
    dag=dag)

With this, I get an error saying the pod spec is invalid because it is missing the image field. This is surprising, since image is present in the pod template.

I also tried the version below. It works, but it ignores the pod template file entirely: it spins up an Alpine container and exits. So it looks like the pod_template_file param is being ignored.

full_pod_spec = k8s.V1Pod(
    metadata=metadata_2,
    spec=k8s.V1PodSpec(containers=[
        k8s.V1Container(
            name="base",
            image="alpine:latest",
        )
    ], ))
    
t_1 = KubernetesPodOperator(
    task_id=job_name_1,
    namespace="mlops",
    name=job_name_1,
    get_logs=True,
    is_delete_operator_pod=False,
    in_cluster=False,
    config_file=kube_config_path,
    startup_timeout_seconds=240,
    cluster_context='aws',
    full_pod_spec=full_pod_spec,
    pod_template_file="/opt/airflow/pod_template_file_example-1.yaml",
    dag=dag)

What is the correct way to reference a pod_template_file in KubernetesPodOperator in Airflow?


References : medium

Answer 1

Score: 2

I have never used MWAA on AWS, but the first thing to check is:

Is the template file stored in S3 actually available to the DAG script at the path "/opt/airflow/pod_template_file_example-1.yaml"?
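One way to rule this out is to resolve the template path explicitly and fail loudly when the file is not there. The sketch below is only an illustration: the helper name `resolve_template_path` is hypothetical, and it assumes the template is deployed in the same folder as the DAG file, which may not match your MWAA setup. If the file is absent on the worker, the template cannot supply the image, which would be consistent with the "missing image" error in the question.

```python
import os


def resolve_template_path(dag_file: str, template_name: str) -> str:
    """Return the absolute path of a template stored next to the DAG file.

    Hypothetical helper: assumes the template ships in the same folder
    as the DAG script. Raises instead of returning a path that does not exist.
    """
    dag_dir = os.path.dirname(os.path.abspath(dag_file))
    path = os.path.join(dag_dir, template_name)
    if not os.path.isfile(path):
        raise FileNotFoundError(f"pod template not found on this worker: {path}")
    return path


# Possible use inside a DAG file (not executed here):
#   pod_template_file=resolve_template_path(__file__, "pod_template_file_example-1.yaml")
```

Raising at DAG parse time makes a bad path show up as an import error in the Airflow UI rather than as a confusing pod spec error at run time.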

See the section of the Airflow docs titled "Difference between KubernetesPodOperator and Kubernetes object spec".

KubernetesPodOperator simply builds the equivalent YAML/JSON object spec for the Pod. It draws on four sources of arguments, in the following order of precedence:

  1. KubernetesPodOperator arguments, such as name, namespace, and image;
  2. the full pod spec, passed as full_pod_spec;
  3. the pod template file, passed as pod_template_file;
  4. the Airflow connection.
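The precedence order above can be pictured as a layered merge, where a higher-priority source overrides a lower one only for the fields it actually sets. The following is a plain-Python sketch of that idea using dictionaries; it is not Airflow's implementation, and the names `deep_merge` and `resolve` as well as the sample values are made up for illustration.

```python
from typing import Any, Dict


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Return base updated with override; None values in override are skipped."""
    result = dict(base)
    for key, value in override.items():
        if value is None:
            continue  # an unset field must not hide a lower-priority value
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result


def resolve(*layers_high_to_low: Dict[str, Any]) -> Dict[str, Any]:
    """Merge layers given from highest to lowest precedence."""
    resolved: Dict[str, Any] = {}
    for layer in reversed(layers_high_to_low):
        resolved = deep_merge(resolved, layer)
    return resolved


# Illustrative values only, loosely modelled on the question.
operator_args = {"name": "job-1", "namespace": "app", "image": None}
full_pod_spec = {"image": "alpine:latest"}
pod_template = {"name": "app1", "namespace": "app1", "image": "alpine:latest"}
connection = {"namespace": "default"}

resolved = resolve(operator_args, full_pod_spec, pod_template, connection)
print(resolved)
```

In this model the operator's namespace ("app") wins over the template's ("app1"), while the image still comes from a lower layer because the operator did not set it.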

huangapple
  • Posted on March 7, 2023 at 13:57:17
  • Please keep the link to this article when reposting: https://go.coder-hub.com/75658478.html