如何在Python中的Service Bus触发器中延迟输入消息?

huangapple go评论63阅读模式
英文:

How to defer input message within a Service Bus Trigger in Python?

问题

我想延迟 Service Bus 触发器的输入消息,因为只有当另一个 Service Bus 触发器完成时,该消息才应标记为完成。然而,我找到的唯一关于 Python 中消息延迟的示例使用了队列接收器(来自 Azure 文档的示例)。如果我理解正确,我无法在触发器内使用这样的接收器获取消息,因为触发器已锁定输入消息。

我在一个帖子中看到了一种在 C# 中执行此操作的方法,但在 Python 中找不到相关信息。

是否有方法可以实现这一点?

以下是触发器的代码,以说明我想要做的事情:

import azure.functions as func
import logging

from typing import List

from __app__.config import config
from azure.servicebus import (
    ServiceBusClient,
    ServiceBusReceiver,
    NEXT_AVAILABLE_SESSION,
    ServiceBusReceivedMessage,
)

def main(msg: func.ServiceBusMessage):
    logging.info(
        f"complete-message test service bus trigger receives {msg.message_id=}, {msg.sequence_number=}"
    )

    service_bus_client = ServiceBusClient.from_connection_string(
        conn_str="conn_string"
    )

    queue_receiver: ServiceBusReceiver = service_bus_client.get_queue_receiver(
        queue_name="test-queue",
        session_id=NEXT_AVAILABLE_SESSION,
        prefetch=5,
    )

    # 这不起作用,因为 defer_message 期望的是 ServiceBusReceivedMessage,而不是 ServiceBusMessage
    # queue_receiver.defer_message(message=msg)

    # 由于消息已被触发器锁定,此调用不会返回任何消息,这是合理的
    messages: List[ServiceBusReceivedMessage] = queue_receiver.receive_messages(
        max_wait_time=3
    )

希望这能帮助你理解如何在 Python 中实现消息延迟。

英文:

I want to defer the input message of a Service Bus Trigger, since the message should only be marked as complete when another Service Bus Trigger finishes. However, the only example of message deferral in Python I've found uses a queue receiver (example from Azure documentation). If I understand correctly, I cannot get the message via such receiver within the trigger, since the input message is locked by the trigger.

I found a post where apparently there is a way to do this in C#, but nothing in Python.

Is there a way to do this?

Here's the code of the trigger to illustrate what I would like to do:

import azure.functions as func
import logging

from typing import List

 from __app__.config import config
 from azure.servicebus import (
     ServiceBusClient,
     ServiceBusReceiver,
     NEXT_AVAILABLE_SESSION,
     ServiceBusReceivedMessage,
 )

def main(msg: func.ServiceBusMessage):
    logging.info(
        f"complete-message test service bus trigger receives {msg.message_id=}, {msg.sequence_number=}"
    )

    service_bus_client = ServiceBusClient.from_connection_string(
         conn_str="conn_string"
     )
 
    queue_receiver: ServiceBusReceiver = service_bus_client.get_queue_receiver(
         queue_name="test-queue",
         session_id=NEXT_AVAILABLE_SESSION,
         prefetch=5,
     )

    # This does not work since defer_message expects a ServiceBusReceivedMessage, not a ServiceBusMessage
    # queue_receiver.defer_message(message=msg)

    # This call does not return any message, which makes sense since the message is locked by the trigger
    messages: List[ServiceBusReceivedMessage] = queue_receiver.receive_messages(
            max_wait_time=3
        )

答案1

得分: 2

以下是翻译好的部分:

"只有.NET的进程内模型具有执行显式消息完成操作的能力。因为函数代码与触发器本身共享同一进程,它能够调用触发器使用的底层Service Bus客户端的操作,并且不会受到进程边界的阻塞。"

"对于独立进程模型,包括非.NET语言的工作程序,正在调查这一点。这需要一套相当丰富的增强功能,以使Functions主机和运行时能够启用RPC通道,以便工作进程可以参与与主机以及触发器和输出绑定的请求/响应式通信。目前还不能分享关于何时可能会看到这一功能的时间表。"

英文:

The short version is - this can't be done from Python, other language workers, or isolated process Functions at this time.

Only the in-process model for .NET has the ability to perform explicit message completion operations. Because the Function code shares the same process with the trigger itself, it is able to invoke operations on the underlying Service Bus clients used by the trigger and isn't blocked by a process boundary.

This is something that is being investigated for the isolated process model - which includes workers for non-.NET languages. It requires a fairly extensive set of enhancements to the Functions host and runtime to enable an RPC channel so that worker processes can participate in request/response-style communication with the host and, by extension, triggers and output bindings. At present, there's no timeframe that can be shared for when we may see this.

huangapple
  • 本文由 发表于 2023年6月5日 22:10:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/76407287.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定