AsyncMock coroutines never actually yield control?

Question
I'm trying to test some asynchronous code I wrote. The structure is:

  • A worker object is responsible for receiving a task and asynchronously executing it, often awaiting on some asynchronous IO call
  • Sitting above each worker object is a loop that reads from a shared queue, passing tasks down to the worker as each task becomes available. Note that this queue is persistent, and in reality I'm using a blocking persistent queue library.
  • At the very top is an asyncio.gather call, waiting on each of the inner loops.

I've constructed a toy example:
import asyncio


async def loop_inner(worker, worker_num, tasks):
    while tasks:
        task = tasks.pop()
        print('Worker number {} handled task number {}'.format(
            worker_num, await worker.do_work(task)))


async def loop(workers, tasks):
    tasks = [loop_inner(worker, worker_num, tasks)
             for worker_num, worker in enumerate(workers)]
    await asyncio.gather(*tasks)

When run on a real workload, the structure works great. High throughput, good use of parallelism, etc.

The problem is when I want to test it. I'd like to write tests that mimic the distribution of tasks across the various workers. However, I want to test just the distribution logic, while mocking out the worker code itself. My natural impulse is to replace the real workers with AsyncMock objects. Problem is, when I run this test case, all the work is being handled by a single worker:

from unittest import IsolatedAsyncioTestCase, main
from unittest.mock import AsyncMock


class TestCase(IsolatedAsyncioTestCase):
    async def test_yielding(self):
        tasks = list(range(10))
        workers = [AsyncMock() for i in range(2)]
        for worker in workers:
            # The side_effect's return value becomes the awaited result.
            worker.do_work.side_effect = lambda task: task

        await loop(workers, tasks)


main()

My output is as follows:

Worker number 0 handled task number 9
Worker number 0 handled task number 8
Worker number 0 handled task number 7
Worker number 0 handled task number 6
Worker number 0 handled task number 5
Worker number 0 handled task number 4
Worker number 0 handled task number 3
Worker number 0 handled task number 2
Worker number 0 handled task number 1
Worker number 0 handled task number 0

What gives? Why is just one worker handling all the work? Is the event loop not passing control off to the other worker? Is the AsyncMock coroutine not really yielding control? What can I do to more realistically test this?

Answer 1

Score: 1
> Why is just one worker handling all the work?

The scheduler will never interrupt upon quantum expiration, as we are using cooperative threads here. So cooperate.

Popping tasks off a plain list(range(10)) never yields control, and neither does awaiting an AsyncMock whose side_effect is a synchronous lambda.
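A minimal check of that claim (the names `demo` and `flag` are mine, not from the question): schedule a callback with `call_soon`; it can only run if control ever returns to the event loop.

```python
import asyncio
from unittest.mock import AsyncMock

flag = []  # the call_soon callback appends here once the loop regains control


async def demo():
    mock = AsyncMock()
    mock.do_work.side_effect = lambda task: task  # synchronous side_effect

    # This callback runs only when control returns to the event loop.
    asyncio.get_running_loop().call_soon(flag.append, 'loop ran')

    await mock.do_work(1)   # completes without ever suspending...
    print(flag)             # ...so the callback has not run yet: []

    await asyncio.sleep(0)  # a real suspension point
    print(flag)             # now the loop got control: ['loop ran']


asyncio.run(demo())
```

Awaiting the mock runs its coroutine straight through to completion, so nothing else on the event loop gets a turn until the explicit sleep.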

Voluntarily yielding control, by giving the mock an async side_effect that awaits a brief sleep, gives the scheduler an opportunity to keep more than one plate spinning simultaneously.
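Applying that advice to the question's toy example, one possible sketch (the helper names `make_side_effect`, `handled_by`, and `run` are mine): make each mock's `side_effect` an async function that awaits `asyncio.sleep(0)`, so every `do_work` call hits a real suspension point and the event loop can switch between workers.

```python
import asyncio
from unittest.mock import AsyncMock


async def loop_inner(worker, worker_num, tasks):
    # Same inner loop as in the question.
    while tasks:
        task = tasks.pop()
        print('Worker number {} handled task number {}'.format(
            worker_num, await worker.do_work(task)))


handled_by = []  # records which worker number served each task


def make_side_effect(worker_num):
    # An async side_effect is awaited by AsyncMock, so the sleep(0)
    # inside it is a genuine suspension point.
    async def do_work(task):
        handled_by.append(worker_num)
        await asyncio.sleep(0)  # voluntarily yield to the event loop
        return task
    return do_work


async def run():
    tasks = list(range(10))
    workers = [AsyncMock() for _ in range(2)]
    for num, worker in enumerate(workers):
        worker.do_work.side_effect = make_side_effect(num)
    await asyncio.gather(*(loop_inner(worker, num, tasks)
                           for num, worker in enumerate(workers)))


asyncio.run(run())
```

With the suspension point in place, both worker numbers show up in the output instead of worker 0 draining the whole list. A tiny nonzero sleep would mimic real IO latency even more closely.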

huangapple
  • Posted on 2023-03-10 00:54:42
  • Please keep this link when reposting: https://go.coder-hub.com/75687698.html