如何在Python中使用多线程处理列表元素,以避免每个元素之间的线程重叠?

huangapple go评论90阅读模式
英文:

How to process elements of a list in python using multi threading without thread overlapping for each element?

问题

以下是您要翻译的内容:

我有一个字符串列表 `l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']`

我有一个将字符串转换为整数并返回数字的函数

```python
def to_int(s: str):
    print(int(s))
    return int(s)

我是一个多线程的初学者,知识非常有限,通常在Python中创建一个新线程时,我会这样做:

import time
from time import sleep
from threading import Thread

def to_int(s: str):
    print(int(s))
    return int(s)

# 创建一个新线程
t1 = Thread(target=to_int, args=(l[0],))

# 启动线程
t1.start()

# 等待线程完成
t1.join()

我可以看到当新线程开始时,函数的输出。现在我有一个包含50,000个元素的大列表,我想要以较小的批次处理整个列表,并按以下方式准备了批处理:

batch_size = 2
for i in range(0, len(l), batch_size):
    sub_list = l[i:i+batch_size]

有没有办法可以通过在并行线程中调用方法 to_int 来并行处理子列表中的每个元素?
例如:在第一次迭代中,我有包含元素的子列表:['1', '2', '3'],我希望同时处理所有三个元素,并且只在当前批次完成后运行下一批次。
请问有没有办法实现这个功能?非常感谢任何帮助。

英文:

I have a list of strings l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']

I have a function to convert string to integer and return the number.

def to_int(s: str):
    print(int(s))
	return int(s)

I am beginner in multi threading with very minimal knowledge and to create a new thread in Python, I usually do this:

import time
from time import sleep
from threading import Thread

def to_int(s: str):
	print(int(s))
	return int(s)

# create a new thread
t1 = Thread(target=to_int, args=(l[0],))

# start the thread
t1.start()

# wait for the thread to complete
t1.join()

如何在Python中使用多线程处理列表元素,以避免每个元素之间的线程重叠?

I can see the output of the function when the new thread starts. Now I have a huge list of size 50,000 elements and I'd like to process the entire list in smaller batches and I prepared batching as below:

batch_size = 2
for i in range(0, len(l), batch_size):
    sub_list = l[i:i+batch_size]

Is there anyway I can process each element of the sublist parallelly by calling the method to_int in parallell threads?
For ex: In the first iteration, I have the sublist with elements: ['1', '2', '3'] and I want to process all three elements parallelly and only run the next batch once current batch is completed.
Could anyone let me know is there a way I can achieve this functionality? Any help is really appreciated.

答案1

得分: 1

你可以尝试使用multiprocessing.pool.ThreadPool类来实现,示例代码如下:

from multiprocessing.pool import ThreadPool

# 你的任务
def to_int(s: str):
    return int(s)

# 测试数据
l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']

# 创建包含10个线程的线程池
pool = ThreadPool(processes=10)

# 对每个批次
batch_size = 2
for i in range(0, len(l), batch_size):
    sub_list = l[i:i+batch_size]
    # 执行任务
    print("批次", int(i/batch_size))
    for result in pool.map(to_int, sub_list):
        print(result)

结果:

批次 0
1
2
批次 1
3
4
批次 2
5
6
批次 3
7
8
批次 4
9
10

正如之前建议的那样,我不建议为每个单独的元素使用单独的线程。最好是一个线程处理整个批次。但最终取决于你的实际任务。

英文:

You could try to work with multiprocessing.pool.ThreadPool class like this:

from multiprocessing.pool import ThreadPool

# your task
def to_int(s: str): 
    return int(s)

# test data
l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']

# create thread pool with 10 threads
pool = ThreadPool(processes=10)

# for each batch
batch_size = 2
for i in range(0, len(l), batch_size):
    sub_list = l[i:i+batch_size]
    # do the work
    print("batch", int(i/batch_size))
    for result in pool.map(to_int, sub_list):
        print(result)

Result:

batch 0
1
2
batch 1
3
4
batch 2
5
6
batch 3
7
8
batch 4
9
10

As already suggested I would not recommend to process every single element in a separate thread. It would be better if one thread processes an entire batch. But in the end it depends on your real task.

答案2

得分: 1

以下是代码的中文翻译部分:

您可以尝试手动方法Marco提供的方法是ThreadPoolExecutor方法我试图使解决方案尽可能与您的问题相似

import threading, time

# 创建一个基于字符串的整数列表以模拟问题。
lst = list(range(1, 201))
lst1 = list(map(lambda x: str(x), lst))

# 问题提供的函数
def to_int(s: str):
    print(int(s))
    return int(s)

batch_size = 4
# 查看子列表组
for i in range(0, len(lst1), batch_size):
    sub_list = lst1[i:i+batch_size]
    print(f"sub_list{i}:", end="")
    print(sub_list)

# 提供的打印以轻松可视化线程的处理方式。
for i in range(0, len(lst1), batch_size):
    sub_list = lst1[i:i+batch_size]
    threads = []
    for index in range(batch_size):
        print(f"Main : 创建并启动线程{index}。")
        t = threading.Thread(target=to_int, name=str(index), args=(sub_list[index],))
        threads.append(t)
        t.start()
    for index, thread in enumerate(threads):
        print(f"Main : 在线程{index}之前。")
        thread.join()
        print(f"Main : 线程{index}完成")
    print(f"All Threads completed for this sub_list{i}")

输出的部分翻译如下:

所有线程已完成此子列表188
Main : 创建并启动线程0。
Main : 创建并启动线程1。
Main : 创建并启动线程2。
Main : 创建并启动线程3。
Main : 在线程0之前。
Main : 线程0完成
Main : 在线程1之前。
Main : 线程1完成
Main : 在线程2之前。
Main : 线程2完成
Main : 在线程3之前。
Main : 线程3完成
所有线程已完成此子列表192
Main : 创建并启动线程0。
Main : 创建并启动线程1。
Main : 创建并启动线程2。
Main : 创建并启动线程3。
Main : 在线程0之前。
Main : 线程0完成
Main : 在线程1之前。
Main : 线程1完成
Main : 在线程2之前。
Main : 线程2完成
Main : 在线程3之前。
Main : 线程3完成
所有线程已完成此子列表196
英文:

You can try this way for the manual method. The method that Marco provided is the ThreadPoolExecutor method. I tried to stick for the solution to be as similar to your question as possible.

import threading,time

# To create a list of string based integers to simulate the question.
lst = list(range(1,201))
lst1 = list(map(lambda x: str(x),lst))

# Function provided by the question
def to_int(s: str):
    print(int(s))
    return int(s)

batch_size = 4
# To see the sub-list groups
for i in range (0, len(lst1),batch_size):
    sub_list = lst1[i:i+batch_size]
    print(f"sub_list{i}:",end="")
    print(sub_list)
    

# Provided prints to easily visualize how the threads are processed.    
for i in range (0, len(lst1),batch_size):
    sub_list = lst1[i:i+batch_size]
    threads=[]
    for index in range(batch_size):
        print(f"Main : create and start thread {index}.")
        t = threading.Thread(target=to_int,name=str(index), args=(sub_list[index],))
        threads.append(t)
        t.start()
    for index, thread in enumerate(threads):
        print(f"Main : before thread {index}.")
        thread.join()
        print(f"Main : thread {index} done")
    print(f"All Threads completed for this sub_list{i}")

A part of the output will look as such:

All Threads completed for this sub_list188
Main : create and start thread 0.
Main : create and start thread 1.
Main : create and start thread 2.
Main : create and start thread 3.
Main : before thread 0.
Main : thread 0 done
Main : before thread 1.
Main : thread 1 done
Main : before thread 2.
Main : thread 2 done
Main : before thread 3.
Main : thread 3 done
All Threads completed for this sub_list192
Main : create and start thread 0.
Main : create and start thread 1.
Main : create and start thread 2.
Main : create and start thread 3.
Main : before thread 0.
Main : thread 0 done
Main : before thread 1.
Main : thread 1 done
Main : before thread 2.
Main : thread 2 done
Main : before thread 3.
Main : thread 3 done
All Threads completed for this sub_list196

答案3

得分: 1

你不需要真的分批处理。使用map_async和精心计算的块大小,你将获得最佳性能。

考虑这个:

from multiprocessing.pool import ThreadPool
from math import ceil
from time import perf_counter
from os import cpu_count

def to_int(v: str) -> int:
    try:
        return int(v)
    except ValueError:
        ...
    return -1

N = 50_000

# 使用可用CPU的一半(至少为2)
if (NCPUS := cpu_count()):
    NCPUS = max(NCPUS//2, 2)
else:
    NCPUS = 2

# 创建N个整数的字符串表示
my_list = [str(n) for n in range(N)]

with ThreadPool(NCPUS) as pool:
    chunksize = ceil(len(my_list) / NCPUS)
    start = perf_counter()
    results = list(pool.map_async(to_int, my_list, chunksize=chunksize).get())
    end = perf_counter()
    assert len(results) == N
    print(results[-20:])
    print(f'Duration={end-start:.4f}s')

输出:

[49980, 49981, 49982, 49983, 49984, 49985, 49986, 49987, 49988, 49989, 49990, 49991, 49992, 49993, 49994, 49995, 49996, 49997, 49998, 49999]
Duration=0.0094s
英文:

You don't really need to do this in batches. Using map_async and a carefully calculated chunk size, you will get optimum performance.

Consider this:

from multiprocessing.pool import ThreadPool
from math import ceil
from time import perf_counter
from os import cpu_count

def to_int(v: str) -> int:
    try:
        return int(v)
    except ValueError:
        ...
    return -1

N = 50_000

# use half of available CPUs (minimum of 2)
if (NCPUS := cpu_count()):
    NCPUS = max(NCPUS//2, 2)
else:
    NCPUS = 2

# create N string representations of integer
my_list = [str(n) for n in range(N)]

with ThreadPool(NCPUS) as pool:
    chunksize = ceil(len(my_list) / NCPUS)
    start = perf_counter()
    results = list(pool.map_async(to_int, my_list, chunksize=chunksize).get())
    end = perf_counter()
    assert len(results) == N
    print(results[-20:])
    print(f'Duration={end-start:.4f}s')

Output:

[49980, 49981, 49982, 49983, 49984, 49985, 49986, 49987, 49988, 49989, 49990, 49991, 49992, 49993, 49994, 49995, 49996, 49997, 49998, 49999]
Duration=0.0094s

答案4

得分: 0

创建一个全局可变对象,在这种情况下,列表 result = list()。然后修改 to_int() 函数以处理数组切片。

def to_int(arr_slice):
    for i in arr_slice:
        result.append(int(i))

现在你可以为每个切片启动一个线程。你可以选择数组切片的大小,以便优化线程的使用。

现在,result 应该包含从原始列表转换为 int 类型的所有整数。

应用于包含 3 个线程的示例:

l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
r = list()

t1 = Thread(target=to_int, args=(l[:3],))
t2 = Thread(target=to_int, args=(l[3:6],))
t3 = Thread(target=to_int, args=(l[6:],))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()
英文:

Create a global mutable object, in this case list result = list(). Then modify the to_int() function to work on array slices.

def to_int(arr_slice):
	for i in arr_slice:
		result.append(int(i))

Now you can start a thread for each slice. You can choose the size of the array slice in such a way that it optimizes your thread use.

Result should now contain all of the integers from your original list converted to int.

Applied to your example with 3 threads:

l = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
r = list()

t1 = Thread(target=to_int, args=(l[:3],))
t2 = Thread(target=to_int, args=(l[3:6],))
t3 = Thread(target=to_int, args=(l[6:],))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

huangapple
  • 本文由 发表于 2023年7月31日 20:11:07
  • 转载请务必保留本文链接:https://go.coder-hub.com/76803496.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定