Using MQTTnet, how to subscribe to a topic, parse messages, and publish back, all in one console application
Question
I am using MQTTnet and am not very familiar with async in C#, so I need a little help.
I want to subscribe to an MQTT topic (on an external broker such as Mosquitto), parse the messages, and publish something back for particular ones. This is pretty simple, and the individual pieces already work.
This is my subscribe code:
public static async Task Subscribe_Topic() 
{
    /*
     * This sample subscribes to a topic.
     */
    var mqttFactory = new MqttFactory();
    var mqttClient = mqttFactory.CreateMqttClient();
    mqttClient.ApplicationMessageReceivedAsync += e =>
        {
            await ConnectAndSendMessage(); // NOT working
            Console.WriteLine("Received application message.");
            Console.WriteLine(e.ClientId);
            return Task.CompletedTask;
        };
    await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
    var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
        .WithTopicFilter(
            f =>
            {
                f.WithTopic("my_topic");
            })
        .Build();
    var response = await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);
    Console.WriteLine("MQTT client subscribed to topic ");
    // The response contains additional data sent by the server after subscribing.
    //response.DumpToConsole();
    //}
} 
This works, using this ugly while() loop I came up with:
static async Task Main(string[] args)
{
    await Subscribe_Topic();
    while (true) ;
}
This is what I use to connect:
public static MqttClientOptions mqttClientOptions = new MqttClientOptionsBuilder()
    .WithTcpServer("10.77.150.243", 8883)
    .WithTimeout(TimeSpan.FromSeconds(60)) // same value as new TimeSpan(600000000) ticks
    .WithCredentials("mydevice1", "mypass")
    .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
    .WithClientId("MY_ID")
    .WithTls(new MqttClientOptionsBuilderTlsParameters()
    {
        AllowUntrustedCertificates = true,
        SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
        IgnoreCertificateChainErrors = true,
        UseTls = true,
    })
    .Build();
This is my publish function:
    public static async Task ConnectAndSendMessage()
    {
        var mqttFactory = new MqttFactory();
        using (var mqttClient = mqttFactory.CreateMqttClient())
        {
            await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
            string tmp = @"xxntrary to popular belief, Lorem Ipsum ...";
            var applicationMessage = new MqttApplicationMessageBuilder()
                .WithTopic("my_other_topic")
                .WithRetainFlag(false)
                .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce)
                .WithPayload(tmp)
                .Build();
            await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);
            await mqttClient.DisconnectAsync();
            Console.WriteLine("MQTT application message is published.");
        }
    }
Now, how can I publish a reply when I receive a certain message?
I could implement a background worker instead of that while() loop, but since I already have async code here, it may not be needed.
I just don't know how to connect the pieces properly.
Thanks for any hints!
Answer 1
Score: 1
For the line await ConnectAndSendMessage(); to compile, it needs to be inside an async function. To fix this, I recommend modifying the code as shown below:
mqttClient.ApplicationMessageReceivedAsync += async e =>
{
    await ConnectAndSendMessage(); // await is only allowed inside an async function
    Console.WriteLine("Received application message.");
    Console.WriteLine(e.ClientId);
    return; // You can either use 'return;' or simply omit the return statement.
};
You can make the anonymous function asynchronous by adding the async keyword before the argument e. Regarding the return statement, you have the option to use return; or omit it entirely; both approaches are acceptable.
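To illustrate the point about `async` and the return statement, here is a minimal sketch that does not depend on MQTTnet. The delegate type `Func<string, Task>` and the helper `SendReplyAsync` are hypothetical stand-ins for the event handler and the publish function; they are assumptions made only for this example. It shows that an `async` lambda may `await` and must not `return Task.CompletedTask;`, because the compiler builds the returned `Task` itself.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncHandlerDemo
{
    // Hypothetical stand-in for the publish helper; it only simulates async work.
    public static async Task<string> SendReplyAsync(string topic)
    {
        await Task.Delay(10);
        return $"reply to {topic}";
    }

    public static async Task<string> RunAsync()
    {
        var result = "";

        // Same shape as a Task-returning event handler: takes event data, returns a Task.
        Func<string, Task> handler = async topic =>
        {
            // 'await' compiles here only because the lambda is marked 'async'.
            result = await SendReplyAsync(topic);
            // No 'return Task.CompletedTask;' here: in an async lambda that would not compile.
        };

        // The caller awaits the handler, so 'result' is set when this line completes.
        await handler("my_topic");
        return result;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // prints "reply to my_topic"
    }
}
```

Without the `async` modifier, the lambda would have to return a `Task` explicitly and could not use `await`, which is the situation in the question's original handler.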
Answer 2
Score: 0
You need to check the message you received inside the handler, and then perform the action you want (such as an MQTT publish).
mqttClient.ApplicationMessageReceivedAsync += e =>
    {
        if (e.ApplicationMessage.Topic == "A")
        {
            // Do something
        }
        Console.WriteLine("Received application message.");
        Console.WriteLine(e.ClientId);
        return Task.CompletedTask;
    };
MQTTnet wiki (only valid for version 3):
https://github.com/dotnet/MQTTnet/wiki/Client#consuming-messages
MQTTnet sample:
https://github.com/dotnet/MQTTnet/blob/master/Samples/Client/Client_Subscribe_Samples.cs
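Putting the pieces together, one possible sketch is to publish the reply from the same client that is already connected and subscribed, instead of opening a second connection inside the handler. This assumes the MQTTnet 4.x API used in the question; the broker address `localhost:1883` and the payload `"reply"` are placeholders, not values from the question. One reason to reuse the client: the question's publish function connects with the same client ID (`MY_ID`) as the subscriber, and per the MQTT specification a broker disconnects an existing connection when another client connects with the same ID, so the subscriber would likely be dropped. The sketch also waits for Ctrl+C instead of spinning in `while (true);`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;

public static class Program
{
    public static async Task Main()
    {
        // Assumption: placeholder broker; substitute the options from the question.
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost", 1883)
            .Build();

        var factory = new MqttFactory();
        using var client = factory.CreateMqttClient();

        // Register the handler before connecting so no message is missed.
        client.ApplicationMessageReceivedAsync += async e =>
        {
            Console.WriteLine($"Received message on {e.ApplicationMessage.Topic}");

            if (e.ApplicationMessage.Topic == "my_topic")
            {
                var reply = new MqttApplicationMessageBuilder()
                    .WithTopic("my_other_topic")
                    .WithPayload("reply") // placeholder payload
                    .Build();

                // Reuse the connected client rather than creating a new one.
                await client.PublishAsync(reply, CancellationToken.None);
            }
        };

        await client.ConnectAsync(options, CancellationToken.None);

        var subscribeOptions = factory.CreateSubscribeOptionsBuilder()
            .WithTopicFilter(f => { f.WithTopic("my_topic"); })
            .Build();
        await client.SubscribeAsync(subscribeOptions, CancellationToken.None);

        // Keep the process alive without a busy loop: complete on Ctrl+C.
        var exit = new TaskCompletionSource<bool>();
        Console.CancelKeyPress += (_, args) =>
        {
            args.Cancel = true; // let Main finish and disconnect cleanly
            exit.TrySetResult(true);
        };
        await exit.Task;

        await client.DisconnectAsync();
    }
}
```

No background worker is needed with this shape: the handler runs whenever a message arrives, and `Main` simply awaits a task that completes on Ctrl+C. Running it requires the MQTTnet NuGet package and a reachable broker.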