一个MQTTnet客户端的生命周期应该如何管理?

huangapple go评论102阅读模式
英文:

How should an MQTTnet client's lifecycle be managed?

问题

分析:
您想知道如何在另一个线程中使用MQTTnet客户端时避免对其进行处理。您的代码涉及使用ManagedMqttClient,并且存在线程安全性问题和IsConnected等问题。

想法:
问题涉及到多个方面,包括线程安全性、对象处理以及代码结构。以下是一些可能的解决方案:

  1. 使用SemaphoreSlim进行线程同步:您可以尝试使用SemaphoreSlim来确保在多线程环境中安全地访问ManagedMqttClient。这可以防止多个线程同时访问该客户端对象,但需要小心处理可能的死锁情况。

  2. 考虑使用MqttClient:根据您提到的帖子,MqttClient可能更适合您的情况。您可以尝试使用单个MqttClient实例来管理连接,并在需要时调用DisconnectAsync()ConnectAsync()以适应更改的经纪人设置。但是,您仍然需要处理IsConnected等问题。

  3. 处理ObjectDisposed异常:您可以在每个调用ManagedMqttClient对象的地方添加try/catch块来捕获ObjectDisposed异常。然后,您可以根据需要创建新的ManagedMqttClient实例。这个方法可以解决处理问题,但不会解决IsConnected等问题。

推测:
最佳方法可能是综合使用上述解决方案,根据您的具体需求来决定。例如,您可以使用SemaphoreSlim确保线程安全性,使用MqttClient来管理连接的生命周期,并处理ObjectDisposed异常以应对处理问题。

汉化后的代码:

英文:

tl;dr: How can I avoid disposing the MQTTnet client while it is in use on another thread? Perhaps this pertains to any IDisposable, but in the case of ManagedMqttClient, there are also calls like IsConnected to worry about before async calls.

Note: We are on MQTTnet v3.0.16. I'm open to answers that include "upgrade to latest, then use approach X"

I inherited an application which uses the ManagedMqttClient and originally replaced/disposed that client when the user made changes to broker settings:

using MQTTnet;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Extensions.ManagedClient;
using System;
using System.Threading.Tasks;

internal class OriginalApproach
{
    private IManagedMqttClient _mqttClient;
    private static MqttFactory _factory;

    public OriginalApproach()
    {
        _mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(MqttClientDisconnectedEventArgs => OnDisconnect(MqttClientDisconnectedEventArgs));
    }

    //Called if the user changes settings that affect the way we connect
    //to the broker.
    public async void OnSettingsChange()
    {
        if (_mqttClient != null && _mqttClient.IsConnected)
        {
            StopAsync();
            return;
        }

		//Disposal isn't the only thread safety issue
        if (_mqttClient != null && _mqttClient.IsStarted)
        {
            await Reconnect(TimeSpan.FromSeconds(2));
        }
    }

    public async void StopAsync()
    {
        if (_mqttClient != null)
        {
            await _mqttClient.StopAsync();
            await Task.Delay(System.TimeSpan.FromSeconds(2));
        }
    }

    public async void OnDisconnect(MqttClientDisconnectedEventArgs e)
    {
        await Reconnect(TimeSpan.FromSeconds(5));
    }

    public async Task Reconnect(TimeSpan delay)
    {
        StopAsync();
        await Task.Delay(delay);
        Connect();
    }

    public async void Connect()
    {
        await CreateManagedClient();

        try
        {
            if (!_mqttClient.IsConnected && !_mqttClient.IsStarted)
            {
                StartAsync();
            }
        }
        catch (MQTTnet.Exceptions.MqttCommunicationException ex) { /* ... */  }
        catch (MQTTnet.Exceptions.MqttProtocolViolationException ex) { /* ... */  }
    }

    public async Task<bool> CreateManagedClient()
    {
        try
        {
            if (_mqttClient != null)
                _mqttClient.Dispose();

            _factory = new MqttFactory();
            _mqttClient = _factory.CreateManagedMqttClient();
            await Task.Delay(System.TimeSpan.FromSeconds(2));
        }
        catch (Exception e)
        {
            _mqttClient.Dispose();
            _mqttClient = null;
            return false;
        }
        return true;
    }

    public async void StartAsync()
    {
        MqttApplicationMessage mess = new MqttApplicationMessage();

        mess.Payload = BuildDeathCertificate();
        mess.Topic = "...";

        MqttClientOptionsBuilder clientOptionsBuilder = new MqttClientOptionsBuilder();

        IMqttClientOptions options = clientOptionsBuilder.WithTcpServer("Broker Address", 1234)
                .WithClientId("ABCD")
                .WithCleanSession(true)
                .WithWillMessage(mess)
                .WithKeepAlivePeriod(new System.TimeSpan(1234))
                .WithCommunicationTimeout(new System.TimeSpan(int.MaxValue))
                .Build();

        var managedClientOptions = new ManagedMqttClientOptionsBuilder()
            .WithClientOptions(options)
            .Build();

        if (!_mqttClient.IsStarted && !_mqttClient.IsConnected)
        {
            try
            {
                await _mqttClient.StartAsync(managedClientOptions);
            }
            catch (Exception e) { /* ... */  }
        }
    }

