Broken Pipe Error when writing to stdin of a Python application from a Go application

Question

I have implemented a worker pool to submit my jobs to a Python script.

func NewWorker(index int, workerConfig config.AppConfig, logger log.PrefixedLogger) error {
    var worker entities.Worker

    worker.ID = index
    worker.Config = workerConfig
    strCommand := workerConfig.RideModelScriptPath
    command := exec.Command(strCommand)

    stdIn, err := command.StdinPipe()
    if err != nil {
        logger.Error("worker_pool.InitWorkerPool", "Error resolving stdin pipe from command", err.Error())
        return err
    }
    worker.Stdin = stdIn

    stdout, err := command.StdoutPipe()
    if err != nil {
        logger.Error("worker_pool.InitWorkerPool", "Error resolving stdout pipe from command", err.Error())
        return err
    }
    worker.StdOutReader = bufio.NewReaderSize(stdout, workerConfig.MaxRequestSize)

    stderr, err := command.StderrPipe()
    if err != nil {
        logger.Error("worker_pool.InitWorkerPool", "Error resolving stderror pipe from command", err.Error())
        return err
    }
    worker.StdError = stderr

    err = command.Start()
    if err != nil {
        logger.Error("worker_pool.InitWorkerPool", "Error starting command", err.Error())
        return err
    }

    go processWorkerPool(&worker, ReqChan, logger)
    return err
}

When the shared channel receives a job, it is consumed and sent to the Python script.

func processWorkerPool(worker *entities.Worker, ReqChannel chan entities.ReqMessage, logger log.PrefixedLogger) {

    for request := range ReqChannel {
        bufferLatency.Observe(float64(time.Since(request.SentTime).Nanoseconds()/1e6), map[string]string{"name": "buffer", "error": "false"})

        logger.Info("worker.processWorkerPool", request.Request)

        startTime := time.Now()

        //Send Request to Worker
        _, err := io.WriteString(worker.Stdin, request.Request)
        if err != nil {
            scriptLatency.Observe(float64(time.Since(startTime).Nanoseconds()/1e6), map[string]string{"name": "script", "error": "true"})
            log.ErrorContext(context.Background(), log.WithPrefix("worker.processWorkerPool", err))
            return
        }

        //Get response from Worker
        result := CopyOutput(logger, worker.StdOutReader)

        scriptLatency.Observe(float64(time.Since(startTime).Nanoseconds()/1e6), map[string]string{"name": "script", "error": "false"})
        request.ResponseChannel <- result
    }
}

To read the results from the Python script's stdout, I use the following helper function.

func CopyOutput(logger log.PrefixedLogger, r io.Reader) string {
    scanner := bufio.NewScanner(r)
    result := ""

    for scanner.Scan() {
        output := scanner.Text()

        switch {
        case strings.Contains(output, "ERROR"):
            errorMsg := strings.SplitAfter(output, "ERROR: ")[1]
            err := errors.New(errorMsg)
            logger.Error("worker.CopyOutput", "ERROR LOG: ", err.Error())
            return err.Error()
        case strings.Contains(output, "OUTPUT"):
            result = strings.SplitAfter(output, "OUTPUT: ")[1]
            logger.Debug("worker.copyOutput", "OUTPUT LOG: ", result)
            return result
        default:
            logger.Debug("worker.copyOutput", "DEBUG LOG: ", output)
        }
    }
    return result
}

On the Python side, my script looks like this.

#!/usr/bin/python3
import sys
import json
from threading import Thread

from vrpsolver.ride_model import rides_model
from preprocessor.config_loader import Config


# Load Configs
configs = Config('/opt/pool-processor/configs/configs.yaml')

while True:
    
    # input = json.loads(sys.argv[1])
    # model = sys.argv[2]
    # file = sys.argv[3]
    
    threads = []
    try:
        inputDataStream = sys.stdin.readline()
        inputDataStream = inputDataStream.strip()
        data = inputDataStream.split(' ')
        model = data[1]
    except (Exception) as ex:
        sys.stdout.write('ERROR: Error Occured while reading stdin: {}\n'.format(str(ex)))
        sys.stdout.flush()
        continue
    
    try:
        input = json.loads(data[0])
    except (Exception, IOError) as ex:
        sys.stdout.write('ERROR: Error Occured while parsing data to json: {}\n'.format(str(ex)))
        continue

    try:
        result = rides_model(input, configs)
        sys.stdout.write('OUTPUT: {}\n'.format(json.dumps(result)))
        sys.stdout.flush()
    except (Exception, IOError) as ex:
        sys.stdout.write('ERROR: Error Occured while processing: {}\n'.format(str(ex)))
        sys.stdout.flush()
        continue

After the program has been running for a while, I get the following errors:

write |1: broken pipe on /build/pool-engine/worker_pool/worker.go:76
write |1: broken pipe on /build/pool-engine/worker_pool/worker.go:83

These errors come from the following lines:

_, err := io.WriteString(worker.Stdin, request.Request)
result := CopyOutput(logger, worker.StdOutReader)

I have been stuck on this for a while now and any input is appreciated. My guess is that after some time the Python script crashes, and that is why I get this error. I am not sure why that crash is not caught by the exception handlers.

Answer 1

Score: 1

The basic answer to this error is: for some reason your Python process has closed its STDIN (it has probably exited), so check why it exits too early.


Some reasons why it is hard for you to see what your Python process is doing:

  • its main "activity log" goes to sys.stdout,
  • stdout is captured and processed by your Go program (and, as a side effect, it never reaches the console),
  • there are some issues in how your Go code handles the subprocess output.

To make debugging easier, I would advise having your Python script also write its output to a log file.


The first three issues I see are:

  • You set up a StderrPipe on your Python process, but it is never read, so STDERR is completely silenced.

Try not redirecting stderr at all (you should then see it printed on the console), or at least add an extra goroutine that drains it and prints its content somewhere (to your Go process's stderr, to a log file, ...).

For example:

go func() {
    // Relay the child's stderr to this process's stderr so Python tracebacks
    // are not silently dropped (worker.StdError is the pipe set up in NewWorker).
    io.Copy(os.Stderr, worker.StdError)
}()
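
If you would rather have those messages in your own structured logs than on the console, a line-by-line drain works just as well. This is only a minimal sketch, assuming the worker.StdError field and the PrefixedLogger from the question, started right after command.Start() in NewWorker; the "worker.stderr" prefix is purely illustrative:

go func() {
    // Forward each stderr line from the python process to the logger, so its
    // tracebacks are still visible when it dies.
    scanner := bufio.NewScanner(worker.StdError)
    for scanner.Scan() {
        logger.Error("worker.stderr", "python stderr", scanner.Text())
    }
}()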

  • To read the child process's output as lines of text, you repeatedly create a new bufio.Scanner over the stdout pipe.

When you run bufio.NewScanner(...), a new buffered reader with its own buffer is created. If you discard it and create a new Scanner for every read, the previous buffer is thrown away and you no longer know how many bytes have already been consumed from the underlying io.Reader (some may still be sitting in that discarded buffer ...).

At the very least, instantiate bufio.NewScanner() only once (in processWorkerPool()) and repeatedly call scanner.Scan() on that single *bufio.Scanner instance, so the same buffer is reused across requests; see the sketch below.
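
A minimal sketch of that change, reusing the names from the question (metrics are elided; this only shows the scanner handling, not a full implementation):

func processWorkerPool(worker *entities.Worker, ReqChannel chan entities.ReqMessage, logger log.PrefixedLogger) {
    // Build the scanner once per worker so its internal buffer, and any bytes
    // it has already pulled from the stdout pipe, survive across requests.
    scanner := bufio.NewScanner(worker.StdOutReader)
    // Assumption: MaxRequestSize is also an acceptable cap for one response line.
    scanner.Buffer(make([]byte, 0, 64*1024), worker.Config.MaxRequestSize)

    for request := range ReqChannel {
        if _, err := io.WriteString(worker.Stdin, request.Request); err != nil {
            logger.Error("worker.processWorkerPool", "write to python failed", err.Error())
            return
        }
        request.ResponseChannel <- CopyOutput(logger, scanner)
    }
}

// CopyOutput would then accept the shared scanner instead of an io.Reader and
// keep its existing ERROR/OUTPUT line handling:
//
//     func CopyOutput(logger log.PrefixedLogger, scanner *bufio.Scanner) string { ... }

This also gives you a single place to check scanner.Err() once Scan() returns false; today CopyOutput silently returns an empty string when the stdout pipe reaches EOF, so the first visible symptom of a dead worker is the broken-pipe write on the next request.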

  • You should somehow monitor the state of the running process.

Keep a way to reach command.ProcessState and check whether your external command has exited; a sketch follows.
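
This is a minimal sketch, assuming the command variable is kept reachable after NewWorker returns (for example on the worker struct, which the question does not currently do) and that the goroutine is started right after command.Start(). Note that Wait also closes the stdin/stdout pipes once the child exits, which is acceptable here because a dead worker cannot serve requests anyway:

go func() {
    // Wait blocks until the python process exits and then populates ProcessState.
    err := command.Wait()
    if err != nil {
        logger.Error("worker_pool.NewWorker", "python worker died", err.Error())
        return
    }
    logger.Error("worker_pool.NewWorker", "python worker exited", command.ProcessState.String())
}()

Once that exit is logged you can decide whether to restart the worker, and together with the stderr drain above it is usually enough to see why the script stopped.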
