英文:
Routing device to cloud messages from Azure IoT Hub to Azure Blob Storage
问题
有一个物联网设备每5分钟向Azure IoT Hub发送消息。我想将每条消息保存在Azure Blob Storage容器中。在Azure门户中,在消息路由
部分,我创建了一个新的路由,如下所示。接下来,我创建了一个自定义端点,如下所示。通过这样做,我能够将消息保存在Blob Storage中。我想要做的是创建一个动态路由。我希望每条发送的消息都保存在这个路径中:{iothub}/{deviceId}/{messageParameter}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm}
,其中deviceId
是在Azure IoT Hub中注册的设备的名称,而customValue
是IoT设备发送到Azure IoT Hub的JSON消息中的属性值。这是我用来发送消息的代码:
// 你的 C# 代码这里
message
输入是一个JSON字符串,如下所示:
// 你的 JSON 数据这里
这是可能的吗,还是我需要手动保存消息在Azure Blob Storage容器中?
注意:我的目标是保存设备发送的消息,并随后能够读取由特定设备发送的消息(因此我将deviceId放在路径中),与消息中的特定参数(messageParameter)相关。
英文:
I have an IoT device sending messages to Azure IoT Hub every 5 minutes. I would like to save every message inside an Azure Blob Storage Container. Inside the azure portal, in the Message routing
section I created a new route like this one.
Next, I created a custom endpoint like this one:
By doing so I am able to save the messages inside the blob storage.
What I would like to do, is to create a dynamic route. I would like every sent message to be saved within this path: {iothub}/{deviceId}/{messageParameter}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm}
where deviceId
is the name of the device registered in Azure IoT Hub and customValue
is a property value from the json message that the IoT device send to the Azure IoT Hub.
This is the code that I use to send messages:
public class Sender : ISender
{
private static DeviceClient _deviceClient;
public void SendDeviceToCloudMessage(string deviceId, string iotHubUri, string deviceKey, string message)
{
_deviceClient = DeviceClient.Create(iotHubUri,
new DeviceAuthenticationWithRegistrySymmetricKey(deviceId, deviceKey), TransportType.Mqtt);
var twin = _deviceClient.GetTwinAsync().ConfigureAwait(false).GetAwaiter().GetResult();
var desiredProperties = twin.Properties.Desired;
var messageObj = JObject.Parse(message);
if (desiredProperties.Contains("TelemetryData"))
{
var telemetryData = (TwinCollection)desiredProperties["TelemetryData"];
telemetryData["Temperature"] = messageObj["Temperature"];
telemetryData["Humidity"] = messageObj["Humidity"];
telemetryData["TimeStamp"] = messageObj["TimeStamp"];
}
else
{
var telemetryData = new TwinCollection();
telemetryData["Temperature"] = messageObj["Temperature"];
telemetryData["Humidity"] = messageObj["Humidity"];
telemetryData["TimeStamp"] = messageObj["TimeStamp"];
desiredProperties["TelemetryData"] = telemetryData;
}
// Update the reported properties with the updated desired properties
var reportedProperties = new TwinCollection();
reportedProperties["TelemetryData"] = desiredProperties["TelemetryData"];
_deviceClient.UpdateReportedPropertiesAsync(reportedProperties).ConfigureAwait(false).GetAwaiter().GetResult();
using var iotMessage = new Message(Encoding.UTF8.GetBytes(message))
{
ContentEncoding = "utf-8",
ContentType = "application/json",
};
// Submit the message to the hub.
_deviceClient.SendEventAsync(iotMessage).ConfigureAwait(false).GetAwaiter().GetResult();
}
}
The message
input is a json string like this one:
{
"Temperature": 20,
"Humidity": 50,
"TimeStamp": "2023-02-26 14:02:59.7715110 +00:00",
"MessageId": "MessageIdentifier"
}
Is this possible or I need to manually save the message in Azure Blob Storage Container?
Note: My goal is to save the messages sent by the device and subsequently be able to read the messages sent by a specific device (for this reason I put the deviceId in the path) relating to a specific parameter found within the message sent (messageParameter)
答案1
得分: 1
如前面的答案所提到的,在自定义端点的Blob存储中,没有为您的需求提供内置的IoT Hub功能。以下示例显示了一种解决方案,其中使用Azure Event Grid Pub/Sub模型将设备遥测数据推送到Blob存储:
订阅者是一个Azure EventGridTrigger函数,处理Blob名称等。
#r "Newtonsoft.Json"
#r "System.Text.Json"
#r "Azure.Storage.Blobs"
#r "System.Memory.Data"
#r "Azure.Core"
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Extensions.Logging;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;
using Azure.Storage.Blobs.Models;
using System.Collections.Generic;
public static async Task Run(JObject eventGridEvent, BlobContainerClient blobContainer, ILogger log)
{
log.LogInformation(eventGridEvent.ToString());
string iothubname = eventGridEvent["data"]?["properties"]?["iothubname"]?.Value<string>() ?? "unknown";
string deviceId = eventGridEvent["data"]?["systemProperties"]?["iothub-connection-device-id"]?.Value<string>() ?? "unknown";
DateTime dt = eventGridEvent["data"]?["systemProperties"]?["iothub-enqueuedtime"]?.Value<DateTime>() ?? default(DateTime);
string blobName = $"{iothubname}/{deviceId}/{dt.ToString("yyyy/MM/dd/HH/mm/ss")}.json";
log.LogInformation($"blobname = {blobName}");
var tags = new Dictionary<string, string>();
tags.Add("iothubname", iothubname);
tags.Add("deviceId", deviceId);
tags.Add("value", "CustomValue");
var blobClient = blobContainer.GetBlobClient(blobName);
await blobClient.UploadAsync(BinaryData.FromString(eventGridEvent["data"]?["body"]?.ToString() ?? eventGridEvent.ToString()), overwrite: true);
//blobClient.SetTags(tags); // 用于 'blob index tags' 的选项
}
请注意,Azure IoT Hub路由的消息可以通过设备双元(例如,$twin.tags.field或$twin.properties.desired.value)中有用的值进行增强,并在订阅者中作为Blob名称的一部分或Blob索引标签中使用它们。
英文:
As answers mentioned, there is no built-in IoT Hub feature for your needs in the custom endpoint for blob storage. The following example shows a workaround where the Device Telemetry Data are pushed to the Blob Storage using the Azure Event Grid Pub/Sub model:
The subscriber is an Azure EventGridTrigger Function with handling a blob name, etc.
<!-- language-all: lang-csharp -->
#r "Newtonsoft.Json"
#r "System.Text.Json"
#r "Azure.Storage.Blobs"
#r "System.Memory.Data"
#r "Azure.Core"
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Microsoft.Extensions.Logging;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;
using Azure.Storage.Blobs.Models;
using System.Collections.Generic;
public static async Task Run(JObject eventGridEvent, BlobContainerClient blobContainer, ILogger log)
{
log.LogInformation(eventGridEvent.ToString());
string iothubname = eventGridEvent["data"]?["properties"]?["iothubname"]?.Value<string>() ?? "unknown";
string deviceId = eventGridEvent["data"]?["systemProperties"]?["iothub-connection-device-id"]?.Value<string>() ?? "unknown";
DateTime dt = eventGridEvent["data"]?["systemProperties"]?["iothub-enqueuedtime"]?.Value<DateTime>() ?? default(DateTime);
string blobName = $"{iothubname}/{deviceId}/{dt.ToString("yyyy/MM/dd/HH/mm/ss")}.json";
log.LogInformation($"blobname = {blobName}");
var tags = new Dictionary<string, string>();
tags.Add("iothubname", iothubname);
tags.Add("deviceId", deviceId);
tags.Add("value", "CustomValue");
var blobClient = blobContainer.GetBlobClient(blobName);
await blobClient.UploadAsync(BinaryData.FromString(eventGridEvent["data"]?["body"]?.ToString() ?? eventGridEvent.ToString() ), overwrite: true);
//blobClient.SetTags(tags); // option for 'blob index tags'
}
Note, that the Azure IoT Hub routed message can be enriched by useful values from device twin (for example, $twin.tags.field or $twin.properties.desired.value) and used them in the subscriber as a part of the blob name or in the blob index tags.
答案2
得分: 0
这在IoT Hub的存储端点功能中是不可能的。在创建存储端点时,您只能使用以下标记(并且必须全部使用):
- {iothub}
- {partition}
- {YYYY}
- {MM}
- {DD}
- {HH}
- {mm}
要在路径中包含 deviceId
和 customValue
,您必须自行实现该功能。一个选项是编写Azure函数将消息存储在正确的路径中。
英文:
This is not possible by leveraging the storage endpoint functionality in IoT Hub. When creating a storage endpoint, you can only use the following tokens (and have to use all of them):
- {iothub}
- {partition}
- {YYYY}
- {MM}
- {DD}
- {HH}
- {mm}
To include deviceId
and customValue
in the path, you must implement that functionality yourself. One option would be to write an Azure Function to store the message in the correct path.
答案3
得分: 0
你可以考虑使用路由查询和多个端点来实现这个目标(https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-routing-query-syntax)。然而,看起来你需要为每个设备使用单独的 Blob 存储端点,并且每个 Hub 限制为 10 个自定义端点。
我注意到你提出的路径中包括了 {deviceId}
和 {partition}
- 设备与分区是“粘性”的,所以路径段中存在一些冗余。
总的来说,如果你想要对存储值的路径有如此精细的控制,你可能最好将消息路由到 Azure 函数,然后在函数中编写逻辑,按照你需要的结构将消息保存到 Blob 存储中。
你希望使用特定结构将消息存储到 Blob 存储中的原因是什么?
英文:
You could consider using routing queries and multiple endpoints to achieve this (https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-routing-query-syntax). However, it looks like you'd need a separate blob storage endpoint for each device, and you're limited to 10 custom endpoints per hub.
I notice that your proposed path includes both {deviceId}
and {partition}
- devices are "sticky" to partitions, so there's some redundancy in the path segments.
Overall, if you want that level of control of the path where the values are stored, you might be better off routing your messages to an Azure Function and then having some logic in the function that saves messages to blob storage in the structure you require.
What's your reason for wanting to store the messages in blob storage using that particular structure?
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论