使用Golang客户端从远程Kubernetes命令中获取流输出。

huangapple go评论80阅读模式
英文:

Stream output from remote kubernetes command with golang client

问题

我正在使用golang的Kubernetes客户端来创建Kubernetes Pod并在其中执行远程命令。然而,我发现在远程执行完成之前无法获取有关远程执行状态的反馈,因为我无法弄清楚如何流式传输远程命令的日志。以下是我当前执行远程命令的实现代码:

func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
    req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", CONTAINER_NAME)
    scheme := runtime.NewScheme()
    if err := v1.AddToScheme(scheme); err != nil {
        return "", 0, fmt.Errorf("could not add to scheme: %w", err)
    }
    parameterCodec := runtime.NewParameterCodec(scheme)
    req.VersionedParams(&v1.PodExecOptions{
        Stdin:     false,
        Stdout:    true,
        Stderr:    true,
        TTY:       false,
        Container: args.ContainerId,
        Command:   []string{"sh", "-c", args.Command},
    }, parameterCodec)
    exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
    if err != nil {
        return "", 0, fmt.Errorf("could not exec command: %w", err)
    }
    var stdout, stderr bytes.Buffer
    var streamErr error
    streamErr = exec.Stream(remotecommand.StreamOptions{
        Stdin:  nil,
        Stdout: &stdout,
        Stderr: &stderr,
        Tty:    false,
    })

    if streamErr != nil {
        if strings.Contains(streamErr.Error(), "command terminated with exit code") {
            return stderr.String(), 1, nil
        } else {
            return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
        }
    }
    return stdout.String(), 0, nil
}

在这个实现中,我无法在远程命令执行完成之前了解其状态,此时我会一次性获取所有输出日志。

有没有办法在调用exec.Stream时读取stdout/stderr的输出?在理想的情况下,我希望能够逐行打印远程命令的输出。我注意到bytes.Buffer有一个ReadString方法,可以接受一个分隔符。这看起来是一个有用的方法,但我还没有弄清楚如何使用它。

英文:

I'm using the golang kubernetes client to create kubernetes pods and execute remote commands in them. However, I'm finding that I can't get feedback on the status of the remote execution until it's finished because I can't figure out how to stream the logs of the remote command. Here's my current implementation for executing a remote command:

func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
	req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", CONTAINER_NAME)
	scheme := runtime.NewScheme()
	if err := v1.AddToScheme(scheme); err != nil {
		return "", 0, fmt.Errorf("could not add to scheme: %w", err)
	}
	parameterCodec := runtime.NewParameterCodec(scheme)
	req.VersionedParams(&v1.PodExecOptions{
		Stdin:     false,
		Stdout:    true,
		Stderr:    true,
		TTY:       false,
		Container: args.ContainerId,
		Command:   []string{"sh", "-c", args.Command},
	}, parameterCodec)
	exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
	if err != nil {
		return "", 0, fmt.Errorf("could not exec command: %w", err)
	}
	var stdout, stderr bytes.Buffer
	var streamErr error
	streamErr = exec.Stream(remotecommand.StreamOptions{
		Stdin:  nil,
		Stdout: &stdout,
		Stderr: &stderr,
		Tty:    false,
	})

	if streamErr != nil {
		if strings.Contains(streamErr.Error(), "command terminated with exit code") {
			return stderr.String(), 1, nil
		} else {
			return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
		}
	}
	return stdout.String(), 0, nil
}

In this implementation I don't get to know the state of the remote command until it's finished executing at which point I get all of the output logs at once.

Is there a way to read stdout/stderr while they're being written by the call to exec.Stream? In an ideal world I'd like to be able to print the output of the remote command line by line. I noticed that bytes.Buffer has a ReadString method which accepts a delimiter. That looks like a useful method but I haven't been able to figure out how to use it.

答案1

得分: 3

这只是一个部分回答,但如果我使用以下的PodExecOptionsStreamOptions,我会实时看到每一行日志被打印出来(注意Ttytrue,我使用的是标准输入和标准输出,而不是自定义缓冲区):

v1.PodExecOptions{
    Stdin:     true,
    Stdout:    true,
    Stderr:    false,
    TTY:       true,
    Container: args.ContainerId,
    Command:   []string{"sh", "-c", args.Command},
}

remotecommand.StreamOptions{
    Stdin:  os.Stdin,
    Stdout: os.Stdout,
    Stderr: nil,
    Tty:    true,
}

然而,如果我尝试使用除了os.Stdinos.Stdout之外的其他内容,我就无法得到任何日志行。例如,以下用法不会打印任何内容:

var stdout, stdin bytes.Buffer
var streamErr error

go func() {
    streamErr = exec.Stream(remotecommand.StreamOptions{
        Stdin:  &stdin,
        Stdout: &stdout,
        Stderr: nil,
        Tty:    true,
    })
}()
time.Sleep(5*time.Second)
log.Info("doing raw string calls on both buffers")
log.Info(stdin.String())
log.Info(stdout.String())
log.Info("starting scan of stdin")
scanner := bufio.NewScanner(&stdin)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
    m := scanner.Text()
    fmt.Println(m)
}
log.Info("starting scan of stdout")
scanner = bufio.NewScanner(&stdout)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
    m := scanner.Text()
    fmt.Println(m)
}
log.Info("finished scanning of stdout")

我仍在努力弄清楚如何使用自定义缓冲区,以便我可以管理写入日志的内容,而不是直接将其传输到标准输出(我想要附加一些自定义字段到每一行被记录的日志中)。

