Why is my Airflow hanging up if I send a http request inside a task?
Question
System: macOS, Apple M1 (local machine)
Airflow: 2.5.3
Executor: LocalExecutor with a Postgres database
I am trying to implement some externally triggered workflows that load data from our REST APIs. I am using the PythonOperator to run the code and triggering the process manually from the Airflow UI. However, when execution reaches the task whose code sends an HTTP request, it hangs forever and the laptop starts running really hot.
The task is defined in another file and imported as a module. Here are the contents of the task file (tasks/import_logs.py):
import requests


def import_logs(**context):
    print("[Sasha] Running log importer")
    context["ti"].xcom_push(
        key="logs", value=["log/location/1", "log/location/2"])
    print("Log locations pushed to xcom")

    # Define the URL for the dummy endpoint
    url = 'https://jsonplaceholder.typicode.com/posts'

    # Define the payload for the JSON request
    payload = {
        "title": "foo",
        "body": "bar",
        "userId": 1
    }

    # Define the headers for the request
    headers = {'Content-Type': 'application/json'}

    # Send the POST request to the dummy endpoint
    response = requests.post(url, json=payload, headers=headers)

    # Print the response status code and content
    print(f'Response status code: {response.status_code}')
    print(f'Response content: {response.content}')
Here is the DAG definition:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

from tasks.import_logs import import_logs
from tasks.import_tops import import_tops
from tasks.process_input import process_input
from tasks.process_log_data import process_logs
from tasks.output_logs import output_logs
from tasks.cleanup import cleanup
from tasks.trigger_data_update import trigger_data_update

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 31),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('process_log', default_args=default_args, schedule_interval=None)

validate_input = PythonOperator(
    task_id='validate_input',
    python_callable=process_input,
    provide_context=True,
    dag=dag
)

import_log = PythonOperator(
    task_id='import_logs',
    python_callable=import_logs,
    provide_context=True,
    dag=dag
)

import_top = PythonOperator(
    task_id='import_tops',
    python_callable=import_tops,
    provide_context=True,
    dag=dag
)

process_log = PythonOperator(
    task_id='process_logs',
    python_callable=process_logs,
    provide_context=True,
    dag=dag
)

output_log = PythonOperator(
    task_id='write_logs',
    python_callable=output_logs,
    provide_context=True,
    dag=dag
)

cleanup_task = PythonOperator(
    task_id='cleanup',
    python_callable=cleanup,
    provide_context=True,
    dag=dag
)

update_task = PythonOperator(
    task_id='trigger_data_update',
    python_callable=trigger_data_update,
    provide_context=True,
    dag=dag
)

validate_input >> [import_top, import_log] >> process_log >> output_log >> [cleanup_task, update_task]

if __name__ == "__main__":
    import json

    with open('test_conf/process_log.json', 'r') as f:
        conf = json.load(f)

    dag.test(
        run_conf=conf
    )
It started happening with the SequentialExecutor. Since then I have moved the instance to the LocalExecutor, stripped out all the extra code and left only a simple dummy request to try to narrow down the error, but it keeps happening.
I also tried using the Airflow HTTP Hook with a Connection, which led to the same behavior. From what I have read, sending a request from a task should not be an issue, so I abandoned that approach.
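For reference, the hook-based variant looked roughly like this (only a sketch; the connection id 'dummy_api' is illustrative, with its host pointing at the same dummy endpoint), and it hung at the same point as the plain requests call:

import json

from airflow.providers.http.hooks.http import HttpHook


def import_logs_via_hook(**context):
    # Assumes an HTTP connection 'dummy_api' whose host is
    # https://jsonplaceholder.typicode.com
    hook = HttpHook(method='POST', http_conn_id='dummy_api')
    response = hook.run(
        endpoint='posts',
        data=json.dumps({"title": "foo", "body": "bar", "userId": 1}),
        headers={'Content-Type': 'application/json'},
    )
    print(f'Response status code: {response.status_code}')
    print(f'Response content: {response.content}')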
At this point all the other tasks are dummies as well and just print things. Running the test at the bottom of the DAG file goes through without any issues. I am considering running Airflow in debug mode and stepping through it, but that will eat a lot of time.
Answer 1
Score: 4
I also ran into the same problem of HTTP requests hanging, just like you.
My local environment is Airflow 2.5.1 / Python 3.8.13 / M1 macOS.
It appears to be a macOS-specific Python issue.
I fixed it by setting an environment variable like this:
export NO_PROXY="*"
https://github.com/apache/airflow/discussions/24463
Hope this helps.
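If you prefer to keep the workaround next to the code rather than in your shell profile, setting the variable from Python before the first request should have the same effect. This is only a sketch, and it assumes the hang really does come from the macOS proxy lookup described in the linked discussion:

import os
import requests

# Same effect as `export NO_PROXY="*"`: disable proxy detection for every host.
# The linked discussion points at macOS proxy lookup hanging in processes
# forked by the LocalExecutor, so this must be set before the first request.
os.environ["NO_PROXY"] = "*"

response = requests.get("https://jsonplaceholder.typicode.com/posts/1")
print(response.status_code)

In a DAG it would go at module level (for example at the top of tasks/import_logs.py) so it runs in the forked task process before any request is made.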
Answer 2
Score: 0
Switching from PythonOperator to PythonVirtualenvOperator solved this for me. However, it added a lot of new issues related to communication between tasks due to some objects not being serializable.
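For illustration, a minimal sketch of that switch for the import_logs task might look like the following (not the answerer's exact code). The callable has to be self-contained, with its imports inside the function, because it runs in a separate virtualenv process; that separation is also where the serialization issues mentioned above come from.

from airflow.operators.python import PythonVirtualenvOperator


def import_logs_venv():
    # Imports live inside the callable: it is executed in a fresh virtualenv,
    # so module-level names from this file are not available there.
    import requests

    response = requests.post(
        'https://jsonplaceholder.typicode.com/posts',
        json={"title": "foo", "body": "bar", "userId": 1},
        headers={'Content-Type': 'application/json'},
    )
    print(f'Response status code: {response.status_code}')


import_log = PythonVirtualenvOperator(
    task_id='import_logs',
    python_callable=import_logs_venv,
    requirements=["requests"],
    system_site_packages=False,
    dag=dag,
)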