如何停止由外部 I/O 阻塞的 goroutine,该 goroutine 是为进程启动的?

huangapple go评论95阅读模式
英文:

How to stop goroutine blocked by external I/O started for process?

问题

我在这里遇到一个问题,我无法安全地退出 goroutine。

我使用 exec.Command 创建了一个外部进程(存储了进程的 cmd、stdin 管道和 stdout 管道):

exec.Command(args[0], args[1]...) // args[0] 是基本命令

每当需要启动该进程时,我会调用:

cmd.Start()

然后在启动和附加后,我会运行两个 goroutine:

shutdown := make(chan struct{})
// 运行从进程读取数据并将数据发送到 CmdIn 通道的例程
go pr.cmdInRoutine()
// 运行从 CmdOut 读取数据并写入进程的例程
go pr.cmdOutRoutine()

cmdInRoutine:

func (pr *ExternalProcess) cmdInRoutine() {
	app.At(te, "cmdInRoutine")

	for {
		println("CMDINROUTINE")
		select {
		case <-pr.shutdown:
			println("!!! Shutting cmdInRoutine down !!!")
			return
		default:
			println("Inside the for loop of the CmdInRoutine")
			if pr.stdOutPipe == nil {
				println("!!! Standard output pipe is nil. Sending Exit Request !!!")
				pr.ProcessExit <- true
				close(pr.shutdown)
				return
			}

			buf := make([]byte, 2048)

			size, err := pr.stdOutPipe.Read(buf)
			if err != nil {
				println("!!! Sending exit request from cmdInRoutine !!!")
				pr.ProcessExit <- true
				close(pr.shutdown)
				return
			}

			println("--- Received data for sending to CmdIn:", string(buf[:size]))
			pr.CmdIn <- buf[:size]
		}

	}
}

cmdOutRoutine:

func (pr *ExternalProcess) cmdOutRoutine() {
	app.At(te, "cmdOutRoutine")

	for {
		select {
		case <-pr.shutdown:
			println("!!! Shutting cmdOutRoutine down !!!")
			return
		case data := <-pr.CmdOut:
			println("Received data for sending to Process: ", data)
			if pr.stdInPipe == nil {
				println("!!! Standard input pipe is nil. Sending Exit Request !!!")
				pr.ProcessExit <- true
				return
			}

			println("--- Received input to write to external process:", string(data))
			_, err := pr.stdInPipe.Write(append(data, '\n'))
			if err != nil {
				println("!!! Couldn't Write To the std in pipe of the process !!!")
				pr.ProcessExit <- true
				return
			}
		}
	}
}

这里有一些有趣的情况:

  1. 当进程发送 EOF(不要在意 pr.ProcessExit <- true,我是使用一个通道通知父处理程序停止和退出进程)在 cmdInRoutine 中,我也关闭了 shutdown 通道,这使得 cmdOutRoutine 退出,因为在 select 语句中没有默认情况,所以它会阻塞并等待退出或数据,然后将数据写入正在运行的进程中使用存储的 stdInPipe

  2. 当我只想停止 goroutine,但保持进程运行,即暂停读取和写入时,我关闭了 shutdown 通道,希望这两个 goroutine 会结束。

    - cmdOutRoutine 打印 !!! Shutting cmdOutRoutine down !!!,因为 select 没有默认情况,关闭 shutdown 通道会导致几乎立即返回

    - cmdOutRoutine 不打印任何内容,我有一种奇怪的感觉它甚至没有返回,我认为是因为 它在默认情况下被阻塞在从 stdInPipe 读取

我考虑在 for 循环之前在 cmdOutRoutine 中运行另一个 goroutine,并将进程的 stdIn 数据转换为一个通道,然后我就能够在 cmdInRoutine 中消除默认情况,但这会创建另一个问题,新的 goroutine 也必须被停止,它仍然被阻塞在从正在运行的进程的 stdIn 读取。

有什么办法可以解决这个问题(修改逻辑)以满足随时关闭和启动 goroutine(进程 I/O),但不关闭正在运行的进程本身的需求吗?或者有没有一种避免阻塞调用读取和写入的方法,我还不知道的?

英文:

I'm having a problem here that I can't exit goroutine safely.

I'm having an external process created using exec.Command (storing cmd, stdin pipe and stdout pipe of the process):

exec.Command(args[0], args[1]...) // args[0] is a base command

Whenever there is a need to start that process I'm calling:

cmd.Start()

