ImportError: cannot import name 'Param' from airflow.models

Question

I am getting the error below. What can I do to fix it?

from airflow.models import Param
ImportError: cannot import name 'Param'

> Airflow version: 2.0.0

DAG:

from datetime import datetime
from airflow import DAG
from airflow.models import Param
from airflow.operators.python import PythonOperator

is_debug_param = "{{ params.is_debug }}"
seti_param = "{{ params.seti }}"

def my_main(is_debug: bool, seti: str):
    print(is_debug)
    print(seti)

with DAG(
        dag_id="my_main",
        description='my_main',
        start_date=datetime(2022, 6, 8, 1, 0),
        schedule_interval=None,
        catchup=False,
        params={
            "is_debug": Param(default=False, type="boolean", ),
            "seti": Param(default="FG", type="string", enum=["FG", "RX"]),
        },
        render_template_as_native_obj=True,
) as dag:
    task_my_main = PythonOperator(
        task_id='task_my_main_main',
        provide_context=True,
        python_callable=my_main,
        op_kwargs={
            "is_debug": is_debug_param,
            "seti": seti_param,
        },
    )

    (
        task_my_main
    )

Answer 1

Score: 1

Airflow 2.0.0 has no Param class; it was added in version 2.2.0 (see github.com/apache/airflow/pull/17100).
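
The version boundary above can be checked up front before deciding which DAG style to write. This is only a minimal sketch: `supports_param`, `version_tuple`, and `PARAM_MIN_VERSION` are hypothetical names, not part of Airflow, and the 2.2.0 threshold is taken from the PR cited above. It assumes the version string starts with three dot-separated numbers, as `airflow.__version__` normally does.

```python
import re

# Assumption: Param first shipped in Airflow 2.2.0, per the PR cited above.
PARAM_MIN_VERSION = (2, 2, 0)


def version_tuple(version: str) -> tuple:
    """Extract the leading numeric components, e.g. '2.2.0rc1' -> (2, 2, 0)."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def supports_param(version: str) -> bool:
    """Hypothetical helper: True if this Airflow version should include Param."""
    # Tuples compare element by element, so (2, 10, 1) > (2, 2, 0) as expected.
    return version_tuple(version) >= PARAM_MIN_VERSION


# In a real deployment the string would come from airflow.__version__.
print(supports_param("2.0.0"))
print(supports_param("2.2.0"))
```

Comparing integer tuples rather than raw strings avoids the usual pitfall where "2.10.1" sorts before "2.2.0". Pre-release suffixes such as "rc1" are ignored by this sketch.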

Also, "render_template_as_native_obj" is not supported in 2.0.0, so the templated parameters arrive as strings and you have to cast them yourself.

from datetime import datetime
import ast

from airflow import DAG
from airflow.operators.python import PythonOperator

# Without render_template_as_native_obj, these templates render to plain strings.
is_debug_param = "{{ params.is_debug }}"
seti_param = "{{ params.seti }}"
lst_param = "{{ params.lst }}"


def my_main(is_debug, seti, lst):
    print(ast.literal_eval(is_debug))  # "False" -> bool
    print(seti)  # already a string, no cast needed
    print(ast.literal_eval(lst))  # "['a', 'b']" -> list


with DAG(
        dag_id="my_main",
        description='my_main',
        start_date=datetime(2022, 6, 8, 1, 0),
        schedule_interval=None,
        catchup=False,
        # Plain default values instead of Param objects.
        params={
            "is_debug": False,
            "seti": "FG",
            "lst": ["a", "b"],
        },
) as dag:
    task_my_main = PythonOperator(
        task_id='task_my_main_main',
        # provide_context is deprecated in Airflow 2 and no longer required.
        python_callable=my_main,
        op_kwargs={
            "is_debug": is_debug_param,
            "seti": seti_param,
            "lst": lst_param,
        },
    )

    (
        task_my_main
    )
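
The casting step can be tried outside Airflow, since it only depends on the strings that the templates render to. The sketch below assumes the rendered values look like the `str()` form of the defaults ("False", "['a', 'b']", "FG"); `cast_rendered` is a hypothetical helper name, not something the answer or Airflow defines.

```python
import ast


def cast_rendered(value: str):
    """Turn a rendered template string back into a Python literal.

    Falls back to the original string when the text is not a valid
    literal, such as the bare word 'FG'.
    """
    try:
        return ast.literal_eval(value)
    except (ValueError, SyntaxError):
        return value


print(cast_rendered("False"))       # parsed as a bool
print(cast_rendered("['a', 'b']"))  # parsed as a list
print(cast_rendered("FG"))          # left as a string
```

One caveat with this fallback approach: a string parameter whose text happens to be a valid literal (for example "123") would be converted too, so casting only the parameters known to be non-strings, as the answer does, is the more predictable choice.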

huangapple
  • Posted on 2023-06-15 20:37:06
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76482553.html