如何使Azure Functions持续消耗/处理Azure Service Bus队列消息以保持顺序?

huangapple go评论85阅读模式
英文:

How to make Azure Functions to continuously consume/process Azure Service Bus Queue messages in order?

问题

(免责声明:我对消息队列和Azure Functions都很新)我有一个Azure Service Bus队列。它使用FIFO技术,这正是我所需要的。我也成功地设置了一个定时任务,不断地将消息推送到队列中。现在,队列中有大约100K条消息,这些消息已经按正确的顺序放置。

现在,我需要按顺序消耗/处理这些消息。每条消息的处理大约需要1秒钟。但是,当涉及到自我处理时,我只知道在Azure Functions中使用“定时器函数”的方法。但是,当我使用“定时器函数”时,我需要每5分钟运行一次,然后每次需要拉取大约300条消息。我的代码会像这样(我正在使用NodeJS):

function.json

{
"bindings": [
{
"name": "myTimer",
"type": "timerTrigger",
"direction": "in",
"schedule": "0 */5 * * * *"
}
]
}

index.js

..
..
let allMessages = [];
while (allMessages.length < 300) {
const messages = await sbReceiver.receiveMessages(300, {
maxWaitTimeInMs: 60 * 1000,
});

if (!messages.length) {
  break;
}

allMessages.push(...messages);

for (let message of messages) {
  
  await axios({
    /**
     * 在这里处理单个消息
     * 每个消息大约需要1秒
     */
  })
  .then(async function (response) {
    await sbReceiver.completeMessage(message);
  })
  .catch(function (error) {
    break;
  });
  
}

}
..
..

这种方法能够很好地完成工作。但我对通过定时作业(定时器函数)来处理处理过程不满意。

问题

除了使用“定时器函数”之外,是否有一种方法可以使Azure Function(NodeJS)始终保持消耗/处理队列中的消息,同时确保以有序的方式处理这些消息?

关键点在于:

  1. 保持消耗/处理消息,最好不使用定时器
  2. 以有序方式消耗/处理消息

编辑:
我知道“Azure Service Bus队列触发器”,但我不认为这些触发器会等待前一个作业“完成处理”。因为我的处理涉及调用另一端的API,这需要按顺序一一完成。我认为队列触发器只要新消息到达就会被触发,这会干扰另一端的顺序。如果我理解错误,请纠正我。

先行感谢您的亲切建议。

英文:

