How to process elements of a list in Python using multithreading without thread overlap for each element?

Question

I have a list of strings:

    l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']

I have a function that converts a string to an integer and returns the number:

    def to_int(s: str):
        print(int(s))
        return int(s)

I am a beginner in multithreading with very minimal knowledge. To create a new thread in Python, I usually do this:

    from threading import Thread

    def to_int(s: str):
        print(int(s))
        return int(s)

    # create a new thread
    t1 = Thread(target=to_int, args=(l[0],))
    # start the thread
    t1.start()
    # wait for the thread to complete
    t1.join()

I can see the output of the function when the new thread starts. Now I have a huge list of 50,000 elements that I'd like to process in smaller batches, and I prepared the batching as below:

    batch_size = 2
    for i in range(0, len(l), batch_size):
        sub_list = l[i:i+batch_size]

Is there any way to process each element of the sublist in parallel by calling to_int in parallel threads?
For example, if the sublist in the first iteration contains ['1', '2', '3'] (that is, with a batch size of 3), I want to process all three elements in parallel and only run the next batch once the current batch is completed.
Could anyone let me know if there is a way to achieve this? Any help is really appreciated.

Answer 1

Score: 1

You could try the multiprocessing.pool.ThreadPool class. pool.map blocks until every element of the current batch has been processed, so the next batch only starts once the current one is complete:

    from multiprocessing.pool import ThreadPool

    # your task
    def to_int(s: str) -> int:
        return int(s)

    # test data
    l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']

    batch_size = 2
    # create a thread pool with 10 threads
    with ThreadPool(processes=10) as pool:
        # for each batch
        for i in range(0, len(l), batch_size):
            sub_list = l[i:i+batch_size]
            # do the work
            print("batch", i // batch_size)
            for result in pool.map(to_int, sub_list):
                print(result)

Result:

    batch 0
    1
    2
    batch 1
    3
    4
    batch 2
    5
    6
    batch 3
    7
    8
    batch 4
    9
    10

As already suggested, I would not recommend processing every single element in a separate thread. It would be better to have one thread process an entire batch. In the end, though, it depends on your real task.
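The "one thread per batch" idea can be sketched roughly as follows. This is only an illustration under some assumptions: `convert_batch` and `convert_in_batches` are hypothetical helper names, and the worker count of 4 is arbitrary. Note that in this variant the batches run concurrently with each other, while the elements inside a batch are processed sequentially by a single thread.

```python
from multiprocessing.pool import ThreadPool


def to_int(s: str) -> int:
    return int(s)


def convert_batch(batch):
    # Hypothetical helper: one worker thread converts a whole batch.
    return [to_int(s) for s in batch]


def convert_in_batches(items, batch_size, workers=4):
    # Split the input into consecutive batches of at most batch_size items.
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    with ThreadPool(processes=workers) as pool:
        # pool.map returns the per-batch results in the order of `batches`.
        per_batch = pool.map(convert_batch, batches)
    # Flatten the list of batch results back into one list.
    return [value for batch in per_batch for value in batch]


if __name__ == "__main__":
    data = [str(n) for n in range(1, 11)]
    print(convert_in_batches(data, batch_size=2))
```

Handing a whole batch to each thread keeps the number of tasks small, which matters when the per-element work is as cheap as `int()`.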

Answer 2

Score: 1

You can try the manual method shown below. The method that Marco provided is the ThreadPoolExecutor method. I tried to keep the solution as close to your question as possible.

    import threading

    # Create a list of string-based integers to simulate the question.
    lst = list(range(1, 201))
    lst1 = [str(x) for x in lst]

    # Function provided by the question
    def to_int(s: str):
        print(int(s))
        return int(s)

    batch_size = 4

    # To see the sub-list groups
    for i in range(0, len(lst1), batch_size):
        sub_list = lst1[i:i+batch_size]
        print(f"sub_list{i}:", end="")
        print(sub_list)

    # Prints are provided to easily visualize how the threads are processed.
    for i in range(0, len(lst1), batch_size):
        sub_list = lst1[i:i+batch_size]
        threads = []
        # Iterate over the sub-list itself so a shorter final batch also works.
        for index, item in enumerate(sub_list):
            print(f"Main : create and start thread {index}.")
            t = threading.Thread(target=to_int, name=str(index), args=(item,))
            threads.append(t)
            t.start()
        for index, thread in enumerate(threads):
            print(f"Main : before thread {index}.")
            thread.join()
            print(f"Main : thread {index} done")
        print(f"All Threads completed for this sub_list{i}")

A part of the output will look like this:

    All Threads completed for this sub_list188
    Main : create and start thread 0.
    Main : create and start thread 1.
    Main : create and start thread 2.
    Main : create and start thread 3.
    Main : before thread 0.
    Main : thread 0 done
    Main : before thread 1.
    Main : thread 1 done
    Main : before thread 2.
    Main : thread 2 done
    Main : before thread 3.
    Main : thread 3 done
    All Threads completed for this sub_list192
    Main : create and start thread 0.
    Main : create and start thread 1.
    Main : create and start thread 2.
    Main : create and start thread 3.
    Main : before thread 0.
    Main : thread 0 done
    Main : before thread 1.
    Main : thread 1 done
    Main : before thread 2.
    Main : thread 2 done
    Main : before thread 3.
    Main : thread 3 done
    All Threads completed for this sub_list196
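One thing the manual approach does not show is how to get the return value of to_int back, since threading.Thread discards whatever the target function returns. A minimal sketch of one way to handle that is below; `run_batch`, `run_all`, and the inner `worker` function are hypothetical names introduced only for this illustration. Each thread writes into its own slot of a pre-sized list, so the results keep the order of the input.

```python
from threading import Thread


def to_int(s: str) -> int:
    return int(s)


def run_batch(batch):
    # One result slot per element; each thread writes only to its own slot.
    results = [None] * len(batch)

    def worker(slot, value):
        results[slot] = to_int(value)

    threads = [Thread(target=worker, args=(slot, value))
               for slot, value in enumerate(batch)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # the batch is finished only after every thread has joined
    return results


def run_all(items, batch_size):
    output = []
    # The next batch starts only after run_batch has returned for this one.
    for i in range(0, len(items), batch_size):
        output.extend(run_batch(items[i:i + batch_size]))
    return output


if __name__ == "__main__":
    print(run_all([str(n) for n in range(1, 11)], batch_size=4))
```

Because every thread owns exactly one index, no lock is needed for the writes in this sketch.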

Answer 3

Score: 1

You don't really need to do this in batches. Using map_async with a chunk size calculated from the list length and the number of worker threads, you should get optimum performance.

Consider this:

    from multiprocessing.pool import ThreadPool
    from math import ceil
    from time import perf_counter
    from os import cpu_count

    def to_int(v: str) -> int:
        try:
            return int(v)
        except ValueError:
            ...
        return -1

    N = 50_000

    # use half of the available CPUs (minimum of 2)
    NCPUS = max((cpu_count() or 2) // 2, 2)

    # create N string representations of integers
    my_list = [str(n) for n in range(N)]

    with ThreadPool(NCPUS) as pool:
        # one chunk per worker thread
        chunksize = ceil(len(my_list) / NCPUS)
        start = perf_counter()
        results = pool.map_async(to_int, my_list, chunksize=chunksize).get()
        end = perf_counter()
        assert len(results) == N
        print(results[-20:])
        print(f'Duration={end-start:.4f}s')

Output:

    [49980, 49981, 49982, 49983, 49984, 49985, 49986, 49987, 49988, 49989, 49990, 49991, 49992, 49993, 49994, 49995, 49996, 49997, 49998, 49999]
    Duration=0.0094s
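To make the role of map_async and chunksize a little more concrete, here is a small sketch that wraps the same idea in a function. The name `convert` and the default of 4 workers are assumptions made for illustration. map_async returns an AsyncResult immediately, and calling get() on it blocks until all chunks are done; the results come back in input order regardless of the chunk size.

```python
from math import ceil
from multiprocessing.pool import ThreadPool


def to_int(v: str) -> int:
    return int(v)


def convert(items, workers=4):
    if not items:
        return []
    # One chunk per worker: ceil(len / workers) elements in each chunk.
    chunksize = ceil(len(items) / workers)
    with ThreadPool(workers) as pool:
        async_result = pool.map_async(to_int, items, chunksize=chunksize)
        # Other work could happen here while the pool is busy.
        return async_result.get()  # blocks until every chunk is done


if __name__ == "__main__":
    data = [str(n) for n in range(10)]
    print(convert(data))
```

If nothing else needs to happen between submitting the work and collecting the results, pool.map(to_int, items, chunksize=chunksize) does the same thing in one call.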

Answer 4

Score: 0

Create a global mutable object, in this case a list: result = list(). Then modify the to_int() function to work on array slices.

    def to_int(arr_slice):
        for i in arr_slice:
            result.append(int(i))

Now you can start a thread for each slice. You can choose the size of the array slices so that it optimizes your thread use.

Once all threads have finished, result should contain all of the values from your original list converted to int. Because the threads append concurrently, the order of the values in result is not guaranteed to match the original list.

Applied to your example with 3 threads:

    from threading import Thread

    l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
    result = list()  # the global list that to_int() appends to

    t1 = Thread(target=to_int, args=(l[:3],))
    t2 = Thread(target=to_int, args=(l[3:6],))
    t3 = Thread(target=to_int, args=(l[6:],))

    t1.start()
    t2.start()
    t3.start()

    t1.join()
    t2.join()
    t3.join()
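If the order of the converted values matters, one possible variation on the slice idea is to have each thread write into its own index range of a pre-sized output list instead of appending to a shared one. The sketch below assumes this variation; `convert_slice` and `convert_with_threads` are hypothetical names, and the slice size is simply the list length divided by the thread count, rounded up.

```python
from threading import Thread


def convert_slice(source, start, stop, out):
    # Each thread writes only to indices start..stop-1,
    # so no two threads ever touch the same slot.
    for i in range(start, stop):
        out[i] = int(source[i])


def convert_with_threads(source, n_threads=3):
    out = [None] * len(source)
    # Ceiling division gives the slice length; at least 1 to keep range() valid.
    step = max(1, -(-len(source) // n_threads))
    threads = []
    for start in range(0, len(source), step):
        stop = min(start + step, len(source))
        t = Thread(target=convert_slice, args=(source, start, stop, out))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    return out


if __name__ == "__main__":
    l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
    print(convert_with_threads(l))
```

This avoids the global variable as well, since the output list is passed to each thread explicitly.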

huangapple
  • Published on July 31, 2023, 20:11:07
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76803496.html