Python Async: what is causing the memory leak?

Question

I am downloading zip files and looking inside them to check their contents for a few million items, but I am constantly accruing memory and will eventually go OOM, even with small semaphores.

Consider the block:

    async def zip_reader(self, blobFileName, blobEndPoint, semaphore):
        try:
            # access blob
            async with ClientSecretCredential(TENANT, CLIENTID, CLIENTSECRET) as credential:
                async with BlobServiceClient(account_url="https://blob1.blob.core.windows.net/", credential=credential, max_single_get_size=64 * 1024 * 1024, max_chunk_get_size=32 * 1024 * 1024) as blob_service_client:
                    async with blob_service_client.get_blob_client(container=blobEndPoint, blob=blobFileName) as blob_client:
                        async with semaphore:
                            logger.info(f"Starting: {blobFileName}, {blobEndPoint}")

                            # open bytes
                            writtenbytes = io.BytesIO()

                            # write file to it
                            stream = await blob_client.download_blob(max_concurrency=25)
                            stream = await stream.readinto(writtenbytes)

                            # zipfile
                            f = ZipFile(writtenbytes)

                            # file list
                            file_list = f.namelist()  # the assigned expression was collapsed in the original post; f.namelist() is a plausible stand-in

                            # send to df
                            t_df = pd.DataFrame({'fileList': file_list})

                            # add fileName
                            t_df['blobFileName'] = blobFileName
                            t_df['blobEndPoint'] = blobEndPoint

                            if semaphore.locked():
                                await asyncio.sleep(1)

                            logger.info(f"Completed: {blobFileName}")

                            # clean up here; also tried del on objs here as well
                            self.cleanup()

                            return t_df
        # no except/finally clause appears in the posted snippet

    async def cleanup(self):
        gc.collect()
        await asyncio.sleep(1)

    async def async_file_as_bytes_generator(self, blobFileName, blobEndPoint, semaphore):
        """
        main caller
        """
        semaphore = asyncio.Semaphore(value=semaphore)
        return await asyncio.gather(*[self.zip_reader(fn, ep, semaphore)
                                      for fn, ep in zip(blobFileName, blobEndPoint)])  # also tried attaching here

Answer 1

Score: 1

asyncio.gather has no strategy at all to limit the number of tasks executing simultaneously. Your semaphore may limit how many blobs are being fetched and processed at once, but gather waits until all dataframes are available and returns them all at once.
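
As a small self-contained illustration of that point (not the poster's code): even when a semaphore caps concurrency, gather itself only returns after every coroutine has finished, so every result stays alive at the same time.

    import asyncio

    async def make_result(i, semaphore):
        async with semaphore:          # at most four coroutines run at once...
            await asyncio.sleep(0.01)
            return bytearray(10 ** 6)  # ...but every ~1 MB result is kept until gather returns

    async def main():
        semaphore = asyncio.Semaphore(4)
        results = await asyncio.gather(*(make_result(i, semaphore) for i in range(100)))
        print(len(results))            # all 100 results are alive in memory at the same time

    asyncio.run(main())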

Instead of a single await asyncio.gather, use something like asyncio.wait with a timeout, keep control of how many tasks are running, and yield each dataframe as soon as it is complete.

You also did not show the rest of the program leading up to the call to async_file_as_bytes_generator, but it will have to consume the dataframes as they are yielded and dispose of them.

Also, there is no need for explicit calls to gc.collect: it is a no-op here. Python frees your memory on its own if your program is correct and keeps no references to the objects consuming it; otherwise there is nothing gc.collect could do anyway.

你的 "main caller" 可以像这样 - 但如我所指出的,你必须检查调用它的代码,以便一次处理每个数据帧,而不是期望当前代码一次返回所有数据帧的列表。

    async def async_file_as_bytes_generator(self, blobFileName, blobEndPoint, task_limit):
        """
        main caller
        """
        semaphore = asyncio.Semaphore(value=task_limit)

        # note: on Python 3.11+ asyncio.wait() no longer accepts bare coroutines,
        # so each self.zip_reader(...) call would need to be wrapped in asyncio.ensure_future()
        all_tasks = {self.zip_reader(fn, ep, semaphore) for fn, ep in zip(blobFileName, blobEndPoint)}
        current_tasks = set()
        while all_tasks or current_tasks:
            # top up the running set until the task limit is reached
            while all_tasks and len(current_tasks) < task_limit:
                current_tasks.add(all_tasks.pop())

            done, incomplete = await asyncio.wait(current_tasks, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                # optionally check for task exceptions here
                yield task.result()
            current_tasks = incomplete

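If it helps, here is a minimal sketch of what the calling side could look like once the function is a generator. The names process_all and handle_dataframe are placeholders, not part of the original code; the point is simply that each dataframe is consumed and dropped as soon as it arrives instead of being accumulated in a list.

    import asyncio

    def handle_dataframe(t_df):
        # placeholder for the real per-file work (aggregate counts, write to disk, ...)
        print(len(t_df))

    async def process_all(reader, blob_file_names, blob_end_points, task_limit=10):
        # reader is an instance of the class that defines zip_reader / async_file_as_bytes_generator
        async for t_df in reader.async_file_as_bytes_generator(blob_file_names, blob_end_points, task_limit):
            handle_dataframe(t_df)
            del t_df  # drop the only reference so the frame can be freed before the next one arrives

    # asyncio.run(process_all(reader, blobFileNames, blobEndPoints))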

