Thread.Join()在.NET中如何在线程终止之前返回?

huangapple go评论78阅读模式
英文:

How can Thread.Join() in .NET return before the thread has terminated?

问题

我有一个生成2个线程的过程。一个线程从网络套接字读取,另一个线程写入相同的网络套接字。一旦一切都初始化并且网络套接字已连接,主线程将在以下函数中阻塞,该函数创建了这2个线程:

public void BlockUntilThreadsAreDone()
{
    if (_incomingThread.IsAlive)
        _incomingThread.Join();
    if (_outgoingThread.IsAlive)
        _outgoingThread.Join();
}

一切都运行正常一段时间。但几分钟后,我发现主线程将从此函数返回。没有引发任何异常,我也没有看到任何异常日志记录。函数只是因为Thread.Join()调用已返回而返回。

我修改了函数以等待两个线程关闭时都会设置的事件,如下所示:

private readonly ManualResetEventSlim _messagingComplete = new(false);

public void BlockUntilThreadsAreDone()
{
    if (_incomingThread.IsAlive)
        _incomingThread.Join();
    Console.WriteLine($"Incoming thread state={_incomingThread.ThreadState}");

    if (_outgoingThread.IsAlive)
       _outgoingThread.Join();
    Console.WriteLine($"Outgoing thread state={_outgoingThread.ThreadState}");

    _messagingComplete.Wait(_cancellationToken);
}

线程对象是这样创建的:

_incomingMessageThread = new(HandleIncomingMessages) { Name = "Incoming" };
_outgoingMessageThread = new(HandleOutgoingMessages) { Name = "Outgoing" };

下面是HandleIncomingMessages方法的部分代码:

private async void HandleIncomingMessages()
{
    try
    {
        Console.WriteLine("PapiClient HandleIncomingMessages starting");

        do
        {
            try
            {
                Message? message = await ReceiveIncomingMessageAsync();
                // 异步处理每个消息,以便我们可以继续接收更多消息
                _ = Task.Run(
                    () =>
                    {
                        HandleIncomingMessage(message);
                    },
                    _cancellationToken
                );
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"处理消息时发生异常:{ex}");
            }
        } while (!_cancellationToken.IsCancellationRequested && Connected);
    }
    catch (ObjectDisposedException) { }
    finally
    {
        _messagingComplete.Set();
        Console.WriteLine(
            $"PapiClient HandleIncomingMessages 完成:cancel={_cancellationToken.IsCancellationRequested}, connected={Connected}"
        );
    }
}

现在代码按照我期望的方式工作(即,该函数仅在我们处理完时返回)。从技术上讲,这会在第一个线程完成时返回,但对于我的目的来说,这已经足够接近了。

有趣的是,我发现在进程启动几分钟后,两个线程最终会记录它们的状态为ThreadState.Stopped。但是,这两个线程绝对没有停止,因为我可以看出进程继续在网络套接字上发送和接收数据,并且在这些线程终止时通常发生的所有正常处理此时都没有发生。

文档中说Join"阻塞调用线程,直到由此实例表示的线程终止。"文档还说"线程永远不会离开Stopped状态。"

我感到困惑。为什么Thread.Join()会返回,而线程指示其状态为Stopped,而它明显没有停止?

我怀疑这可能与网络和/或特定的网络套接字有关。在此发生时,我运行了数据包捕获,并注意到问题发生前几毫秒,网络套接字会发送一个未经请求的PONG消息。单独的PONG消息本身不是问题,但它们只会每30秒发送一次,因此在定时器唤醒并发送PONG消息后的几毫秒内发生Thread.Join()的异常似乎不太寻常。

我在.NET core GitHub问题列表上找不到关于此问题的问题。有什么想法吗?

英文:

I have a process that spawns 2 threads. One thread reads from a web socket, and the other writes to the same web socket. Once everything is initialized and the web socket has connected, the main thread blocks on the 2 threads it created in the following function:

public void BlockUntilThreadsAreDone()
{
    if (_incomingThread.IsAlive)
        _incomingThread.Join();
    if (_outgoingThread.IsAlive)
        _outgoingThread.Join();
}

Everything runs fine for a while. But after several minutes, I discovered that the main thread will return from this function. No exceptions are thrown, and I don't see any unusual logging. The function simply returns because the Thread.Join() calls returned.

I modified the function to wait on an event that both threads will set when they are shutting down as follows:

private readonly ManualResetEventSlim _messagingComplete = new(false);

public void BlockUntilThreadsAreDone()
{
    if (_incomingThread.IsAlive)
        _incomingThread.Join();
    Console.WriteLine($"Incoming thread state={_incomingThread.ThreadState}");

    if (_outgoingThread.IsAlive)
       _outgoingThread.Join();
    Console.WriteLine($"Outgoing thread state={_outgoingThread.ThreadState}");

    _messagingComplete.Wait(_cancellationToken);
}

The thread objects are created like this

_incomingMessageThread = new(HandleIncomingMessages) { Name = "Incoming" };
_outgoingMessageThread = new(HandleOutgoingMessages) { Name = "Outgoing" };
private async void HandleIncomingMessages()
    try
    {
        Console.WriteLine("PapiClient HandleIncomingMessages starting");

        do
        {
            try
            {
                Message? message = await ReceiveIncomingMessageAsync();
                // Handle each message asynchronously so we can keep receiving more messages
                _ = Task.Run(
                    () =>
                    {
                        HandleIncomingMessage(message);
                    },
                    _cancellationToken
                );
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Exception while handling messages: {ex}");
            }
        } while (!_cancellationToken.IsCancellationRequested && Connected);
    }
    catch (ObjectDisposedException) { }
    finally
    {
        _messagingComplete.Set();
        Console.WriteLine(
            $"PapiClient HandleIncomingMessages finishing: cancel={_cancellationToken.IsCancellationRequested}, connected={Connected}"
        );
    }
}

