`aiomultiprocessing`池冻结和OSError:[Errno 24] 打开文件太多

huangapple go评论65阅读模式
英文:

aiomultiprocessing pool freezes and OSError: [Errno 24] Too many open files

问题

我看了这个问题,和chatGPT/bing进行了交流(哈哈),仍然无法解决。

我正在尝试在一个有32个CPU的机器上执行约7百万次API请求,将数据加载到postgres。我的代码基本上设置如下:

from aiomultiprocess import Pool
CPUS = 30 #(有32个CPU可用)
batch_size = 5000

for i in range(0, len(args_list), batch_size):
        log.info(f"<日志消息>")
        async with Pool(
            processes=CPUS,
            maxtasksperchild=100,
            childconcurrency=3,
            queuecount=int(CPUS / 3),
        ) as pool:
            await pool.starmap(fetch_api_results, args_list[i : i + batch_size])

--------编辑:
根据评论中的请求,添加了对fetch_api_results的编辑。基本上是一组构建API网址的函数,然后递归地进行aiohttp请求,直到API请求结果中不再有next_url令牌为止。

在这里。

from aiohttp import request

async def fetch_api_results(*args)
    try:
        result_objects= APIPaginator(*args)
        await result_objects.fetch()
        log.info("上传数据")
        #上传到数据库的函数

    except planned_exceptions as e:
        log.warning(e, exc_info=False)


class APIPaginator(object):
    async def query_data(self):
        url = self.api_base + "<arg中的字符串>"
        payload = {"limit": 1000}
        await self.query_all(url, payload)

    async def query_all(self, url, payload):
        try:
            async with request(method="GET", url=url, params=payload) as response:
                log.info(f"状态码: {response.status}")
                if response.status == 200:
                    results = await response.json()
                    self.results.append(results)
                    next_url = results.get("next_url")
                    if next_url:
                        await self.query_all(next_url)
                else:
                    response.raise_for_status()
         except: #(省略了except块)

    async def fetch(self):
        await self.query_data()

这将运行一到两个小时(预计需要一两天),然后会卡住。没有抛出错误。当我键盘中断时,会看到OSError: [Errno 24] Too many open files错误。

我把完整的回溯放在下面。

File "<path-to-file>.py", line 127, in import_data
    await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 136, in results
    return await self.pool.results(self.task_ids)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 312, in results
    await asyncio.sleep(0.005)
  File "/<path-to-env>/lib/python3.11/asyncio/tasks.py", line 639, in sleep
    return await future

