英文:
How to process elements of a list in python using multi threading without thread overlapping for each element?
问题
以下是您要翻译的内容:
我有一个字符串列表 `l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']`
我有一个将字符串转换为整数并返回数字的函数。
```python
def to_int(s: str):
print(int(s))
return int(s)
我是一个多线程的初学者,知识非常有限,通常在Python中创建一个新线程时,我会这样做:
import time
from time import sleep
from threading import Thread
def to_int(s: str):
print(int(s))
return int(s)
# 创建一个新线程
t1 = Thread(target=to_int, args=(l[0],))
# 启动线程
t1.start()
# 等待线程完成
t1.join()
我可以看到当新线程开始时,函数的输出。现在我有一个包含50,000个元素的大列表,我想要以较小的批次处理整个列表,并按以下方式准备了批处理:
batch_size = 2
for i in range(0, len(l), batch_size):
sub_list = l[i:i+batch_size]
有没有办法可以通过在并行线程中调用方法 to_int
来并行处理子列表中的每个元素?
例如:在第一次迭代中,我有包含元素的子列表:['1', '2', '3']
,我希望同时处理所有三个元素,并且只在当前批次完成后运行下一批次。
请问有没有办法实现这个功能?非常感谢任何帮助。
英文:
I have a list of strings l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
I have a function to convert string to integer and return the number.
def to_int(s: str):
print(int(s))
return int(s)
I am beginner in multi threading with very minimal knowledge and to create a new thread in Python, I usually do this:
import time
from time import sleep
from threading import Thread
def to_int(s: str):
print(int(s))
return int(s)
# create a new thread
t1 = Thread(target=to_int, args=(l[0],))
# start the thread
t1.start()
# wait for the thread to complete
t1.join()
I can see the output of the function when the new thread starts. Now I have a huge list of size 50,000 elements and I'd like to process the entire list in smaller batches and I prepared batching as below:
batch_size = 2
for i in range(0, len(l), batch_size):
sub_list = l[i:i+batch_size]
Is there anyway I can process each element of the sublist parallelly by calling the method to_int
in parallell threads?
For ex: In the first iteration, I have the sublist with elements: ['1', '2', '3']
and I want to process all three elements parallelly and only run the next batch once current batch is completed.
Could anyone let me know is there a way I can achieve this functionality? Any help is really appreciated.
答案1
得分: 1
你可以尝试使用multiprocessing.pool.ThreadPool
类来实现,示例代码如下:
from multiprocessing.pool import ThreadPool
# 你的任务
def to_int(s: str):
return int(s)
# 测试数据
l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
# 创建包含10个线程的线程池
pool = ThreadPool(processes=10)
# 对每个批次
batch_size = 2
for i in range(0, len(l), batch_size):
sub_list = l[i:i+batch_size]
# 执行任务
print("批次", int(i/batch_size))
for result in pool.map(to_int, sub_list):
print(result)
结果:
批次 0
1
2
批次 1
3
4
批次 2
5
6
批次 3
7
8
批次 4
9
10
正如之前建议的那样,我不建议为每个单独的元素使用单独的线程。最好是一个线程处理整个批次。但最终取决于你的实际任务。
英文:
You could try to work with multiprocessing.pool.ThreadPool
class like this:
from multiprocessing.pool import ThreadPool
# your task
def to_int(s: str):
return int(s)
# test data
l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
# create thread pool with 10 threads
pool = ThreadPool(processes=10)
# for each batch
batch_size = 2
for i in range(0, len(l), batch_size):
sub_list = l[i:i+batch_size]
# do the work
print("batch", int(i/batch_size))
for result in pool.map(to_int, sub_list):
print(result)
Result:
batch 0
1
2
batch 1
3
4
batch 2
5
6
batch 3
7
8
batch 4
9
10
As already suggested I would not recommend to process every single element in a separate thread. It would be better if one thread processes an entire batch. But in the end it depends on your real task.
答案2
得分: 1
以下是代码的中文翻译部分:
您可以尝试手动方法。Marco提供的方法是ThreadPoolExecutor方法。我试图使解决方案尽可能与您的问题相似。
import threading, time
# 创建一个基于字符串的整数列表以模拟问题。
lst = list(range(1, 201))
lst1 = list(map(lambda x: str(x), lst))
# 问题提供的函数
def to_int(s: str):
print(int(s))
return int(s)
batch_size = 4
# 查看子列表组
for i in range(0, len(lst1), batch_size):
sub_list = lst1[i:i+batch_size]
print(f"sub_list{i}:", end="")
print(sub_list)
# 提供的打印以轻松可视化线程的处理方式。
for i in range(0, len(lst1), batch_size):
sub_list = lst1[i:i+batch_size]
threads = []
for index in range(batch_size):
print(f"Main : 创建并启动线程{index}。")
t = threading.Thread(target=to_int, name=str(index), args=(sub_list[index],))
threads.append(t)
t.start()
for index, thread in enumerate(threads):
print(f"Main : 在线程{index}之前。")
thread.join()
print(f"Main : 线程{index}完成")
print(f"All Threads completed for this sub_list{i}")
输出的部分翻译如下:
所有线程已完成此子列表188
Main : 创建并启动线程0。
Main : 创建并启动线程1。
Main : 创建并启动线程2。
Main : 创建并启动线程3。
Main : 在线程0之前。
Main : 线程0完成
Main : 在线程1之前。
Main : 线程1完成
Main : 在线程2之前。
Main : 线程2完成
Main : 在线程3之前。
Main : 线程3完成
所有线程已完成此子列表192
Main : 创建并启动线程0。
Main : 创建并启动线程1。
Main : 创建并启动线程2。
Main : 创建并启动线程3。
Main : 在线程0之前。
Main : 线程0完成
Main : 在线程1之前。
Main : 线程1完成
Main : 在线程2之前。
Main : 线程2完成
Main : 在线程3之前。
Main : 线程3完成
所有线程已完成此子列表196
英文:
You can try this way for the manual method. The method that Marco provided is the ThreadPoolExecutor method. I tried to stick for the solution to be as similar to your question as possible.
import threading,time
# To create a list of string based integers to simulate the question.
lst = list(range(1,201))
lst1 = list(map(lambda x: str(x),lst))
# Function provided by the question
def to_int(s: str):
print(int(s))
return int(s)
batch_size = 4
# To see the sub-list groups
for i in range (0, len(lst1),batch_size):
sub_list = lst1[i:i+batch_size]
print(f"sub_list{i}:",end="")
print(sub_list)
# Provided prints to easily visualize how the threads are processed.
for i in range (0, len(lst1),batch_size):
sub_list = lst1[i:i+batch_size]
threads=[]
for index in range(batch_size):
print(f"Main : create and start thread {index}.")
t = threading.Thread(target=to_int,name=str(index), args=(sub_list[index],))
threads.append(t)
t.start()
for index, thread in enumerate(threads):
print(f"Main : before thread {index}.")
thread.join()
print(f"Main : thread {index} done")
print(f"All Threads completed for this sub_list{i}")
A part of the output will look as such:
All Threads completed for this sub_list188
Main : create and start thread 0.
Main : create and start thread 1.
Main : create and start thread 2.
Main : create and start thread 3.
Main : before thread 0.
Main : thread 0 done
Main : before thread 1.
Main : thread 1 done
Main : before thread 2.
Main : thread 2 done
Main : before thread 3.
Main : thread 3 done
All Threads completed for this sub_list192
Main : create and start thread 0.
Main : create and start thread 1.
Main : create and start thread 2.
Main : create and start thread 3.
Main : before thread 0.
Main : thread 0 done
Main : before thread 1.
Main : thread 1 done
Main : before thread 2.
Main : thread 2 done
Main : before thread 3.
Main : thread 3 done
All Threads completed for this sub_list196
答案3
得分: 1
你不需要真的分批处理。使用map_async
和精心计算的块大小,你将获得最佳性能。
考虑这个:
from multiprocessing.pool import ThreadPool
from math import ceil
from time import perf_counter
from os import cpu_count
def to_int(v: str) -> int:
try:
return int(v)
except ValueError:
...
return -1
N = 50_000
# 使用可用CPU的一半(至少为2)
if (NCPUS := cpu_count()):
NCPUS = max(NCPUS//2, 2)
else:
NCPUS = 2
# 创建N个整数的字符串表示
my_list = [str(n) for n in range(N)]
with ThreadPool(NCPUS) as pool:
chunksize = ceil(len(my_list) / NCPUS)
start = perf_counter()
results = list(pool.map_async(to_int, my_list, chunksize=chunksize).get())
end = perf_counter()
assert len(results) == N
print(results[-20:])
print(f'Duration={end-start:.4f}s')
输出:
[49980, 49981, 49982, 49983, 49984, 49985, 49986, 49987, 49988, 49989, 49990, 49991, 49992, 49993, 49994, 49995, 49996, 49997, 49998, 49999]
Duration=0.0094s
英文:
You don't really need to do this in batches. Using map_async and a carefully calculated chunk size, you will get optimum performance.
Consider this:
from multiprocessing.pool import ThreadPool
from math import ceil
from time import perf_counter
from os import cpu_count
def to_int(v: str) -> int:
try:
return int(v)
except ValueError:
...
return -1
N = 50_000
# use half of available CPUs (minimum of 2)
if (NCPUS := cpu_count()):
NCPUS = max(NCPUS//2, 2)
else:
NCPUS = 2
# create N string representations of integer
my_list = [str(n) for n in range(N)]
with ThreadPool(NCPUS) as pool:
chunksize = ceil(len(my_list) / NCPUS)
start = perf_counter()
results = list(pool.map_async(to_int, my_list, chunksize=chunksize).get())
end = perf_counter()
assert len(results) == N
print(results[-20:])
print(f'Duration={end-start:.4f}s')
Output:
[49980, 49981, 49982, 49983, 49984, 49985, 49986, 49987, 49988, 49989, 49990, 49991, 49992, 49993, 49994, 49995, 49996, 49997, 49998, 49999]
Duration=0.0094s
答案4
得分: 0
创建一个全局可变对象,在这种情况下,列表 result = list()
。然后修改 to_int()
函数以处理数组切片。
def to_int(arr_slice):
for i in arr_slice:
result.append(int(i))
现在你可以为每个切片启动一个线程。你可以选择数组切片的大小,以便优化线程的使用。
现在,result
应该包含从原始列表转换为 int
类型的所有整数。
应用于包含 3 个线程的示例:
l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
r = list()
t1 = Thread(target=to_int, args=(l[:3],))
t2 = Thread(target=to_int, args=(l[3:6],))
t3 = Thread(target=to_int, args=(l[6:],))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
英文:
Create a global mutable object, in this case list result = list()
. Then modify the to_int()
function to work on array slices.
def to_int(arr_slice):
for i in arr_slice:
result.append(int(i))
Now you can start a thread for each slice. You can choose the size of the array slice in such a way that it optimizes your thread use.
Result should now contain all of the integers from your original list converted to int
.
Applied to your example with 3 threads:
l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
r = list()
t1 = Thread(target=to_int, args=(l[:3],))
t2 = Thread(target=to_int, args=(l[3:6],))
t3 = Thread(target=to_int, args=(l[6:],))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论