实现在 Spring Boot 中通过 API 调用方法更新记录时的多线程。

huangapple go评论111阅读模式
英文:

implement multithreading while calling method to update record through API in springboot

问题

我正在使用Spring Boot编写一个应用程序从数据库中获取记录然后调用外部REST API将记录更新到另一张表中这段代码已经完成并且按预期工作由于我还需要提高性能因此我正在尝试在调用API时实现多线程以便可以一次发送多条记录

**结构**

从表中获取记录并将其存储在列表中 ---> 遍历列表 ---> 多线程调用API

***ProvRecordProcessing.java***此调用将从数据库获取记录并创建一个列表然后调用ProvRecordService.java
***ProvRecordService.java***此调用将处理所有API逻辑...

经过一些研究我尝试实现以下内容以实现多线程

 1. 使ProvRecordService类实现Runnable接口并重写void run方法
 2. 在调用方法的地方调用executorService.execute(new ProvRecordService(record));

  
**ProvRecordProcessing.java**

我从代码中删除了其他业务逻辑只保留了调用API方法的部分...

      @Component 
      public class ProvRecordProcessing {
      
      .....从数据库获取记录的代码.....
    	  
    	  List<UpdateProvider> provRecords = jdbcTemplate.query(sqlApiSelectQuery, new ProvRecordMapper());
    	  
    	  //为多线程添加
    	  ExecutorService executorService = Executors.newFixedThreadPool(2);
     
    	 //循环遍历列表记录并调用API处理记录
    	 for(UpdateProvider record : provRecords) {
    		 	
    		 	executorService.execute(new ProvRecordService(record));
    	 }  
    	 executorService.shutdown();  		
      	}
    }

**ProvRecordService.java**

为了使其多线程化我在以下代码中添加了一些部分附有注释//为多线程添加

    package com.emerald.paymentengineapi.service;
    
    import java.security.KeyManagementException;
    import java.security.KeyStoreException;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;
    import java.util.List;
    
    import javax.sql.DataSource;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    @Service
    public class ProvRecordService implements IFxiProviderService, Runnable {
    
        //...(略去其他部分)...
    	
    	@Override
    	public void run() { // 为多线程添加
    		updateProvider(record);
    	}
    
        //...(略去其他部分)...
    
    }

我调试了这段代码它进入了void run方法并且record也被填充了但在那之后它没有进入updateProvider方法进行处理我得到了以下错误

     Exception in thread "pool-2-thread-1" java.lang.NullPointerException
    	at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
    	at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
        //...(略去其他部分)...

***更新***

经过更多调试我了解到问题发生在以下代码行

    dataSource = dbConfig.dataSource();
    JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

我在这里尝试设置dataSource在没有添加多线程代码之前这是正常工作的我无法理解原因请给予建议
英文:

I am writing an application in Springboot to fetch records from a database and then call an external rest api to update records into some other table. This code is completed and working as expected. As I need to improve performance as well. I am trying implement mulithreading while calling API, so that I can send multiple records at a time.

Structure :

Fetch records from a table and Store it in a list ---> Loop over list ---> multi threaded call to API

ProvRecordProcessing.java : This call will fetch records from database and create a list and call to ProvRecordService.java
ProvRecordService.java : This call will handle all API logic..

After some research, I tried to implement below to make it multithreaded :

  1. Make ProvRecordService class to implement Runnable and override void run method
  2. Instead of calling method, calling executorService.execute(new ProvRecordService(record));

ProvRecordProcessing.java :

I have removed other business logic from the code, only keep part where calling API method..

  @Component 
public class ProvRecordProcessing {
.....Code to fetch records from database....
List<UpdateProvider> provRecords = jdbcTemplate.query(sqlApiSelectQuery, new ProvRecordMapper());
//added for multithreading
ExecutorService executorService = Executors.newFixedThreadPool(2);
//looping over list records and calling API to process records
for(UpdateProvider record : provRecords) {
executorService.execute(new ProvRecordService(record));
}  
executorService.shutdown();  		
}
}

ProvRecordService.java

Just to make it multithreaded, I have added few sections in the below code with comment : //added for multithreading