编辑:好的,我找到了一个可行的解决方案。以下是完整的代码:

type LogStreamer struct{
    b bytes.Buffer
}

func (l *LogStreamer) String() string {
    return l.b.String()
}

func (l *LogStreamer) Write(p []byte) (n int, err error) {
    a := strings.TrimSpace(string(p))
    l.b.WriteString(a)
    log.Info(a)
    return len(p), nil
}

func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
    req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", "worker")
    scheme := runtime.NewScheme()
    if err := v1.AddToScheme(scheme); err != nil {
        return "", 0, fmt.Errorf("could not add to scheme: %w", err)
    }
    parameterCodec := runtime.NewParameterCodec(scheme)
    req.VersionedParams(&v1.PodExecOptions{
        Stdin:     true,
        Stdout:    true,
        Stderr:    false,
        TTY:       true,
        Container: args.ContainerId,
        Command:   []string{"sh", "-c", args.Command},
    }, parameterCodec)
    exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
    if err != nil {
        return "", 0, fmt.Errorf("could not exec command: %w", err)
    }
    var streamErr error
    l := &LogStreamer{}

    streamErr = exec.Stream(remotecommand.StreamOptions{
        Stdin:  os.Stdin,
        Stdout: l,
        Stderr: nil,
        Tty:    true,
    })

    if streamErr != nil {
        if strings.Contains(streamErr.Error(), "command terminated with exit code") {
            return l.String(), 1, nil
        } else {
            return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
        }
    }
    return l.String(), 0, nil
}

我创建了一个实现了io.Writer接口的结构体,并在StreamOptions结构体中使用它。还要注意,我必须StreamOptions结构体中使用os.Stdin,否则只会将单行流式传输到Stdout

还要注意,我必须修剪传递给LogStreamer.Write的缓冲区,因为似乎回车符或换行符会导致logrus包出现问题。这个解决方案还需要进一步完善,但是它确实朝着正确的方向发展。

英文:

This is only a partial answer but if I set use the following PodExecOptions and StreamOptions then I see each log line get printed in real time (note that Tty is true and I'm using stdin and stdout, not custom buffers):

v1.PodExecOptions{
		Stdin:     true,
		Stdout:    true,
		Stderr:    false,
		TTY:       true,
		Container: args.ContainerId,
		Command:   []string{"sh", "-c", args.Command},
	}

and

remotecommand.StreamOptions{
			Stdin:  os.Stdin,
			Stdout: os.Stdout,
			Stderr: nil,
			Tty:    true,
		}

However, if I try to use something other than os.Stdin and os.Stdout then I never get any log lines. For example, the following usage doesn't print anything:

	var stdout, stdin bytes.Buffer
	var streamErr error

	go func() {
		streamErr = exec.Stream(remotecommand.StreamOptions{
			Stdin:  &stdin,
			Stdout: &stdout,
			Stderr: nil,
			Tty:    true,
		})
	}()
	time.Sleep(5*time.Second)
	log.Info("doing raw string calls on both buffers")
	log.Info(stdin.String())
	log.Info(stdout.String())
	log.Info("starting scan of stdin")
	scanner := bufio.NewScanner(&stdin)
	scanner.Split(bufio.ScanLines)
	for scanner.Scan() {
		m := scanner.Text()
		fmt.Println(m)
	}
	log.Info("starting scan of stdout")
	scanner = bufio.NewScanner(&stdout)
	scanner.Split(bufio.ScanLines)
	for scanner.Scan() {
		m := scanner.Text()
		fmt.Println(m)
	}
	log.Info("finished scanning of stdout")

I'm still trying to figure out how to use custom buffers so I can manage what's written to my logs instead of piping directly to stdout (I want to attach some custom fields to each line that gets logged).


EDIT: alright, I figured out a solution that works. Here's the full code

type LogStreamer struct{
b bytes.Buffer
}
func (l *LogStreamer) String() string {
return l.b.String()
}
func (l *LogStreamer) Write(p []byte) (n int, err error) {
a := strings.TrimSpace(string(p))
l.b.WriteString(a)
log.Info(a)
return len(p), nil
}
func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", "worker")
scheme := runtime.NewScheme()
if err := v1.AddToScheme(scheme); err != nil {
return "", 0, fmt.Errorf("could not add to scheme: %w", err)
}
parameterCodec := runtime.NewParameterCodec(scheme)
req.VersionedParams(&v1.PodExecOptions{
Stdin:     true,
Stdout:    true,
Stderr:    false,
TTY:       true,
Container: args.ContainerId,
Command:   []string{"sh", "-c", args.Command},
}, parameterCodec)
exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
if err != nil {
return "", 0, fmt.Errorf("could not exec command: %w", err)
}
var streamErr error
l := &LogStreamer{}
streamErr = exec.Stream(remotecommand.StreamOptions{
Stdin:  os.Stdin,
Stdout: l,
Stderr: nil,
Tty:    true,
})
if streamErr != nil {
if strings.Contains(streamErr.Error(), "command terminated with exit code") {
return l.String(), 1, nil
} else {
return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
}
}
return l.String(), 0, nil
}

I created a struct which implements the io.Writer interface and use that in the StreamOptions struct. Also note that I had to use os.Stdin in the StreamOptions struct or else only a single line would be streamed back for Stdout.

Also note that I had to trim the buffer passed to LogStreamer.Write because it seems that carriage returns or newlines cause problems with the logrus package. There's still more polish to add to this solution but it's definitely headed in the right direction.

huangapple
  • 本文由 发表于 2021年6月22日 04:01:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/68074036.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定