Implementing multithreading when calling a method that updates records through an API in Spring Boot
Question
I am writing a Spring Boot application that fetches records from a database and then calls an external REST API to update records in another table. This code is complete and works as expected. Because I also need to improve performance, I am trying to implement multithreading when calling the API, so that I can send multiple records at a time.
Structure:
Fetch records from a table and store them in a list ---> Loop over the list ---> Multithreaded call to the API
ProvRecordProcessing.java: This class fetches records from the database, builds a list, and calls ProvRecordService.java
ProvRecordService.java: This class handles all the API logic.
After some research, I tried the following to make it multithreaded:
- Made the ProvRecordService class implement Runnable and override the void run() method
- Instead of calling the method directly, called executorService.execute(new ProvRecordService(record));
ProvRecordProcessing.java:
I have removed the other business logic from the code and kept only the part that calls the API method.
@Component
public class ProvRecordProcessing {
.....Code to fetch records from database....
List<UpdateProvider> provRecords = jdbcTemplate.query(sqlApiSelectQuery, new ProvRecordMapper());
//added for multithreading
ExecutorService executorService = Executors.newFixedThreadPool(2);
//looping over list records and calling API to process records
for(UpdateProvider record : provRecords) {
executorService.execute(new ProvRecordService(record));
}
executorService.shutdown();
}
}
ProvRecordService.java:
To make it multithreaded, I added a few sections to the code below, marked with the comment: //added for multithreading
package com.emerald.paymentengineapi.service;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
public class ProvRecordService implements IFxiProviderService, Runnable {
@Autowired
RestSslException restSslTemplate;
@Autowired
DbConfig dbConfig;
@Autowired
UpdateProvider updateProvider; // added for multithreading
@Autowired
JdbcTemplate jdbcTemplate;
@Autowired
TokenService tokenService;
@Value("${SHIELD_API_URL}")
private String SHIELD_API_URL;
@Value("${token_expire_time}")
private String token_expire;
RestTemplate restTemplate;
DataSource dataSource;
UpdateProvider record; // added for multithreading
Logger logger = LoggerFactory.getLogger(ProvRecordService.class);
private static String FETCH_OPTIONS_SQL = "select OPTION_NAME, OPTION_VALUE from FSG.FSG_PRCB_PE_API_REQ_CONFIG";
public ProvRecordService(UpdateProvider record) { // added for multithreading
// TODO Auto-generated constructor stub
this.record = record;
}
@Override
public void run() { // added for multithreading
updateProvider(record);
}
@Scheduled(fixedRateString = "token_expire")
public ResponseEntity<String> runTokenScheduler() throws KeyManagementException, KeyStoreException, NoSuchAlgorithmException {
logger.info("Fetching Token..." + token_expire);
ResponseEntity<String> response = tokenService.getOauth2Token();
return response;
}
@Override
public ResponseEntity<String> updateProvider(UpdateProvider updateProviderRequest) {
dataSource = dbConfig.dataSource();
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
try {
restTemplate = restSslTemplate.restTemplate();
} catch (KeyManagementException | KeyStoreException | NoSuchAlgorithmException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ResponseEntity<String> response = null;
try {
if (null == TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN))
runTokenScheduler();
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
System.out.println("value :" + TokenService.TOKEN_VALUE.get(ConfigConstants.TOKEN_TYPE));
System.out.println("access_token :" + TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN));
headers.add(ConfigConstants.AUTHORIZATION, TokenService.TOKEN_VALUE.get(ConfigConstants.TOKEN_TYPE) + " "
+ TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN));
headers.add(ConfigConstants.CLIENT_CODE, ConfigConstants.CSP_PROVIDER_BATCH);
List<RequestOptions> customers = jdbcTemplate.query(FETCH_OPTIONS_SQL,new BeanPropertyRowMapper(RequestOptions.class));
updateProviderRequest.getXpfRequestData().setRequestOptions(customers);
HttpEntity<UpdateProvider> entity = new HttpEntity<UpdateProvider>(updateProviderRequest, headers);
response = restTemplate.exchange(SHIELD_API_URL, HttpMethod.PUT, entity, String.class);
if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
logger.info(updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId());
logger.info(updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "NO_CONTENT",
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
logger.info("Provider has been updated successfully");
} else if (response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "INTERNAL_SERVER_ERROR",
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
logger.error("Internal Server error occures");
} else if (response.getStatusCode() == HttpStatus.NOT_FOUND) {
updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "NOT_FOUND",
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
logger.error("Provider not found");
}
} catch (TokenServiceException ex) {
logger.error("Exception occures in calling Token API");
updateStatusInDB(ex.getMessage(), ex.getLocalizedMessage(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
//throw new RuntimeException("Exception occures in API " + ex);
} catch (HttpClientErrorException ex) {
logger.error("HttpClientErrorException occures in calling API");
updateStatusInDB(ex.getStatusText(), ex.getStatusText(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
//throw new HttpClientErrorException(ex.getStatusCode(), ex.getStatusText());
} catch (Exception ex) {
logger.error("Exception occures in calling API");
updateStatusInDB(ex.getMessage(), ex.getMessage(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
//throw new RuntimeException("Exception occures in API " + ex);
}
return response;
}
private int updateStatusInDB(String errorCode, String errorMessage, String taxId, String providerId) {
return jdbcTemplate.update(
"update FSG_WRK.FSG_PRCB_PE_API_REQUEST set ERRORCODE = ?, ERRORMESSAGE = ? where TAXID = ? and PROVIDERID= ?",
errorCode, errorMessage, taxId, providerId);
}
}
I debugged this code. It enters the void run() method and record is populated, but after that it does not proceed through the updateProvider method for processing, and I get the error below:
Exception in thread "pool-2-thread-1" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-2" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-3" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-5" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Update:
After more debugging, I found that the issue occurs on the lines below:
dataSource = dbConfig.dataSource();
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
I am setting dataSource here, and this worked fine before I added the multithreading code. I cannot figure out the reason. Please advise.
Answer 1
Score: 0
The code has two problems:
- There is no point in creating a new ExecutorService on every execution. A better approach is to create it only once and keep it as a field of the ProvRecordProcessing component. Creating threads is expensive, and with your approach you don't know how many threads may be created at the same time (if many users call this method in parallel and each call creates its own thread pool, it can get really expensive). In addition, if you use a thread pool executor, you should ideally shut it down when the application shuts down, so don't forget to call shutdown() in a @PreDestroy method or similar.
- Don't create a service with the new keyword. Spring won't be able to manage it and won't process any of its annotations (@Autowired, @Value, etc.), so the fields that depend on them stay null. This code is therefore wrong:
for(UpdateProvider record : provRecords) {
executorService.execute(new ProvRecordService(record));
}
Instead, inject the service into the ProvRecordProcessing component as a singleton and call its method responsible for sending the HTTP request from a Runnable or Callable. Here is a schematic example of what I mean:
@Component
class ProvRecordProcessing {
@Autowired
private ProvRecordService provRecordService;
....
for(UpdateProvider record : provRecords) {
executorService.execute(() -> provRecordService.updateHttpOrWhatever(record));
}
}
With this approach, ProvRecordService becomes a regular Spring-managed bean.
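To make both points concrete, here is a minimal plain-Java sketch of the lifecycle described above. It deliberately leaves Spring out so that it runs on its own. RecordService and RecordProcessor are hypothetical stand-ins for ProvRecordService and ProvRecordProcessing, and String stands in for UpdateProvider. In a real application the constructor argument would be injected by Spring, and the shutdown method would carry @PreDestroy.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the Spring-managed ProvRecordService singleton.
class RecordService {
    private final AtomicInteger processed = new AtomicInteger();

    // Stand-in for updateProvider(record); the real method would call the REST API.
    // It keeps no per-record state in fields, so one instance can serve all threads.
    void update(String record) {
        processed.incrementAndGet();
    }

    int processedCount() {
        return processed.get();
    }
}

// Hypothetical stand-in for ProvRecordProcessing.
class RecordProcessor {
    // Created once and reused for every batch instead of once per call.
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    private final RecordService service;

    // With Spring, this would be constructor injection of the singleton bean.
    RecordProcessor(RecordService service) {
        this.service = service;
    }

    // Each record becomes a small task that calls the shared service.
    void processAll(List<String> records) {
        for (String record : records) {
            executor.execute(() -> service.update(record));
        }
    }

    // With Spring, this method would be annotated with @PreDestroy.
    // Returns true if all submitted tasks finished within the timeout.
    boolean shutdown() {
        executor.shutdown();
        try {
            return executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Note that the record is passed as a method argument rather than stored in a field of the service. A singleton shared by several threads should not keep per-record state in fields.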
There are more advanced solutions for this, namely @Async methods, which can eliminate the need to maintain the thread pool manually. See this tutorial for an example. Since you haven't shown those in the question, I assume they are beyond the scope of what you're asking, so just keep in mind that the option exists. Of course, if you implement your code correctly, it will do just fine.