    byte[] BuildDeathCertificate()
    {
        return new byte[1234];
    }

    public async void PublishMessage(byte[] payloadBytes)
    {
        var message = new MqttApplicationMessageBuilder()
            .WithTopic("...")
            .WithPayload(payloadBytes)
            .WithExactlyOnceQoS()
            .WithRetainFlag(false)
            .Build();

        try
        {
            await _mqttClient.PublishAsync(message);
        }
        catch (NullReferenceException e) { /* ... */  }
    }
}

Obviously, there are numerous thread-safety issues here, and various situations have yielded ObjectDisposed exceptions.

I played with using a single ManagedMqttClient for the lifetime of the application:

internal class SingleClientTest
{
	private IManagedMqttClient _mqttClient;
	public SingleClientTest()
	{
		var factory = new MqttFactory();

		//Used for lifetime of application
		_mqttClient = factory.CreateManagedMqttClient();
	}

	public async void Connect()
	{
		//No longer calling CreateManagedClient() here

		try
		{
			if (!_mqttClient.IsConnected && !_mqttClient.IsStarted)
			{
				StartAsync();
			}
		}
		catch (MQTTnet.Exceptions.MqttCommunicationException ex) { /* ... */  }
		catch (MQTTnet.Exceptions.MqttProtocolViolationException ex) { /* ... */  }
	}

	//The other methods are mostly unchanged
}

Overall it solves the ObjectDisposed issue, but it doesn't address thread-safety of calling IsConnected before the async calls. And, given that MqttFactory exists, reusing one client feels like a hack. Also, I've run into one use case that acts a bit like this issue. Specifically, StartAsync() yielded the exception "The managed client is already started" despite IsStarted being false. I can provide more detail if desired, but for now I'll avoid muddying the question.

I also explored adding locks around calls to the client, but they cannot be used around awaited calls because of deadlock risk.

Finally, I've read through the MQTTnet samples, wiki, a few of the issues, and poked around the code a bit. So far, I haven't found additional concurrency mechanisms in the library.

I'm exploring a few options (perhaps a combination of these):

  1. Using SemaphorSlim around all calls to the client, as described here - It looks like it may work around awaited calls. Not sure if this would introduce new timing issues, and given that we are on .NET Framework, use appears to come with risks
  2. Using MqttClient, as opposed to ManagedMqttClient. This thread makes it sound like MqttClient is preferred. Should I be using it instead? Is it reasonable to use one MqttClient for the life of the app (using DisconnectAsync()/ConnectAsync() when broker settings change)? (This still doesn't address checks like _mqttClient.IsConnected)
  3. Surround every call to the client object with a try/catch for ObjectDisposed exceptions, and replace the client like this:
var oldClient = _mqttClient
_mqttClient = _factory.CreateManagedMqttClient();
oldClient?.Dispose();

Again, this doesn't address checks like _mqttClient.IsConnected.

Just wondering if anyone can offer insight as to the commonly accepted way of doing this.

答案1

得分: 1

以下是已翻译的内容:

我还与实时通信(OPC DA)一起工作,并根据我的经验想提供一些建议。

在多线程应用程序的情况下,使用单个客户端的方法似乎是最合理的。您只需要为您的应用程序创建一个完整功能的包装器,以防止访问“危险”方法,主要提供必要的功能。

您的客户端应在单独的线程中运行,并提供用于启动、关闭和重新启动的方法,以在应用程序的主循环中使用。

此外,客户端还应该集成一个自我控制的看门狗。

更新

如何“解决像在调用IsConnected和StartAsync()之间存在中断的潜在问题”?使用包装器并防止直接调用(IsConnected和StartAsync())MqttClient。包装器为每个危险方法提供单一的入口点,并可以在内部管理所有调用。为此使用调用队列。

看门狗通常是监视操作的计时器,确定状态(如IsConnected),并在必要时保持/重新启动客户端。

英文:

I also work with real-time communications (OPC DA) and would like to provide some recommendations based on my experience.

The approach using a single client looks the most resonable in case of multi-threaded application. Just you need creating a full-function wrapper specially for your application to prevent access to "dangerous" methods and mainly to provide only necessary functionality.

You client should be run in the separate thread and provide methods to start, shutdown and restart to be used in the application main cycle.

Also the client should incorporate a watchdog for the self-control.

UPD

How to "solve issues like the potential for interruption between calling IsConnected and StartAsync()"? Use wrapper and prevent direct call (of IsConnected and StartAsync()) the MqttClient. Wrapper provides a single entry point for each dangerous method and can internally manage all calls. Use a call queue for it.

The watchdog typically is the timer which monitors operation, determines the status (like IsConnected), and holds/restarts the client if necessary.

huangapple
  • 本文由 发表于 2023年7月27日 23:54:24
  • 转载请务必保留本文链接:https://go.coder-hub.com/76781508.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定