Python 在 threading.join() 随机终止

huangapple go评论69阅读模式
英文:

Python terminates at threading.join() randomly

问题

以下是您要翻译的内容:

我有一个带有4个线程的Python方法。我必须在这4个线程中处理一些数据,并等待所有线程完成处理,然后继续进行。

问题是,有时脚本按预期工作:它处理线程,然后继续`thread.join()`后的代码。然而,该方法随机终止在`thread.join()`处。我陷入了找到确切问题并修复它的困境。

这是我的代码:
```python
def check_gaze(self) -> List[List[int]]:
    folder_path = self._gaze_preprocess()

    .
    .
    .

    def process_files(files):
        output = []

        for _file in files:
            try:
                ...
            except:
                print("error")

        return output

    r1, r2, r3, r4 = [], [], [], []

    t1 = threading.Thread(target=lambda: r1.extend(process_files(file_groups[0])))
    t2 = threading.Thread(target=lambda: r2.extend(process_files(file_groups[1])))
    t3 = threading.Thread(target=lambda: r3.extend(process_files(file_groups[2])))
    t4 = threading.Thread(target=lambda: r4.extend(process_files(file_groups[3])))

    threads = [t1, t2, t3, t4]

    print("before start")

    for thread in threads:
        thread.start()

    print("after start")

    print("before join")

    for t in threads:
        t.join()

    print("after join")

    # 将来自所有三个线程的结果合并并保存到CSV文件中
    output = r1 + r2 + r3 + r4

    data = self._gaze_postprocess()

    return data


obj = Gaze()

print("pre call")

gaze_output = obj.check_gaze()

print("post call")

以下是问题的输出:

pre call
before start
after start
before join

这是终端输出(缺少一些调试打印语句,但我已经检查过,程序不会超出thread.join())。

Python 在 threading.join() 随机终止

编辑:使用Louis Lac的解决方案,我仍然遇到相同的问题。根据我迄今为止的调试,我可以说问题出现在我使用的注视检测包中的某个地方,因为如果我在其位置使用虚拟代码,我不再遇到问题!

代码:

from concurrent.futures import ThreadPoolExecutor
from utils import Utils
from pathlib import Path
import time
import eye_game

def process_files(files: list) -> list:
    output = []

    for file_ in files:
        try:
            frame_number = Path(file_).name.split(".")[0]
            gaze = eye_game.get_gaze_direction(file_)
            output.append([frame_number, gaze, gaze])

        except Exception as e:
            print(e)
            output.append([frame_number, 'No face', 'No face'])

    print(f'Output is : {output}')
    return output


def check_gaze(file_groups: list[list]) -> list:
    results = []

    with ThreadPoolExecutor(max_workers=4) as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
    return results


def main():
    start = time.time()
    
    jpg_files = list(Path('test_9/img').glob("*.jpg"))
    jpg_files = [str(f.resolve()) for f in jpg_files]
    
    
    file_groups = Utils().gaze_util_round_robin_sublists(jpg_files)
    
    results = check_gaze(file_groups)
    
    print(results)
    end = time.time()
    print(f'The script took {(end - start) /60} minutes')

if __name__ == "__main__":
    main()

结果
Python 在 threading.join() 随机终止

解决方案
我根据下面Lie Ryan的答案修复了我的代码,并使用了ProcessPool而不是ThreadPool。在这里使用多进程而不是多线程的原因是,线程的问题是由于eye_game包引起的。尽管代码是开源的,但我还没有时间去查看它的问题出在哪里。

这是我的解决方案。

from concurrent.futures import ThreadPoolExecutor
from utils import Utils
from pathlib import Path
import time
import eye_game

def process_files(files: list) -> list:
    output = []

    for file_ in files:
        try:
            frame_number = Path(file_).name.split(".")[0]
            gaze = eye_game.get_gaze_direction(file_)
            
            output.append([frame_number, gaze, gaze])
            
        except Exception as e:
            print(e)
            
            output.append([frame_number, 'No face', 'No face'])

    return output


def check_gaze(file_groups: list[list]) -> list:
    results = []

    with ThreadPoolExecutor(max_workers=4) as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
            
    return results


def _debug_main():
    start = time.time()
    
    jpg_files = list(Path('path-to-directory').glob("*.jpg"))
    jpg_files = [str(f.resolve()) for f in jpg_files]
    file_groups = Utils().gaze_util_round_robin_sublists(jpg_files)
    
    results = check_gaze(file_groups)
    
    print(results)
    
    end = time.time()
    
    print(f'The script took {(end - start) /60} minutes')

if __name__ == "__main__":
    _debug_main()
英文:

I have a Python method with 4 threads. I have to process some data in these 4 threads and wait till all of the threads complete the processing and then proceed ahead.

The issue is, sometimes the script works as expected: it process the threads, then proceeds with the code after thread.join(). However, the method randomly terminates at thread.join(). I'm stuck at finding the exact issue and fixing it.

Here's my code

def check_gaze(self) -> List[List[int]]:
    folder_path = self._gaze_preprocess()

    .
    .
    .
    
    def process_files(files):
        output = []

        for _file in files:
            try:
                ...
            except:
                print("error")

        return output

    r1, r2, r3, r4 = [], [], [], []

    t1 = threading.Thread(target=lambda: r1.extend(process_files(file_groups[0])))
    t2 = threading.Thread(target=lambda: r2.extend(process_files(file_groups[1])))
    t3 = threading.Thread(target=lambda: r3.extend(process_files(file_groups[2])))
    t4 = threading.Thread(target=lambda: r4.extend(process_files(file_groups[3])))

    threads = [t1, t2, t3, t4]
    
    print("before start")

    for thread in threads:
        thread.start()
    
    print("after start")

    print("before join")

    for t in threads:
        t.join()
    
    print("after join")

    # Merge the results from all three threads and save to a CSV file
    output = r1 + r2 + r3 + r4

    data = self._gaze_postprocess()

    return data


obj = Gaze()

print("pre call")

gaze_output = obj.check_gaze()

print("post call")

Here's the output for the issue:

pre call
before start
after start
before join

Here's the terminal output (it lacks some debug print statements but I have checked that program does not go beyond thread.join()

Python 在 threading.join() 随机终止


Edit: Using Louis Lac's solution from their answer posted below, I'm still facing the same issue. Based on my debugging so far, I can say that the issue is somewhere in the gaze detection package I'm using, because if I use dummy code in it's place I don't get the issue anymore!

code:

from concurrent.futures import ThreadPoolExecutor
from utils import Utils
from pathlib import Path
import time
import eye_game

def process_files(files: list) -> list:
    output = []

    for file_ in files:
        try:
            frame_number = Path(file_).name.split(".")[0]
            gaze = eye_game.get_gaze_direction(file_)
            output.append([frame_number, gaze, gaze])
            
        except Exception as e:
            print(e)
            output.append([frame_number, 'No face', 'No face'])

    print(f'Output is : {output}')
    return output


def check_gaze(file_groups: list[list]) -> list:
    results = []

    with ThreadPoolExecutor(max_workers=4) as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
    return results


def main():
    start = time.time()
    
    jpg_files = list(Path('test_9/img').glob("*.jpg"))
    jpg_files = [str(f.resolve()) for f in jpg_files]
    
    
    file_groups = Utils().gaze_util_round_robin_sublists(jpg_files)
    
    results = check_gaze(file_groups)
    
    print(results)
    end = time.time()
    print(f'The script took {(end - start) /60} minutes')

if __name__ == "__main__":
    main()

Results
Python 在 threading.join() 随机终止


Solution

I fixed my code looking at Lie Ryan's answer below and used a ProcessPool instead of a ThreadPool. The reason for using multiprocessing here instead of multithreading is that the issue with threads is due to the eye_game package. While the code is open source, I haven't had the time to go through it and see where exactly the issues arise.

Here's my solution.

from concurrent.futures import ThreadPoolExecutor
from utils import Utils
from pathlib import Path
import time
import eye_game

def process_files(files: list) -> list:
    output = []

    for file_ in files:
        try:
            frame_number = Path(file_).name.split(".")[0]
            gaze = eye_game.get_gaze_direction(file_)
            
            output.append([frame_number, gaze, gaze])
            
        except Exception as e:
            print(e)
            
            output.append([frame_number, 'No face', 'No face'])

    return output


def check_gaze(file_groups: list[list]) -> list:
    results = []

    with ThreadPoolExecutor(max_workers=4) as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
            
    return results


def _debug_main():
    start = time.time()
    
    jpg_files = list(Path('path-to-directory').glob("*.jpg"))
    jpg_files = [str(f.resolve()) for f in jpg_files]
    file_groups = Utils().gaze_util_round_robin_sublists(jpg_files)
    
    results = check_gaze(file_groups)
    
    print(results)
    
    end = time.time()
    
    print(f'The script took {(end - start) /60} minutes')

if __name__ == "__main__":
    _debug_main()

答案1

得分: 3

以下是翻译好的内容:

Few recommendations:

  • 仅在必要时使用线程。在您的情况下,您可以使用ThreadPoolExecutor
  • 不要捕获所有可能的异常(except:),通常您只想捕获非退出异常(except Exception:),
  • 在捕获块内处理错误,而不仅仅是打印它;如果相关的话,您可以引发/重新引发错误,或者返回None或空列表,
  • 避免从不同的线程中改变共享状态,如果处理不当,这可能会导致错误(例如,如果两个线程同时改变相同的共享状态),
  • 将您的main入口点包装在if __name__ == "main":块中,因为如果没有这个块,它可能会导致多进程问题。

以下是符合这些指南的示例:

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def process_files(files: list) -> list:
    output = []

    for file_ in files:
        sleep(0.5)
        try:
            output.append(file_)
        except Exception as e:
            print(e)
            raise ValueError("un-expected value received")

    return output


def check_gaze(file_groups: list[list]) -> list:
    results = []

    with ThreadPoolExecutor() as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
    return results


def main():
    file_groups = [["a", "b"], ["c"], ["d", "e", "f"]]
    results = check_gaze(file_groups)
    print(results)


if __name__ == "main":
    main()

确实,在eye_game包的face_recognition依赖中深藏着共享状态。如果您深入查看eye_game.get_gaze_direction(),您会看到在调用树的深处有一个对face_recognition.face_locations()的调用,该调用访问了一个作为全局变量实例化的面部检测器(在这里)。然后,这会调用一个C++库(dlib),这可能不是线程安全的。

基于这个观察,您有一些解决此问题的解决方案:

  • 在使用共享状态的代码周围使用锁/互斥锁以避免并发访问。这将适用于线程,但这会使多线程的目的丧失效果,因为现在调用是顺序的。
  • 改用多进程,如另一个答案中建议的那样。但请注意,全局状态(这里是面部检测器)将多次实例化,每个进程一次。如果模型很大,这可能导致内存使用量很高,并可能触发内存不足(OOM)错误。
英文:

Few recommendations:

  • use threads as a last resort. In your case you could use a ThreadPoolExecutor instead,
  • do not catch all possible exceptions (except:), usually you only want to catch non-exit exceptions (except Exception:),
  • handle errors inside the catch block instead of just printing it; you can either raise/re-raise an error or return None or an empty list if this is relevant,
  • avoid mutating shared state from a different threads, this can cause to bugs if not handled correctly (for instance if two threads mutate the same shared state at the same time),
  • wrap your main entry point in a if __name__ == "__main__": block as it can cause issues with multiprocessing if not present.

Here is an example which respects these guidelines:

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def process_files(files: list) -> list:
    output = []

    for file_ in files:
        sleep(0.5)
        try:
            output.append(file_)
        except Exception as e:
            print(e)
            raise ValueError("un-expected value received")

    return output


def check_gaze(file_groups: list[list]) -> list:
    results = []

    with ThreadPoolExecutor() as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
    return results


def main():
    file_groups = [["a", "b"], ["c"], ["d", "e", "f"]]
    results = check_gaze(file_groups)
    print(results)


if __name__ == "__main__":
    main()

There is indeed shared states hidden deeply in the face_recognition dependency of the eye_game package. If you dig inside eye_game.get_gaze_direction(), you'll see that deep down the call tree there is a call to face_recognition.face_locations() with accesses a face detector instantiated as a global variable (here). This then calls into a C++ library (dlib) which is likely not thread-safe.

From this observation, you have few solutions to address the issue:

  • Use a lock/mutex around the code using the shared state to avoid concurrent accesses. This will work with treads, however, this defeats the purpose of multi-threading since this will negate any speed gain now that the calls are sequentials.
  • Use multi-processing instead as advised in another answer. However, keep in mind that the global state (the face detector here) will be instantiated multiple time, once per process. This can lead to high memory usage if the model is large and could trigger Out Of Memory (OOM) errors.

答案2

得分: 2

也许你的程序遇到了分段错误(segfault)或核心转储(core dump)?在这些情况下,这些都是不受控制的崩溃,你的程序可能会立即终止,可能没有任何Python级别的异常处理。

在纯Python代码中,像这样的不受控制的崩溃非常罕见,它们相当不寻常,但如果你调用一个本地库并且该库存在错误,或者如果你传递了错误类型的参数,或者以不安全的方式调用库,那么可能会导致不受控制的崩溃并进而引发分段错误/核心转储。看起来你在这里调用的库是用于人脸跟踪的软件,几乎肯定会包含一些本机代码。

如果你将多线程替换为多进程,程序是否仍然崩溃?如果它们仍然崩溃,那么这可能是库的错误,或者你传递给库的参数存在问题(例如,它期望一个字符串,但你传递了一个整数)。

如果代码仅在多线程代码中崩溃,请尝试重写代码以使用多进程。在多进程中是否仍然崩溃?如果程序在多线程中崩溃而在多进程中不崩溃,那么很可能是库不具备线程安全性。它可能具有内部全局状态,并在多线程中调用它可能导致崩溃,这在单线程代码中通常不会发生。

如果在将参数传递给线程之前深度复制输入参数,问题是否仍然存在:

t1 = threading.Thread(
    target=lambda files: 
        r1.extend(process_files(files)
    ),
    args=[deepcopy(file_groups[0])],
)

如果深度复制输入参数修复了问题,那么可能存在共享的可变状态在输入参数之间,如果库不是完全线程安全的,这可能是问题的原因。

请注意,即使在Python中是只读的代码,在本机代码级别也是可变的,这是由于引用计数,如果库使用CPython提供的C宏来处理引用计数,并且正确处理了获取和释放全局解释器锁(GIL),那么通常不会有问题,但如果该库没有这样做,那么很有可能会出现核心转储/分段错误。

英文:

Maybe your program crashed with a segfault or core dump? In those cases, these are uncontrolled crash and your program may terminate immediately and there may not be any python-level exception handling.

In pure Python code, uncontrolled crash like that is extremely rare, they're quite unusual, but it is not uncommon to have an uncontrolled crash leading into a segfault/coredump if you call into a native library and there's a bug in that library, or if you passed an argument of the wrong type, or if you call the library in ways that are not thread safe. It seems like the library you're calling here is a software for face tracking, which almost certainly would internally have some native code.

If you replace the multithreading with multiprocessing does the program still crash? If they do, then that may be a bug in the library or there are issues with the arguments that you passed into the library (e.g. it's expecting a string, but you passed in an int).

If the code only crash in multi threaded code, try rewriting your code to use multi processing. Does it still crash in multi processing? If the program crash in multithreading but not multiprocessing, then it's likely that the library is not thread safe. It may have an internal global state and calling it in multithreaded may cause a crash that normally don't happen in single threaded code.

Does the issue still happen if you deep copy your input parameters before you pass it to the thread:

t1 = threading.Thread(
    target=lambda files: 
        r1.extend(process_files(files)
    ),
    args=[deepcopy(file_groups[0])],
)

If deepcopying the input parameters fixes the issue, then there may have been shared mutable state between the input parameters and if the library isn't fully thread safe, that may be the cause of issues.

Note that even code that are read only in Python actually is mutable at native code level due to reference counting, if the library uses the C macros provided by CPython to handle ref counts and if it handles acquiring and releasing GIL correctly, then these should normally not be an issue, but if the library that doesn't, then there is a good chance that it may coredump/segfault.

huangapple
  • 本文由 发表于 2023年6月16日 00:09:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/76483569.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定