Then upon start and attaching I'm running 2 goroutines:

shutdown := make(chan struct{})
// Run the routine which will read from process and send the data to CmdIn channel
go pr.cmdInRoutine()
// Run the routine which will read from CmdOut and write to process
go pr.cmdOutRoutine()

cmdInRoutine:

func (pr *ExternalProcess) cmdInRoutine() {
	app.At(te, &quot;cmdInRoutine&quot;)

	for {
		println(&quot;CMDINROUTINE&quot;)
		select {
		case &lt;-pr.shutdown:
			println(&quot;!!! Shutting cmdInRoutine down !!!&quot;)
			return
		default:
			println(&quot;Inside the for loop of the CmdInRoutine&quot;)
			if pr.stdOutPipe == nil {
				println(&quot;!!! Standard output pipe is nil. Sending Exit Request !!!&quot;)
				pr.ProcessExit &lt;- true
				close(pr.shutdown)
				return
			}

			buf := make([]byte, 2048)

			size, err := pr.stdOutPipe.Read(buf)
			if err != nil {
				println(&quot;!!! Sending exit request from cmdInRoutine !!!&quot;)
				pr.ProcessExit &lt;- true
				close(pr.shutdown)
				return
			}

			println(&quot;--- Received data for sending to CmdIn:&quot;, string(buf[:size]))
			pr.CmdIn &lt;- buf[:size]
		}

	}
}

cmdOutRoutine:

