Python multiprocessing within the same AWS Glue 4.0 job hangs

Question
I am trying to use Python Multiprocessing to process data in parallel within the same AWS Glue 4.0 job. I know that I could use Glue Workflows with multiple jobs to achieve parallel data processing, but for reasons that are irrelevant here, it is something that I don't want to do.
This is my Python code:
```python
from multiprocessing import Pool
import sys
import time
import random

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print(f"{args['JOB_NAME']} STARTED")

def worker(table_name, tmp_dir):
    print(f"STARTED WORKER: {table_name}")
    data = load_data(table_name, tmp_dir)
    process_data(table_name, data)
    print(f"FINISHED WORKER: {table_name}")

def load_data(table_name, tmp_dir):
    print(f"LOADING: {table_name}")
    data = glueContext.create_dynamic_frame.from_catalog(
        database="my_database",
        table_name=table_name,
        redshift_tmp_dir=f"{tmp_dir}/{table_name}",
        transformation_ctx=f"data_source_{table_name}")
    time.sleep(random.randint(1, 5))  # added here to simulate different loading times
    print(f"LOADED: {table_name} has {data.count()} rows")
    return data

def process_data(table_name, data):
    print(f"PROCESSING: {table_name}")
    # do something
    time.sleep(random.randint(1, 5))  # added here to simulate different processing times
    print(f"DONE: {table_name}")

pool = Pool(4)
tables = ['TABLE1', 'TABLE2', 'TABLE3', 'TABLE4', 'TABLE5', 'TABLE6', 'TABLE7', 'TABLE8', 'TABLE9']
for table in tables:
    pool.apply_async(worker, args=(table, args['TempDir']))
pool.close()
pool.join()

print(f"{args['JOB_NAME']} COMPLETED")
job.commit()
```
Unfortunately, while it seems to start multiple workers correctly, it hangs and never completes until the Glue job finally times out.
This is what I see in the CloudWatch output log. There are no errors in the error log.
2023-04-19T12:01:49.566+02:00 STARTED WORKER: TABLE1 LOADING: TABLE1
2023-04-19T12:01:49.566+02:00 STARTED WORKER: TABLE2 LOADING: TABLE2
2023-04-19T12:01:49.566+02:00 STARTED WORKER: TABLE3 LOADING: TABLE3
2023-04-19T12:01:49.566+02:00 STARTED WORKER: TABLE4 LOADING: TABLE4
2023-04-19T12:01:49.603+02:00 STARTED WORKER: TABLE5 LOADING: TABLE5
2023-04-19T12:01:49.604+02:00 STARTED WORKER: TABLE6 LOADING: TABLE6
2023-04-19T12:01:49.607+02:00 STARTED WORKER: TABLE7 LOADING: TABLE7
2023-04-19T12:01:49.608+02:00 STARTED WORKER: TABLE8 LOADING: TABLE8
2023-04-19T12:01:49.609+02:00 STARTED WORKER: TABLE9 LOADING: TABLE9
I have tried several things, but I cannot understand exactly what the problem is, except that it seems to hang on create_dynamic_frame.from_catalog().
Has anybody attempted to do the same and solved it?
Why doesn't it work?
Thank you in advance!
Answer 1

Score: 1
After several attempts and adding extra debugging information and exception handling, I found out that Python's multiprocessing doesn't work with AWS Glue: the error I got from create_dynamic_frame.from_catalog() was JsonOptions does not exist in the JVM, and it couldn't go any further.
However, replacing multiprocessing.Pool() with concurrent.futures.ThreadPoolExecutor() worked, and I can now run parallel tasks within the same Glue job.
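For reference, a minimal sketch of the ThreadPoolExecutor replacement. The load_data/process_data bodies below are hypothetical stand-ins for the Glue calls so the sketch runs anywhere; in the real job, load_data would call glueContext.create_dynamic_frame.from_catalog() as before, which presumably works with threads because they stay in the driver process and keep access to its Py4J gateway, unlike forked child processes:

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def load_data(table_name):
    # Stand-in for glueContext.create_dynamic_frame.from_catalog(...)
    time.sleep(random.uniform(0.01, 0.05))
    return f"frame_{table_name}"

def process_data(table_name, data):
    # Stand-in for the actual transformation logic
    time.sleep(random.uniform(0.01, 0.05))
    return f"DONE: {table_name}"

def worker(table_name):
    data = load_data(table_name)
    return process_data(table_name, data)

tables = [f"TABLE{i}" for i in range(1, 10)]

# All workers run as threads of the driver process, so shared state
# such as the GlueContext remains visible to every worker.
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(worker, t): t for t in tables}
    # as_completed also propagates worker exceptions via .result(),
    # so failures are not silently swallowed as with apply_async.
    results = [f.result() for f in as_completed(futures)]

print(sorted(results))
```

Note that threads trade process isolation for shared state: the Spark driver serializes much of the actual work anyway, so the win here is overlapping the I/O-bound catalog loads rather than true CPU parallelism.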