英文:
Execute background job with quarkus and don't wait for result
问题
我遇到的问题是,我有一个Quarkus REST Web服务,它以阻塞/顺序方式执行一些任务,然后我想以非阻塞方式执行一个运行时间较长的任务,但我找不到一个可行的解决方案。
因此,Web服务看起来像这样(简化了一些内容):
import io.vertx.mutiny.core.eventbus.EventBus;
@Path("/booking")
@ApplicationScoped
public class BookingResource {
@Inject
EventBus eventBus;
@POST
@Path("/{bookingId}/print-tickets/")
@Produces(MediaType.APPLICATION_JSON)
public PdfTicket printTickets(@PathParam("bookingId") String bookingId) throws Exception {
var optBooking = daoBooking.getBookingDetailsById(bookingId);
//....
var eventOpt = daoEvent.getEventById(booking.getEventId());
//...
PdfTicket pdfTicket = myconverter(optBooking, eventOpt); //一些我在这里删除的神奇代码
//这是应该在后台运行的代码,REST调用不应该等待结果
if (booking.hasFixedSeatingTickets()) {
// 标记座位已打印,以防固定座位被重新分配给其他客人
eventBus.requestAndForget("greeting", booking.getBookingId());
}
return pdfTicket;
}
}
我尝试过简单的Java线程,尝试了Mutiny,最后尝试了Quarkus事件总线方法,你可以在上面的代码中看到。在这里,我向“greeting”发送了一条消息。我像这样消费事件:
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class TestResource {
private static final Logger LOG = Logger.getLogger(TestResource.class);
@Inject
private DaoBooking daoBooking;
@Inject
ManagedExecutor executor;
@ConsumeEvent("greeting")
public void markSeatsAsPrinted(String bookingId) {
LOG.info("Received markSeatsAsPrinted event");
Uni.createFrom().voidItem().invoke(() -> {
LOG.info("Start long running mark tickets as printed job");
try {
daoBooking.markSeatsAsPrinted(bookingId);
} catch (FileMakerException e) {
e.printStackTrace();
}
LOG.info("End long running mark tickets as printed job");
}).emitOn(executor);
}
}
根据 https://quarkus.io/guides/reactive-event-bus,当方法返回void
时,调用服务不应等待结果。
现在,当我调用我的REST服务时,我可以在日志中看到“Received markSeatsAsPrinted event”,但从未看到“Start long running mark tickets as printed job”。因此,Uni块中的代码根本没有执行。
当我将代码更改为返回Uni<Boolean>
时,如下所示:
@ConsumeEvent("greeting")
public Uni<Boolean> markSeatsAsPrinted(String bookingId) {
LOG.info("Received markSeatsAsPrinted event");
return Uni.createFrom().item(() -> {
LOG.info("Start long running mark tickets as printed job");
try {
daoBooking.markSeatsAsPrinted(bookingId);
} catch (FileMakerException e) {
e.printStackTrace();
}
LOG.info("End long running mark tickets as printed job");
return true;
}).emitOn(executor);
}
然后日志文件会显示“Start long running mark tickets as printed job”,但现在我的主REST服务也会阻塞,直到事件完全处理(这需要10秒,所以我必须在后台处理)。
问题出在哪里?
此外,当我没有使用Quarkus事件总线而是在一个简单的方法中使用Uni/Executor的内容时,也出现了相同的行为。同样,使用Java线程也不起作用,因为Quarkus会等待线程完成后才响应REST响应。
最终,我只是想在不等待REST调用的情况下在后台运行需要很长时间的任务。
请帮忙:-)
谢谢,
schube
英文:
My problem is, that I have a quarkus REST webservice which executes some tasks in a blocking/sequential way and then I want to execute a longer running task none blocking, but I found no running solution.
So the webservice looks like that (simplyfied!:
import io.vertx.mutiny.core.eventbus.EventBus;
@Path("/booking")
@ApplicationScoped
public class BookingResource {
@Inject
EventBus eventBus;
@POST
@Path("/{bookingId}/print-tickets/")
@Produces(MediaType.APPLICATION_JSON)
public PdfTicket printTickets(@PathParam("bookingId") String bookingId) throws Exception {
var optBooking = daoBooking.getBookingDetailsById(bookingId);
//....
var eventOpt = daoEvent.getEventById(booking.getEventId());
//...
PdfTicket pdfTicket = myconverter(optBooking, eventOpt); //Some magic code which I removed here
//this is the code which should run in background and the REST call should NOT wait vor the result
if (booking.hasFixedSeatingTickets()) {
// Mark seats as printed, so the fixed seats are not re-assigned to other guests
eventBus.requestAndForget("greeting", booking.getBookingId());
}
return pdfTicket;
}
}
I tried simple java threads, I tried Mutinity and lastly I tried the quarkus eventbus approach which you can see in the code above.
So here I am sending a message to "greeting". I consume the event like that:
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class TestResource {
private static final Logger LOG = Logger.getLogger(TestResource.class);
@Inject
private DaoBooking daoBooking;
@Inject
ManagedExecutor executor;
@ConsumeEvent("greeting")
public void markSeatsAsPrinted(String bookingId) {
LOG.info("Received markSeatsAsPrinted event");
Uni.createFrom().voidItem().invoke(() -> {
LOG.info("Start long running mark tickets as printed job");
try {
daoBooking.markSeatsAsPrinted(bookingId);
} catch (FileMakerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
LOG.info("End long running mark tickets as printed job");
}).emitOn(executor);
}
}
According to https://quarkus.io/guides/reactive-event-bus when the method returns void
the calling service should NOT wait for the result.
Now, when I call my REST service, I can see in the log "Received markSeatsAsPrinted event" but never "Start long running mark tickets as printed job". So the code in the Uni block is just not executed.
When I change my code to return a Uni<Boolean>
like that:
@ConsumeEvent("greeting")
public Uni<Boolean> markSeatsAsPrinted(String bookingId) {
LOG.info("Received markSeatsAsPrinted event");;
return Uni.createFrom().item(() -> {
LOG.info("Start long running mark tickets as printed job");
try {
daoBooking.markSeatsAsPrinted(bookingId);
} catch (FileMakerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
LOG.info("End long running mark tickets as printed job");
return true;
}).emitOn(executor);
}
then the logfile does display "Start long running mark tickets as printed job" but now, also my main REST service blocks until the event is fully processed (which takes 10 seconds, so I have to do it in the background).
What is wrong here?
Also, I had the same behaviour, when I did not use quarkus event bus but the Uni/Executor stuff in a simple method. Also, using Java Threads did not work because quarkus waited until the thread was finished with the REST response.
At the end of the day I just want to have that task which takes so much time running in background without waiting for the answer in the REST call.
Please help
Thank you,
schube
答案1
得分: 1
以下是翻译好的部分:
总是使用订阅
Mutiny作为一个响应式和事件驱动的库,使用延迟评估的事件和订阅来运行异步代码。这意味着,没有订阅的任何操作都不会运行。
永远不要阻塞事件循环
正如您可以看到的,前面的操作是在“事件循环”线程池上执行的。作为黄金法则,您不应该阻塞事件循环。当您需要运行一些阻塞操作(例如数据库修改)或其他长时间运行的任务时,它必须传递到工作线程。在方法上使用 @Blocking
注解,Quarkus 将在工作线程上运行它。
建议:使用自定义工作器而不是事件总线
根据代码我理解的业务需求,我认为事件总线不是完美的解决方案。也许我错了,但我更喜欢为PDF生成创建一个单独的工作器。
这是一个基础工作。
自定义工作器类:
@Singleton
@Startup
public class CustomWorker {
private static final Logger LOG = Logger.getLogger(CustomWorker.class);
private final WorkerExecutor executor;
CustomWorker(Vertx vertx) {
executor = vertx.createSharedWorkerExecutor("pdf-printer");
}
void tearDown(@Observes ShutdownEvent ev) {
executor.close();
}
public void markSeatsAsPrinted(String bookingId) {
LOG.info("Received markSeatsAsPrinted event");
executor.executeBlocking(promise -> {
LOG.info("Start long running mark tickets as printed job");
try {
// 阻塞线程5秒钟
Thread.sleep(5000L);
LOG.info("marked as printed");
} catch (Exception e) {
e.printStackTrace();
}
LOG.info("End long running mark tickets as printed job");
promise.complete();
});
}
}
REST资源类:
@Path("/booking")
public class ExampleResource {
private static final Logger LOG = Logger.getLogger(ExampleResource.class);
@Inject
CustomWorker worker;
@GET
@Path("/{bookingId}/print-tickets/")
@Produces(MediaType.APPLICATION_JSON)
public PdfTicket printTickets(@PathParam("bookingId") String bookingId) {
LOG.info("Got request");
// 我没有实现太多(例如DAO逻辑)
var booking = new Booking(bookingId);
//....
PdfTicket pdfTicket = new PdfTicket(bookingId);
//这是应该在后台运行的代码,REST调用不应该等待结果
if (booking.hasFixedSeatingTickets()) {
worker.markSeatsAsPrinted(booking.getBookingId());
}
LOG.info("Return response");
return pdfTicket;
}
}
希望这些翻译对您有帮助。
英文:
There are several mistakes in the code.
Always use subscription
Mutiny as a reative and event driven library is using lazy evaluated events and subscriptions to run asynchronous code. That means, any operation without subscription will never run.
Uni.createFrom().voidItem().invoke(() -> {
LOG.info("Start long running mark tickets as printed job");
try {
LOG.info("marked as printed");
} catch (Exception e) {
e.printStackTrace();
}
LOG.info("End long running mark tickets as printed job");
}).emitOn(executor).subscribe().with(
item -> LOG.info("Finished"),
failure -> LOG.error("Failed with " + failure)
);
NOTE: other methods are available e.q. onItem()
, onFailure()
and so on.
The output is:
2023-06-01 15:11:58,545 INFO [com.exa.TestRunner] (vert.x-eventloop-thread-9) Received markSeatsAsPrinted event
2023-06-01 15:11:58,548 INFO [com.exa.TestRunner] (vert.x-eventloop-thread-9) Start long running mark tickets as printed job
2023-06-01 15:11:58,549 INFO [com.exa.TestRunner] (vert.x-eventloop-thread-9) marked as printed
2023-06-01 15:11:58,549 INFO [com.exa.TestRunner] (vert.x-eventloop-thread-9) End long running mark tickets as printed job
2023-06-01 15:11:58,551 INFO [com.exa.TestRunner] (executor-thread-1) Finished
Never block the event loop
As you can see the previous operation was executed on event-loop
thread pool. As a golden rule you should never block the event loop. When you have to run some blocking operations (e.g database modifications) or other long running tasks it has to be propagated to worker threads. Using @Blocking
annotation on the method Quarkus will run that on worker thread.
@ConsumeEvent("greeting")
@Blocking
public void markSeatsAsPrinted(String bookingId) {
LOG.info("Received markSeatsAsPrinted event");
Uni.createFrom().voidItem().invoke(() -> {
LOG.info("Start long running mark tickets as printed job");
try {
printer.doSome(bookingId);
LOG.info("marked as printed");
} catch (Exception e) {
e.printStackTrace();
}
LOG.info("End long running mark tickets as printed job");
}).emitOn(executor).
subscribe().with(
item -> LOG.info("Finished"),
failure -> LOG.error("Failed with " + failure)
);
}
Now, the log output is:
2023-06-01 15:29:08,177 INFO [com.exa.TestRunner] (vert.x-worker-thread-1) Received markSeatsAsPrinted event
2023-06-01 15:29:08,179 INFO [com.exa.TestRunner] (vert.x-worker-thread-1) Start long running mark tickets as printed job
2023-06-01 15:29:08,180 INFO [com.exa.SomePrinter] (vert.x-worker-thread-1) Printing
2023-06-01 15:29:08,181 INFO [com.exa.TestRunner] (vert.x-worker-thread-1) marked as printed
2023-06-01 15:29:08,182 INFO [com.exa.TestRunner] (vert.x-worker-thread-1) End long running mark tickets as printed job
2023-06-01 15:29:08,183 INFO [com.exa.TestRunner] (executor-thread-1) Finished
Suggestion: Use custom worker instead of event bus
As I understood the business requirement by the code I don't think eventbus is the perfect solution. Probably I'm wrong, but I prefer to create separated worker for the pdf generation.
Here is a ground work.
@Singleton
@Startup
public class CustomWorker {
private static final Logger LOG = Logger.getLogger(CustomWorker.class);
private final WorkerExecutor executor;
CustomWorker(Vertx vertx) {
executor = vertx.createSharedWorkerExecutor("pdf-printer");
}
void tearDown(@Observes ShutdownEvent ev) {
executor.close();
}
public void markSeatsAsPrinted(String bookingId) {
LOG.info("Received markSeatsAsPrinted event");
executor.executeBlocking(promise -> {
LOG.info("Start long running mark tickets as printed job");
try {
// Block thread for 5 seconds
Thread.sleep(5000L);
LOG.info("marked as printed");
} catch (Exception e) {
e.printStackTrace();
}
LOG.info("End long running mark tickets as printed job");
promise.complete();
});
}
}
@Path("/booking")
public class ExampleResource {
private static final Logger LOG = Logger.getLogger(ExampleResource.class);
@Inject
CustomWorker worker;
@GET
@Path("/{bookingId}/print-tickets/")
@Produces(MediaType.APPLICATION_JSON)
public PdfTicket printTickets(@PathParam("bookingId") String bookingId) {
LOG.info("Got request");
// I did't implement so much (e.g. DAO logic)
var booking = new Booking(bookingId);
//....
PdfTicket pdfTicket = new PdfTicket(bookingId);
//this is the code which should run in background and the REST call should NOT wait vor the result
if (booking.hasFixedSeatingTickets()) {
worker.markSeatsAsPrinted(booking.getBookingId());
}
LOG.info("Return response");
return pdfTicket;
}
}
The outout is:
2023-06-01 15:43:05,127 INFO [com.exa.ExampleResource] (executor-thread-1) Got request
2023-06-01 15:43:05,128 INFO [com.exa.CustomWorker] (executor-thread-1) Received markSeatsAsPrinted event
2023-06-01 15:43:05,130 INFO [com.exa.ExampleResource] (executor-thread-1) Return response
2023-06-01 15:43:05,130 INFO [com.exa.CustomWorker] (pdf-printer-1) Start long running mark tickets as printed job
2023-06-01 15:43:10,131 INFO [com.exa.CustomWorker] (pdf-printer-1) marked as printed
2023-06-01 15:43:10,132 INFO [com.exa.CustomWorker] (pdf-printer-1) End long running mark tickets as printed job
Of course, WorkerExecutor
is customizable. You can also set poolSize
and maxExecutionTime
if it needed.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论