终止程序时终止一个未来。

huangapple go评论69阅读模式
英文:

kill a future if program stops

问题

我在我的程序中使用了ThreadPoolExecutor来submit()一个任务。然而,当我结束程序时,脚本会"冻结"。似乎线程没有正确地结束。

是否有解决方法?

示例:

from concurrent.futures import ThreadPoolExecutor
from time import sleep

def task():
    for i in range(3):
        print(i)
        sleep(1)

with ThreadPoolExecutor() as executor:
    future = executor.submit(task)
    
    future.cancel()  # 等待阻塞任务的循环完成
    executor.shutdown(wait=False)  # 仍然等待阻塞任务中的循环完成

sys.exit()也不起作用它仍然会等待未来完成
英文:

I have a ThreadPoolExecutor in my programs which submit()s a task.
However, when I end my program, the script "freezes". It seems like the thread is not ended correctly.

Is there a solution for this?

example:

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def task():
    for i in range(3):
        print(i)
        sleep(1)
        
        
with ThreadPoolExecutor() as executor:
    future = executor.submit(task)
    
    future.cancel()  # Waits for loop of blocking task to complete
    executor.shutdown(wait=False)  # Still waits for loop in blocking task to complete

sys.exit() does not work either, it will still wait for the future to complete

答案1

得分: 3

这个程序对我来说没有卡住,我在你的代码中没有看到任何违反concurrent.futures包使用规定的情况。因此,继续阅读可能会浪费你的时间。但这并不是说你的代码中没有一些语句在执行时没有任何作用,你可能没有意识到。如果是这样的话,我想指出这些给你。也许你遇到的问题可能与其中一个语句以及你所使用的Python版本或者你运行的平台有关(尽管我并不非常确定你的问题是否出在这里)。无论如何,我已经修改了你的代码如下以指出一些事情。

首先,我将池大小修改为1,现在调用了submit方法两次。因此,第一个提交的任务会立即执行,但第二个任务只有在第一个提交的任务完成后才会开始执行。其次,当你将ThreadPoolExecutor实例作为上下文管理器使用时,就像你现在做的这样,当块终止时会隐式调用ThreadPoolExecutor.shutown(wait=True)。因此,我重新编写了你的代码以使这个隐式调用变为显式调用。修改后的代码如下:

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def task(task_no):
    for i in range(3):
        print(f'任务编号 = {task_no}, i = {i}')
        sleep(1)


executor = ThreadPoolExecutor(1)
future1 = executor.submit(task, 1)
future2 = executor.submit(task, 2)

print('未来 1 已取消 = ', future1.cancel())
print('未来 2 已取消 = ', future2.cancel())
executor.shutdown(wait=False)
executor.shutdown(wait=True)
print('我完成了!')

在我的环境中,其输出为:

任务编号 = 1, i = 0
未来 1 已取消 =  False
未来 2 已取消 =  True
任务编号 = 1, i = 1
任务编号 = 1, i = 2
我完成了!

讨论

首先要注意的是,当执行future1.cancel()调用时,第一个任务已经在运行,因此调用cancel没有任何效果。但是由于第二个任务在调用future2.cancel()时尚未开始执行,我们可以看到该任务可以被取消。重点是在你的原始代码中,调用future.cancel()将没有任何效果

第二点是,因为你将变量executor用作上下文管理器,除了你显式调用的executor.shutdown(wait=False)之外,它紧接着会隐式调用executor.shutdown(wait=True),因此你最终会等待所有待处理的任务完成后才继续,从而使得对executor.shutdown(wait=False)的第一次调用变得相当无用

问题

在你的环境中,这两个连续调用shutdown是否可能导致程序卡住呢?尝试以下代码,看看是否有任何不同:

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def task():
    for i in range(3):
        print(i)
        sleep(1)


executor = ThreadPoolExecutor()
future = executor.submit(task)
executor.shutdown(wait=False)

如果这个能顺利运行完成,那么你可以尝试先添加调用future.cancel(),如果程序仍然能完成,那么再添加额外的调用executor.shutdown(wait=True)

更新

根据你的评论,如果你想终止一个永远不会自动结束的线程,那么如果你使用concurrent.futures包或者一个threading.Thread实例,是无法做到的。然而,你可以使用multiprocessing.pool.ThreadPool多线程池来实现:

from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    i = 0;
    while True:
        print(i)
        i += 1
        sleep(.5)


pool = ThreadPool(1)
async_result = pool.apply_async(task)
sleep(2) # 让任务运行一段时间
pool.terminate() # 现在终止线程池

输出:

0
1
2
3

或者你可以将线程池作为上下文管理器使用,当上下文管理器块退出时会隐式调用terminate()

from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    i = 0;
    while True:
        print(i)
        i += 1
        sleep(.5)


with ThreadPool(1) as pool:
    async_result = pool.apply_async(task)
    sleep(2) # 让任务运行一段时间
    # 现在我们退出了这个块:
# 当上下文管理器块退出时会隐式调用pool.terminate()。

最后,由于池中的线程是守护线程,当创建池的进程终止时,池的线程会自动终止,你甚至不需要调用terminate

from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    i = 0;
    while True:
        print(i)
        i += 1
        sleep(.5)


