如何获取Spring Boot @Async方法被拒绝的任务的详细信息?

huangapple go评论80阅读模式
英文:

How to get details of Spring Boot @Async method's rejected task?

问题

在我的应用程序中,我正在使用一个@Async方法,该方法调用一个REST服务,并根据REST服务的结果更新数据库中的MyJob状态。

@Async("thatOneTaskExecutor")
public void myAsyncTask(MyJob job) {
    // 从作业获取作业详情并调用REST服务
    // 使用来自REST服务的结果更新作业,并将更新后的MyJob保存到数据库
}

我正在使用Spring的ThreadPoolTaskExecutor,以下是我在AsyncConfiguration类中声明此任务执行程序的摘要。

private ThreadPoolTaskExecutor createExecutor(String name, int core, int max, int queue) {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(core);
    executor.setMaxPoolSize(max);
    executor.setQueueCapacity(queue);
    executor.setThreadNamePrefix(name);
    executor.setTaskDecorator(new MdcAwareTaskDecorator());
    executor.initialize();
    return executor;
}

@Bean(name = "thatOneTaskExecutor")
public Executor taskExecutor() {
    String prefix = "thatOneTask-";
    int corePoolSize = 12;
    int maxPoolSize = 20;
    int queueSize = 1000;

    ThreadPoolTaskExecutor executor = createExecutor(prefix, corePoolSize, maxPoolSize, queueSize);
    executor.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
    return executor;
}

正如您所见,我已经为我的执行程序配置了一个RejectedExecutionHandler
根据Spring文档,在队列满时,将调用此方法。

  • 当{@link ThreadPoolExecutor#execute execute}无法接受任务时,可能会被{@link ThreadPoolExecutor}调用的方法。
  • 当没有更多的线程或队列插槽可用时,可能会发生这种情况,因为它们的界限将被超越,或者在执行程序关闭时。
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        log.error("由于最大队列大小,任务被拒绝");
        // 如何获取关于被任务执行器拒绝的特定作业的信息??
    } 
}

对于我来说,被拒绝的执行处理程序运行良好,现在在rejectedExecution方法内部,我想要访问MyJob(我的异步方法的参数),用于被拒绝的异步任务。我想要更新特定被拒绝的作业的状态,以便稍后运行一个定时任务并处理这些被拒绝的作业。在rejectedExecution方法内部,我只有RunnableThreadPoolExecutor,如何在此处提取/获取有关MyJob的信息?

我应用程序的Spring Boot版本是2.2.2.RELEASE

英文:

In my application I'm using an @Async method which is calling a rest service and based on the rest service result I'm updating the MyJob status in DB.

@Async("thatOneTaskExecutor")
public void myAsyncTask(MyJob job) {
    // get job details from the job and call rest service
    // update the job with the result from rest service and save updated MyJob to DB
}

I'm using Spring's ThreadPoolTaskExucutor, Below is a snap from my AsyncConfiguration class where I declared this task executor.

private ThreadPoolTaskExecutor createExecutor(String name, int core, int max, int queue) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(core);
        executor.setMaxPoolSize(max);
        executor.setQueueCapacity(queue);
        executor.setThreadNamePrefix(name);
        executor.setTaskDecorator(new MdcAwareTaskDecorator());
        executor.initialize();
        return executor;
    }

@Bean(name = "thatOneTaskExecutor")
public Executor taskExecutor() {

    String prefix = "thatOneTask-";
    String corePoolSize = 12;
    String maxPoolSize = 20;
    String queueSize = 1000;

    ThreadPoolTaskExecutor executor = createExecutor(prefix, corePoolSize, maxPoolSize, queueSize);
    executor.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
    return executor;
}

As you can see I had configured a RejectedExecutionHandler for my Executor.
According to Spring documentation when queue is full this method will be called.

> * Method that may be invoked by a {@link ThreadPoolExecutor} when
> * {@link ThreadPoolExecutor#execute execute} cannot accept a
> * task. This may occur when no more threads or queue slots are
> * available because their bounds would be exceeded, or upon
> * shutdown of the Executor.

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        log.error("Task Rejected because of max queue size");
        // How to get info about that particular job, for which Task executor rejected this task??
    } 
}

Rejected execution handler is working fine for me, now inside this rejectedExecutorion method, I want to access the MyJob(parameter of my async method) for which the async task is rejected. I want to update that particular rejected job with a status so that I can later run a corn and process those rejected jobs. Inside this rejectedExecution method I only have Runnable and ThreadPoolExucutor, how can I extract/get info about MyJob here?

My application's Spring boot version is 2.2.2.RELEASE

答案1

得分: 2

你可以考虑直接使用TaskExecutor,而不是通过为MyJob类实现Runnable接口并在run()方法内执行所需的异步操作来使用@Async注解。

在处理程序的rejectedExecution方法中,Runnable r可以转换回MyJob对象,因此您可以从中检索作业信息。

public class Myjob implements Runnable{
    .......
    @Override
    public void run(){
        //从作业中获取作业详情并调用 REST 服务
        //使用来自 REST 服务的结果更新作业,并将更新后的 MyJob 保存到数据库
    }
}

@Autowired
TaskExecutor taskExecutor;

public void myAsyncTask(MyJob job) {
    taskExecutor.execute(job);
}

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        log.error("因为最大队列大小而被拒绝的任务");
        if(r.getClass()==MyJob.class)
        {
            MyJob failedJob=(MyJob)r; //作业信息
        }
    } 
}
英文:

You could consider using the TaskExecutor directly instead of the @Async annotation by implementing the Runnable interface for MyJob Class and perform the required async operation inside the run() method.

The Runnable r could be cast back to MyJob Object in the rejectedExecution method of the handler and hence you could retrieve information of your job from there.

 public class Myjob implements Runnable{
   .......
   @Override
   public void run(){
         //get job details from the job and call rest service
         //update the job with the result from rest service and save updated MyJob to DB
      }
 }

@Autowired
TaskExecutor taskExecutor;
 
public void myAsyncTask(MyJob job) {
   taskExecutor.execute(job)
}

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        log.error("Task Rejected because of max queue size");
        if(r.getClass()==MyJob.class)
         {
            MyJob failedJob=(MyJob)r; //Job info
         }

    } 
 }

huangapple
  • 本文由 发表于 2020年7月23日 20:59:27
  • 转载请务必保留本文链接:https://go.coder-hub.com/63054885.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定