如何在Spring集成应用程序重新启动后发送消息?

huangapple go评论107阅读模式
英文:

How to send Messages after Spring Integration application restarted?

问题

我有一个小的Spring Integration应用程序,我在数据库中存储消息和消息组。目前,我遇到一种情况,即当某些消息/组在组超时后等待发送时,应用程序重新启动。当应用程序启动时,我仍然在DB中有消息,它们不会被发送。我需要一些配置来从DB发送过期的消息组或恢复定时器。我尝试使用reaper,但它没有按预期工作。我的代码如下:

@Configuration
public class ConsumingChannelConfig {

    @Bean
    public DirectChannel consumingChannel() {
        return new DirectChannel();
    }

    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
        KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
                new KafkaMessageDrivenChannelAdapter<>(kafkaListenerContainer());
        kafkaMessageDrivenChannelAdapter.setOutputChannel(consumingChannel());
        MessagingMessageConverter messageConverter = new MessagingMessageConverter();
        messageConverter.setGenerateMessageId(true);
        kafkaMessageDrivenChannelAdapter.setRecordMessageConverter(messageConverter);
        return kafkaMessageDrivenChannelAdapter;
    }

    @Bean
    public DataSource getDataSource() {
        return ...;
    }

    @Bean
    public JdbcMessageStore jdbcMessageStore() {
        return new JdbcMessageStore(getDataSource());
    }

    @ServiceActivator(inputChannel = "consumingChannel")
    @Bean
    public MessageHandler aggregator() {
        long timeout = 10000L;
        AggregatingMessageHandler aggregator =
                new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                        jdbcMessageStore());
        aggregator.setOutputChannel((message, l) -> {
            System.out.println("MESSAGE: " + message);
            return true;
        });
        aggregator.setGroupTimeoutExpression(new ValueExpression<>(timeout));
//        aggregator.setTaskScheduler(this.taskScheduler);
        aggregator.setCorrelationStrategy(new MyCorrelationStrategy());
        aggregator.setSendPartialResultOnExpiry(true);
        aggregator.setExpireGroupsUponCompletion(true);
        aggregator.setExpireGroupsUponTimeout(true);
        aggregator.setDiscardChannel((message, timeout1) -> {
            System.out.println("DISCARD: " + message + ", timeout: " + timeout1);
            return true;
        });
        aggregator.setReleaseStrategy(new ReleaseStrategy() {
            @Override
            public boolean canRelease(MessageGroup group) {
                return System.currentTimeMillis() - group.getTimestamp() >= timeout;
            }
        });
        return aggregator;
    }

    @Bean
    public MessageGroupStoreReaper reaper() {
        MessageGroupStoreReaper reaper = new MessageGroupStoreReaper(jdbcMessageStore());
        reaper.setPhase(1);
        reaper.setTimeout(2000L);
        reaper.setAutoStartup(true);
//        reaper.setExpireOnDestroy(true);
        return reaper;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> kafkaListenerContainer() {
        ContainerProperties containerProps = new ContainerProperties("spring-integration-topic");

        return new ConcurrentMessageListenerContainer<>(
                consumerFactory(), containerProps);
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-integration");
        // automatically reset the offset to the earliest offset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        return properties;
    }
}

更新:我的解决方案

@EnableScheduling
@SpringBootApplication
public class SpringIntegrationExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationExampleApplication.class, args);
    }

    @Autowired
    private MessageGroupStoreReaper reaper;

    @Scheduled(initialDelay = 2000, fixedDelay = Long.MAX_VALUE)
    public void start() {
        reaper.run();
    }

}
英文:

I have a small Spring Integration application, I'm storing messages and messaging groups in the database. Currently, I have a case when some messages/groups are waiting to be sent after group timeout, but the application restarted. And when the application started I still have messages in DB and they won't be sent. I need some configuration to send expired message group from DB or resume timer. I tried to use reaper, but it does not work as expected. My code is:

