aiomultiprocessing pool freezes and OSError: [Errno 24] Too many open files
Question
I've looked at this question and have talked with chatGPT/bing (lol) and still can't figure this one out.
I am trying to execute ~7 million requests to an API on a machine with 32 CPUs, loading the data into postgres. I have my code essentially set up like this:
from aiomultiprocess import Pool

CPUS = 30  # (32 CPUs available)
batch_size = 5000

for i in range(0, len(args_list), batch_size):
    log.info(f"<log message>")
    async with Pool(
        processes=CPUS,
        maxtasksperchild=100,
        childconcurrency=3,
        queuecount=int(CPUS / 3),
    ) as pool:
        await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
--------EDIT:
Adding a redacted version of fetch_api_results, per a request in the comments. It is basically a set of functions that build the API URL and then make aiohttp requests recursively until there are no more next_url tokens in the API results.
Here it is:
from aiohttp import request


async def fetch_api_results(*args):
    try:
        result_objects = APIPaginator(*args)
        await result_objects.fetch()
        log.info("uploading data")
        # upload to db function
    except planned_exceptions as e:
        log.warning(e, exc_info=False)


class APIPaginator(object):
    async def query_data(self):
        url = self.api_base + "<str from arg>"
        payload = {"limit": 1000}
        await self.query_all(url, payload)

    async def query_all(self, url, payload):
        try:
            async with request(method="GET", url=url, params=payload) as response:
                log.info(f"status code: {response.status}")
                if response.status == 200:
                    results = await response.json()
                    self.results.append(results)
                    next_url = results.get("next_url")
                    if next_url:
                        await self.query_all(next_url)
                else:
                    response.raise_for_status()
        except:  # (except block omitted)
            ...

    async def fetch(self):
        await self.query_data()
END OF EDIT --------------
It will run for an hour or two (I expect the whole job to take a day or two) and will then freeze. No errors are thrown. When I keyboard-interrupt it, I see the OSError: [Errno 24] Too many open files error.
I've put the traceback below.
From my understanding, it seems like the issue of too many file handles being open has to do with the spawning of new worker processes in the pool. What confuses me is that the docs say that when the maxtasksperchild limit is reached, the old worker process should be killed and a new one spawned. This is to prevent memory leaks and, I would assume, to keep this problem from happening.
However, changing the maxtasksperchild parameter has yielded no change.
Furthermore, I implemented the batch processing to effectively kill the pool and start a new one after every 5000 tasks, also to prevent an accumulation of file handles. The async with Pool(...) as pool: block should effectively tear down everything to do with that pool once the with block closes. But this has also failed; there was no change after implementing the batching method.
It has all left me pretty confused. It's clear it has to do with the piping of newly spawned processes, but I'm not sure what to do. Any feedback would be welcome.
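For context, a quick Linux-only way to watch whether descriptors really are piling up in the parent process (it does not see the workers) would be something like this:

import os

def open_fd_count(pid: int | None = None) -> int:
    # count the file descriptors currently open for a process (Linux /proc only)
    pid = os.getpid() if pid is None else pid
    return len(os.listdir(f"/proc/{pid}/fd"))

# e.g. log this between batches: log.info(f"open fds: {open_fd_count()}")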
A short-term fix that would just extend the amount of time I have before the script fails would be to increase the maximum number of files a process can open (per the linked answer, using ulimit -n). But I fear that limit will be exceeded too, since this is going to be a pretty long-running job.
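For reference, this is the sort of bump I mean; a sketch using the standard resource module that has the same effect as ulimit -n for this process (it still only buys time):

import resource

# current soft/hard limits on open file descriptors for this process
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"RLIMIT_NOFILE: soft={soft}, hard={hard}")

# raise the soft limit up to the hard limit; going past the hard limit
# would need root or a change to the system limits configuration
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))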
Any help is very appreciated!
Here is the full traceback:
File "<path-to-file>.py", line 127, in import_data
await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 136, in results
return await self.pool.results(self.task_ids)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 312, in results
await asyncio.sleep(0.005)
File "/<path-to-env>/lib/python3.11/asyncio/tasks.py", line 639, in sleep
return await future
File "/<path-to-env>/3.11.1/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/<path-to-file>/main.py", line 39, in add_all_data
await import_data(args)
File "/<path-to-file>/orchestrator.py", line 120, in import_data
async with Pool(
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 196, in __aexit__
await self.join()
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 379, in join
await self._loop
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 229, in loop
self.processes[self.create_worker(qid)] = qid
^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 261, in create_worker
process.start()
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/core.py", line 153, in start
return self.aio_process.start()
^^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/multiprocessing/context.py", line 288, in _Popen
return Popen(process_obj)
^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/<path-to-env>/lib/python3.11/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/home/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 58, in _launch
self.pid = util.spawnv_passfds(spawn.get_executable(),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/multiprocessing/util.py", line 451, in spawnv_passfds
errpipe_read, errpipe_write = os.pipe()
^^^^^^^^^
OSError: [Errno 24] Too many open files
Answer 1
Score: 1
Move the recursive call that fetches the next URL outside the async with request(...) block.
As the code is written, any resources used by the outer request are not freed until all of the nested calls have returned:
async def query_all(self, url, payload):
    next_url = None
    try:
        async with request(method="GET", url=url, params=payload) as response:
            log.info(f"status code: {response.status}")
            if response.status == 200:
                results = await response.json()
                self.results.append(results)
                next_url = results.get("next_url")
            else:
                response.raise_for_status()
    except ...:
        ...
    if next_url:
        await self.query_all(next_url)
If the problem persists, then change this nested call to query_all into its own task, but use a FIFO queue on the instance so that the call in query_data waits for all of the sub-calls to query_all.
That should improve things there - can't be sure without a reproducible example, though.
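A rough sketch of that queue-based variant, reusing the names from the question (untested; the __init__ attributes are guesses since it was not shown, and the queue is drained with a simple loop rather than separate tasks):

import asyncio
from aiohttp import request


class APIPaginator(object):
    def __init__(self, api_base):
        # guessed attributes; the real __init__ was not shown in the question
        self.api_base = api_base
        self.results = []
        self.pending = asyncio.Queue()  # FIFO queue of (url, payload) pairs still to fetch

    async def query_data(self):
        url = self.api_base + "<str from arg>"
        payload = {"limit": 1000}
        await self.pending.put((url, payload))
        # drain the queue iteratively instead of recursing, so each request's
        # connection is fully closed before the next one is opened
        while not self.pending.empty():
            next_url, next_payload = await self.pending.get()
            await self.query_one(next_url, next_payload)

    async def query_one(self, url, payload):
        async with request(method="GET", url=url, params=payload) as response:
            if response.status == 200:
                results = await response.json()
                self.results.append(results)
                next_url = results.get("next_url")
                if next_url:
                    await self.pending.put((next_url, payload))
            else:
                response.raise_for_status()

    async def fetch(self):
        await self.query_data()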