Asyncio event loop within a thread issue.

huangapple go评论42阅读模式
英文:

Asyncio event loop within a thread issue

问题

尝试在线程内创建一个事件循环,其中该线程在类的构造函数中被初始化。我想在事件循环中运行多个任务。但是,每当我尝试在线程中运行并出现错误时,会出现错误“NoneType对象没有create_task属性”。是否在调用它时出现了问题。

import asyncio
import threading

class Test():
  def __init__(self):
    self.loop = None
    self.th = threading.Thread(target=self.create)
    self.th.start()

  def __del__(self):
    self.loop.close()

  def create(self):
    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)

  def fun(self):
    task = self.loop.create_task(coroutine)
    self.loop.run_until_complete(task)

  def fun2(self):
    task = self.loop.create_task(coroutine)
    self.loop.run_until_complete(task)

t = Test()
t.fun()
t.fun2()
英文:

Trying to create a event loop inside a thread, where the thread is initiated within the constructor of a class. I want to run multiple tasks within the event loop. However, having an issue whenever I try to run with the thread and get the error "NoneType object has no attribute create_task"
Is there something I am doing wrong in calling it.

import asyncio
import threading 

Class Test():
  def __init__(self):
    self.loop = None
    self.th = threading.Thread(target=self.create)
    self.th.start()

  def __del__(self):
    self.loop.close()

  def self.create(self):
    self.loop = new_event_loop()
    asyncio.set_event_loop(self.loop)

  def fun(self):
    task = self.loop.create_task(coroutine)
    loop.run_until_complete(task)

  def fun2(self):
    task = self.loop.create_task(coroutine)
    loop.run_until_complete(task)

t = Test()
t.fun()
t.fun2()

答案1

得分: 1

很棘手将线程和asyncio结合起来,尽管如果正确地完成可以很有用。

你提供的代码存在几个语法错误,很明显这并不是你实际运行的代码。请在将来为了尊重回答问题的人的时间,仔细检查你的帖子。如果你自己能发现这些可避免的错误,你将会得到更好、更快的答案。

  • 关键词"class"不应大写。
  • 类定义不需要空括号。
  • 函数定义中create前不需要有self.
  • 脚本中没有定义名为coroutine的变量。

接下来的问题是启动次要线程。方法threading.Thread.start()不会等待线程实际启动。新线程是“待定”的,会在不久的将来开始,但你无法控制启动时间。因此,start()会立即返回;你的__init__方法返回;而在线程开始之前调用了self.loop,此时self.loop实际上是None,正如错误信息所指示的那样。

一个很好的解决方法是使用threading.Barrier对象,可确保线程在__init__方法返回之前已经启动。

你的__del__方法可能是不必要的,并且通常只会在程序关闭时执行。如果在任何其他情况下运行,如果你在一个仍在运行的循环上调用loop.close,你将会得到一个错误。我认为最好是确保线程干净地关闭,所以我提供了一个用于此目的的Test.close方法。

你的函数funfun2的写法使它们变得不太有用。你启动一个任务,然后立即等待它完成。在这种情况下,根本没有理由使用asyncio。asyncio的整个理念是同时运行多个任务。逐个创建任务并始终等待每个任务完成没有太多意义。

大多数asyncio函数不是线程安全的。如果你想要跨线程运行asyncio代码,你必须使用两个重要的方法loop.call_soon_threadsafeasyncio.run_coroutine_threadsafe。方法funfun2在主线程中执行,因此你应该使用run_coroutine_threadsafe在次要线程中启动任务。

最后,在这样的程序中,通常最好提供一个线程关闭方法。在以下清单中,close获取所有正在运行的任务列表,对每个发送取消消息,然后发送停止命令到循环本身。然后它等待线程真正退出。主线程将被阻塞,直到次要线程完成,因此程序将干净地关闭。

以下是一个简单的可工作程序,具有你似乎想要的所有功能:

import asyncio
import threading

async def coro(s):
    print(s)
    await asyncio.sleep(3.0)

