asyncio to download data in batches and return a list of lists

Question

I want to download data in batches asynchronously.

The data for each name is downloaded in batches, and I'd like asyncio.gather(*coroutines) to return a list of lists (a list of batches for each name). So far I have this code, but it raises an exception:

import asyncio
import datetime


async def run(names):
    """Start one coroutine for each name."""
    coroutines = [_fetch_data(name) for name in names]
    return await asyncio.gather(*coroutines)  # This fails!


async def _fetch_data(name):
    """Fetch data for a single symbol in batches."""
    start_timestamp = datetime.datetime(2021, 9, 1).timestamp()
    end_timestamp = datetime.datetime(2021, 9, 2).timestamp()

    i = 1
    while start_timestamp < end_timestamp:
        batch = f"Batch {i} for {name}"
        await asyncio.sleep(2)  # Some async API call, for example
        # If I remove the yield, it works. But I want to use the results!
        yield batch
        start_timestamp += 3600
        i += 1


async def main():
    names = ["Jack", "Jill", "Bob"]
    return await run(names)


output = asyncio.run(main())
print(output)
# I'd expect something like
# [["Batch 1 for Jack", "Batch 2 for Jack", ...], ["Batch 1 for Jill", "Batch 2 for Jill", ...], ...]

Unfortunately, this code raises an exception at asyncio.gather(*coroutines):

> TypeError: An asyncio.Future, a coroutine or an awaitable is required

Isn't _fetch_data a coroutine? What is this error trying to tell me? And how can I get past it?

I'm trying to learn more about asyncio in Python and I'm quite sure I'm missing some basics here.

Answer 1

Score: 1

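No, _fetch_data is not a coroutine function. By using a yield statement inside it, you turned it into an asynchronous generator function. Calling it returns an async generator object, which is not awaitable, and that is why asyncio.gather rejects it with the TypeError. The Python documentation covers the distinction between coroutines and asynchronous generators in more detail.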
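To see the distinction for yourself, the standard inspect module can report what kind of function you have and whether the object it returns is awaitable. This is only a minimal sketch; returns_list and yields_items are made-up names for illustration and are not part of the question's code.

```python
import inspect


async def returns_list():
    """Coroutine function: no yield, so calling it gives a coroutine."""
    return ["batch 1", "batch 2"]


async def yields_items():
    """Async generator function: the yield changes what a call returns."""
    yield "batch 1"
    yield "batch 2"


coro = returns_list()
agen = yields_items()

print(inspect.iscoroutinefunction(returns_list))  # True
print(inspect.isasyncgenfunction(yields_items))   # True
print(inspect.isawaitable(coro))                  # True
print(inspect.isawaitable(agen))                  # False

coro.close()  # avoid a "coroutine was never awaited" warning
```

Since asyncio.gather only accepts awaitables, passing it the async generator object is what triggers the TypeError.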

If you actually want it to be a generator and you want multiple async generators to be consumed concurrently, you'll need to modify your code a bit and introduce a coroutine that asynchronously consumes the generator. This is typically done via an async for-loop.

Something like this will work:

...

async def run(names):
    coroutines = [fetch(name) for name in names]
    return await asyncio.gather(*coroutines)


async def fetch(name):
    return [batch async for batch in _fetch_data(name)]

...
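For reference, here is a self-contained variant of the snippet above that can be run as-is. It is a sketch under a few assumptions: the timestamp loop is replaced by a fixed number of batches, and the batches and delay parameters (with a short delay instead of 2 seconds) are introduced only to keep the example quick.

```python
import asyncio


async def _fetch_data(name, batches=3, delay=0.01):
    """Async generator: yields one batch at a time (stand-in for an API call)."""
    for i in range(1, batches + 1):
        await asyncio.sleep(delay)
        yield f"Batch {i} for {name}"


async def fetch(name):
    """Coroutine that drains the generator into a list, so gather() can await it."""
    return [batch async for batch in _fetch_data(name)]


async def run(names):
    # One fetch() coroutine per name; gather() runs them concurrently
    # and returns the results in the same order as the inputs.
    return await asyncio.gather(*(fetch(name) for name in names))


output = asyncio.run(run(["Jack", "Jill", "Bob"]))
print(output)
```

The result is a list with one inner list of batches per name, in the order the names were passed in.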

I don't know your actual use case, so I can't say whether using a generator is actually prudent here. Alternatively, you could sidestep the entire issue by making _fetch_data an actual coroutine function that returns a list, rather than an asynchronous generator yielding list items:

import asyncio
import datetime


async def run(names):
    return await asyncio.gather(*(_fetch_data(name) for name in names))


async def _fetch_data(name):
    start_timestamp = datetime.datetime(2021, 9, 1).timestamp()
    end_timestamp = datetime.datetime(2021, 9, 2).timestamp()
    results = []
    i = 1
    while start_timestamp < end_timestamp:
        batch = f"Batch {i} for {name}"
        await asyncio.sleep(2)  # Some async API call, for example
        results.append(batch)
        start_timestamp += 3600
        i += 1
    return results


async def main():
    names = ["Jack", "Jill", "Bob"]
    return await run(names)


output = asyncio.run(main())
print(output)

Both of these work, produce the output you expect, and are (at least with your simplified example) essentially equivalent.
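Where the two approaches can start to differ is when you want to react to each batch as soon as it arrives, which is something the generator version allows. The following is only a sketch of that idea: the on_batch callback, the seen list, and the batches and delay parameters are hypothetical additions for illustration.

```python
import asyncio


async def _fetch_data(name, batches=3, delay=0.01):
    """Async generator standing in for a batched API download."""
    for i in range(1, batches + 1):
        await asyncio.sleep(delay)
        yield f"Batch {i} for {name}"


async def fetch(name, on_batch):
    """Consume the generator, handing each batch to a callback right away."""
    collected = []
    async for batch in _fetch_data(name):
        on_batch(batch)  # runs as soon as this batch is available
        collected.append(batch)
    return collected


async def run(names):
    seen = []  # records batches in arrival order across all names
    results = await asyncio.gather(*(fetch(name, seen.append) for name in names))
    return results, seen


results, seen = asyncio.run(run(["Jack", "Jill"]))
print(results)  # grouped per name, in input order
print(seen)     # flat list in the order the batches arrived
```

With the list-returning coroutine, the caller only sees a name's batches once all of them have been downloaded.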

huangapple
  • Posted on April 16, 2023, 23:31:15
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76028664.html