asyncio to download data in batches and return a list of lists
Question
I want to download data in batches asynchronously.
The data for each name is downloaded in batches, and I'd like asyncio.gather(*coroutines) to return a list of lists (a list of batches for each name). So far I have this code, but it raises an exception:
import asyncio
import datetime


async def run(names):
    """Start one coroutine for each name."""
    coroutines = [_fetch_data(name) for name in names]
    return await asyncio.gather(*coroutines)  # This fails!


async def _fetch_data(name):
    """Fetch data for a single symbol in batches."""
    start_timestamp = datetime.datetime(2021, 9, 1).timestamp()
    end_timestamp = datetime.datetime(2021, 9, 2).timestamp()
    i = 1
    while start_timestamp < end_timestamp:
        batch = f"Batch {i} for {name}"
        await asyncio.sleep(2)  # Some async API call, for example
        # If I remove the yield, it works. But I want to use the results!
        yield batch
        start_timestamp += 3600
        i += 1


async def main():
    names = ["Jack", "Jill", "Bob"]
    return await run(names)


output = asyncio.run(main())
print(output)
# I'd expect something like
# [["Batch 1 for Jack", "Batch 2 for Jack", ...], ["Batch 1 for Jill", "Batch 2 for Jill", ...], ...]
Unfortunately, this code raises an exception at asyncio.gather(*coroutines):
> TypeError: An asyncio.Future, a coroutine or an awaitable is required
Isn't _fetch_data a coroutine? What is this error trying to tell me? And how can I get past it?
I'm trying to learn more about asyncio in Python and I'm quite sure I'm missing some basics here.
Answer 1
Score: 1
No, _fetch_data is not a coroutine function. By using a yield statement inside it, you turned it into an asynchronous generator function. Calling it returns an asynchronous generator object, which is not awaitable, and that is why asyncio.gather rejects it. See here for details about the distinction.
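To make the distinction concrete, here is a minimal sketch that asks the standard inspect module how Python classifies each kind of function. The names plain_coroutine, async_generator and describe are made up for this illustration and are not part of the question's code.

```python
import asyncio
import inspect


async def plain_coroutine():
    """An ordinary coroutine function: calling it returns an awaitable coroutine."""
    await asyncio.sleep(0)
    return "done"


async def async_generator():
    """Contains `yield`, so this is an asynchronous generator function."""
    await asyncio.sleep(0)
    yield "item"


def describe(func):
    """Report how Python classifies `func` and whether its result is awaitable."""
    obj = func()  # calling either kind of function does not run its body yet
    info = {
        "is_coroutine_function": inspect.iscoroutinefunction(func),
        "is_async_gen_function": inspect.isasyncgenfunction(func),
        "result_is_awaitable": inspect.isawaitable(obj),
    }
    # Close the coroutine object so Python does not warn that it was never awaited.
    if inspect.iscoroutine(obj):
        obj.close()
    return info
```

Calling describe(plain_coroutine) reports an awaitable result, while describe(async_generator) reports a non-awaitable one, which matches the TypeError raised by asyncio.gather.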
If you actually want it to be a generator and you want multiple async generators to be consumed concurrently, you'll need to modify your code a bit and introduce a coroutine that asynchronously consumes the generator. This is typically done via an async for loop.
Something like this will work:
...

async def run(names):
    coroutines = [fetch(name) for name in names]
    return await asyncio.gather(*coroutines)


async def fetch(name):
    return [batch async for batch in _fetch_data(name)]

...
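The asynchronous list comprehension in fetch is shorthand for an explicit async for loop. As a sketch, the same wrapper can be spelled out as below. The _fetch_data here is a shortened stand-in, and its batches parameter is an assumption added only to keep the example quick to run.

```python
import asyncio


async def _fetch_data(name, batches=3):
    """Stand-in async generator; `batches` is an assumed parameter for brevity."""
    for i in range(1, batches + 1):
        await asyncio.sleep(0)  # placeholder for a real async API call
        yield f"Batch {i} for {name}"


async def fetch(name):
    """Drain the async generator into a list with an explicit `async for` loop."""
    results = []
    async for batch in _fetch_data(name):
        results.append(batch)
    return results


async def run(names):
    # gather() returns results in argument order, so they line up with `names`.
    return await asyncio.gather(*(fetch(name) for name in names))
```

The explicit loop is the more convenient form if each batch needs some processing before it is stored.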
I don't know your actual use case, so I don't know whether using a generator is actually prudent here. Alternatively, you could sidestep the entire issue by making _fetch_data an actual coroutine function that returns a list, rather than an asynchronous generator yielding list items:
import asyncio
import datetime


async def run(names):
    return await asyncio.gather(*(_fetch_data(name) for name in names))


async def _fetch_data(name):
    start_timestamp = datetime.datetime(2021, 9, 1).timestamp()
    end_timestamp = datetime.datetime(2021, 9, 2).timestamp()
    results = []
    i = 1
    while start_timestamp < end_timestamp:
        batch = f"Batch {i} for {name}"
        await asyncio.sleep(2)  # Some async API call, for example
        results.append(batch)
        start_timestamp += 3600
        i += 1
    return results


async def main():
    names = ["Jack", "Jill", "Bob"]
    return await run(names)


output = asyncio.run(main())
print(output)
Both of these work, produce the output you expect, and are (at least with your simplified example) essentially equivalent.
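As a rough check of that equivalence, the sketch below runs both variants side by side and returns their results for comparison. The function names and the count parameter are assumptions made for this illustration, and the sleep is shortened so the example finishes quickly.

```python
import asyncio


async def _batches(name, count=3):
    """Async generator variant; `count` is an assumed stand-in for the time window."""
    for i in range(1, count + 1):
        await asyncio.sleep(0.01)  # placeholder for a real async API call
        yield f"Batch {i} for {name}"


async def fetch_via_generator(name):
    """Variant 1: wrap the async generator in a coroutine that collects it."""
    return [batch async for batch in _batches(name)]


async def fetch_as_list(name, count=3):
    """Variant 2: a plain coroutine that builds and returns the list itself."""
    results = []
    for i in range(1, count + 1):
        await asyncio.sleep(0.01)  # placeholder for a real async API call
        results.append(f"Batch {i} for {name}")
    return results


async def compare(names):
    """Run each variant concurrently over all names and return both results."""
    via_generator = await asyncio.gather(*(fetch_via_generator(n) for n in names))
    as_list = await asyncio.gather(*(fetch_as_list(n) for n in names))
    return via_generator, as_list
```

Running asyncio.run(compare(["Jack", "Jill", "Bob"])) should give two identical lists of lists. The generator variant starts to pay off only when a caller wants to act on each batch as it arrives instead of waiting for the complete list.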