Spring Integration JPA Inbound Adapter Sends Duplicate Mails

Question

We have an application that stores mails/notifications in a database and sends the user a "reminder" email telling them that they have received a new mail/notification.

We use Spring Integration with the JPA inbound adapter to poll for JPA entities (mails) in the "PENDING" state and then send them. However, because we run multiple instances, occasionally two emails are sent to the same user, which causes confusion.

We tried to solve this by locking the entity with a PESSIMISTIC_WRITE lock inside the transaction, but it still happens occasionally.

I'm quite new to Spring Integration, so maybe I'm missing something obvious, but here's the configuration:

@Configuration
@EnableConfigurationProperties(SendMailProperties::class, MailProperties::class)
class SendMailConfiguration(
    private val entityManagerFactory: EntityManagerFactory,
    private val transactionManager: TransactionManager
) {

    @Bean
    fun sendMailFlow(
        mailProperties: MailProperties,
        sendMailProperties: SendMailProperties,
        mailSenderMessageHandler: MessageHandler,
        mimeMailTransformer: MimeMailTransformer
    ): IntegrationFlow =
        queryPendingMails()
            .transform(::updateAndLockEntity)
            .transform(mimeMailTransformer::convertToMimeMessage)
            .enrichHeaders(Mail.headers().to(sendMailProperties.recipient).from(mailProperties.username))
            .log()
            .handle(mailSenderMessageHandler)
            .get()

    private fun queryPendingMails() = IntegrationFlow.from(
        Jpa.inboundAdapter(entityManagerFactory)
            .entityClass(JpaSecuremail::class.java)
            .maxResults(1)
            .jpaQuery(
                """
                select s from JpaSecuremail s 
                where s.mail.status = '$PENDING' and s.mail.direction = '$OUTBOUND'
                """.trimIndent()
            )
            .expectSingleResult(true),
        mailPoller()
    )

    private fun mailPoller(): (SourcePollingChannelAdapterSpec) -> Unit =
        { spec: SourcePollingChannelAdapterSpec ->
            spec.poller { factory: PollerFactory ->
                factory
                    .fixedDelay(SECONDS_10_MILLIS)
                    .transactional(
                        TransactionInterceptorBuilder(true)
                            .transactionManager(transactionManager)
                            .build()
                    )
            }
        }

    private fun updateAndLockEntity(secureMail: JpaSecuremail) =
        with(getTransactionalEntityManager(entityManagerFactory) as EntityManager) {
            lock(secureMail, PESSIMISTIC_WRITE)

            secureMail.mail.apply {
                status = SENT
                messageId = UUID.randomUUID().toString()
                sentDate = now()
            }
            persist(secureMail)
            flush()
            secureMail
        }

    @Bean
    fun mailSenderMessageHandler(mailSender: MailSender) =
        CircuitBreakerMessageHandler(Mail.outboundAdapter(mailSender))

    companion object {
        // Poll delay in milliseconds. Note: 1000L is 1 second, although the name suggests 10 seconds.
        private const val SECONDS_10_MILLIS = 1000L
    }
}

Does anyone have an idea why this happens? Is the transaction guaranteed to span the whole integration flow? Is the lock done correctly? Or are there pointers to an example (I didn't find a matching one)? Any pointers help, thanks.

Answer 1

Score: 1

Try nativeQuery() instead, where you can specify FOR UPDATE. Your PESSIMISTIC_WRITE assumption is correct, but you apply the lock too late: by that point Jpa.inboundAdapter() has already done its job.

We can probably improve the Spring Integration JPA module to accept PESSIMISTIC_WRITE as an option for use cases like this.
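
A minimal sketch of that suggestion, as a hedged illustration rather than a definitive fix. The point is that the row lock has to be taken by the polling query itself; with a plain jpaQuery(), two instances can both read the same PENDING row before either one calls lock(). The table name secure_mail and the columns status and direction are assumptions (the real mapping of JpaSecuremail is not shown in the question), and the LIMIT / FOR UPDATE syntax depends on the database. JpaSecuremail and the poller lambda are the ones from the question.

```kotlin
import jakarta.persistence.EntityManagerFactory
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec
import org.springframework.integration.jpa.dsl.Jpa

// Hypothetical table and column names; adjust them to the real schema.
// FOR UPDATE locks the selected row as part of the poll itself.
// Some databases also accept "for update skip locked", so that other
// instances skip the locked row instead of waiting for it.
private val LOCKING_QUERY = """
    select * from secure_mail
    where status = 'PENDING' and direction = 'OUTBOUND'
    limit 1
    for update
""".trimIndent()

// Intended as a replacement for queryPendingMails() from the question.
fun queryPendingMailsLocked(
    entityManagerFactory: EntityManagerFactory,
    poller: (SourcePollingChannelAdapterSpec) -> Unit
) = IntegrationFlow.from(
    Jpa.inboundAdapter(entityManagerFactory)
        .entityClass(JpaSecuremail::class.java) // entity class from the question
        .nativeQuery(LOCKING_QUERY)             // native SQL instead of jpaQuery()
        .expectSingleResult(true),
    poller
)
```

The lock only lasts as long as the surrounding transaction, so the transactional poller from the question still needs to stay in place. Assuming the rest of the flow runs on direct channels (the same thread), the status update and the mail send then happen while the row is still locked.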

huangapple
  • Published on July 12, 2023, 20:38:56
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76670644.html