@Configuration
public class ConsumingChannelConfig {
@Bean
public DirectChannel consumingChannel() {
return new DirectChannel();
}
@Bean
public KafkaMessageDrivenChannelAdapter&lt;String, String&gt; kafkaMessageDrivenChannelAdapter() {
KafkaMessageDrivenChannelAdapter&lt;String, String&gt; kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter&lt;&gt;(kafkaListenerContainer());
kafkaMessageDrivenChannelAdapter.setOutputChannel(consumingChannel());
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
messageConverter.setGenerateMessageId(true);
kafkaMessageDrivenChannelAdapter.setRecordMessageConverter(messageConverter);
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public DataSource getDataSource() {
return ...;
}
@Bean
public JdbcMessageStore jdbcMessageStore() {
return new JdbcMessageStore(getDataSource());
}
@ServiceActivator(inputChannel = &quot;consumingChannel&quot;)
@Bean
public MessageHandler aggregator() {
long timeout = 10000L;
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageStore());
aggregator.setOutputChannel((message, l) -&gt; {
System.out.println(&quot;MESSAGE: &quot; + message);
return true;
});
aggregator.setGroupTimeoutExpression(new ValueExpression&lt;&gt;(timeout));
//        aggregator.setTaskScheduler(this.taskScheduler);
aggregator.setCorrelationStrategy(new MyCorrelationStrategy());
aggregator.setSendPartialResultOnExpiry(true);
aggregator.setExpireGroupsUponCompletion(true);
aggregator.setExpireGroupsUponTimeout(true);
aggregator.setDiscardChannel((message, timeout1) -&gt; {
System.out.println(&quot;DISCARD: &quot; + message + &quot;, timeout: &quot; + timeout1);
return true;
});
aggregator.setReleaseStrategy(new ReleaseStrategy() {
@Override
public boolean canRelease(MessageGroup group) {
return System.currentTimeMillis() - group.getTimestamp() &gt;= timeout;
}
});
return aggregator;
}
@Bean
public MessageGroupStoreReaper reaper() {
MessageGroupStoreReaper reaper = new MessageGroupStoreReaper(jdbcMessageStore());
reaper.setPhase(1);
reaper.setTimeout(2000L);
reaper.setAutoStartup(true);
//        reaper.setExpireOnDestroy(true);
return reaper;
}
@Bean
public ConcurrentMessageListenerContainer&lt;String, String&gt; kafkaListenerContainer() {
ContainerProperties containerProps = new ContainerProperties(&quot;spring-integration-topic&quot;);
return new ConcurrentMessageListenerContainer&lt;&gt;(
consumerFactory(), containerProps);
}
@Bean
public ConsumerFactory&lt;String, String&gt; consumerFactory() {
return new DefaultKafkaConsumerFactory&lt;&gt;(consumerConfigs());
}
@Bean
public Map&lt;String, Object&gt; consumerConfigs() {
Map&lt;String, Object&gt; properties = new HashMap&lt;&gt;();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, &quot;127.0.0.1:9092&quot;);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, &quot;spring-integration&quot;);
// automatically reset the offset to the earliest offset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &quot;earliest&quot;);
//        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
return properties;
}
}

UPD: My Solution

@EnableScheduling
@SpringBootApplication
public class SpringIntegrationExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SpringIntegrationExampleApplication.class, args);
}
@Autowired
private MessageGroupStoreReaper reaper;
@Scheduled(initialDelay = 2000, fixedDelay = Long.MAX_VALUE)
public void start() {
reaper.run();
}
}

答案1

得分: 1

MessageGroupStoreReaper 不能单独工作,必须从一个 @Scheduled 方法中调用:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#reaper

但是从聚合器的角度来看,有一个很好的选项:

/**

  • 使用提供的 {@link #expireTimeout} 执行 {@link MessageGroupStore#expireMessageGroups(long)}。
  • 可以随时从外部调用。
  • 在内部,它会在配置的 {@link #expireDuration} 任务中调用。
  • @since 5.4
    */
    public void purgeOrphanedGroups() {

只需设置 expireTimeout &gt; 0

/**

  • 为从存储中清除旧孤立的组配置超时时间(以毫秒为单位)。
  • 在启动时使用,并在提供 {@link #expireDuration} 时,计划运行 {@link #purgeOrphanedGroups()} 任务的周期。
  • {@link #forceReleaseProcessor} 用于根据“强制完成”选项处理那些已过期的组。
  • 如果在重启后没有新消息到达,那么一个组可能会成为孤立的,如果使用了持久消息组存储。
  • @param expireTimeout 用于确定要清除存储中的旧孤立组的毫秒数。
  • @since 5.4
  • @see #purgeOrphanedGroups()
    */
    public void setExpireTimeout(long expireTimeout) {

也可以查看相关文档:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator-xml

从版本5.4开始,聚合器(和重新排序器)可以配置为清除孤立的组(存储在持久消息存储中的组,否则可能不会被释放)。

英文:

The MessageGroupStoreReaper doesn't work by itself, it has to be called from a @Scheduled method: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#reaper

However there is a nice option for you from an aggregator perspective:

/**
* Perform a {@link MessageGroupStore#expireMessageGroups(long)} with the provided {@link #expireTimeout}.
* Can be called externally at any time.
* Internally it is called from the scheduled task with the configured {@link #expireDuration}.
* @since 5.4
*/
public void purgeOrphanedGroups() {

You just need to set that expireTimeout &gt; 0:

/**
* Configure a timeout in milliseconds for purging old orphaned groups from the store.
* Used on startup and when an {@link #expireDuration} is provided, the task for running
* {@link #purgeOrphanedGroups()} is scheduled with that period.
* The {@link #forceReleaseProcessor} is used to process those expired groups according
* the &quot;force complete&quot; options. A group can be orphaned if a persistent message group
* store is used and no new messages arrive for that group after a restart.
* @param expireTimeout the number of milliseconds to determine old orphaned groups in the store to purge.
* @since 5.4
* @see #purgeOrphanedGroups()
*/
public void setExpireTimeout(long expireTimeout) {

See also docs on the matter: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator-xml

> Starting with version 5.4, the aggregator (and resequencer) can be configured to expire orphaned groups (groups in a persistent message store that might not otherwise be released).

huangapple
  • 本文由 发表于 2023年6月2日 04:05:24
  • 转载请务必保留本文链接:https://go.coder-hub.com/76385338.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定