Process elements in chunks using multiprocessing queues

Question

I have a multiprocessing queue. The end of the queue is signaled by a SENTINEL value, which is a string.

    from multiprocessing import Queue
    aq = Queue()

The items in the queue are instances of class A:

    class A:
        id: str
        desc: str

In a function, I get elements from the queue aq and process them in chunks. The first element (if there is only one) can be the SENTINEL, in which case there is nothing to process.

    def process():
        chunk_data = []
        all = []
        item = aq.get()
        if not isinstance(item, A):
            return
        chunk_data.append(item.id)
        while item != SENTINEL:
            # start processing in chunks
            # adding elements to the chunk list until it's full
            while len(chunk_data) < CHUNK_MAX_SIZE:  # 50
                item = aq.get()
                if item == SENTINEL:
                    break
                chunk_data.append(item.id)
            # the chunk list is full, start processing
            chunk_process_ids = process_data(chunk_data)  # process chunks
            all.extend(chunk_process_ids)
            # empty chunk list and start again
            chunk_data.clear()

The function works as expected, but I consider the code to be convoluted. I'm looking for a simpler, clearer version.

Answer 1

Score: 2

In the interest of following the [DRY principle](https://en.wikipedia.org/wiki/Don't_repeat_yourself), here's what I believe is a cleaner version of your code with no repeated logic. Note that it's generally better to handle errors by raising an exception than to simply return when the type of an input value is not as expected.

    def process():
        all = []
        while True:
            chunk_data = []
            for _ in range(CHUNK_MAX_SIZE):
                if (item := aq.get()) == SENTINEL:
                    break
                assert isinstance(item, A)
                chunk_data.append(item.id)
            if chunk_data:
                all.extend(process_data(chunk_data))
            if len(chunk_data) < CHUNK_MAX_SIZE:
                break
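
The note above about raising an exception rather than returning silently could look something like the sketch below. It is only an illustration under several assumptions: `A` is modelled as a dataclass, `SENTINEL` is an arbitrary string, `process_data` is a hypothetical stand-in that just tags the ids, and the queue is passed in as a parameter. A `queue.Queue` is enough to run it in a single process, and a `multiprocessing.Queue` offers the same `get`/`put` methods.

```python
from dataclasses import dataclass
from queue import Queue

SENTINEL = "STOP"      # assumed sentinel value
CHUNK_MAX_SIZE = 3     # kept small for demonstration; the question uses 50


@dataclass
class A:
    id: str
    desc: str


def process_data(ids):
    """Hypothetical stand-in for the real processing step."""
    return [f"done:{i}" for i in ids]


def process(aq):
    results = []
    while True:
        chunk_data = []
        for _ in range(CHUNK_MAX_SIZE):
            item = aq.get()
            if item == SENTINEL:
                break
            if not isinstance(item, A):
                # Fail loudly instead of returning; unlike assert, this
                # check is not stripped when Python runs with -O.
                raise TypeError(f"expected A, got {type(item).__name__}")
            chunk_data.append(item.id)
        if chunk_data:
            results.extend(process_data(chunk_data))
        if len(chunk_data) < CHUNK_MAX_SIZE:
            # A short (or empty) chunk means the sentinel was reached.
            break
    return results
```

Returning `results` is an addition for the sake of the example; the code in the question and in the answer collects the values but never returns them.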

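You can also clean up the code a bit further with [`iter`](https://docs.python.org/3/library/functions.html#iter) and [`itertools.islice`](https://docs.python.org/3/library/itertools.html#itertools.islice) if you don't need to validate whether each item is of type A (which you shouldn't need to anyway if you control the code that enqueues items):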

    from itertools import islice
    from operator import attrgetter

    def process():
        all = []
        data = iter(aq.get, SENTINEL)
        while True:
            chunk_data = list(map(attrgetter('id'), islice(data, CHUNK_MAX_SIZE)))
            if chunk_data:
                all.extend(process_data(chunk_data))
            if len(chunk_data) < CHUNK_MAX_SIZE:
                break

Credit to @KellyBundy for a more concise version as commented below:

    from itertools import islice
    from operator import attrgetter

    def process():
        all = []
        data = iter(aq.get, SENTINEL)
        ids = map(attrgetter('id'), data)
        while chunk_ids := list(islice(ids, CHUNK_MAX_SIZE)):
            all += process_data(chunk_ids)
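
How the two-argument form of `iter` and `islice` cooperate may be easier to see in a runnable sketch. The names `iter_id_chunks`, the dataclass definition of `A`, the `"STOP"` sentinel and the chunk size of 2 are assumptions made for illustration. A `queue.Queue` stands in for the multiprocessing queue so the snippet runs in one process.

```python
from dataclasses import dataclass
from itertools import islice
from operator import attrgetter
from queue import Queue

SENTINEL = "STOP"      # assumed sentinel value
CHUNK_MAX_SIZE = 2     # kept small for demonstration


@dataclass
class A:
    id: str
    desc: str


def iter_id_chunks(aq, size):
    """Yield lists of at most `size` ids until SENTINEL is read from aq."""
    # iter(callable, sentinel) calls aq.get() repeatedly and stops as soon
    # as the returned value compares equal to SENTINEL.
    ids = map(attrgetter("id"), iter(aq.get, SENTINEL))
    # islice takes at most `size` ids; an empty list means the stream ended.
    while chunk := list(islice(ids, size)):
        yield chunk


aq = Queue()
for n in range(5):
    aq.put(A(id=f"id{n}", desc=""))
aq.put(SENTINEL)

chunks = list(iter_id_chunks(aq, CHUNK_MAX_SIZE))
print(chunks)  # [['id0', 'id1'], ['id2', 'id3'], ['id4']]
```

Once the sentinel has been seen, the iterator stays exhausted and `aq.get()` is not called again, so the final empty `islice` does not block.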

Answer 2

Score: 0

My preference would be to structure the code as follows:

    def get_chunks():
        chunk_data = []
        while True:
            item = aq.get()
            if item == SENTINEL:  # or: if not isinstance(item, A):
                break
            chunk_data.append(item.id)
            if len(chunk_data) == CHUNK_MAX_SIZE:
                yield chunk_data
                chunk_data = []
        # Do we have a "small" chunk?
        if chunk_data:
            yield chunk_data

    def process():
        all = []
        for chunk_data in get_chunks():
            all.extend(process_data(chunk_data))

But it would be preferable for the "writer" to use get_chunks, so that ready-made chunks are what gets written to the queue. This results in fewer (but larger) queue accesses, which is generally more efficient.

Here is an example where I am assuming that all the A instances are in a list, list_of_a_instances:

    def get_chunks():
        chunk_data = []
        for item in list_of_a_instances:  # a list of all the A instances, for example
            chunk_data.append(item.id)
            if len(chunk_data) == CHUNK_MAX_SIZE:
                yield chunk_data
                chunk_data = []
        # Do we have a "small" chunk?
        if chunk_data:
            yield chunk_data

    def process():
        all = []
        while True:
            chunk_data = aq.get()
            if chunk_data == SENTINEL:
                break
            all.extend(process_data(chunk_data))

    def writer():
        for chunk_data in get_chunks():
            aq.put(chunk_data)
        aq.put(SENTINEL)
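
The writer-side chunking described above can be tried end to end with a sketch like the following. It makes several assumptions: the writer is handed a plain list of ids, the helper `chunked` and the stand-in `process_data` are hypothetical, and a thread with a `queue.Queue` stands in for a separate process with a `multiprocessing.Queue`. Both queue types expose the same `put`/`get` methods.

```python
from itertools import islice
from queue import Queue
from threading import Thread

SENTINEL = "STOP"      # assumed sentinel value
CHUNK_MAX_SIZE = 3     # kept small for demonstration


def chunked(iterable, size):
    """Yield successive lists of at most `size` elements."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk


def writer(aq, ids):
    # One put per chunk rather than one put per element.
    for chunk in chunked(ids, CHUNK_MAX_SIZE):
        aq.put(chunk)
    aq.put(SENTINEL)


def process_data(ids):
    """Hypothetical stand-in for the real processing step."""
    return [i.upper() for i in ids]


def process(aq):
    results = []
    # Each get() returns a whole chunk (a list), or SENTINEL at the end.
    for chunk in iter(aq.get, SENTINEL):
        results.extend(process_data(chunk))
    return results


aq = Queue()
producer = Thread(target=writer, args=(aq, ["a", "b", "c", "d", "e", "f", "g"]))
producer.start()
results = process(aq)
producer.join()
print(results)  # ['A', 'B', 'C', 'D', 'E', 'F', 'G']
```

With seven ids and a chunk size of 3, the queue carries three chunks plus the sentinel, so the reader makes four `get` calls instead of eight.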

huangapple
  • Published on April 11, 2023, 02:32:01
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75979713.html