执行大量HTTP请求,每次异步执行N个。

huangapple go评论71阅读模式
英文:

Perform large numbers of HTTP requests asyncronously N at a time

问题

以下是翻译好的部分:

我需要执行一项一次性任务,需要遍历数据库中的所有记录,其中有数百万条记录,读取单元格中的值,发出HTTP请求并更新另一个当前为NULL的单元格。

我想通过asyncio以异步方式将它们分批发送,但不要一次发送太多,因为远程服务器可能会封禁我:每秒不超过50个请求或同时进行。

我找到了这段代码:

import asyncio
import aiohttp

async def one(session, url):
    # 请求URL并读取完整或取消
    async with session.get(url) as resp:
        await resp.text()

async def fire(urls):
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(loop.create_task(one(session, url)))

        # 10秒超时
        try:
            await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
        except asyncio.TimeoutError:
            pass

loop = asyncio.get_event_loop()
loop.run_until_complete(fire([urls...]))

但它会一次发送所有请求。

我该如何以每次发送N个请求的方式来做? 意思是发送N个请求,然后等待1...甚至全部返回值,然后再发送另一批N...以此类推。

英文:

For some one-time task I need to go through all the records in a database of which there are a few millions, read a value in a cell, make a HTTP request and update another cell which is currently NULL.

I want to send all of them by portions, asynchronously, via asyncio. And not too many at a time because remote server may ban me: No more than 50 requests/second or at a time.

I've found this code:

import asyncio
import aiohttp


async def one(session, url):
    # request the URL and read it until complete or canceled
    async with session.get(url) as resp:
        await resp.text()


async def fire(urls):
    loop = asyncio.get_event_loop()
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(loop.create_task(one(session, url)))

        # 10 seconds
        try:
            await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
        except asyncio.TimeoutError:
            pass


loop = asyncio.get_event_loop()
loop.run_until_complete(fire([urls...]))

But it will send all the request at once.

How could I do it N at a time? Meaning, send N, then wait for 1 ... a few or even all of them to return values, then send another lot of N... and so on.

答案1

得分: 1

以下是代码部分的翻译:

## 选项 A:仅使用 `asyncio` 批处理

#### Python `<3.11`

