Python: multiple threads can't be terminated when the task finishes
Question
I have trouble when using multiple threads: the threads can't be terminated, and only the first one exits; the remaining 7 threads keep running forever.
Please help point out the errors and how to correct them, thanks.
file_queue = queue.Queue()

def process_file_queue(que):
    while True:
        file_path = que.get()
        if not file_path or que.empty():
            que.task_done()
            break
        try:
            self._analysis(file_path)
        except:
            print(f"error when handling {file_path}")
            traceback.print_exc()

# add the files to the queue
for file in scanfiles:
    file_queue.put(file)
file_queue.put(None)

max_threads = 8
threads = []
for _ in range(max_threads):
    thread = Thread(target=process_file_queue, args=(file_queue,))
    thread.start()
    threads.append(thread)

# wait for the threads to finish
for td in threads:
    td.join()
Answer 1
Score: 0
que.get() is blocking. Only one thread will observe the None on the queue and terminate; the other 7 threads will block on get() forever. You can overcome this by putting None on the queue max_threads times, so that each thread receives its own sentinel.
que.task_done() is unnecessary in this scenario, since nothing ever calls que.join().
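Applied to the question's code, a minimal sketch of that fix. Note the worker now tests only the fetched value: the original not file_path or que.empty() check can also make a worker drop the last real file if another thread has already drained the queue.

def process_file_queue(que):
    while True:
        file_path = que.get()
        if file_path is None:  # sentinel received: exit the loop
            break
        try:
            self._analysis(file_path)
        except Exception:
            print(f"error when handling {file_path}")
            traceback.print_exc()

# add the files to the queue, then one sentinel per worker thread
for file in scanfiles:
    file_queue.put(file)
for _ in range(max_threads):
    file_queue.put(None)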
Also, you don't really need a queue for this. You could construct a ThreadPool and use its map() functionality by passing scanfiles (an iterable) as the second parameter.
For example:
from multiprocessing.pool import ThreadPool
import string

def process_file_queue(file_path):
    # process the file path here
    print(file_path)

# build a synthetic list of filenames a.txt -> z.txt
scanfiles = [f'{c}.txt' for c in string.ascii_lowercase]

with ThreadPool() as pool:
    pool.map(process_file_queue, scanfiles)
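ThreadPool() with no argument sizes the pool to os.cpu_count() workers; pass ThreadPool(8) to match the 8 threads from the question. The same map-style pattern also works with the standard-library concurrent.futures, a sketch:

from concurrent.futures import ThreadPoolExecutor
import string

def process_file_queue(file_path):
    # process the file path here
    print(file_path)

# build a synthetic list of filenames a.txt -> z.txt
scanfiles = [f'{c}.txt' for c in string.ascii_lowercase]

# the with-block waits for all submitted work before exiting
with ThreadPoolExecutor(max_workers=8) as executor:
    list(executor.map(process_file_queue, scanfiles))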
If there is a reason for using a queue that's not apparent in the question, then you could adapt this pattern:
import queue
import string
from threading import Thread

NTHREADS = 8

def process_file_queue(q):
    while (qv := q.get()) is not None:
        # process the queued value here
        print(qv)

# build a synthetic list of filenames a.txt -> z.txt
scanfiles = [f'{c}.txt' for c in string.ascii_lowercase]

q = queue.Queue()
for filename in scanfiles:
    q.put(filename)

threads = []
for _ in range(NTHREADS):
    threads.append(Thread(target=process_file_queue, args=(q,)))
    threads[-1].start()
    q.put(None)  # one sentinel per worker thread

for thread in threads:
    thread.join()
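This works because Queue.get() removes the item it returns: each None is consumed by exactly one worker, so putting exactly NTHREADS sentinels guarantees that every thread eventually receives one, leaves its loop, and lets the corresponding join() return.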