英文:
How to setup KubeMQ Listener in Spring Boot
问题
目前,我有两个微服务。我想从第一个微服务发送消息到 KubeMQ 队列,并希望第二个微服务接收该消息。我可以使用以下代码将消息发送到 KubeMQ 队列:
Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
SendMessageResult resSend = queue.SendQueueMessage(new Message()
.setBody(Converter.ToByteArray("some-simple_queue-queue-message"))
.setMetadata("someMeta"));
if (resSend.getIsError()) {
System.out.printf("Message enqueue error, error: %s", resSend.getError());
}
我需要在第二个微服务中添加监听器以接收来自队列的消息。以下是KubeMQ提供的代码来接收消息:
Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
ReceiveMessagesResponse resRec = queue.ReceiveQueueMessages(10, 1);
if (resRec.getIsError()) {
System.out.printf("Message dequeue error, error: %s", resRec.getError());
return;
}
System.out.printf("Received Messages %s:", resRec.getMessagesReceived());
for (Message msg : resRec.getMessages()) {
System.out.printf("MessageID: %s, Body:%s", msg.getMessageID(), Converter.FromByteArray(msg.getBody()));
}
如何在第二个微服务中配置它以立即接收添加到队列中的消息?请帮忙。
英文:
Currently, I have two microservices. I want to send message to KubeMQ Queue from first microservice and want it to be received by Second microservice. I am able to send message to a KubeMQ Queue using below code:
Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
SendMessageResult resSend = queue.SendQueueMessage(new Message()
.setBody(Converter.ToByteArray("some-simple_queue-queue-message"))
.setMetadata("someMeta"));
if (resSend.getIsError()) {
System.out.printf("Message enqueue error, error: %s", resSend.getError());
}
I need Listener in the second microservice in order to receive the message from Queue.
Below is code provided by KubeMQ to receive the message:
Queue queue = new Queue("QueueName", "ClientID", "localhost:50000");
ReceiveMessagesResponse resRec = queue.ReceiveQueueMessages(10, 1);
if (resRec.getIsError()) {
System.out.printf("Message dequeue error, error: %s", resRec.getError());
return;
}
System.out.printf("Received Messages %s:", resRec.getMessagesReceived());
for (Message msg : resRec.getMessages()) {
System.out.printf("MessageID: %s, Body:%s", msg.getMessageID(), Converter.FromByteArray(msg.getBody()));
}
How to configure it in the second microservice to receive message instantly as they are added into the queue?
Please help.
答案1
得分: 2
在第二个微服务中,我需要一个监听器来接收来自队列的消息。
为什么要轮询,当你可以通过KubeMQ Pub/Sub模式来接收通知呢?
在消息队列的上下文中,“轮询”指的是您的应用程序不断检查队列,以查看是否有新消息到达。这可能是低效的,因为它需要您的应用程序在可能没有新消息需要处理时进行多次请求。
另一方面,“监听器”(也称为订阅者或回调)是当新消息到达时自动调用的函数。这更有效,因为您的应用程序不需要不断检查队列;相反,它可以等待并在消息到达时作出反应。
发布-订阅模式(pub/sub)是由KubeMQ支持的一种消息模式,与您当前使用的基于队列的模式略有不同。
在发布-订阅模式中,消息的发送者(发布者)不会直接将消息发送到特定的接收者(订阅者)。相反,程序员“发布”消息(事件),而不知道可能存在哪些订阅者。同样,订阅者对一个或多个事件表达兴趣,并且只接收感兴趣的消息,而不知道任何发布者。
在此模式中,KubeMQ提供两种类型的事件处理,即“Events”和“Events Store”。
-
“Events”类型是一种异步实时的发布-订阅模式,意味着消息是实时发送和接收的,但只有在接收方当前连接到KubeMQ时才会接收到。在此模式下没有消息持久性可用。
-
“Events Store”类型是一种带有持久性的异步发布-订阅模式。这意味着消息被存储,并且可以由任何接收方重新播放,即使它们在消息发送时没有连接。该系统还支持从第一个存储的事件重新播放所有事件、仅重新播放最后一个事件或仅发送新事件。
然而,重要的是要注意,在使用Events Store时,客户端ID的唯一性是至关重要的。
在任何给定时间,只有一个接收方可以连接到具有唯一客户端ID的KubeMQ。
如果两个接收方尝试使用相同的客户端ID连接到KubeMQ,其中一个将被拒绝。
消息只能按照客户端ID和订阅类型一次重播。如果接收方断开连接并使用任何订阅类型重新连接,只有新事件将被传递给具有该客户端ID的特定接收方。要重播消息,接收方需要使用不同的客户端ID连接。
考虑到这些特点,如果您将架构切换为使用Events Store类型的发布-订阅模式,您的第二个微服务可以立即接收添加到通道中的消息,甚至在需要时重新播放旧消息。您需要确保每个微服务都有一个唯一的客户端ID,并适当管理其订阅。
然而,发布-订阅模式可能需要对微服务的架构和编码进行更改,因此您需要评估是否适合您的用例。还要注意,发布-订阅模式,特别是带有消息持久性的模式,可能与队列模式相比具有不同的性能特性和资源要求。
以下是当前存在的类以及它们的用途的高级概述:
-
Channel.java
:这个类似乎表示用于在发布-订阅模型中发送事件的通道。 -
ChannelParameters.java
:这个类定义了创建通道实例的参数。 -
Event.java
:这个类表示可以通过通道发送的事件。 -
EventReceive.java
:这个类用于处理接收到的事件。 -
Result.java
:这个类包含发送事件的结果。 -
Subscriber.java
:这个类允许您订阅通道并处理传入的事件。
以下是如何使用现有类来发布和订阅消息的示例:
import io.kubemq.sdk.Channel;
import io.kubemq.sdk.ChannelParameters;
import io.kubemq.sdk.Result;
import io.kubemq.sdk.event.Event;
import io.kubemq.sdk.event.Subscriber;
public class KubeMQExample {
public static void main(String[] args) {
try {
// 初始化ChannelParameters
ChannelParameters params = new ChannelParameters();
params.setChannel("your_channel");
params.setClient("your_client_id");
// 初始化一个新的通道
Channel channel = new Channel(params);
// 创建一个新的事件
Event event = new Event();
event.setBody("Your message here".getBytes());
// 发送事件
Result sendResult = channel.SendEvent(event);
System.out.println("Event sent, Result: " + sendResult.getIsError());
// 初始化一个新的订阅者
Subscriber subscriber = new Subscriber("localhost:5000");
// 订阅通道
subscriber.SubscribeToEvents(params, (eventReceive) -> {
System.out.println("Received Event: " + new String(eventReceive.getBody()));
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
请注意,此代码基于现有的SDK,并可能不反映原始代码的功能。您需要将"your_channel"和"your_client_id"替换为您的实际通道名称和客户端ID。事件主体也可以替换为您要发送的实际消息。
在这里,Subscriber
类用于监听和处理传入的事件。SubscribeToEvents
方法接受一个ChannelParameters
对象和处理接收到的事件的lambda函数。
还要注意,SDK似乎已经移除了Queue和EventsStore类。SDK现在似乎主要使用发布-订阅模型,不
英文:
> I need a Listener in the second microservice in order to receive the message from Queue.
Why polling, when you can be notified through the KubeMQ Pub/Sub pattern?
In the context of message queues, "polling" refers to a process where your application continually checks the queue to see if a new message has arrived. This can be inefficient, as it requires your application to make many requests when there may not be any new messages to process.
On the other hand, a "listener" (also known as a subscriber or a callback) is a function that is automatically called when a new message arrives. This is more efficient because your application does not need to continually check the queue; instead, it can wait and react when a message arrives.
The Publish-Subscribe pattern (or pub/sub) is a messaging pattern supported by KubeMQ, and it differs slightly from the queue-based pattern you are currently using.
In the pub/sub pattern, senders of messages (publishers) do not program the messages to be sent directly to specific receivers (subscribers). Instead, the programmer “publishes” messages (events), without any knowledge of any subscribers there may be. Similarly, subscribers express interest in one or more events and only receive messages that are of interest, without any knowledge of any publishers.
In this pattern, KubeMQ provides two types of event handling, Events
and Events Store
.
-
The
Events
type is an asynchronous real-time Pub/Sub pattern, meaning that messages are sent and received in real-time but only if the receiver is currently connected to KubeMQ. There is no message persistence available in this pattern. -
The
Events Store
type, however, is an asynchronous Pub/Sub pattern with persistence. This means that messages are stored and can be replayed by any receiver, even if they were not connected at the time the message was sent.
The system also supports replaying all events from the first stored event, replaying only the last event, or only sending new events.
However, it is important to note that the uniqueness of a client ID is essential when using Events Store.
At any given time, only one receiver can connect with a unique Client ID.
If two receivers try to connect to KubeMQ with the same Client ID, one of them will be rejected. Messages can only be replayed once per Client ID and Subscription type. If a receiver disconnects and reconnects with any subscription type, only new events will be delivered for this specific receiver with that Client ID. To replay messages, a receiver needs to connect with a different Client ID.
Given these features, if you switch your architecture to a pub/sub pattern using the Events Store type, your second microservice could instantly receive messages as they are added into the channel, and even replay old messages if needed. You would need to ensure each microservice has a unique Client ID and manages its subscriptions appropriately.
However, the pub/sub pattern may require changes in the architecture and coding of your microservices, so you would need to evaluate whether this change is suitable for your use case. It is also important to note that the pub/sub pattern, especially with message persistence, may have different performance characteristics and resource requirements compared to the queue pattern.
Here is a high-level overview of the classes that are present and their usage:
-
Channel.java
: This class appears to represent a channel for sending events in a publish-subscribe model. -
ChannelParameters.java
: This class defines the parameters for creating a Channel instance. -
Event.java
: This class represents an event that can be sent via a Channel. -
EventReceive.java
: This class is used to process received events. -
Result.java
: This class contains the result of a sent event. -
Subscriber.java
: This class allows you to subscribe to a channel and handle incoming events.
So here is an example of how you might use the existing classes to publish and subscribe to messages.
import io.kubemq.sdk.Channel;
import io.kubemq.sdk.ChannelParameters;
import io.kubemq.sdk.Result;
import io.kubemq.sdk.event.Event;
import io.kubemq.sdk.event.Subscriber;
public class KubeMQExample {
public static void main(String[] args) {
try {
// Initialize ChannelParameters
ChannelParameters params = new ChannelParameters();
params.setChannel("your_channel");
params.setClient("your_client_id");
// Initialize a new Channel
Channel channel = new Channel(params);
// Create a new Event
Event event = new Event();
event.setBody("Your message here".getBytes());
// Send the Event
Result sendResult = channel.SendEvent(event);
System.out.println("Event sent, Result: " + sendResult.getIsError());
// Initialize a new Subscriber
Subscriber subscriber = new Subscriber("localhost:5000");
// Subscribe to the Channel
subscriber.SubscribeToEvents(params, (eventReceive) -> {
System.out.println("Received Event: " + new String(eventReceive.getBody()));
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
Do note that this code is based on the existing SDK and may not reflect the functionality of the original code. You will need to replace "your_channel
" and "your_client_id
" with your actual channel name and client ID. The event body can also be replaced with the actual message you want to send.
The Subscriber
class is used here to listen for and process incoming events. The SubscribeToEvents
method takes a ChannelParameters
object and a lambda function that processes received events.
Do also note that the Queue
and EventsStore
classes seem to have been removed from the SDK. The SDK now seems to primarily use the publish-subscribe model, which differs from queue-based communication in that messages are not stored if no consumer is available to consume them.
Events Store was a hybrid model that allowed for persistence in the pub/sub model, storing events that could be replayed by receivers connecting at a later time.
For your original functionality of reading queue messages and peeking at messages in a queue, unfortunately, it does not seem like the current state of the Kubemq Java SDK on the provided GitHub repository supports these actions.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论