```python
from asyncio import create_task, gather, run, sleep

from aiohttp import ClientSession

async def get_one(session: ClientSession, url: str) -> None:
    print("请求中", url)
    async with session.get(url) as resp:
        text = await resp.text()
        await sleep(2)  # 仅供演示
        print("来自", url, "的响应", text.strip().split("\n", 1)[0])

async def get_all(urls: list[str], num_concurrent: int) -> None:
    url_iterator = iter(urls)
    keep_going = True
    async with ClientSession() as session:
        while keep_going:
            tasks = []
            for _ in range(num_concurrent):
                try:
                    url = next(url_iterator)
                except StopIteration:
                    keep_going = False
                    break
                new_task = create_task(get_one(session, url))
                tasks.append(new_task)
            await gather(*tasks)

async def main() -> None:
    urls = [
        "https://github.com",
        "https://stackoverflow.com",
        "https://python.org",
    ]
    await get_all(urls, 2)

run(main())

输出:

请求中 https://github.com
请求中 https://stackoverflow.com
来自 https://github.com 的响应 <!DOCTYPE html>
来自 https://stackoverflow.com 的响应 <!DOCTYPE html>
请求中 https://python.org
来自 https://python.org 的响应 <!doctype html>

你会注意到第三个请求(到 python.org)仅在前两个请求都返回响应后才发送。这个设置实际上会批量执行总请求数,每次执行 num_concurrent 个请求。

Python >=3.11

使用较新的 TaskGroup 类,我们可以使 get_all 函数更加简洁:

from asyncio import TaskGroup, run, sleep

from aiohttp import ClientSession


async def get_one(session: ClientSession, url: str) -> None:
    ...  # 与上面相同


async def get_all(urls: list[str], num_concurrent: int) -> None:
    url_iterator = iter(urls)
    keep_going = True
    async with ClientSession() as session:
        while keep_going:
            with TaskGroup() as tg:
                for _ in range(num_concurrent):
                    try:
                        url = next(url_iterator)
                    except StopIteration:
                        keep_going = False
                        break
                    tg.create_task(get_one(session, url))

...

选项 B:仅使用 asyncio 中的 Queue

asyncio.Queue 允许我们为其设置最大大小。这样可以限制最大并发执行任务的数量,但我们需要使用 消费者-生产者模式

from asyncio import Queue, create_task, gather, run, sleep

from aiohttp import ClientSession

async def get_one(session: ClientSession, url: str) -> None:
    ...  # 与上面相同

STOP_SENTINEL = object()

async def consumer(session: ClientSession, q: Queue[str]) -> None:
    url = await q.get()
    while url is not STOP_SENTINEL:
        await get_one(session, url)
        q.task_done()
        url = await q.get()
    q.task_done()

async def main() -> None:
    urls = [
        "https://github.com",
        "https://stackoverflow.com",
        "https://python.org",
    ]
    num_concurrent = 2
    q = Queue(maxsize=num_concurrent)
    async with ClientSession() as session:
        consumers = [
            create_task(consumer(session, q))
            for _ in range(num_concurrent)
        ]
        for url in urls:
            await q.put(url)
        for _ in range(num_concurrent):
            await q.put(STOP_SENTINEL)
        await gather(*consumers)

run(main())

输出:

请求中 https://github.com
请求中 https://stackoverflow.com
来自 https://github.com 的响应 <!DOCTYPE html>
请求中 https://python.org
来自 https://stackoverflow.com 的响应 <!DOCTYPE html>
来自 https://python.org 的响应 <!doctype html>

现在你可以看到,第三个请求可以在前两个返回响应后尽早发送。这可能更有效,尽管设置稍微繁琐一些。

选项 C:使用额外的包

我曾遇到类似的问题,需要在大量的 实际 任务上设置固定数量的 asyncio 任务。为了更容易解决这个问题,我编写了 asyncio-taskpool 包。使用它,我们可以像这样做:

from asyncio import run, sleep

from aiohttp import ClientSession
from asyncio_taskpool import TaskPool

async def get_one(session: ClientSession, url: str) -> None:
    ...  # 与上面相同

async def get_all(urls: list[str], num_concurrent: int) -> None:
    pool = TaskPool()
    async with ClientSession() as session:
        pool.starmap(
            get_one,
            ((session, url) for url in urls),
            num_concurrent=num_concurrent,
        )
        await pool.gather_and_close()

async def main() -> None:
    urls = [
        "https://github.com",
        "https://stackoverflow.com",
        "https://python.org",
    ]
    await get_all(urls, 2)

run(main())

输出:(与 Queue 方法相同)

请求中 https://github.com
请求中 https://stackoverflow.com
来自 https://github.com 的响应 <!DOCTYPE html>
请求中 https://python.org
来自 https://stackoverflow.com 的响应 <!DOCTYPE html>
来自 https://python.org 的响应 <!doctype html>

你会再次注意到,第三个请求仅在其他两个中的至少一个返回响应后才会被发出。你可以尝试使用更多的任务。在任何给定时间内并发执行的数量不会超过传递给 mapnum_concurrent

我尽量模拟了标准的 multiprocessing.Pool 接口,这在处理长时间运行的任务时更方便。

英文:

Option A: With just asyncio in batches

Python &lt;3.11

from asyncio import create_task, gather, run, sleep

from aiohttp import ClientSession

async def get_one(session: ClientSession, url: str) -&gt; None:
    print(&quot;Requesting&quot;, url)
    async with session.get(url) as resp:
        text = await resp.text()
        await sleep(2)  # for demo purposes
        print(&quot;Got response from&quot;, url, text.strip().split(&quot;\n&quot;, 1)[0])

async def get_all(urls: list[str], num_concurrent: int) -&gt; None:
    url_iterator = iter(urls)
    keep_going = True
    async with ClientSession() as session:
        while keep_going:
            tasks = []
            for _ in range(num_concurrent):
                try:
                    url = next(url_iterator)
                except StopIteration:
                    keep_going = False
                    break
                new_task = create_task(get_one(session, url))
                tasks.append(new_task)
            await gather(*tasks)

async def main() -&gt; None:
    urls = [
        &quot;https://github.com&quot;,
        &quot;https://stackoverflow.com&quot;,
        &quot;https://python.org&quot;,
    ]
    await get_all(urls, 2)

run(main())

Output:

Requesting https://github.com
Requesting https://stackoverflow.com
Got response from https://github.com &lt;!DOCTYPE html&gt;
Got response from https://stackoverflow.com &lt;!DOCTYPE html&gt;
Requesting https://python.org
Got response from https://python.org &lt;!doctype html&gt;

You'll notice that the third requests (to python.org) is only sent after both previous requests have returned a response. This setup will essentially perform your total number of requests in batches of num_concurrent.

Python &gt;=3.11

With the newer TaskGroup class, we can make the get_all function a bit more concise:

from asyncio import TaskGroup, run, sleep

from aiohttp import ClientSession


async def get_one(session: ClientSession, url: str) -&gt; None:
    ...  # same as above


async def get_all(urls: list[str], num_concurrent: int) -&gt; None:
    url_iterator = iter(urls)
    keep_going = True
    async with ClientSession() as session:
        while keep_going:
            with TaskGroup() as tg:
                for _ in range(num_concurrent):
                    try:
                        url = next(url_iterator)
                    except StopIteration:
                        keep_going = False
                        break
                    tg.create_task(get_one(session, url))

...

Option B: With just asyncio in a Queue

The asyncio.Queue allows us to set a maximum size for it. That makes it possible to limit the maximum number of concurrently executing tasks, but we will need to use the consumer-producer-pattern:

from asyncio import Queue, create_task, gather, run, sleep

from aiohttp import ClientSession

async def get_one(session: ClientSession, url: str) -&gt; None:
    ...  # same as above

STOP_SENTINEL = object()

async def consumer(session: ClientSession, q: Queue[str]) -&gt; None:
    url = await q.get()
    while url is not STOP_SENTINEL:
        await get_one(session, url)
        q.task_done()
        url = await q.get()
    q.task_done()

async def main() -&gt; None:
    urls = [
        &quot;https://github.com&quot;,
        &quot;https://stackoverflow.com&quot;,
        &quot;https://python.org&quot;,
    ]
    num_concurrent = 2
    q = Queue(maxsize=num_concurrent)
    async with ClientSession() as session:
        consumers = [
            create_task(consumer(session, q))
            for _ in range(num_concurrent)
        ]
        for url in urls:
            await q.put(url)
        for _ in range(num_concurrent):
            await q.put(STOP_SENTINEL)
        await gather(*consumers)

run(main())

Output:

Requesting https://github.com
Requesting https://stackoverflow.com
Got response from https://github.com &lt;!DOCTYPE html&gt;
Requesting https://python.org
Got response from https://stackoverflow.com &lt;!DOCTYPE html&gt;
Got response from https://python.org &lt;!doctype html&gt;

As you can see now, that third request can be sent as soon as either of the previous two returns with a response.

That may be more efficient, even though the setup is a bit more cumbersome.


Option C: With an extra package

I used to run into similar issues with setting a fixed number of asyncio tasks to work on a large number of actual tasks. To make this easier I wrote the asyncio-taskpool package. With it I can do something like this:

from asyncio import run, sleep

from aiohttp import ClientSession
from asyncio_taskpool import TaskPool

async def get_one(session: ClientSession, url: str) -&gt; None:
    ...  # same as above

async def get_all(urls: list[str], num_concurrent: int) -&gt; None:
    pool = TaskPool()
    async with ClientSession() as session:
        pool.starmap(
            get_one,
            ((session, url) for url in urls),
            num_concurrent=num_concurrent,
        )
        await pool.gather_and_close()

async def main() -&gt; None:
    urls = [
        &quot;https://github.com&quot;,
        &quot;https://stackoverflow.com&quot;,
        &quot;https://python.org&quot;,
    ]
    await get_all(urls, 2)

run(main())

Output: (same as with the Queue approach)

Requesting https://github.com
Requesting https://stackoverflow.com
Got response from https://github.com &lt;!DOCTYPE html&gt;
Requesting https://python.org
Got response from https://stackoverflow.com &lt;!DOCTYPE html&gt;
Got response from https://python.org &lt;!doctype html&gt;

You'll notice again that the third request will only be made after at least one of the other two returns with a response.

You can try that out with larger numbers of tasks. The number being executed concurrently at any given time will never exceed num_concurrent as passed to map (starmap is just a variant of map).

I tried to emulate the standard multiprocessing.Pool interface to an extent and find this more convenient to use, especially with long-running tasks.

huangapple
  • 本文由 发表于 2023年5月22日 11:05:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/76302813.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定