Now the code works as I would expect (i.e., the function only returns once we're done processing). Technically it's not the same since this returns once the first thread completes, but that's close enough for my purposes.

Interestingly, I see that a few minutes after the process started, both threads will be eventually log that their state is ThreadState.Stopped. The threads most definitely are not stopped, though, because I can tell the process continues to send and receive data over the web socket, and there are no other threads created to work with the web socket. In addition, all the normal processing that happens when the threads terminate doesn't occur at that point.

The documentation says that Join "blocks the calling thread until the thread represented by this instance terminates." The documentation also says that "a thread can never leave the Stopped state."

I am baffled. Why would Thread.Join() return and a thread indicate its state is Stopped when it most definitely has not stopped?

I suspect this might have something to do with networking and/or web sockets in particular. I ran a packet capture while this was happening, and I noticed a few milliseconds before the problem occurred, the websocket sends an unsolicited PONG message. The unsolicited PONG isn't a problem by itself, but those are only sent every 30 seconds, so it seems unusual that the spurious Thread.Join() would happen just a few milliseconds after a timer wakes up and sends the PONG message.

I couldn't find an issue about this on the .NET core github issues list. Any ideas?

答案1

得分: 2

看着你的Github仓库(顺便说一句,你的问题应该真正是自包含的),两个函数HandleOutgoingMessagesHandleIncomingMessages标记为async void。这意味着一旦它们遇到第一个await,它们就会结束,并将继续操作提交到线程池。

你似乎期望它们所在的线程会在await继续操作发生时暂停,然后继续执行。这是不正确的。你的线程会结束,线程池会接管它。

这本质上是关于线程和任务之间的混淆。你大部分时间都在使用任务和异步,我建议你坚持使用它,因此使用Task.Run来将Handle函数卸载。

你需要以下更改:

private readonly Task _incomingMessageThread;
private readonly Task _outgoingMessageThread;
private async Task HandleIncomingMessages()
{
private async Task HandleOutgoingMessages()
{

在构造函数中删除以_incomingMessageThread = _outgoingMessageThread =开头的行。

用以下代码替换_incomingMessageThread.Start(); _outgoingMessageThread.Start();

_incomingMessageThread = Task.Run(HandleIncomingMessages, _cancellationToken);
_outgoingMessageThread = Task.Run(HandleOutgoingMessages, _cancellationToken);

在需要等待线程完成的各个时刻,只需执行:

await Task.WhenAll(_incomingMessageThread, _outgoingMessageThread);

你不需要检查它们是否已完成,它们在那之后总是已经完成了。

或者使用以下替代方法:

try
{
    await Task
        .WhenAll(_incomingMessageThread, _outgoingMessageThread)
        .WaitAsync(TimeSpan.FromSeconds(DISCONNECT_TIMEOUT));
}
catch (OperationCanceledException)
{
    if (!_incomingMessageThread.IsCompleted)
        Console.WriteLine("Incoming task not stopping");
    if (!_outgoingMessageThread.IsCompleted)
        Console.WriteLine("Outgoing task not stopping");
}
英文:

Looking at your repo on Github (by the way your question should really be self-contained), the two functions HandleOutgoingMessages and HandleIncomingMessages are marked async void. Which means as soon as they hit their first await they are going to end, and post the continuation to the threadpool.

You seem to be expecting that the thread they are on will somehow pause, and pick up when the await continuation happens. This is not true. Your thread ends, and the threadpool will pick it up.

This is essentially a confusion between threads and tasks. You are mostly using tasks and async, I suggest you stick with that, and therefore use Task.Run to offload the Handle functions.

You need the following changes

private readonly Task _incomingMessageThread;
private readonly Task _outgoingMessageThread;
private async Task HandleIncomingMessages()
{
private async Task HandleOutgoingMessages()
{

Remove the lines in the constructor beginning _incomingMessageThread = and _outgoingMessageThread =.

Replace the lines _incomingMessageThread.Start(); _outgoingMessageThread.Start(); with

_incomingMessageThread = Task.Run(HandleIncomingMessages, _cancellationToken);
_outgoingMessageThread = Task.Run(HandleOutgoingMessages, _cancellationToken);

At various points when you need to wait for the threads to finish, just do

await Task.WhenAll(_incomingMessageThread, _outgoingMessageThread);

You do not need to check whether they have finished, they will always have finished after that.

Alternatively

try
{
    await Task
        .WhenAll(_incomingMessageThread, _outgoingMessageThread)
        .WaitAsync(TimeSpan.FromSeconds(DISCONNECT_TIMEOUT));
}
catch (OperationCanceledException)
{
    if (!_incomingMessageThread.IsCompleted)
        Console.WriteLine("Incoming task not stopping");
    if (!_outgoingMessageThread.IsCompleted)
        Console.WriteLine("Outgoing task not stopping");
}

huangapple
  • 本文由 发表于 2023年6月29日 00:15:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/76575018.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定