pool = ThreadPool(1)
async_result = pool.apply_async(task)
sleep(2) # 让任务运行一段时间
# 现在由于没有更多的语句在执行,主进程隐式终止,
# 池的线程也会被销毁。

但是如果你不希望运行中的任务因为程序没有更多语句可以执行而提前终止:

from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    for i in range(10):
        print(i)
        sleep(.5)


pool = ThreadPool(1)
async_result = pool.apply_async(task)
# 通过在AsyncResult实例上调用`get`或者调用pool.close()然后pool.join()
# 显式等待任务完成:
async

<details>
<summary>英文:</summary>

This program does not hang for me and I do not see anything in your code that is violating any published restrictions placed on the use of the `concurrent.futures` package. So reading the rest of this may be a waste of your time. But that&#39;s not to say that you don&#39;t have some statements that are not accomplishing anything in your code that you may not be aware of and, if that is the case, I thought I should point these out to you. And perhaps the issue you are having *may* be related to one of these statements combined with the version of Python you are using or the platform on which you are executing (although I am not very confident that your problem doesn&#39;t lie elsewhere). Anyway, I have modified your code below to point out a few things.

First, I have modified the pool size to be 1 and I now call method `submit` twice. Consequently the first submitted task executes immediately but the second task will not start executing until the first submitted task completes. Second, when you use a `ThreadPoolExecutor` instance as a context manager like you are doing, then when the block terminates there is an implicit call to `ThreadPoolExecutor.shutown(wait=True)`. Consequently, I have rewritten your code to make this implicit call explicit. The resulting modified code is:

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def task(task_no):
    for i in range(3):
        print(f&#39;task no. = {task_no}, i = {i}&#39;)
        sleep(1)


executor = ThreadPoolExecutor(1)
future1 = executor.submit(task, 1)
future2 = executor.submit(task, 2)

print(&#39;future 1 canceled = &#39;, future1.cancel())
print(&#39;future 2 canceled = &#39;, future2.cancel())
executor.shutdown(wait=False)
executor.shutdown(wait=True)
print(&#39;I am done!&#39;)

And its output in my environment is:

task no. = 1, i = 0
future 1 canceled =  False
future 2 canceled =  True
task no. = 1, i = 1
task no. = 1, i = 2
I am done!

Discussion

The first thing to observe is that when the call future1.cancel() is executed, the first task is already running and therefore calling cancel has no effect. But since the second task has not started execution when future2.cancel() is called, we can see that the task can be canceled. The point is that in your original code, the call to future.cancel() will have no effect.

The second point is that because you are using variable executor as a context manager, in addition to the call you explicitly make to executor.shutdown(wait=False), it is immediately followed by an implicit call to executor.shutdown(wait=True), so you end up waiting for all pending tasks to complete before continuing thus rendering the first call to executor.shutdown(wait=False) rather useless.

Question

Is it possible that for some reason in your environment these two consecutive calls to shutdown is the cause of your hang? Try the following code to see if it makes any difference:

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def task():
    for i in range(3):
        print(i)
        sleep(1)


executor = ThreadPoolExecutor()
future = executor.submit(task)
executor.shutdown(wait=False)

If this runs to completion, then you can try first adding the call future.cancel() and if the program still completes, then add back the additional call to executor.shutdown(wait=True).

Update

Based on your comment, if you are looking to terminate a thread that never ends on its own, you cannot do this if you are using the concurrent.futures package or a threading.Thread instance. However, you can do this with the multiprocessing.pool.ThreadPool multithreading pool:

from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    i = 0;
    while True:
        print(i)
        i += 1
        sleep(.5)


pool = ThreadPool(1)
async_result = pool.apply_async(task)
sleep(2) # Let the task run for a while
pool.terminate() # Now terminate the pool

Prints:

0
1
2
3

Or you can use the pool as a context manager, which implicitly calls terminate() on the pool when the context manager block exits:

from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    i = 0;
    while True:
        print(i)
        i += 1
        sleep(.5)


with ThreadPool(1) as pool:
    async_result = pool.apply_async(task)
    sleep(2) # Let the task run for a while
    # Now we exit the block:
# There is an explicit call to pool.terminate() when
# the context manager block exits.

Finally, since the pool threads are daemon threads, when the process that created the pool terminates, then the pool's threads automatically terminate and you do not even have to call terminate:

from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    i = 0;
    while True:
        print(i)
        i += 1
        sleep(.5)


pool = ThreadPool(1)
async_result = pool.apply_async(task)
sleep(2) # Let the task run for a while
# Now the main process implicitly terminates since no more statements
# are being executed and the pool&#39;s threads are destroyed.

But if you do not want the running task to terminate prematurely just because the program has no more statements to execute:

from multiprocessing.pool import ThreadPool
from time import sleep

def task():
    for i in range(10):
        print(i)
        sleep(.5)


pool = ThreadPool(1)
async_result = pool.apply_async(task)
# Explicitly wait for the task to complete by calling `get` on the 
# AsyncResult instance or call pool.close() followed by pool.join() 
# to wait for all submitted tasks to complete:
async_result.get()
# Program terminates here:

You may also wish to look at this for a comparison of the two pool packages.

huangapple
  • 本文由 发表于 2023年6月26日 02:49:28
  • 转载请务必保留本文链接:https://go.coder-hub.com/76551956.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定