Broken pipe error when writing to stdin of a Python application from a Go application
Question
I have implemented a worker pool to submit my jobs to a Python script.
func NewWorker(index int, workerConfig config.AppConfig, logger log.PrefixedLogger) error {
	var worker entities.Worker
	worker.ID = index
	worker.Config = workerConfig
	strCommand := workerConfig.RideModelScriptPath
	command := exec.Command(strCommand)
	stdIn, err := command.StdinPipe()
	if err != nil {
		logger.Error("worker_pool.InitWorkerPool", "Error resolving stdin pipe from command", err.Error())
		return err
	}
	worker.Stdin = stdIn
	stdout, err := command.StdoutPipe()
	if err != nil {
		logger.Error("worker_pool.InitWorkerPool", "Error resolving stdout pipe from command", err.Error())
		return err
	}
	worker.StdOutReader = bufio.NewReaderSize(stdout, workerConfig.MaxRequestSize)
	stderr, err := command.StderrPipe()
	if err != nil {
		logger.Error("worker_pool.InitWorkerPool", "Error resolving stderror pipe from command", err.Error())
		return err
	}
	worker.StdError = stderr
	err = command.Start()
	if err != nil {
		logger.Error("worker_pool.InitWorkerPool", "Error starting command", err.Error())
		return err
	}
	go processWorkerPool(&worker, ReqChan, logger)
	return err
}
When the shared channel receives a job, a worker consumes it and sends it to the Python script.
func processWorkerPool(worker *entities.Worker, ReqChannel chan entities.ReqMessage, logger log.PrefixedLogger) {
	for request := range ReqChannel {
		bufferLatency.Observe(float64(time.Since(request.SentTime).Nanoseconds()/1e6), map[string]string{"name": "buffer", "error": "false"})
		logger.Info("worker.processWorkerPool", request.Request)
		startTime := time.Now()
		//Send Request to Worker
		_, err := io.WriteString(worker.Stdin, request.Request)
		if err != nil {
			scriptLatency.Observe(float64(time.Since(startTime).Nanoseconds()/1e6), map[string]string{"name": "script", "error": "true"})
			log.ErrorContext(context.Background(), log.WithPrefix("worker.processWorkerPool", err))
			return
		}
		//Get response from Worker
		result := CopyOutput(logger, worker.StdOutReader)
		scriptLatency.Observe(float64(time.Since(startTime).Nanoseconds()/1e6), map[string]string{"name": "script", "error": "false"})
		request.ResponseChannel <- result
	}
}
To read the results from the stdout of the Python script, I use the following helper function:
func CopyOutput(logger log.PrefixedLogger, r io.Reader) string {
	scanner := bufio.NewScanner(r)
	result := ""
	for scanner.Scan() {
		output := scanner.Text()
		switch {
		case strings.Contains(output, "ERROR"):
			errorMsg := strings.SplitAfter(output, "ERROR: ")[1]
			err := errors.New(errorMsg)
			logger.Error("worker.CopyOutput", "ERROR LOG: ", err.Error())
			return err.Error()
		case strings.Contains(output, "OUTPUT"):
			result = strings.SplitAfter(output, "OUTPUT: ")[1]
			logger.Debug("worker.copyOutput", "OUTPUT LOG: ", result)
			return result
		default:
			logger.Debug("worker.copyOutput", "DEBUG LOG: ", output)
		}
	}
	return result
}
On the Python end, my script looks like this:
#!/usr/bin/python3
import sys
import json
from threading import Thread
from vrpsolver.ride_model import rides_model
from preprocessor.config_loader import Config
# Load Configs
configs = Config('/opt/pool-processor/configs/configs.yaml')
while True:
    # input = json.loads(sys.argv[1])
    # model = sys.argv[2]
    # file = sys.argv[3]
    threads = []
    try:
        inputDataStream = sys.stdin.readline()
        inputDataStream = inputDataStream.strip()
        data = inputDataStream.split(' ')
        model = data[1]
    except (Exception) as ex:
        sys.stdout.write('ERROR: Error Occured while reading stdin: {}\n'.format(str(ex)))
        sys.stdout.flush()
        continue
    try:
        input = json.loads(data[0])
    except (Exception, IOError) as ex:
        sys.stdout.write('ERROR: Error Occured while parsing data to json: {}\n'.format(str(ex)))
        continue
    try:
        result = rides_model(input, configs)
        sys.stdout.write('OUTPUT: {}\n'.format(json.dumps(result)))
        sys.stdout.flush()
    except (Exception, IOError) as ex:
        sys.stdout.write('ERROR: Error Occured while processing: {}\n'.format(str(ex)))
        sys.stdout.flush()
        continue
When I run the program, after some time I get
write |1: broken pipe on /build/pool-engine/worker_pool/worker.go:76
write |1: broken pipe on /build/pool-engine/worker_pool/worker.go:83
from the following lines:
_, err := io.WriteString(worker.Stdin, request.Request)
result := CopyOutput(logger, worker.StdOutReader)
I have been stuck on this for a while now, and any input is appreciated. My guess is that after some time the Python script crashes, and as a result I get this error. I am not sure why that crash is not caught by the exception handlers.
Answer 1
Score: 1
The basic answer to this error is: for some reason, your Python process has closed its stdin (it has probably exited), so check why it exits too early.
Some reasons why it is hard for you to see what your Python process is doing:
- its main "activity log" is on sys.stdout, and stdout is caught and processed by your Go program (a side effect is that it does not get printed to the console),
- there are some issues in how you handle the subprocess output in Go.
To make debugging easier, I would advise you to have your Python script also write its output to a log file.
The first three issues I see are:
- you set a StderrPipe on your Python process, but it is never used, so STDERR is completely silenced
Try passing stderr straight through (with os/exec that means setting command.Stderr = os.Stderr instead of calling StderrPipe(); you should then see stderr printed on the console), or at least add an extra goroutine to drain the pipe and print its content somewhere (on your Go process's stderr, in a log file, ...)
e.g.:
go func() {
	io.Copy(os.Stderr, worker.StdError)
}()
- to read the child process output as lines of text, you repeatedly create a new bufio.Scanner over the Stdout pipe
When you run bufio.NewScanner(...), a new buffered reader with a new buffer is created. If you discard it and create a new Scanner, the previous buffer gets discarded, and you don't know how many bytes have been read from the underlying io.Reader (some may have been buffered ...).
At the very least, you should instantiate your bufio.NewScanner() only once (in processWorkerPool()), and repeatedly call scanner.Scan() on that single *bufio.Scanner instance, so that the same buffer gets used.
- you should somehow monitor the state of the running process
Keep a way to access command.ProcessState, and check whether your external command has completed.