How to collect k8s pod logs in parallel using Golang, only for a limited duration

Question

I am new to Golang. I have a task to collect the logs of an application that runs as a Deployment in a k8s cluster, with 4 pods in total.

As part of test automation, I need to collect the application logs in parallel (only for the duration of my test operation) while I perform some operations on the application, and write the logs to a file. Then I move on to the next operation and do the same.

Finally, I iterate through the log files one by one, filter for certain keywords corresponding to my operation, and validate whether the logs are correct.

I am thinking of getting the pod logs with the kubectl command directly instead of using the go-sdk, because with the SDK I am facing missing log entries that I couldn't triage after many attempts.

kubectl logs -f -l app=my-app -n dev > /usr/mylogs.txt

I found a way to run this command using exec.Command:

command := exec.Command("/bin/sh", "-c", "kubectl logs -f -l app=my-app -n dev > /usr/mylogs.txt")
err := command.Start()

Now I need to do this in Golang:

func myTest(t *testing.T) {
	go collectApplicationLogs("test1")

	// do application operation
	// run test

	stopLogsCollection() // <-- how to achieve this?
}

func collectApplicationLogs(fileName string) {
	// command to collect the logs to a file
	// kubectl logs -f -l app=my-app -n dev > /usr/{fileName}
}

Answer 1

Score: 4

You can use the Kubernetes Go client (client-go) to get logs from the pods. First, create the clientset from the Kubernetes config. You can use either an in-cluster config or an out-of-cluster config; I have used the out-of-cluster config here.

package main

import (
	"bufio"
	"context"
	"flag"
	"io"
	"log"
	"os"
	"path/filepath"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

const (
	// namespace and label selector of the application pods
	namespace = "dev"
	label     = "app=my-app"
)

func main() {
	// locate the kubeconfig file (defaults to ~/.kube/config)
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	// build the client config from the current context in kubeconfig
	ctx := context.TODO()
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		log.Println(err, "Failed to build config from flags")
		return
	}

	err = collectApplicationLogs(ctx, config, "/usr/mylogs.txt")
	if err != nil {
		log.Println(err, "Failed to collect logs")
	}
}

To collect the logs, you need to list the pods first. Then get the logs from each of those pods and append them to the file concurrently.

func collectApplicationLogs(ctx context.Context, config *rest.Config, filename string) error {
	// create the clientset
	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Println(err, "Failed to create clientset from the given config")
		return err
	}
	// list the pods that match the label selector
	pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
		LabelSelector: label,
	})
	if err != nil {
		log.Println(err, "Failed to get pods")
		return err
	}
	// create the file if it doesn't exist, otherwise append to it
	file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
	if err != nil {
		return err
	}
	defer file.Close()
	// open a follow-mode log stream for each pod and copy each stream to
	// the file in its own goroutine; the channel is buffered so a writer
	// can always report completion without blocking
	podItems := pods.Items
	ch := make(chan bool, len(podItems))
	for i := 0; i < len(podItems); i++ {
		podLogs, err := clientSet.CoreV1().Pods(namespace).GetLogs(podItems[i].Name, &v1.PodLogOptions{
			Follow: true,
		}).Stream(ctx)
		if err != nil {
			return err
		}
		go writeLogs(podLogs, file, ch)
	}
	// block until every writer has finished, i.e. each stream has ended
	// or ctx has been cancelled
	for i := 0; i < len(podItems); i++ {
		<-ch
	}
	return nil
}

func writeLogs(podLogs io.ReadCloser, file *os.File, ch chan bool) {
	defer func() {
		podLogs.Close()
		ch <- true
	}()
	buffer := bufio.NewReader(podLogs)
	for {
		str, readErr := buffer.ReadString('\n')
		// write whatever was read, including a final line without '\n'
		if len(str) > 0 {
			if _, err := file.Write([]byte(str)); err != nil {
				return
			}
		}
		// stop on io.EOF and on any other read error (for example a cancelled ctx)
		if readErr != nil {
			return
		}
	}
}

huangapple
  • Published on November 18, 2022 at 13:43:26
  • When reposting, please keep the link to this article: https://go.coder-hub.com/74485370.html