AWS SQS - MessageConsumer stops receiving messages after a while
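Question
My application registers a listener on an SQS queue (the queue itself is populated by an SNS topic).
When I start the application, the message consumer works as expected, but after a while it stops receiving any messages. Could the consumer be shutting down after some time?
Suggestions or comments would be much appreciated.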
SQSConnection:
    @Bean
    public SQSConnection amazonSQSConnection(
            @Value("${aws.access.key}") String accessKey,
            @Value("${aws.secret.key}") String secretKey) throws JMSException {
        BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(accessKey, secretKey);
        AmazonSQSClientBuilder client = AmazonSQSClientBuilder
                .standard()
                .withRegion(Regions.GovCloud)
                .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials));
        SQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), client);
        return connectionFactory.createConnection();
    }
Consumer:
    @Bean
    public MessageConsumer workOrderChangeConsumer(
            SQSConnection connection,
            WorkOrderKittingService workOrderKittingService,
            AuthenticationProvider authProvider,
            @Value("${app.user.name}") String appUserName,
            @Value("${aws.sqs.workorder.change.queue}") String woChangeQueue) throws JMSException {
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue(woChangeQueue);
        WorkOrderChangeIngestor workOrderChangeIngestor = new WorkOrderChangeIngestor(
                workOrderKittingService,
                authProvider,
                appUserName);
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(workOrderChangeIngestor);
        connection.start();
        return consumer;
    }
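Answer 1
Score: 3
You're trying to manage the connection lifecycle yourself.
I recommend letting Spring manage it for you by using spring-cloud-aws:
https://docs.spring.io/spring-cloud-aws/docs/2.2.3.RELEASE/reference/html/#receiving-a-message
You can create a listener through annotations: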
    @Component
    public class MyMessageHandler {
        @SqsListener("queueName")
        void handle(String message) {
            ...
            throw new MyException("something went wrong");
        }

        @MessageExceptionHandler(MyException.class)
        void handleException(MyException e) {
            ...
        }
    }
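To make the idea of a managed listener a bit more concrete, here is a rough, framework-free sketch of the pattern: a loop that dispatches each message to a handler and routes a handler failure to a separate error callback, so one bad message does not end consumption. This is only a conceptual illustration and not how Spring Cloud AWS is actually implemented; `ListenerLoopSketch`, `drain`, and the in-memory queue are made-up names standing in for the real container and the SQS queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for a listener container; NOT Spring Cloud AWS code.
public class ListenerLoopSketch {

    // Drains the queue, passing each message to the handler.
    // A RuntimeException from the handler goes to onError and the loop
    // moves on to the next message instead of terminating.
    // Returns the number of messages handled without an exception.
    static int drain(BlockingQueue<String> queue,
                     Consumer<String> handler,
                     Consumer<RuntimeException> onError) {
        int handled = 0;
        String message;
        while ((message = queue.poll()) != null) {
            try {
                handler.accept(message);
                handled++;
            } catch (RuntimeException e) {
                onError.accept(e);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        // In-memory queue used purely for illustration (assumption: no real SQS here).
        BlockingQueue<String> queue =
                new LinkedBlockingQueue<>(List.of("ok-1", "bad", "ok-2"));
        List<String> errors = new ArrayList<>();

        int handled = drain(
                queue,
                m -> {
                    if (m.equals("bad")) {
                        throw new IllegalStateException("something went wrong");
                    }
                },
                e -> errors.add(e.getMessage()));

        System.out.println(handled + " handled, " + errors.size() + " failed");
    }
}
```

In the annotated example above, the `@SqsListener` method plays the role of `handler` and the `@MessageExceptionHandler` method plays the role of `onError`; the polling loop and connection handling are what you would be leaving to the framework.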