Producer-Consumer Problem but with indefinite tasks?

Question

I'm trying to wrap my head around the Producer-Consumer pattern in the context of tasks that can be indefinite but are also suspendable and can be resumed at a later time. So far, a lot of the examples around this pattern deal with Consumers pulling single, one-off tasks from some sort of queue that's filled by Producers.

But what if said tasks can be indefinite? For example, say the task at hand is to count the occurrences of certain words in a stream of strings. The stream can be infinitely long (or finite), and given more streams of strings than workers to process them, a mechanism may be formed where workers alternate on processing streams - pausing their work on their current stream for another worker to continue before processing another stream.

I'm not sure how to properly design and implement an efficient mechanism around the Producer-Consumer pattern for this problem - or any other pattern/method.

So far, I've followed this guide: https://www.baeldung.com/java-producer-consumer-problem

But the issue is on Section 4.3, if instead of a Double, it was something like Queue<String> that needed to be pushed back onto the BlockingQueue, turning my Consumers to also be Producers.

Answer 1

Score: 0

> But the issue is on Section 4.3, if instead of a Double, it was something like Queue<String> that needed to be pushed back onto the BlockingQueue, turning my Consumers to also be Producers.

I can see your consumers (running in a standard ExecutorService) being able to access a work-queue. They can dequeue a Queue object, work on the strings for a while and then "pause" the job by putting that Queue back on the work-queue and grabbing another job from the front of the work-queue to process. One question is whether or not there is state that needs to be kept as the Queue<String> is being processed. In that case you will need a wrapping JobStatus object which would hold the queue and any accumulators or other information, so the next worker thread can pick up where the last one left off. This might be a good idea anyway so you don't get confused by the queue of queues.
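To make the wrapper idea a bit more concrete, here is a minimal sketch of what such a JobStatus could look like for the word-counting example from the question. Everything except the name JobStatus is an assumption of mine: the constructor arguments, processSome(), countOf(), and the choice to split on whitespace are only there for illustration.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Hypothetical wrapper: one suspendable job plus the state it accumulates. */
final class JobStatus {
    private final Queue<String> input;          // strings of this stream not yet processed
    private final Set<String> wordsOfInterest;  // the words we are counting
    private final Map<String, Integer> counts = new HashMap<>(); // accumulator that survives pauses

    JobStatus(Collection<String> lines, Set<String> wordsOfInterest) {
        this.input = new ArrayDeque<>(lines);
        this.wordsOfInterest = new HashSet<>(wordsOfInterest);
    }

    /**
     * Processes at most maxItems strings and then returns, so the caller can re-queue the job.
     *
     * @return true if the input is exhausted, false if the job was only paused
     */
    boolean processSome(int maxItems) {
        for (int i = 0; i < maxItems; i++) {
            String line = input.poll();
            if (line == null) {
                return true;
            }
            for (String word : line.split("\\s+")) {
                if (wordsOfInterest.contains(word)) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return input.isEmpty();
    }

    /** Number of times the given word has been seen so far. */
    int countOf(String word) {
        return counts.getOrDefault(word, 0);
    }
}
```

The class has no locking of its own. That should be fine as long as only one worker holds a given job at a time and the job is always handed over through the BlockingQueue, since putting an element into a BlockingQueue happens-before another thread taking it out.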

> workers alternate on processing streams - pausing their work on their current stream for another worker to continue before processing another stream

I think the trick here is how the workers figure out whether they should pause their work or not. Maybe they could process 10000 items from the stream and then put it back on the work-queue? That logic could be built into the JobStatus object. You could also look at the size() of the work-queue to see whether things are backing up. An alternative would be to have multiple thread-pools for different sized jobs, so the fast ones have their own queue for immediate servicing while the big ones chew in the background.
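As a rough illustration of the "process N items, then put it back" idea combined with a look at the work-queue, something like the following could work. PausePolicy, quota, onResume() and shouldPauseAfterItem() are names I made up for this sketch, and it assumes each worker owns its own PausePolicy instance.

```java
import java.util.concurrent.BlockingQueue;

/** Hypothetical pause policy: yield after a quota of items, but only if other jobs are waiting. */
final class PausePolicy {
    private final int quota;                   // items a worker may process before considering a pause
    private final BlockingQueue<?> workQueue;  // shared queue of waiting jobs
    private int processedSinceResume;          // touched only by the worker that owns this policy

    PausePolicy(int quota, BlockingQueue<?> workQueue) {
        this.quota = quota;
        this.workQueue = workQueue;
    }

    /** Call whenever the worker takes a job off the work queue. */
    void onResume() {
        processedSinceResume = 0;
    }

    /** Call after each processed item; returns true when the job should go back on the queue. */
    boolean shouldPauseAfterItem() {
        processedSinceResume++;
        // isEmpty() is only a snapshot under concurrency, which is good enough for a heuristic
        return processedSinceResume >= quota && !workQueue.isEmpty();
    }
}
```

Checking that the queue is non-empty avoids pointless churn: if nothing else is waiting, the worker just keeps going on its current stream instead of re-queueing it and taking it straight back.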

But given your specifications I can see pseudo code like the following. It pauses by putting the job on the end of the work-queue. Not sure that's what you would want.

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// queue that when seen means that the jobs are done
static final Queue<String> END_MARKER = new LinkedList<>();
final BlockingQueue<Queue<String>> workQueue = new LinkedBlockingQueue<>();
...
ExecutorService threadPool = Executors.newCachedThreadPool();
...
// now start up our worker threads
for (int i = 0; i < NUM_THREADS; i++) {
    threadPool.submit(new WorkerThread(workQueue));
}
// no new tasks are accepted; the threads will shut down when the END_MARKER is seen
threadPool.shutdown();
...
// now add the jobs to the work-queue
for (Queue<String> queue : queuesForProcessing) {
    workQueue.add(queue);
}
// at the end of the work you add END_MARKER to the work-queue
workQueue.add(END_MARKER);

public class WorkerThread implements Runnable {
    private final BlockingQueue<Queue<String>> workQueue;

    public WorkerThread(BlockingQueue<Queue<String>> workQueue) {
        this.workQueue = workQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Queue<String> job = workQueue.take();
                if (job == END_MARKER) {
                    // need to tell the others to quit as well
                    // caveat: a job paused after the marker was added sits behind it
                    // and may never be resumed once every worker has quit
                    workQueue.put(job);
                    break;
                }
                // now work on the queue job
                while (true) {
                    String foo = job.poll();
                    if (foo == null) {
                        // we are done with this job, get another one
                        break;
                    }
                    // do the work on the String
                    ...
                    // now that we are done, see if we should pause this job
                    if (testToSeeIfWeShouldPauseJob()) {
                        // put it back on the queue and then loop to get another job
                        workQueue.put(job);
                        break;
                    }
                }
            }
        } catch (InterruptedException e) {
            // take() and put() are interruptible: restore the flag and let the worker exit
            Thread.currentThread().interrupt();
        }
    }
}
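One thing to watch with the END_MARKER approach: a job that gets paused after the marker has been enqueued lands behind it, so the workers can all quit while that job still has input left. Below is a small self-contained sketch of one way around that, assuming the set of jobs is known up front and every stream is finite. It counts unfinished jobs with a CountDownLatch and only adds the marker once the count reaches zero. RoundRobinDemo, StreamJob, runAll and the quota parameter are hypothetical names for illustration, and the "work" is simply counting lines that contain a target string.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class RoundRobinDemo {
    /** One suspendable job: the remaining input plus a running count. */
    static final class StreamJob {
        final Queue<String> input;
        final String target;
        int matches; // only touched by the worker currently holding the job

        StreamJob(List<String> lines, String target) {
            this.input = new ArrayDeque<>(lines);
            this.target = target;
        }
    }

    private static final StreamJob END_MARKER = new StreamJob(Collections.<String>emptyList(), "");

    /** Runs all jobs to completion, letting each worker handle at most quota lines per turn. */
    static void runAll(List<StreamJob> jobs, int numThreads, int quota) {
        BlockingQueue<StreamJob> workQueue = new LinkedBlockingQueue<>(jobs);
        CountDownLatch unfinished = new CountDownLatch(jobs.size());
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            pool.execute(() -> work(workQueue, unfinished, quota));
        }
        pool.shutdown(); // no new tasks; the workers already started keep running
        try {
            unfinished.await();        // every job has drained its input
            workQueue.put(END_MARKER); // only now is it safe to tell the workers to quit
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for jobs", e);
        }
    }

    private static void work(BlockingQueue<StreamJob> workQueue, CountDownLatch unfinished, int quota) {
        try {
            while (true) {
                StreamJob job = workQueue.take();
                if (job == END_MARKER) {
                    workQueue.put(job); // pass the marker on to the next worker
                    return;
                }
                boolean finished = false;
                for (int i = 0; i < quota; i++) {
                    String line = job.input.poll();
                    if (line == null) {
                        finished = true;
                        break;
                    }
                    if (line.contains(job.target)) {
                        job.matches++;
                    }
                }
                if (finished) {
                    unfinished.countDown();
                } else {
                    workQueue.put(job); // pause: the job goes to the back of the queue
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        StreamJob a = new StreamJob(Arrays.asList("foo bar", "bar", "foo foo", "baz"), "foo");
        StreamJob b = new StreamJob(Arrays.asList("x", "y foo", "z"), "y");
        runAll(Arrays.asList(a, b), 2, 2);
        System.out.println(a.matches + " " + b.matches);
    }
}
```

For truly infinite streams the latch never reaches zero, so you would need some other shutdown signal (for example interrupting the pool); this sketch only covers the finite case.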

Hopefully this helps.

huangapple
  • Posted on June 16, 2023, 06:37:04
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76485904.html