AWS SQS – 消息消费者在一段时间后停止接收消息

huangapple go评论137阅读模式
英文:

AWS SQS - MessageConsumer stops to receive messages after a while

问题

My application registers a listeners to a SQS queue (queue itself is populated by a SNS topic).
当我启动应用程序时,消息消费者按预期工作,但过一段时间后就不再接收任何消息。消费者是否会在一段时间后关闭?

Suggestions or comments would be much appreciated.
欢迎提供建议或评论。

SQSConnection:

@Bean
public SQSConnection amazonSQSConnection(
      @Value("${aws.access.key}") String accessKey,
      @Value("${aws.secret.key}") String secretKey) throws JMSException {

BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(accessKey, secretKey);
AmazonSQSClientBuilder client = AmazonSQSClientBuilder
        .standard()
        .withRegion(Regions.GovCloud)
        .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials));
SQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), client);

return connectionFactory.createConnection();
}

Consumer:

 @Bean
 public MessageConsumer workOrderChangeConsumer(
      SQSConnection connection,
      WorkOrderKittingService workOrderKittingService,
      AuthenticationProvider authProvider,
      @Value("${app.user.name}") String appUserName,
      @Value("${aws.sqs.workorder.change.queue}") String woChangeQueue) throws JMSException {

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(woChangeQueue);

WorkOrderChangeIngestor workOrderChangeIngestor = new WorkOrderChangeIngestor(
        workOrderKittingService,
        authProvider,
        appUserName);

MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(workOrderChangeIngestor);
connection.start();

return consumer;
}
英文:

My application registers a listeners to a SQS queue (queue itself is populated by a SNS topic).
When I start the application, message consumer is working as expected but after a while it stops to receive any messages. Can it be that consumer is shutting down after a while?

Suggestions or comments would be much appreciated.

SQSConnection:

@Bean
public SQSConnection amazonSQSConnection(
      @Value("${aws.access.key}") String accessKey,
      @Value("${aws.secret.key}") String secretKey) throws JMSException {


BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(accessKey, secretKey);
AmazonSQSClientBuilder client = AmazonSQSClientBuilder
        .standard()
        .withRegion(Regions.GovCloud)
        .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials));
SQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), client);

return connectionFactory.createConnection();
}

Consummer:

 @Bean
 public MessageConsumer workOrderChangeConsumer(
      SQSConnection connection,
      WorkOrderKittingService workOrderKittingService,
      AuthenticationProvider authProvider,
      @Value("${app.user.name}") String appUserName,
      @Value("${aws.sqs.workorder.change.queue}") String woChangeQueue) throws JMSException {

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(woChangeQueue);

WorkOrderChangeIngestor workOrderChangeIngestor = new WorkOrderChangeIngestor(
        workOrderKittingService,
        authProvider,
        appUserName);

MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(workOrderChangeIngestor);
connection.start();

return consumer;
}

答案1

得分: 3

你正在尝试自行管理连接生命周期。

我建议你使用Spring Cloud AWS来让Spring来管理连接,可以通过以下链接了解更多信息:

https://docs.spring.io/spring-cloud-aws/docs/2.2.3.RELEASE/reference/html/#receiving-a-message

你可以通过注解创建一个监听器:

@Component
public class MyMessageHandler {

    @SqsListener("queueName")
    void handle(String message) {
        ...
        throw new MyException("something went wrong");
    }

    @MessageExceptionHandler(MyException.class)
    void handleException(MyException e) {
        ...
    }
}
英文:

You're trying to manage the connection lifecycle by yourself.

I recommend you to let spring manage that for you, by using spring-cloud-aws

https://docs.spring.io/spring-cloud-aws/docs/2.2.3.RELEASE/reference/html/#receiving-a-message

You can create a listener through annotations:

@Component
public class MyMessageHandler {

    @SqsListener("queueName")
    void handle(String message) {
        ...
        throw new MyException("something went wrong");
    }

    @MessageExceptionHandler(MyException.class)
    void handleException(MyException e) {
        ...
    }
}

huangapple
  • 本文由 发表于 2020年8月11日 08:28:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/63349843.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定