在Spring Boot中,在初始进程完成后异步运行另一个进程。

huangapple go评论54阅读模式
英文:

Run a process asynchronously after completion of initial process in Spring boot

问题

我有一个需求。我有两个进程:

  1. 联系人创建
  2. 将联系人关联到部门

目前我有一个Spring Boot API,其中有一个REST POST调用来在一个线程中执行上述两个进程。由于进程2需要更多的时间,我希望在完成步骤1后立即在后台中运行进程2。

我想要在完成步骤1后立即将响应返回给用户,并在后台执行步骤2。我该如何实现呢?

提前感谢。

英文:

I have a requirement. I have 2 processes

  1. Contact creation and
  2. Associating contact to the Department

Currently I have a spring boot API which has a REST POST call to perform both in one thread. Since process 2 is taking more time I wanted to run that in the
background immediately after finishing the step 1.

@PostMapping(value = "/processDeptContact", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<PayloadResponse> processDeptContact(@RequestBody String payload) {

    ResponseEntity response = new ResponseEntity(new ErrorResponse("Exception"),
            new HttpHeaders(), HttpStatus.INTERNAL_SERVER_ERROR);
    try {
        response = myService.processPayload(payload);

    } catch (Exception e) {
        logger.error("Exception in the controller");
    }
    return response;
}

I want to return the response to the user as soon as step 1 is done and performing step 2 at the background. How do I achieve that

Thanks in advance

答案1

得分: 1

在您的主类或@Configuration类中,使用@EnableAsync来启动一个线程池:

@EnableAsync
@SpringBootApplication
public class DemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
  }
}

您可以选择在spring.task.execution.pool属性下设置线程池属性。例如:

spring:
  task:
    execution:
      pool:
        core-size: 8
        max-size: 16

这里有一个栈帖子详细介绍了每个属性的含义:Stack Overflow链接

在您的控制器内部:

@RestController
public class TestController {

    private final ContactService contactService;
    private final DepartmentService departmentService;

    // 构造函数注入
    public TestController(ContactService contactService, DepartmentService departmentService) {
        this.contactService = contactService;
        this.departmentService = departmentService;
    }
    
    @PostMapping(value = "/processDeptContact", produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<PayloadResponse> processDeptContact(@RequestBody String payload) {
        
        List<Contact> contacts = contactService.processPayload(payload);
        departmentService.associateContacts(contacts);   // 这是一个异步调用
        return ResponseEntity.ok(new PayloadResponse(contacts));
    }
}

我已经从控制器方法中移除了try/catch,因为错误处理是横切关注点,会由AOP处理。了解更多信息,请查看这里:Baeldung链接

最后,在您的DepartmentService中,您可以使用@Async注解将其转换为异步方法:

@Service
public class DepartmentService {

    @Async
    public void associateContacts(List<Contact> contacts) {
        // 方法内容
    }
}

我看到其他回答基本上都在说相同的事情,而且是正确的,但不够完整,所以我觉得有必要为您将所有内容整合在一起。

英文:

In your main class, or a @Configuration class, use @EnableAsync to bootstrap a thread pool:

@EnableAsync
@SpringBootApplication
public class DemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
  }
}

You can optionally set Thread Pool properties under spring.task.execution.pool property. Example:

spring:
  task:
    execution:
      pool:
        core-size: 8
        max-size 16

Here's a stack post detailing what each property means: https://stackoverflow.com/questions/17659510/core-pool-size-vs-maximum-pool-size-in-threadpoolexecutor

Inside your controller:

@RestController
public class TestController {

    private final ContactService contactService;
    private final DepartmentService departmentService;

    // Constructor Injection
    public TestController(ContactService contactService, DepartmentService departmentService) {
        this.contactService = contactService;
        this.departmentService = departmentService;
    }

    @PostMapping(value = &quot;/processDeptContact&quot;, produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity&lt;PayloadResponse&gt; processDeptContact(@RequestBody String payload) {
    
        List&lt;Contact&gt; contacts = contactService.processPayload(payload);
        departmentService.associateContacts(contacts);   // This is an asynchronous call
        return ResponseEntity.ok(new PayloadResponse(contacts));
    }

}

I've removed the try/catch from the controller method since error handling is a cross cutting concern and is handled by AOP. More on that here: Baeldung

And finally in your DepartmentService, you use the @Async annotation to turn it into an asynchronous method:

@Service
public class DepartmentService {

