Initiating opcua-asyncio Client in thread
Question
In the context of writing tests for an implementation that uses asyncua.Client, I'd like to (1) write a fixture for asyncua.Server and (2) call the function that eventually starts the client in a separate thread. A minimal example of what I'd like to achieve, without the complexity of writing conftest.py etc., is running something like the following:

import asyncio
import asyncua
import threading


async def listen(endpoint):
    async with asyncua.Client(url=endpoint) as client:
        print(f'OPC UA client has started {client}.')
    return


async def run_all(endpoint):
    server = asyncua.Server()
    await server.init()
    server.set_endpoint(endpoint)
    async with server:
        t = threading.Thread(target=asyncio.run, args=(listen(endpoint),))
        t.start()
        t.join()


if __name__ == '__main__':
    asyncio.run(run_all('opc.tcp://0.0.0.0:5840/freeopcua/server/'))

This always leads to a TimeoutError at the line async with asyncua.Client.

Most examples I've found have two files: server.py for initializing asyncua.Server, and client.py for initializing asyncua.Client. The server and client are then started in separate terminals. However, in order to run pytest, I believe both should start from the same entry point. How can this be achieved?
Answer 1
Score: 2
You should not mix asyncio and threading for such use cases. Use tasks instead:

import asyncio
import asyncua


async def listen(endpoint):
    async with asyncua.Client(url=endpoint) as client:
        print(f'OPC UA client has started {client}.')


async def run_all(endpoint):
    server = asyncua.Server()
    await server.init()
    server.set_endpoint(endpoint)
    async with server:
        # Run the client as a task on the same event loop as the server.
        task = asyncio.create_task(listen(endpoint))
        await task


if __name__ == '__main__':
    asyncio.run(run_all('opc.tcp://0.0.0.0:5840/freeopcua/server/'))
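A likely reason the threaded version in the question times out is that t.join() is a blocking call made inside a coroutine, so the event loop that runs the server cannot serve the client while it waits. The following is a minimal sketch of that effect using only the standard library (Python 3.9+ for asyncio.to_thread). A plain asyncio TCP server stands in for asyncua.Server, and the names handle, client, and run are hypothetical, not part of asyncua.

```python
import asyncio
import threading


async def handle(reader, writer):
    # Greet every client, then close the connection.
    writer.write(b"hello\n")
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def client(port, results):
    # Runs on its own event loop inside the worker thread.
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    try:
        line = await asyncio.wait_for(reader.readline(), timeout=1.0)
        results.append(line)
    except asyncio.TimeoutError:
        results.append("timeout")
    finally:
        writer.close()


async def run(blocking):
    results = []
    # Port 0 asks the OS for any free port (an assumption made for this demo).
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        t = threading.Thread(target=asyncio.run, args=(client(port, results),))
        t.start()
        if blocking:
            # Blocks the server's event loop, so handle() never gets to run.
            t.join()
        else:
            # Waits for the thread without blocking the server's event loop.
            await asyncio.to_thread(t.join)
    return results[0]
```

Calling asyncio.run(run(blocking=True)) should report a timeout, while asyncio.run(run(blocking=False)) should return the greeting. Running the client as a task on the same loop, as in the answer above, avoids the extra thread altogether.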
Answer 2
Score: 0
Besides the accepted answer provided by Schroeder, an alternative that worked in my case (where there is substantial multithreaded code) uses a ThreadPoolExecutor:

import asyncio
import asyncua
import multiprocessing
import concurrent.futures


async def listen(endpoint, condition):
    # Block this worker thread until the server signals that it is up.
    with condition:
        condition.wait()
    async with asyncua.Client(url=endpoint) as client:
        print(f'OPC UA client has started {client}.')
    return


def listen_sync(endpoint, condition):
    # Each worker thread runs its own event loop.
    return asyncio.run(listen(endpoint, condition))


async def run_server(endpoint, condition):
    server = asyncua.Server()
    await server.init()
    server.set_endpoint(endpoint)
    async with server:
        # Wake the waiting client; a notification sent before the client
        # starts waiting is lost.
        with condition:
            condition.notify_all()
        # Keep the server running; this loop never exits on its own.
        while True:
            await asyncio.sleep(1)


def run_server_sync(endpoint, condition):
    return asyncio.run(run_server(endpoint, condition))


async def run_all(endpoint):
    loop = asyncio.get_running_loop()
    condition = multiprocessing.Condition()
    with concurrent.futures.ThreadPoolExecutor(2) as pool:
        server_task = loop.run_in_executor(pool, run_server_sync, endpoint, condition)
        client_task = loop.run_in_executor(pool, listen_sync, endpoint, condition)
        await asyncio.gather(*[server_task, client_task])


if __name__ == '__main__':
    asyncio.run(run_all('opc.tcp://0.0.0.0:5840/freeopcua/server/'))

Relevant references:
- The tests of the opcua-asyncio library itself, as suggested by Schroeder.
- https://stackoverflow.com/questions/69741177/run-multiple-async-loops-in-separate-processes-within-a-main-async-app
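If separate threads are required, each thread needs its own event loop, and the hand-off between them may be easier to get right with a threading.Event than with a bare Condition.wait(), because an event stays set once the server is ready. Below is a hedged, standard-library-only sketch of that pattern. The TCP server stands in for asyncua.Server, and serve, connect, info, ready, and stop are illustrative names that do not come from the answer.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


async def handle(reader, writer):
    # Greet every client, then close the connection.
    writer.write(b"hello\n")
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def serve(ready, stop, info):
    # Port 0 asks the OS for any free port (an assumption made for this demo).
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    info["port"] = server.sockets[0].getsockname()[1]
    async with server:
        ready.set()  # stays set, so a client that arrives late still sees it
        # Poll the stop flag without blocking this thread's event loop.
        while not stop.is_set():
            await asyncio.sleep(0.05)


async def connect(ready, info):
    # Blocks only this worker thread, whose loop has nothing else to do yet.
    if not ready.wait(timeout=5):
        raise RuntimeError("server did not start in time")
    reader, writer = await asyncio.open_connection("127.0.0.1", info["port"])
    try:
        return await reader.readline()
    finally:
        writer.close()
        await writer.wait_closed()


async def run_all():
    loop = asyncio.get_running_loop()
    ready, stop, info = threading.Event(), threading.Event(), {}
    with ThreadPoolExecutor(2) as pool:
        # asyncio.run gives each worker thread its own event loop.
        server_fut = loop.run_in_executor(pool, asyncio.run, serve(ready, stop, info))
        client_fut = loop.run_in_executor(pool, asyncio.run, connect(ready, info))
        try:
            reply = await client_fut
        finally:
            stop.set()  # let the server thread finish
        await server_fut
    return reply
```

Unlike a server loop that runs forever, the stop event lets run_all() return once the client is done, which is usually what a test needs.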