Python Asyncio - making a second call based on the first call's response

Question

I am trying to develop a system in which I can send a large number of requests to an API and capture which ones were successfully sent. I want to use asyncio to send these requests quickly and follow up with a second call to a different API if the first one was successful. How can I do this without having to wait for all the first API calls to be made before making the second ones? I.e. as soon as the first call has a response, call the second API?

import asyncio
import aiohttp


def get_tasks(session, url, payload):
    tasks = []
    for p in payload:
        response = session.get(url, params=p)
        tasks.append(response)

        # What I need: if response.status == 200: do something
    return tasks


async def main():
    url = "foo"
    payload = [{'a': 1}, {'b': 2}]

    async with aiohttp.ClientSession() as session:
        tasks = get_tasks(session, url, payload)
        responses = await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

Answer 1

Score: 1

The easier thing to do is to pack the related calls that must happen in sequence into a small async function, and then create tasks out of this function, instead of creating a separate task for each individual call.

async def atomic_task(session, url, payload):
    response = await session.get(url, params=payload)

    if response.status == 200:
        # do something
        ...  # use `await` at will


def get_tasks(session, url, payload):
    tasks = []
    for p in payload:
        tasks.append(atomic_task(session, url, p))

    return tasks
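
Filling in the "do something" with the follow-up request from the question, a minimal sketch of a complete program could look like the following. The second endpoint url2 is a hypothetical placeholder for the other API; the first url and the payloads are the placeholders from the question.

import asyncio
import aiohttp


async def atomic_task(session, url, url2, payload):
    # First call; if it succeeds, fire the follow-up call immediately,
    # without waiting for any of the other first calls to finish.
    response = await session.get(url, params=payload)
    if response.status == 200:
        # Second, dependent call to the other API (hypothetical url2).
        await session.get(url2, params=payload)


async def main():
    url = "foo"   # first API (placeholder from the question)
    url2 = "bar"  # second API (hypothetical placeholder)
    payloads = [{'a': 1}, {'b': 2}]

    async with aiohttp.ClientSession() as session:
        tasks = [atomic_task(session, url, url2, p) for p in payloads]
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

Because each atomic_task awaits its own first response, the follow-up request for one payload starts as soon as that response arrives, independently of the other payloads.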

Another way is to add the second call as a "done callback" of each task:

extra_tasks = set()


def task_done(task):
    response = task.result()
    if response.status == 200:
        # Do stuff. But this is a sync function: can't use await.
        # If there are async things to be done, you can create a task here,
        # and add it to a registry to be awaited somewhere else:
        new_task = asyncio.create_task(...)
        extra_tasks.add(new_task)


def get_tasks(session, url, payload):
    tasks = []
    for p in payload:
        # Wrap the awaitable in a real Task so it can take a done callback.
        task = asyncio.ensure_future(session.get(url, params=p))
        task.add_done_callback(task_done)
        tasks.append(task)
    return tasks


async def main():
    url = "foo"
    payload = [{'a': 1}, {'b': 2}]

    async with aiohttp.ClientSession() as session:
        tasks = get_tasks(session, url, payload)
        responses = await asyncio.gather(*tasks)
        await asyncio.gather(*extra_tasks)

Note that the first option, inlining the sequential operations in a coroutine, is the recommended way, and also the simpler one - the callback code is somewhat of a "hack". But it works: the fact that we await the "extra_tasks" right after the initial tasks does not mean they execute only after all of those finish. They start running as soon as they are created in the callback - the extra gather on the last line merely ensures the program does not terminate before all of the new tasks are also done.
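
For illustration, the ... placeholder inside task_done could be a coroutine that performs the follow-up request. A minimal sketch, with a hypothetical second_call coroutine and a hypothetical second endpoint url2; a closure is used because the synchronous callback needs access to the session and to the payload of the particular first call it belongs to:

import asyncio

extra_tasks = set()


async def second_call(session, url2, payload):
    # Hypothetical follow-up request to the second API.
    await session.get(url2, params=payload)


def make_task_done(session, url2, payload):
    # Returns a done callback bound to this payload's session and data.
    def task_done(task):
        response = task.result()
        if response.status == 200:
            # Schedule the follow-up as soon as this first call completes.
            new_task = asyncio.create_task(second_call(session, url2, payload))
            extra_tasks.add(new_task)  # awaited later via gather(*extra_tasks)
    return task_done

Each task would then register its own callback inside get_tasks, e.g. task.add_done_callback(make_task_done(session, url2, p)), so that every follow-up request carries the right payload.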
