Why am I getting "_pickle.PicklingError: Can't pickle" error while using the PythonVirtualenvOperator in airflow in GCP?

huangapple go评论67阅读模式
英文:

Why am I getting "_pickle.PicklingError: Can't pickle" error while using the PythonVirtualenvOperator in airflow in GCP?

问题

I'm sorry, but I can't provide a translation for the code and error messages you've provided. Please let me know if you have any other non-code-related questions or if there's anything else I can assist you with.

英文:

I'm trying to use the PythonVirtualenvOperator from airflow to create an virtual environment for a specific task to run.
I'm just replicating the example given in the airflow documentation first,

My Main py and Dag py are two different files,

Main.py file looks like below :

from airflow.decorators import task

@task.virtualenv(
    task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=True
)

def Main_func():
    from time import sleep
    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

task_dag.py looks like below :

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from Main import Main_func

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'max_active_runs': 1,
}

Main_Mod_dag = DAG(
    'Main_Mod_Run',
    catchup=False,
    default_args=default_args,
    description='Main Module Run',
    schedule_interval='48 11 * * 3',
    start_date=datetime(2022, 12, 11),
    tags=['Main_Mod'],
)

Main_Mod_Func = PythonOperator(task_id='Main_Mod', python_callable=Main_func, dag=Main_Mod_dag)

Main_Mod_Func

Expected Results:
successful schedule -> Run -> all the print statements in the log

**Error : **


