AsyncMock coroutines never actually yield control?
I'm trying to test some asynchronous code I wrote. The structure is:

- A worker object is responsible for receiving a task and asynchronously executing it, often `await`ing on some asynchronous IO call.
- Sitting above each worker object is a loop that reads from a shared queue, passing tasks down to the worker as each task becomes available. Note that this queue is persistent, and in reality I'm using a blocking persistent queue library.
- At the very top is an `asyncio.gather` call, waiting on each of the inner loops.
I've constructed a toy example:

```python
import asyncio

async def loop_inner(worker, worker_num, tasks):
    while tasks:
        task = tasks.pop()
        print('Worker number {} handled task number {}'.format(
            worker_num, await worker.do_work(task)))

async def loop(workers, tasks):
    tasks = [loop_inner(worker, worker_num, tasks) for
             worker_num, worker in enumerate(workers)]
    await asyncio.gather(*tasks)
```
When run on a real workload, the structure works great. High throughput, good use of parallelism, etc.
The problem is when I want to test it. I'd like to write tests that mimic the distribution of tasks across the various workers. However, I want to test just the distribution logic, while mocking out the worker code itself. My natural impulse is to replace the real workers with `AsyncMock` objects. Problem is, when I run this test case, all the work is being handled by a single worker:
```python
from unittest import IsolatedAsyncioTestCase, main
from unittest.mock import ANY, AsyncMock, patch

class TestCase(IsolatedAsyncioTestCase):
    async def test_yielding(self):
        tasks = list(range(10))
        workers = [AsyncMock() for i in range(2)]
        for worker in workers:
            worker.do_work.side_effect = lambda task: task
        await loop(workers, tasks)

main()
```
My output is as follows:

```
Worker number 0 handled task number 9
Worker number 0 handled task number 8
Worker number 0 handled task number 7
Worker number 0 handled task number 6
Worker number 0 handled task number 5
Worker number 0 handled task number 4
Worker number 0 handled task number 3
Worker number 0 handled task number 2
Worker number 0 handled task number 1
Worker number 0 handled task number 0
```
What gives? Why is just one worker handling all the work? Is the event loop not passing control off to the other worker? Is the `AsyncMock` coroutine not really yielding control? What can I do to more realistically test this?
Answer 1 (score: 1)
> Why is just one worker handling all the work?

The scheduler will never interrupt upon quantum expiration, as we are using cooperative threads here. So *cooperate*.

Computing `list(range(10))` never yields control. Voluntarily yielding control, either with a brief `sleep` in the `lambda` or by structuring it as an async function, gives the scheduler an opportunity to keep more than one plate spinning simultaneously.
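As a sketch of the second suggestion: `AsyncMock` awaits an async `side_effect`, so replacing the `lambda` with an async function that awaits `asyncio.sleep(0)` inserts a voluntary yield point. The names `fake_do_work` and the self-contained re-declaration of `loop`/`loop_inner` below are illustrative, mirroring the question's toy example:

```python
import asyncio
from unittest.mock import AsyncMock

async def loop_inner(worker, worker_num, tasks):
    while tasks:
        task = tasks.pop()
        print('Worker number {} handled task number {}'.format(
            worker_num, await worker.do_work(task)))

async def loop(workers, tasks):
    coros = [loop_inner(worker, worker_num, tasks)
             for worker_num, worker in enumerate(workers)]
    await asyncio.gather(*coros)

# An async side_effect: AsyncMock awaits it, so the sleep(0) inside
# hands control back to the event loop before the result is returned.
async def fake_do_work(task):
    await asyncio.sleep(0)  # voluntary yield point
    return task

workers = [AsyncMock() for _ in range(2)]
for worker in workers:
    worker.do_work.side_effect = fake_do_work

asyncio.run(loop(workers, list(range(10))))
```

With the yield point in place, both worker numbers appear in the output instead of worker 0 draining the whole list alone.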