Initiating opcua-asyncio Client in thread

Question

In the context of writing tests for an implementation that uses asyncua.Client, I'd like to (1) write a fixture for asyncua.Server and (2) call the function that eventually starts the client in a separate thread. A minimal example of what I'd like to achieve, without the complexity of writing conftest.py etc., looks something like this:

import asyncio
import asyncua

import threading

async def listen(endpoint):
    async with asyncua.Client(url=endpoint) as client:
        print(f'OPC UA client has started {client}.')
        return

async def run_all(endpoint):
    server = asyncua.Server()
    await server.init()
    server.set_endpoint(endpoint)
    async with server:
        t = threading.Thread(target=asyncio.run, args=(listen(endpoint),))
        t.start()
        t.join()

if __name__ == '__main__':
    asyncio.run(run_all('opc.tcp://0.0.0.0:5840/freeopcua/server/'))

This always raises a TimeoutError on the async with asyncua.Client line.

Most examples I've found use two files: server.py, which initializes asyncua.Server, and client.py, which initializes asyncua.Client. The server and client are then started in separate terminals. However, to run pytest, I believe both should start from the same entry point. How can this be achieved?

Answer 1

Score: 2

You should not mix asyncio and threading for such use cases. Use tasks instead:

import asyncio
import asyncua


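async def listen(endpoint):
    # Connect to the server; the context manager disconnects on exit.
    async with asyncua.Client(url=endpoint) as client:
        print(f'OPC UA client has started {client}.')

async def run_all(endpoint):
    server = asyncua.Server()
    await server.init()
    server.set_endpoint(endpoint)
    async with server:
        # Run the client as a task on the same event loop as the server,
        # so waiting for it never blocks the server.
        task = asyncio.create_task(listen(endpoint))
        await task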

if __name__ == '__main__':
    asyncio.run(run_all('opc.tcp://0.0.0.0:5840/freeopcua/server/'))
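
For context, the threaded version in the question most likely times out because t.join() is a blocking call made from inside a coroutine: while it waits, the event loop that runs the server cannot accept the client's connection. Below is a minimal sketch of that effect using only the standard library. The names ticker, worker and ticks_during_join are hypothetical stand-ins for the server, the client thread and the test body; none of them are asyncua APIs.

```python
import asyncio
import threading
import time


async def ticker(ticks):
    # Hypothetical stand-in for the server: it only makes progress
    # while the event loop is free to run it.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


def worker():
    # Hypothetical stand-in for the client thread.
    time.sleep(0.2)


async def ticks_during_join(blocking):
    ticks = []
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0.05)  # let the ticker run a few times first
    before = len(ticks)
    t = threading.Thread(target=worker)
    t.start()
    if blocking:
        t.join()  # blocks the whole event loop, as in the question
    else:
        await asyncio.to_thread(t.join)  # waits without blocking the loop
    after = len(ticks)
    task.cancel()
    return after - before


if __name__ == '__main__':
    print(asyncio.run(ticks_during_join(blocking=True)))
    print(asyncio.run(ticks_during_join(blocking=False)))
```

The blocking call reports zero ticks because the loop never gets a chance to run the ticker, while the second call reports a positive count. If the client really has to live in its own thread, replacing t.join() with await asyncio.to_thread(t.join) (Python 3.9 or later) should keep the server responsive in the same way.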

Answer 2

Score: 0

Besides the accepted answer provided by Schroeder, here is an alternative using a ThreadPoolExecutor that worked in my case (where there is a substantial amount of multithreaded code):

import asyncio
import asyncua

import multiprocessing
import concurrent.futures

async def listen(endpoint, condition):
    with condition:
        condition.wait()
    async with asyncua.Client(url=endpoint) as client:
        print(f'OPC UA client has started {client}.')
        return

def listen_sync(endpoint, condition):
    return asyncio.run(listen(endpoint, condition))

async def run_server(endpoint, condition):
    server = asyncua.Server()
    await server.init()
    server.set_endpoint(endpoint)
    async with server:
        with condition:
            condition.notify_all()
        while True:
            await asyncio.sleep(1)

def run_server_sync(endpoint, condition):
    return asyncio.run(run_server(endpoint, condition))

async def run_all(endpoint):
    loop = asyncio.get_running_loop()
    condition = multiprocessing.Condition()
    with concurrent.futures.ThreadPoolExecutor(2) as pool:
        server_task = loop.run_in_executor(pool, run_server_sync, endpoint, condition)
        client_task = loop.run_in_executor(pool, listen_sync, endpoint, condition)
        await asyncio.gather(*[server_task, client_task])

if __name__ == '__main__':
    asyncio.run(run_all('opc.tcp://0.0.0.0:5840/freeopcua/server/'))
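
One caveat with the condition-based handshake above: if notify_all() runs before the client reaches wait(), the notification is lost and the client waits forever. A threading.Event avoids this because it stays set once signalled. Here is a minimal sketch of the same two-thread layout under that assumption. The asyncua calls are replaced by log entries so it runs with the standard library alone, and ready, stop and log are illustrative names, not part of the original code.

```python
import asyncio
import concurrent.futures
import threading


async def run_server(ready, stop, log):
    # The real code would enter `async with server:` here.
    log.append('server up')
    ready.set()  # stays set, so a client that arrives late still sees it
    # Keep "serving" until asked to stop, without blocking this thread's loop.
    await asyncio.to_thread(stop.wait)
    log.append('server down')


async def listen(ready, stop, log):
    try:
        # Event.wait returns False on timeout instead of hanging forever.
        if not await asyncio.to_thread(ready.wait, 5.0):
            raise TimeoutError('server did not become ready')
        # The real code would enter `async with asyncua.Client(...)` here.
        log.append('client connected')
    finally:
        stop.set()  # let the server thread shut down


async def run_all():
    loop = asyncio.get_running_loop()
    ready, stop, log = threading.Event(), threading.Event(), []
    with concurrent.futures.ThreadPoolExecutor(2) as pool:
        # Each worker thread runs its own event loop via asyncio.run.
        server = loop.run_in_executor(pool, asyncio.run, run_server(ready, stop, log))
        client = loop.run_in_executor(pool, asyncio.run, listen(ready, stop, log))
        await asyncio.gather(server, client)
    return log


if __name__ == '__main__':
    print(asyncio.run(run_all()))
```

The second event also gives the server a way to stop, so gather() can return once the client is done instead of waiting on an endless loop.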

Published on July 27, 2023, 19:37:26
Source: https://go.coder-hub.com/76779345.html