Using MQTTnet, how do I subscribe to a topic, parse the messages, and publish a reply, all in one console application?
Question
I am using MQTTnet and am not very familiar with async in C#, so I need a little help.
I want to subscribe to an MQTT topic (on an external broker such as Mosquitto), parse the messages, and publish something back for particular ones. That is pretty simple, and the individual pieces already work.
This is my subscribe code:
public static async Task Subscribe_Topic()
{
    /*
     * This sample subscribes to a topic.
     */
    var mqttFactory = new MqttFactory();
    var mqttClient = mqttFactory.CreateMqttClient();
    mqttClient.ApplicationMessageReceivedAsync += e =>
    {
        await ConnectAndSendMessage(); // NOT working
        Console.WriteLine("Received application message.");
        Console.WriteLine(e.ClientId);
        return Task.CompletedTask;
    };
    await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
    var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
        .WithTopicFilter(
            f =>
            {
                f.WithTopic("my_topic");
            })
        .Build();
    var response = await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);
    Console.WriteLine("MQTT client subscribed to topic ");
    // The response contains additional data sent by the server after subscribing.
    //response.DumpToConsole();
}
This works, using this ugly while() loop I came up with in Main:
static async Task Main(string[] args)
{
    await Subscribe_Topic();
    while (true) ;
}
This is what I use to connect:
public static MqttClientOptions mqttClientOptions = new MqttClientOptionsBuilder()
    .WithTcpServer("10.77.150.243", 8883)
    .WithTimeout(TimeSpan.FromSeconds(60)) // same value as new TimeSpan(600000000) ticks
    .WithCredentials("mydevice1", "mypass")
    .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
    .WithClientId("MY_ID")
    .WithTls(new MqttClientOptionsBuilderTlsParameters()
    {
        AllowUntrustedCertificates = true,
        SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
        IgnoreCertificateChainErrors = true,
        UseTls = true,
    })
    .Build();
This is my publish function:
public static async Task ConnectAndSendMessage()
{
    var mqttFactory = new MqttFactory();
    using (var mqttClient = mqttFactory.CreateMqttClient())
    {
        await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
        string tmp = @"xxntrary to popular belief, Lorem Ipsum ...";
        var applicationMessage = new MqttApplicationMessageBuilder()
            .WithTopic("my_other_topic")
            .WithRetainFlag(false)
            .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.ExactlyOnce)
            .WithPayload(tmp)
            .Build();
        await mqttClient.PublishAsync(applicationMessage, CancellationToken.None);
        await mqttClient.DisconnectAsync();
        Console.WriteLine("MQTT application message is published.");
    }
}
Now, how can I send a message when I receive a certain one?
I could implement a background worker instead of that while() loop, but since I already have async code here, it may not be needed.
I just don't know how to connect the pieces properly.
Thanks for any hints!
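On the while (true) ; loop in Main: it keeps the process alive, but it also spins a CPU core. One possible replacement, sketched below with only the .NET base class library, is to await a task that completes when Ctrl+C is pressed. The names ConsoleLifetime, WaitForCancellationAsync and RunUntilCtrlCAsync are hypothetical helpers made up for this sketch; they are not part of MQTTnet.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of MQTTnet): keeps a console app alive without a busy loop.
public static class ConsoleLifetime
{
    // Returns a task that completes once the token is cancelled.
    // Awaiting it does not block a thread or burn CPU.
    public static Task WaitForCancellationAsync(CancellationToken token)
    {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        token.Register(() => tcs.TrySetResult(true));
        return tcs.Task;
    }

    // Runs the given startup delegate, then waits until Ctrl+C is pressed.
    public static async Task RunUntilCtrlCAsync(Func<Task> start)
    {
        var cts = new CancellationTokenSource();
        ConsoleCancelEventHandler onCtrlC = (sender, e) =>
        {
            e.Cancel = true; // keep the process alive so shutdown can finish cleanly
            cts.Cancel();
        };

        Console.CancelKeyPress += onCtrlC;
        try
        {
            await start();
            await WaitForCancellationAsync(cts.Token);
        }
        finally
        {
            Console.CancelKeyPress -= onCtrlC;
            cts.Dispose();
        }
    }
}
```

With this in place, Main could shrink to await ConsoleLifetime.RunUntilCtrlCAsync(Subscribe_Topic); and no background worker is needed, because the message handler is invoked by the client while Main is simply awaiting.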
Answer 1
Score: 1
For the line await ConnectAndSendMessage(); to work, it needs to be inside an async function. To address this, I recommend modifying the code as shown below:
mqttClient.ApplicationMessageReceivedAsync += async e =>
{
    await ConnectAndSendMessage(); // await is only valid inside an async function
    Console.WriteLine("Received application message.");
    Console.WriteLine(e.ClientId);
    return; // You can either use 'return;' or simply omit the return statement.
};
You can make the anonymous function asynchronous by adding the async keyword before the parameter e. As for the return statement, you can use return; or omit it entirely; both are acceptable, because an async lambda that returns Task must not return a value such as Task.CompletedTask.
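One more thing that may matter here: ConnectAndSendMessage() in the question opens a second connection using the same mqttClientOptions, and therefore the same client ID (MY_ID). Under MQTT 3.1.1 a broker must disconnect the existing client when another one connects with the same client ID, so the subscriber is likely to be dropped each time a reply is sent. A minimal sketch of one alternative, assuming MQTTnet 4.x, is to publish on the already-connected client from inside the async handler. The names ReplyingSubscriber and SubscribeAndReplyAsync and the reply payload are made up for illustration; the topics are the ones from the question, and the reply uses the default quality of service rather than ExactlyOnce.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;

// Sketch only, assuming MQTTnet 4.x. Class and method names are hypothetical.
public static class ReplyingSubscriber
{
    public static async Task<IMqttClient> SubscribeAndReplyAsync(MqttClientOptions options)
    {
        var mqttFactory = new MqttFactory();
        var mqttClient = mqttFactory.CreateMqttClient();

        // 'async' makes 'await' legal inside the handler.
        mqttClient.ApplicationMessageReceivedAsync += async e =>
        {
            Console.WriteLine("Received message on " + e.ApplicationMessage.Topic);

            var reply = new MqttApplicationMessageBuilder()
                .WithTopic("my_other_topic")
                .WithPayload("reply payload") // placeholder payload
                .Build();

            // Reuse the connected client instead of opening a second connection.
            await mqttClient.PublishAsync(reply, CancellationToken.None);
        };

        await mqttClient.ConnectAsync(options, CancellationToken.None);

        var subscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
            .WithTopicFilter(f => f.WithTopic("my_topic"))
            .Build();
        await mqttClient.SubscribeAsync(subscribeOptions, CancellationToken.None);

        // The caller keeps the client alive and disposes it on shutdown.
        return mqttClient;
    }
}
```

It could be called as var client = await ReplyingSubscriber.SubscribeAndReplyAsync(mqttClientOptions); with the options object from the question. If a separate publishing connection is really wanted, giving it its own client ID should avoid the takeover.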
Answer 2
Score: 0
Inside the handler, check the message you received and then perform the action you want (such as an MQTT publish).
mqttClient.ApplicationMessageReceivedAsync += e =>
{
    if (e.ApplicationMessage.Topic == "A")
    {
        // Do something
    }
    Console.WriteLine("Received application message.");
    Console.WriteLine(e.ClientId);
    return Task.CompletedTask;
};
MQTTnet wiki (only valid for version 3):
https://github.com/dotnet/MQTTnet/wiki/Client#consuming-messages
MQTTnet sample:
https://github.com/dotnet/MQTTnet/blob/master/Samples/Client/Client_Subscribe_Samples.cs
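To make the "check the message" step a little more concrete, here is a small sketch that keeps the decision in a plain function, so it can be tested without a broker. Everything in it is hypothetical: the ReplyRules class, the TryGetReply method, and the rule itself (answer "pong" when "ping" arrives on my_topic) are only an illustration and are not part of MQTTnet.

```csharp
using System;

// Hypothetical helper: decides whether an incoming message deserves a reply.
public static class ReplyRules
{
    // Returns true and sets 'reply' when the message matches the (made-up) rule:
    // a "ping" payload on "my_topic" is answered with "pong".
    public static bool TryGetReply(string topic, string payload, out string reply)
    {
        reply = string.Empty;

        // MQTT topic names are case-sensitive, so compare exactly.
        if (topic != "my_topic")
        {
            return false;
        }

        // Ignore surrounding whitespace and letter case in the payload.
        var text = (payload ?? string.Empty).Trim();
        if (!string.Equals(text, "ping", StringComparison.OrdinalIgnoreCase))
        {
            return false;
        }

        reply = "pong";
        return true;
    }
}
```

Inside the handler this could be called with e.ApplicationMessage.Topic and the payload decoded to a string (MQTTnet 4.x offers a ConvertPayloadToString() extension method for that, as far as I know), and a publish would be issued only when the function returns true.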