Problem with entity manager in Spring Webflux

Question


In my web client, where I started using Spring WebFlux, I got the following exception:

reactor.core.Exceptions$ErrorCallbackNotImplemented: javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call
Caused by: javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call

Code:

@Service
@Transactional
@Slf4j
public class PersonSyncServiceImpl
		extends HotelApiCommunicationService implements PersonSyncService {

	private static final String REST_ENDPOINT_PATH = "/api/sync/person";

	@PersistenceContext
	private EntityManager em;
	@Autowired
	private SynchronizationService syncMgr;


	@Override
	public int initialSyncPersonData(ObiektDTO obiekt) {

		WebClient client = WebClient.create("http://" + obiekt.getAdresIp());
		AtomicInteger count = new AtomicInteger(0);

		Flux<Person> personFlux = client.get()
				.uri(REST_ENDPOINT_PATH)
				.retrieve()
				.bodyToFlux(Person.class);

		personFlux.subscribe(person -> {
			CNPerson cnPerson = syncMgr.convertObject(person, CNPerson.class);
			cnPerson.setObiektLoid(obiekt.getLoid());
			em.persist(cnPerson);
			count.getAndIncrement();
		});

		return count.get();
	}
}

I know that the problem is on this line, because Reactor can't get the entity manager.

em.persist(cnPerson);

Controller method on server:

	@GetMapping(value = "/person", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
	public Flux<Person> getPersonList() {

		return Flux.fromStream(
				personDao.findAll().stream());
	}

How can I fix it? How can I access a transaction from Reactor and save a record to the database using JPA?

Answer 1

Score: 2


First of all, Spring Data R2DBC doesn't work with JPA/Hibernate or with blocking JDBC drivers, so you need to drop either WebFlux or JPA.

  1. With JPA

If you decide to proceed with JPA, you can call the Person Sync API using Spring RestTemplate or a blocking WebClient call like this:

   List<Person> persons = client.get()
                .uri(REST_ENDPOINT_PATH)
                .retrieve()
                .bodyToFlux(Person.class)
                .collectList()
                .block();

Then transform them as you are already doing:

List<CNPerson> cnPersons = new ArrayList<>();
persons.forEach(person -> {
  CNPerson cnPerson = syncMgr.convertObject(person, CNPerson.class);
  cnPerson.setObiektLoid(obiekt.getLoid());
  cnPersons.add(cnPerson);
});

Then save them all in one shot using org.springframework.data.jpa.repository.JpaRepository.saveAll(). For this, you need to create a repository for the CNPerson entity.

Or manually manage the transaction and save each CNPerson within it.

Lastly, to answer your question "Is there another method to get a stream on the client side?": the answer is no if you want to do it in one transaction, because you have to load the entire response of the Person Sync API into memory.
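
The blocking call also addresses a second issue in the question's code: subscribe() only registers a callback and returns, so count.get() can be read before any element has arrived. The sketch below is a JDK-only analogy, not Spring or Reactor code. CompletableFuture stands in for the Flux, join() for block(), and the class name, the fetchPersons helper and the sample names are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingVsSubscribe {

    // Hypothetical stand-in for the remote call: the result is produced on
    // another thread, and only once `gate` is opened (simulated latency).
    static CompletableFuture<List<String>> fetchPersons(CountDownLatch gate) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return List.of("Ann", "Bob", "Cid");
        });
    }

    // Mirrors the question: register a callback, then read the counter at once.
    public static int countWithoutWaiting() {
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger count = new AtomicInteger();
        CompletableFuture<Void> done =
                fetchPersons(gate).thenAccept(persons -> count.addAndGet(persons.size()));
        int seen = count.get(); // gate is still closed, so nothing has arrived
        gate.countDown();       // let the simulated response through
        done.join();            // finish the background work before returning
        return seen;
    }

    // Mirrors collectList().block(): wait for the whole response, then count.
    public static int countAfterWaiting() {
        CountDownLatch gate = new CountDownLatch(1);
        gate.countDown();
        List<String> persons = fetchPersons(gate).join();
        return persons.size();
    }

    public static void main(String[] args) {
        System.out.println("without waiting: " + countWithoutWaiting());
        System.out.println("after waiting:   " + countAfterWaiting());
    }
}
```

In this sketch the first method reads the counter while the response is still held back, and the second only counts after the full list is available, which is the same trade-off as holding the whole response in memory.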

  2. Without JPA (Using Spring Webflux and R2DBC)

You can still have transactions with Spring Data R2DBC. You can read more about it here. Note that with R2DBC you have to use a non-blocking driver for whatever type of database you are using. A list is given here.

Answer 2

Score: 0


If you use JPA in a reactive context and you don't need to execute all your persistence operations inside a single transaction, you can try to manually create a new transaction for each operation.

1 - Inject a PlatformTransactionManager

@Autowired
private PlatformTransactionManager transactionManager;

2 - Create a new TransactionTemplate (this could be done in constructor)

this.transactionTemplate = new TransactionTemplate(transactionManager);

3 - Execute your persistence calls inside a call to transactionTemplate.execute:

personFlux.subscribe(person ->
  transactionTemplate.execute(transactionStatus -> {
            CNPerson cnPerson = syncMgr.convertObject(person, CNPerson.class);
            cnPerson.setObiektLoid(obiekt.getLoid());
            em.persist(cnPerson);
            count.getAndIncrement();
            return null; // the callback passed to execute must return a value
  }));

Note that this will create a new transaction for each call to em.persist.
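
To see why opening the transaction inside the callback helps, recall that the exception says no transaction is available "for current thread": Spring binds the transaction to the thread that entered the @Transactional method, while the subscribe callback may run later on a different thread. The following is a JDK-only model of that behaviour, not Spring code. The CURRENT_TX holder, persist, inNewTransaction and runOnWorker are hypothetical stand-ins for the thread-bound transaction, em.persist, transactionTemplate.execute and the reactive callback thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class ThreadBoundTransactionDemo {

    // Hypothetical stand-in for a transaction bound to the current thread.
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Stand-in for em.persist(): fails when this thread has no transaction.
    public static String persist(String entity) {
        String tx = CURRENT_TX.get();
        if (tx == null) {
            throw new IllegalStateException("no transaction available for current thread");
        }
        return entity + " saved in " + tx;
    }

    // Stand-in for transactionTemplate.execute(...): binds a transaction to the
    // calling thread, runs the action, and always clears the binding afterwards.
    public static <T> T inNewTransaction(String name, Supplier<T> action) {
        CURRENT_TX.set(name);
        try {
            return action.get();
        } finally {
            CURRENT_TX.remove();
        }
    }

    // Runs the task on a separate thread, as a reactive callback might be, and
    // returns its result or the message of the error it failed with.
    public static String runOnWorker(Supplier<String> task) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return worker.submit(() -> {
                try {
                    return task.get();
                } catch (IllegalStateException e) {
                    return "failed: " + e.getMessage();
                }
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        // Transaction opened on the calling thread only; the worker cannot see it.
        System.out.println(inNewTransaction("tx-outer",
                () -> runOnWorker(() -> persist("p1"))));

        // Transaction opened inside the callback, on the worker thread itself.
        System.out.println(runOnWorker(
                () -> inNewTransaction("tx-inner", () -> persist("p2"))));
    }
}
```

The first call fails because the binding lives on the calling thread, while the second succeeds because the transaction is created on the thread that performs the persist, which is the idea behind wrapping each em.persist in transactionTemplate.execute.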

huangapple
  • Posted on July 21, 2020 at 22:27:19
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63016759.html