英文:
Stream output from remote kubernetes command with golang client
问题
我正在使用golang的Kubernetes客户端来创建Kubernetes Pod并在其中执行远程命令。然而,我发现在远程执行完成之前无法获取有关远程执行状态的反馈,因为我无法弄清楚如何流式传输远程命令的日志。以下是我当前执行远程命令的实现代码:
// RunCommand executes args.Command through "sh -c" in the pod named by
// args.ContainerId and returns only after the exec stream has ended.
//
// Results:
//   - stream succeeded: captured stdout, exit code 0, nil error;
//   - stream error mentions "command terminated with exit code": captured
//     stderr, exit code 1, nil error;
//   - anything else: empty output, exit code 0, and a wrapped error.
//
// Output is collected in in-memory buffers, so nothing is visible to the
// caller until the remote command has finished.
//
// NOTE(review): ctx is accepted but not used by this implementation.
// NOTE(review): args.ContainerId serves as both the pod name and the
// PodExecOptions.Container value, while the request also carries an explicit
// "container" parameter (CONTAINER_NAME) — confirm which one is intended.
func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
	req := k.clientset.CoreV1().RESTClient().
		Post().
		Resource("pods").
		Name(args.ContainerId).
		Namespace(k.namespace).
		SubResource("exec").
		Param("container", CONTAINER_NAME)

	// The parameter codec needs a scheme that knows the core/v1 types in order
	// to encode PodExecOptions as query parameters.
	scheme := runtime.NewScheme()
	if err := v1.AddToScheme(scheme); err != nil {
		return "", 0, fmt.Errorf("could not add to scheme: %w", err)
	}

	execOpts := &v1.PodExecOptions{
		Stdin:     false,
		Stdout:    true,
		Stderr:    true,
		TTY:       false,
		Container: args.ContainerId,
		Command:   []string{"sh", "-c", args.Command},
	}
	req.VersionedParams(execOpts, runtime.NewParameterCodec(scheme))

	executor, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
	if err != nil {
		return "", 0, fmt.Errorf("could not exec command: %w", err)
	}

	// No stdin and no TTY: both fields are left at their zero values.
	var outBuf, errBuf bytes.Buffer
	streamErr := executor.Stream(remotecommand.StreamOptions{
		Stdout: &outBuf,
		Stderr: &errBuf,
	})

	switch {
	case streamErr == nil:
		return outBuf.String(), 0, nil
	case strings.Contains(streamErr.Error(), "command terminated with exit code"):
		// The actual exit status is not parsed; every failure is reported as 1.
		return errBuf.String(), 1, nil
	default:
		return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
	}
}
在这个实现中,我无法在远程命令执行完成之前了解其状态,此时我会一次性获取所有输出日志。
有没有办法在调用exec.Stream
时读取stdout
/stderr
的输出?在理想的情况下,我希望能够逐行打印远程命令的输出。我注意到bytes.Buffer
有一个ReadString
方法,可以接受一个分隔符。这看起来是一个有用的方法,但我还没有弄清楚如何使用它。
英文:
I'm using the golang kubernetes client to create kubernetes pods and execute remote commands in them. However, I'm finding that I can't get feedback on the status of the remote execution until it's finished because I can't figure out how to stream the logs of the remote command. Here's my current implementation for executing a remote command:
// RunCommand executes args.Command through "sh -c" in the pod named by
// args.ContainerId and returns only after the exec stream has ended.
//
// Results:
//   - stream succeeded: captured stdout, exit code 0, nil error;
//   - stream error mentions "command terminated with exit code": captured
//     stderr, exit code 1, nil error;
//   - anything else: empty output, exit code 0, and a wrapped error.
//
// Output is collected in in-memory buffers, so nothing is visible to the
// caller until the remote command has finished.
//
// NOTE(review): ctx is accepted but not used by this implementation.
// NOTE(review): args.ContainerId serves as both the pod name and the
// PodExecOptions.Container value, while the request also carries an explicit
// "container" parameter (CONTAINER_NAME) — confirm which one is intended.
func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
	req := k.clientset.CoreV1().RESTClient().
		Post().
		Resource("pods").
		Name(args.ContainerId).
		Namespace(k.namespace).
		SubResource("exec").
		Param("container", CONTAINER_NAME)

	// The parameter codec needs a scheme that knows the core/v1 types in order
	// to encode PodExecOptions as query parameters.
	scheme := runtime.NewScheme()
	if err := v1.AddToScheme(scheme); err != nil {
		return "", 0, fmt.Errorf("could not add to scheme: %w", err)
	}

	execOpts := &v1.PodExecOptions{
		Stdin:     false,
		Stdout:    true,
		Stderr:    true,
		TTY:       false,
		Container: args.ContainerId,
		Command:   []string{"sh", "-c", args.Command},
	}
	req.VersionedParams(execOpts, runtime.NewParameterCodec(scheme))

	executor, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
	if err != nil {
		return "", 0, fmt.Errorf("could not exec command: %w", err)
	}

	// No stdin and no TTY: both fields are left at their zero values.
	var outBuf, errBuf bytes.Buffer
	streamErr := executor.Stream(remotecommand.StreamOptions{
		Stdout: &outBuf,
		Stderr: &errBuf,
	})

	switch {
	case streamErr == nil:
		return outBuf.String(), 0, nil
	case strings.Contains(streamErr.Error(), "command terminated with exit code"):
		// The actual exit status is not parsed; every failure is reported as 1.
		return errBuf.String(), 1, nil
	default:
		return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
	}
}
In this implementation I don't learn the state of the remote command until it has finished executing, at which point I get all of the output logs at once.
Is there a way to read stdout
/stderr
while they're being written by the call to exec.Stream
? In an ideal world I'd like to be able to print the output of the remote command line by line. I noticed that bytes.Buffer
has a ReadString
method which accepts a delimiter. That looks like a useful method but I haven't been able to figure out how to use it.
答案1
得分: 3
这只是一个部分回答,但如果我使用以下的PodExecOptions
和StreamOptions
,我会实时看到每一行日志被打印出来(注意Tty
是true
,我使用的是标准输入和标准输出,而不是自定义缓冲区):
v1.PodExecOptions{
Stdin: true,
Stdout: true,
Stderr: false,
TTY: true,
Container: args.ContainerId,
Command: []string{"sh", "-c", args.Command},
}
和
remotecommand.StreamOptions{
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: nil,
Tty: true,
}
然而,如果我尝试使用除了os.Stdin
和os.Stdout
之外的其他内容,我就无法得到任何日志行。例如,以下用法不会打印任何内容:
// Attempt to capture the exec streams in in-memory buffers instead of
// os.Stdin/os.Stdout. As written, this prints nothing.
var stdout, stdin bytes.Buffer
var streamErr error
// Stream runs in its own goroutine; streamErr is assigned there, and nothing
// below waits for the goroutine or reads streamErr.
go func() {
streamErr = exec.Stream(remotecommand.StreamOptions{
// stdin is an empty buffer, so a read from it reports io.EOF immediately.
// NOTE(review): presumably that ends the remote session early — confirm.
Stdin: &stdin,
Stdout: &stdout,
Stderr: nil,
Tty: true,
})
}()
// A fixed wait stands in for synchronisation: the buffers are read below while
// the goroutine may still be writing to stdout, and bytes.Buffer is not safe
// for concurrent use.
time.Sleep(5*time.Second)
log.Info("doing raw string calls on both buffers")
log.Info(stdin.String())
log.Info(stdout.String())
log.Info("starting scan of stdin")
// A Scanner stops at the first io.EOF, which bytes.Buffer reports as soon as
// it is empty, so these loops cannot wait for data that has not arrived yet.
// scanner.Err() is not checked after either loop.
scanner := bufio.NewScanner(&stdin)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
m := scanner.Text()
fmt.Println(m)
}
log.Info("starting scan of stdout")
scanner = bufio.NewScanner(&stdout)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
m := scanner.Text()
fmt.Println(m)
}
log.Info("finished scanning of stdout")
我仍在努力弄清楚如何使用自定义缓冲区,以便我可以管理写入日志的内容,而不是直接将其传输到标准输出(我想要附加一些自定义字段到每一行被记录的日志中)。
编辑:好的,我找到了一个可行的解决方案。以下是完整的代码:
// LogStreamer is an io.Writer that logs output as it arrives and keeps a copy
// of everything written so the complete output can be returned afterwards.
type LogStreamer struct {
	b bytes.Buffer // raw bytes received so far, in arrival order
}

