Parallelizing deserialization step
Question
There is the following pipeline:
- item is produced (the producer is external to the pipeline);
- item is deserialized (JSON to Java object);
- item is processed;
At the moment it all happens synchronously in a single thread:
while (producer.next()) {
    var item = gson.deserialize(producer.item());
    processItem(item);
}
Or schematically:
PRODUCER -> DESERIALIZATION -> CONSUMER
 (sync)         (sync)          (sync)
The point is that the deserialization step has no side effects, so it could be parallelized to save some wall-clock time.
The overall code should look like the following:
var pipeline = new Pipeline<Item>();
pipeline.setProducer(producer);
pipeline.setDeserialization(gson::deserialize);
pipeline.setConsumer(item -> {
    ...
});
pipeline.run();
Or schematically:
            -> DESERIALIZATION
            -> DESERIALIZATION
            -> DESERIALIZATION
PRODUCER -> ...                -> CONSUMER
            -> DESERIALIZATION
            -> DESERIALIZATION
            -> DESERIALIZATION
 (sync)        (parallel)         (sync)
Important notice. Deserialized items should be produced:
- synchronously;
- in the same order the original producer produces encoded items.
Q. Is there a standardized way to code such a pipeline?
Answer 1
Score: 1
Try
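while (producer.next()) {
    // Read the encoded item on the producer thread, before next() advances it
    var encoded = producer.item();
    CompletableFuture.supplyAsync(() -> gson.deserialize(encoded))
        // thenAcceptAsync takes a Consumer; thenRunAsync only accepts a Runnable.
        // Note: processItem calls may run concurrently and out of order here.
        .thenAcceptAsync(item -> processItem(item));
}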
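CompletableFuture on its own does not keep the consume step ordered or sequential. One possible way to get both, sketched below under assumptions, is to chain each consume step onto the previous one with thenAcceptBoth, so that it fires only after the previous item was consumed and the current item was decoded. The class OrderedChain, the method run, and the use of Integer::parseInt as a stand-in decoder are hypothetical names chosen for illustration; they are not part of the question's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper, not from the original answer.
class OrderedChain {
    // Decodes inputs in parallel on the common pool, but passes results to
    // the consumer one at a time, in input order.
    static <A, B> void run(Iterable<A> inputs, Function<A, B> decode, Consumer<B> consume) {
        CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);
        for (A input : inputs) {
            CompletableFuture<B> decoded =
                    CompletableFuture.supplyAsync(() -> decode.apply(input));
            // Runs only after BOTH the previous consume step and this
            // item's decoding have completed.
            tail = tail.thenAcceptBoth(decoded, (ignored, item) -> consume.accept(item));
        }
        // Wait for the last consume step; a failure anywhere in the chain
        // surfaces here as a CompletionException.
        tail.join();
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        Function<String, Integer> decode = Integer::parseInt; // stand-in for the JSON decoder
        Consumer<Integer> consume = seen::add;
        run(List.of("1", "2", "3", "4"), decode, consume);
        System.out.println(seen); // prints [1, 2, 3, 4]
    }
}
```

Two caveats with this sketch: the consume steps run one after another but not necessarily on the same thread, and every decode job is submitted eagerly, so an unbounded producer would need some form of back-pressure.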
Answer 2
Score: 1
One way you can achieve your pattern is to:
- Construct a multi-threaded executor to process the decoding requests
- Have a consumer queue; each time you submit an item to be decoded, also add the corresponding Future object to the consumer queue
- Have a consumer thread wait to take Futures off the queue (so it consumes them in the order they were posted) and call get() on each one (which blocks until that item has been decoded)
So the 'consumer' would look like this:
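import java.util.concurrent.*;

BlockingQueue<Future<Item>> consumerQueue = new LinkedBlockingDeque<>();
Thread consumerThread = new Thread(() -> {
    try {
        while (true) {
            // Blocks until the producer has queued the next job
            Future<Item> item = consumerQueue.take();
            try {
                // Get the next decoded item, waiting for its decoding to finish if necessary
                Item decodedItem = item.get();
                // 'Consume' the item
                ...
            } catch (ExecutionException ex) {
                // Decoding failed for this item; ex.getCause() holds the original exception
            }
        }
    } catch (InterruptedException irr) {
        // Interrupted: leave the loop and let the thread end
    }
});
consumerThread.start();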
Meanwhile, the 'producer' end, with its multi-threaded 'decoder', would look like this:
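ExecutorService decoder = Executors.newFixedThreadPool(4);
while (producer.hasNext()) {
    Item item = producer.next();
    // Submit the decode job for asynchronous processing;
    // submit(Runnable, T) returns a Future that yields `item` once decode() has run
    Future<Item> p = decoder.submit(() -> {
        item.decode();
    }, item);
    // Also queue this decode job for future consumption once complete
    consumerQueue.add(p);
}
// No more jobs will be submitted: let the pool threads exit once the queued work is done
decoder.shutdown();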
As a separate matter, I wonder whether you will actually see much benefit in practice: by insisting on consumption in the same order, you inherently introduce a serial constraint on the process, so one slow item holds up every item queued behind it. But technically, this is one way to achieve what you are after.
P.S. If you don't want a separate consumer thread, the same 'producer' thread could poll the queue for completed items and consume them inline.
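The single-threaded variant from the P.S. could look roughly like the sketch below. It is only an illustration under assumptions: the class InlinePipeline, the method run, and the threads and window parameters are hypothetical names, and the producer is modelled as a plain Iterator. The producer thread submits decode jobs, keeps their Futures in a FIFO queue, and consumes the oldest one itself whenever more than `window` jobs are in flight, which also bounds memory use.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper, not from the original answer.
class InlinePipeline {
    static <A, B> void run(Iterator<A> producer, Function<A, B> decode, Consumer<B> consume,
                           int threads, int window)
            throws InterruptedException, ExecutionException {
        ExecutorService decoder = Executors.newFixedThreadPool(threads);
        Queue<Future<B>> pending = new ArrayDeque<>(); // FIFO keeps submission order
        try {
            while (producer.hasNext()) {
                A encoded = producer.next();
                Future<B> job = decoder.submit(() -> decode.apply(encoded));
                pending.add(job);
                // Window is full: block on the oldest job and consume it inline.
                if (pending.size() >= window) {
                    consume.accept(pending.remove().get());
                }
            }
            // Drain the jobs that are still in flight, oldest first.
            while (!pending.isEmpty()) {
                consume.accept(pending.remove().get());
            }
        } finally {
            decoder.shutdownNow(); // all work is done, or we are failing anyway
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> seen = new ArrayList<>();
        Function<String, Integer> decode = Integer::parseInt; // stand-in for the JSON decoder
        Consumer<Integer> consume = seen::add;
        run(List.of("3", "1", "2").iterator(), decode, consume, 4, 8);
        System.out.println(seen); // prints [3, 1, 2]
    }
}
```

Because producing and consuming both happen on the calling thread, the consumer needs no synchronization of its own; only the decode function has to be safe to call from several threads.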