(Disclaimer: I'm new to both Message Queues and the Azure Functions)

I have a Azure Service Bus Queue. And it uses the FIFO technique, which is what I needed. I've also successfully setup a job that keeps pushing the Messages into the Queue. I now have about 100K of Messages in the queue which have been put in correct order.

Now, I need to consume/process those Messages in order. The processing of each Message takes about 1 second each. But I only know the approach using "Timer Function" in Azure Functions, when it comes to self processing. But again, when I use "Timer Function", I need to run, say, every 5mins and then I need to pull, say, about 300 messages each time. My codes would look like this (I'm using NodeJS):

function.json

{
  &quot;bindings&quot;: [
    {
      &quot;name&quot;: &quot;myTimer&quot;,
      &quot;type&quot;: &quot;timerTrigger&quot;,
      &quot;direction&quot;: &quot;in&quot;,
      &quot;schedule&quot;: &quot;0 */5 * * * *&quot;
    }
  ]
}

index.js

..
..
let allMessages = [];	
while (allMessages.length &lt; 300) {
	const messages = await sbReceiver.receiveMessages(300, {
		maxWaitTimeInMs: 60 * 1000,
	});
	
	if (!messages.length) {
		break;
	}
	
	allMessages.push(...messages);
	
	for (let message of messages) {
		
		await axios({
			/**
    		 * PROCESS SINGLE MESSAGE HERE
    		 * TAKES ABOUT 1 SEC EACH
    		 */
		})
		.then(async function (response) {
			await sbReceiver.completeMessage(message);
		})
		.catch(function (error) {
			break;
		});
		
	}
}
..
..

This approach does the work done just fine. But I'm just not satisfied with the idea that I have to do the processings through the a scheduled job (Timer Function).

Question

Other than using "Timer Functions", is there a way (or) how to make the Azure Function (NodeJS) to always keep consuming/processing the Queued Messages on it's own while making sure the Messages are processed in ordered manner?

The key points here are to:

  1. To keep consuming/processing the Messages, preferably without using Timers
  2. To consume/process the Messages in order

Edit:
I am aware of the "Azure Service Bus Queue Triggers", but I do not think those Triggers wait for the previous job to "finish processing". Because my processing involves API call to another side, which needs to be done only one after another in ordered manner. I think Queue Triggers just anyhow got fired as soon as the new Message is arrived, which would then mess up with the ordering at the other end. Please correct me if I was mistaken on that concept.

Thanks in advance for your kind advices.

答案1

得分: 1

以下是要翻译的内容:

"Posting my comments as an answer for the community."

"You directly make use of Azure Service Bus queue trigger to process and defer the message as soon as the queue arrives in the service bus. You can utilise the deferral method, deferMessage(), together with the Service Bus Queue Trigger in your Node.js code. As soon as the new message arrives in the queue. The Azure Service bus queue trigger will trigger the function and then you can add the deferral message code block to defer the message and process it."

"You can refer the defer.js code sample code from this Github repository"

"Sample service bus js function:-"

"index.js:-"


module.exports = async function(context, mySbMsg) {
    context.log('JavaScript ServiceBus queue trigger function processed message', mySbMsg);
};

const { ServiceBusClient, delay } = require("@azure/service-bus");

// Load the .env file if it exists
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString = "Endpoint=sb://siliconservicebus76.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xcMOcbMJAPnPM+LTl+zRq4Ix2EjHQFkrs+ASbBXJ2v4=";
const queueName = "test";

async function main() {
  await sendMessages();
  await receiveMessage();
}

// Shuffle and send messages
async function sendMessages() {
  const sbClient = new ServiceBusClient(connectionString);
  // createSender() can also be used to create a sender for a topic.
  const sender = sbClient.createSender(queueName);

  const data = [
    { step: 1, title: "Shop" },
    { step: 2, title: "Unpack" },
    { step: 3, title: "Prepare" },
    { step: 4, title: "Cook" },
    { step: 5, title: "Eat" },
  ];
  const promises = new Array();
  for (let index = 0; index < data.length; index++) {
    const message = {
      body: data[index],
      subject: "RecipeStep",
      contentType: "application/json",
    };
    // the way we shuffle the message order is to introduce a tiny random delay before each of the messages is sent
    promises.push(
      delay(Math.random() * 30).then(async () => {
        try {
          await sender.sendMessages(message);
          console.log("Sent message step:", data[index].step);
        } catch (err) {
          console.log("Error while sending message", err);
        }
      })
    );
  }
  // wait until all the send tasks are complete
  await Promise.all(promises);
  await sender.close();
  await sbClient.close();
}

async function receiveMessage() {
  const sbClient = new ServiceBusClient(connectionString);

  // If receiving from a subscription, you can use the createReceiver(topicName, subscriptionName) overload
  let receiver = sbClient.createReceiver(queueName);

  const deferredSteps = new Map();
  let lastProcessedRecipeStep = 0;
  try {
    const processMessage = async (brokeredMessage) => {
      if (
        brokeredMessage.subject === "RecipeStep" &&
        brokeredMessage.contentType === "application/json"
      ) {
        const message = brokeredMessage.body;
        // now let's check whether the step we received is the step we expect at this stage of the workflow
        if (message.step === lastProcessedRecipeStep + 1) {
          console.log("Process received message:", message);
          lastProcessedRecipeStep++;
          await receiver.completeMessage(brokeredMessage);
        } else {
          // if this is not the step we expected, we defer the message, meaning that we leave it in the queue but take it out of
          // the delivery order. We put it aside. To retrieve it later, we remember its sequence number
          const sequenceNumber = brokeredMessage.sequenceNumber;
          deferredSteps.set(message.step, sequenceNumber);
          console.log("Defer received message:", message);
          await receiver.deferMessage(brokeredMessage);
        }
      } else {
        // we dead-letter the message if we don't know what to do with it.
        console.log(
          "Unknown message received, moving it to the dead-letter queue ",
          brokeredMessage.body
        );
        await receiver.deadLetterMessage(brokeredMessage);
      }
    };
    const processError = async (args) => {
      console.log("Error from error source", args.errorSource, "occurred: ", args.error);
    };

    receiver.subscribe(
      { processMessage, processError },
      {
        autoCompleteMessages: false,
      }
    ); // Disabling autoCompleteMessages so we can control when messages can be completed, deferred, or dead-lettered
    await delay(10000);
    await receiver.close();
    console.log("Total number of deferred messages:", deferredSteps.size);

    receiver = sbClient.createReceiver(queueName);
    // Now we process the deferred messages
    while (deferredSteps.size > 0) {
      const step = lastProcessedRecipeStep + 1;
      const sequenceNumber = deferredSteps.get(step);
      const [message] = await receiver.receiveDeferredMessages(sequenceNumber);
      if (message) {
        console.log("Process deferred message:", message.body);
        await receiver.completeMessage(message);
      } else {
        console.log("No message found for step number", step);
      }
      deferredSteps.delete(step);
      lastProcessedRecipeStep++;
    }
    await receiver.close();
  } finally {
    await sbClient.close();
  }
}

main().catch((err) => {
  console.log("Deferral Sample - Error occurred: ", err);
  process.exit(1);
});

module.exports = { main };

"Output:-"

英文:

Posting my comments as an answer for the community.

You directly make use of Azure Service Bus queue trigger to process and defer the message as soon as the queue arrives in the service bus. You can utilise the deferral method, deferMessage(), together with the Service Bus Queue Trigger in your Node.js code. As soon as the new message arrives in the queue. The Azure Service bus queue trigger will trigger the function and then you can add the deferral message code block to defer the message and process it.

You can refer the defer.js code sample code from this Github repository

Sample service bus js function:-

index.js:-


module.exports = async function(context, mySbMsg) {
    context.log(&#39;JavaScript ServiceBus queue trigger function processed message&#39;, mySbMsg);
};

const { ServiceBusClient, delay } = require(&quot;@azure/service-bus&quot;);

// Load the .env file if it exists
require(&quot;dotenv&quot;).config();

// Define connection string and related Service Bus entity names here
const connectionString = &quot;Endpoint=sb://siliconservicebus76.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xcMOcbMJAPnPM+LTl+zRq4Ix2EjHQFkrs+ASbBXJ2v4=&quot;;
const queueName = &quot;test&quot;;

async function main() {
  await sendMessages();
  await receiveMessage();
}

// Shuffle and send messages
async function sendMessages() {
  const sbClient = new ServiceBusClient(connectionString);
  // createSender() can also be used to create a sender for a topic.
  const sender = sbClient.createSender(queueName);

  const data = [
    { step: 1, title: &quot;Shop&quot; },
    { step: 2, title: &quot;Unpack&quot; },
    { step: 3, title: &quot;Prepare&quot; },
    { step: 4, title: &quot;Cook&quot; },
    { step: 5, title: &quot;Eat&quot; },
  ];
  const promises = new Array();
  for (let index = 0; index &lt; data.length; index++) {
    const message = {
      body: data[index],
      subject: &quot;RecipeStep&quot;,
      contentType: &quot;application/json&quot;,
    };
    // the way we shuffle the message order is to introduce a tiny random delay before each of the messages is sent
    promises.push(
      delay(Math.random() * 30).then(async () =&gt; {
        try {
          await sender.sendMessages(message);
          console.log(&quot;Sent message step:&quot;, data[index].step);
        } catch (err) {
          console.log(&quot;Error while sending message&quot;, err);
        }
      })
    );
  }
  // wait until all the send tasks are complete
  await Promise.all(promises);
  await sender.close();
  await sbClient.close();
}

async function receiveMessage() {
  const sbClient = new ServiceBusClient(connectionString);

  // If receiving from a subscription, you can use the createReceiver(topicName, subscriptionName) overload
  let receiver = sbClient.createReceiver(queueName);

  const deferredSteps = new Map();
  let lastProcessedRecipeStep = 0;
  try {
    const processMessage = async (brokeredMessage) =&gt; {
      if (
        brokeredMessage.subject === &quot;RecipeStep&quot; &amp;&amp;
        brokeredMessage.contentType === &quot;application/json&quot;
      ) {
        const message = brokeredMessage.body;
        // now let&#39;s check whether the step we received is the step we expect at this stage of the workflow
        if (message.step === lastProcessedRecipeStep + 1) {
          console.log(&quot;Process received message:&quot;, message);
          lastProcessedRecipeStep++;
          await receiver.completeMessage(brokeredMessage);
        } else {
          // if this is not the step we expected, we defer the message, meaning that we leave it in the queue but take it out of
          // the delivery order. We put it aside. To retrieve it later, we remeber its sequence number
          const sequenceNumber = brokeredMessage.sequenceNumber;
          deferredSteps.set(message.step, sequenceNumber);
          console.log(&quot;Defer received message:&quot;, message);
          await receiver.deferMessage(brokeredMessage);
        }
      } else {
        // we dead-letter the message if we don&#39;t know what to do with it.
        console.log(
          &quot;Unknown message received, moving it to dead-letter queue &quot;,
          brokeredMessage.body
        );
        await receiver.deadLetterMessage(brokeredMessage);
      }
    };
    const processError = async (args) =&gt; {
      console.log(`&gt;&gt;&gt;&gt;&gt; Error from error source ${args.errorSource} occurred: `, args.error);
    };

    receiver.subscribe(
      { processMessage, processError },
      {
        autoCompleteMessages: false,
      }
    ); // Disabling autoCompleteMessages so we can control when message can be completed, deferred or deadlettered
    await delay(10000);
    await receiver.close();
    console.log(&quot;Total number of deferred messages:&quot;, deferredSteps.size);

    receiver = sbClient.createReceiver(queueName);
    // Now we process the deferred messages
    while (deferredSteps.size &gt; 0) {
      const step = lastProcessedRecipeStep + 1;
      const sequenceNumber = deferredSteps.get(step);
      const [message] = await receiver.receiveDeferredMessages(sequenceNumber);
      if (message) {
        console.log(&quot;Process deferred message:&quot;, message.body);
        await receiver.completeMessage(message);
      } else {
        console.log(&quot;No message found for step number &quot;, step);
      }
      deferredSteps.delete(step);
      lastProcessedRecipeStep++;
    }
    await receiver.close();
  } finally {
    await sbClient.close();
  }
}

main().catch((err) =&gt; {
  console.log(&quot;Deferral Sample - Error occurred: &quot;, err);
  process.exit(1);
});

module.exports = { main };

Output:-

如何使Azure Functions持续消耗/处理Azure Service Bus队列消息以保持顺序?

huangapple
  • 本文由 发表于 2023年7月6日 15:37:52
  • 转载请务必保留本文链接:https://go.coder-hub.com/76626519.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定