Process elements in chunks using multiprocessing queues
Question
I have a multiprocessing queue; the end of the queue is signaled by a SENTINEL value, a string.
aq = Queue()
The instances in the queue are of class A:
class A:
    id: str
    desc: str
In a function I'm getting elements from the queue aq and processing them in chunks. The first element (if there is just one) can be the SENTINEL, in which case there is nothing to process.
def process():
    chunk_data = []
    all = []
    item = aq.get()
    if not isinstance(item, A):
        return
    chunk_data.append(item.id)
    while item != SENTINEL:
        # start processing in chunks:
        # add elements to the chunk list until it's full
        while len(chunk_data) < CHUNK_MAX_SIZE:  # 50
            item = aq.get()
            if item == SENTINEL:
                break
            chunk_data.append(item.id)
        # the chunk list is full, start processing
        chunk_process_ids = process_data(chunk_data)  # process chunks
        all.extend(chunk_process_ids)
        # empty the chunk list and start again
        chunk_data.clear()
The function works as expected, but I consider the code to be convoluted. I'm looking for a simpler, clearer version.
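For reference, a minimal setup matching the description above might look like this; the @dataclass decorator, the concrete SENTINEL value, and the producer helper are assumptions added for illustration, not part of the original code:

from dataclasses import dataclass
from multiprocessing import Queue

SENTINEL = "END"      # assumed sentinel string marking the end of the queue
CHUNK_MAX_SIZE = 50   # chunk size from the comment in the code above

@dataclass
class A:
    id: str
    desc: str

aq = Queue()

def producer(items):
    # hypothetical writer: enqueue A instances, then the sentinel
    for item in items:
        aq.put(item)
    aq.put(SENTINEL)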
Answer 1
Score: 2
In the interest of following the [DRY principle](https://en.wikipedia.org/wiki/Don't_repeat_yourself), here's what I believe is a cleaner version of your code with no repetition of logic. Note that it's generally better to handle errors by raising an exception than to simply return when the type of an input value is not as expected.
def process():
    all = []
    while True:
        chunk_data = []
        for _ in range(CHUNK_MAX_SIZE):
            if (item := aq.get()) == SENTINEL:
                break
            assert isinstance(item, A)
            chunk_data.append(item.id)
        if chunk_data:
            all.extend(process_data(chunk_data))
        if len(chunk_data) < CHUNK_MAX_SIZE:
            break
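As one way to act on the "raise an exception instead of returning" note above, the assert could be replaced with an explicit check that raises. This variant keeps the same logic, only swaps that line, and returns the collected ids at the end; the choice of TypeError and the same surrounding names (aq, SENTINEL, CHUNK_MAX_SIZE, process_data) are assumptions:

def process():
    all = []
    while True:
        chunk_data = []
        for _ in range(CHUNK_MAX_SIZE):
            if (item := aq.get()) == SENTINEL:
                break
            if not isinstance(item, A):
                # fail loudly rather than silently returning
                raise TypeError(f"expected an A instance, got {type(item).__name__}")
            chunk_data.append(item.id)
        if chunk_data:
            all.extend(process_data(chunk_data))
        if len(chunk_data) < CHUNK_MAX_SIZE:
            break
    return all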
You can also clean up the code a bit further with [`iter`](https://docs.python.org/3/library/functions.html#iter) and [`itertools.islice`](https://docs.python.org/3/library/itertools.html#itertools.islice) if you don't need to validate that each item is of type A (which you shouldn't need to anyway if you have control over the code that enqueues items):
from itertools import islice
from operator import attrgetter

def process():
    all = []
    data = iter(aq.get, SENTINEL)
    while True:
        chunk_data = list(map(attrgetter('id'), islice(data, CHUNK_MAX_SIZE)))
        if chunk_data:
            all.extend(process_data(chunk_data))
        if len(chunk_data) < CHUNK_MAX_SIZE:
            break
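The data = iter(aq.get, SENTINEL) line relies on the two-argument form of iter, which keeps calling the given callable and ends the iteration as soon as it returns the sentinel. A tiny standalone demonstration with an ordinary queue.Queue (the values are arbitrary) shows how this interacts with islice:

from itertools import islice
from queue import Queue

q = Queue()
for x in ["a", "b", "c", "d", "e"]:
    q.put(x)
q.put("STOP")

# iter(callable, sentinel) calls q.get() repeatedly and stops the
# iteration (without yielding it) when the call returns "STOP".
items = iter(q.get, "STOP")
print(list(islice(items, 3)))  # ['a', 'b', 'c']
print(list(islice(items, 3)))  # ['d', 'e'] -- short chunk, sentinel reached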
Credit to @KellyBundy for a more concise version as commented below:
from itertools import islice
from operator import attrgetter

def process():
    all = []
    data = iter(aq.get, SENTINEL)
    ids = map(attrgetter('id'), data)
    while chunk_ids := list(islice(ids, CHUNK_MAX_SIZE)):
        all += process_data(chunk_ids)
Answer 2
Score: 0
My preference would be to structure the code as follows:
def get_chunks():
    chunk_data = []
    while True:
        item = aq.get()
        if item == SENTINEL:  # or: if not isinstance(item, A):
            break
        chunk_data.append(item.id)
        if len(chunk_data) == CHUNK_MAX_SIZE:
            yield chunk_data
            chunk_data = []
    # Do we have a "small" chunk left over?
    if chunk_data:
        yield chunk_data

def process():
    all = []
    for chunk_data in get_chunks():
        all.extend(process_data(chunk_data))
But it would be preferable for the "writer" to use get_chunks, so that already-made chunks are what gets written to the queue. This results in fewer (but larger) queue accesses, which is generally more efficient.
Here is an example where I am assuming that all the A instances are in a list, list_of_a_instances:
def get_chunks():
    chunk_data = []
    for item in list_of_a_instances:  # a list of all the A instances, for example
        chunk_data.append(item.id)
        if len(chunk_data) == CHUNK_MAX_SIZE:
            yield chunk_data
            chunk_data = []
    # Do we have a "small" chunk left over?
    if chunk_data:
        yield chunk_data

def process():
    all = []
    while True:
        chunk_data = aq.get()
        if chunk_data == SENTINEL:
            break
        all.extend(process_data(chunk_data))

def writer():
    for chunk_data in get_chunks():
        aq.put(chunk_data)
    aq.put(SENTINEL)
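To show how these pieces could be wired together, here is a runnable end-to-end sketch of the writer/reader split; the sample data, the process_data stand-in, and passing the queue and item list as parameters (instead of relying on globals) are assumptions made so the example is self-contained:

from dataclasses import dataclass
from multiprocessing import Process, Queue

SENTINEL = "END"      # assumed sentinel value
CHUNK_MAX_SIZE = 50

@dataclass
class A:
    id: str
    desc: str

def process_data(chunk_ids):
    # stand-in for the real per-chunk processing
    return [i.upper() for i in chunk_ids]

def get_chunks(items):
    chunk_data = []
    for item in items:
        chunk_data.append(item.id)
        if len(chunk_data) == CHUNK_MAX_SIZE:
            yield chunk_data
            chunk_data = []
    if chunk_data:  # leftover "small" chunk
        yield chunk_data

def writer(aq, items):
    # enqueue whole chunks, then the sentinel
    for chunk_data in get_chunks(items):
        aq.put(chunk_data)
    aq.put(SENTINEL)

def process(aq):
    all_ids = []
    while True:
        chunk_data = aq.get()
        if chunk_data == SENTINEL:
            break
        all_ids.extend(process_data(chunk_data))
    return all_ids

if __name__ == "__main__":
    aq = Queue()
    items = [A(id=f"id-{n}", desc=f"item {n}") for n in range(120)]
    p = Process(target=writer, args=(aq, items))
    p.start()
    result = process(aq)
    p.join()
    print(len(result))  # 120: chunks of 50, 50 and 20 ids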