Equivalent of Go channel in Java

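Question

I need to read from a set of blocking queues that are created by a library I am using. I do not want to create one reader thread per queue. Instead, I want a single thread (or two or three at most) to check the queues for available data. Some of the queues may hold no data for a long time, while others receive bursts. Polling each queue with a small timeout works, but it is inefficient, because the loop keeps visiting every queue even when some of them stay empty for a long time. Basically, I am looking for a select/epoll-style mechanism (as used on sockets) for blocking queues. Any pointers are appreciated.

Doing this in Go is easy. The code below simulates the same setup with channels and goroutines: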

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// sendMessage sends a burst of 1 to 10 order strings, then sleeps for at least one second.
func sendMessage(sc chan string) {
    for {
        for i := rand.Intn(10); i >= 0; i-- {
            sc <- fmt.Sprintf("Order number %d", rand.Intn(100))
        }
        pause := 1000 + rand.Intn(32000)
        time.Sleep(time.Duration(pause) * time.Millisecond)
    }
}

// sendNum sends a burst of 1 to 16 numbers, 20 ms apart, then sleeps for at least one second.
func sendNum(c chan int) {
    for {
        for i := rand.Intn(16); i >= 0; i-- {
            time.Sleep(20 * time.Millisecond)
            c <- rand.Intn(65534)
        }
        pause := 1000 + rand.Intn(24000)
        time.Sleep(time.Duration(pause) * time.Millisecond)
    }
}

func main() {
    msgchan := make(chan string, 32)
    numchan := make(chan int, 32)
    for i := 0; i < 8; i++ {
        go sendNum(numchan)
        go sendMessage(msgchan)
    }
    // A single goroutine blocks in select until either channel has data.
    for {
        select {
        case msg := <-msgchan:
            fmt.Printf("Worked on  %s\n", msg)
        case x := <-numchan:
            fmt.Printf("I got %d \n", x)
        }
    }
}

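Answer 1

Score: 15

I suggest you look into the JCSP library. The equivalent of Go's select is called Alternative. You would need only one consuming thread, and it does not have to poll the incoming channels if it switches on them with Alternative. This makes it an efficient way to multiplex the source data.

It helps a lot if you are able to replace the BlockingQueues with JCSP channels. Channels behave essentially the same way but offer more flexibility in how channel ends are shared for fan-out or fan-in and, in particular, they can be used with Alternative.

As a usage example, here is a fair multiplexer: a process that fairly multiplexes traffic from its array of input channels to its single output channel. No input channel is starved, regardless of how eager its competitors are.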

import org.jcsp.lang.*;

public class FairPlex implements CSProcess {

   private final AltingChannelInput[] in;
   private final ChannelOutput out;

   public FairPlex (final AltingChannelInput[] in, final ChannelOutput out) {
     this.in = in;
     this.out = out;
   }

   public void run () {

     final Alternative alt = new Alternative (in);

     while (true) {
       final int index = alt.fairSelect ();
       out.write (in[index].read ());
     }
   }
}

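Note that if priSelect were used above, higher-indexed channels would be starved whenever lower-indexed channels continually demanded service. select could be used instead of fairSelect, but then no starvation analysis is possible, so select should only be used when starvation is not an issue.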
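
The difference between the two policies can be illustrated without JCSP. The sketch below is plain Java and is not how JCSP implements Alternative; SelectPolicySketch, its priSelect and fairSelect helpers, and serve are hypothetical names made up for this illustration. It assumes every input always has data waiting, which is the situation in which starvation shows up.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class SelectPolicySketch {

    /** Priority policy: always scan from index 0, so low indices win every tie. */
    static int priSelect(List<Queue<String>> inputs) {
        for (int i = 0; i < inputs.size(); i++) {
            if (!inputs.get(i).isEmpty()) {
                return i;
            }
        }
        return -1; // nothing ready
    }

    /** Fair policy: start scanning just after the input that was served last. */
    static int fairSelect(List<Queue<String>> inputs, int lastServed) {
        int n = inputs.size();
        for (int k = 1; k <= n; k++) {
            int i = (lastServed + k) % n;
            if (!inputs.get(i).isEmpty()) {
                return i;
            }
        }
        return -1; // nothing ready
    }

    /** Serves `rounds` items while every input stays busy; returns the indices served. */
    static List<Integer> serve(int inputCount, int rounds, boolean fair) {
        List<Queue<String>> inputs = new ArrayList<>();
        for (int i = 0; i < inputCount; i++) {
            Queue<String> q = new ArrayDeque<>();
            q.add("item");
            inputs.add(q);
        }
        List<Integer> served = new ArrayList<>();
        int last = -1;
        for (int r = 0; r < rounds; r++) {
            int index = fair ? fairSelect(inputs, last) : priSelect(inputs);
            inputs.get(index).remove();
            inputs.get(index).add("item"); // the producer immediately refills its queue
            served.add(index);
            last = index;
        }
        return served;
    }

    public static void main(String[] args) {
        System.out.println("priority: " + serve(3, 6, false)); // [0, 0, 0, 0, 0, 0]
        System.out.println("fair:     " + serve(3, 6, true));  // [0, 1, 2, 0, 1, 2]
    }
}
```

With three permanently busy inputs, the priority scan serves only index 0, while the rotating scan visits each index in turn.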

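Freedom from Deadlock

As with Go, a Java program using channels must be designed not to deadlock. Low-level concurrency primitives in Java are very hard to get right, and you need something dependable. Fortunately, Alternative has been validated by formal analysis, along with the JCSP channels, which makes it a solid, reliable choice.

To clear up one slight point of confusion: the current JCSP version in the Maven repos is 1.1-rc5, not what the website says.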

Answer 2

Score: 2

Another option for Java 7+ (the diamond operator rules out Java 6) is a small class built on LinkedBlockingDeque:

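import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;

class GoChannelPool {

    private final static GoChannelPool defaultInstance = newPool();

    private final AtomicLong serialNumber = new AtomicLong();
    // Channel registry. The values are weak so that unused channels can be collected; the map
    // itself is a ConcurrentHashMap because newChannel() may be called from any thread.
    private final ConcurrentHashMap<Long, WeakReference<GoChannel>> channelWeakHashMap = new ConcurrentHashMap<>();
    // Every channel of this pool writes into this one shared queue.
    private final LinkedBlockingDeque<GoChannelObject> totalQueue = new LinkedBlockingDeque<>();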

    public <T> GoChannel<T> newChannel()  {
        GoChannel<T> channel = new GoChannel<>();
        channelWeakHashMap.put(channel.getId(), new WeakReference<GoChannel>(channel));
        return channel;
    }

    public void select(GoSelectConsumer consumer) throws InterruptedException {
        consumer.accept(getTotalQueue().take());
    }

    public int size() {
        return getTotalQueue().size();
    }

    public int getChannelCount() {
        return channelWeakHashMap.values().size();
    }

    private LinkedBlockingDeque<GoChannelObject> getTotalQueue() {
        return totalQueue;
    }

    public static GoChannelPool getDefaultInstance() {
        return defaultInstance;
    }

    public static GoChannelPool newPool()  {
        return new GoChannelPool();
    }

    private GoChannelPool() {}

    private long getSerialNumber() {
        return serialNumber.getAndIncrement();
    }

    private synchronized void syncTakeAndDispatchObject() throws InterruptedException {
        select(new GoSelectConsumer() {
            @Override
            void accept(GoChannelObject t) {

                WeakReference<GoChannel> goChannelWeakReference = channelWeakHashMap.get(t.channel_id);
                GoChannel channel = goChannelWeakReference != null ? goChannelWeakReference.get() : null;
                if (channel != null) {
                    channel.offerBuffer(t);
                }
            }
        });
    }

    class GoChannel<E> {
        // Instance
        private final long id;
        private final LinkedBlockingDeque<GoChannelObject<E>> buffer = new LinkedBlockingDeque<>();

        public GoChannel() {
            this(getSerialNumber());
        }

        private GoChannel(long id) {
            this.id = id;
        }

        public long getId() {
            return id;
        }

        public E take() throws InterruptedException {
            GoChannelObject object;
            while((object = pollBuffer()) == null) {
                syncTakeAndDispatchObject();
            }

            return (E) object.data;
        }

        public void offer(E object) {
            GoChannelObject<E> e = new GoChannelObject();
            e.channel_id = getId();
            e.data = object;

            getTotalQueue().offer(e);
        }

        protected void offerBuffer(GoChannelObject<E> data) {
            buffer.offer(data);
        }

        protected GoChannelObject<E> pollBuffer() {
            return buffer.poll();
        }

        public int size() {
            return buffer.size();
        }

        @Override
        protected void finalize() throws Throwable {
            super.finalize();

            channelWeakHashMap.remove(getId());
        }
    }

    class GoChannelObject<E> {
        long channel_id;
        E data;

        boolean belongsTo(GoChannel channel) {
            return channel != null && channel_id == channel.id;
        }
    }

    abstract static class GoSelectConsumer{
        abstract void accept(GoChannelObject t);
    }
}

Then we can use it like this:

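import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// The statements below belong inside a method that declares "throws InterruptedException",
// in the same package as GoChannelPool (accept is package-private).
GoChannelPool pool = GoChannelPool.getDefaultInstance();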
final GoChannelPool.GoChannel<Integer> numberCh = pool.newChannel();
final GoChannelPool.GoChannel<String> stringCh = pool.newChannel();
final GoChannelPool.GoChannel<String> otherCh = pool.newChannel();

ExecutorService executorService = Executors.newCachedThreadPool();
int times;
times = 2000;
final CountDownLatch countDownLatch = new CountDownLatch(times * 2);

final AtomicInteger numTimes = new AtomicInteger();
final AtomicInteger strTimes = new AtomicInteger();
final AtomicInteger defaultTimes = new AtomicInteger();

final int finalTimes = times;
executorService.submit(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < finalTimes; i++) {
            numberCh.offer(i);

            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
});
executorService.submit(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < finalTimes; i++) {
            stringCh.offer("s"+i+"e");

            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
});

int otherTimes = 3;
for (int i = 0; i < otherTimes; i++) {
    otherCh.offer("a"+i);
}

for (int i = 0; i < times*2 + otherTimes; i++) {
    pool.select(new GoChannelPool.GoSelectConsumer() {
        @Override
        void accept(GoChannelPool.GoChannelObject t) {
            // The data order should be randomized.
            System.out.println(t.data);

            countDownLatch.countDown();

            if (t.belongsTo(stringCh)) {
                strTimes.incrementAndGet();
                return;
            }
            else if (t.belongsTo(numberCh)) {
                numTimes.incrementAndGet();
                return;
            }

            defaultTimes.incrementAndGet();
        }
    });
}
countDownLatch.await(10, TimeUnit.SECONDS);
executorService.shutdown(); // lets the JVM exit once both producers have finished

/**
The console output of data should be randomized.
numTimes.get() should be 2000
strTimes.get() should be 2000
defaultTimes.get() should be 3
*/

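Beware that select only works across channels that belong to the same GoChannelPool. Alternatively, use the default GoChannelPool for everything, although performance will suffer if too many channels share one pool.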
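
Stripped of the pool bookkeeping, the idea behind the class above seems to be that every logical channel writes into one shared queue and tags each item with its origin, so that a single blocking take() serves all channels. Here is a minimal sketch of just that idea, assuming Java 8+ for the lambdas; SharedQueueSketch, Tagged, and consume are hypothetical names, not part of the code above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SharedQueueSketch {

    /** An item tagged with the logical channel it was sent on. */
    static final class Tagged {
        final String channel;
        final Object payload;

        Tagged(String channel, Object payload) {
            this.channel = channel;
            this.payload = payload;
        }
    }

    /** Takes `count` items from the shared queue and counts them as {numbers, everything else}. */
    static int[] consume(BlockingQueue<Tagged> shared, int count) throws InterruptedException {
        int[] seen = new int[2];
        for (int i = 0; i < count; i++) {
            Tagged t = shared.take(); // one blocking call covers every logical channel
            if (t.channel.equals("numbers")) {
                seen[0]++;
            } else {
                seen[1]++;
            }
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Tagged> shared = new LinkedBlockingQueue<>();

        Thread numbers = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                shared.offer(new Tagged("numbers", i)); // unbounded queue, so offer never fails
            }
        });
        Thread strings = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                shared.offer(new Tagged("strings", "s" + i));
            }
        });
        numbers.start();
        strings.start();

        int[] seen = consume(shared, 8);
        System.out.println("numbers=" + seen[0] + " strings=" + seen[1]);
    }
}
```

The cost is the same as noted above: everything funnels through one queue, so all producers and the consumer contend on it.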

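Answer 3

Score: 1

The only way is to replace the standard queues with objects of a more capable class that notifies the consumer(s) when an item is inserted into an empty queue. This class can still implement the BlockingQueue interface, so the other side (the producer) sees no difference. The trick is that the put operation should also raise a flag and notify the consumer. The consumer, after polling all queues, clears the flag and calls Object.wait().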
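
A minimal sketch of this idea, under some assumptions: it uses a shared java.util.concurrent.Semaphore in place of the flag and Object.wait() described above (one permit per inserted element), and it assumes you control how the queues are constructed. NotifyingQueue and takeFromAny are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class QueueSelectSketch {

    /** A BlockingQueue that releases one permit on a shared semaphore per inserted element. */
    static final class NotifyingQueue<E> extends LinkedBlockingQueue<E> {
        private final Semaphore available;

        NotifyingQueue(Semaphore available) {
            this.available = available;
        }

        @Override
        public boolean offer(E e) { // add() and addAll() end up here as well
            boolean added = super.offer(e);
            if (added) available.release();
            return added;
        }

        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            boolean added = super.offer(e, timeout, unit);
            if (added) available.release();
            return added;
        }

        @Override
        public void put(E e) throws InterruptedException {
            super.put(e);
            available.release();
        }
    }

    /** Sleeps until some queue holds an element, then removes and returns it. */
    static Object takeFromAny(Semaphore available, List<NotifyingQueue<?>> queues)
            throws InterruptedException {
        available.acquire(); // no busy polling while every queue is empty
        while (true) {
            for (NotifyingQueue<?> q : queues) {
                Object item = q.poll();
                if (item != null) return item;
            }
            Thread.yield(); // only reached if another consumer raced us to an element
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Semaphore available = new Semaphore(0);
        NotifyingQueue<String> orders = new NotifyingQueue<>(available);
        NotifyingQueue<Integer> numbers = new NotifyingQueue<>(available);
        List<NotifyingQueue<?>> queues = Arrays.asList(orders, numbers);

        Thread producer = new Thread(() -> {
            numbers.offer(42);
            orders.offer("Order number 7");
        });
        producer.start();

        System.out.println(takeFromAny(available, queues));
        System.out.println(takeFromAny(available, queues));
    }
}
```

Because permits are counted per element, items must only be removed through takeFromAny; a direct take() or poll() on one of the queues would leave a permit behind with no element to match it.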

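Answer 4

Score: 1

I remember that when I was very new to Java and did not know that threads share the memory of their process, I had my threads communicate over (TCP/local) sockets. Perhaps that could work here as well.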
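
If you go down this road, java.nio already offers a real select over in-process channels. The sketch below swaps the TCP sockets mentioned above for java.nio.channels.Pipe, whose read end can be registered with a Selector; that substitution is an assumption of this example, not something proposed above. PipeSelectSketch, openLabelled, and readOneReady are hypothetical names, and the code ignores message framing (a pipe carries raw bytes, so real messages would need a length prefix or a delimiter).

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class PipeSelectSketch {

    /** Opens a pipe and registers its read end with the selector under a label. */
    static Pipe openLabelled(Selector selector, String label) throws IOException {
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false); // required before register()
        pipe.source().register(selector, SelectionKey.OP_READ, label);
        return pipe;
    }

    /** Blocks until some registered pipe is readable, then returns "label:bytes". */
    static String readOneReady(Selector selector) throws IOException {
        while (selector.selectedKeys().isEmpty()) {
            selector.select(); // sleeps until at least one source has data
        }
        Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
        SelectionKey key = ready.next();
        ready.remove(); // selected keys have to be cleared by hand
        ByteBuffer buffer = ByteBuffer.allocate(256);
        ((Pipe.SourceChannel) key.channel()).read(buffer);
        buffer.flip();
        return key.attachment() + ":" + StandardCharsets.UTF_8.decode(buffer);
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe orders = openLabelled(selector, "orders");
            Pipe numbers = openLabelled(selector, "numbers");

            // Only the second pipe receives data; the selector finds it without polling.
            numbers.sink().write(ByteBuffer.wrap("42".getBytes(StandardCharsets.UTF_8)));
            System.out.println(readOneReady(selector)); // prints "numbers:42"

            orders.sink().close();
            orders.source().close();
            numbers.sink().close();
            numbers.source().close();
        }
    }
}
```

This pays for serialization and kernel round trips that an in-memory queue avoids, so it is probably only worth considering when the data is byte-oriented anyway.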

Posted by huangapple on 2014-03-21 22:06:53
Link: https://go.coder-hub.com/22561110.html