opcua-asyncio解决看门狗循环中的错误

huangapple go评论55阅读模式
英文:

opcua-asyncio solution to error in watchdog loop

问题

我遇到的问题是,我的客户端与 OPC UA 服务器之间的连接每隔几天就会中断。服务器每两秒发布新值,客户端通过 pub sub 接收并存储为 JSON 格式。

对于客户端,我使用的是搭载 Python 和 opcua-asyncio 的树莓派 4。服务器和客户端在同一网络内。服务器通过 KEPServerEX6 运行。

客户端失去连接几分钟实际上并不是一个大问题。唯一的问题是,它在发生错误后完全停止运行,我必须手动重新启动它。

我已经在 SubHandler 中试过很多方法,试图在出现错误消息时重新启动程序。但这当然不是一个好的解决办法,而且并不总是奏效。

以下是代码的一部分:

class SubHandler:
    # 处理订阅接收的数据
    def __init__(self, crude, acid, base, water, waste, waste_mass, centri_1_IN, centri_2_IN, centri_1_OUT, centri_2_OUT):
        self.crude = crude
        self.acid = acid

    def datachange_notification(self, node: Node, val, data):
        if node == self.crude:
            write_file("crude_flow", val)
        elif node == self.acid:
            write_file("acid_flow", val)

    def event_notification(self, event: ua.EventNotificationList):
        _logger.warning("event_notification %s", status)
        pass

    def status_change_notification(self, status: ua.StatusChangeNotification):
        _logger.warning("status_change_notification %s", status)
        if status != 0:
            _logger.warning("发生错误,正在重新启动程序")
            python = sys.executable
            os.execl(python, python, *sys.argv)
        pass

# ...

async def main():
    while True:
        client = Client(url=url_scada)
        try:
            async with client:
                _logger.warning("已连接")

                crude = client.get_node(nodeID_crude)
                acid = client.get_node(nodeID_acid)

                nodeIDs = [crude, acid]

                handler = SubHandler(crude, acid)
                sub = await client.create_subscription(500, handler)

                # 创建过滤器
                filter = ua.DataChangeFilter(Trigger=ua.DataChangeTrigger.StatusValueTimestamp)

                # 添加带有过滤器的监视项
                for node_id in nodeIDs:
                    handle = await sub._subscribe(
                        node_id,
                        ua.AttributeIds.Value,
                        mfilter=filter
                    )

                while True:
                    await asyncio.sleep(0.5)
                    await client.check_connection()
        except(ConnectionError, ua.Ua.Error):
            _logger.warning("1 秒后重新连接")
            await asyncio.sleep(1)

以下是部分错误信息:

错误:asyncua.client.client:监视循环中的错误
Traceback (most recent call last):
  File "/home/pi/.local/lib/python3.7/site-packages/asyncua/client/client.py", line 457, in _monitor_server_loop
    _ = await self.nodes.server_state.read_value()
  File "/home/pi/.local/lib/python3.7/site-packages/asyncua/common/node.py", line 179, in read_value
    result = await self.read_data_value()
  File "/home/pi/.local/lib/python3.7/site-packages/asyncua/common/node.py", line 190, in read_data_value
    return await self.read_attribute(ua.AttributeIds.Value, None, raise_on_bad_status)
  File "/home/pi/.local/lib/python3.7/site-packages/asyncua/common/node.py", line 304, in read_attribute
    result = await self.session.read(params)
  File "/home/pi/.local/lib/python3.7/site-packages/asyncua/client/ua_client.py", line 397, in read
    data = await self.protocol.send_request(request)
  File "/home/pi/.local/lib/python3.7/site-packages/asyncua/client/ua_client.py", line 160, in send_request
    data = await asyncio.wait_for(self._send_request(request, timeout, message_type), timeout if timeout else None)
  File "/usr/lib/python3.7/asyncio/tasks.py", line 423, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
INFO:asyncua.common.subscription:发布回调以结果调用: PublishResult(SubscriptionId=9, AvailableSequenceNumbers=[], MoreNotifications=True, NotificationMessage_=NotificationMessage(SequenceNumber=0, PublishTime=datetime.datetime(2023, 5, 30, 17, 39, 59, 457322), NotificationData=[StatusChangeNotification(Status=2148270080, DiagnosticInfo_=DiagnosticInfo(SymbolicId=None, NamespaceURI=None, Locale=None, LocalizedText=None, AdditionalInfo=None, InnerStatusCode=None, InnerDiagnosticInfo=None))]), Results=[], DiagnosticInfos=[])
警告:__main__:status_change_notification 2148270080
警告:__main__:发生严重状态更改,正在重新启动程序
错误:asyncua.common.subscription:调用状态更改处理程序时出现异常
英文:

I have the problem that the connection between my client and an OPC UA server is interrupted every few days. The server publishes new values every two seconds, which the client receives via a pub sub and stores in a JSON.

For the client I use a Raspberry Pi 4 with Python and opcua-asyncio. Server and client are in the same network. The server runs via KEPServerEX6.

That the client loses the connection for a few minutes is actually not a big problem. The only problem is that it stops running completely after the error and I have to restart it manually.