class Test:
    def __init__(self):
        self.loop = None
        self.barrier = threading.Barrier(2)  # 添加
        self.th = threading.Thread(target=self.create)
        self.th.start()
        self.barrier.wait()   # 阻塞,直到新线程运行

    def create(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.barrier.wait()
        print("线程已启动")
        self.loop.run_forever()
        print("循环已停止")
        self.loop.close()  # 清理循环资源

    def close(self):  # 从主线程调用此方法
        self.loop.call_soon_threadsafe(self._close)
        self.th.join()  # 等待线程退出(确保循环已关闭)

    def _close(self):  # 在self.th线程中执行
        tasks = asyncio.all_tasks(self.loop)
        for task in tasks:
            task.cancel()
        self.loop.call_soon(self.loop.stop)

    def fun(self): 
        return asyncio.run_coroutine_threadsafe(coro("你好 1"), self.loop)

    def fun2(self):
        return asyncio.run_coroutine_threadsafe(coro("你好 2"), self.loop)

t = Test()
print("Test构造完成")
t.fun()
fut = t.fun2()
# 如果不想在此等待,请注释下一行
# fut.result()  # 等待fun2完成
print("正在关闭")
t.close()
print("完成")
英文:

It is tricky to combine threading and asyncio, although it can be useful if done properly.

The code you gave has several syntax errors, so obviously it isn't the code you are actually running. Please, in the future, check your post carefully out of respect for the time of those who answer questions here. You'll get better and quicker answers if you spot these avoidable errors yourself.

  • The keyword "class" should not be capitalized.
  • The class definition does not need empty parenthesis.
  • The function definition for create should not have self. in front of it.
  • There is no variable named coroutine defined in the script.

The next problem is the launching of the secondary thread. The method threading.Thread.start() does not wait for the thread to actually start. The new thread is "pending" and will start sometime soon, but you don't have control over when that happens. So start() returns immediately; your __init__ method returns; and your call to t.fun() happens before the thread starts. At that point self.loop is in fact None, as the error message indicates.

An nice way to overcome this is with a threading.Barrier object, which can be used to insure that the thread has started before the __init__ method returns.

Your __del__ method is probably not necessary, and will normally only get executed during program shut down. If it runs under any other circumstances, you will get an error if you call loop.close on a loop that is still running. I think it's better to insure that the thread shuts down cleanly, so I've provided a Test.close method for that purpose.

Your functions fun and fun2 are written in a way that makes them not very useful. You start a task and then you immediately wait for it to finish. In that case, there's no good reason to use asyncio at all. The whole idea of asyncio is to run more than one task concurrently. Creating tasks one at a time and always waiting for each one to finish doesn't make a lot of sense.

Most asyncio functions are not threadsafe. You have to use the two important methods loop.call_soon_threadsafe and asyncio.run_coroutine_threadsafe if you want to run asyncio code across threads. The methods fun and fun2 execute in the main thread, so you should use run_coroutine_threadsafe to launch tasks in the secondary thread.

Finally, with such programs it's usually a good idea to provide a thread shutdown method. In the following listing, close obtains a list of all the running tasks, sends a cancel message to each, and then sends the stop command to the loop itself. Then it waits for the thread to really exit. The main thread will be blocked until the secondary thread is finished, so the program will shut down cleanly.

Here is a simple working program, with all the functionality that you seem to want:

import asyncio
import threading

async def coro(s):
    print(s)
    await asyncio.sleep(3.0)

class Test:
    def __init__(self):
        self.loop = None
        self.barrier = threading.Barrier(2)  # Added
        self.th = threading.Thread(target=self.create)
        self.th.start()
        self.barrier.wait()   # Blocks until the new thread is running

    def create(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.barrier.wait()
        print("Thread started")
        self.loop.run_forever() 
        print("Loop stopped")
        self.loop.close()  # Clean up loop resources
    
    def close(self):  # call this from main thread
        self.loop.call_soon_threadsafe(self._close)
        self.th.join()  # Wait for the thread to exit (insures loop is closed)
        
    def _close(self):  # Executes in thread self.th
        tasks = asyncio.all_tasks(self.loop)
        for task in tasks:
            task.cancel()
        self.loop.call_soon(self.loop.stop)

    def fun(self): 
        return asyncio.run_coroutine_threadsafe(coro("Hello 1"), self.loop)

    def fun2(self):
        return asyncio.run_coroutine_threadsafe(coro("Hello 2"), self.loop)

t = Test()
print("Test constructor complete")
t.fun()
fut = t.fun2()
# Comment out the next line if you don't want to wait here
# fut.result()  # Wait for fun2 to finish
print("Closing")
t.close()
print("Finished")

huangapple
  • 本文由 发表于 2023年2月8日 23:16:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/75387872.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定