Python multiprocessing within the same AWS Glue 4.0 job hangs

Question

I am trying to use Python Multiprocessing to process data in parallel within the same AWS Glue 4.0 job. I know that I could use Glue Workflows with multiple jobs to achieve parallel data processing, but for reasons that are irrelevant here, it is something that I don't want to do.

This is my Python code:

from multiprocessing import Pool
import sys
import time
import random

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
print(f"{args['JOB_NAME']} STARTED")

def worker(table_name, tmp_dir):
    print(f"STARTED WORKER: {table_name}")
    data = load_data(table_name, tmp_dir)
    process_data(table_name, data)
    print(f"FINISHED WORKER: {table_name}")
    
def load_data(table_name, tmp_dir):    
    print(f"LOADING: {table_name}")
    data = glueContext.create_dynamic_frame.from_catalog(database="my_database",
                                                         table_name=table_name,
                                                         redshift_tmp_dir=f"{tmp_dir}/{table_name}",
                                                         transformation_ctx=f"data_source_{table_name}")
    time.sleep(random.randint(1, 5))  # added here to simulate different loading times
    print(f"LOADED: {table_name} has {data.count()} rows")
    return data

def process_data(table_name, data):
    print(f"PROCESSING: {table_name}")
    # do something
    time.sleep(random.randint(1, 5))  # added here to simulate different processing times
    print(f"DONE: {table_name}")

pool = Pool(4)
tables = ['TABLE1', 'TABLE2', 'TABLE3', 'TABLE4', 'TABLE5', 'TABLE6', 'TABLE7', 'TABLE8', 'TABLE9']
for table in tables:
    pool.apply_async(worker, args=(table, args['TempDir']))
pool.close()
pool.join()

print(f"{args['JOB_NAME']} COMPLETED")
job.commit()

Unfortunately, although the job appears to start the workers correctly, it then hangs and does not complete until the Glue job finally times out.

This is what I see in the CloudWatch output log. There are no errors in the error log.

2023-04-19T12:01:49.566+02:00	STARTED WORKER: TABLE1 LOADING: TABLE1
2023-04-19T12:01:49.566+02:00	STARTED WORKER: TABLE2 LOADING: TABLE2
2023-04-19T12:01:49.566+02:00	STARTED WORKER: TABLE3 LOADING: TABLE3
2023-04-19T12:01:49.566+02:00	STARTED WORKER: TABLE4 LOADING: TABLE4
2023-04-19T12:01:49.603+02:00	STARTED WORKER: TABLE5 LOADING: TABLE5
2023-04-19T12:01:49.604+02:00	STARTED WORKER: TABLE6 LOADING: TABLE6
2023-04-19T12:01:49.607+02:00	STARTED WORKER: TABLE7 LOADING: TABLE7
2023-04-19T12:01:49.608+02:00	STARTED WORKER: TABLE8 LOADING: TABLE8
2023-04-19T12:01:49.609+02:00	STARTED WORKER: TABLE9 LOADING: TABLE9

I have tried several things, but I cannot understand exactly what the problem is, except that it seems to be hanging on create_dynamic_frame.from_catalog().

Has anybody attempted to do the same and solved it?
Why doesn't it work?

Thank you in advance!

Answer 1

Score: 1

After several attempts, and after adding more debugging output and exception handling, I found that Python's multiprocessing does not work with AWS Glue. The error raised by create_dynamic_frame.from_catalog() is "JsonOptions does not exist in the JVM", and I could not get any further with that approach.
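As a rough illustration of the kind of exception handling that can make such an error visible: an exception raised inside a Pool.apply_async() task is stored on the returned AsyncResult and is only re-raised when get() is called, so a script that never calls get() will not log it. The sketch below is not the original debugging code. The names failing_worker and run_and_collect_errors are hypothetical, the Glue call is replaced by a stand-in that simply raises, and the fork start method is requested explicitly on the assumption of a Linux environment.

```python
import multiprocessing as mp


def failing_worker(table_name):
    # Hypothetical stand-in for a worker whose Glue call raises an error.
    raise RuntimeError(f"load failed for {table_name}")


def run_and_collect_errors(tables, timeout=30):
    """Submit one task per table and record any exception each task raised."""
    errors = {}
    # Assumption: a Linux host where the "fork" start method is available.
    ctx = mp.get_context("fork")
    with ctx.Pool(2) as pool:
        pending = {t: pool.apply_async(failing_worker, (t,)) for t in tables}
        for table, result in pending.items():
            try:
                # get() re-raises the worker's exception in the parent process.
                # The timeout turns a silent hang into a visible TimeoutError.
                result.get(timeout=timeout)
            except Exception as exc:
                errors[table] = repr(exc)
    return errors
```

Printing the returned dictionary (or passing an error_callback to apply_async) should then show the worker-side exception in the job output instead of leaving the log empty.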

However, replacing multiprocessing.Pool() with concurrent.futures.ThreadPoolExecutor() worked, and I can now process the tables in parallel within the same Glue job.
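A minimal sketch of that replacement, assuming the worker body stays the same as in the question. To keep it runnable outside Glue, load_table is a hypothetical stand-in for the real worker (which would call glueContext.create_dynamic_frame.from_catalog()), and run_parallel is an illustrative helper name, not part of any Glue API. Threads run inside the driver process, so they share the single SparkContext and its connection to the JVM, which is presumably why this works where separate processes did not.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def load_table(table_name):
    # Hypothetical stand-in for the question's worker(); the real version
    # would call glueContext.create_dynamic_frame.from_catalog(...).
    time.sleep(0.01)
    return len(table_name)


def run_parallel(tables, loader, max_workers=4):
    """Run loader(table) for every table on a thread pool.

    Returns (results, errors): two dicts keyed by table name.
    """
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its table so failures can be attributed.
        futures = {executor.submit(loader, table): table for table in tables}
        for future in as_completed(futures):
            table = futures[future]
            try:
                # result() re-raises any exception raised inside the thread.
                results[table] = future.result()
            except Exception as exc:
                errors[table] = exc
    return results, errors
```

In the Glue script, the pool block at the end of the question could then become something like run_parallel(tables, lambda t: worker(t, args['TempDir'])), with the returned errors checked before job.commit().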

huangapple
  • Posted on April 19, 2023 at 18:28:40
  • If you repost this article, please keep the link to it: https://go.coder-hub.com/76053434.html