Can Go spawn and communicate with external processes without starting one OS-thread per external process?

Question

Short version:

Is it possible in Go to spawn many external processes (shell commands) in parallel without starting one operating system thread per external process, and still receive each process's output when it finishes?

Longer version:

In Elixir, if you use ports, you can spawn thousands of external processes without really increasing the number of threads in the Erlang virtual machine.

E.g. the following code snippet, which starts 2500 external sleep processes, is managed by only 20 operating system threads under the Erlang VM:

defmodule Exmultiproc do
  for _ <- 1..2500 do
    cmd = "sleep 3600"
    IO.puts "Starting another process ..."
    Port.open({:spawn, cmd}, [:exit_status, :stderr_to_stdout])
  end
  System.cmd("sleep", ["3600"])
end

(Provided you set ulimit -n to a high number, such as 10000)

On the other hand, the following Go code, which is supposed to do the same thing (start 2500 external sleep processes), also starts 2500 operating system threads. So it apparently starts one operating system thread per (blocking?) system call (so as not to block the whole CPU, or similar, if I understand correctly):

package main

import (
    "fmt"
    "os/exec"
    "sync"
)

func main() {
    wg := new(sync.WaitGroup)
    for i := 0; i < 2500; i++ {
        wg.Add(1)
        go func(i int) {
            fmt.Println("Starting sleep ", i, "...")
            cmd := exec.Command("sleep", "3600")
            _, err := cmd.Output()
            if err != nil {
                panic(err)
            }
            fmt.Println("Finishing sleep ", i, "...")
            wg.Done()
        }(i)
    }
    fmt.Println("Waiting for WaitGroup ...")
    wg.Wait()
    fmt.Println("WaitGroup finished!")
}

Thus, I was wondering whether there is a way to write the Go code so that it does the same thing as the Elixir code, without opening one operating system thread per external process.

I'm basically looking for a way to manage at least a few thousand external long-running (up to 10 days) processes, in a way that causes as few problems as possible with any virtual or physical limits in the operating system.

(Sorry for any mistakes in the code; I'm new to Elixir and quite new to Go. I'm eager to learn about any mistakes I'm making.)

EDIT: Clarified the requirement to run the long-running processes in parallel.

Answer 1

Score: 1

I found that if we do not wait on the processes, the Go runtime does not start 2500 operating system threads, so use cmd.Start() rather than cmd.Output().

However, it seems impossible to read a process's stdout through the Go os package without consuming an OS thread. I think this is because the os package does not use non-blocking I/O to read the pipe.

The following program runs well on my Linux machine, although it can block the process's stdout, as @JimB said in a comment, because the pipe is read only after the process has exited. It probably works because the output is small and fits in the system buffers.

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"os/exec"
	"time"
)

// result pairs a started process with the read end of its stdout pipe.
type result struct {
	p *os.Process
	b io.ReadCloser
}

func main() {
	concurrentProcessCount := 50
	wtChan := make(chan *result, concurrentProcessCount)
	for i := 0; i < concurrentProcessCount; i++ {
		go func(i int) {
			fmt.Println("Starting process ", i, "...")
			cmd := exec.Command("bash", "-c", "for i in 1 2 3 4 5; do echo to sleep $i seconds;sleep $i;echo done;done;")
			outPipe, err := cmd.StdoutPipe()
			if err != nil {
				panic(err)
			}
			// Start returns once the process is running; it does not wait for it.
			if err := cmd.Start(); err != nil {
				panic(err)
			}
			time.Sleep(time.Second)
			fmt.Println("Finishing process ", i, "...")
			wtChan <- &result{cmd.Process, outPipe}
		}(i)
	}

	fmt.Println("root:", os.Getpid())

	// Wait for the processes one at a time, in this single goroutine.
	for waitDone := 1; waitDone <= concurrentProcessCount; waitDone++ {
		r := <-wtChan
		r.p.Wait()
		// The pipe is read only after the process has exited, so the output
		// must fit in the pipe buffer or the child blocks on write.
		output := &bytes.Buffer{}
		io.Copy(output, r.b)
		r.b.Close()
		fmt.Println(waitDone, output.String())
	}
}

huangapple
  • Posted on 2015-11-27 08:19:50
  • When reposting, please keep the link to this article: https://go.coder-hub.com/33948726.html