Client-Go: How to watch for newly created Pods in Kubernetes

Question

I need to write a Go application using client-go that watches a particular namespace for any of these events:

  • A new pod has been created
  • A pod has been deleted
  • A new container has been added to an existing pod
  • The container image of any pod has changed

I also want to pass this information to another application running in a different namespace.

I am really new to the client-go library. I searched its documentation but couldn't find anything similar to Events in Kopf, and I couldn't find a method or function that does this. I don't need the full code, but I would appreciate pointers on where to look so I can work it out myself.

Can someone help me with this?

Answer 1

Score: 2

You can create a clientset by parsing the kubeconfig file, then use that clientset to create a SharedInformerFactory scoped to your namespace. Get an informer for Pods from the factory and add event handler functions, implemented according to your requirements. To detect container changes, compare oldPod and newPod in the OnUpdate function. Use the clientset however you need to communicate with other applications. I suggest exploring the methods the clientset implements to get a detailed idea of how it works.

package main

import (
	"flag"
	"fmt"
	"path/filepath"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"k8s.io/klog/v2"
)

func main() {
	// Locate the kubeconfig file (defaults to ~/.kube/config).
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	// Build the client configuration from the kubeconfig.
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err)
	}

	// Create the clientset.
	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Stop channel for the informer; it is closed when main returns.
	stopper := make(chan struct{})
	defer close(stopper)
	defer runtime.HandleCrash()

	// Shared informer factory restricted to the "demo" namespace.
	// With a 10-second resync period, onUpdate is also called for every
	// cached pod every 10 seconds, even when nothing has changed.
	factory := informers.NewSharedInformerFactoryWithOptions(clientSet, 10*time.Second, informers.WithNamespace("demo"))
	podInformer := factory.Core().V1().Pods().Informer()

	// Register the event handlers before starting the informer.
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onAdd,
		UpdateFunc: onUpdate,
		DeleteFunc: onDelete,
	})

	// Start the informer; Start runs it in its own goroutine and does not block.
	factory.Start(stopper)

	// Wait for the initial list of pods to populate the cache.
	if !cache.WaitForCacheSync(stopper, podInformer.HasSynced) {
		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}

	// Block the main goroutine so the program keeps running.
	<-stopper
}

func onAdd(obj interface{}) {
	pod := obj.(*corev1.Pod)
	klog.Infof("POD CREATED: %s/%s", pod.Namespace, pod.Name)
}

func onUpdate(oldObj interface{}, newObj interface{}) {
	oldPod := oldObj.(*corev1.Pod)
	newPod := newObj.(*corev1.Pod)
	klog.Infof(
		"POD UPDATED. %s/%s %s",
		oldPod.Namespace, oldPod.Name, newPod.Status.Phase,
	)
}

func onDelete(obj interface{}) {
	// If the watch missed the delete event, the object arrives wrapped
	// in a DeletedFinalStateUnknown tombstone instead of a *corev1.Pod.
	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = tombstone.Obj
	}
	pod, ok := obj.(*corev1.Pod)
	if !ok {
		return
	}
	klog.Infof("POD DELETED: %s/%s", pod.Namespace, pod.Name)
}
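
The onUpdate handler above only logs the pod phase. As a rough sketch of the container comparison the answer mentions, the snippet below diffs two container lists by name and image. The `container` type and the `diffContainers` function are hypothetical names introduced here for illustration; `container` is a simplified stand-in for `corev1.Container` so the example runs without a cluster. In a real handler the inputs would presumably be built from `oldPod.Spec.Containers` and `newPod.Spec.Containers`.

```go
package main

import (
	"fmt"
	"sort"
)

// container is a simplified, hypothetical stand-in for corev1.Container;
// only the two fields needed for the comparison are kept.
type container struct {
	Name  string
	Image string
}

// diffContainers compares the containers of the old and new version of a pod
// and returns one message per added container or changed image.
// The result is sorted so the output order is stable.
func diffContainers(oldCs, newCs []container) []string {
	// Index the old containers by name for quick lookup.
	oldImages := make(map[string]string, len(oldCs))
	for _, c := range oldCs {
		oldImages[c.Name] = c.Image
	}

	var changes []string
	for _, c := range newCs {
		prev, existed := oldImages[c.Name]
		switch {
		case !existed:
			changes = append(changes, fmt.Sprintf("container added: %s (%s)", c.Name, c.Image))
		case prev != c.Image:
			changes = append(changes, fmt.Sprintf("image changed: %s %s -> %s", c.Name, prev, c.Image))
		}
	}
	sort.Strings(changes)
	return changes
}

func main() {
	// Made-up example data: one image bump and one new container.
	oldCs := []container{{Name: "web", Image: "nginx:1.24"}}
	newCs := []container{
		{Name: "web", Image: "nginx:1.25"},
		{Name: "sidecar", Image: "envoy:v1.28"},
	}
	for _, change := range diffContainers(oldCs, newCs) {
		fmt.Println(change)
	}
}
```

Because the informer's resync period re-delivers unchanged pods to onUpdate, an empty result from a comparison like this is a reasonable signal to skip the notification.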

Answer 2

Score: 0

You could use something like kubernetes-event-exporter or kube-eventer and send messages with different sinks.

huangapple
  • Posted on November 18, 2022, 00:40:23
  • When reposting, please keep the link to this article: https://go.coder-hub.com/74479105.html