并行化反序列化步骤

huangapple go评论71阅读模式
英文:

Parallelizing deserialization step

问题

有以下的流程:

  1. 生成物品(生成器在流程之外);
  2. 反序列化物品(从 JSON 到 Java 对象);
  3. 处理物品;

目前,所有这些步骤都在单个线程中同步进行:

while(producer.next()) {
   var item = gson.deserialize(producer.item());
   
   processItem(item);
}

或者以示意图表示:

PRODUCER -> DESERIALIZATION -> CONSUMER
(sync)      (sync)            (sync)

问题是,反序列化步骤没有副作用,可以并行化,从而节省一些时间。

整体代码应该如下所示:

var pipeline = new Pipeline<Item>();
pipeline.setProducer(producer);
pipeline.setDeserialization(gson::deserialize);
pipeline.setConsumer(item -> {
    ...
});
pipeline.run();

或者以示意图表示:

             -> DESERIALIZATION
             -> DESERIALIZATION
             -> DESERIALIZATION
PRODUCER -> ...             -> CONSUMER
             -> DESERIALIZATION
             -> DESERIALIZATION
             -> DESERIALIZATION
(sync)      (parallel)         (sync)

重要提醒。 反序列化的物品 应该:

  • 同步地生成;
  • 以与原始生成器生成编码物品的相同顺序。

问。 有没有一种标准的方法来编写这样的流水线?

英文:

There is the following pipeline:

  1. item is produced (the producer is external to the pipeline);
  2. item is deserialized (JSON to Java object);
  3. item is processed;

At the moment it all happens synchronously in a single thread:

while(producer.next()) {
   var item = gson.deserialize(producer.item());
   
   processItem(item);
}

Or schematically:

PRODUCER -&gt; DESERIALIZATION -&gt; CONSUMER

(sync)      (sync)            (sync)

The concern is that the deserialization step has no side-effects and could be parallelized saving some world time.

The overall code should like the following:

var pipeline = new Pipeline&lt;Item&gt;();
pipeline.setProducer(producer);
pipeline.setDeserialization(gson::deserialize);
pipeline.setConsumer(item -&gt; {
    ...
});
pipeline.run();

Or schematically:

         -&gt; DESERIALIZATION
         -&gt; DESERIALIZATION
         -&gt; DESERIALIZATION
PRODUCER -&gt; ...             -&gt; CONSUMER
         -&gt; DESERIALIZATION
         -&gt; DESERIALIZATION
         -&gt; DESERIALIZATION

(sync)      (parallel)         (sync)

Important notice. Deserialized items should be produced:

  • synchronously;
  • in the same order the original producer produces encoded items.

Q. Is there a standardized way to code such a pipeline?

答案1

得分: 1

尝试

while (producer.next()) {
    CompletableFuture.supplyAsync(() -> gson.deserialize(producer.item()))
        .thenRunAsync(item -> processItem(item));
}
英文:

Try

while(producer.next()) {
   CompletableFuture.supplyAsync(()-&gt; gson.deserialize(producer.item()))
    .thenRunAsync(item-&gt;processItem(item));
}

答案2

得分: 1

你可以实现你的模式的一种方法是:

  • 构建一个多线程执行器来处理解码请求。
  • 拥有一个消费者队列;每次提交一个要解码的项,同时将相应的 Future 对象添加到消费者队列中。
  • 有一个消费者线程等待从队列中获取项目 [因此按照它们发布的顺序进行消费],调用相应的 get() 方法 [等待项目解码]。

因此,'消费者' 的代码如下:

BlockingQueue<Future<Item>> consumerQueue = new LinkedBlockingDeque<>();
Thread consumerThread = new Thread(() -> {
    try {
        while (true) {
            Future<Item> item = consumerQueue.take();
            try {
                // 获取下一个已准备好的解码项目
                Item decodedItem = item.get();
                // '消费' 该项目
                ...
            } catch (ExecutionException ex) {
            }
        }
    } catch (InterruptedException irr) {
    }
});
consumerThread.start();

与此同时,带有多线程 '解码器' 的 '生产者' 端的代码如下:

ExecutorService decoder = Executors.newFixedThreadPool(4);
while (!producer.hasNext()) {
    Item item = producer.next();

    // 提交解码作业进行异步处理
    Future<Item> p = decoder.submit(() -> {
        item.decode();
    }, item);

    // 一旦完成,还可以将此解码作业排队以供将来消费
    consumerQueue.add(p);
}

另外,我想知道在实际中是否会看到很大的好处,因为坚持按相同顺序消费,本质上引入了进程的串行条件。但从技术上讲,这是你可以实现你想要的方式之一。

附言:如果你不想要一个单独的消费者线程,那么同样的 '生产者' 线程可以轮询已完成的项目队列并内联执行。

英文:

One way you can achieve your pattern is to:

  • Construct a multi-threaded executor to process the decoding requests
  • Have a consumer queue; each time you submit an item to be decoded, also add the corresponding Future object to the consumer queue
  • Have a consumer thread sit waiting to take items off the queue [which therefore consumes them in the order they were posted], call the corresponding get() method [which waits for the item to be decoded]

So the 'consumer' would look like this:

BlockingQueue&lt;Future&lt;Item&gt;&gt; consumerQueue = new LinkedBlockingDeque&lt;&gt;();
Thread consumerThread = new Thread(() -&gt; {
        try {
            while (true) {
                Future&lt;Item&gt; item = consumerQueue.take();
                try {
                    // Get the next decoded item that&#39;s ready
                    Item decodedItem = item.get();
                    // &#39;Consume&#39; the item
                    ...
                } catch (ExecutionException ex) {
                }
            }
        } catch (InterruptedException irr) {

        }
    });
    consumerThread.start()

Meanwhile, the 'producer' end, with its multi-threaded 'decoder', would look like this:

ExecutorService decoder = Executors.newFixedThreadPool(4);
while (!producer.hasNext()) {
        Item item = producer.next()

        // Submit the decode job for asynchronous processing
        Future&lt;Item&gt; p = decoder.submit(() -&gt; {
            item.decode();
        }, item);

        // Also queue this decode job for future consumption once complete
        consumerQueue.add(p);
    }

As a separate matter, I wonder if you will actually see much benefit in practice, since by insisting on consumption in the same order, you are inherently introducing a serial condition on the process. But technically, this is one way that you could achieve what you are after.

P.S. If you didn't want a separate consumer thread, then the same 'producer' thread could poll the queue for completed items and execute in line.

huangapple
  • 本文由 发表于 2020年8月21日 23:16:09
  • 转载请务必保留本文链接:https://go.coder-hub.com/63525535.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定