Python mixing asyncio and threading

Question

From an architectural perspective, are there any concerns mixing asyncio and threading code?

Example:

import asyncio
import json
from threading import Thread

class ThreadTask:
    def __init__(self, universe_reload_id) -> None:
        self.keep_running = True  # flag polled by the worker thread
        self.universe_reload_id = universe_reload_id


class SomeAlgo:

    def process_universe(self, universe, task):
        # runs in a worker thread until the flag is cleared
        while task.keep_running:
            ...

    async def start(self):
        task = None
        universe_reload_id = 1
        for message in self.app_context.kafka_consumer:
            if task:
                task.keep_running = False  # ask the previous worker to stop

            pairs = json.loads(message.value.decode('utf-8'))

            task = ThreadTask(universe_reload_id=universe_reload_id)
            t = Thread(target=self.process_universe, args=(pairs, task))
            t.start()  # here we'd call process_universe in non-blocking manner

async def _run_jobs():
    provider = SomeAlgo(param)
    await provider.start()

asyncio.run(_run_jobs())

I notice Python has an asyncio version of a semaphore (asyncio.Semaphore) as well as the plain threading version (threading.Semaphore).

So in the above code, which semaphore should/can I use? The asyncio version or the plain version?

Answer 1

Score: 0

On the first question, one perspective:

Combining asyncio and threading can lead to complex code because they have different concurrency models. However, it's not inherently problematic if done carefully. Here are a few concerns you might want to keep in mind:

  1. Thread safety: If both your async and threaded code access shared data structures, you need to make that access thread-safe, usually with locks or other synchronization primitives that prevent race conditions. In your case, task.keep_running is a shared variable that is written from the asyncio context and read from a worker thread. In CPython, reading or assigning a single bool attribute will not corrupt it, so the plain flag works in practice, but threading.Event expresses the same idea through an explicitly thread-safe API and is the more idiomatic choice.

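  2. Error handling: Error handling can get tricky when mixing async and threaded code. If an exception is raised inside a thread started with threading.Thread, it does not propagate to your async code; by default it is only reported through threading.excepthook and the thread ends. Likewise, an exception in a coroutine is not visible to a worker thread. You need to make sure that errors are caught and handled in both contexts.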

  3. Performance: Python's GIL (Global Interpreter Lock) can cause threads to run slower than expected because only one thread can execute Python bytecode at a time. However, this might not be a problem if your threaded code is I/O bound or if it's written in C and releases the GIL.

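  4. Stopping threads: In your current setup, you stop a thread by setting task.keep_running to False. There is no guarantee about when the thread will actually stop, because that depends on when it next checks the variable. Python's threading module has no API for interrupting or killing a thread from outside (there is no thread.interrupt() method), so stopping is always cooperative. If you need a faster reaction, check the flag more often, or use a threading.Event and wait on it with a timeout instead of sleeping. If you need a hard stop, run the work in a separate process, which can be terminated.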

  5. Resource Usage: Each thread uses a certain amount of resources, and having a large number of threads can lead to high memory usage. Also, context-switching between threads can consume CPU. Depending on the number of threads you're planning to use and the resources available, this may or may not be a concern.
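The cooperative stop flag discussed in points 1 and 4 can be sketched with threading.Event. This is a minimal illustration, assuming the worker can check the flag between units of work; the names worker, stop_event, processed, and run_demo are hypothetical and do not come from the question.

```python
import threading
import time


def worker(stop_event: threading.Event, processed: list) -> None:
    """Loop until stop_event is set, doing one unit of work per iteration."""
    while not stop_event.is_set():
        processed.append(1)  # stand-in for one unit of real work
        # wait() returns as soon as the event is set, so a stop request
        # is noticed without waiting out the full timeout.
        stop_event.wait(timeout=0.01)


def run_demo() -> tuple:
    stop_event = threading.Event()
    processed = []
    t = threading.Thread(target=worker, args=(stop_event, processed))
    t.start()
    time.sleep(0.05)     # let the worker run for a moment
    stop_event.set()     # request a cooperative stop
    t.join(timeout=1.0)  # wait for the worker to exit
    return t.is_alive(), len(processed)


if __name__ == "__main__":
    alive, count = run_demo()
    print(f"worker alive: {alive}, iterations: {count}")
```

Calling join() after set() is what turns "the thread will stop eventually" into "the thread has stopped", which matters if a replacement worker must not overlap with the old one.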

Overall, mixing asyncio and threading can be useful for running blocking code in a non-blocking manner, but it adds complexity to your program. Whenever possible, it's simpler to stick to one model of concurrency. If your threaded code is I/O bound, consider converting it to async code. If it's CPU-bound, consider using processes instead of threads to bypass the GIL.
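Running blocking code in a non-blocking manner, as mentioned above, can also be done through asyncio's own thread helpers rather than a hand-managed Thread. The sketch below assumes Python 3.9 or later, where asyncio.to_thread is available; blocking_job is a hypothetical stand-in for any synchronous call. It also shows that an exception raised in the worker thread is re-raised at the await, which sidesteps the propagation problem from point 2.

```python
import asyncio
import time


def blocking_job(n: int) -> int:
    """Hypothetical stand-in for blocking work, e.g. a synchronous client call."""
    if n < 0:
        raise ValueError("n must be non-negative")
    time.sleep(0.01)
    return n * 2


async def main() -> tuple:
    # to_thread runs the function in the default thread pool and returns
    # an awaitable, so the event loop stays free while the job runs.
    result = await asyncio.to_thread(blocking_job, 21)

    # An exception raised in the worker thread surfaces at the await.
    try:
        await asyncio.to_thread(blocking_job, -1)
        error = None
    except ValueError as exc:
        error = str(exc)
    return result, error


if __name__ == "__main__":
    print(asyncio.run(main()))
```

On older Python versions, loop.run_in_executor(None, blocking_job, 21) plays the same role.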

On the second question:

The Semaphore you should use depends on the context in which it's being used.

In the context of your provided code, where you're interacting with both asyncio and Thread, the choice of Semaphore would depend on where and what you're trying to limit.

  • If you're trying to limit the concurrency of asynchronous tasks within your asyncio event loop, you should use asyncio.Semaphore.

  • If you're trying to limit the concurrency of threads or synchronize across threads, you should use threading.Semaphore.

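Remember, asyncio and threading are two different paradigms, and their synchronization primitives (such as Semaphore) are not interchangeable. The asyncio primitives are not thread-safe, so they should not be used to synchronize OS threads.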

asyncio.Semaphore is designed to work with asyncio tasks and cooperates with the event loop. When no permit is available, awaiting acquire() suspends only the calling task and yields control back to the event loop, allowing other tasks to run.
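A minimal sketch of that behavior, assuming six tasks that should run at most two at a time; limited_job and the state dictionary are hypothetical names used only to record the peak concurrency.

```python
import asyncio


async def limited_job(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:  # suspends this task, not the loop, when no slot is free
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # stand-in for awaited I/O
        state["running"] -= 1


async def main() -> int:
    sem = asyncio.Semaphore(2)  # at most two jobs inside the block at once
    state = {"running": 0, "peak": 0}
    await asyncio.gather(*(limited_job(sem, state) for _ in range(6)))
    return state["peak"]


if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(main()))
```

No lock is needed around the counters here because all tasks run on one thread and only switch at await points.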

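On the other hand, threading.Semaphore is a traditional blocking semaphore. When a thread waits for a permit, that thread is suspended until one is released; other threads keep running, because the wait releases the GIL. The catch is that if you call acquire() from the thread that runs the event loop, the whole loop stalls, and with it every asyncio task.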
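The thread-side counterpart can be sketched as follows, again with six workers and two permits. The names limited_worker and run_demo are hypothetical; the extra Lock is there only because the illustrative counters are shared between threads.

```python
import threading
import time


def limited_worker(sem: threading.Semaphore, lock: threading.Lock, state: dict) -> None:
    with sem:  # blocks only this thread until a permit is free
        with lock:  # protect the shared counters
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        time.sleep(0.01)  # stand-in for blocking work
        with lock:
            state["running"] -= 1


def run_demo() -> int:
    sem = threading.Semaphore(2)  # at most two threads inside the block
    lock = threading.Lock()
    state = {"running": 0, "peak": 0}
    threads = [
        threading.Thread(target=limited_worker, args=(sem, lock, state))
        for _ in range(6)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]


if __name__ == "__main__":
    print("peak concurrency:", run_demo())
```

The peak never exceeds the number of permits; how close it gets to that limit depends on thread scheduling.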

In your case, if the goal is to guard the shared task.keep_running variable, a threading primitive is the right family, because the variable is read from a worker thread. A semaphore is overkill for a single boolean, though. A threading.Event (a thread-safe flag) or, if you need to guard more state, a threading.Lock or threading.Condition is simpler.

Cheers!

huangapple
  • Posted on July 10, 2023 at 13:13:31
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76650810.html