func (pr *ExternalProcess) cmdOutRoutine() {
	app.At(te, &quot;cmdOutRoutine&quot;)

	for {
		select {
		case &lt;-pr.shutdown:
			println(&quot;!!! Shutting cmdOutRoutine down !!!&quot;)
			return
		case data := &lt;-pr.CmdOut:
			println(&quot;Received data for sending to Process: &quot;, data)
			if pr.stdInPipe == nil {
				println(&quot;!!! Standard input pipe is nil. Sending Exit Request !!!&quot;)
				pr.ProcessExit &lt;- true
				return
			}

			println(&quot;--- Received input to write to external process:&quot;, string(data))
			_, err := pr.stdInPipe.Write(append(data, &#39;\n&#39;))
			if err != nil {
				println(&quot;!!! Couldn&#39;t Write To the std in pipe of the process !!!&quot;)
				pr.ProcessExit &lt;- true
				return
			}
		}
	}
}

Interesting cases here:

  1. When process sends EOF (don't mind the pr.ProcessExit <- true I was notifying to the parent handler using a channel to stop and exit the process) in cmdInRoutine I'm closing the shutdown channel too and this lets cmdOutRoutine to exit because inside the select statement there's no default case so it blocks and waits for either exit or data in which then gets written to the running process using stored stdInPipe.

  2. When I want to just stop goroutines but leave process running i.e kind of pausing the read and write I'm closing the shutdown channel with the hope that those 2 goroutines will end.
    <br>- cmdOutRoutine prints !!! Shutting cmdOutRoutine down !!! because select doesn't have default case and closing the shutdown channel causes returning almost immediately
    <br>- cmdOutRoutine doesn't print anything and I'm having a weird feeling that it's not even returning, I think because It's blocked in the default case at reading from stdInPipe.

I was thinking of running another goroutine inside the cmdOutRoutine before the for loop and have stdIn data of the process converted into a channel then I would be able to eliminate default case in cmdInRoutine but this creates another problem, that new goroutine also has to be stopped it will still be blocked by the reading from the stdIn of the running process.

Any ideas how can I resolve this (modify the logic) to meet the needs of shutting down anytime and starting goroutines (process I/O) but not the running process itself? Or Is there a way to avoid blocking calls of read and write at all, that I'm not aware of yet?

Thanks a lot.

答案1

得分: 2

它可能在pr.stdOutPipe.Read(buf)这一行被阻塞了。你可以尝试关闭pr.stdOutPipe,这样应该会中断读取操作。

你也可以关闭pr.stdInPipe,以确保写入操作不会被阻塞。

编辑:这样做不会让你重新连接,但没有其他方法可以中断读取操作。最好的做法可能是让这两个goroutine在整个进程中一直运行,并在堆栈的其他位置暂停(例如,如果你不想在暂停状态下接收命令的输出,就不要将buf写入pr.CmdIn,但要小心避免竞争条件)。

在当前版本的Go中,关闭可能存在错误:issue 6817

编辑结束

此外,对于pr.CmdIn要小心处理。如果关闭stdOutPipe不会导致Read返回错误,cmdInRoutine将尝试向通道写入数据。如果没有任何地方从中读取数据,cmdInRoutine将永远阻塞。我建议将pr.stdOutPipe.Read(buf)移出select语句,然后将pr.CmdIn <- buf[:size]作为另一个case添加到select语句中:

func (pr *ExternalProcess) cmdInRoutine() {
    app.At(te, "cmdInRoutine")

    // 这个检查可能只需要在循环开始时进行一次。
    // 如果stdOutPipe在循环期间可能发生变化,
    // 那就是一个竞争条件。
    if pr.stdOutPipe == nil {
        println("!!! 标准输出管道为nil。发送退出请求 !!!")
        pr.ProcessExit <- true
        close(pr.shutdown)
        return
    }

    for {
        println("CMDINROUTINE")
        // 我们需要在每次迭代中分配一个新的缓冲区,
        // 因为当我们通过通道传递它时,
        // 我们不能再安全地覆盖其中的数据,
        // 因为另一个goroutine可能仍在使用它。
        buf := make([]byte, 2048)
        size, err := pr.stdOutPipe.Read(buf)
        if err != nil {
            println("!!! 从cmdInRoutine发送退出请求 !!!")
            // 对此要小心,如果你在关闭stdOutPipe时也关闭了pr.shutdown,那么这将导致panic(关闭一个已关闭的通道)。
            pr.ProcessExit <- true
            close(pr.shutdown)
            return
        }

        // 现在我们有了一些数据,尝试发送它,
        // 除非我们已经完成。
        select {
        case <-pr.shutdown:
            println("!!! 关闭cmdInRoutine !!!")
            return
        case pr.CmdIn <- buf[:size]:
            println("--- 接收到要发送到CmdIn的数据:", string(buf[:size]))
        }
    }
}
英文:

It's probably blocked at pr.stdOutPipe.Read(buf). You could try closing pr.stdOutPipe, that should interrupt the Read.

You can also close pr.stdInPipe, to make sure the Write doesn't block.

Edit: This won't allow you to reattach, but there's no other way to interrupt that read. It's probably better to just keep these two goroutines running for the entire process, and pause somewhere else in the stack (e.g. if you don't want to receive the command's output in the paused state, don't write buf to pr.CmdIn - but be careful to avoid race conditions).

Close might be buggy in current versions of go: issue 6817.

End of edit

Also, be careful with pr.CmdIn. If closing stdOutPipe doesn't cause Read to return an error, cmdInRoutine will try to write to the channel. If nothing reads from it, cmdInRoutine will block forever. I would move pr.stdOutPipe.Read(buf) out of the select, then add pr.CmdIn &lt;- buf[:size] as another case to the select:

func (pr *ExternalProcess) cmdInRoutine() {
    app.At(te, &quot;cmdInRoutine&quot;)

    // this check should probably only happen once.
    // if stdOutPipe can change during the loop,
    // then that&#39;s a race condition.
    if pr.stdOutPipe == nil {
        println(&quot;!!! Standard output pipe is nil. Sending Exit Request !!!&quot;)
        pr.ProcessExit &lt;- true
        close(pr.shutdown)
        return
    }

    for {
        println(&quot;CMDINROUTINE&quot;)
        // we need to allocate a new buffer in each iteration,
        // because when we pass it through the channel,
        // we can no longer safely overwrite the data in it,
        // since the other goroutine might still be using it.
        buf := make([]byte, 2048)
        size, err := pr.stdOutPipe.Read(buf)
        if err != nil {
            println(&quot;!!! Sending exit request from cmdInRoutine !!!&quot;)
            // Be careful with this, if you also closed pr.shutdown when you closed stdOutPipe, then this is going to panic (closing a closed channel).
            pr.ProcessExit &lt;- true
            close(pr.shutdown)
            return
        }

        // now that we have some data, try to send it,
        // unless we&#39;re done.
        select {
        case &lt;-pr.shutdown:
            println(&quot;!!! Shutting cmdInRoutine down !!!&quot;)
            return
        case pr.CmdIn &lt;- buf[:size]:
            println(&quot;--- Received data for sending to CmdIn:&quot;, string(buf[:size]))
        }
    }
}

huangapple
  • 本文由 发表于 2017年4月16日 19:29:43
  • 转载请务必保留本文链接:https://go.coder-hub.com/43436751.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定