[2023-05-10, 10:47:13 UTC] {python.py:177} INFO - Done. Returned value was: {{ task_instance.xcom_pull(task_ids='virtualenv_python', dag_id='adhoc_airflow', key='return_value') }}
[2023-05-10, 10:47:13 UTC] {taskinstance.py:1853} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2381, in xcom_push
XCom.set(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 206, in set
value = cls.serialize_value(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 595, in serialize_value
return pickle.dumps(value)
_pickle.PicklingError: Can't pickle <function Main_func at 0x7fe5c18dba60>: it's not the same object as Main.Main_func

答案1

得分: 0

无法在python_callable中调用已装饰的函数

查看一些示例了解如何实现

https://github.com/apache/airflow/blob/main/airflow/example_dags/example_python_operator.py


在您的代码中
```py
from airflow.decorators import dag
from datetime import datetime
from Main import Main_func

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "max_active_runs": 1,
}


@dag(
    dag_id="Main_Mod_Run",
    catchup=False,
    default_args=default_args,
    description="Main Module Run",
    schedule_interval="48 11 * * 3",
    start_date=datetime(2022, 12, 11),
    tags=["Main_Mod"],
)
def my_dag():
    Main_Mod_Func = Main_func()

my_dag = my_dag()
英文:

You cannot call decorated functions in python_callable.

See some examples how you can do

https://github.com/apache/airflow/blob/main/airflow/example_dags/example_python_operator.py

In your code:

from airflow.decorators import dag
from datetime import datetime
from Main import Main_func

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "max_active_runs": 1,
}


@dag(
    dag_id="Main_Mod_Run",
    catchup=False,
    default_args=default_args,
    description="Main Module Run",
    schedule_interval="48 11 * * 3",
    start_date=datetime(2022, 12, 11),
    tags=["Main_Mod"],
)
def my_dag():
    Main_Mod_Func = Main_func()

my_dag = my_dag()

答案2

得分: 0

I tried out different scenarios to make this work and my inference is that, the virtualenv works with the task decorator only when it is put inside the same python file with the dag and the python callable function, the task decorator with the virtualenv needs to be immediately above the function definition else the dag throws error while getting built in gcp.
The link from git that @Ruscinic posted helped on this -

I wanted to split the Dag file and the python callable file to make it more organised but it doesn't help if I want to use the virtualenv concept for the tasks.

Below is what I've tried and it works and hope it helps -

from airflow.decorators import task

from airflow import DAG

from datetime import datetime

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "max_active_runs": 1,
}


with DAG(
    dag_id="Main_Mod_Run",
    catchup=False,
    default_args=default_args,
    description="Main Module Run",
    schedule_interval="6 17 * * 3",
    start_date=datetime(2022, 12, 11),
    tags=["Main_Mod"],
) as dag:
    @task.virtualenv(task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False)
    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.
        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        from time import sleep
        from colorama import Back, Fore, Style

        print(Fore.RED + "some red text")
        print(Back.GREEN + "and with a green background")
        print(Style.DIM + "and in dim text")
        print(Style.RESET_ALL)
        for _ in range(4):
            print(Style.DIM + "Please wait...", flush=True)
            sleep(1)
        print("Finished")


    virtualenv_task = callable_virtualenv()

    virtualenv_task

**expected results : **

[2023-05-10, 16:11:40 UTC] {process_utils.py:187} INFO - Successfully built autograd-gamma mplcursors future
[2023-05-10, 16:11:41 UTC] {process_utils.py:187} INFO - Installing collected packages: pytz, zipp, tzdata, six, pyparsing, pillow, packaging, numpy, kiwisolver, future, fonttools, docutils, cycler, colorama, scipy, python-dateutil, importlib-resources, contourpy, autograd, pandas, matplotlib, autograd-gamma, mplcursors, reliability
[2023-05-10, 16:11:59 UTC] {process_utils.py:187} INFO - Successfully installed autograd-1.5 autograd-gamma-0.5.0 colorama-0.4.0 contourpy-1.0.7 cycler-0.11.0 docutils-0.17.1 fonttools-4.39.4 future-0.18.3 importlib-resources-5.12.0 kiwisolver-1.4.4 matplotlib-3.7.1 mplcursors-0.5.2 numpy-1.24.3 packaging-23.1 pandas-2.0.1 pillow-9.5.0 pyparsing-3.0.9 python-dateutil-2.8.2 pytz-2023.3 reliability-0.8.8 scipy-1.10.1 six-1.16.0 tzdata-2023.3 zipp-3.15.0
[2023-05-10, 16:11:59 UTC] {process_utils.py:179} INFO - Executing cmd: /tmp/venvbzgn1kgr/bin/python /tmp/venvbzgn1kgr/script.py /tmp/venvbzgn1kgr/script.in /tmp/venvbzgn1kgr/script.out /tmp/venvbzgn1kgr/string_args.txt
[2023-05-10, 16:11:59 UTC] {process_utils.py:183} INFO - Output:
[2023-05-10, 16:11:59 UTC] {process_utils.py:187} INFO - some red text
[2023-05-10, 16:11:59 UTC] {process_utils.py:187} INFO - and with a green background
[2023-05-10, 16:11:59 UTC] {process_utils.py:187} INFO - and in dim text
[2023-05-10, 16:11:59 UTC] {process_utils.py:187} INFO - 
[2023-05-10, 16:11:59 UTC] {process_utils.py:187} INFO - Please wait...
[2023-05-10, 16:12:00 UTC] {process_utils.py:187} INFO - Please wait...
[2023-05-10, 16:12:01 UTC] {process_utils.py:187} INFO - Please wait...
[2023-05-10, 16:12:02 UTC] {process_utils.py:187} INFO - Please wait...
[2023-05-10, 16:12:04 UTC] {process_utils.py:187} INFO - Finished
[2023-05-10, 16:12:04 UTC] {python.py:177} INFO - Done. Returned value was: None
[2023-05-10, 16:12:04 UTC] {taskinstance.py:1402} INFO - Marking task as SUCCESS. dag_id=Main_Mod_Run, task_id=virtualenv_python, execution_date=20230510T161120, start_date=20230510T161122, end_date=20230510T161204
[2023-05-10, 16:12:04 UTC] {local_task_job.py:159} INFO - Task exited with return code 0
[2023-05-10, 16:12:04 UTC] {taskinstance.py:2626} INFO - 0 downstream tasks scheduled from follow-on schedule check
英文:

I tried out different scenarios to make this work and my inference is that, the virtualenv works with the task decorator only when it is put inside the same python file with the dag and the python callable function, the task decorator with the virtualenv needs to be immediately above the function definition else the dag throws error while getting built in gcp.
The link from git that @Ruscinic posted helped on this -

I wanted to split the Dag file and the python callable file to make it more organised but it doesn't help if I want to use the virtualenv concept for the tasks.

Below is what I've tried and it works and hope it helps -

from airflow.decorators import task

from airflow import DAG

from datetime import datetime

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "max_active_runs": 1,
}


with DAG(
    dag_id="Main_Mod_Run",
    catchup=False,
    default_args=default_args,
    description="Main Module Run",
    schedule_interval="6 17 * * 3",
    start_date=datetime(2022, 12, 11),
    tags=["Main_Mod"],
) as dag:
    @task.virtualenv(task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False)
    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.
        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        from time import sleep
        from colorama import Back, Fore, Style

        print(Fore.RED + "some red text")
        print(Back.GREEN + "and with a green background")
        print(Style.DIM + "and in dim text")
        print(Style.RESET_ALL)
        for _ in range(4):
            print(Style.DIM + "Please wait...", flush=True)
            sleep(1)
        print("Finished")


    virtualenv_task = callable_virtualenv()

    virtualenv_task

**expected results : **

[2023-05-10, 16:11:40 UTC] {process_utils.py:187} INFO - Successfully built autograd-gamma mplcursors future
[2023-05-10, 16:11:41 UTC] {process_utils.py:187} INFO - Installing collected packages: pytz, zipp, tzdata, six, pyparsing, pillow, packaging, numpy, kiwisolver, future, fonttools, docutils, cycler, colorama, scipy, python-dateutil, importlib-resources, contourpy, autograd, pandas, matplotlib, autograd-gamma, mplcursors, reliability
[2023-05-10, 16:11:59 UTC] {process_utils.py:187} INFO - Successfully installed autograd-1.5 autograd-gamma-0.5.0 colorama-0.4.0 contourpy-1.0.7 cycler-0.11.0 docutils-0.17.1 fonttools-4.39.4 future-0.18.3 importlib-resources-5.12.0 kiwisolver-1.4.4 matplotlib-3.7.1 mplcursors-0.5.2 numpy-1.24.3 packaging-23.1 pandas-2.0.1 pillow-9.5.0 pyparsing-3.0.9 python-dateutil-2.8.2 pytz-2023.3 reliability-0.8.8 scipy-1.10.1 six-1.16.0 tzdata-2023.3 zipp-3.15.0
[2023-05-10, 16:11:59 UTC] {process_utils.py:179} INFO - Executing cmd: /tmp/venvbzgn1kgr/bin/python /tmp/venvbzgn1kgr/script.py /tmp/venvbzgn1kgr/script.in /tmp/venvbzgn1kgr/script.out /tmp/venvbzgn1kgr/string_args.txt
[2023-05-10, 16:11:59 UTC] {process_utils.py:183} INFO - Output:
[2023-05-10, 16:11:59 UTC] {process_utils.py:187} INFO - some red text
[2023-05-10, 16:11:59 UTC] {process_utils.py:187} INFO - and with a green background
[2023-05-10, 16:11:59 UTC] {process_utils.py:187} INFO - and in dim text
[2023-05-10, 16:11:59 UTC] {process_utils.py:187} INFO - 
[2023-05-10, 16:11:59 UTC] {process_utils.py:187} INFO - Please wait...
[2023-05-10, 16:12:00 UTC] {process_utils.py:187} INFO - Please wait...
[2023-05-10, 16:12:01 UTC] {process_utils.py:187} INFO - Please wait...
[2023-05-10, 16:12:02 UTC] {process_utils.py:187} INFO - Please wait...
[2023-05-10, 16:12:04 UTC] {process_utils.py:187} INFO - Finished
[2023-05-10, 16:12:04 UTC] {python.py:177} INFO - Done. Returned value was: None
[2023-05-10, 16:12:04 UTC] {taskinstance.py:1402} INFO - Marking task as SUCCESS. dag_id=Main_Mod_Run, task_id=virtualenv_python, execution_date=20230510T161120, start_date=20230510T161122, end_date=20230510T161204
[2023-05-10, 16:12:04 UTC] {local_task_job.py:159} INFO - Task exited with return code 0
[2023-05-10, 16:12:04 UTC] {taskinstance.py:2626} INFO - 0 downstream tasks scheduled from follow-on schedule check

huangapple
  • 本文由 发表于 2023年5月10日 19:20:20
  • 转载请务必保留本文链接:https://go.coder-hub.com/76217777.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定