能否在Java(Scala)阻塞队列中允许插队(更高优先级)?

huangapple go评论70阅读模式
英文:

Could Java(Scala) blocking queue allows queue jumper(higher priority)?

问题

以下是您提供的内容的翻译部分:

可以使用 LinkedBlockingQueue 来阻塞操作。假设队列中只有一个元素,并且每个人都可以使用它。

val q = new LinkedBlockingQueue() // 队列中有一个元素
def fun() = {
  val instance = q.take()
  // 进行一些操作
}
def foo() = {
  val instance = q.take()
  // 进行一些操作
}
// 使用3个线程来运行以下3个方法,它们调用 q.take() 的顺序如下
fun()
fun() // 将等待第一个 fun()
foo() // 将等待第二个 fun()

这些方法完成的顺序为 fun(), fun(), foo()

然而,现在我想将 foo 设置为更高的优先级,这意味着允许它插队。也就是说,foo 可以在第二个 fun() 之前获取实例(在第二个 fun 等待时,foo 插到了它前面)

完成的顺序可以变为 fun(), foo(), fun()(第一个 fun 可以获取实例,因为实例可用,第二个 fun 应该等待,然后 foo 也等待,但插到了第二个 fun 的前面)

这种情况是否可能?还是有其他可能的数据结构可以实现这一点?

英文:

One could use LinkedBlockingQueue to block the operations. Suppose I have only 1 element in queue, and every one could use it.

val q = new LinkedBlockingQueue() // 1 element in it
def fun() = {
  val instance = q.take()
  // do some operations
}
def foo() = {
  val instance = q.take()
  // do some operations
}
// Use 3 threads to run following 3 methods, and the order they call q.take() is following
fun()
fun() // will wait for first fun()
foo() // will wait for second fun()

And the sequential order these methods finish is fun(), fun(), foo()

However, now I want to set foo to higher priority, which means allow it to be queue-jumper. That the foo could take instance before the second fun()(while the second fun is waiting, foo jumps to the front of it)

And the order they finish could become fun(), foo(), fun(), (the first fun would take the instance because the instance is available, and second should wait, then the foo also wait, but jumped to the front of second fun)

Is is possible? Or is there any other data structure possible for this

答案1

得分: 3

这是您提供的Java代码的翻译:

我不知道是否有内置工具可用于此任务但实现起来不会太难因为您只想要交换单个元素所以不需要队列而是需要一个交换器

一个简单的实现可以如下所示

```java
class SingleElementExchanger<T> {
    int priorityConsumer;
    T value;

    public synchronized void set(T newValue) throws InterruptedException {
        Objects.requireNonNull(newValue);
        while(value != null) wait();
        value = newValue;
        notifyAll();
    }

    public synchronized T ordinaryGet() throws InterruptedException {
        while(priorityConsumer != 0 || value == null) wait();
        T received = value;
        value = null;
        notifyAll();
        return received;
    }

    public synchronized T priorityGet() throws InterruptedException {
        priorityConsumer++;
        try {
            while(value == null) wait();
            T received = value;
            value = null;
            notifyAll();
            return received;
        }
        finally {
            priorityConsumer--;
        }
    }
}

对于您的两个普通消费者和一个优先级消费者以及少量生产者,这可能已经足够了。

对于更多的线程,您可能希望使用Lock,以便能够通知正确的线程,而不是使用notifyAll()

class SingleElementExchanger<T> {
    final Lock lock = new ReentrantLock();
    final Condition empty = lock.newCondition(),
        fullNoPri = lock.newCondition(), fullPri = lock.newCondition();

    int priorityConsumer;
    T value;

    public void set(T newValue) throws InterruptedException {
        Objects.requireNonNull(newValue);
        lock.lock();
        try {
            while(value != null) empty.await();
            value = newValue;
            (priorityConsumer==0? fullNoPri: fullPri).signal();
        }
        finally {
            lock.unlock();
        }
    }

    public T ordinaryGet() throws InterruptedException {
        lock.lock();
        try {
            while(priorityConsumer != 0 || value == null) fullNoPri.await();
            T received = value;
            value = null;
            empty.signal();
            return received;
        }
        finally {
            lock.unlock();
        }
    }

    public T priorityGet() throws InterruptedException {
        lock.lock();
        try {
            priorityConsumer++;
            while(value == null) fullPri.await();
            T received = value;
            value = null;
            empty.signal();
            return received;
        }
        finally {
            priorityConsumer--;
            lock.unlock();
        }
    }
}

希望这对您有所帮助。如果您有任何其他问题,请随时提出。

英文:

I don’t know of any built-in tool for this task, but it wouldn’t be too hard to implement. Since you only want to exchange a single element, you don’t need a queue but an exchanger.

A simple implementation could look like

class SingleElementExchanger<T> {
    int priorityConsumer;
    T value;

    public synchronized void set(T newValue) throws InterruptedException {
        Objects.requireNonNull(newValue);
        while(value != null) wait();
        value = newValue;
        notifyAll();
    }

    public synchronized T ordinaryGet() throws InterruptedException {
        while(priorityConsumer != 0 || value == null) wait();
        T received = value;
        value = null;
        notifyAll();
        return received;
    }

    public synchronized T priorityGet() throws InterruptedException {
        priorityConsumer++;
        try {
            while(value == null) wait();
            T received = value;
            value = null;
            notifyAll();
            return received;
        }
        finally {
            priorityConsumer--;
        }
    }
}

For your two ordinary consumers and one priority consumer and a small number of producers, this might already be sufficient.

For a larger number of threads, you might want to use a Lock, to be able to notify the right party instead of using notifyAll().

class SingleElementExchanger<T> {
    final Lock lock = new ReentrantLock();
    final Condition empty = lock.newCondition(),
        fullNoPri = lock.newCondition(), fullPri = lock.newCondition();

    int priorityConsumer;
    T value;

    public void set(T newValue) throws InterruptedException {
        Objects.requireNonNull(newValue);
        lock.lock();
        try {
            while(value != null) empty.await();
            value = newValue;
            (priorityConsumer==0? fullNoPri: fullPri).signal();
        }
        finally {
            lock.unlock();
        }
    }

    public T ordinaryGet() throws InterruptedException {
        lock.lock();
        try {
            while(priorityConsumer != 0 || value == null) fullNoPri.await();
            T received = value;
            value = null;
            empty.signal();
            return received;
        }
        finally {
            lock.unlock();
        }
    }

    public T priorityGet() throws InterruptedException {
        lock.lock();
        try {
            priorityConsumer++;
            while(value == null) fullPri.await();
            T received = value;
            value = null;
            empty.signal();
            return received;
        }
        finally {
            priorityConsumer--;
            lock.unlock();
        }
    }
}

答案2

得分: 0

你需要查阅 CompletableFuture 来实现你的目的。这样你可以实现任务的顺序。

CompletableFuture<String> text = CompletableFuture.supplyAsync(() -> {
    return "";
}).thenApply(param -> {
    return "";
}).thenApply(param -> {
    return "";
});
英文:

You need to look up CompletableFuture for your purpose. This way you can achieve task ordering.

CompletableFuture&lt;String&gt; text = CompletableFuture.supplyAsync(() -&gt; {
return &quot;&quot;;
}).thenApply(param -&gt; {
return &quot;&quot;;
}).thenApply(param -&gt; {
return &quot;&quot;;
});

huangapple
  • 本文由 发表于 2020年9月23日 13:33:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/64021631.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定