SQS JMS消息在Spring Boot中发生故障时未能进行重试。

huangapple go评论79阅读模式
英文:

SQS JMS Message not being retried on a failure in spring boot

问题

给定如下 JMS / SQS 配置

private final SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(
        new ProviderConfiguration().withNumberOfMessagesToPrefetch(10),
        AmazonSQSClientBuilder.defaultClient());

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(this.sqsConnectionFactory);
    factory.setDestinationResolver(new DynamicDestinationResolver());
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}

以及如下接收器

@Service
public class HandleMessage {
    @Transactional
    @JmsListener(destination = "${sqs.handler}")
    public void receive(String message) throws IOException, JMSException {
        ...
        if (message.contains("test"))
            throw new JMSException("boom!");
        ...
    }

我发现所有消息都被处理了并且包含 "test" 的消息消失了而不是重新尝试也许在 SQS 配置中有一些需要更改的内容

`@Transactional` 属性可能是需要的但是我想要的是让 Spring Boot 在出现异常时向 SQS 发信号表明消息处理失败我相信这是可能的

最大消息大小 256 KB
最近更新时间 2020 年 5 月 25 日12:00:10
消息保留期限 4 天
默认可见性超时时间 30 秒
可用消息数目 0
传递延迟时间 0 秒
在处理中的消息数目其他消费者不可见0
接收消息等待时间 0 秒
延迟的消息数目 0
基于内容的去重 -
英文:

Given a JMS / SQS configuration like this:

private final SQSConnectionFactory sqsConnectionFactory = new SQSConnectionFactory(
        new ProviderConfiguration().withNumberOfMessagesToPrefetch(10),
        AmazonSQSClientBuilder.defaultClient());

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(this.sqsConnectionFactory);
    factory.setDestinationResolver(new DynamicDestinationResolver());
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}

and a receiver like this:

@Service
public class HandleMessage {
    @Transactional
    @JmsListener(destination = "${sqs.handler}")
    public void receive(String message) throws IOException, JMSException {
        ...
        if (message.contains("test"))
          throw new JMSException("boom!");
        ...
    }

I am finding that all the messages are being processed, and the message containing test vanishes, instead of retries. Is there perhaps something that needs to be changed in the SQS configuration.

The @Transactional attribute may or may not be required, but what I want is for spring boot to signal to SQS the message failed in the presence of an exception, which I am sure is possible.

Maximum message size 256 KB
Last updated 5/25/2020, 12:00:10
Message retention period 4 Days
Default visibility timeout 30 Seconds
Messages available 0
Delivery delay 0 Seconds
Messages in flight (not available to other consumers) 0
Receive message wait time 0 Seconds
Messages delayed 0
Content-based deduplication -

答案1

得分: 1

我希望这能解答你的问题,或者至少能帮你朝着正确的方向前进。

你需要将一个JTA事务管理器部署为Spring的PlatformTransactionManager。有许多可用的选项,我在使用Arjuna方面取得了不错的效果。添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-narayana</artifactId>
</dependency>

将会部署Arjuna并将其配置为PlatformTransactionManager。

有用的日志记录:

<logger name="com.arjuna" level="TRACE" additivity="false">
    <appender-ref ref="STDOUT" />
</logger>

移除:

factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

并添加:

factory.setSessionTransacted(true);

当消息监听容器开始侦听消息时,它应该创建一个事务上下文。当进入到事务方法时,该方法应该加入事务上下文。如果方法的提交成功,那么你应该在Arjuna日志中看到消息接收被提交的记录。如果出现异常,监听器事务将会回滚,你的消息将会被返回到队列中。

仅为了增加复杂性... 你只有一个事务性资源 - JMS会话。你可以避免使用完整的JTA事务管理器。我个人认为它们的资源消耗并不过度。你可以创建一个JmsTransactionManager,并将你的连接工厂传递给它。然后,你可以将JmsTransactionManager设置为消息监听容器工厂的事务管理器。

英文:

I hope that this answers your question, or at least get you going in the right direction.

You will need to have a JTA transaction manager deployed as the Spring PlatformTransactionManager. There are many available, I have had good results with Arjuna. Adding the starter:

    &lt;dependency&gt;
        &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
        &lt;artifactId&gt;spring-boot-starter-jta-narayana&lt;/artifactId&gt;
    &lt;/dependency&gt;

will deploy Arjuna and configure it as the PlatformTransactionManager.

Helpful logging:

&lt;logger name=&quot;com.arjuna&quot; level=&quot;TRACE&quot; additivity=&quot;false&quot;&gt;
    &lt;appender-ref ref=&quot;STDOUT&quot; /&gt;
&lt;/logger&gt;

Drop the:

factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

and add:

factory.setSessionTransacted(true);

When the message listener container starts listening for a message, it should create a transaction context. When you get to your transactional method, that method should then join the transaction context. If the commit for the method is successful, then you should see the message receive get committed in the Arjuna logging. If there is an exception, then the listener transaction will get rolled back, and your message will be returned to your queue.

Just to muddy the waters.... You only have the one transactional resource - the JMS session. You could avoid using a full up JTA transaction manager. I personally don't think that they are overly resource intensive. You can create a JmsTransactionManager, and pass your connnection factory into it. Then, you can set the JmsTransactionManager on the message listener container factory as the transaction manager.

答案2

得分: 0

我实际上解决了这个问题,而且真的感到很尴尬。
我真的很喜欢上面答案中的所有建议,我已经标记它为正确答案,因为在某些情况下,将事务列入其中将会非常有益。

但是对于那些可能会遇到这个问题的人,温馨提醒要确保地球上没有另一台服务器(在我的情况下是不同的大陆)正在消耗失败的消息。

一旦我确保我是地球上唯一收听这些消息的机器,我得到了我期望收到的所有重试。

英文:

I actually solved this problem, and it's embarrassing.
I really like all the suggestions in the above answer though, and have marked it correct as enlisting in the transaction in some cases will be of huge benefit.

But for those of you who may stumble upon this question, a gentle reminder to make sure that there is not another server somewhere on the planet (in my case a different continent) that was consuming the failed messages.

Once I made sure I was the only machine on the planet listening to the messages, I got all the retries I was expecting to receive.

huangapple
  • 本文由 发表于 2020年9月9日 18:38:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/63809878.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定