Automatic acknowledgement in @RabbitListener does not work when message processing takes a long time

Question

I am using RabbitMQ to trigger several Spring Batch jobs. Execution takes a long time, up to 10 minutes per job. After all the work is done, the message is not acknowledged by the @RabbitListener method, and the work starts again. If I reduce the processing time, everything works well. How can this be fixed?
Important: the jobs complete correctly and without exceptions!

Configuration

@Configuration
@EnableRabbit
public class RabbitMQConfiguration {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.queue-name}")
    private String queueName;

    @Value("${spring.rabbitmq.exchange-name}")
    private String exchangeName;

    @Value("${spring.rabbitmq.routing-key}")
    private String routingKey;

    @Bean
    public Queue queue() {
        return new Queue(queueName);
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder
                .bind(queue())
                .to(exchange())
                .with(routingKey);
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter(jsonMapper());
    }

    @Bean
    public JsonMapper jsonMapper() {
        JsonMapper jsonMapper = new JsonMapper();
        jsonMapper.registerModule(new JavaTimeModule());
        jsonMapper.setDateFormat(new StdDateFormat())
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return jsonMapper;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setHost(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPort(port);
        return connectionFactory;
    }

    @Bean
    public AmqpTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}

The @RabbitListener method. For example, with from = to.minusDays(6) it works well.

@RabbitListener(queues = ApplicationConstants.NAME_QUEUE_FOR_NEW_FIELD)
public void processQueue(FieldDto newField) {
    PolygonDto polygonDto = mapper.map(newField);
    PolygonDto save = polygonService.save(polygonDto);

    LocalDateTime to = DateUtil.now();
    LocalDateTime from = to.minusYears(6);

    evalScriptTypeList.forEach(type -> {
        try {
            startJob(save, to, from, type);
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
            logger.error(e.getMessage(), e.getCause());
        }
    });
}

private void startJob(PolygonDto save, LocalDateTime to, LocalDateTime from, EvalScriptType type) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {
    JobParameters jobParameters = new JobParametersBuilder()
            .addString("t", to.toString())
            .addString(JOB_PARAMETER_DATE_TO, to.toString())
            .addString(JOB_PARAMETER_DATE_FROM, from.toString())
            .addString(JOB_PARAMETER_NEW_POLYGON_ID, save.getId())
            .addString(JOB_PARAMETER_EVAL_SCRIPT_TYPE, type.toString())
            .addString(JOB_PARAMETER_VERSION_SCRIPT, type.getVersion())
            .toJobParameters();

    jobLauncher.run(uploadSatelliteImageJob, jobParameters);
}

I do not know if manual acknowledgement will work, but maybe there is a way to fix the automatic acknowledgement.
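
One plausible reading of the symptom, which the question does not confirm: with automatic acknowledgement the listener container acks only after processQueue returns, and jobLauncher.run blocks by default, so the message stays unacknowledged for the sum of all job durations. If that exceeds the broker's delivery acknowledgement timeout (assumed here to be 30 minutes, the default in recent RabbitMQ releases as far as I know), the broker closes the channel and requeues the message. The sketch below only does the arithmetic; the class name, the job counts, and the timeout value are assumptions, not values from the question.

```java
import java.time.Duration;

public class AckDelayEstimate {

    /** Time the message stays unacknowledged when the jobs run one after another. */
    public static Duration unackedTime(int jobCount, Duration perJob) {
        return perJob.multipliedBy(jobCount);
    }

    /** True if the unacknowledged time is strictly longer than the broker timeout. */
    public static boolean exceedsTimeout(Duration unacked, Duration consumerTimeout) {
        return unacked.compareTo(consumerTimeout) > 0;
    }

    public static void main(String[] args) {
        Duration perJob = Duration.ofMinutes(10);   // upper bound stated in the question
        Duration timeout = Duration.ofMinutes(30);  // assumed broker default, check your broker
        for (int jobs = 1; jobs <= 4; jobs++) {     // hypothetical number of script types
            Duration total = unackedTime(jobs, perJob);
            System.out.println(jobs + " job(s): " + total.toMinutes()
                    + " min unacked, exceeds timeout: " + exceedsTimeout(total, timeout));
        }
    }
}
```

Under these assumptions, four sequential jobs of 10 minutes each already keep the message unacknowledged longer than the timeout, which would match the observation that shorter runs work.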


Answer 1

Score: 0

I used manual acknowledgement

    @RabbitListener(queues = ApplicationConstants.NAME_QUEUE_FOR_NEW_FIELD, ackMode = "MANUAL")
    public void processQueue(FieldDto newField, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {

            // Acknowledge immediately, before the long-running work starts
            channel.basicAck(tag, false);

            // your code

        } catch (Exception e) {
            // Note: if the ack above already succeeded, the delivery is settled and this reject is no longer valid
            channel.basicReject(tag, true);
            logger.error(e.getMessage(), e.getCause());
        }
    }
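
The order of acknowledgement and work matters here. The following is a minimal in-memory sketch, not the RabbitMQ client API: FakeChannel, ackFirst, and workFirst are hypothetical names used only to illustrate that a delivery tag can be settled once. Acking first, as above, avoids a long unacknowledged period, but a failure afterwards can no longer be turned into a requeue; acking after the work keeps the requeue option but leaves the message unacknowledged for the whole run.

```java
import java.util.HashSet;
import java.util.Set;

public class AckOrderSketch {

    /** Hypothetical in-memory stand-in for a channel; not the RabbitMQ client API. */
    static final class FakeChannel {
        private final Set<Long> unsettled = new HashSet<>();

        void deliver(long tag) { unsettled.add(tag); }

        void basicAck(long tag) { settle(tag); }

        void basicReject(long tag) { settle(tag); }

        // A delivery tag can be settled (acked or rejected) only once.
        private void settle(long tag) {
            if (!unsettled.remove(tag)) {
                throw new IllegalStateException("unknown delivery tag " + tag);
            }
        }
    }

    /** Ack before the work: a later failure cannot be rejected any more. */
    public static String ackFirst(Runnable work) {
        FakeChannel channel = new FakeChannel();
        channel.deliver(1L);
        channel.basicAck(1L);
        try {
            work.run();
            return "acked, work done";
        } catch (RuntimeException e) {
            try {
                channel.basicReject(1L); // tag 1 is already settled
                return "rejected";
            } catch (IllegalStateException alreadySettled) {
                return "acked, work failed, reject refused";
            }
        }
    }

    /** Ack after the work: a failure leads to a reject, so the message can be requeued. */
    public static String workFirst(Runnable work) {
        FakeChannel channel = new FakeChannel();
        channel.deliver(1L);
        try {
            work.run();
            channel.basicAck(1L);
            return "work done, acked";
        } catch (RuntimeException e) {
            channel.basicReject(1L);
            return "work failed, rejected";
        }
    }

    public static void main(String[] args) {
        Runnable ok = () -> { };
        Runnable failing = () -> { throw new RuntimeException("job failed"); };
        System.out.println(ackFirst(ok));
        System.out.println(ackFirst(failing));
        System.out.println(workFirst(ok));
        System.out.println(workFirst(failing));
    }
}
```

On a real broker, settling the same delivery tag twice is a channel-level error, so with the ack-first approach it is probably safer to only log failures in the catch block rather than reject.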

huangapple
  • Published on March 31, 2023, 16:49:49
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75896566.html