英文:
Initiating opcua-asyncio Client in thread
问题
在使用asyncua.Client编写实现测试的上下文中,我想要(1)为asyncua.Server编写一个装置,并且(2)在单独的线程中调用最终启动客户端的函数。我想要实现的一个最简单的示例,不涉及编写conftest.py等复杂内容,是运行类似以下代码的内容:
import asyncio
import asyncua
import threading
async def listen(endpoint):
async with asyncua.Client(url=endpoint) as client:
print(f'OPC UA client has started {client}.')
return
async def run_all(endpoint):
server = asyncua.Server()
await server.init()
server.set_endpoint(endpoint)
async with server:
t = threading.Thread(target=asyncio.run, args=(listen(endpoint),))
t.start()
t.join()
if __name__ == '__main__':
asyncio.run(run_all('opc.tcp://0.0.0.0:5840/freeopcua/server/'))
这总是导致在async with asyncua.Client这一行出现TimeoutError。
我找到的大多数示例都有两个名为server.py的文件,用于初始化asyncua.Server,以及名为client.py的文件,用于初始化asyncua.Client。然后在不同的终端中启动服务器和客户端。但是为了运行pytest,我认为应该从相同的入口点启动两者。如何实现这一目标?
英文:
In context of writing tests for an implementation using asyncua.Client, I'd like to (1) write a fixture for asyncua.Server and (2) call the function that eventually starts the client in a separate thread. A minimal example of what I'd like to achieve, without the complexity of writing conftest.py etc, is running something like below:
import asyncio
import asyncua
import threading
async def listen(endpoint):
async with asyncua.Client(url=endpoint) as client:
print(f'OPC UA client has started {client}.')
return
async def run_all(endpoint):
server = asyncua.Server()
await server.init()
server.set_endpoint(endpoint)
async with server:
t = threading.Thread(target=asyncio.run, args=(listen(endpoint),))
t.start()
t.join()
if __name__ == '__main__':
asyncio.run(run_all('opc.tcp://0.0.0.0:5840/freeopcua/server/'))
This always leads to a TimeoutError in the line async with asyncua.Client.
Most examples I've found have two files called server.py for initializing asyncua.Server and client.py for initializing asyncua.Client. Then the server and client are started in separate terminals. However in order to run pytest, I believe both should start from the same entrypoint. How can this be achieved?
答案1
得分: 2
不要混合使用异步和线程,对于这种用例,请使用任务(Tasks):
import asyncio
import asyncua
async def listen(endpoint):
async with asyncua.Client(url=endpoint) as client:
print(f'OPC UA client has started {client}.')
async def run_all(endpoint):
server = asyncua.Server()
await server.init()
server.set_endpoint(endpoint)
async with server:
task = asyncio.create_task(listen(endpoint))
await task
if __name__ == '__main__':
asyncio.run(run_all('opc.tcp://0.0.0.0:5840/freeopcua/server/'))
英文:
You should not mix async und threading, for such use cases. Use Tasks:
import asyncio
import asyncua
async def listen(endpoint):
async with asyncua.Client(url=endpoint) as client:
print(f'OPC UA client has started {client}.')
return
async def run_all(endpoint):
server = asyncua.Server()
await server.init()
server.set_endpoint(endpoint)
async with server:
task = asyncio.create_task(listen(endpoint))
await task
if __name__ == '__main__':
asyncio.run(run_all('opc.tcp://0.0.0.0:5840/freeopcua/server/'))
答案2
得分: 0
除了Schroeder提供的批准答案外,在我这种情况下(存在大量多线程代码)有效的另一种方法是使用ThreadPoolExecutor的替代方法:
import asyncio
import asyncua
import multiprocessing
import concurrent.futures
async def listen(endpoint, condition):
with condition:
condition.wait()
async with asyncua.Client(url=endpoint) as client:
print(f'OPC UA client has started {client}.')
return
def listen_sync(endpoint, condition):
return asyncio.run(listen(endpoint, condition))
async def run_server(endpoint, condition):
server = asyncua.Server()
await server.init()
server.set_endpoint(endpoint)
async with server:
with condition:
condition.notify_all()
while True:
await asyncio.sleep(1)
def run_server_sync(endpoint, condition):
return asyncio.run(run_server(endpoint, condition))
async def run_all(endpoint):
loop = asyncio.get_running_loop()
condition = multiprocessing.Condition()
with concurrent.futures.ThreadPoolExecutor(2) as pool:
server_task = loop.run_in_executor(pool, run_server_sync, endpoint, condition)
client_task = loop.run_in_executor(pool, listen_sync, endpoint, condition)
await asyncio.gather(*[server_task, client_task])
if __name__ == '__main__':
asyncio.run(run_all('opc.tcp://0.0.0.0:5840/freeopcua/server/'))
相关参考资料:
- opcua-asyncio库本身的测试,如Schroeder建议的。
- https://stackoverflow.com/questions/69741177/run-multiple-async-loops-in-separate-processes-within-a-main-async-app
英文:
Besides the approved answer provided by Schroeder, an alternative using a ThreadPoolExecutor that worked in my case (where there is substantial multithreaded code):
import asyncio
import asyncua
import multiprocessing
import concurrent.futures
async def listen(endpoint, condition):
with condition:
condition.wait()
async with asyncua.Client(url=endpoint) as client:
print(f'OPC UA client has started {client}.')
return
def listen_sync(endpoint, condition):
return asyncio.run(listen(endpoint, condition))
async def run_server(endpoint, condition):
server = asyncua.Server()
await server.init()
server.set_endpoint(endpoint)
async with server:
with condition:
condition.notify_all()
while True:
await asyncio.sleep(1)
def run_server_sync(endpoint, condition):
return asyncio.run(run_server(endpoint, condition))
async def run_all(endpoint):
loop = asyncio.get_running_loop()
condition = multiprocessing.Condition()
with concurrent.futures.ThreadPoolExecutor(2) as pool:
server_task = loop.run_in_executor(pool, run_server_sync, endpoint, condition)
client_task = loop.run_in_executor(pool, listen_sync, endpoint, condition)
await asyncio.gather(*[server_task, client_task])
if __name__ == '__main__':
asyncio.run(run_all('opc.tcp://0.0.0.0:5840/freeopcua/server/'))
Relevant references:
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论