    @Async
    public void associateContacts(List&lt;Contact&gt; contacts) {
        // method
    }
}

I see other answers are basically saying the same thing and are correct, but not complete so I felt the need to put everything together for you.

答案2

得分: 0

Spring框架提供了开箱即用的异步处理支持。Spring可以通过提供对各种TaskExecutor抽象的支持来为我们创建和管理线程。

我们可以在一个新类中创建一个方法来执行第二个处理(将联系人关联到部门),并使用@Async注解对该方法进行注解。该注解确保Spring根据返回类型将此方法执行为Runnable/Future。

示例实现(我们需要在任何配置类中添加@EnableAsync

@Component
class ContactManager {
   
   @Async
   public void associateContactToDepartment(){
       //方法实现在这里
   }
}

class MyService {
   
    @Autowired
    private ContactManager contactManager;
    
    public PayloadResponse processPayload(String payload){
        PayloadResponse payloadResponse = createContact(); //第一个处理
        contactManager.associateContactToDepartment(); //此调用将以异步方式执行。
        return payloadResponse;
    }
}

请参阅此链接以获取有关异步方法的快速介绍。

英文:

Spring framework provides support for asynchronous processing out of the box. Spring can create & manage threads for us by providing support for various TaskExecutor abstraction.

We can create a method in a new class that will do the second process (associate contact to the Department) and annotate that method with @Aysnc. The annotation ensures the spring executes this method as a Runnable/Future depending on return type.

Sample Implementation (We have to add @EnableAsync in any of our configuration class)

@Component
class ContactManager {
   
   @Async
   public void associateContactToDepartment(){
       //method implementation goes here
   }
}

class MyService {
   
    @Autowired
    private ContactManager contactManager;
    
    public PayloadResponse processPayload(String payload){
        payloadResponse payloadResponse = createContact();//first process
        contactManager.associateContactToDepartment() // this call will be executed asynchronously.
        return payloadResponse;
    }
}

Refer this for quick intro to async methods.

答案3

得分: 0

以下是翻译好的内容:

跟随以下步骤:

1)在主 Spring Boot 应用程序类中添加 @EnableAsync 注解并添加 TaskExecutor Bean:

@SpringBootApplication
@EnableAsync
public class AsynchronousSpringBootApplication {
     
    private static final Logger logger = LoggerFactory.getLogger(SpringBootApplication.class);
     
    @Bean(name="processExecutor")
    public TaskExecutor workExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix("Async-");
        threadPoolTaskExecutor.setCorePoolSize(3);
        threadPoolTaskExecutor.setMaxPoolSize(3);
        threadPoolTaskExecutor.setQueueCapacity(600);
        threadPoolTaskExecutor.afterPropertiesSet();
        logger.info("ThreadPoolTaskExecutor set");
        return threadPoolTaskExecutor;
    }
     
    public static void main(String[] args) throws Exception {
        SpringApplication.run(SpringBootApplication.class, args);
    }
}
  1. 添加以下联系部门的方法:
@Service
public class DepartmentProcess {
     
    private static final Logger logger = LoggerFactory.getLogger(ProcessServiceImpl.class);
     
    @Async("processExecutor")
    @Override
    public void processDepartment() {
        logger.info("Received request to process in DepartmentProcess.processDepartment()");
        try {
            Thread.sleep(15 * 1000);
            logger.info("Processing complete");
        }
        catch (InterruptedException ie) {
            logger.error("Error in ProcessServiceImpl.process(): {}", ie.getMessage());
        }
    }
}
  1. 从控制器调用方法如下:
