Multiplex on queue.Queue?
Question
How can I go about "selecting" on multiple queue.Queue's simultaneously?
Golang has the desired feature with its channels:
select {
case i1 = <-c1:
    print("received ", i1, " from c1\n")
case c2 <- i2:
    print("sent ", i2, " to c2\n")
case i3, ok := (<-c3): // same as: i3, ok := <-c3
    if ok {
        print("received ", i3, " from c3\n")
    } else {
        print("c3 is closed\n")
    }
default:
    print("no communication\n")
}
Wherein the first channel to unblock executes the corresponding block. How would I achieve this in Python?
Update0
Per the link given in tux21b's answer, the desired queue type has the following properties:
- Multi-producer/multi-consumer queues (MPMC)
- Provides per-producer FIFO/LIFO
- When a queue is empty/full, consumers/producers get blocked
Furthermore, channels can be blocking: producers will block until a consumer retrieves the item. I'm not sure that Python's Queue can do this.
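For reference, a minimal sketch (my own, standard library only, all names are placeholders) of the closest approximation I know of: a queue.Queue with maxsize=1 makes put() block while the single slot is occupied, which is close to, but not identical to, Go's unbuffered-channel rendezvous, because the producer is released as soon as the item is enqueued rather than when it is consumed.

    import queue
    import threading
    import time

    # Sketch only: maxsize=1 makes put() block while the single slot is occupied.
    c = queue.Queue(maxsize=1)

    def producer():
        for i in range(3):
            c.put(i)            # blocks while the previous item is still queued
            print("sent", i)

    def consumer():
        for _ in range(3):
            time.sleep(0.5)     # slow consumer makes the blocking visible
            print("received", c.get())

    threading.Thread(target=producer).start()
    threading.Thread(target=consumer).start()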
Answer 1
Score: 3
If you use queue.PriorityQueue you can get a similar behaviour by using the channel objects as priorities:
import threading, logging
import random, string, time

from queue import PriorityQueue, Empty
from contextlib import contextmanager

logging.basicConfig(level=logging.NOTSET,
                    format="%(threadName)s - %(message)s")


class ChannelManager(object):
    next_priority = 0

    def __init__(self):
        self.queue = PriorityQueue()
        self.channels = []

    def put(self, channel, item, *args, **kwargs):
        self.queue.put((channel, item), *args, **kwargs)

    def get(self, *args, **kwargs):
        return self.queue.get(*args, **kwargs)

    @contextmanager
    def select(self, ordering=None, default=False):
        if default:
            try:
                channel, item = self.get(block=False)
            except Empty:
                channel = 'default'
                item = None
        else:
            channel, item = self.get()
        yield channel, item

    def new_channel(self, name):
        channel = Channel(name, self.next_priority, self)
        self.channels.append(channel)
        self.next_priority += 1
        return channel


class Channel(object):
    def __init__(self, name, priority, manager):
        self.name = name
        self.priority = priority
        self.manager = manager

    def __str__(self):
        return self.name

    def __lt__(self, other):
        return self.priority < other.priority

    def put(self, item):
        self.manager.put(self, item)


if __name__ == '__main__':
    num_channels = 3
    num_producers = 4
    num_items_per_producer = 2
    num_consumers = 3
    num_items_per_consumer = 3

    manager = ChannelManager()
    channels = [manager.new_channel('Channel#{0}'.format(i))
                for i in range(num_channels)]

    def producer_target():
        for i in range(num_items_per_producer):
            time.sleep(random.random())
            channel = random.choice(channels)
            message = random.choice(string.ascii_letters)
            logging.info('Putting {0} in {1}'.format(message, channel))
            channel.put(message)

    producers = [threading.Thread(target=producer_target,
                                  name='Producer#{0}'.format(i))
                 for i in range(num_producers)]
    for producer in producers:
        producer.start()
    for producer in producers:
        producer.join()
    logging.info('Producers finished')

    def consumer_target():
        for i in range(num_items_per_consumer):
            time.sleep(random.random())
            with manager.select(default=True) as (channel, item):
                if channel:
                    logging.info('Received {0} from {1}'.format(item, channel))
                else:
                    logging.info('No data received')

    consumers = [threading.Thread(target=consumer_target,
                                  name='Consumer#{0}'.format(i))
                 for i in range(num_consumers)]
    for consumer in consumers:
        consumer.start()
    for consumer in consumers:
        consumer.join()
    logging.info('Consumers finished')
Example output:
Producer#0 - Putting x in Channel#2
Producer#2 - Putting l in Channel#0
Producer#2 - Putting A in Channel#2
Producer#3 - Putting c in Channel#0
Producer#3 - Putting z in Channel#1
Producer#1 - Putting I in Channel#1
Producer#1 - Putting L in Channel#1
Producer#0 - Putting g in Channel#1
MainThread - Producers finished
Consumer#1 - Received c from Channel#0
Consumer#2 - Received l from Channel#0
Consumer#0 - Received I from Channel#1
Consumer#0 - Received L from Channel#1
Consumer#2 - Received g from Channel#1
Consumer#1 - Received z from Channel#1
Consumer#0 - Received A from Channel#2
Consumer#1 - Received x from Channel#2
Consumer#2 - Received None from default
MainThread - Consumers finished
In this example, ChannelManager is just a wrapper around queue.PriorityQueue that implements the select method as a contextmanager to make it look similar to the select statement in Go.
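For comparison with the Go snippet in the question, here is a minimal usage sketch of the blocking and non-blocking forms (my own illustration, reusing the ChannelManager and Channel classes above; the channel names are arbitrary):

    manager = ChannelManager()
    c1 = manager.new_channel('c1')
    c2 = manager.new_channel('c2')

    c2.put('hello')

    # Blocking form: waits until some channel has an item.
    with manager.select() as (channel, item):
        if channel is c1:
            print('received', item, 'from c1')
        elif channel is c2:
            print('received', item, 'from c2')

    # Non-blocking form, analogous to Go's `default` case.
    with manager.select(default=True) as (channel, item):
        if channel == 'default':
            print('no communication')
        else:
            print('received', item, 'from', channel)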
A few things to note:

- Ordering
  - In the Go example, the order in which the channels are written inside the select statement determines which channel's code is executed if data is available for more than one channel.
  - In the Python example, the order is determined by the priority assigned to each channel. However, priorities can be assigned dynamically (as seen in the example), so changing the ordering would be possible with a more complex select method that assigns new priorities based on an argument to the method. The old ordering could also be restored once the context manager finishes.
- Blocking
  - In the Go example, the select statement blocks unless a default case exists.
  - In the Python example, a boolean argument has to be passed to the select method to state whether blocking or non-blocking behaviour is desired. In the non-blocking case, the channel returned by the context manager is just the string 'default', so the code inside the with statement can easily detect it.
- Threading: objects in the queue module are already prepared for multi-producer, multi-consumer scenarios, as seen in the example.
Answer 2
Score: 2
There are many different implementations of producer-consumer queues, such as queue.Queue, available. They normally differ in a lot of properties, as listed in this excellent article by Dmitry Vyukov. As you can see, more than 10,000 different combinations are possible. The algorithms used for such queues also differ widely depending on the requirements. It's not possible to just extend an existing queue algorithm to guarantee additional properties, since that normally requires different internal data structures and different algorithms.
Go's channels offer a relatively high number of guaranteed properties, so they might be suitable for a lot of programs. One of the hardest requirements is support for reading / blocking on multiple channels at once (the select statement) and choosing a channel fairly if more than one branch in a select statement is able to proceed, so that no messages are left behind. Python's queue.Queue doesn't offer these features, so it's simply not possible to achieve the same behavior with it.
So, if you want to continue using queue.Queue you need to find workarounds for that problem. The workarounds, however, have their own drawbacks and are harder to maintain. Looking for another producer-consumer queue that offers the features you need might be a better idea! Anyway, here are two possible workarounds:
Polling
import queue
import time

# c1 and c2 are queue.Queue instances shared with the producers
while True:
    try:
        i1 = c1.get_nowait()
        print("received %s from c1" % i1)
    except queue.Empty:
        pass
    try:
        i2 = c2.get_nowait()
        print("received %s from c2" % i2)
    except queue.Empty:
        pass
    time.sleep(0.1)
This might use a lot of CPU cycles while polling the channels and might be slow when there are a lot of messages. Using time.sleep() with an exponential back-off time (instead of the constant 0.1 secs shown here) might improve this version drastically.
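One possible shape for such a back-off loop (my own sketch, not part of the original answer; the channel names are placeholders):

    import queue
    import time

    def poll_with_backoff(channels, min_delay=0.001, max_delay=0.5):
        # channels: dict mapping a display name to a queue.Queue instance
        delay = min_delay
        while True:
            received_any = False
            for name, ch in channels.items():
                try:
                    item = ch.get_nowait()
                except queue.Empty:
                    continue
                received_any = True
                print("received %s from %s" % (item, name))
            if received_any:
                delay = min_delay                    # reset after activity
            else:
                time.sleep(delay)
                delay = min(delay * 2, max_delay)    # exponential back-off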
A single notify-queue
# notify, c1 and c2 are queue.Queue instances shared with the producers
queue_id = notify.get()
if queue_id == 1:
    i1 = c1.get()
    print("received %s from c1" % i1)
elif queue_id == 2:
    i2 = c2.get()
    print("received %s from c2" % i2)
With this setup, you must send something to the notify queue after sending to c1 or c2. This might work for you, as long as a single notify-queue is enough for you (i.e. you do not have multiple "selects", each blocking on a different subset of your channels).
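For illustration, the producer side could look like this (the send() helper and the numeric queue ids are my own naming, not part of the original answer):

    import queue

    notify = queue.Queue()
    c1 = queue.Queue()
    c2 = queue.Queue()

    def send(queue_id, ch, item):
        # Enqueue the item first, then announce which queue received it.
        ch.put(item)
        notify.put(queue_id)

    send(1, c1, "hello")
    send(2, c2, "world")

Note that the item has to be enqueued before the notification; otherwise a consumer could be woken up before the data is actually available.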
Alternatively, you can also consider using Go. Go's goroutines and concurrency support are much more powerful than Python's limited threading capabilities anyway.
Answer 3
Score: 2
The pychan project duplicates Go channels in Python, including multiplexing. It implements the same algorithm as Go, so it meets all of your desired properties:
- Multiple producers and consumers can communicate through a Chan; until both a producer and a consumer are ready, the pair of them will block.
- Producers and consumers are serviced in the order they arrived (FIFO).
- An empty (full) queue will block consumers (producers).
Here's what your example would look like:
c1 = Chan(); c2 = Chan(); c3 = Chan()

try:
    chan, value = chanselect([c1, c3], [(c2, i2)])
    if chan == c1:
        print("Received %r from c1" % value)
    elif chan == c2:
        print("Sent %r to c2" % i2)
    else:  # c3
        print("Received %r from c3" % value)
except ChanClosed as ex:
    if ex.which == c3:
        print("c3 is closed")
    else:
        raise
(Full disclosure: I wrote this library)
Answer 4
Score: 1
from queue import Queue

# these imports needed for example code
from threading import Thread
from time import sleep
from random import randint


class MultiQueue(Queue):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queues = []

    def addQueue(self, queue):
        queue.put = self._put_notify(queue, queue.put)
        queue.put_nowait = self._put_notify(queue, queue.put_nowait)
        self.queues.append(queue)

    def _put_notify(self, queue, old_put):
        def wrapper(*args, **kwargs):
            result = old_put(*args, **kwargs)
            self.put(queue)
            return result
        return wrapper


if __name__ == '__main__':

    # an example of MultiQueue usage
    q1 = Queue()
    q1.name = 'q1'
    q2 = Queue()
    q2.name = 'q2'
    q3 = Queue()
    q3.name = 'q3'

    mq = MultiQueue()
    mq.addQueue(q1)
    mq.addQueue(q2)
    mq.addQueue(q3)

    queues = [q1, q2, q3]

    for i in range(9):
        def message(i=i):
            print("thread-%d starting..." % i)
            sleep(randint(1, 9))
            q = queues[i % 3]
            q.put('thread-%d ending...' % i)
        Thread(target=message).start()

    print('awaiting results...')
    for _ in range(9):
        result = mq.get()
        print(result.name)
        print(result.get())
Rather than trying to use the .get() method of several queues, the idea here is to have the queues notify the MultiQueue when they have data ready -- sort of a select in reverse. This is achieved by having MultiQueue wrap the various Queue's put() and put_nowait() methods, so that when something is added to one of those queues, that queue is then put() into the MultiQueue, and a corresponding MultiQueue.get() will retrieve the Queue that has data ready.
This MultiQueue is based on the FIFO Queue, but you could also use the LIFO or Priority queues as the base, depending on your needs.
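As a small addition (my own sketch, not from the original answer), the same MultiQueue from the example above can also be polled without blocking, which roughly mirrors Go's default case:

    from queue import Empty

    try:
        ready = mq.get_nowait()    # which queue has data, if any?
    except Empty:
        print("no communication")
    else:
        print("received %s from %s" % (ready.get(), ready.name))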