英文:
airflow.models import Param ImportError: cannot import name 'Param'
问题
我遇到以下错误。我应该怎么做才能修复它?
from airflow.models import Param ImportError: cannot import name 'Param'
> Airflow version: 2.0.0
DAG:
from datetime import datetime
from airflow import DAG
from airflow.models import Param
from airflow.operators.python import PythonOperator
is_debug_param = "{{ params.is_debug }}"
seti_param = "{{ params.seti }}"
def my_main(is_debug: bool, seti: str):
print(is_debug)
print(seti)
with DAG(
dag_id="my_main",
description='my_main',
start_date=datetime(2022, 6, 8, 1, 0),
schedule_interval=None,
catchup=False,
params={
"is_debug": Param(default=False, type="boolean", ),
"seti": Param(default="FG", type="string", enum=["FG", "RX"]),
},
render_template_as_native_obj=True,
) as dag:
task_my_main = PythonOperator(
task_id='task_my_main_main',
provide_context=True,
python_callable=my_main,
op_kwargs={
"is_debug": is_debug_param,
"seti": seti_param,
},
)
(
task_my_main
)
请注意,我保留了代码中的原始格式,并只翻译了文本部分。
英文:
I am getting error as below. What can i do to fix it?
airflow.models import Param ImportError: cannot import name 'Param'
> Airflow version: 2.0.0
DAG:
from datetime import datetime
from airflow import DAG
from airflow.models import Param
from airflow.operators.python import PythonOperator
is_debug_param = "{{ params.is_debug }}"
seti_param = "{{ params.seti }}"
def my_main(is_debug: bool, seti: str):
print(is_debug)
print(seti)
with DAG(
dag_id="my_main",
description='my_main',
start_date=datetime(2022, 6, 8, 1, 0),
schedule_interval=None,
catchup=False,
params={
"is_debug": Param(default=False, type="boolean", ),
"seti": Param(default="FG", type="string", enum=["FG", "RX"]),
},
render_template_as_native_obj=True,
) as dag:
task_my_main = PythonOperator(
task_id='task_my_main_main',
provide_context=True,
python_callable=my_main,
op_kwargs={
"is_debug": is_debug_param,
"seti": seti_param,
},
)
(
task_my_main
)
答案1
得分: 1
对于Airflow版本2.0.0,没有Param模块(此更改的PR已插入到版本2.2.0中 - 请参阅github.com/apache/airflow/pull/17100)
另外,“render_template_as_native_obj”不受支持,因此您需要将参数进行类型转换。
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import ast
is_debug_param = "{{ params.is_debug }}"
seti_param = "{{ params.seti }}"
lst_param = "{{ params.lst }}"
def my_main(is_debug, seti, lst):
print(ast.literal_eval(is_debug)) # 布尔值
print(seti)
print(ast.literal_eval(lst)) # 列表
with DAG(
dag_id="my_main",
description='my_main',
start_date=datetime(2022, 6, 8, 1, 0),
schedule_interval=None,
catchup=False,
params={
"is_debug": False,
"seti": "FG",
"lst": ["a", "b"],
},
) as dag:
task_my_main = PythonOperator(
task_id='task_my_main_main',
provide_context=True,
python_callable=my_main,
op_kwargs={
"is_debug": is_debug_param,
"seti": seti_param,
"lst": lst_param,
},
)
(
task_my_main
)
英文:
for airflow version 2.0.0 there is no Param module (the pr for this change inserted in version 2.2.0 - see github.com/apache/airflow/pull/17100)
also "render_template_as_native_obj" is not supported, so you'll have to cast the parameters.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import ast
is_debug_param = "{{ params.is_debug }}"
seti_param = "{{ params.seti }}"
lst_param = "{{ params.lst }}"
def my_main(is_debug, seti, lst):
print(ast.literal_eval(is_debug)) # bool
print(seti)
print(ast.literal_eval(lst)) # list
with DAG(
dag_id="my_main",
description='my_main',
start_date=datetime(2022, 6, 8, 1, 0),
schedule_interval=None,
catchup=False,
params={
"is_debug": False,
"seti": "FG",
"lst": ["a", "b"],
},
) as dag:
task_my_main = PythonOperator(
task_id='task_my_main_main',
provide_context=True,
python_callable=my_main,
op_kwargs={
"is_debug": is_debug_param,
"seti": seti_param,
"lst": lst_param,
},
)
(
task_my_main
)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论