@PostMapping(value = "/processDeptContact", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<PayloadResponse> processDeptContact(@RequestBody String payload) {

    ResponseEntity response = new ResponseEntity(new ErrorResponse("Exception"),
            new HttpHeaders(), HttpStatus.INTERNAL_SERVER_ERROR);
    try {
        response = myService.processPayload(payload);
        myService.processDepartment(); // 异步方法
    } catch (Exception e) {
        logger.error("Exception in the controller");
    }
    return response;
}
英文:

Follow the below steps:

  1. Add @EnableAsync annotation and Add TaskExecutor Bean to main spring boot application class

     @SpringBootApplication
     @EnableAsync
     public class AsynchronousSpringBootApplication {
    
         private static final Logger logger = LoggerFactory.getLogger(SpringBootApplication.class);
    
         @Bean(name=&quot;processExecutor&quot;)
         public TaskExecutor workExecutor() {
             ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
             threadPoolTaskExecutor.setThreadNamePrefix(&quot;Async-&quot;);
             threadPoolTaskExecutor.setCorePoolSize(3);
             threadPoolTaskExecutor.setMaxPoolSize(3);
             threadPoolTaskExecutor.setQueueCapacity(600);
             threadPoolTaskExecutor.afterPropertiesSet();
             logger.info(&quot;ThreadPoolTaskExecutor set&quot;);
             return threadPoolTaskExecutor;
         }
    
         public static void main(String[] args) throws Exception {
       SpringApplication.run(SpringBootApplication.class,args);
      }
    
  1. Add the contact to department method as below:

         @Service
         public class DepartmentProcess {
    
             private static final Logger logger = LoggerFactory.getLogger(ProcessServiceImpl.class);
    
             @Async(&quot;processExecutor&quot;)
             @Override
             public void processDepartment() {
                 logger.info(&quot;Received request to process in DepartmentProcess.processDepartment()&quot;);
                 try {
                     Thread.sleep(15 * 1000);
                     logger.info(&quot;Processing complete&quot;);
                 }
                 catch (InterruptedException ie) {
                     logger.error(&quot;Error in ProcessServiceImpl.process(): {}&quot;, ie.getMessage());
                 }
             }
         }
    
  2. Call the method from the controller as below:

     @PostMapping(value = &quot;/processDeptContact&quot;, produces = MediaType.APPLICATION_JSON_VALUE)
     public ResponseEntity&lt;PayloadResponse&gt; processDeptContact(@RequestBody String payload) {
    
    ResponseEntity response = new ResponseEntity(new ErrorResponse(&quot;Exception&quot;),
             new HttpHeaders(), HttpStatus.INTERNAL_SERVER_ERROR);
     try {
         response = myService.processPayload(payload);
          myService.processDepartment();//async method
     } catch (Exception e) {
         logger.error(&quot;Exception in the controller&quot;);
     }
     return response;
    

    }

答案4

得分: -1

点1和点2不在这里,但无所谓,让我们称它们为 foo1()foo2()

myService.processPayload() 中,您想要执行以下操作:

    ResponseEntity result = foo1();
    Runnable runnable = () -> {
      foo2();
    };
    Thread thread = new Thread(runnable);
    thread.start(); // foo2 中的逻辑将在后台线程中执行,因此不会在此行上阻塞,考虑改用线程池
    return result;

顺便说一句,这听起来像是过早的优化,您应该考虑并行线程中的竞争条件,但这不是问题的要求。

还有一件事情,将这部分代码移到 catch 语句块中会更好,因为如果 try 成功,这些实例化将是一种浪费,而这种情况应该是大多数情况下会发生的。

ResponseEntity response = new ResponseEntity(new ErrorResponse("Exception"),
            new HttpHeaders(), HttpStatus.INTERNAL_SERVER_ERROR);
英文:

Points 1 and 2 are not here but it doesn't matter, let's call them foo1() and foo2().

In myService.processPayload() you want to do:

    ResponseEntity result = foo1();
    Runnable runnable = () -&gt; {
      foo2()
    };
    Thread thread = new Thread(runnable);
    thread.start(); // the logic in foo2 will happen in a background thread so it will not block on this line, consider using a thread pool instead
    return result;

BTW, this sounds like premature optimization and you should think about race conditions with parallel threads but this is not what the question was asking.

One more thing, move this to the catch because it's a waste of instantiations if the try will succeed, which should happen most of the time.

ResponseEntity response = new ResponseEntity(new ErrorResponse(&quot;Exception&quot;),
            new HttpHeaders(), HttpStatus.INTERNAL_SERVER_ERROR);

huangapple
  • 本文由 发表于 2020年9月24日 04:39:56
  • 转载请务必保留本文链接:https://go.coder-hub.com/64035886.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定