// String returns everything written so far with leading and trailing
// whitespace removed. Line breaks inside the output are preserved, so lines
// that arrived in separate writes are not glued together.
func (l *LogStreamer) String() string {
	return strings.TrimSpace(l.b.String())
}

// Write records p unmodified and emits one log entry per non-empty line in p.
//
// Each line is trimmed before it is logged, so carriage returns and newlines
// never reach the logger. p is split at '\n' within this call only: a line
// that the sender splits across two writes is logged as two entries.
//
// Write always reports len(p) bytes consumed and a nil error.
func (l *LogStreamer) Write(p []byte) (n int, err error) {
	l.b.Write(p) // bytes.Buffer.Write always returns a nil error
	for _, line := range strings.Split(string(p), "\n") {
		if line = strings.TrimSpace(line); line != "" {
			log.Info(line)
		}
	}
	return len(p), nil
}
// RunCommand executes args.Command through "sh -c" in the pod named by
// args.ContainerId, with a TTY attached, and forwards the remote stdout to a
// LogStreamer while the command runs.
//
// Results:
//   - stream succeeded: the LogStreamer's collected output, exit code 0;
//   - stream error mentions "command terminated with exit code": the collected
//     output, exit code 1, nil error;
//   - anything else: empty output, exit code 0, and a wrapped error.
//
// NOTE(review): ctx is accepted but not used by this implementation.
// NOTE(review): args.ContainerId serves as both the pod name and the
// PodExecOptions.Container value, while the request also carries an explicit
// "container" parameter ("worker") — confirm which one is intended.
func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
	req := k.clientset.CoreV1().RESTClient().
		Post().
		Resource("pods").
		Name(args.ContainerId).
		Namespace(k.namespace).
		SubResource("exec").
		Param("container", "worker")

	// The parameter codec needs a scheme that knows the core/v1 types in order
	// to encode PodExecOptions as query parameters.
	scheme := runtime.NewScheme()
	if err := v1.AddToScheme(scheme); err != nil {
		return "", 0, fmt.Errorf("could not add to scheme: %w", err)
	}

	execOpts := &v1.PodExecOptions{
		Stdin:     true,
		Stdout:    true,
		Stderr:    false,
		TTY:       true,
		Container: args.ContainerId,
		Command:   []string{"sh", "-c", args.Command},
	}
	req.VersionedParams(execOpts, runtime.NewParameterCodec(scheme))

	executor, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
	if err != nil {
		return "", 0, fmt.Errorf("could not exec command: %w", err)
	}

	// Stderr is not requested, so that field stays at its zero value.
	sink := &LogStreamer{}
	streamErr := executor.Stream(remotecommand.StreamOptions{
		Stdin:  os.Stdin,
		Stdout: sink,
		Tty:    true,
	})

	switch {
	case streamErr == nil:
		return sink.String(), 0, nil
	case strings.Contains(streamErr.Error(), "command terminated with exit code"):
		// The actual exit status is not parsed; every failure is reported as 1.
		return sink.String(), 1, nil
	default:
		return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
	}
}
我创建了一个实现了io.Writer
接口的结构体,并在StreamOptions
结构体中使用它。还要注意,我必须在StreamOptions
结构体中使用os.Stdin
,否则只会将单行流式传输到Stdout
。
还要注意,我必须修剪传递给LogStreamer.Write
的缓冲区,因为似乎回车符或换行符会导致logrus包出现问题。这个解决方案还需要进一步完善,但是它确实朝着正确的方向发展。
英文:
This is only a partial answer, but if I use the following PodExecOptions
and StreamOptions
then I see each log line get printed in real time (note that Tty
is true
and I'm using stdin and stdout, not custom buffers):
v1.PodExecOptions{
Stdin: true,
Stdout: true,
Stderr: false,
TTY: true,
Container: args.ContainerId,
Command: []string{"sh", "-c", args.Command},
}
and
remotecommand.StreamOptions{
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: nil,
Tty: true,
}
However, if I try to use something other than os.Stdin
and os.Stdout
then I never get any log lines. For example, the following usage doesn't print anything:
// Attempt to capture the exec streams in in-memory buffers instead of
// os.Stdin/os.Stdout. As written, this prints nothing.
var stdout, stdin bytes.Buffer
var streamErr error
// Stream runs in its own goroutine; streamErr is assigned there, and nothing
// below waits for the goroutine or reads streamErr.
go func() {
streamErr = exec.Stream(remotecommand.StreamOptions{
// stdin is an empty buffer, so a read from it reports io.EOF immediately.
// NOTE(review): presumably that ends the remote session early — confirm.
Stdin: &stdin,
Stdout: &stdout,
Stderr: nil,
Tty: true,
})
}()
// A fixed wait stands in for synchronisation: the buffers are read below while
// the goroutine may still be writing to stdout, and bytes.Buffer is not safe
// for concurrent use.
time.Sleep(5*time.Second)
log.Info("doing raw string calls on both buffers")
log.Info(stdin.String())
log.Info(stdout.String())
log.Info("starting scan of stdin")
// A Scanner stops at the first io.EOF, which bytes.Buffer reports as soon as
// it is empty, so these loops cannot wait for data that has not arrived yet.
// scanner.Err() is not checked after either loop.
scanner := bufio.NewScanner(&stdin)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
m := scanner.Text()
fmt.Println(m)
}
log.Info("starting scan of stdout")
scanner = bufio.NewScanner(&stdout)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
m := scanner.Text()
fmt.Println(m)
}
log.Info("finished scanning of stdout")
I'm still trying to figure out how to use custom buffers so I can manage what's written to my logs instead of piping directly to stdout (I want to attach some custom fields to each line that gets logged).
EDIT: alright, I figured out a solution that works. Here's the full code:
// LogStreamer is an io.Writer that logs output as it arrives and keeps a copy
// of everything written so the complete output can be returned afterwards.
type LogStreamer struct {
	b bytes.Buffer // raw bytes received so far, in arrival order
}

