Perform large numbers of HTTP requests asynchronously N at a time

Question


For a one-time task I need to go through all the records in a database (there are a few million of them), read a value from a cell, make an HTTP request, and update another cell which is currently NULL.

I want to send them in portions, asynchronously, via asyncio, but not too many at a time, because the remote server may ban me: no more than 50 requests per second, or 50 in flight at a time.

I've found this code:

```python
import asyncio
import aiohttp


async def one(session, url):
    # request the URL and read it until complete or canceled
    async with session.get(url) as resp:
        await resp.text()


async def fire(urls):
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(loop.create_task(one(session, url)))
        # 10 seconds
        try:
            await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
        except asyncio.TimeoutError:
            pass


loop = asyncio.get_event_loop()
loop.run_until_complete(fire([urls...]))
```

But it will send all the requests at once.

How could I do it N at a time? Meaning, send N, then wait for 1 ... a few or even all of them to return values, then send another lot of N... and so on.

Answer 1

Score: 1


## Option A: With just `asyncio` in batches

#### Python <3.11

```python
from asyncio import create_task, gather, run, sleep

from aiohttp import ClientSession


async def get_one(session: ClientSession, url: str) -> None:
    print("Requesting", url)
    async with session.get(url) as resp:
        text = await resp.text()
    await sleep(2)  # for demo purposes
    print("Got response from", url, text.strip().split("\n", 1)[0])


async def get_all(urls: list[str], num_concurrent: int) -> None:
    url_iterator = iter(urls)
    keep_going = True
    async with ClientSession() as session:
        while keep_going:
            tasks = []
            for _ in range(num_concurrent):
                try:
                    url = next(url_iterator)
                except StopIteration:
                    keep_going = False
                    break
                new_task = create_task(get_one(session, url))
                tasks.append(new_task)
            await gather(*tasks)


async def main() -> None:
    urls = [
        "https://github.com",
        "https://stackoverflow.com",
        "https://python.org",
    ]
    await get_all(urls, 2)


run(main())
```

Output:

```
Requesting https://github.com
Requesting https://stackoverflow.com
Got response from https://github.com <!DOCTYPE html>
Got response from https://stackoverflow.com <!DOCTYPE html>
Requesting https://python.org
Got response from https://python.org <!doctype html>
```

You'll notice that the third request (to python.org) is only sent after both previous requests have returned a response. This setup essentially performs your total number of requests in batches of `num_concurrent`.
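The question also mentions a limit of roughly 50 requests per second. As a minimal sketch on top of Option A (not part of the original answer), you could set `num_concurrent` to 50 or less and pad each batch so it takes at least one second; `get_one` is assumed to be the same coroutine as above:

```python
from asyncio import create_task, gather, sleep
from time import monotonic

from aiohttp import ClientSession


async def get_all_throttled(urls: list[str], num_concurrent: int = 50) -> None:
    # Same batching loop as get_all above, but each batch is padded to take
    # at least one second, so at most num_concurrent requests start per second.
    url_iterator = iter(urls)
    keep_going = True
    async with ClientSession() as session:
        while keep_going:
            batch_started = monotonic()
            tasks = []
            for _ in range(num_concurrent):
                try:
                    url = next(url_iterator)
                except StopIteration:
                    keep_going = False
                    break
                tasks.append(create_task(get_one(session, url)))  # get_one as defined above
            await gather(*tasks)
            elapsed = monotonic() - batch_started
            if keep_going and elapsed < 1.0:
                # Wait out the rest of the second before starting the next batch.
                await sleep(1.0 - elapsed)
```

This keeps the batching behaviour while also capping the request rate, at the cost of idling whenever a batch finishes in under a second.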

#### Python >=3.11

With the newer TaskGroup class, we can make the get_all function a bit more concise:

```python
from asyncio import TaskGroup, run, sleep

from aiohttp import ClientSession


async def get_one(session: ClientSession, url: str) -> None:
    ...  # same as above


async def get_all(urls: list[str], num_concurrent: int) -> None:
    url_iterator = iter(urls)
    keep_going = True
    async with ClientSession() as session:
        while keep_going:
            # TaskGroup is an async context manager; all tasks created in it
            # are awaited when the block exits.
            async with TaskGroup() as tg:
                for _ in range(num_concurrent):
                    try:
                        url = next(url_iterator)
                    except StopIteration:
                        keep_going = False
                        break
                    tg.create_task(get_one(session, url))


...  # main() and the run(main()) call are the same as above
```

## Option B: With just `asyncio` and a `Queue`

An `asyncio.Queue` allows us to set a maximum size. With a fixed number of consumer tasks pulling URLs from the queue, no more than that many requests execute concurrently, and the `maxsize` keeps the producer from getting too far ahead. For this we need the producer-consumer pattern:

```python
from asyncio import Queue, create_task, gather, run, sleep

from aiohttp import ClientSession


async def get_one(session: ClientSession, url: str) -> None:
    ...  # same as above


STOP_SENTINEL = object()


async def consumer(session: ClientSession, q: Queue[str]) -> None:
    url = await q.get()
    while url is not STOP_SENTINEL:
        await get_one(session, url)
        q.task_done()
        url = await q.get()
    q.task_done()


async def main() -> None:
    urls = [
        "https://github.com",
        "https://stackoverflow.com",
        "https://python.org",
    ]
    num_concurrent = 2
    q = Queue(maxsize=num_concurrent)
    async with ClientSession() as session:
        consumers = [
            create_task(consumer(session, q))
            for _ in range(num_concurrent)
        ]
        for url in urls:
            await q.put(url)
        for _ in range(num_concurrent):
            await q.put(STOP_SENTINEL)
        await gather(*consumers)


run(main())
```

Output:

```
Requesting https://github.com
Requesting https://stackoverflow.com
Got response from https://github.com <!DOCTYPE html>
Requesting https://python.org
Got response from https://stackoverflow.com <!DOCTYPE html>
Got response from https://python.org <!doctype html>
```

As you can see, the third request can now be sent as soon as either of the previous two returns a response. That may be more efficient, even though the setup is a bit more cumbersome.
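The question ultimately needs each response to update a database cell, but `get_one` above only prints. As a minimal sketch (my addition, not from the original answer), the consumer could collect the response bodies into a dict keyed by URL, which you could then use for your updates; the helper name `fetch_text` is made up for illustration:

```python
from asyncio import Queue, create_task, gather, run

from aiohttp import ClientSession

STOP_SENTINEL = object()


async def fetch_text(session: ClientSession, url: str) -> str:
    # Like get_one above, but returns the body instead of printing it.
    async with session.get(url) as resp:
        return await resp.text()


async def consumer(session: ClientSession, q: Queue, results: dict[str, str]) -> None:
    url = await q.get()
    while url is not STOP_SENTINEL:
        results[url] = await fetch_text(session, url)
        q.task_done()
        url = await q.get()
    q.task_done()


async def main() -> None:
    urls = ["https://github.com", "https://stackoverflow.com", "https://python.org"]
    num_concurrent = 2
    results: dict[str, str] = {}
    q: Queue = Queue(maxsize=num_concurrent)
    async with ClientSession() as session:
        consumers = [
            create_task(consumer(session, q, results))
            for _ in range(num_concurrent)
        ]
        for url in urls:
            await q.put(url)
        for _ in range(num_concurrent):
            await q.put(STOP_SENTINEL)
        await gather(*consumers)
    # results now maps each URL to its response text.
    print({url: text[:15] for url, text in results.items()})


run(main())
```

In practice you would replace the final print with whatever database update you need, one row per URL.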


## Option C: With an extra package

I used to run into similar issues with setting a fixed number of `asyncio` tasks to work on a large number of *actual* tasks. To make this easier I wrote the `asyncio-taskpool` package. With it I can do something like this:

```python
from asyncio import run, sleep

from aiohttp import ClientSession
from asyncio_taskpool import TaskPool


async def get_one(session: ClientSession, url: str) -> None:
    ...  # same as above


async def get_all(urls: list[str], num_concurrent: int) -> None:
    pool = TaskPool()
    async with ClientSession() as session:
        pool.starmap(
            get_one,
            ((session, url) for url in urls),
            num_concurrent=num_concurrent,
        )
        await pool.gather_and_close()


async def main() -> None:
    urls = [
        "https://github.com",
        "https://stackoverflow.com",
        "https://python.org",
    ]
    await get_all(urls, 2)


run(main())
```

Output: (same as with the Queue approach)

```
Requesting https://github.com
Requesting https://stackoverflow.com
Got response from https://github.com <!DOCTYPE html>
Requesting https://python.org
Got response from https://stackoverflow.com <!DOCTYPE html>
Got response from https://python.org <!doctype html>
```

You'll notice again that the third request will only be made after at least one of the other two returns with a response.

You can try that out with larger numbers of tasks. The number being executed concurrently at any given time will never exceed `num_concurrent` as passed to `starmap` (which is just a variant of `map`).

I tried to emulate the standard `multiprocessing.Pool` interface to an extent and find this more convenient to use, especially with long-running tasks.
