Python多线程在任务处理完成后无法终止。

huangapple go评论62阅读模式
英文:

python multiple threads can't be terminated when task handle finished

问题

我在使用多线程时遇到了问题,线程无法终止,只有第一个任务终止,其余的7个线程一直在运行。请帮忙指出错误并告诉我如何纠正,谢谢。

file_queue = queue.Queue()

def process_file_queue(que):
    while True:
        file_path = que.get()
        if not file_path or que.empty():
            que.task_done()
            break
        try:
            self._analysis(file_path)
        except:
            print(f"处理 {file_path} 时出错")
            traceback.print_exc()

# 添加到队列
for file in scanfiles:
    file_queue.put(file)
file_queue.put(None)
#
max_threads = 8
threads = []
for _ in range(max_threads):
    thread = Thread(
        target=process_file_queue, args=(file_queue,))
    thread.start()
    threads.append(thread)
# 等待线程完成
for td in threads:
    td.join()

希望这可以帮助你解决问题。

英文:

I have troubles when I using multiple threads, the thread can't be terminated, and only the first task terminated. the rest 7 threads always running all the time.
please help to point out errors and how to correct, thanks.

file_queue = queue.Queue()

def process_file_queue(que):
    while True:
        file_path = que.get()
        if not file_path or que.empty():
            que.task_done()
            break
        try:
            self._analysis(file_path)
        except:
            print(f"error when handling {file_path}")
            traceback.print_exc()

#Added to queue
for file in scanfiles:
    file_queue.put(file)
file_queue.put(None)
#
max_threads = 8
threads = []
for _ in range(max_threads):
    thread = Thread(
        target=process_file_queue, args=(file_queue,))
    thread.start()
    threads.append(thread)
# waiting thread finish
for td in threads:
    td.join()

答案1

得分: 0

que.get()是阻塞的。只有一个线程会观察到队列中的None并终止。其他7个线程将被阻塞。你可以通过将None放入队列max_threads次来解决这个问题。

在这种情况下,que.task_done()是不必要的。

此外,你实际上不需要使用队列。你可以构建一个线程池,并通过将scanfiles(一个可迭代对象)作为第二个参数来使用它的*map()*功能。

例如:

from multiprocessing.pool import ThreadPool
import string

def process_file_queue(file_path):
    # 在这里处理文件路径
    print(file_path)

# 构建一个合成的文件名列表 a.txt -> z.txt
scanfiles = [f'{c}.txt' for c in string.ascii_lowercase]

with ThreadPool() as pool:
    pool.map(process_file_queue, scanfiles)

如果在问题中没有明显的使用队列的原因,那么你可以适应这种模式:

import queue
import string
from threading import Thread

NTHREADS = 8

def process_file_queue(q):
    while (qv := q.get()) is not None:
        # 在这里处理队列中的值
        print(qv)

# 构建一个合成的文件名列表 a.txt -> z.txt
scanfiles = [f'{c}.txt' for c in string.ascii_lowercase]

q = queue.Queue()

for filename in scanfiles:
    q.put(filename)

threads = []

for _ in range(NTHREADS):
    threads.append(Thread(target=process_file_queue, args=(q,)))
    threads[-1].start()
    q.put(None)

for thread in threads:
    thread.join()
英文:

que.get() is blocking. Only one thread will observe None on the queue and terminate. The other 7 threads will block. You can overcome this by putting None on the queue max_threads times.

que.task_done() is unnecessary in this scenario

Also, you don't really need a queue for this. You could construct a ThreadPool and use its map() functionality by passing scanfiles (an iterable) as the second parameter.

For example:

from multiprocessing.pool import ThreadPool
import string

def process_file_queue(file_path):
    # process the file path hhere
    print(file_path)

# build a synthetic list of filenames a.txt -> z.txt
scanfiles = [f'{c}.txt' for c in string.ascii_lowercase]

with ThreadPool() as pool:
    pool.map(process_file_queue, scanfiles)

If there is a reason for using a queue that's not apparent in the question then you could adapt this pattern:

import queue
import string
from threading import Thread

NTHREADS = 8

def process_file_queue(q):
    while (qv := q.get()) is not None:
        # process the queued value here
        print(qv)

# build a synthetic list of filenames a.txt -> z.txt
scanfiles = [f'{c}.txt' for c in string.ascii_lowercase]

q = queue.Queue()

for filename in scanfiles:
    q.put(filename)

threads = []

for _ in range(NTHREADS):
    threads.append(Thread(target=process_file_queue, args=(q,)))
    threads[-1].start()
    q.put(None)

for thread in threads:
    thread.join()

huangapple
  • 本文由 发表于 2023年8月8日 20:39:41
  • 转载请务必保留本文链接:https://go.coder-hub.com/76859668.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定