英文:
Spring Integration JPA Inbound Adapter Sends Duplicate Mails
问题
我们有一个应用程序,将邮件/通知存储在数据库中,并向用户发送“提醒”邮件,告诉他们他们收到了新邮件/通知。
我们使用Spring集成和JPA入站适配器来轮询处于“PENDING”状态的JPA实体,然后发送邮件。然而,由于我们有多个运行实例,偶尔会向用户发送两封邮件,导致混淆。
我们尝试通过使用PESSIMISTIC_WRITE
事务锁锁定实体来解决此问题,但偶尔仍会发生。
我对Spring集成还相对新,所以也许我错过了一些明显的东西,但以下是配置:
@Configuration
@EnableConfigurationProperties(SendMailProperties::class, MailProperties::class)
class SendMailConfiguration(
private val entityManagerFactory: EntityManagerFactory,
private val transactionManager: TransactionManager
) {
@Bean
fun sendMailFlow(
mailProperties: MailProperties,
sendMailProperties: SendMailProperties,
mailSenderMessageHandler: MessageHandler,
mimeMailTransformer: MimeMailTransformer
): IntegrationFlow =
queryPendingMails()
.transform(::updateAndLockEntity)
.transform(mimeMailTransformer::convertToMimeMessage)
.enrichHeaders(Mail.headers().to(sendMailProperties.recipient).from(mailProperties.username))
.log()
.handle(mailSenderMessageHandler)
.get()
private fun queryPendingMails() = IntegrationFlow.from(
Jpa.inboundAdapter(entityManagerFactory)
.entityClass(JpaSecuremail::class.java)
.maxResults(1)
.jpaQuery(
"""
select s from JpaSecuremail s
where s.mail.status = '$PENDING' and s.mail.direction = '$OUTBOUND'
""".trimIndent()
)
.expectSingleResult(true),
mailPoller()
)
private fun mailPoller(): (SourcePollingChannelAdapterSpec) -> Unit =
{ spec: SourcePollingChannelAdapterSpec ->
spec.poller { factory: PollerFactory ->
factory
.fixedDelay(SECONDS_10_MILLIS)
.transactional(
TransactionInterceptorBuilder(true)
.transactionManager(transactionManager)
.build()
)
}
}
private fun updateAndLockEntity(secureMail: JpaSecuremail) =
with(getTransactionalEntityManager(entityManagerFactory) as EntityManager) {
lock(secureMail, PESSIMISTIC_WRITE)
secureMail.mail.apply {
status = SENT
messageId = UUID.randomUUID().toString()
sentDate = now()
}
persist(secureMail)
flush()
secureMail
}
@Bean
fun mailSenderMessageHandler(mailSender: MailSender) =
CircuitBreakerMessageHandler(Mail.outboundAdapter(mailSender))
companion object {
private const val SECONDS_10_MILLIS = 1000L
}
}
有人知道为什么会发生这种情况吗?事务是否在整个集成过程中得到了保证?锁定是否执行正确?或者是否有一些示例指针(找不到匹配的示例)?任何指针都会有所帮助,谢谢
英文:
We have an application that stores mails/notifications in a database and sends a "reminder" email to the user, that they have received a new mail/notification.
We use Spring integration and JPA inbound adapter to poll the JPA entity for mails in the "PENDING" state and then send the mails. However, due to the fact that we have multiple instances running, occasionally two emails are sent to the user leading to confusion.
We tried to solve this by locking the entity with a PESSIMISTIC_WRITE
transaction lock but it still occasionally happens.
I'm quite new to Spring integration so maybe I'm missing something obvious but here's the configuration:
@Configuration
@EnableConfigurationProperties(SendMailProperties::class, MailProperties::class)
class SendMailConfiguration(
private val entityManagerFactory: EntityManagerFactory,
private val transactionManager: TransactionManager
) {
@Bean
fun sendMailFlow(
mailProperties: MailProperties,
sendMailProperties: SendMailProperties,
mailSenderMessageHandler: MessageHandler,
mimeMailTransformer: MimeMailTransformer
): IntegrationFlow =
queryPendingMails()
.transform(::updateAndLockEntity)
.transform(mimeMailTransformer::convertToMimeMessage)
.enrichHeaders(Mail.headers().to(sendMailProperties.recipient).from(mailProperties.username))
.log()
.handle(mailSenderMessageHandler)
.get()
private fun queryPendingMails() = IntegrationFlow.from(
Jpa.inboundAdapter(entityManagerFactory)
.entityClass(JpaSecuremail::class.java)
.maxResults(1)
.jpaQuery(
"""
select s from JpaSecuremail s
where s.mail.status = '$PENDING' and s.mail.direction = '$OUTBOUND'
""".trimIndent()
)
.expectSingleResult(true),
mailPoller()
)
private fun mailPoller(): (SourcePollingChannelAdapterSpec) -> Unit =
{ spec: SourcePollingChannelAdapterSpec ->
spec.poller { factory: PollerFactory ->
factory
.fixedDelay(SECONDS_10_MILLIS)
.transactional(
TransactionInterceptorBuilder(true)
.transactionManager(transactionManager)
.build()
)
}
}
private fun updateAndLockEntity(secureMail: JpaSecuremail) =
with(getTransactionalEntityManager(entityManagerFactory) as EntityManager) {
lock(secureMail, PESSIMISTIC_WRITE)
secureMail.mail.apply {
status = SENT
messageId = UUID.randomUUID().toString()
sentDate = now()
}
persist(secureMail)
flush()
secureMail
}
@Bean
fun mailSenderMessageHandler(mailSender: MailSender) =
CircuitBreakerMessageHandler(Mail.outboundAdapter(mailSender))
companion object {
private const val SECONDS_10_MILLIS = 1000L
}
}
Does anyone have an idea, why this happens? Is the transaction guaranteed over the integration process? Is the lock done correctly? Or maybe some pointers to an example (didn't find a matching one)? Any pointers help, thanks
答案1
得分: 1
尝试在可以指定FOR UPDATE
的地方使用nativeQuery()
。您的PESSIMISTIC_WRITE
假设是正确的,但是在Jpa.inboundAdapter()
已经完成其工作后再做这个操作有点晚了。
我们可能可以改进Spring Integration JPA模块,以接受像这样的用例中的PESSIMISTIC_WRITE
作为一个选项。
英文:
Try to use nativeQuery()
instead where you can specify FOR UPDATE
. Your PESSIMISTIC_WRITE
assumption is correct, but you do that a bit late already when Jpa.inboundAdapter()
has done its job.
We probably can improve Spring Integration JPA module to accept that PESSIMISTIC_WRITE
as an option for use-cases like this.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论