英文:
How should an MQTTnet client's lifecycle be managed?
问题
分析:
您想知道如何在另一个线程中使用MQTTnet客户端时避免对其进行处理。您的代码涉及使用ManagedMqttClient
,并且存在线程安全性问题和IsConnected
等问题。
想法:
问题涉及到多个方面,包括线程安全性、对象处理以及代码结构。以下是一些可能的解决方案:
-
使用
SemaphoreSlim
进行线程同步:您可以尝试使用SemaphoreSlim
来确保在多线程环境中安全地访问ManagedMqttClient
。这可以防止多个线程同时访问该客户端对象,但需要小心处理可能的死锁情况。 -
考虑使用
MqttClient
:根据您提到的帖子,MqttClient
可能更适合您的情况。您可以尝试使用单个MqttClient
实例来管理连接,并在需要时调用DisconnectAsync()
和ConnectAsync()
以适应更改的经纪人设置。但是,您仍然需要处理IsConnected
等问题。 -
处理
ObjectDisposed
异常:您可以在每个调用ManagedMqttClient
对象的地方添加try/catch
块来捕获ObjectDisposed
异常。然后,您可以根据需要创建新的ManagedMqttClient
实例。这个方法可以解决处理问题,但不会解决IsConnected
等问题。
推测:
最佳方法可能是综合使用上述解决方案,根据您的具体需求来决定。例如,您可以使用SemaphoreSlim
确保线程安全性,使用MqttClient
来管理连接的生命周期,并处理ObjectDisposed
异常以应对处理问题。
汉化后的代码:
英文:
tl;dr: How can I avoid disposing the MQTTnet client while it is in use on another thread? Perhaps this pertains to any IDisposable
, but in the case of ManagedMqttClient
, there are also calls like IsConnected
to worry about before async calls.
Note: We are on MQTTnet v3.0.16. I'm open to answers that include "upgrade to latest, then use approach X"
I inherited an application which uses the ManagedMqttClient
and originally replaced/disposed that client when the user made changes to broker settings:
using MQTTnet;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Threading.Tasks;
internal class OriginalApproach
{
private IManagedMqttClient _mqttClient;
private static MqttFactory _factory;
public OriginalApproach()
{
_mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(MqttClientDisconnectedEventArgs => OnDisconnect(MqttClientDisconnectedEventArgs));
}
//Called if the user changes settings that affect the way we connect
//to the broker.
public async void OnSettingsChange()
{
if (_mqttClient != null && _mqttClient.IsConnected)
{
StopAsync();
return;
}
//Disposal isn't the only thread safety issue
if (_mqttClient != null && _mqttClient.IsStarted)
{
await Reconnect(TimeSpan.FromSeconds(2));
}
}
public async void StopAsync()
{
if (_mqttClient != null)
{
await _mqttClient.StopAsync();
await Task.Delay(System.TimeSpan.FromSeconds(2));
}
}
public async void OnDisconnect(MqttClientDisconnectedEventArgs e)
{
await Reconnect(TimeSpan.FromSeconds(5));
}
public async Task Reconnect(TimeSpan delay)
{
StopAsync();
await Task.Delay(delay);
Connect();
}
public async void Connect()
{
await CreateManagedClient();
try
{
if (!_mqttClient.IsConnected && !_mqttClient.IsStarted)
{
StartAsync();
}
}
catch (MQTTnet.Exceptions.MqttCommunicationException ex) { /* ... */ }
catch (MQTTnet.Exceptions.MqttProtocolViolationException ex) { /* ... */ }
}
public async Task<bool> CreateManagedClient()
{
try
{
if (_mqttClient != null)
_mqttClient.Dispose();
_factory = new MqttFactory();
_mqttClient = _factory.CreateManagedMqttClient();
await Task.Delay(System.TimeSpan.FromSeconds(2));
}
catch (Exception e)
{
_mqttClient.Dispose();
_mqttClient = null;
return false;
}
return true;
}
public async void StartAsync()
{
MqttApplicationMessage mess = new MqttApplicationMessage();
mess.Payload = BuildDeathCertificate();
mess.Topic = "...";
MqttClientOptionsBuilder clientOptionsBuilder = new MqttClientOptionsBuilder();
IMqttClientOptions options = clientOptionsBuilder.WithTcpServer("Broker Address", 1234)
.WithClientId("ABCD")
.WithCleanSession(true)
.WithWillMessage(mess)
.WithKeepAlivePeriod(new System.TimeSpan(1234))
.WithCommunicationTimeout(new System.TimeSpan(int.MaxValue))
.Build();
var managedClientOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(options)
.Build();
if (!_mqttClient.IsStarted && !_mqttClient.IsConnected)
{
try
{
await _mqttClient.StartAsync(managedClientOptions);
}
catch (Exception e) { /* ... */ }
}
}
byte[] BuildDeathCertificate()
{
return new byte[1234];
}
public async void PublishMessage(byte[] payloadBytes)
{
var message = new MqttApplicationMessageBuilder()
.WithTopic("...")
.WithPayload(payloadBytes)
.WithExactlyOnceQoS()
.WithRetainFlag(false)
.Build();
try
{
await _mqttClient.PublishAsync(message);
}
catch (NullReferenceException e) { /* ... */ }
}
}
Obviously, there are numerous thread-safety issues here, and various situations have yielded ObjectDisposed
exceptions.
I played with using a single ManagedMqttClient
for the lifetime of the application:
internal class SingleClientTest
{
private IManagedMqttClient _mqttClient;
public SingleClientTest()
{
var factory = new MqttFactory();
//Used for lifetime of application
_mqttClient = factory.CreateManagedMqttClient();
}
public async void Connect()
{
//No longer calling CreateManagedClient() here
try
{
if (!_mqttClient.IsConnected && !_mqttClient.IsStarted)
{
StartAsync();
}
}
catch (MQTTnet.Exceptions.MqttCommunicationException ex) { /* ... */ }
catch (MQTTnet.Exceptions.MqttProtocolViolationException ex) { /* ... */ }
}
//The other methods are mostly unchanged
}
Overall it solves the ObjectDisposed
issue, but it doesn't address thread-safety of calling IsConnected
before the async calls. And, given that MqttFactory
exists, reusing one client feels like a hack. Also, I've run into one use case that acts a bit like this issue. Specifically, StartAsync()
yielded the exception "The managed client is already started" despite IsStarted
being false. I can provide more detail if desired, but for now I'll avoid muddying the question.
I also explored adding lock
s around calls to the client, but they cannot be used around awaited calls because of deadlock risk.
Finally, I've read through the MQTTnet samples, wiki, a few of the issues, and poked around the code a bit. So far, I haven't found additional concurrency mechanisms in the library.
I'm exploring a few options (perhaps a combination of these):
- Using
SemaphorSlim
around all calls to the client, as described here - It looks like it may work aroundawait
ed calls. Not sure if this would introduce new timing issues, and given that we are on .NET Framework, use appears to come with risks - Using
MqttClient
, as opposed toManagedMqttClient
. This thread makes it sound likeMqttClient
is preferred. Should I be using it instead? Is it reasonable to use oneMqttClient
for the life of the app (usingDisconnectAsync()/ConnectAsync()
when broker settings change)? (This still doesn't address checks like_mqttClient.IsConnected
) - Surround every call to the client object with a
try/catch
forObjectDisposed
exceptions, and replace the client like this:
var oldClient = _mqttClient
_mqttClient = _factory.CreateManagedMqttClient();
oldClient?.Dispose();
Again, this doesn't address checks like _mqttClient.IsConnected
.
Just wondering if anyone can offer insight as to the commonly accepted way of doing this.
答案1
得分: 1
以下是已翻译的内容:
我还与实时通信(OPC DA)一起工作,并根据我的经验想提供一些建议。
在多线程应用程序的情况下,使用单个客户端的方法似乎是最合理的。您只需要为您的应用程序创建一个完整功能的包装器,以防止访问“危险”方法,主要提供必要的功能。
您的客户端应在单独的线程中运行,并提供用于启动、关闭和重新启动的方法,以在应用程序的主循环中使用。
此外,客户端还应该集成一个自我控制的看门狗。
更新
如何“解决像在调用IsConnected和StartAsync()之间存在中断的潜在问题”?使用包装器并防止直接调用(IsConnected和StartAsync())MqttClient。包装器为每个危险方法提供单一的入口点,并可以在内部管理所有调用。为此使用调用队列。
看门狗通常是监视操作的计时器,确定状态(如IsConnected),并在必要时保持/重新启动客户端。
英文:
I also work with real-time communications (OPC DA) and would like to provide some recommendations based on my experience.
The approach using a single client looks the most resonable in case of multi-threaded application. Just you need creating a full-function wrapper specially for your application to prevent access to "dangerous" methods and mainly to provide only necessary functionality.
You client should be run in the separate thread and provide methods to start, shutdown and restart to be used in the application main cycle.
Also the client should incorporate a watchdog for the self-control.
UPD
How to "solve issues like the potential for interruption between calling IsConnected and StartAsync()"? Use wrapper and prevent direct call (of IsConnected and StartAsync()) the MqttClient. Wrapper provides a single entry point for each dangerous method and can internally manage all calls. Use a call queue for it.
The watchdog typically is the timer which monitors operation, determines the status (like IsConnected), and holds/restarts the client if necessary.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论