Python asyncio没有显示任何错误。

huangapple go评论82阅读模式
英文:

Python asyncio does not show any errors

问题

我试图使用asyncio从成千上万个URL中获取一些数据。
以下是设计的简要概述:

  1. 通过单个生产者一次性填充一个队列(Queue)以获取一堆URL。
  2. 启动一堆消费者(Consumers)。
  3. 每个消费者从队列中异步提取URL并发送GET请求。
  4. 对结果进行一些后处理。
  5. 合并所有处理后的结果并返回。

问题: asyncio几乎不会显示任何错误,它只会静默挂起,没有错误。我在各处放置了print语句以自己检测问题,但没有什么帮助。

根据输入URL的数量和消费者数量或限制,可能会出现以下错误:

  1. Task was destroyed but it is pending!
  2. task exception was never retrieved future: <Task finished coro=<consumer()
  3. aiohttp.client_exceptions.ServerDisconnectedError
  4. aiohttp.client_exceptions.ClientOSError: [WinError 10053] An established connection was aborted by the software in your host machine

问题: 如何检测和处理asyncio中的异常?如何在不中断队列的情况下重试?

以下是我根据各种异步代码示例编译的代码。当前,在def get_video_title函数的末尾有一个有意的错误。运行时,什么也不会显示。

import asyncio
import aiohttp
import json
import re
import nest_asyncio
nest_asyncio.apply() # jupyter notebook throws errors without this

user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36"

def get_video_title(data):
    match = re.search(r'window\[["\']ytInitialPlayerResponse["\']\]\s*=\s*(.*)', data)
    string = match[1].strip()[:-1]
    result = json.loads(string)
    return result['videoDetails']['TEST_ERROR'] # <---- should be 'title'

async def fetch(session, url, c):
    async with session.get(url, headers={"user-agent": user_agent}, raise_for_status=True, timeout=60) as r:
        print('---------Fetching', c)
        if r.status != 200:
            r.raise_for_status()
        return await r.text()

async def consumer(queue, session, responses):
    while True:
        try:
            i, url = await queue.get()
            print("Fetching from a queue", i)
            html_page = await fetch(session, url, i)
            
            print('+++Processing', i)
            result = get_video_title(html_page) # should raise an error here!
            responses.append(result)
            queue.task_done()
            
            print('+++Task Done', i)

        except (aiohttp.http_exceptions.HttpProcessingError, asyncio.TimeoutError) as e:
            print('>>>>>>>>>>>>>Error', i, type(e))
            await asyncio.sleep(1)
            queue.task_done()

async def produce(queue, urls):
    for i, url in enumerate(urls):
        print('Putting in a queue', i)
        await queue.put((i, url))

async def run(session, urls, consumer_num):
    queue, responses = asyncio.Queue(maxsize=2000), []
    
    print('[Making Consumers]')
    consumers = [asyncio.ensure_future(
        consumer(queue, session, responses)) 
                 for _ in range(consumer_num)]
    
    print('[Making Producer]')
    producer = await produce(queue=queue, urls=urls)
    
    print('[Joining queue]')
    await queue.join()
    
    print('[Cancelling]')
    for consumer_future in consumers:
        consumer_future.cancel()
    
    print('[Returning results]')
    return responses

async def main(loop, urls):
    print('Starting a Session')
    async with aiohttp.ClientSession(loop=loop, connector=aiohttp.TCPConnector(limit=300)) as session:
        print('Calling main function')
        posts = await run(session, urls, 100)
        print('Done')
        return posts

if __name__ == '__main__':
    urls = ['https://www.youtube.com/watch?v=dNQs_Bef_V8'] * 100
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main(loop, urls))
英文:

I'm trying to get some data from thousands of urls by using asyncio.
Here is a brief overview of the design:

  1. Fill up a Queue in one go with a bunch of urls using a single Producer
  2. Spawn a bunch of Consumers
  3. Each Consumer keeps asynchronously extracting urls from the Queue and sending GET requests
  4. Do some postprocessing on the result
  5. Combine all processed results and return

Problems: asyncio almost never shows if anything is wrong, it just silently hangs with no errors. I put print statements everywhere to detect problems myself, but it didn't help much.

Depending on the number of input urls and number of consumers or limits i might get these errors:

  1. Task was destroyed but it is pending!
  2. task exception was never retrieved future: &lt;Task finished coro=&lt;consumer()
  3. aiohttp.client_exceptions.ServerDisconnectedError
  4. aiohttp.client_exceptions.ClientOSError: [WinError 10053] An established connection was aborted by the software in your host machine

Questions: how to detect and handle exceptions in asyncio? how to retry without disrupting the Queue ?

Bellow is my code that i compiled looking at various examples of async code. Currently, there's in an intentional error at the end of a def get_video_title function. When run, nothing shows up.

import asyncio
import aiohttp
import json
import re
import nest_asyncio
nest_asyncio.apply() # jupyter notebook throws errors without this


user_agent = &quot;Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36&quot;

