Duplicate Data Issue when Storing JSON Messages in Azure Storage Queue using Azure Functions in Go

huangapple go评论88阅读模式
英文:

Duplicate Data Issue when Storing JSON Messages in Azure Storage Queue using Azure Functions in Go

问题

现在我正在尝试使用Azure Functions和Go开发一个应用程序,利用自定义处理程序(Custom Handler)。
我的应用程序架构如下所示。

image

问题似乎出在将JSON消息存储在Azure存储队列中。
存储在队列中的消息在其"data"和"metadata"中有重复数据。

我的第一个函数是由Slack的HTTP响应触发的,将消息入队到队列中,代码如下。

func function1(c *gin.Context) {
	log.Printf("Start enqueueMessage")

    // 这里我使用一个JSON字符串而不是实际的HTTP请求,以便更容易理解
    stri := "{\"aaa\" : \"aaa\"}"
	base64BodyString := base64.StdEncoding.EncodeToString([]byte(stri))

	_ulr, err := url.Parse(fmt.Sprintf("https://%s.queue.core.windows.net/%s", accountName, queueName))
	if err != nil {
		log.Fatal("Error parsing url: ", err)
	}

	credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey)
	if err != nil {
		log.Fatal("Error creating shared key credential: ", err)
	}

	queueUrl := azqueue.NewQueueURL(*_ulr, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))
	ctx := context.TODO()

	messageUrl := queueUrl.NewMessagesURL()
	_, err = messageUrl.Enqueue(ctx, base64BodyString, 0, 0)
	if err != nil {
		log.Fatal("Error enqueueing message: ", err)
	}
	log.Printf("Message enqueued successfully")
}

然后,我的第二个函数是由队列触发器触发的,处理从队列中出队的消息,代码如下。

func function2(c *gin.Context) {

	log.Printf("Start function")

    type QueueMessage struct {
	  Data struct {
		 MyQueueItem string `json:"myQueueItem"`
	  } `json:"data"`
	  Metadata map[string]interface{}
    }

	var queueMessage QueueMessage
	err := c.BindJSON(&queueMessage)
	if err != nil {
		log.Printf("Failed to bind request body: %v", err)
		return
	}

        log.Printf("Queue message : %v", queueMessage)
}

输出的数据如下所示。

{
  "message": "{\"token\": \"aaa\"}",
  "metadata": {
    "DequeueCount": 1,
    "ExpirationTime": "2023-08-01T13:25:29+00:00",
    "Id": "e2b5f133-cc27-46d5-92a8-0305251e65e7",
    "InsertionTime": "2023-07-25T13:25:29+00:00",
    "NextVisibleTime": "2023-07-25T13:35:31+00:00",
    "PopReceipt": "AgAAAAMAAAAAAAAAifLM4fy+2QE=",
    "sys": {
      "MethodName": "function2",
      "RandGuid": "9cdd85fc-9522-437a-8b2c-cc4ac4919ad4",
      "UtcNow": "2023-07-25T13:25:31.364779Z"
    },
    "token": "aaa"
  }
}

当我存储类似于"hoge"的消息而不是JSON消息时,它不会包含重复数据。

有人对如何解决这个问题有什么想法吗?

我希望能够在Azure存储队列中存储没有重复数据的消息。

英文:

Now I am trying to develop an application using Azure Functions and Go, utilizing Custom Handler.
The architecture of my application is like this.

image

The problem seems to lie in storing a JSON message in an Azure Storage Queue.
The message stored in queue has a duplicate data in its “data” and “metadata” .

My function 1, which is triggered by a HTTP Response from Slack and enqueues a message to the queue, is like this.

func function1(c *gin.Context) {
	log.Printf(“Start enqueueMessage”)

    // I use a JSON string here instead of an actual HTTP Request for an easy understanding
    stri := "{\"aaa\" : \"aaa\"}"
	base64BodyString := base64.StdEncoding.EncodeToString(stri)

	_ulr, err := url.Parse(fmt.Sprintf(“https://%s.queue.core.windows.net/%s”, accountName, queueName))
	if err != nil {
		log.Fatal(“Error parsing url: “, err)
	}

	credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey)
	if err != nil {
		log.Fatal(“Error creating shared key credential: “, err)
	}

	queueUrl := azqueue.NewQueueURL(*_ulr, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))
	ctx := context.TODO()

	messageUrl := queueUrl.NewMessagesURL()
	_, err = messageUrl.Enqueue(ctx, base64BodyString, 0, 0)
	if err != nil {
		log.Fatal(“Error enqueueing message: “, err)
	}
	log.Printf(“Message enqueued successfully”)
}

