Python Async what is causing the memory leak?
Question
I am downloading zip files and inspecting their contents for a few million items, but memory keeps accruing and I eventually go OOM, even with small semaphore values.
Consider the block:
async def zip_reader(self, blobFileName, blobEndPoint, semaphore):
    try:
        # access blob
        async with ClientSecretCredential(TENANT, CLIENTID, CLIENTSECRET) as credential:
            async with BlobServiceClient(account_url="https://blob1.blob.core.windows.net/", credential=credential, max_single_get_size=64 * 1024 * 1024, max_chunk_get_size=32 * 1024 * 1024) as blob_service_client:
                async with blob_service_client.get_blob_client(container=blobEndPoint, blob=blobFileName) as blob_client:
                    async with semaphore:
                        logger.info(f"Starting: {blobFileName}, {blobEndPoint}")
                        # open an in-memory bytes buffer
                        writtenbytes = io.BytesIO()
                        # download the blob into it
                        stream = await blob_client.download_blob(max_concurrency=25)
                        stream = await stream.readinto(writtenbytes)
                        # open the buffer as a zip archive
                        f = ZipFile(writtenbytes)
                        # file list (this line was collapsed in the post; namelist() is assumed)
                        file_list = f.namelist()
                        # send to df
                        t_df = pd.DataFrame({'fileList': file_list})
                        # add fileName
                        t_df['blobFileName'] = blobFileName
                        t_df['blobEndPoint'] = blobEndPoint
                        if semaphore.locked():
                            await asyncio.sleep(1)
                        logger.info(f"Completed: {blobFileName}")
                        # clean up here; also tried del on objs here as well
                        self.cleanup()
                        return t_df
    except Exception:
        # except clause not shown in the original post; log and re-raise as a placeholder
        logger.exception(f"Failed: {blobFileName}")
        raise

async def cleanup(self):
    gc.collect()
    await asyncio.sleep(1)

async def async_file_as_bytes_generator(self, blobFileName, blobEndPoint, semaphore):
    """
    main caller
    """
    semaphore = asyncio.Semaphore(value=semaphore)
    return await asyncio.gather(
        *[self.zip_reader(fn, ep, semaphore) for fn, ep in zip(blobFileName, blobEndPoint)]
    )  # also tried attaching here
Answer 1
Score: 1
asyncio.gather has no strategy to limit the number of tasks executing simultaneously. Your semaphore may limit how many blobs are fetched and processed at once, but gather will wait for all dataframes to be available and return them all at once.
Instead of a single await asyncio.gather, use something like asyncio.wait with a timeout, keep control of how many tasks are running, and yield the completed dataframes as they become ready.
You didn't show the rest of the program that leads to the call to async_file_as_bytes_generator, but it will have to consume the dataframes as they are yielded and dispose of them.
Also, there is no need to ever call gc.collect explicitly: here it is a no-op. Python frees your memory on its own if your program is correct and keeps no references to the objects consuming it; otherwise there is nothing gc.collect could do anyway.
Your "main caller" can be something along this - but as I denoted, you have to check the code that calls it so that it consumes each dataframe at once, and not expect a list with all dataframes as your current code do.
async def async_file_as_bytes_generator(self, blobFileName, blobEndPoint, task_limit):
    """
    main caller
    """
    semaphore = asyncio.Semaphore(value=task_limit)
    all_tasks = {self.zip_reader(fn, ep, semaphore) for fn, ep in zip(blobFileName, blobEndPoint)}
    current_tasks = set()
    while all_tasks or current_tasks:
        while all_tasks and len(current_tasks) < task_limit:
            # wrap the coroutine in a Task; asyncio.wait requires Tasks/Futures on Python 3.11+
            current_tasks.add(asyncio.create_task(all_tasks.pop()))
        done, incomplete = await asyncio.wait(current_tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            # optionally check for task exceptions here
            yield task.result()
        current_tasks = incomplete
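For completeness, here is one way the calling code could consume that generator, processing each dataframe as it arrives and dropping the reference immediately so the memory can be reclaimed. This is only a sketch: the Reader class name and the process function are hypothetical stand-ins for the poster's actual code.

import asyncio

async def main():
    reader = Reader()  # hypothetical object that defines async_file_as_bytes_generator
    names = ["archive1.zip", "archive2.zip"]    # hypothetical blobFileName list
    endpoints = ["container-a", "container-b"]  # hypothetical blobEndPoint list

    # consume the async generator one dataframe at a time
    async for t_df in reader.async_file_as_bytes_generator(names, endpoints, task_limit=4):
        process(t_df)  # hypothetical: persist or aggregate the result instead of keeping it
        del t_df       # drop the only reference so the frame can be garbage-collected

asyncio.run(main())

Collecting every returned dataframe into a single list, as the original gather-based caller does, keeps all of them alive at once and is enough by itself to explain steadily growing memory usage.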