package com.emerald.paymentengineapi.service;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Service
public class ProvRecordService implements IFxiProviderService, Runnable {
@Autowired
RestSslException restSslTemplate;
@Autowired
DbConfig dbConfig;
@Autowired
UpdateProvider updateProvider; // added for multithreading
@Autowired
JdbcTemplate jdbcTemplate;
@Autowired
TokenService tokenService;
@Value("${SHIELD_API_URL}")
private String SHIELD_API_URL;
@Value("${token_expire_time}")
private String token_expire;
RestTemplate restTemplate;
DataSource dataSource;
UpdateProvider record; // added for multithreading
Logger logger = LoggerFactory.getLogger(ProvRecordService.class);
private static String FETCH_OPTIONS_SQL = "select OPTION_NAME, OPTION_VALUE from FSG.FSG_PRCB_PE_API_REQ_CONFIG";
public ProvRecordService(UpdateProvider record) { // added for multithreading
// TODO Auto-generated constructor stub
this.record = record;
}
@Override
public void run() { // added for multithreading
updateProvider(record);
}
@Scheduled(fixedRateString = "token_expire")
public ResponseEntity<String> runTokenScheduler() throws KeyManagementException, KeyStoreException, NoSuchAlgorithmException {
logger.info("Fetching Token..." + token_expire);
ResponseEntity<String> response = tokenService.getOauth2Token();
return response;
}
@Override
public ResponseEntity<String> updateProvider(UpdateProvider updateProviderRequest) {
dataSource = dbConfig.dataSource();
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
try {
restTemplate = restSslTemplate.restTemplate();
} catch (KeyManagementException | KeyStoreException | NoSuchAlgorithmException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ResponseEntity<String> response = null;
try {
if (null == TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN))
runTokenScheduler();
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
System.out.println("value :" + TokenService.TOKEN_VALUE.get(ConfigConstants.TOKEN_TYPE));
System.out.println("access_token :" + TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN));
headers.add(ConfigConstants.AUTHORIZATION, TokenService.TOKEN_VALUE.get(ConfigConstants.TOKEN_TYPE) + " "
+ TokenService.TOKEN_VALUE.get(ConfigConstants.ACCESS_TOKEN));
headers.add(ConfigConstants.CLIENT_CODE, ConfigConstants.CSP_PROVIDER_BATCH);
List<RequestOptions> customers = jdbcTemplate.query(FETCH_OPTIONS_SQL,new BeanPropertyRowMapper(RequestOptions.class));
updateProviderRequest.getXpfRequestData().setRequestOptions(customers);
HttpEntity<UpdateProvider> entity = new HttpEntity<UpdateProvider>(updateProviderRequest, headers);
response = restTemplate.exchange(SHIELD_API_URL, HttpMethod.PUT, entity, String.class);
if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
logger.info(updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId());
logger.info(updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "NO_CONTENT",
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
logger.info("Provider has been updated successfully");
} else if (response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "INTERNAL_SERVER_ERROR",
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
logger.error("Internal Server error occures");
} else if (response.getStatusCode() == HttpStatus.NOT_FOUND) {
updateStatusInDB(String.valueOf(response.getStatusCodeValue()), "NOT_FOUND",
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
logger.error("Provider not found");
}
} catch (TokenServiceException ex) {
logger.error("Exception occures in calling Token API");
updateStatusInDB(ex.getMessage(), ex.getLocalizedMessage(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
//throw new RuntimeException("Exception occures in API " + ex);
} catch (HttpClientErrorException ex) {
logger.error("HttpClientErrorException occures in calling API");
updateStatusInDB(ex.getStatusText(), ex.getStatusText(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
//throw new HttpClientErrorException(ex.getStatusCode(), ex.getStatusText());
} catch (Exception ex) {
logger.error("Exception occures in calling API");
updateStatusInDB(ex.getMessage(), ex.getMessage(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getTaxId(),
updateProviderRequest.getXpfRequestData().getGroupRecord().getProviderData().getProviderId());
//throw new RuntimeException("Exception occures in API " + ex);
}
return response;
}
private int updateStatusInDB(String errorCode, String errorMessage, String taxId, String providerId) {
return jdbcTemplate.update(
"update FSG_WRK.FSG_PRCB_PE_API_REQUEST set ERRORCODE = ?, ERRORMESSAGE = ? where TAXID = ? and PROVIDERID= ?",
errorCode, errorMessage, taxId, providerId);
}
}

I debug this code , and it's going void run method and record is also getting populated , but after that, it's not going into the updateProvider method for processing and I am getting below error :

 Exception in thread "pool-2-thread-1" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-2" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-3" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Exception in thread "pool-2-thread-5" java.lang.NullPointerException
at com.emerald.paymentengineapi.service.ProvRecordService.updateProvider(ProvRecordService.java:92)
at com.emerald.paymentengineapi.service.ProvRecordService.run(ProvRecordService.java:78)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

Update :

After more debugging, I got to know, the issue is occurring on the below line :

dataSource = dbConfig.dataSource();
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

I am trying to set dataSource here and this was working fine, when I haven't added code for multithreading. I am not able to get the reason. Please suggest.

答案1

得分: 0

代码这里有问题:

  1. 每次请求执行都创建新的ExecutorService是没有意义的。

更好的方法是只创建一次,并将其作为ProvRecordProcessing组件的数据字段保留下来。创建线程是昂贵的 + 在你的方法中,你不知道可以同时创建多少个线程(如果这个方法被多个用户并行调用 - 如果每个用户都创建线程池,会非常昂贵)。

另外,如果你使用线程池执行器,最好在应用程序关闭时将其关闭,所以不要忘记在predestroy或其他地方调用close。

  1. 不要使用new关键字创建一个服务,Spring将无法管理它,也不会“处理”它的任何注解(Autowired、Value等),所以这段代码是错误的:
     for(UpdateProvider record : provRecords) {
            
            executorService.execute(new ProvRecordService(record));
     }  

而是应该将服务作为单例注入到ProvRecordProcessing组件中,并在可运行/可调用的方法中调用其负责发送HTTP请求的方法。这是我所指的示例示意图:

@Component
class ProvRecordProcessing {

   @Autowired
   private ProvRecordService provRecordService;

   ....

    for(UpdateProvider record : provRecords) {
            
            executorService.execute(() -> provRecordService.updateHttpOrWhatever(record));
     }
} 

通过这种方法,ProvRecordService将成为一个常规的由Spring管理的Bean。

还有更高级的解决方案,即使用@Async方法,可以消除“手动”维护线程池的需求。参见这个教程作为示例...由于你的问题中没有展示这些内容,我假设这超出了你所询问的范围,所以只需要记住这也是存在的。当然,如果你正确实现了你的代码,它会表现得很好。

英文:

The code is wrong here:

  1. There is no point to create new ExecutorService per request execution.

a better approach would be creating it only once and keeping it as a data field of the ProvRecordProcessing component. Creating threads is expensive + in your approach, you don't know how many threads can be created simultaneously (what if this method is called by many users in parallel - if each creates thread pool you it can be really expensive).

In addition to the above if you use the thread pool executor you should ideally close it when the application shuts down, so don't forget to call close on predestroy or something.

  1. Don't create a service with a new keyword, Spring won't be able to manage it and won't "process" any annotation of it (Autowired, Value, etc), so this code is wrong:
     for(UpdateProvider record : provRecords) {
executorService.execute(new ProvRecordService(record));
}  

Instead, Inject the service into ProvRecordProcessing Component as a singleton and call its method responsible for sending http request from runnable / callable. Here is a schematic example of what I mean:

@Component
class ProvRecordProcessing {
@Autowired
private ProvRecordService provRecordService;
....
for(UpdateProvider record : provRecords) {
executorService.execute(() -> provRecordService.updateHttpOrWhatever(record));
}
} 

With this approach, ProvRecordService becomes a regular spring managed bean.

There are more advanced solutions for this, namely using @Async methods that can eliminate the need to "manually" maintain the thread pool. See This tutorial for example... Since you haven't shown those in a question, I assume it's beyond the scope of what you're asking so just keep in mind that it also exists. Of course, if you implement your code right it will do just fine.

huangapple
  • 本文由 发表于 2020年9月11日 17:58:27
  • 转载请务必保留本文链接:https://go.coder-hub.com/63844833.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定