Execute background job with quarkus and don't wait for result

Question

My problem is that I have a Quarkus REST web service that runs some tasks in a blocking, sequential way. After those, I want to run a longer-running task without blocking, but I have not found a working solution.

The web service looks like this (simplified):

import io.vertx.mutiny.core.eventbus.EventBus;

@Path("/booking")
@ApplicationScoped
public class BookingResource {

    @Inject
    EventBus eventBus;

    @POST
    @Path("/{bookingId}/print-tickets/")
    @Produces(MediaType.APPLICATION_JSON)
    public PdfTicket printTickets(@PathParam("bookingId") String bookingId) throws Exception {

        var optBooking = daoBooking.getBookingDetailsById(bookingId);
        //....
        var eventOpt = daoEvent.getEventById(booking.getEventId());
        //...
        PdfTicket pdfTicket = myconverter(optBooking, eventOpt); // some magic code which I removed here

        // this is the code which should run in the background; the REST call should NOT wait for the result
        if (booking.hasFixedSeatingTickets()) {
            // Mark seats as printed, so the fixed seats are not re-assigned to other guests
            eventBus.requestAndForget("greeting", booking.getBookingId());
        }
        return pdfTicket;
    }
}

I tried plain Java threads, I tried Mutiny, and lastly I tried the Quarkus event bus approach shown in the code above.
Here I send a message to the "greeting" address. I consume the event like this:

import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class TestResource {
    private static final Logger LOG = Logger.getLogger(TestResource.class);

    @Inject
    private DaoBooking daoBooking;

    @Inject
    ManagedExecutor executor;

    @ConsumeEvent("greeting")
    public void markSeatsAsPrinted(String bookingId) {
        LOG.info("Received markSeatsAsPrinted event");
        Uni.createFrom().voidItem().invoke(() -> {
            LOG.info("Start long running mark tickets as printed job");
            try {
                daoBooking.markSeatsAsPrinted(bookingId);
            } catch (FileMakerException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            LOG.info("End long running mark tickets as printed job");

        }).emitOn(executor);

    }

}

According to https://quarkus.io/guides/reactive-event-bus, when the method returns void, the calling service should NOT wait for the result.

Now, when I call my REST service, I can see in the log "Received markSeatsAsPrinted event" but never "Start long running mark tickets as printed job". So the code in the Uni block is just not executed.

When I change my code to return a Uni<Boolean> like this:

@ConsumeEvent("greeting")
public Uni<Boolean> markSeatsAsPrinted(String bookingId) {
    LOG.info("Received markSeatsAsPrinted event");
    return Uni.createFrom().item(() -> {
        LOG.info("Start long running mark tickets as printed job");
        try {
            daoBooking.markSeatsAsPrinted(bookingId);
        } catch (FileMakerException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        LOG.info("End long running mark tickets as printed job");
        return true;

    }).emitOn(executor);

}

With this change the log does show "Start long running mark tickets as printed job", but now my main REST service also blocks until the event is fully processed (that takes 10 seconds, which is why I have to do it in the background).

What is wrong here?
I saw the same behaviour when I skipped the Quarkus event bus and used the Uni/executor code in a plain method. Using Java threads did not work either, because Quarkus waited until the thread had finished before sending the REST response.

At the end of the day, I just want this time-consuming task to run in the background without the REST call waiting for it.

Please help :-)

Thank you,
schube

Answer 1

Score: 1

There are several mistakes in the code.

Always subscribe

Mutiny is a reactive, event-driven library that evaluates pipelines lazily: asynchronous code runs only once something subscribes to it. Any operation without a subscription never runs.

Uni.createFrom().voidItem().invoke(() -> {
    LOG.info("Start long running mark tickets as printed job");
    try {
        LOG.info("marked as printed");
    } catch (Exception e) {
        e.printStackTrace();
    }
    LOG.info("End long running mark tickets as printed job");

}).emitOn(executor).subscribe().with(
        item -> LOG.info("Finished"),
        failure -> LOG.error("Failed with " + failure)
);

NOTE: other methods are available, e.g. onItem(), onFailure() and so on.

The output is:

2023-06-01 15:11:58,545 INFO  [com.exa.TestRunner] (vert.x-eventloop-thread-9) Received markSeatsAsPrinted event
2023-06-01 15:11:58,548 INFO  [com.exa.TestRunner] (vert.x-eventloop-thread-9) Start long running mark tickets as printed job
2023-06-01 15:11:58,549 INFO  [com.exa.TestRunner] (vert.x-eventloop-thread-9) marked as printed
2023-06-01 15:11:58,549 INFO  [com.exa.TestRunner] (vert.x-eventloop-thread-9) End long running mark tickets as printed job
2023-06-01 15:11:58,551 INFO  [com.exa.TestRunner] (executor-thread-1) Finished
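
The laziness described above can be mimicked without Mutiny. The following is a minimal plain-JDK sketch, not Mutiny's API: LazyPipeline, LazyDemo and run are hypothetical names made up for this illustration. It only shows that the work is described first and executed when subscribe() is called, which is why the version without a subscription never logged the start of the job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-JDK illustration of lazy evaluation; all names here are hypothetical.
class LazyDemo {

    // Builds a pipeline and optionally subscribes to it, recording what happened.
    static List<String> run(boolean subscribe) {
        List<String> log = new ArrayList<>();
        log.add("received");

        // Creating the pipeline only stores the work; nothing executes yet.
        LazyPipeline<String> pipeline = LazyPipeline.from(() -> {
            log.add("job started");
            return "done";
        });

        if (subscribe) {
            // Only this call triggers the stored work.
            log.add(pipeline.subscribe());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // the job never starts
        System.out.println(run(true));  // the job runs because of subscribe()
    }
}

// Hypothetical stand-in for a lazy pipeline such as a Uni.
final class LazyPipeline<T> {
    private final Supplier<T> work;

    private LazyPipeline(Supplier<T> work) {
        this.work = work;
    }

    static <T> LazyPipeline<T> from(Supplier<T> work) {
        return new LazyPipeline<>(work);
    }

    // Runs the stored work and returns its result.
    T subscribe() {
        return work.get();
    }
}
```

Calling run(false) corresponds to the original consumer, where the pipeline was built and then dropped.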

Never block the event loop

As you can see, the previous operation ran on the event-loop thread pool. As a golden rule, you should never block the event loop. Blocking operations (e.g. database modifications) and other long-running tasks have to be handed off to worker threads. If you put the @Blocking annotation on the method, Quarkus runs it on a worker thread.

@ConsumeEvent("greeting")
@Blocking
public void markSeatsAsPrinted(String bookingId) {
    LOG.info("Received markSeatsAsPrinted event");
    Uni.createFrom().voidItem().invoke(() -> {
                LOG.info("Start long running mark tickets as printed job");
                try {
                    printer.doSome(bookingId);
                    LOG.info("marked as printed");
                } catch (Exception e) {
                    e.printStackTrace();
                }
                LOG.info("End long running mark tickets as printed job");

            }).emitOn(executor)
            .subscribe().with(
                    item -> LOG.info("Finished"),
                    failure -> LOG.error("Failed with " + failure)
            );
}

Now, the log output is:

2023-06-01 15:29:08,177 INFO  [com.exa.TestRunner] (vert.x-worker-thread-1) Received markSeatsAsPrinted event
2023-06-01 15:29:08,179 INFO  [com.exa.TestRunner] (vert.x-worker-thread-1) Start long running mark tickets as printed job
2023-06-01 15:29:08,180 INFO  [com.exa.SomePrinter] (vert.x-worker-thread-1) Printing
2023-06-01 15:29:08,181 INFO  [com.exa.TestRunner] (vert.x-worker-thread-1) marked as printed
2023-06-01 15:29:08,182 INFO  [com.exa.TestRunner] (vert.x-worker-thread-1) End long running mark tickets as printed job
2023-06-01 15:29:08,183 INFO  [com.exa.TestRunner] (executor-thread-1) Finished
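
The hand-off from an event-loop thread to a worker thread can also be sketched with the JDK alone. This is an analogy under stated assumptions, not how Quarkus or Vert.x implement it: the two single-thread executors named "event-loop" and "worker", and the class OffloadDemo, are hypothetical. The point is that the receiving thread only passes the job on, and the blocking part runs elsewhere.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy for "receive on the event loop, block on a worker".
class OffloadDemo {

    static List<String> run() {
        // Hypothetical stand-ins for the event-loop thread and a worker thread.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            // Step 1: the event is received on the event-loop thread.
            CompletableFuture<String> received = CompletableFuture.supplyAsync(
                    () -> "received on " + Thread.currentThread().getName(), eventLoop);

            // Step 2: the blocking job is explicitly scheduled on the worker thread.
            CompletableFuture<List<String>> result = received.thenCompose(msg ->
                    CompletableFuture.supplyAsync(
                            () -> List.of(msg, "blocking job on " + Thread.currentThread().getName()),
                            worker));

            return result.join();
        } finally {
            eventLoop.shutdown();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```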

Suggestion: Use a custom worker instead of the event bus

As far as I understand the business requirement from the code, I don't think the event bus is the perfect solution. I may be wrong, but I would prefer a separate worker for the PDF generation.

Here is a starting point.

@Singleton
@Startup
public class CustomWorker {

    private static final Logger LOG = Logger.getLogger(CustomWorker.class);

    private final WorkerExecutor executor;

    CustomWorker(Vertx vertx) {
        executor = vertx.createSharedWorkerExecutor("pdf-printer");
    }

    void tearDown(@Observes ShutdownEvent ev) {
        executor.close();
    }

    public void markSeatsAsPrinted(String bookingId) {

        LOG.info("Received markSeatsAsPrinted event");
        executor.executeBlocking(promise -> {
            LOG.info("Start long running mark tickets as printed job");
            try {
                // Block thread for 5 seconds
                Thread.sleep(5000L);
                LOG.info("marked as printed");
            } catch (Exception e) {
                e.printStackTrace();
            }
            LOG.info("End long running mark tickets as printed job");
            promise.complete();
        });
    }
}

@Path("/booking")
public class ExampleResource {

    private static final Logger LOG = Logger.getLogger(ExampleResource.class);

    @Inject
    CustomWorker worker;

    @GET
    @Path("/{bookingId}/print-tickets/")
    @Produces(MediaType.APPLICATION_JSON)
    public PdfTicket printTickets(@PathParam("bookingId") String bookingId) {

        LOG.info("Got request");

        // I didn't implement much (e.g. DAO logic)
        var booking = new Booking(bookingId);
        //....
        PdfTicket pdfTicket = new PdfTicket(bookingId);

        // this is the code which should run in the background; the REST call should NOT wait for the result
        if (booking.hasFixedSeatingTickets()) {
            worker.markSeatsAsPrinted(booking.getBookingId());
        }

        LOG.info("Return response");
        return pdfTicket;
    }
}

The output is:

2023-06-01 15:43:05,127 INFO  [com.exa.ExampleResource] (executor-thread-1) Got request
2023-06-01 15:43:05,128 INFO  [com.exa.CustomWorker] (executor-thread-1) Received markSeatsAsPrinted event
2023-06-01 15:43:05,130 INFO  [com.exa.ExampleResource] (executor-thread-1) Return response
2023-06-01 15:43:05,130 INFO  [com.exa.CustomWorker] (pdf-printer-1) Start long running mark tickets as printed job
2023-06-01 15:43:10,131 INFO  [com.exa.CustomWorker] (pdf-printer-1) marked as printed
2023-06-01 15:43:10,132 INFO  [com.exa.CustomWorker] (pdf-printer-1) End long running mark tickets as printed job

Of course, WorkerExecutor is customizable. You can also set the pool size and the maximum execution time if needed.
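
For comparison, the fire-and-forget hand-off that the custom worker relies on can be sketched with only the JDK. This is an illustration outside Quarkus, so it says nothing about how Quarkus itself treats threads started from a request. FireAndForgetDemo, handleRequest and the latch are hypothetical names; the latch stands in for the slow call and makes the ordering deterministic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK sketch of "queue the slow job, answer immediately".
class FireAndForgetDemo {

    // Simulates the REST handler: it queues the job and returns without waiting.
    static String handleRequest(ExecutorService worker, List<String> log, CountDownLatch release) {
        log.add("got request");
        worker.execute(() -> {
            try {
                release.await(); // stands in for the slow "mark seats as printed" call
                log.add("job finished");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        log.add("return response");
        return "ticket";
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            String response = handleRequest(worker, log, release);
            log.add("client received " + response);

            // Only now is the background job allowed to finish.
            release.countDown();
            worker.shutdown();
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("background job did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
        return List.copyOf(log);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded order matches the log above: the response is returned first and the job finishes afterwards on its own thread.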

Posted by huangapple on June 1, 2023, 17:01:58
When reposting, please keep the link to this article: https://go.coder-hub.com/76380276.html