Python equivalent of Golang's select on channels

Question

Go has a select statement that works on channels. From the documentation:

> The select statement lets a goroutine wait on multiple communication
> operations.
>
> A select blocks until one of its cases can run, then it executes that
> case. It chooses one at random if multiple are ready.

Is there a Python equivalent of the following code:

package main

import "fmt"

func main() {
	c1 := make(chan int)
	c2 := make(chan int)
	quit := make(chan int)

	go func() {
		for i := 0; i < 10; i++ {
			c1 <- i
		}
		quit <- 0
	}()

	go func() {
		for i := 0; i < 2; i++ {
			c2 <- i
		}
	}()

	for {
		select {
		case <-c1:
			fmt.Println("Received value from c1")
		case <-c2:
			fmt.Println("Received value from c2")
		case <-quit:
			fmt.Println("quit")
			return
		}
	}
}

Output of this program:

Received value from c1
Received value from c1
Received value from c2
Received value from c1
Received value from c2
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
Received value from c1
quit

Answer 1

Score: 19

Here's a pretty direct translation, but the "choosing which if multiple are ready" part works differently: it just takes whatever came in first. Also, this is like running your code with GOMAXPROCS(1).

import threading
import queue  # this module is named Queue in Python 2

def main():
    c1 = queue.Queue(maxsize=0)
    c2 = queue.Queue(maxsize=0)
    quit = queue.Queue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    combined = queue.Queue(maxsize=0)

    def listen_and_forward(q):
        # Block on one source queue and tag each message with its origin.
        while True:
            combined.put((q, q.get()))

    # One daemon forwarder per source queue; daemons do not block exit.
    t = threading.Thread(target=listen_and_forward, args=(c1,))
    t.daemon = True
    t.start()
    t = threading.Thread(target=listen_and_forward, args=(c2,))
    t.daemon = True
    t.start()
    t = threading.Thread(target=listen_and_forward, args=(quit,))
    t.daemon = True
    t.start()

    while True:
        which, message = combined.get()
        if which is c1:
            print('Received value from c1')
        elif which is c2:
            print('Received value from c2')
        elif which is quit:
            print('Received value from quit')
            return
main()

The basic change is simulating the select with threads that combine messages. If you were going to use this pattern much, you might write some select code:

import threading
import queue  # this module is named Queue in Python 2

def select(*queues):
    # Yield (source_queue, message) pairs as messages arrive on any queue.
    combined = queue.Queue(maxsize=0)
    def listen_and_forward(q):
        while True:
            combined.put((q, q.get()))
    for q in queues:
        t = threading.Thread(target=listen_and_forward, args=(q,))
        t.daemon = True
        t.start()
    while True:
        yield combined.get()

def main():

    c1 = queue.Queue(maxsize=0)
    c2 = queue.Queue(maxsize=0)
    quit = queue.Queue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    for which, msg in select(c1, c2, quit):
        if which is c1:
            print('Received value from c1')
        elif which is c2:
            print('Received value from c2')
        elif which is quit:
            print('Received value from quit')
            return
main()

But...

Note that this select isn't quite the Go one, though it doesn't matter for your program: the forwarding threads pull every message off the source queues as soon as it arrives, so a result sent on a channel would be queued up inside the select and lost if we didn't always iterate over the select to completion!
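The caveat above can be made concrete with a small sketch. It reuses the same select generator (written for Python 3), takes a single message from it, and then abandons it; the helper name demo and the two-second polling deadline are illustrative assumptions, not part of the original answer.

```python
import queue
import threading
import time


def select(*queues):
    # Same idea as the answer's select: one forwarder thread per queue.
    combined = queue.Queue()

    def listen_and_forward(q):
        while True:
            combined.put((q, q.get()))

    for q in queues:
        threading.Thread(target=listen_and_forward, args=(q,), daemon=True).start()
    while True:
        yield combined.get()


def demo():
    c = queue.Queue()
    c.put("first")
    c.put("second")

    # Take exactly one message, then drop the generator.
    which, msg = next(select(c))

    # The forwarder thread keeps running and drains the source queue
    # into a combined queue that nobody reads any more.
    deadline = time.monotonic() + 2.0
    while not c.empty() and time.monotonic() < deadline:
        time.sleep(0.01)
    return msg, c.empty()


if __name__ == "__main__":
    print(demo())
```

After demo() returns, "second" is no longer on c, yet no consumer ever saw it. A Go select would have left it on the channel for the next receiver.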

Answer 2

Score: 12

Also consider the offset library by Benoit Chesneau. It is a port of the Go concurrency model to Python, using fibers under the covers.

He gave a presentation about this at PyCon APAC 2013:

Answer 3

Score: 9

You can use multiprocessing.Pipe instead of chan, threading.Thread instead of go and select.select instead of select.

Here's a reimplementation of your Go example in Python using this approach:

import random
from multiprocessing import Pipe
from select import select
from threading import Thread


def main():
    c1_r, c1_w = Pipe(duplex=False)
    c2_r, c2_w = Pipe(duplex=False)
    quit_r, quit_w = Pipe(duplex=False)

    def func1():
        for i in range(10):
            c1_w.send(i)
        quit_w.send(0)

    Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2_w.send(i)

    Thread(target=func2).start()

    while True:
        # Block until at least one read end has data.
        ready, _, _ = select([c1_r, c2_r, quit_r], [], [])
        # Pick one ready pipe at random, as Go's select does.
        which = random.choice(ready)
        if which == c1_r:
            c1_r.recv()
            print('Received value from c1')
        elif which == c2_r:
            c2_r.recv()
            print('Received value from c2')
        elif which == quit_r and len(ready) == 1:
            # Only quit once no other pipe is ready.
            quit_r.recv()
            print('Received value from quit')
            return

if __name__ == '__main__':
    main()

This implementation is based upon @Thomas's implementation, but unlike @Thomas's it doesn't spawn extra threads to perform the select.

Tested on Linux with Python 2.7.13. Windows may behave differently, because select.select there only accepts sockets.

Edit: I added the len(ready) == 1 condition so quit is only handled after the other pipes are drained. This isn't required in Go as the channels are zero sized, so func1 can't send a message to quit_w until after the message sent to c1_w has been received. Thanks to the comment by @Sean Perry.
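For a variant that does not depend on select.select, the standard library's multiprocessing.connection.wait accepts Connection objects directly. The following is a minimal sketch under that assumption, written for Python 3; the function name run and the returned log list are illustrative and not part of the original answer. It always serves data pipes before quit instead of choosing at random.

```python
from multiprocessing import Pipe
from multiprocessing.connection import wait
from threading import Thread


def run():
    c1_r, c1_w = Pipe(duplex=False)
    c2_r, c2_w = Pipe(duplex=False)
    quit_r, quit_w = Pipe(duplex=False)

    def func1():
        for i in range(10):
            c1_w.send(i)
        quit_w.send(0)

    def func2():
        for i in range(2):
            c2_w.send(i)

    Thread(target=func1).start()
    Thread(target=func2).start()

    log = []
    while True:
        # Blocks until at least one connection has data to read.
        ready = wait([c1_r, c2_r, quit_r])
        data_ready = [conn for conn in ready if conn is not quit_r]
        if data_ready:
            conn = data_ready[0]
            conn.recv()
            log.append("c1" if conn is c1_r else "c2")
        else:
            # Only quit is ready, so everything func1 sent has been read.
            quit_r.recv()
            log.append("quit")
            return log


if __name__ == "__main__":
    for name in run():
        print("Received value from " + name)
```

Because func1 writes to quit only after all ten values are in the c1 pipe, quit is never handled while c1 still has data. Values from c2 can still be missed if func2 is scheduled late, which is also true of the pipe-based version above.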

Answer 4

Score: 4

Since Python 3.5 there are the keywords async and await, which make it possible to write functions that can be suspended during execution and can therefore run on an event loop instead of threads. The asyncio standard library module provides one.

To more directly map the behaviour of Go blocking channels and select you might make use of this small library and then your example code would look very similar in Python.
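Without any third-party library, a select-like loop can be approximated in plain asyncio by keeping one outstanding get() task per queue and waiting for whichever finishes first. This is only a sketch, assuming Python 3.7 or later; the names producer, pending and quit_seen are illustrative. Note that asyncio.Queue is buffered, so unlike the unbuffered Go channels the loop has to drain the data queues before honouring quit.

```python
import asyncio


async def producer(out, count, done=None):
    """Put `count` integers on `out`, then optionally signal `done`."""
    for i in range(count):
        await out.put(i)
    if done is not None:
        await done.put(0)


async def main():
    queues = {name: asyncio.Queue() for name in ("c1", "c2", "quit")}
    producers = [
        asyncio.create_task(producer(queues["c1"], 10, queues["quit"])),
        asyncio.create_task(producer(queues["c2"], 2)),
    ]

    # One outstanding get() per queue plays the role of one select case.
    pending = {asyncio.create_task(q.get()): name for name, q in queues.items()}
    log = []
    quit_seen = False
    while True:
        done, _ = await asyncio.wait(
            list(pending), return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            name = pending.pop(task)
            task.result()  # the received value, unused in this sketch
            if name == "quit":
                quit_seen = True
                continue
            log.append(name)
            # Re-arm the case so the next item on this queue is noticed.
            pending[asyncio.create_task(queues[name].get())] = name
        # Honour quit only once the data queues hold nothing more.
        if quit_seen and queues["c1"].empty() and queues["c2"].empty():
            break

    # Cancel the get() tasks that are still waiting, then clean up.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, *producers, return_exceptions=True)
    log.append("quit")
    return log


if __name__ == "__main__":
    for name in asyncio.run(main()):
        print("Received value from " + name)
```

Keeping the get() tasks alive between iterations, rather than cancelling the losers each time, avoids dropping an item that a cancelled get() had already taken off its queue.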

Answer 5

Score: 4

Yes, all are possible with goless. You can try it.

Have fun!

Here is an example:

import time

import goless

c1 = goless.chan()
c2 = goless.chan()

def func1():
    time.sleep(1)
    c1.send('one')
goless.go(func1)

def func2():
    time.sleep(2)
    c2.send('two')
goless.go(func2)

# Each select returns the case that fired and the value it received.
for i in range(2):
    case, val = goless.select([goless.rcase(c1), goless.rcase(c2)])
    print(val)

Answer 6

Score: 3

Here's another, an attempt at imitating the go syntax:

from threading import Thread
from queue import Queue  # the module is named Queue in Python 2

def main():

    c1 = Queue(maxsize=0)
    c2 = Queue(maxsize=0)
    quit = Queue(maxsize=0)

    # A tuple evaluates both parts in order: fill c1 first, then signal quit.
    Thread(target=lambda: ([c1.put(i) for i in range(10)], quit.put(0))).start()
    Thread(target=lambda: [c2.put(i) for i in range(2)]).start()

    for which, msg in select(c1, c2, quit):
        if which is c1:
            print('Received value from c1')
        elif which is c2:
            print('Received value from c2')
        elif which is quit:
            print('Received value from quit')
            return

def select(*queues):
    combined = Queue(maxsize=0)
    def listen_and_forward(q):
        while True:
            combined.put((q, q.get()))
    for q in queues:
        t = Thread(target=listen_and_forward, args=(q,))
        t.daemon = True
        t.start()
    while True:
        yield combined.get()

main()

Answer 7

Score: 2

For completeness: Go-style channels, including a working select, are available as part of pygolang:

from golang import go, chan, select, default

ch1 = chan()    # synchronous channel
ch2 = chan(3)   # channel with buffer of size 3
obj2 = 'c'      # placeholder for any object to send

def _():
    ch1.send('a')
    ch2.send('b')
go(_)

ch1.recv()      # will give 'a'
ch2.recv_()     # will give ('b', True)

_, _rx = select(
    ch1.recv,           # 0
    ch2.recv_,          # 1
    (ch2.send, obj2),   # 2
    default,            # 3
)
if _ == 0:
    # _rx is what was received from ch1
    ...
if _ == 1:
    # _rx is (rx, ok) of what was received from ch2
    ...
if _ == 2:
    # we know obj2 was sent to ch2
    ...
if _ == 3:
    # default case
    ...

offset (see https://stackoverflow.com/a/19143696/9456786) also seems interesting.

goless (see https://stackoverflow.com/a/39269599/9456786), unfortunately, has a weak select implementation, which by design does not work properly on synchronous channels.
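For readers unfamiliar with the term, a synchronous channel is one where send does not return until a receiver has taken the value. The following standard-library sketch illustrates only that property; the class name SyncChannel is hypothetical, and this is not how pygolang or goless implement their channels.

```python
import queue
import threading
import time


class SyncChannel:
    """Rendezvous channel: send() returns only after recv() took the item."""

    def __init__(self):
        self._q = queue.Queue(maxsize=1)
        self._send_lock = threading.Lock()  # one sender in flight at a time

    def send(self, item):
        with self._send_lock:
            self._q.put(item)
            self._q.join()  # blocks until the receiver calls task_done()

    def recv(self):
        item = self._q.get()
        self._q.task_done()  # releases the blocked sender
        return item


def demo():
    ch = SyncChannel()
    sent = threading.Event()

    def sender():
        ch.send("hello")
        sent.set()

    t = threading.Thread(target=sender)
    t.start()
    time.sleep(0.1)
    blocked_before_recv = not sent.is_set()  # sender is still inside send()
    value = ch.recv()
    t.join()
    return blocked_before_recv, value, sent.is_set()


if __name__ == "__main__":
    print(demo())
```

This sketch only covers blocking send and receive on a single channel. Selecting across several such channels is the hard part that the libraries above address.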

Answer 8

Score: 1

There are several answers here that use queue.Queue and threading.Thread to simulate the select behaviour but it's not necessary. You can extend queue.Queue like this:

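import queue
import os
import select

class EQueue(queue.Queue):
    def __init__(self, *args, **kwargs):
        # Semaphore mode: each read decrements the counter by exactly one.
        self._fd = os.eventfd(0, os.EFD_SEMAPHORE | os.EFD_CLOEXEC)
        super().__init__(*args, **kwargs)

    def put(self, *args, **kwargs):
        super().put(*args, **kwargs)
        os.eventfd_write(self._fd, 1)  # one count per queued item

    def get(self, *args, **kwargs):
        os.eventfd_read(self._fd)  # blocks until an item has been counted
        return super().get(*args, **kwargs)

    def fileno(self):
        # Lets select.select() treat the queue as a waitable object.
        return self._fd

    def __del__(self):
        os.close(self._fd)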

This adds an extra semaphore around the queue and, crucially, one that is accessible through a file descriptor. This means you can now wait on this queue with select.select(). So the above examples that use queues and threads can be rewritten without the extra threads:

import threading

def main():

    c1 = EQueue(maxsize=0)
    c2 = EQueue(maxsize=0)
    quit = EQueue(maxsize=0)

    def func1():
        for i in range(10):
            c1.put(i)
        quit.put(0)

    threading.Thread(target=func1).start()

    def func2():
        for i in range(2):
            c2.put(i)

    threading.Thread(target=func2).start()

    while True:
        # The OS reports which queues currently have items available.
        rx, _, _ = select.select([c1, c2, quit], [], [])
        if c1 in rx:
            msg = c1.get()
            print('Received value from c1')
        elif c2 in rx:
            msg = c2.get()
            print('Received value from c2')
        elif quit in rx:
            print('Received value from quit')
            return
main()

The main function here is fairly similar to that given by @alkasm above but there is no custom implementation of select and no thread-per-queue to collect all the separate queues into one; it relies on the operating system to tell you when a queue has items available.

Note that os.eventfd was only added in Python 3.10, but implementing it with ctypes is fairly trivial, or there is the eventfd package on PyPI. The latter also supports Windows, unlike the other options, by simulating eventfds with pipes. The Python documentation claims that eventfds are only available on Linux systems running glibc >= 2.8, but musl libc also supports them.
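For Python versions before 3.10, a ctypes wrapper might look roughly like the sketch below. It is Linux-only and makes some assumptions: the function names simply mirror the os module ones, and the EFD_SEMAPHORE value is copied from the Linux sys/eventfd.h header.

```python
import ctypes
import os
import struct

EFD_SEMAPHORE = 0o1  # assumed value, from <sys/eventfd.h> on Linux

# CDLL(None) exposes the symbols of the C library already loaded.
_libc = ctypes.CDLL(None, use_errno=True)
_libc.eventfd.argtypes = [ctypes.c_uint, ctypes.c_int]
_libc.eventfd.restype = ctypes.c_int


def eventfd(initval=0, flags=0):
    """Create an eventfd and return its file descriptor."""
    fd = _libc.eventfd(initval, flags)
    if fd < 0:
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err))
    return fd


def eventfd_write(fd, value):
    # The kernel expects an 8-byte unsigned integer in native byte order.
    os.write(fd, struct.pack("=Q", value))


def eventfd_read(fd):
    return struct.unpack("=Q", os.read(fd, 8))[0]


if __name__ == "__main__":
    fd = eventfd(0, EFD_SEMAPHORE)
    eventfd_write(fd, 2)
    print(eventfd_read(fd), eventfd_read(fd))
    os.close(fd)
```

In semaphore mode every read returns 1 and decrements the counter by one, which is what lets a queue wrapper count its items one at a time.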

huangapple
  • Published on October 2, 2013 at 14:16:08
  • When reposting, please keep the link to this article: https://go.coder-hub.com/19130986.html