Perform large numbers of HTTP requests asynchronously N at a time
Question
For a one-time task I need to go through all the records in a database, of which there are a few million, read a value in a cell, make an HTTP request, and update another cell which is currently `NULL`.

I want to send the requests in portions, asynchronously, via `asyncio`, and not too many at a time, because the remote server may ban me: no more than 50 requests per second or at a time.
I've found this code:
```python
import asyncio
import aiohttp


async def one(session, url):
    # request the URL and read it until complete or canceled
    async with session.get(url) as resp:
        await resp.text()


async def fire(urls):
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(loop.create_task(one(session, url)))

        # 10 seconds
        try:
            await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
        except asyncio.TimeoutError:
            pass


loop = asyncio.get_event_loop()
loop.run_until_complete(fire([urls...]))
```
But it will send all the requests at once.

How could I do it `N` at a time? Meaning, send `N`, then wait for 1 ... a few or even all of them to return values, then send another lot of `N` ... and so on.
Answer 1
Score: 1
## Option A: With just `asyncio` in batches

#### Python <3.11
```python
from asyncio import create_task, gather, run, sleep

from aiohttp import ClientSession


async def get_one(session: ClientSession, url: str) -> None:
    print("Requesting", url)
    async with session.get(url) as resp:
        text = await resp.text()
    await sleep(2)  # for demo purposes
    print("Got response from", url, text.strip().split("\n", 1)[0])


async def get_all(urls: list[str], num_concurrent: int) -> None:
    url_iterator = iter(urls)
    keep_going = True
    async with ClientSession() as session:
        while keep_going:
            tasks = []
            for _ in range(num_concurrent):
                try:
                    url = next(url_iterator)
                except StopIteration:
                    keep_going = False
                    break
                new_task = create_task(get_one(session, url))
                tasks.append(new_task)
            await gather(*tasks)


async def main() -> None:
    urls = [
        "https://github.com",
        "https://stackoverflow.com",
        "https://python.org",
    ]
    await get_all(urls, 2)


run(main())
```
Output:

```
Requesting https://github.com
Requesting https://stackoverflow.com
Got response from https://github.com <!DOCTYPE html>
Got response from https://stackoverflow.com <!DOCTYPE html>
Requesting https://python.org
Got response from https://python.org <!doctype html>
```
You'll notice that the third request (to `python.org`) is only sent after both previous requests have returned a response. This setup will essentially perform your total number of requests in batches of `num_concurrent`.
#### Python >=3.11

With the newer `TaskGroup` class, we can make the `get_all` function a bit more concise:
```python
from asyncio import TaskGroup, run, sleep

from aiohttp import ClientSession


async def get_one(session: ClientSession, url: str) -> None:
    ...  # same as above


async def get_all(urls: list[str], num_concurrent: int) -> None:
    url_iterator = iter(urls)
    keep_going = True
    async with ClientSession() as session:
        while keep_going:
            async with TaskGroup() as tg:
                for _ in range(num_concurrent):
                    try:
                        url = next(url_iterator)
                    except StopIteration:
                        keep_going = False
                        break
                    tg.create_task(get_one(session, url))


...  # main() and run(main()) same as above
```
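The batching loop above caps how many requests are in flight at once, but it does not by itself enforce the "no more than 50 requests per second" limit mentioned in the question. A minimal, untested sketch of one way to add that on top of option A (not part of the original answer) is to pad each batch to a minimum wall-clock duration; it reuses `get_one` from above, and `min_batch_seconds` is a parameter name introduced here for illustration:

```python
from asyncio import create_task, gather, sleep
from time import monotonic

from aiohttp import ClientSession


async def get_all_rate_limited(
    urls: list[str], num_concurrent: int, min_batch_seconds: float
) -> None:
    # Same batching idea as get_all() above, but each batch is padded with a
    # sleep so it takes at least `min_batch_seconds` of wall-clock time.
    url_iterator = iter(urls)
    keep_going = True
    async with ClientSession() as session:
        while keep_going:
            batch_started = monotonic()
            tasks = []
            for _ in range(num_concurrent):
                try:
                    url = next(url_iterator)
                except StopIteration:
                    keep_going = False
                    break
                tasks.append(create_task(get_one(session, url)))
            await gather(*tasks)
            elapsed = monotonic() - batch_started
            if keep_going and elapsed < min_batch_seconds:
                await sleep(min_batch_seconds - elapsed)
```

With `num_concurrent=50` and `min_batch_seconds=1.0` this should stay at or below roughly 50 requests per second, at the cost of idling whenever a batch finishes early.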
## Option B: With just `asyncio` in a `Queue`

The `asyncio.Queue` allows us to set a maximum size for it. That makes it possible to limit the maximum number of concurrently executing tasks, but we will need to use the consumer-producer pattern:
```python
from asyncio import Queue, create_task, gather, run, sleep

from aiohttp import ClientSession


async def get_one(session: ClientSession, url: str) -> None:
    ...  # same as above


STOP_SENTINEL = object()


async def consumer(session: ClientSession, q: Queue[str]) -> None:
    url = await q.get()
    while url is not STOP_SENTINEL:
        await get_one(session, url)
        q.task_done()
        url = await q.get()
    q.task_done()


async def main() -> None:
    urls = [
        "https://github.com",
        "https://stackoverflow.com",
        "https://python.org",
    ]
    num_concurrent = 2
    q = Queue(maxsize=num_concurrent)
    async with ClientSession() as session:
        consumers = [
            create_task(consumer(session, q))
            for _ in range(num_concurrent)
        ]
        for url in urls:
            await q.put(url)
        for _ in range(num_concurrent):
            await q.put(STOP_SENTINEL)
        await gather(*consumers)


run(main())
```
Output:

```
Requesting https://github.com
Requesting https://stackoverflow.com
Got response from https://github.com <!DOCTYPE html>
Requesting https://python.org
Got response from https://stackoverflow.com <!DOCTYPE html>
Got response from https://python.org <!doctype html>
```
As you can see, the third request can now be sent as soon as either of the previous two returns with a response. That may be more efficient, even though the setup is a bit more cumbersome.
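A common alternative that gives the same "start the next request as soon as any slot frees up" behavior, not shown in the original answer, is an `asyncio.Semaphore`. The sketch below assumes the same `get_one` coroutine as above:

```python
from asyncio import Semaphore, gather

from aiohttp import ClientSession


async def get_all_with_semaphore(urls: list[str], num_concurrent: int) -> None:
    # At most `num_concurrent` get_one() calls hold the semaphore at a time;
    # all other coroutines wait at the `async with` until a slot frees up.
    semaphore = Semaphore(num_concurrent)

    async def bounded_get(session: ClientSession, url: str) -> None:
        async with semaphore:
            await get_one(session, url)

    async with ClientSession() as session:
        await gather(*(bounded_get(session, url) for url in urls))
```

Note that this still creates one coroutine per URL up front, which may matter with the millions of records mentioned in the question; the `Queue` and `TaskPool` approaches only ever keep `num_concurrent` worker tasks alive.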
## Option C: With an extra package

I used to run into similar issues with setting a fixed number of `asyncio` tasks to work on a large number of actual tasks. To make this easier I wrote the `asyncio-taskpool` package. With it I can do something like this:
```python
from asyncio import run, sleep

from aiohttp import ClientSession
from asyncio_taskpool import TaskPool


async def get_one(session: ClientSession, url: str) -> None:
    ...  # same as above


async def get_all(urls: list[str], num_concurrent: int) -> None:
    pool = TaskPool()
    async with ClientSession() as session:
        pool.starmap(
            get_one,
            ((session, url) for url in urls),
            num_concurrent=num_concurrent,
        )
        await pool.gather_and_close()


async def main() -> None:
    urls = [
        "https://github.com",
        "https://stackoverflow.com",
        "https://python.org",
    ]
    await get_all(urls, 2)


run(main())
```
Output (same as with the `Queue` approach):

```
Requesting https://github.com
Requesting https://stackoverflow.com
Got response from https://github.com <!DOCTYPE html>
Requesting https://python.org
Got response from https://stackoverflow.com <!DOCTYPE html>
Got response from https://python.org <!doctype html>
```
You'll notice again that the third request will only be made after at least one of the other two returns with a response. You can try that out with larger numbers of tasks. The number being executed concurrently at any given time will never exceed `num_concurrent` as passed to `map` (`starmap` is just a variant of `map`).
I tried to emulate the standard `multiprocessing.Pool` interface to an extent and find this more convenient to use, especially with long-running tasks.
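To tie any of these options back to the original task (read a value from one column, make an HTTP request, and fill the column that is currently `NULL`), the `get_one` coroutine would be replaced by something along the following lines. This is only an illustrative sketch: `build_url` and `update_cell` are hypothetical stand-ins for whatever URL scheme and database layer the question actually uses, passed in as callables so the sketch stays self-contained:

```python
from typing import Awaitable, Callable

from aiohttp import ClientSession


async def process_record(
    session: ClientSession,
    record_id: int,
    value: str,
    build_url: Callable[[str], str],
    update_cell: Callable[[int, str], Awaitable[None]],
) -> None:
    # Hypothetical replacement for get_one(): `value` comes from the column
    # that is read, and the HTTP response body is written back via the
    # caller-supplied `update_cell` coroutine function.
    async with session.get(build_url(value)) as resp:
        text = await resp.text()
    await update_cell(record_id, text)
```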