Kubernetes client-go使用Informers来监视部署。

huangapple go评论86阅读模式
英文:

Kubernetes client-go use Informers to watch deployments

问题

我正在尝试使用client-go informers来获取部署的副本数量。每当自动缩放更改副本数量时,我需要检索这个数量以处理其他逻辑。我之前使用Watch()函数,但是存在一些超时和连接断开的问题。

下面的代码示例展示了实现的示例:

labelOptions := informers.WithTweakListOptions(func(opts *v1.ListOptions) {
    opts.FieldSelector = "metadata.name=" + name
})
factory := informers.NewSharedInformerFactoryWithOptions(clientSet, 2*time.Second, informers.WithNamespace(namespace), labelOptions)
informer := factory.Apps().V1().Deployments().Informer()

// 使用下面的通道和goroutine没有显示更改:
stopper := make(chan struct{})
defer close(stopper)
//go func() {
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        mObj, ok := obj.(*appsv1.Deployment)
        if !ok {
            panic(spew.Sdump("informer returned invalid type", mObj))
        }

        replicas := int(*mObj.Spec.Replicas)
        logger.Infof("updating replicas to %d", replicas)

        sendUpdates() // 在其他地方使用更新
    },


    UpdateFunc: func(oldObj, newObj interface{}) {
        old, ok := oldObj.(*appsv1.Deployment)
        if !ok {
            panic(spew.Sdump("informer returned invalid type", old))
        }
        newDeployment, ok := newObj.(*appsv1.Deployment)
        if !ok {
            panic(spew.Sdump("informer returned invalid type", newDeployment))
        }
        oldReplicas := int(*old.Spec.Replicas)
        newReplicas := int(*newDeployment.Spec.Replicas)
        if oldReplicas != newReplicas {
            sendUpdates()
        }
    },
})

//factory.Start(wait.NeverStop)
//factory.WaitForCacheSync(wait.NeverStop)
informer.Run(stopper)

当Kubernetes自动缩放或手动更改部署的副本时,我会收到deployment.apps/app scaled的消息,但它不会被Informer捕获。日志中没有打印任何内容,并且进入了一个没有错误消息的崩溃循环。

我使用了以下资源:

英文:

I'm trying to use client-go informers to get the replica count on deployments. Whenever autoscaling changes the number of replicas, I need to retrieve this in order to handle some other logic. I was previously using the Watch() function, but there are a few inconsistencies with timeouts and connection drops.

The following code below shows an example of the implementation:

labelOptions := informers.WithTweakListOptions(func(opts *v1.ListOptions) {
opts.FieldSelector = "metadata.name=" + name
})
factory := informers.NewSharedInformerFactoryWithOptions(clientSet, 2*time.Second, informers.WithNamespace(namespace), labelOptions)
informer := factory.Apps().V1().Deployments().Informer()
// Using the channels and goroutines below didn't show changes:
stopper := make(chan struct{})
defer close(stopper)
//go func() {
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj, ok := obj.(*appsv1.Deployment)
if !ok {
panic(spew.Sdump("informer returned invalid type", mObj))
}
replicas := int(*mObj.Spec.Replicas)
logger.Infof("updating replicas to %d", replicas)
sendUpdates() // use updates elsewhere
},
UpdateFunc: func(oldObj, newObj interface{}) {
old, ok := oldObj.(*appsv1.Deployment)
if !ok {
panic(spew.Sdump("informer returned invalid type", old))
}
newDeployment, ok := newObj.(*appsv1.Deployment)
if !ok {
panic(spew.Sdump("informer returned invalid type", newDeployment))
}
oldReplicas := int(*old.Spec.Replicas)
newReplicas := int(*newDeployment.Spec.Replicas)
if oldReplicas != newReplicas {
sendUpdates()
}
},
})
//factory.Start(wait.NeverStop)
//factory.WaitForCacheSync(wait.NeverStop)
informer.Run(stopper)

When Kubernetes autoscales or I change the Deployments replica manually, I get deployment.apps/app scaled but it doesn't get caught by the Informer. Nothing gets printed in the logs and it enters a crash loop with no error message.

I used the following resources:

答案1

得分: 2

一些需要注意的事项:

  • 在调用 informerFactory.Start() 之前,请确保直接调用 Informer(informer := factory.Apps().V1().Deployments().Informer()),否则 Start() 不会启动任何内容。
  • 使用 goroutine 来启动 SharedInformerFactory 是没有意义的,因为 informerFactory.Start() 内部已经使用了一个。
  • 这也会导致 informerFactory.WaitForCacheSync() 方法无法正常工作,从而获取到错误的已启动 informer 的数据。
labelOptions := informers.WithTweakListOptions(func(opts *v1.ListOptions) {
    opts.FieldSelector = "metadata.name=" + name
})
factory := informers.NewSharedInformerFactoryWithOptions(clientSet, 2*time.Second, informers.WithNamespace(namespace), labelOptions)
informer := factory.Apps().V1().Deployments().Informer()

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        mObj, ok := obj.(*appsv1.Deployment)
        if !ok {
            doSomething()
        }
        replicas := int(*mObj.Spec.Replicas)
        doSomething() 
    },

    
    UpdateFunc: func(oldObj, newObj interface{}) {
        old, ok := oldObj.(*appsv1.Deployment)
        if !ok {
            doSomething()
        }
        newDeployment, ok := newObj.(*appsv1.Deployment)
        if !ok {
            doSomething()
        }
        oldReplicas := int(*old.Spec.Replicas)
        newReplicas := int(*newDeployment.Spec.Replicas)
        if oldReplicas != newReplicas {
            doSomething()
        }
    },
})

// 初始化所有活动的 informer 并启动内部的 goroutine
factory.Start(wait.NeverStop)
factory.WaitForCacheSync(wait.NeverStop)
英文:

A few things to note:

  • Before calling informerFactory.Start(), ensure that the Informer is called directly(informer := factory.Apps().V1().Deployments().Informer()) or Start() wont start anything.
  • Using a goroutine to start the SharedInformerFactory is meaningless because the informerFactory.Start() uses one internally.
  • It will also cease the informerFactory.WaitForCacheSync() method from working resulting in it getting the wrong data for started informers.
labelOptions := informers.WithTweakListOptions(func(opts *v1.ListOptions) {
opts.FieldSelector = "metadata.name=" + name
})
factory := informers.NewSharedInformerFactoryWithOptions(clientSet, 2*time.Second, informers.WithNamespace(namespace), labelOptions)
informer := factory.Apps().V1().Deployments().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj, ok := obj.(*appsv1.Deployment)
if !ok {
doSomething()
}
replicas := int(*mObj.Spec.Replicas)
doSomething() 
},
UpdateFunc: func(oldObj, newObj interface{}) {
old, ok := oldObj.(*appsv1.Deployment)
if !ok {
doSomething()
}
newDeployment, ok := newObj.(*appsv1.Deployment)
if !ok {
doSomething()
}
oldReplicas := int(*old.Spec.Replicas)
newReplicas := int(*newDeployment.Spec.Replicas)
if oldReplicas != newReplicas {
doSomething()
}
},
})
// Initializes all active informers and starts the internal goroutine
factory.Start(wait.NeverStop)
factory.WaitForCacheSync(wait.NeverStop)

huangapple
  • 本文由 发表于 2022年6月14日 04:43:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/72608810.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定