通过RabbitMQ传递重型物品

huangapple go评论77阅读模式
英文:

Passing heavy objects via RabbitMQ

问题

我在想是否有一种有效的方式可以通过Rabbit队列传输重型对象(例如,包含超过100万个字符串的数组)。

我尝试过将数组分割成1000、10000和100000个元素的块。最快的速度我使用10000个元素的块大小,用时21秒。

我想出的解决方案类似于流式传输。有两个服务,一个向另一个发送一个包含临时队列名称的请求对象(该队列在发送请求之前创建)。后者获取请求并开始按块发送数组,当发送完毕时,发送一种END-OF-STREAM标志。

也许有一些优雅的解决方案,比如一些功能或库等。唯一的条件是重型对象应该通过消息队列传递。

有人遇到过这样的任务吗?

英文:

I'm wondering if there is a good way to transfer heavy objects (for example, arrays of >1mln strings) effectively via Rabbit queue.

What I tried was splitting the array on chunks by 1000, 10000, and 100000. The fastest I got was 21sec with 10000 chunk size.

The solution I came up with was something similar to streaming. There were 2 services, one sent to another a request object containing the temporal queue name (which was created right before sending the request). The latter gets the request and starts sending the array by chunks, and when it's over, it sends a kind of END-OF-STREAM sign.

Maybe there's some elegant solution for that, like some feature or library or whatever. The only condition is that the heavy object should be passed by using a message queue.

Have anyone encountered such a task?

答案1

得分: 1

好的,以下是翻译好的部分:

我还没有在这里收到任何回应,所以我只是想告诉你一些关于我的解决方案的更多信息。
我所做的是,在第一个服务中创建并声明了一个临时队列,而第二个服务已经有了监听请求的队列。因此,服务 #1 向队列发送请求,请求 DTO 包含临时队列的名称。

发送后,服务 #1 开始监听临时队列,如下所示:

while (condition) {
    Object obj = amqpTemplate.receiveAndConvert(queueToListen, DEFAULT_TIMEOUT);
}

DEFAULT_TIMEOUT 是服务 #1 监听临时队列的时间。例如,循环终止条件可以从服务 #2 发送,作为通知流结束的特殊对象。

为了实现更高的性能,您还可以简化通过临时队列传递的 DTO。换句话说,如果它们都具有一些公共字段,可以将它们一次性发送(在流头部或其他位置),并排除块 DTO。此外,JSON 属性名称也可以缩短(例如,value => v),因为本质上我们仍然传递信息的字节,而对象被序列化。

英文:

Ok, I haven't gotten any response here so just tell you some more about my solution.
What I did was, I just create and declare a temporal queue in the first service and the second service has already had the queue to listen to requests. So service #1 sends a request to the queue, and the request DTO contains the temporal queue name.

After sending, service #1 starts listening to the temporal queue like that:

while (condition) {
    Object obj = amqpTemplate.receiveAndConvert(queueToListen, DEFAULT_TIMEOUT);
}

DEFAULT_TIMEOUT is how long service #1 is going to listen to the temporal queue. The loop breaker could be, for instance, sent from service #2 as a special object that notifies that that was the end of the stream.

To achieve more performance, you can also simplify your DTOs that are being passed through the temporal queue. Namely, if they all have some common fields, they could be sent once (in the stream header or whatever) and excluded the chunk DTO. Additionally, the JSON property names could also be shortened (e.g. value => v) because essentially we still pass bytes of information, and the objects are serialized.

huangapple
  • 本文由 发表于 2020年8月10日 16:47:47
  • 转载请务必保留本文链接:https://go.coder-hub.com/63336937.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定