Process elements in chunks using multiprocessing queues

Question

I have a multiprocessing queue. The end of the queue is signaled by a SENTINEL value, which is a string.

aq = Queue()

The instances in the queue are of class A:

class A:
     id: str
     desc: str

In a function, I get elements from the queue aq and process them in chunks. The first element (if there is only one) can be the SENTINEL, in which case there is nothing to process.

def process():
  chunk_data = []
  all = []
  item = aq.get()
  if not isinstance(item, A):
    return
  chunk_data.append(item.id)
  while item != SENTINEL:
    # start processing in chunks
    # adding elements to the chunk list until it's full
    while len(chunk_data) < CHUNK_MAX_SIZE: # 50
      item = aq.get()
      if item == SENTINEL:
        break
      chunk_data.append(item.id)
    # the chunk list is full, start processing
    chunk_process_ids = process_data(chunk_data) # process chunks
    all.extend(chunk_process_ids)
    # empty chunk list and start again
    chunk_data.clear()

The function works as expected, but I consider the code to be convoluted. I'm looking for a simpler, clearer version.


Answer 1

Score: 2


In the interest of following the [DRY principle](https://en.wikipedia.org/wiki/Don't_repeat_yourself), here's what I believe is a cleaner version of your code, with no repeated logic. Note that it's generally better to handle errors by raising an exception than to simply return when the type of an input value is not as expected.

def process():
    all = []
    while True:
        chunk_data = []
        for _ in range(CHUNK_MAX_SIZE):
            if (item := aq.get()) == SENTINEL:
                break
            assert isinstance(item, A)
            chunk_data.append(item.id)
        if chunk_data:
            all.extend(process_data(chunk_data))
        if len(chunk_data) < CHUNK_MAX_SIZE:
            break
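
The remark about raising an exception can be made concrete. What follows is only a minimal sketch under assumptions, not the asker's real code: A is written as a dataclass, SENTINEL is an arbitrary string, process_data is a stand-in, and queue.Queue is used because it offers the same get/put interface as multiprocessing.Queue. The name process_chunks is hypothetical; it takes the queue as a parameter and returns the collected results.

```python
from dataclasses import dataclass
from queue import Queue  # stand-in; multiprocessing.Queue has the same get/put

SENTINEL = "STOP"        # assumed sentinel value
CHUNK_MAX_SIZE = 3       # small for illustration (the question uses 50)


@dataclass
class A:
    id: str
    desc: str


def process_data(ids):
    # Hypothetical stand-in for the real chunk processor.
    return [i.upper() for i in ids]


def process_chunks(q):
    results = []
    while True:
        chunk = []
        for _ in range(CHUNK_MAX_SIZE):
            item = q.get()
            if item == SENTINEL:
                break
            if not isinstance(item, A):
                # Raise instead of assert: asserts are skipped under `python -O`.
                raise TypeError(f"expected A, got {type(item).__name__}")
            chunk.append(item.id)
        if chunk:
            results.extend(process_data(chunk))
        if len(chunk) < CHUNK_MAX_SIZE:
            # A short (or empty) chunk means the sentinel was reached.
            return results
```

Unlike the assert, the explicit TypeError still fires when Python runs with optimizations enabled, and the caller can catch it selectively.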

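You can also clean up the code a bit further with [`iter`](https://docs.python.org/3/library/functions.html#iter) and [`itertools.islice`](https://docs.python.org/3/library/itertools.html#itertools.islice) if you don't need to validate that each item is of type A (which you shouldn't need to do anyway if you control the code that enqueues items):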

from itertools import islice
from operator import attrgetter

def process():
    all = []
    data = iter(aq.get, SENTINEL)
    while True:
        chunk_data = list(map(attrgetter('id'), islice(data, CHUNK_MAX_SIZE)))
        if chunk_data:
            all.extend(process_data(chunk_data))
        if len(chunk_data) < CHUNK_MAX_SIZE:
            break

Credit to @KellyBundy for a more concise version as commented below:

from itertools import islice
from operator import attrgetter

def process():
    all = []
    data = iter(aq.get, SENTINEL)
    ids = map(attrgetter('id'), data)
    while chunk_ids := list(islice(ids, CHUNK_MAX_SIZE)):
        all += process_data(chunk_ids)
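
To see why the concise version terminates correctly, it may help to look at the two building blocks in isolation. The sketch below is illustrative only: it assumes a string SENTINEL, uses queue.Queue as a stand-in for multiprocessing.Queue (same get() interface), puts plain integers on the queue instead of A instances, and introduces a hypothetical helper named iter_chunks.

```python
from itertools import islice
from queue import Queue  # stand-in with the same get() interface

SENTINEL = "STOP"  # assumed sentinel value


def iter_chunks(q, size):
    # iter(callable, sentinel) calls q.get() repeatedly and stops
    # as soon as it returns a value equal to SENTINEL.
    items = iter(q.get, SENTINEL)
    # Each islice call resumes where the previous one stopped; an empty
    # list means the iterator is exhausted, which ends the while loop.
    while chunk := list(islice(items, size)):
        yield chunk


q = Queue()
for n in range(7):
    q.put(n)
q.put(SENTINEL)

chunks = list(iter_chunks(q, 3))
print(chunks)  # [[0, 1, 2], [3, 4, 5], [6]]
```

Once the sentinel has been seen, the iterator stays exhausted and does not call q.get() again, so the loop also ends cleanly when the number of items is an exact multiple of the chunk size.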

Answer 2

Score: 0


My preference would be to structure the code as follows:

def get_chunks():
    chunk_data = []
    while True:
        item = aq.get()
        if item == SENTINEL:  # or: if not isinstance(item, A):
            break
        chunk_data.append(item.id)
        if len(chunk_data) == CHUNK_MAX_SIZE:
            yield chunk_data
            chunk_data = []
    # Do we have a "small" chunk?
    if chunk_data:
        yield chunk_data

def process():
    all = []
    for chunk_data in get_chunks():
        all.extend(process_data(chunk_data))

It would be preferable, though, for the "writer" to use get_chunks, so that ready-made chunks are what gets written to the queue. This results in fewer (but larger) queue accesses, which is generally more efficient.

Here is an example where I am assuming that all the A instances are in a list, list_of_a_instances:

def get_chunks():
    chunk_data = []
    for item in list_of_a_instances: # a list of all the A instances, for example
        chunk_data.append(item.id)
        if len(chunk_data) == CHUNK_MAX_SIZE:
            yield chunk_data
            chunk_data = []
    # Do we have a "small" chunk?
    if chunk_data:
        yield chunk_data

def process():
    all = []
    while True:
        chunk_data = aq.get()
        if chunk_data == SENTINEL:
            break
        all.extend(process_data(chunk_data))

def writer():
    for chunk_data in get_chunks():
        aq.put(chunk_data)
    aq.put(SENTINEL)
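
The writer-side chunking can be tried end to end with a small harness. This is a sketch under stated assumptions rather than a drop-in implementation: queue.Queue and a thread stand in for multiprocessing.Queue and a separate process, process_data is a hypothetical placeholder, the ids are plain strings, and the names chunked, writer and reader (with the queue passed as a parameter) are chosen for illustration.

```python
import threading
from queue import Queue  # stand-in; multiprocessing.Queue has the same get/put

SENTINEL = "STOP"    # assumed sentinel value
CHUNK_MAX_SIZE = 3   # small for illustration


def process_data(ids):
    # Hypothetical placeholder for the real chunk processor.
    return [f"done-{i}" for i in ids]


def chunked(ids, size):
    # Slice a list into consecutive lists of at most `size` elements.
    for start in range(0, len(ids), size):
        yield ids[start:start + size]


def writer(q, ids):
    for chunk in chunked(ids, CHUNK_MAX_SIZE):
        q.put(chunk)  # one put per chunk rather than one per element
    q.put(SENTINEL)


def reader(q):
    results = []
    # A chunk is a list, so it never compares equal to the string sentinel.
    for chunk in iter(q.get, SENTINEL):
        results.extend(process_data(chunk))
    return results


q = Queue()
ids = [str(n) for n in range(7)]
t = threading.Thread(target=writer, args=(q, ids))
t.start()
results = reader(q)
t.join()
print(len(results))  # 7
```

With seven ids and a chunk size of three, the writer performs four puts (three chunks plus the sentinel) instead of eight, which is where the saving in queue accesses comes from.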

Posted by huangapple on April 11, 2023 at 02:32:01
Source: https://go.coder-hub.com/75979713.html