Automatic acknowledgement in @RabbitListener does not work when message processing takes a long time
Question
I am using RabbitMQ to run several Spring Batch jobs. Execution takes a long time, up to 10 minutes per job. After all the work has finished, the automatic acknowledgement in the @RabbitListener method does not take effect: the message is delivered again and the work starts over. If the processing time is reduced, everything works well. How can this be fixed?
Important: Jobs are completed correctly and without exceptions!
Configuration
@Configuration
@EnableRabbit
public class RabbitMQConfiguration {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.queue-name}")
    private String queueName;

    @Value("${spring.rabbitmq.exchange-name}")
    private String exchangeName;

    @Value("${spring.rabbitmq.routing-key}")
    private String routingKey;

    @Bean
    public Queue queue() {
        return new Queue(queueName);
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder
                .bind(queue())
                .to(exchange())
                .with(routingKey);
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter(jsonMapper());
    }

    @Bean
    public JsonMapper jsonMapper() {
        JsonMapper jsonMapper = new JsonMapper();
        jsonMapper.registerModule(new JavaTimeModule());
        jsonMapper.setDateFormat(new StdDateFormat())
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return jsonMapper;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setHost(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPort(port);
        return connectionFactory;
    }

    @Bean
    public AmqpTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}
The @RabbitListener method. With a shorter range, for example from = to.minusDays(6), it works well.
@RabbitListener(queues = ApplicationConstants.NAME_QUEUE_FOR_NEW_FIELD)
public void processQueue(FieldDto newField) {
    PolygonDto polygonDto = mapper.map(newField);
    PolygonDto save = polygonService.save(polygonDto);
    LocalDateTime to = DateUtil.now();
    LocalDateTime from = to.minusYears(6);
    evalScriptTypeList.forEach(type -> {
        try {
            startJob(save, to, from, type);
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
            logger.error(e.getMessage(), e.getCause());
        }
    });
}

private void startJob(PolygonDto save, LocalDateTime to, LocalDateTime from, EvalScriptType type) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
    JobParameters jobParameters = new JobParametersBuilder()
            .addString("t", to.toString())
            .addString(JOB_PARAMETER_DATE_TO, to.toString())
            .addString(JOB_PARAMETER_DATE_FROM, from.toString())
            .addString(JOB_PARAMETER_NEW_POLYGON_ID, save.getId())
            .addString(JOB_PARAMETER_EVAL_SCRIPT_TYPE, type.toString())
            .addString(JOB_PARAMETER_VERSION_SCRIPT, type.getVersion())
            .toJobParameters();
    jobLauncher.run(uploadSatelliteImageJob, jobParameters);
}
I do not know whether manual acknowledgement would work. Is there a way to fix the automatic acknowledgement instead?
Answer 1
Score: 0
I used manual acknowledgement:
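@RabbitListener(queues = ApplicationConstants.NAME_QUEUE_FOR_NEW_FIELD, ackMode = "MANUAL")
public void processQueue(FieldDto newField, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    try {
        // Acknowledge before the long-running work starts, so the delivery is not left unacknowledged while the jobs run.
        channel.basicAck(tag, false);
        // your code
    } catch (Exception e) {
        // Caveat: if the ack above already succeeded, this delivery tag is settled;
        // rejecting it again is a channel-level error, so the message will not be requeued.
        channel.basicReject(tag, true);
        logger.error(e.getMessage(), e.getCause());
    }
}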
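An alternative to acknowledging manually is to keep the listener method short: it only hands the job to a worker pool and returns, so the listener container can acknowledge the message promptly. The sketch below shows that hand-off with plain JDK classes only. `JobHandOffSketch`, `onMessage`, and the `Consumer<String>` job are hypothetical stand-ins for the listener method and the Spring Batch launch, not code from the question. As with acknowledging first, the message is already acknowledged by the time the job runs, so a failed or interrupted job is not redelivered by the broker.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Sketch only: hands long-running work to a worker pool so that the
 * message handler returns quickly. All names are hypothetical.
 */
class JobHandOffSketch {

    private final ExecutorService workers;
    private final Consumer<String> longRunningJob;

    JobHandOffSketch(int threads, Consumer<String> longRunningJob) {
        this.workers = Executors.newFixedThreadPool(threads);
        this.longRunningJob = longRunningJob;
    }

    /**
     * Stand-in for the body of the listener method: it queues the job and
     * returns immediately instead of waiting for the job to finish.
     */
    Future<?> onMessage(String polygonId) {
        return workers.submit(() -> {
            try {
                longRunningJob.accept(polygonId);
            } catch (RuntimeException e) {
                // The message is assumed to be acknowledged already, so the
                // failure has to be handled here (log, retry, dead-letter, ...).
                System.err.println("Job failed for " + polygonId + ": " + e.getMessage());
            }
        });
    }

    /** Stops accepting new jobs and waits for the running ones to finish. */
    boolean shutdownAndWait(long timeoutSeconds) throws InterruptedException {
        workers.shutdown();
        return workers.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    }
}
```

In a real listener, `onMessage` would be called from the `@RabbitListener` method with the id of the saved polygon, and the pool size would need to match the number of jobs the application can run concurrently. Whether losing a job on a crash is acceptable depends on the application.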