Using MQTTnet, how to subscribe to a topic, parse messages, then publish a reply, all together in a console application

Question

I am using MQTTnet and am not very familiar with async in C#, so I need a little help.

I want to subscribe to an MQTT topic (on an external broker such as mosquitto), parse the messages, and publish something back for particular ones. Each piece is pretty simple and already works on its own.

This is my subscribe code:

public static async Task Subscribe_Topic() 
{
    /*
     * This sample subscribes to a topic.
     */

    var mqttFactory = new MqttFactory();

    var mqttClient = mqttFactory.CreateMqttClient();

    mqttClient.ApplicationMessageReceivedAsync += e =>
        {
            await ConnectAndSendMessage(); // NOT working

            Console.WriteLine("Received application message.");
            Console.WriteLine(e.ClientId);
            return Task.CompletedTask;
        };

    await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

    var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
        .WithTopicFilter(
            f =>
            {
                f.WithTopic("my_topic");
            })
        .Build();

    var response = await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);

    Console.WriteLine("MQTT client subscribed to topic ");

    // The response contains additional data sent by the server after subscribing.
    //response.DumpToConsole();
    //}
} 

This works, using this ugly while() loop I came up with:

static async Task Main(string[] args)
{
    await Subscribe_Topic();

    while (true) ;
}

This is what I use to connect:

public static MqttClientOptions mqttClientOptions = new MqttClientOptionsBuilder()
    .WithTcpServer("10.77.150.243", 8883)
    .WithTimeout(new TimeSpan(600000000)) // 600,000,000 ticks of 100 ns = 60 seconds
    .WithCredentials("mydevice1", "mypass")
    .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
    .WithClientId("MY_ID")
    .WithTls(new MqttClientOptionsBuilderTlsParameters()
    {
        AllowUntrustedCertificates = true,
        SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
        IgnoreCertificateChainErrors = true,
        UseTls = true,
    })
    .Build();

This is my publish function:

public static async Task ConnectAndSendMessage()
{
    var mqttFactory = new MqttFactory();

    using (var mqttClient = mqttFactory.CreateMqttClient())
    {
        await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

        string tmp = @"xxntrary to popular belief, Lorem Ipsum ...";

        var applicationMessage = new MqttApplicationMessageBuilder()
            .WithTopic("my_other_topic")
            .WithRetainFlag(false)
            .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce)
            .WithPayload(tmp)
            .Build();

        await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);

        await mqttClient.DisconnectAsync();

        Console.WriteLine("MQTT application message is published.");
    }
}

Now, how can I publish a message when I receive a certain one?

I could implement a background worker instead of that while() loop, but I think I already have async code here, so it may not be needed.

I just don't know how to connect the pieces properly.

Thanks for any hints!

Answer 1

Score: 1

The line await ConnectAndSendMessage(); only compiles inside an async function, because await is not allowed in a non-async lambda. Mark the handler as async:

mqttClient.ApplicationMessageReceivedAsync += async e =>
{
    await ConnectAndSendMessage(); // await is only allowed inside an async function
    Console.WriteLine("Received application message.");
    Console.WriteLine(e.ClientId);
    return; // Optional: 'return;' can be omitted at the end of the lambda.
};

You make the anonymous function asynchronous by adding the async keyword before the parameter e. An async lambda that returns Task cannot return Task.CompletedTask, so replace that statement with a bare return statement or omit it entirely. Both are acceptable.
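Separately from the async fix, the while (true) ; loop in Main keeps the process alive by spinning a CPU core. The following is a minimal sketch of one alternative, not the only way to do it: Main awaits a TaskCompletionSource that is completed when Ctrl+C is pressed. It uses only the .NET base class library. SubscribeTopicAsync is a hypothetical stand-in for the question's Subscribe_Topic(), so the sketch compiles without MQTTnet.

```csharp
using System;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical placeholder for the question's Subscribe_Topic();
    // in the real application this would connect and subscribe.
    private static Task SubscribeTopicAsync() => Task.CompletedTask;

    public static async Task Main()
    {
        // Completed once the user presses Ctrl+C.
        var exit = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        Console.CancelKeyPress += (sender, args) =>
        {
            args.Cancel = true;        // do not kill the process immediately
            exit.TrySetResult(true);   // let Main continue and clean up
        };

        await SubscribeTopicAsync();

        Console.WriteLine("Listening. Press Ctrl+C to exit.");

        // Asynchronous wait: no thread spins while messages are handled
        // by the client's own callbacks.
        await exit.Task;

        Console.WriteLine("Shutting down.");
    }
}
```

With this shape, no background worker is needed: the message handler runs whenever the client raises the event, and Main simply waits until it is told to stop.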

Answer 2

Score: 0

You need to inspect the received message inside the handler and then perform the action you want (such as an MQTT publish).

mqttClient.ApplicationMessageReceivedAsync += e =>
{
    if (e.ApplicationMessage.Topic == "A")
    {
        // Do something
    }

    Console.WriteLine("Received application message.");
    Console.WriteLine(e.ClientId);
    return Task.CompletedTask;
};

MQTTnet wiki (only valid for version 3):

https://github.com/dotnet/MQTTnet/wiki/Client#consuming-messages

MQTTnet sample:

https://github.com/dotnet/MQTTnet/blob/master/Samples/Client/Client_Subscribe_Samples.cs
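One thing to watch for when the action is a publish: the question's ConnectAndSendMessage() opens a second connection with the same options, including the client ID "MY_ID". An MQTT broker disconnects an existing client when another connects with the same client ID, so that approach may drop the subscriber each time it replies. A sketch of an alternative, assuming MQTTnet version 4, is to publish the reply on the already-connected client. AttachReplyHandler, the "ping" trigger, and the "pong" reply are hypothetical names chosen for illustration, and treating the payload as text via ConvertPayloadToString() is an assumption about the message format.

```csharp
using System.Threading;
using MQTTnet;
using MQTTnet.Client;

public static class ReplyingSubscriber
{
    // Hypothetical helper: call it on the client created in Subscribe_Topic(),
    // before ConnectAsync, in place of the original handler.
    public static void AttachReplyHandler(IMqttClient mqttClient)
    {
        mqttClient.ApplicationMessageReceivedAsync += async e =>
        {
            // Assumption: the payload is text. "ping" is a made-up trigger value.
            string payload = e.ApplicationMessage.ConvertPayloadToString();

            if (e.ApplicationMessage.Topic != "my_topic" || payload != "ping")
            {
                return; // not a message that needs a reply
            }

            var reply = new MqttApplicationMessageBuilder()
                .WithTopic("my_other_topic")
                .WithPayload("pong") // made-up reply content
                .Build();

            // Reuse the connected client instead of opening a second
            // connection with the same client ID.
            await mqttClient.PublishAsync(reply, CancellationToken.None);
        };
    }
}
```

If a separate publishing client is preferred, giving it its own client ID should avoid the conflict, at the cost of a new connection per reply.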



huangapple
  • Posted on July 3, 2023, 23:09:16
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76606018.html