Then, my function 2, which is fired by a Queue trigger and processes the message dequeued from the queue, is like this.

func function2(c *gin.Context) {

	log.Printf(“Start function”)

    type QueueMessage struct {
	  Data struct {
		 MyQueueItem string `json:"myQueueItem"`
	  } `json:"data"`
	  Metadata map[string]interface{}
    }

	var queueMessage QueueMessage
	err := c.BindJSON(&queueMessage)
	if err != nil {
		log.Printf(“Failed to bind request body: %v”, err)
		return
	}

        log.Printf(“Queue message : %v”, queueMessage)
}

The outputs data is like this.

{
  "message": "{\"token\": \"aaa\"}",
  "metadata": {
    "DequeueCount": 1,
    "ExpirationTime": "2023-08-01T13:25:29+00:00",
    "Id": "e2b5f133-cc27-46d5-92a8-0305251e65e7",
    "InsertionTime": "2023-07-25T13:25:29+00:00",
    "NextVisibleTime": "2023-07-25T13:35:31+00:00",
    "PopReceipt": "AgAAAAMAAAAAAAAAifLM4fy+2QE=",
    "sys": {
      "MethodName": "function2",
      "RandGuid": "9cdd85fc-9522-437a-8b2c-cc4ac4919ad4",
      "UtcNow": "2023-07-25T13:25:31.364779Z"
    },
    "token": "aaa"
  }
}

When I stored a message like “hoge” instead of a JSON message, it didn’t contain duplicate data.

Does anyone have an idea about how to solve this?

I would like to store a message without duplicate data in Azure Storage Queue.

答案1

得分: 0

Azure Storage队列消息是由于在将JSON有效负载编码为base64字符串之前对其进行编码的方式引起的,并将其作为消息正文传递。消息被出列并处理时,我们将其绑定到QueueMessage结构体上,该结构体期望具有datametadata字段。

要避免这种重复数据,您可以修改function1以在不使用Azure Function内置响应序列化的情况下入队消息。

有关更多详细信息,请参阅Azure存储队列go

英文:

Azure Storage Queue message is caused by the way you're encoding the JSON payload before enqueuing it and base64 encoding the JSON string and then passing it as the message body the message is dequeued and processed, we are binding it to the QueueMessage struct, which expects a specific structure with data and metadata fields.

func function1(c *gin.Context) {
    log.Printf("Start enqueueMessage")

    payload := map[string]string{
        "myQueueItem": "aaa",
    }

    payloadBytes, err := json.Marshal(payload)
    if err != nil {
        log.Fatal("Error marshaling JSON: ", err)
    }

    base64BodyString := base64.StdEncoding.EncodeToString(payloadBytes)

    _url, err := url.Parse(fmt.Sprintf("https://%s.queue.core.windows.net/%s", accountName, queueName))
    if err != nil {
        log.Fatal("Error parsing url: ", err)
    }

    credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey)
    if err != nil {
        log.Fatal("Error creating shared key credential: ", err)
    }

    queueUrl := azqueue.NewQueueURL(*_url, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))
    ctx := context.TODO()

    messageUrl := queueUrl.NewMessagesURL()
    _, err = messageUrl.Enqueue(ctx, base64BodyString, 0, 0)
    if err != nil {
        log.Fatal("Error enqueueing message: ", err)
    }

    log.Printf("Message enqueued successfully")
}


func function2(c *gin.Context) {
    log.Printf("Start function")

    type QueueMessage struct {
        MyQueueItem string `json:"myQueueItem"`
    }

    var queueMessage QueueMessage
    err := c.BindJSON(&queueMessage)
    if err != nil {
        log.Printf("Failed to bind request body: %v", err)
        return
    }

    log.Printf("Queue message : %v", queueMessage)
}
  • The duplicate data in the message is actually introduced by Azure Functions when it processes the incoming request and stores it in the Azure Storage Queue. When you store a simple string like "hoge" directly as the message content, Azure Functions doesn't add any additional metadata, hence no duplicates.

  • The duplicate data in the message is actually introduced by Azure Functions when it processes the incoming request and stores it in the Azure Storage Queue. When you store a simple string like "hoge" directly as the message content, Azure Functions doesn't add any additional metadata, hence no duplicates.

  • To avoid this duplicate data, you can modify your function1 to enqueue a message without using Azure Function's built-in response serialization

  • For more details Azure storage queue go

huangapple
  • 本文由 发表于 2023年7月26日 12:57:56
  • 转载请务必保留本文链接:https://go.coder-hub.com/76768141.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定