如何从RabbitMQ队列中删除一条消息

huangapple go评论85阅读模式
英文:

How to delete a message from Queue in RabbitMQ

问题

以下是您要翻译的内容:

我正在使用Rabbit MQ来复制Jenkins的操作。
我面临的唯一问题是,假设队列中有10条消息。并且有一些重复的消息处于“未确认”状态。
我需要从队列中删除这些消息,我该如何实现?

我的RabbitMQ配置如下,每个队列只有一个消费者。因此,如果有10条消息,所有消息都将通过同一个消费者的线程进行处理。

Queue queue = new Queue(sfdcConnectionDetails.getGitRepoId() + "_" + sfdcConnectionDetails.getBranchConnectedTo(), true);
rabbitMqSenderConfig.amqpAdmin().declareQueue(queue);
rabbitMqSenderConfig.amqpAdmin().declareBinding(BindingBuilder.bind(queue).to(new DirectExchange(byRepositoryRepositoryId.getRepository().getRepositoryId())).withQueueName());
RabbitMqConsumer container = new RabbitMqConsumer();
container.setConnectionFactory(rabbitMqSenderConfig.connectionFactory());
container.setQueueNames(queue.getName());
container.setConcurrentConsumers(1);
container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(****, ***), new Jackson2JsonMessageConverter()));
container.startConsumers();
英文:

I am using Rabbit MQ to replicate what Jenkins does.
The only issue I am facing is, lets say, when 10 messages are in queue. And there are some duplicate messages which are in unacknowledged state.
And I need to delete those messages from queue, how do I achieve this?

My rabbitmq configuration is as follows, where each queue only has one consumer. So if I have 10 messages, all will get processed through same consumer's thread.

Queue queue = new Queue(sfdcConnectionDetails.getGitRepoId() + "_" + sfdcConnectionDetails.getBranchConnectedTo(), true);
rabbitMqSenderConfig.amqpAdmin().declareQueue(queue);
rabbitMqSenderConfig.amqpAdmin().declareBinding(BindingBuilder.bind(queue).to(new DirectExchange(byRepositoryRepositoryId.getRepository().getRepositoryId())).withQueueName());
RabbitMqConsumer container = new RabbitMqConsumer();
container.setConnectionFactory(rabbitMqSenderConfig.connectionFactory());
container.setQueueNames(queue.getName());
container.setConcurrentConsumers(1);
container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(****, ***), new Jackson2JsonMessageConverter()));
container.startConsumers();

答案1

得分: 1

Sure, here's the translated content:

  1. 你可以在RabbitMQ一侧使用任何插件(例如 此插件)来进行消息的去重。
  2. 在消费者端使用缓存,以便检测最近是否已经处理过相同的消息。
英文:
  1. You can use any plugin (e.g this) for deduplicating messages on the rabbit side.
  2. Use cache on your consumer for detecting if the same message was processing recently.

答案2

得分: 0

正如@ekiryuhin已经建议的那样,你可以采取的一种方法是在将消息生成到RabbitMQ之前,将一个request_id标签分配给载荷,并且在消费者端缓存该request_id。如果已经存在request_id,请注意查找它,然后忽略载荷并将其删除。这个request_id可能会作为载荷的去重ID起作用。

英文:

As already suggested by @ekiryuhin, One of the approach you could take is assign a request_id tag it to the payload before producing message to RabbitMQ & on your consumer's end cache the request_id. Look out for the request_id if already present ignore payload and delete it.
This request_id might work as deduplication-id for your payloads.

huangapple
  • 本文由 发表于 2020年3月15日 14:34:20
  • 转载请务必保留本文链接:https://go.coder-hub.com/60690349.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定