// String returns everything written so far with leading and trailing
// whitespace removed. Line breaks inside the output are preserved, so lines
// that arrived in separate writes are not glued together.
func (l *LogStreamer) String() string {
	return strings.TrimSpace(l.b.String())
}

// Write records p unmodified and emits one log entry per non-empty line in p.
//
// Each line is trimmed before it is logged, so carriage returns and newlines
// never reach the logger. p is split at '\n' within this call only: a line
// that the sender splits across two writes is logged as two entries.
//
// Write always reports len(p) bytes consumed and a nil error.
func (l *LogStreamer) Write(p []byte) (n int, err error) {
	l.b.Write(p) // bytes.Buffer.Write always returns a nil error
	for _, line := range strings.Split(string(p), "\n") {
		if line = strings.TrimSpace(line); line != "" {
			log.Info(line)
		}
	}
	return len(p), nil
}
// RunCommand executes args.Command through "sh -c" in the pod named by
// args.ContainerId, with a TTY attached, and forwards the remote stdout to a
// LogStreamer while the command runs.
//
// Results:
//   - stream succeeded: the LogStreamer's collected output, exit code 0;
//   - stream error mentions "command terminated with exit code": the collected
//     output, exit code 1, nil error;
//   - anything else: empty output, exit code 0, and a wrapped error.
//
// NOTE(review): ctx is accepted but not used by this implementation.
// NOTE(review): args.ContainerId serves as both the pod name and the
// PodExecOptions.Container value, while the request also carries an explicit
// "container" parameter ("worker") — confirm which one is intended.
func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
	req := k.clientset.CoreV1().RESTClient().
		Post().
		Resource("pods").
		Name(args.ContainerId).
		Namespace(k.namespace).
		SubResource("exec").
		Param("container", "worker")

	// The parameter codec needs a scheme that knows the core/v1 types in order
	// to encode PodExecOptions as query parameters.
	scheme := runtime.NewScheme()
	if err := v1.AddToScheme(scheme); err != nil {
		return "", 0, fmt.Errorf("could not add to scheme: %w", err)
	}

	execOpts := &v1.PodExecOptions{
		Stdin:     true,
		Stdout:    true,
		Stderr:    false,
		TTY:       true,
		Container: args.ContainerId,
		Command:   []string{"sh", "-c", args.Command},
	}
	req.VersionedParams(execOpts, runtime.NewParameterCodec(scheme))

	executor, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
	if err != nil {
		return "", 0, fmt.Errorf("could not exec command: %w", err)
	}

	// Stderr is not requested, so that field stays at its zero value.
	sink := &LogStreamer{}
	streamErr := executor.Stream(remotecommand.StreamOptions{
		Stdin:  os.Stdin,
		Stdout: sink,
		Tty:    true,
	})

	switch {
	case streamErr == nil:
		return sink.String(), 0, nil
	case strings.Contains(streamErr.Error(), "command terminated with exit code"):
		// The actual exit status is not parsed; every failure is reported as 1.
		return sink.String(), 1, nil
	default:
		return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
	}
}
I created a struct which implements the io.Writer
interface and use that in the StreamOptions
struct. Also note that I had to use os.Stdin
in the StreamOptions
struct or else only a single line would be streamed back for Stdout
.
Also note that I had to trim the buffer passed to LogStreamer.Write
because it seems that carriage returns or newlines cause problems with the logrus package. There's still more polish to add to this solution but it's definitely headed in the right direction.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论