How to use a Python list as a global variable within @task.external_python?

Question

GOAL:

  • Have a Python list as a global variable shared between tasks.
  • Currently it crashes at the first task.
  • 1.) I am trying to have a simple Python list that is carried from one task to the next, with a few values appended to it at task 2. So the goal is to have one shared list.
  • 2.) Even if one task fails, the DAG should just move on and not care (obviously marking that task as failed); see the sketch right after this list.
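
As referenced in 2.) above, here is a minimal sketch of letting the DAG move on even when a task fails, using Airflow's trigger_rule parameter. This is illustrative only and not part of the original DAG; the DAG and task names (trigger_rule_sketch, may_fail, runs_anyway) are hypothetical:

from airflow.decorators import dag, task
from pendulum import datetime


@dag(start_date=datetime(2022, 12, 10), schedule=None, catchup=False)
def trigger_rule_sketch():

    @task
    def may_fail():
        raise ValueError("this task fails on purpose")

    # "all_done" means: run once all upstream tasks have finished,
    # regardless of whether they succeeded or failed.
    @task(trigger_rule="all_done")
    def runs_anyway():
        print("upstream finished (possibly failed), moving on")

    may_fail() >> runs_anyway()


trigger_rule_sketch()

With trigger_rule="all_done", the downstream task runs once upstream finishes in any state, while the failed task is still marked failed in the UI.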

SETUP:

  • I am on Airflow 2.4.1.
  • I use Airflow Docker and built a Python environment that I have used many times and that just works fine.

MY CODE:

from __future__ import annotations
import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint
import pendulum
from airflow import DAG
from airflow.decorators import task

log = logging.getLogger(__name__)
PYTHON = sys.executable
BASE_DIR = tempfile.gettempdir()

my_default_args = {
    'owner': 'me',
    'email': ['some_email@some_email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'write_successes': [],
}

with DAG(
    dag_id='my_dag_id',
    schedule='9 9 * * *',
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
    default_args=my_default_args,
    tags=['a', 'b'],
) as dag:

    @task.external_python(task_id="one", python='/opt/airflow/venv1/bin/python3')
    def first(**kwargs):
        task_id="one"
        write_successes = kwargs.get('write_successes', [])

        print(write_successes)
        write_successes.append(99)
        print(write_successes)

    @task.external_python(task_id="two", python='/opt/airflow/venv1/bin/python3')
    def second(**kwargs):
        write_successes = kwargs.get('write_successes', [])

        print(write_successes)
        write_successes.append(101)
        print(write_successes)

    one = first()
    two = second()

    one >> two

ERROR:

*** Reading local file: /opt/airflow/logs/dag_id=test_global_variable/run_id=scheduled__2023-02-05T09:09:00+00:00/task_id=one/attempt=1.log
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.one scheduled__2023-02-05T09:09:00+00:00 [queued]>
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.one scheduled__2023-02-05T09:09:00+00:00 [queued]>
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1362} INFO - 
--------------------------------------------------------------------------------
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1364} INFO - 
--------------------------------------------------------------------------------
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): one> on 2023-02-05 09:09:00+00:00
[2023-02-06, 12:24:43 GMT] {standard_task_runner.py:54} INFO - Started process 239657 to run task
[2023-02-06, 12:24:43 GMT] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_global_variable', 'one', 'scheduled__2023-02-05T09:09:00+00:00', '--job-id', '72751', '--raw', '--subdir', 'DAGS_FOLDER/test_global_variable.py', '--cfg-path', '/tmp/tmpxldmrzpp']
[2023-02-06, 12:24:43 GMT] {standard_task_runner.py:83} INFO - Job 72751: Subtask one
[2023-02-06, 12:24:43 GMT] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_global_variable.py
[2023-02-06, 12:24:43 GMT] {task_command.py:384} INFO - Running <TaskInstance: test_global_variable.one scheduled__2023-02-05T09:09:00+00:00 [running]> on host 4851b30aa5cf
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=me
AIRFLOW_CTX_DAG_ID=test_global_variable
AIRFLOW_CTX_TASK_ID=one
AIRFLOW_CTX_EXECUTION_DATE=2023-02-05T09:09:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2023-02-05T09:09:00+00:00
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_start' or 'logical_date' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'next_ds' from the template is deprecated and will be removed in a future version. Please use '{{ data_interval_end | ds }}' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'next_ds_nodash' from the template is deprecated and will be removed in a future version. Please use '{{ data_interval_end | ds_nodash }}' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_ds' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_ds_nodash' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_execution_date' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_execution_date_success' from the template is deprecated and will be removed in a future version. Please use 'prev_data_interval_start_success' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'tomorrow_ds' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'tomorrow_ds_nodash' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'yesterday_ds' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'yesterday_ds_nodash' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
return super().execute(context=serializable_context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 678, in execute_callable
return self._execute_python_callable_in_subprocess(python_path, tmp_path)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 411, in _execute_python_callable_in_subprocess
self._write_args(input_path)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 381, in _write_args
file.write_bytes(self.pickling_library.dumps({'args': self.op_args, 'kwargs': self.op_kwargs}))
_pickle.PicklingError: Can't pickle <function first at 0x7f80ff76e4c0>: it's not the same object as unusual_prefix_6cc7442bed7c02593e3a29524b0e65329d9f59da_test_global_variable.first
[2023-02-06, 12:24:44 GMT] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_global_variable, task_id=one, execution_date=20230205T090900, start_date=20230206T122443, end_date=20230206T122444
[2023-02-06, 12:24:44 GMT] {standard_task_runner.py:102} ERROR - Failed to execute job 72751 for task one (Can't pickle <function first at 0x7f80ff76e4c0>: it's not the same object as unusual_prefix_6cc7442bed7c02593e3a29524b0e65329d9f59da_test_global_variable.first; 239657)
[2023-02-06, 12:24:44 GMT] {local_task_job.py:164} INFO - Task exited with return code 1
[2023-02-06, 12:24:44 GMT] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
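
The traceback shows the failure inside _write_args, where external_python pickles op_args and op_kwargs to ship them to the external interpreter. The message itself hints at the mechanism: pickle looked up first by name in the DAG module (imported under the mangled unusual_prefix_..._test_global_variable name) and found a different object, because the @task.external_python decorator has rebound the module-level name first to the task object, so the raw function can no longer be pickled by reference. It appears to end up in op_kwargs at all because first takes **kwargs, which makes Airflow pass the task context along. A minimal, standalone sketch reproducing the same class of error (the code below is illustrative and not from the original post):

import pickle

# Plain data, like the list being passed between tasks, pickles fine:
pickle.dumps([23, 5, 8])


def first():
    pass


_original = first


def first():  # rebind the module-level name, as the @task decorator effectively does
    pass


# pickle stores plain functions by reference (module name + qualname). The
# lookup now finds the new binding instead of the captured object, which
# reproduces the error class from the log above:
try:
    pickle.dumps(_original)
except pickle.PicklingError as err:
    print(err)  # Can't pickle <function first at ...>: it's not the same object as __main__.first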

I have tried to fix it based on the following posts:

from airflow.decorators import dag, task
from pendulum import datetime


@dag(
    dag_id='test_global_variable',
    start_date=datetime(2022, 12, 10),
    schedule=None,
    catchup=False,
)
def write_var():

    @task.external_python(task_id="task_1", python='/opt/airflow/venv1/bin/python3')
    def add_to_list(my_list):
        print(my_list)
        my_list.append(19)
        return my_list

    @task.external_python(task_id="task_2", python='/opt/airflow/venv1/bin/python3')
    def add_to_list_2(my_list):
        print(my_list)
        my_list.append(42)
        return my_list

    add_to_list_2(add_to_list([23, 5, 8]))


write_var()

LOG from the successful task:

[2023-02-06, 15:36:52 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_1 manual__2023-02-06T15:36:51.225176+00:00 [queued]>
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_1 manual__2023-02-06T15:36:51.225176+00:00 [queued]>
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1362} INFO - 
--------------------------------------------------------------------------------
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1364} INFO - 
--------------------------------------------------------------------------------
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): task_1> on 2023-02-06 15:36:51.225176+00:00
[2023-02-06, 15:36:52 GMT] {standard_task_runner.py:54} INFO - Started process 249785 to run task
[2023-02-06, 15:36:52 GMT] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_global_variable', 'task_1', 'manual__2023-02-06T15:36:51.225176+00:00', '--job-id', '72908', '--raw', '--subdir', 'DAGS_FOLDER/test_global_variable.py', '--cfg-path', '/tmp/tmpuw6bfiif']
[2023-02-06, 15:36:52 GMT] {standard_task_runner.py:83} INFO - Job 72908: Subtask task_1
[2023-02-06, 15:36:52 GMT] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_global_variable.py
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {task_command.py:384} INFO - Running <TaskInstance: test_global_variable.task_1 manual__2023-02-06T15:36:51.225176+00:00 [running]> on host 4851b30aa5cf
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_global_variable
AIRFLOW_CTX_TASK_ID=task_1
AIRFLOW_CTX_EXECUTION_DATE=2023-02-06T15:36:51.225176+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-02-06T15:36:51.225176+00:00
[2023-02-06, 15:36:53 GMT] {process_utils.py:179} INFO - Executing cmd: /opt/airflow/venv1/bin/python3 /tmp/tmd35abbbcv/script.py /tmp/tmd35abbbcv/script.in /tmp/tmd35abbbcv/script.out /tmp/tmd35abbbcv/string_args.txt
[2023-02-06, 15:36:53 GMT] {process_utils.py:183} INFO - Output:
[2023-02-06, 15:36:54 GMT] {process_utils.py:187} INFO - [23, 5, 8]
[2023-02-06, 15:36:54 GMT] {python.py:177} INFO - Done. Returned value was: [23, 5, 8, 19]
[2023-02-06, 15:36:54 GMT] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=test_global_variable, task_id=task_1, execution_date=20230206T153651, start_date=20230206T153652, end_date=20230206T153654
[2023-02-06, 15:36:54 GMT] {local_task_job.py:164} INFO - Task exited with return code 0
[2023-02-06, 15:36:54 GMT] {local_task_job.py:273} INFO - 1 downstream tasks scheduled from follow-on schedule check

Screenshot: [Airflow UI screenshot not preserved]

Answer 1

Score: 1

I'm curious what you tried for Airflow XCom. The following DAG passes a list from one task to another using XCom via the TaskFlow API. Tested on Airflow 2.5.1, but it should work the same with 2.4.1.

from airflow.decorators import dag, task
from pendulum import datetime


@dag(
    start_date=datetime(2022, 12, 10),
    schedule=None,
    catchup=False,
)
def write_var():

    @task.external_python(
        task_id="task_1",
        python='/home/astro/.pyenv/versions/my_env/bin/python'
    )
    def add_to_list(my_list):
        print(my_list)
        my_list.append(19)
        return my_list


    @task.external_python(
        task_id="task_2",
        python='/home/astro/.pyenv/versions/my_env/bin/python'
    )
    def add_to_list_2(my_list):
        print(my_list)
        my_list.append(42)
        return my_list

    add_to_list_2(add_to_list([23, 5, 8]))


write_var()
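
A note on why this works where the original DAG crashed: add_to_list returns the list, Airflow stores the return value in XCom, and the TaskFlow API hands it to add_to_list_2 as a plain positional argument, which external_python can pickle into the virtualenv subprocess without trouble. Keep in mind that each task receives its own deserialized copy of the list, so this is shared state by value rather than one mutable global object. For the "move on even if a task fails" part of the goal, a trigger_rule such as "all_done" on the downstream task (see the sketch under GOAL above) should combine with this pattern.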

Screenshot: [Airflow UI screenshot not preserved]