def get_video_title(data):
    match = re.search(r&#39;window\[[&quot;\&#39;]ytInitialPlayerResponse[&quot;\&#39;]\]\s*=\s*(.*)&#39;, data)
    string = match[1].strip()[:-1]
    result = json.loads(string)
    return result[&#39;videoDetails&#39;][&#39;TEST_ERROR&#39;] # &lt;---- should be &#39;title&#39;

async def fetch(session, url, c):
    async with session.get(url, headers={&quot;user-agent&quot;: user_agent}, raise_for_status=True, timeout=60) as r:
        print(&#39;---------Fetching&#39;, c)
        if r.status != 200:
            r.raise_for_status()
        return await r.text()

async def consumer(queue, session, responses):
    while True:
        try:
            i, url = await queue.get()
            print(&quot;Fetching from a queue&quot;, i)
            html_page = await fetch(session, url, i)
            
            print(&#39;+++Processing&#39;, i)
            result = get_video_title(html_page) # should raise an error here!
            responses.append(result)
            queue.task_done()
            
            print(&#39;+++Task Done&#39;, i)

        except (aiohttp.http_exceptions.HttpProcessingError, asyncio.TimeoutError) as e:
            print(&#39;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;&gt;Error&#39;, i, type(e))
            await asyncio.sleep(1)
            queue.task_done()

async def produce(queue, urls):
    for i, url in enumerate(urls):
        print(&#39;Putting in a queue&#39;, i)
        await queue.put((i, url))

async def run(session, urls, consumer_num):
    queue, responses = asyncio.Queue(maxsize=2000), []
    
    print(&#39;[Making Consumers]&#39;)
    consumers = [asyncio.ensure_future(
        consumer(queue, session, responses)) 
                 for _ in range(consumer_num)]
    
    print(&#39;[Making Producer]&#39;)
    producer = await produce(queue=queue, urls=urls)
    
    print(&#39;[Joining queue]&#39;)
    await queue.join()
    
    print(&#39;[Cancelling]&#39;)
    for consumer_future in consumers:
        consumer_future.cancel()
    
    print(&#39;[Returning results]&#39;)
    return responses

async def main(loop, urls):
    print(&#39;Starting a Session&#39;)
    async with aiohttp.ClientSession(loop=loop, connector=aiohttp.TCPConnector(limit=300)) as session:
        print(&#39;Calling main function&#39;)
        posts = await run(session, urls, 100)
        print(&#39;Done&#39;)
        return posts


if __name__ == &#39;__main__&#39;:
    urls = [&#39;https://www.youtube.com/watch?v=dNQs_Bef_V8&#39;] * 100
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main(loop, urls))

答案1

得分: 11

问题在于你的consumer只捕获了两个非常特定的异常,并在这些情况下将任务标记为已完成。如果发生任何其他异常,比如与网络相关的异常,它会终止消费者。但是,这不会被run检测到,因为run正在等待与消费者(实际上)在后台运行的queue.join()。这就是为什么你的程序会挂起 - 排队的项目从未被计算,队列也从未完全处理。

有两种方法可以解决这个问题,具体取决于当程序遇到未预期的异常时你想要它执行什么操作。如果你希望它继续运行,你可以向消费者添加一个捕获所有异常的except子句,例如:

        except Exception as e
            print('other error', e)
            queue.task_done()

另一种方法是使一个_未处理的_消费者异常传播到run。这必须明确安排,但具有永远不允许异常静默传递的优点(请参阅此文章以获取详细信息)。实现它的一种方法是等待queue.join()和消费者同时完成;由于消费者处于无限循环中,它们只有在发生异常的情况下才会完成。

    print('[Joining queue]')
    # 等待要么`queue.join()`完成要么一个消费者引发异常
    done, _ = await asyncio.wait([queue.join(), *consumers],
                                 return_when=asyncio.FIRST_COMPLETED)
    consumers_raised = set(done) & set(consumers)
    if consumers_raised:
        await consumers_raised.pop()  # 传播异常

问题:如何在 asyncio 中检测和处理异常?

异常通过await传播,通常像在任何其他代码中一样检测和处理。特殊处理仅在需要捕获从“后台”任务(如consumer)泄漏的异常时才需要。

如何在不中断队列的情况下重试?

你可以在except块中调用await queue.put((i, url))。该项将被添加到队列的末尾,由消费者拾取。在这种情况下,你只需要第一个片段,并且不需要尝试将异常从consumer传播到run

英文:

The problem is that your consumer catches only two very specific exceptions, and in their case marks the task as done. If any other exception happens, such as a network-related exception, it will terminate the consumer. However, this is not detected by run, which is awaiting queue.join() with the consumer (effectively) running in the background. This is why your program hangs - queued items are never accounted for, and the queue is never fully processed.

There are two ways to fix this, depending on what you want your program to do when it encounters an unanticipated exception. If you want it to keep running, you can add a catch-all except clause to the consumer, e.g.:

        except Exception as e
            print(&#39;other error&#39;, e)
            queue.task_done()

The alternative is for an unhandled consumer exception to propagate to run. This must be arranged explicitly, but has the advantage of never allowing exceptions to pass silently. (See this article for a detailed treatment of the subject.) One way to achieve it is to wait for queue.join() and the consumers at the same time; since consumers are in an infinite loop, they will complete only in case of an exception.

    print(&#39;[Joining queue]&#39;)
    # wait for either `queue.join()` to complete or a consumer to raise
    done, _ = await asyncio.wait([queue.join(), *consumers],
                                 return_when=asyncio.FIRST_COMPLETED)
    consumers_raised = set(done) &amp; set(consumers)
    if consumers_raised:
        await consumers_raised.pop()  # propagate the exception

> Questions: how to detect and handle exceptions in asyncio?

Exceptions are propagated through await and normally detected and handled like in any other code. The special handling is only needed to catch exceptions that leak from a "background" task like the consumer.

> how to retry without disrupting the Queue ?

You can call await queue.put((i, url)) in the except block. The item will be added to the back of the queue, to be picked up by a consumer. In that case you only need the first snippet, and don't want to bother with trying to propagate the exception in consumer to run.

huangapple
  • 本文由 发表于 2020年1月7日 02:18:23
  • 转载请务必保留本文链接:https://go.coder-hub.com/59617001.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定