I have already tried more or less desperately in the SubHandler to restart the program in case of an error message. But of course it is not nice and does not always work.

Here is some part of the code (or most of it).

class SubHandler:
# Handle data that is received for the subscriptions
def __init__(self, crude, acid, base, water, waste, waste_mass, centri_1_IN, centri_2_IN, centri_1_OUT, centri_2_OUT):
self.crude = crude
self.acid = acid
def datachange_notification(self, node: Node, val, data):
if node == self.crude:
write_file("crude_flow", val)
elif node == self.acid:
write_file("acid_flow", val)
def event_notification(self, event: ua.EventNotificationList):
_logger.warning("event_notification %s", status)
pass
def status_change_notification(self, status: ua.StatusChangeNotification):
_logger.warning("status_change_notification %s", status)
if status != 0:
_logger.warning("Some bad status change, restarting program")
python = sys.executable
os.execl(python, python, *sys.argv)
pass
...
async def main():
while True:
client = Client(url=url_scada)
try:
async with client:
_logger.warning("Connected")
crude = client.get_node(nodeID_crude)
acid = client.get_node(nodeID_acid)
nodeIDs = [crude, acid]
handler = SubHandler(crude, acid)
sub = await client.create_subscription(500, handler)
# Filter erstellen
filter = ua.DataChangeFilter(Trigger=ua.DataChangeTrigger.StatusValueTimestamp)
# MonitoredItems mit Filter hinzufügen
for node_id in nodeIDs:
handle = await sub._subscribe(
node_id,
ua.AttributeIds.Value,
mfilter=filter
)
while True:
await asyncio.sleep(0.5)
await client.check_connection()
except(ConnectionError, ua.Ua.Error):
_logger.warning("Reconnecting in 1 second")
await asyncio.sleep(1)
...

And here is some part of the error. During the error handling other errors occur and ultimately the program stops.

ERROR:asyncua.client.client:Error in watchdog loop
Traceback (most recent call last):
File "/home/pi/.local/lib/python3.7/site-packages/asyncua/client/client.py", line 457, in _monitor_server_loop
_ = await self.nodes.server_state.read_value()
File "/home/pi/.local/lib/python3.7/site-packages/asyncua/common/node.py", line 179, in read_value
result = await self.read_data_value()
File "/home/pi/.local/lib/python3.7/site-packages/asyncua/common/node.py", line 190, in read_data_value
return await self.read_attribute(ua.AttributeIds.Value, None, raise_on_bad_status)
File "/home/pi/.local/lib/python3.7/site-packages/asyncua/common/node.py", line 304, in read_attribute
result = await self.session.read(params)
File "/home/pi/.local/lib/python3.7/site-packages/asyncua/client/ua_client.py", line 397, in read
data = await self.protocol.send_request(request)
File "/home/pi/.local/lib/python3.7/site-packages/asyncua/client/ua_client.py", line 160, in send_request
data = await asyncio.wait_for(self._send_request(request, timeout, message_type), timeout if timeout else None)
File "/usr/lib/python3.7/asyncio/tasks.py", line 423, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
INFO:asyncua.common.subscription:Publish callback called with result: PublishResult(SubscriptionId=9, AvailableSequenceNumbers=[], MoreNotifications=True, NotificationMessage_=NotificationMessage(SequenceNumber=0, PublishTime=datetime.datetime(2023, 5, 30, 17, 39, 59, 457322), NotificationData=[StatusChangeNotification(Status=2148270080, DiagnosticInfo_=DiagnosticInfo(SymbolicId=None, NamespaceURI=None, Locale=None, LocalizedText=None, AdditionalInfo=None, InnerStatusCode=None, InnerDiagnosticInfo=None))]), Results=[], DiagnosticInfos=[])
WARNING:__main__:status_change_notification 2148270080
WARNING:__main__:Some bad status change, restarting program
ERROR:asyncua.common.subscription:Exception calling status change handler

I tried to find a solution in the opcua-asyncio library to handle the error. But could not really find anything. Would be really nice if anyone could help.

答案1

得分: 1

看门狗是一个后台任务,用于保持连接活动。因此,如果它出现错误,连接将中断。在你的代码中没有看门狗,你可能不会注意到连接中断。

也许在asyncua中的示例不完整,因为它缺少一个异常处理程序:

except (concurrent.futures.TimeoutError, ConnectionError, ua.Ua.Error):
    _logger.warning("在1秒后重新连接")
    await asyncio.sleep(1)
英文:

The watchdog is a background task, to keep the connection alive. So if it has a error, the connection is broken. Without the watchdog you wouldn't notice a broken connection, in your code.

Maybe the example in asyncua is not complete because it misses a exception, try this exception handler:

except(concurrent.futures.TimeoutError, ConnectionError, ua.Ua.Error):
_logger.warning("Reconnecting in 1 second")
await asyncio.sleep(1)

huangapple
  • 本文由 发表于 2023年6月2日 06:01:49
  • 转载请务必保留本文链接:https://go.coder-hub.com/76385997.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定