How to send messages after a Spring Integration application restart?
Question
I have a small Spring Integration application that stores messages and message groups in a database. Sometimes the application restarts while some messages/groups are still waiting to be sent after the group timeout. After the restart the messages are still in the DB, but they are never sent. I need some configuration that either sends expired message groups from the DB or resumes the timer. I tried to use a reaper, but it does not work as expected. My code is:
    @Configuration
    public class ConsumingChannelConfig {

        @Bean
        public DirectChannel consumingChannel() {
            return new DirectChannel();
        }

        @Bean
        public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
            KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
                    new KafkaMessageDrivenChannelAdapter<>(kafkaListenerContainer());
            kafkaMessageDrivenChannelAdapter.setOutputChannel(consumingChannel());
            MessagingMessageConverter messageConverter = new MessagingMessageConverter();
            messageConverter.setGenerateMessageId(true);
            kafkaMessageDrivenChannelAdapter.setRecordMessageConverter(messageConverter);
            return kafkaMessageDrivenChannelAdapter;
        }

        @Bean
        public DataSource getDataSource() {
            return ...;
        }

        @Bean
        public JdbcMessageStore jdbcMessageStore() {
            return new JdbcMessageStore(getDataSource());
        }

        @ServiceActivator(inputChannel = "consumingChannel")
        @Bean
        public MessageHandler aggregator() {
            long timeout = 10000L;
            AggregatingMessageHandler aggregator =
                    new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                            jdbcMessageStore());
            aggregator.setOutputChannel((message, l) -> {
                System.out.println("MESSAGE: " + message);
                return true;
            });
            aggregator.setGroupTimeoutExpression(new ValueExpression<>(timeout));
            // aggregator.setTaskScheduler(this.taskScheduler);
            aggregator.setCorrelationStrategy(new MyCorrelationStrategy());
            aggregator.setSendPartialResultOnExpiry(true);
            aggregator.setExpireGroupsUponCompletion(true);
            aggregator.setExpireGroupsUponTimeout(true);
            aggregator.setDiscardChannel((message, timeout1) -> {
                System.out.println("DISCARD: " + message + ", timeout: " + timeout1);
                return true;
            });
            aggregator.setReleaseStrategy(new ReleaseStrategy() {
                @Override
                public boolean canRelease(MessageGroup group) {
                    return System.currentTimeMillis() - group.getTimestamp() >= timeout;
                }
            });
            return aggregator;
        }

        @Bean
        public MessageGroupStoreReaper reaper() {
            MessageGroupStoreReaper reaper = new MessageGroupStoreReaper(jdbcMessageStore());
            reaper.setPhase(1);
            reaper.setTimeout(2000L);
            reaper.setAutoStartup(true);
            // reaper.setExpireOnDestroy(true);
            return reaper;
        }

        @Bean
        public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer() {
            ContainerProperties containerProps = new ContainerProperties("spring-integration-topic");
            return new ConcurrentMessageListenerContainer<>(
                    consumerFactory(), containerProps);
        }

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> properties = new HashMap<>();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-integration");
            // automatically reset the offset to the earliest offset
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
            return properties;
        }
    }
UPD: My Solution
    @EnableScheduling
    @SpringBootApplication
    public class SpringIntegrationExampleApplication {

        public static void main(String[] args) {
            SpringApplication.run(SpringIntegrationExampleApplication.class, args);
        }

        @Autowired
        private MessageGroupStoreReaper reaper;

        @Scheduled(initialDelay = 2000, fixedDelay = Long.MAX_VALUE)
        public void start() {
            reaper.run();
        }
    }
Answer 1
Score: 1
The MessageGroupStoreReaper doesn't work by itself; it has to be called from a @Scheduled method: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#reaper
However, there is a nice option for you from the aggregator's perspective:
    /**
     * Perform a {@link MessageGroupStore#expireMessageGroups(long)} with the provided {@link #expireTimeout}.
     * Can be called externally at any time.
     * Internally it is called from the scheduled task with the configured {@link #expireDuration}.
     * @since 5.4
     */
    public void purgeOrphanedGroups() {

You just need to set expireTimeout to a value greater than 0:

    /**
     * Configure a timeout in milliseconds for purging old orphaned groups from the store.
     * Used on startup and when an {@link #expireDuration} is provided, the task for running
     * {@link #purgeOrphanedGroups()} is scheduled with that period.
     * The {@link #forceReleaseProcessor} is used to process those expired groups according
     * the "force complete" options. A group can be orphaned if a persistent message group
     * store is used and no new messages arrive for that group after a restart.
     * @param expireTimeout the number of milliseconds to determine old orphaned groups in the store to purge.
     * @since 5.4
     * @see #purgeOrphanedGroups()
     */
    public void setExpireTimeout(long expireTimeout) {
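
For illustration, the setters quoted above could be wired into the aggregator bean roughly as follows. This is a minimal sketch, assuming Spring Integration 5.4 or later; the class name `OrphanedGroupPurgeConfig`, the output channel name `aggregatedChannel`, the injected `MessageGroupStore` bean, and the 10 s / 30 s values are all placeholders rather than anything from the question.

```java
import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.aggregator.DefaultAggregatingMessageGroupProcessor;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.messaging.MessageHandler;

@Configuration
public class OrphanedGroupPurgeConfig { // hypothetical class name

    @Bean
    @ServiceActivator(inputChannel = "consumingChannel")
    public MessageHandler aggregator(MessageGroupStore messageGroupStore) {
        AggregatingMessageHandler aggregator = new AggregatingMessageHandler(
                new DefaultAggregatingMessageGroupProcessor(), messageGroupStore);

        // Hypothetical channel name; replace with the real downstream channel.
        aggregator.setOutputChannelName("aggregatedChannel");

        // "Force complete" options used when a group is expired.
        aggregator.setSendPartialResultOnExpiry(true);
        aggregator.setExpireGroupsUponTimeout(true);
        aggregator.setGroupTimeoutExpression(new ValueExpression<>(10_000L));

        // Groups older than 10 s count as orphaned; a value > 0 enables
        // the purge on startup.
        aggregator.setExpireTimeout(10_000L);

        // Optional: also repeat purgeOrphanedGroups() with this period.
        aggregator.setExpireDuration(Duration.ofSeconds(30));

        return aggregator;
    }
}
```

With a configuration along these lines, a separate MessageGroupStoreReaper and @Scheduled method should not be needed just to release groups left in the store by a restart.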
See also docs on the matter: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator-xml
> Starting with version 5.4, the aggregator (and resequencer) can be configured to expire orphaned groups (groups in a persistent message store that might not otherwise be released).
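
To make "orphaned" concrete, here is a plain-Java model of the idea. It is not Spring's implementation: the `Group` class, the `purge` method, and the timestamps are hypothetical stand-ins, and the only assumption carried over is that a group counts as expired once its timestamp is at or before `now - expireTimeout`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrphanedGroupPurgeSketch {

    /** Hypothetical stand-in for a message group persisted in a store. */
    static final class Group {
        final String id;
        final long timestamp; // creation time in milliseconds

        Group(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    /**
     * Removes every group whose timestamp is at or before (now - expireTimeout)
     * and returns the ids of the removed groups in iteration order.
     */
    static List<String> purge(Map<String, Group> store, long now, long expireTimeout) {
        long threshold = now - expireTimeout;
        List<String> released = new ArrayList<>();
        Iterator<Group> it = store.values().iterator();
        while (it.hasNext()) {
            Group group = it.next();
            if (group.timestamp <= threshold) {
                released.add(group.id);
                it.remove(); // the group leaves the store once released
            }
        }
        return released;
    }

    public static void main(String[] args) {
        Map<String, Group> store = new LinkedHashMap<>();
        store.put("old", new Group("old", 1_000L));
        store.put("fresh", new Group("fresh", 19_000L));

        // Simulated restart at t = 20 000 ms with a 10 000 ms expire timeout.
        System.out.println(purge(store, 20_000L, 10_000L)); // prints [old]
        System.out.println(store.keySet());                 // prints [fresh]
    }
}
```

In this model the "old" group is released at startup even though no new message arrives for it, which is the behavior that a timer held only in memory cannot provide after a restart.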