File "/<path-to-env>/3.11.1/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/<path-to-file>/main.py", line 39, in add_all_data
    await import_data(args)
  File "/<path-to-file>/orchestrator.py", line 120, in import_data
    async with Pool(
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 196, in __aexit__
    await self.join()
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 379, in join
    await self._loop
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 229, in loop
    self.processes[self.create_worker(qid)] = qid
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 261, in create_worker
    process.start()
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/core.py", line 153, in start
    return self.aio_process.start()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/context.py", line 288, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/<path-to-env>/lib/python3.11/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/home/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 58, in _launch
    self.pid = util.spawnv_passfds(spawn.get_executable(),
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/util.py", line 451, in spawnv_passfds
    errpipe_read, errpipe_write = os.pipe()
                                  ^^^^^^^^^
OSError: [Errno 24] Too many open files
英文:

I've looked at this question and have talked with chatGPT/bing (lol) and still can't figure this one out.

I am trying to execute ~7mn requests to an API on a machine with 32 CPUs, loading the data to postgres. I have my code essentially set up like this:

from aiomultiprocess import Pool
CPUS = 30 #(32 CPUS available)
batch_size = 5000
for i in range(0, len(args_list), batch_size):
log.info(f&quot;&lt;log message&gt;&quot;)
async with Pool(
processes=CPUS,
maxtasksperchild=100,
childconcurrency=3,
queuecount=int(CPUS / 3),
) as pool:
await pool.starmap(fetch_api_results, args_list[i : i + batch_size])

--------EDIT:
Adding a redaction of fetch_api_results per request in the comments. It is basically a set of functions that construct the api url, and then make aiohttp requests recursively until there are no more next_url tokens in the api request results.

Here it is.

from aiohttp import request
async def fetch_api_results(*args)
try:
result_objects= APIPaginator(*args)
await result_objects.fetch()
log.info(&quot;uploading data&quot;)
#upload to db function
except planned_exceptions as e:
log.warning(e, exc_info=False)
class APIPaginator(object):
async def query_data(self):
url = self.api_base + &quot;&lt;str from arg&gt;&quot;
payload = {&quot;limit&quot;: 1000}
await self.query_all(url, payload)
async def query_all(self, url, payload):
try:
async with request(method=&quot;GET&quot;, url=url, params=payload) as response:
log.info(f&quot;status code: {response.status}&quot;)
if response.status == 200:
results = await response.json()
self.results.append(results)
next_url = results.get(&quot;next_url&quot;)
if next_url:
await self.query_all(next_url)
else:
response.raise_for_status()
except: #(except block omitted)
async def fetch(self):
await self.query_data()

END OF EDIT --------------
It will run for an hour or two (expecting it to take a day or two) and will then freeze. No errors thrown. When I keyboard-interrupt it, I will see the OSError: [Errno 24] Too many open files error.

I've put the traceback below.

From my understanding, it seems like the issue of too many files handlers being open has to do with the spawning of new worker processes in the pool. What confuses me is that the docs say that the maxtasksperchild limit, when reached, should result in an old worker process getting killed and a new one being spawned. This is too prevent memory leaks, and I would assume, to keep this problem from happening.

However, changing the maxtasksperchild parameter has yielded no change.

Furthermore, I implemented the batch processing to effectively kill the pool and start a new one after every 5000 tasks to also prevent an accumulation of file_handlers. The with pool: implementation should effectively kill everything to do with that pool once the with block closes. But this has also failed. There was no change after implementing the batching method.

It has all left me pretty confused. It's clear it has to do with the piping of newly spawned processes, but I'm not sure what to do. Any feedback would be welcome.

A short-term fix that would just extend the amount of time I have before the script fails could be to increase the max number of possible files to open (per the linked answer, using ulimit -n). But I would fear this will be exceeded too, since this is going to be a pretty long running job.

Any help is very appreciated!

Here is the full traceback:

File &quot;&lt;path-to-file&gt;.py&quot;, line 127, in import_data
await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
File &quot;/&lt;path-to-env&gt;/lib/python3.11/site-packages/aiomultiprocess/pool.py&quot;, line 136, in results
return await self.pool.results(self.task_ids)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File &quot;/&lt;path-to-env&gt;/lib/python3.11/site-packages/aiomultiprocess/pool.py&quot;, line 312, in results
await asyncio.sleep(0.005)
File &quot;/&lt;path-to-env&gt;/lib/python3.11/asyncio/tasks.py&quot;, line 639, in sleep
return await future
File &quot;/&lt;path-to-env&gt;/3.11.1/lib/python3.11/asyncio/runners.py&quot;, line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File &quot;/&lt;path-to-env&gt;/lib/python3.11/asyncio/base_events.py&quot;, line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File &quot;/&lt;path-to-file&gt;/main.py&quot;, line 39, in add_all_data
await import_data(args)
File &quot;/&lt;path-to-file&gt;/orchestrator.py&quot;, line 120, in import_data
async with Pool(
File &quot;/&lt;path-to-env&gt;/lib/python3.11/site-packages/aiomultiprocess/pool.py&quot;, line 196, in __aexit__
await self.join()
File &quot;/&lt;path-to-env&gt;/lib/python3.11/site-packages/aiomultiprocess/pool.py&quot;, line 379, in join
await self._loop
File &quot;/&lt;path-to-env&gt;/lib/python3.11/site-packages/aiomultiprocess/pool.py&quot;, line 229, in loop
self.processes[self.create_worker(qid)] = qid
^^^^^^^^^^^^^^^^^^^^^^^
File &quot;/&lt;path-to-env&gt;/lib/python3.11/site-packages/aiomultiprocess/pool.py&quot;, line 261, in create_worker
process.start()
File &quot;/&lt;path-to-env&gt;/lib/python3.11/site-packages/aiomultiprocess/core.py&quot;, line 153, in start
return self.aio_process.start()
^^^^^^^^^^^^^^^^^^^^^^^^
File &quot;/&lt;path-to-env&gt;/lib/python3.11/multiprocessing/process.py&quot;, line 121, in start
self._popen = self._Popen(self)
^^^^^^^^^^^^^^^^^
File &quot;/&lt;path-to-env&gt;/lib/python3.11/multiprocessing/context.py&quot;, line 288, in _Popen
return Popen(process_obj)
^^^^^^^^^^^^^^^^^^
File &quot;/&lt;path-to-env&gt;/lib/python3.11/multiprocessing/popen_spawn_posix.py&quot;, line 32, in __init__
super().__init__(process_obj)
File &quot;/&lt;path-to-env&gt;/lib/python3.11/multiprocessing/popen_fork.py&quot;, line 19, in __init__
self._launch(process_obj)
File &quot;/home/&lt;path-to-env&gt;/lib/python3.11/multiprocessing/popen_spawn_posix.py&quot;, line 58, in _launch
self.pid = util.spawnv_passfds(spawn.get_executable(),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File &quot;/&lt;path-to-env&gt;/lib/python3.11/multiprocessing/util.py&quot;, line 451, in spawnv_passfds
errpipe_read, errpipe_write = os.pipe()
^^^^^^^^^
OSError: [Errno 24] Too many open files

答案1

得分: 1

将递归调用移到async with request...块之外。

基础请求中使用的任何资源直到一切恢复原样之前都不会被释放:

def query_all(self, ...):
    try:
        async with request(method="GET", url=url, params=payload) as response:
            log.info(f"status code: {response.status}")
            next_url = None
            if response.status == 200:
                results = await response.json()
                self.results.append(results)
                next_url = results.get("next_url")
            else:
                response.raise_for_status()
    except ...:
        ...
    if next_url:
        await self.query_all(next_url)

如果问题仍然存在则将此嵌套调用更改为`query_all`的独立任务但在实例中使用FIFO队列以便在`query_data`中的调用等待所有`query_all`子调用

这应该改善情况 - 没有可复现的示例就不能确定
英文:

Move the recursive call to fetch the next URL outside the async with request... block.

Any resources used in the base request are not freed until everything returns the way it is:

def query_all(self, ...):
try:
async with request(method=&quot;GET&quot;, url=url, params=payload) as response:
log.info(f&quot;status code: {response.status}&quot;)
next_url = None
if response.status == 200:
results = await response.json()
self.results.append(results)
next_url = results.get(&quot;next_url&quot;)
else:
response.raise_for_status()
except ...:
...
if next_url:
await self.query_all(next_url)

If the problem persists, than change this nested call to query_all to its own task, but use a FIFO queue in the instance so that the call at query_data awaits for all sub-calls to query_all.

That should improve things there - can't be sure without a reproducible example, tough.

huangapple
  • 本文由 发表于 2023年4月6日 21:21:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/75950029.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定