Kubernetes ResourceQuotaController内部实现原理及源码分析是怎样的,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
成都创新互联专注于冀州网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供冀州营销型网站建设,冀州网站制作、冀州网页设计、冀州网站官网定制、小程序设计服务,打造冀州网络公司原创品牌,更为您提供冀州网站排名全网营销落地服务。
ResoureQuota介绍
关于ResoureQuota和ResourceController的介绍和使用请参见如下官方文档。这是你理解这篇博客的基础。
https://kubernetes.io/docs/admin/resourcequota/
https://kubernetes.io/docs/admin/resourcequota/walkthrough/
https://kubernetes.io/docs/user-guide/compute-resources/
https://kubernetes.io/docs/admin/admission-controllers/
https://github.com/kubernetes/community/blob/master/contributors/design-proposals/admission_control_resource_quota.md
ResourceQuota Controller源码目录结构分析
ResourceQuota Controller作为Kubernetes Controller Manager管理的众多Controller中的一员,其主要的源码位于目录k8s.io/kubernetes/pkg/quota
和k8s.io/kubernetes/pkg/controller/resourcequota
,具体分析如下:
k8s.io/kubernetes/pkg/quota . ├── evaluator // 负责各种资源使用的统计 │ └── core │ ├── configmap.go // ConfigMapEvaluator的实现,负责ConfigMap资源的统计 │ ├── doc.go │ ├── persistent_volume_claims.go // PVCEvaluator的实现,负责PVC资源的统计 │ ├── persistent_volume_claims_test.go │ ├── pods.go //PodEvaluator的实现,负责Pod资源的统计 │ ├── pods_test.go │ ├── registry.go // 创建Registry时注册所有的Evaluators │ ├── replication_controllers.go // RCEvaluator的实现,负责ReplicationController资源的统计 │ ├── resource_quotas.go // ResourceQuotaEvaluator的实现,负责ResourceQuota资源的统计 │ ├── secrets.go // SecretEvaluator的实现,负责Secret资源的统计 │ ├── services.go // ServiceEvaluator的实现,负责Service资源的统计 │ └── services_test.go ├── generic // genericEvaluator的定义和实现 │ ├── evaluator.go // 实现了genericEvaluator的接口,包括最重要的CalculateUsageStats接口 │ └── registry.go // 定义GenericRegistry ├── install │ └── registry.go // 定义了startResourceQuotaController时会调用创建ResourceQuota Registry的方法 ├── interfaces.go // 定义了Registry和Evaluator Interface ├── resources.go // 定义Resources的集合操作以及CalculateUsage方法 └── resources_test.go
k8s.io/kubernetes/pkg/controller/resourcequota . ├── doc.go ├── replenishment_controller.go // 定义replenishmentControllerFactory,用来创建replenishmentController ├── replenishment_controller_test.go ├── resource_quota_controller.go // 定义ResourceQuotaController及其Run方法,syncResourceQuota方法等,属于核心文件。 └── resource_quota_controller_test.go
ResourceQuota Controller内部实现原理图
请下载到本地放大查看。
具体各个模块的功能和交互请看下面的源码分析。
ResourceQuota Controller源码分析
上面的内部实现原理图显示,ResourceQuotaController是Kubenetes Controller Manager启动进行初始化众多Controllers的时候,通过调用startResourceQuotaController来完成ResourceQuotaController的启动。
###从kube-controller-manager的startResourceQuotaController开始
cmd/kube-controller-manager/app/core.go:76 func startResourceQuotaController(ctx ControllerContext) (bool, error) { resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller") resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory) // 定义ReplenishmentController需要监控的资源对象 groupKindsToReplenish := []schema.GroupKind{ api.Kind("Pod"), api.Kind("Service"), api.Kind("ReplicationController"), api.Kind("PersistentVolumeClaim"), api.Kind("Secret"), api.Kind("ConfigMap"), } ... go resourcequotacontroller.NewResourceQuotaController( resourceQuotaControllerOptions, ).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop) return true, nil }
startResourceQuotaController启动一个goroutine,通过NewResourceQuotaController创建一个ResourceQuotaController并执行其Run方法开始提供ResourceQuotaController。
下面是ResourceQuotaController和ResourceQuotaControllerOptions结构体的定义。ResourceQuotaController中定义了几个关键Entity,分别是rqController、queue、missingUsageQueue、registry、replenishmentControllers,在上一节中的原理图中也能看到它们的身影。
###ResourceQuotaController定义
pkg/controller/resourcequota/resource_quota_controller.go:40 // ResourceQuotaControllerOptions holds options for creating a quota controller type ResourceQuotaControllerOptions struct { // Must have authority to list all quotas, and update quota status KubeClient clientset.Interface // Controls full recalculation of quota usage ResyncPeriod controller.ResyncPeriodFunc // Knows how to calculate usage Registry quota.Registry // Knows how to build controllers that notify replenishment events ControllerFactory ReplenishmentControllerFactory // Controls full resync of objects monitored for replenihsment. ReplenishmentResyncPeriod controller.ResyncPeriodFunc // List of GroupKind objects that should be monitored for replenishment at // a faster frequency than the quota controller recalculation interval GroupKindsToReplenish []schema.GroupKind } // ResourceQuotaController is responsible for tracking quota usage status in the system type ResourceQuotaController struct { // Must have authority to list all resources in the system, and update quota status kubeClient clientset.Interface // An index of resource quota objects by namespace rqIndexer cache.Indexer // Watches changes to all resource quota rqController *cache.Controller // ResourceQuota objects that need to be synchronized queue workqueue.RateLimitingInterface // missingUsageQueue holds objects that are missing the initial usage informatino missingUsageQueue workqueue.RateLimitingInterface // To allow injection of syncUsage for testing. syncHandler func(key string) error // function that controls full recalculation of quota usage resyncPeriod controller.ResyncPeriodFunc // knows how to calculate usage registry quota.Registry // controllers monitoring to notify for replenishment replenishmentControllers []cache.ControllerInterface }
NewRegistry
接下来,我们看看startResourceQuotaController调用的NewRegistry、NewResourceQuotaController以及ResourceQuotaController的Run方法。
pkg/quota/evaluator/core/registry.go:29 // NewRegistry returns a registry that knows how to deal with core kubernetes resources // If an informer factory is provided, evaluators will use them. func NewRegistry(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Registry { pod := NewPodEvaluator(kubeClient, f) service := NewServiceEvaluator(kubeClient) replicationController := NewReplicationControllerEvaluator(kubeClient) resourceQuota := NewResourceQuotaEvaluator(kubeClient) secret := NewSecretEvaluator(kubeClient) configMap := NewConfigMapEvaluator(kubeClient) persistentVolumeClaim := NewPersistentVolumeClaimEvaluator(kubeClient, f) return &generic.GenericRegistry{ InternalEvaluators: map[schema.GroupKind]quota.Evaluator{ pod.GroupKind(): pod, service.GroupKind(): service, replicationController.GroupKind(): replicationController, secret.GroupKind(): secret, configMap.GroupKind(): configMap, resourceQuota.GroupKind(): resourceQuota, persistentVolumeClaim.GroupKind(): persistentVolumeClaim, }, } }
可见,NewRegistry负责这些资源对象(pod,service,rc,secret,configMap,resourceQuota,PVC)的的Evaluator的创建和注册,供后面Worker中执行quota.CalculateUsage(...)对这些资源对象进行使用统计。
NewResourceQuotaController
NewRegistry执行完后,开始创建ResourceQuotaController,代码如下。
pkg/controller/resourcequota/resource_quota_controller.go:78 func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController { // build the resource quota controller rq := &ResourceQuotaController{ kubeClient: options.KubeClient, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"), missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"), resyncPeriod: options.ResyncPeriod, registry: options.Registry, replenishmentControllers: []cache.ControllerInterface{}, } ... // set the synchronization handler rq.syncHandler = rq.syncResourceQuotaFromKey // build the controller that observes quota rq.rqIndexer, rq.rqController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: func(options v1.ListOptions) (runtime.Object, error) { return rq.kubeClient.Core().ResourceQuotas(v1.NamespaceAll).List(options) }, WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { return rq.kubeClient.Core().ResourceQuotas(v1.NamespaceAll).Watch(options) }, }, &v1.ResourceQuota{}, rq.resyncPeriod(), cache.ResourceEventHandlerFuncs{ AddFunc: rq.addQuota, UpdateFunc: func(old, cur interface{}) { oldResourceQuota := old.(*v1.ResourceQuota) curResourceQuota := cur.(*v1.ResourceQuota) if quota.V1Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) { return } rq.addQuota(curResourceQuota) }, DeleteFunc: rq.enqueueResourceQuota, }, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc}, ) for _, groupKindToReplenish := range options.GroupKindsToReplenish { controllerOptions := &ReplenishmentControllerOptions{ GroupKind: groupKindToReplenish, ResyncPeriod: options.ReplenishmentResyncPeriod, ReplenishmentFunc: rq.replenishQuota, } replenishmentController, err := options.ControllerFactory.NewController(controllerOptions) if err != nil { glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err) } else { rq.replenishmentControllers = append(rq.replenishmentControllers, replenishmentController) } } return rq }
NewResourceQuotaController负责创建ResourceQuotaController,包括queue, missingUsageQueue, syncHandler,rqIndexer, rqController,replenishmentControllers的Entity填充。重点关注 rq.rqIndexer, rq.rqController = cache.NewIndexerInformer(...)
进行了rqController中注册ResourceEventHandlerFuncs:addQuota和enqueueResourceQuota。另外, replenishmentController, err := options.ControllerFactory.NewController(controllerOptions)
负责replenishmentController的创建,NewRegistry中注册了6种replenishmentSource,所以这里replenishmentControllers会添加对应的6中replenishmentController。
###ResourceQuotaController.Run
创建完ResourceQuotaController之后,就执行Run方法开始进行任务处理了。
pkg/controller/resourcequota/resource_quota_controller.go:227 // Run begins quota controller using the specified number of workers func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) { ... // 启动rqController和rq.replenishmentControllers中的6中replenishmentController,开始watch对应的ResourceQuota加入到queue和missingUsageQueue。 go rq.rqController.Run(stopCh) // the controllers that replenish other resources to respond rapidly to state changes for _, replenishmentController := range rq.replenishmentControllers { go replenishmentController.Run(stopCh) } // 启动workers数量的worker协程,分别对queue和missingUsageQueue中的Item。 for i := 0; i < workers; i++ { go wait.Until(rq.worker(rq.queue), time.Second, stopCh) go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh) } // 定期的进行全量的quotas计算。 go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh) <-stopCh glog.Infof("Shutting down ResourceQuotaController") rq.queue.ShutDown() }
Worker
接下来的主要处理都交给了workers进行处理了,默认配置是有5个worker对queue进行处理,有5个worker对missingUsageQuota进行处理。下面来看看worker是怎么对Queue中的Item进行处理的。
pkg/controller/resourcequota/resource_quota_controller.go:199 // worker runs a worker thread that just dequeues items, processes them, and marks them done. func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() { workFunc := func() bool { // 从queue中获取Key key, quit := queue.Get() if quit { return true } defer queue.Done(key) // 执行NewResourceQuotaController时注册的syncHandler(流程跳转到syncResourceQuotaFromKey) err := rq.syncHandler(key.(string)) ... } return func() { for { if quit := workFunc(); quit { glog.Infof("resource quota controller worker shutting down") return } } } }
流程进入到syncResourceQuotaFromKey,下面看看它的实现:
pkg/controller/resourcequota/resource_quota_controller.go:247 // syncResourceQuotaFromKey syncs a quota key func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err error) { ... obj, exists, err := rq.rqIndexer.GetByKey(key) ... quota := *obj.(*v1.ResourceQuota) return rq.syncResourceQuota(quota) }
syncResourceQuotaFromKey根据从queue中获得的key,从rqIndexer中得到该Object,然后执行rq.syncResourceQuota(quota)。
pkg/controller/resourcequota/resource_quota_controller.go:268 // syncResourceQuota runs a complete sync of resource quota status across all known kinds func (rq *ResourceQuotaController) syncResourceQuota(v1ResourceQuota v1.ResourceQuota) (err error) { ... newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry) ... // ensure set of used values match those that have hard constraints hardResources := quota.ResourceNames(hardLimits) used = quota.Mask(used, hardResources) usage := api.ResourceQuota{ ObjectMeta: api.ObjectMeta{ Name: resourceQuota.Name, Namespace: resourceQuota.Namespace, ResourceVersion: resourceQuota.ResourceVersion, Labels: resourceQuota.Labels, Annotations: resourceQuota.Annotations}, Status: api.ResourceQuotaStatus{ Hard: hardLimits, Used: used, }, } dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used) // there was a change observed by this controller that requires we update quota if dirty { v1Usage := &v1.ResourceQuota{} if err := v1.Convert_api_ResourceQuota_To_v1_ResourceQuota(&usage, v1Usage, nil); err != nil { return err } _, err = rq.kubeClient.Core().ResourceQuotas(usage.Namespace).UpdateStatus(v1Usage) return err } return nil }
syncResourceQuota中最关键的操作是: newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry)
quota.CalculateUsage根据namespace, quota的Scope,hardLimits,registry对该Item(resourceQuota)进行CalculateUsage。
pkg/quota/resources.go:217 // CalculateUsage calculates and returns the requested ResourceList usage func CalculateUsage(namespaceName string, scopes []api.ResourceQuotaScope, hardLimits api.ResourceList, registry Registry) (api.ResourceList, error) { // find the intersection between the hard resources on the quota // and the resources this controller can track to know what we can // look to measure updated usage stats for hardResources := ResourceNames(hardLimits) potentialResources := []api.ResourceName{} evaluators := registry.Evaluators() for _, evaluator := range evaluators { potentialResources = append(potentialResources, evaluator.MatchingResources(hardResources)...) } // NOTE: the intersection just removes duplicates since the evaluator match intersects wtih hard matchedResources := Intersection(hardResources, potentialResources) // sum the observed usage from each evaluator newUsage := api.ResourceList{} for _, evaluator := range evaluators { // only trigger the evaluator if it matches a resource in the quota, otherwise, skip calculating anything intersection := evaluator.MatchingResources(matchedResources) if len(intersection) == 0 { continue } usageStatsOptions := UsageStatsOptions{Namespace: namespaceName, Scopes: scopes, Resources: intersection} stats, err := evaluator.UsageStats(usageStatsOptions) if err != nil { return nil, err } newUsage = Add(newUsage, stats.Used) } // mask the observed usage to only the set of resources tracked by this quota // merge our observed usage with the quota usage status // if the new usage is different than the last usage, we will need to do an update newUsage = Mask(newUsage, matchedResources) return newUsage, nil }
CalculateUsage中最重要的一步是循环registry中注册的所有Evaluators,执行对应Evaluator的UsageStats方法进资源使用统计。看到这里,你也许懵逼了,Evaluators又是个什么东西?
我们先来看看Registry和Evaluator的关系,以及Evaluator的定义。
pkg/quota/interfaces.go:62 // Registry holds the list of evaluators associated to a particular group kind type Registry interface { // Evaluators returns the set Evaluator objects registered to a groupKind Evaluators() map[schema.GroupKind]Evaluator } pkg/quota/interfaces.go:43 // Evaluator knows how to evaluate quota usage for a particular group kind type Evaluator interface { // Constraints ensures that each required resource is present on item Constraints(required []api.ResourceName, item runtime.Object) error // GroupKind returns the groupKind that this object knows how to evaluate GroupKind() schema.GroupKind // Handles determines if quota could be impacted by the specified operation. // If true, admission control must perform quota processing for the operation, otherwise it is safe to ignore quota. Handles(operation admission.Operation) bool // Matches returns true if the specified quota matches the input item Matches(resourceQuota *api.ResourceQuota, item runtime.Object) (bool, error) // MatchingResources takes the input specified list of resources and returns the set of resources evaluator matches. MatchingResources(input []api.ResourceName) []api.ResourceName // Usage returns the resource usage for the specified object Usage(item runtime.Object) (api.ResourceList, error) // UsageStats calculates latest observed usage stats for all objects UsageStats(options UsageStatsOptions) (UsageStats, error) }
可见Evaluator就是一系列操作的集合,是一个Interface,而Registry就是资源类型到Evaluator的一个Map。
Kubernetes中定义了7种资源的Evaluator,都在pkg/quota/evaluator/core/*
目录下,比如pods.go
就是PodEvaluator的实现,里面实现了关键的UsageStats方法。除了PodEvaluator之外,其他的Evaluator的UsageStats实现,都是genericEvaluator来完成的,其代码在pkg/quota/generic/evaluator.go:177
。
具体Evaluator的代码分析,请读者自行完成。
下面我给出Worker的内部流程图,供大家参考:
###ReplenishmentController
rqController负责watch待sync的ResourceQuota,并将其加入到queue和missingUsageQueue中,而上面分析NewResourceQuotaController代码时提到: replenishmentController, err := options.ControllerFactory.NewController(controllerOptions)
负责replenishmentController的创建,那replenishmentController又是啥呢?我们有必要来看看replenishmentController的创建。
pkg/controller/resourcequota/replenishment_controller.go:131 func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (result cache.ControllerInterface, err error) { ... switch options.GroupKind { case api.Kind("Pod"): if r.sharedInformerFactory != nil { result, err = controllerFor(api.Resource("pods"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{ UpdateFunc: PodReplenishmentUpdateFunc(options), DeleteFunc: ObjectReplenishmentDeleteFunc(options), }) break } result = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod()) case api.Kind("Service"): // TODO move to informer when defined _, result = cache.NewInformer( &cache.ListWatch{ ListFunc: func(options v1.ListOptions) (runtime.Object, error) { return r.kubeClient.Core().Services(v1.NamespaceAll).List(options) }, WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { return r.kubeClient.Core().Services(v1.NamespaceAll).Watch(options) }, }, &v1.Service{}, options.ResyncPeriod(), cache.ResourceEventHandlerFuncs{ UpdateFunc: ServiceReplenishmentUpdateFunc(options), DeleteFunc: ObjectReplenishmentDeleteFunc(options), }, ) case api.Kind("ReplicationController"): // TODO move to informer when defined _, result = cache.NewInformer( &cache.ListWatch{ ListFunc: func(options v1.ListOptions) (runtime.Object, error) { return r.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options) }, WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { return r.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options) }, }, &v1.ReplicationController{}, options.ResyncPeriod(), cache.ResourceEventHandlerFuncs{ DeleteFunc: ObjectReplenishmentDeleteFunc(options), }, ) case api.Kind("PersistentVolumeClaim"): if r.sharedInformerFactory != nil { result, err = controllerFor(api.Resource("persistentvolumeclaims"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{ DeleteFunc: ObjectReplenishmentDeleteFunc(options), }) break } // TODO (derekwaynecarr) remove me when we can require a sharedInformerFactory in all code paths... _, result = cache.NewInformer( &cache.ListWatch{ ListFunc: func(options v1.ListOptions) (runtime.Object, error) { return r.kubeClient.Core().PersistentVolumeClaims(v1.NamespaceAll).List(options) }, WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { return r.kubeClient.Core().PersistentVolumeClaims(v1.NamespaceAll).Watch(options) }, }, &v1.PersistentVolumeClaim{}, options.ResyncPeriod(), cache.ResourceEventHandlerFuncs{ DeleteFunc: ObjectReplenishmentDeleteFunc(options), }, ) case api.Kind("Secret"): // TODO move to informer when defined _, result = cache.NewInformer( &cache.ListWatch{ ListFunc: func(options v1.ListOptions) (runtime.Object, error) { return r.kubeClient.Core().Secrets(v1.NamespaceAll).List(options) }, WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { return r.kubeClient.Core().Secrets(v1.NamespaceAll).Watch(options) }, }, &v1.Secret{}, options.ResyncPeriod(), cache.ResourceEventHandlerFuncs{ DeleteFunc: ObjectReplenishmentDeleteFunc(options), }, ) case api.Kind("ConfigMap"): // TODO move to informer when defined _, result = cache.NewInformer( &cache.ListWatch{ ListFunc: func(options v1.ListOptions) (runtime.Object, error) { return r.kubeClient.Core().ConfigMaps(v1.NamespaceAll).List(options) }, WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { return r.kubeClient.Core().ConfigMaps(v1.NamespaceAll).Watch(options) }, }, &v1.ConfigMap{}, options.ResyncPeriod(), cache.ResourceEventHandlerFuncs{ DeleteFunc: ObjectReplenishmentDeleteFunc(options), }, ) default: return nil, NewUnhandledGroupKindError(options.GroupKind) } return result, err }
整个代码结构非常清晰,就是根据不同的资源类型,返回对应的Controller。而每种资源的Controller的定义都是通过创建一个对应的Informer完成。Informer中注册对应的ResourceEventHandlerFuncs:UpdateFunc和DeleteFunc用来出watch的对象发生对应的change时需要调用的方法。
以Pod为例,看看Pod注册的UpdateFunc:PodReplenishmentUpdateFunc和DeleteFunc:ObjectReplenishmentDeleteFunc,你就知道replenishmentController是用来干啥的了。
pkg/controller/resourcequota/replenishment_controller.go:56 // PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is not func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) { return func(oldObj, newObj interface{}) { oldPod := oldObj.(*v1.Pod) newPod := newObj.(*v1.Pod) if core.QuotaV1Pod(oldPod) && !core.QuotaV1Pod(newPod) { options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, oldPod) } } } // ObjectReplenenishmentDeleteFunc will replenish on every delete func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func(obj interface{}) { return func(obj interface{}) { metaObject, err := meta.Accessor(obj) if err != nil { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { glog.Errorf("replenishment controller could not get object from tombstone %+v, could take up to %v before quota is replenished", obj, options.ResyncPeriod()) utilruntime.HandleError(err) return } metaObject, err = meta.Accessor(tombstone.Obj) if err != nil { glog.Errorf("replenishment controller tombstone contained object that is not a meta %+v, could take up to %v before quota is replenished", tombstone.Obj, options.ResyncPeriod()) utilruntime.HandleError(err) return } } options.ReplenishmentFunc(options.GroupKind, metaObject.GetNamespace(), nil) } }
在NewResourceQuotaController中创建replenishmentController时,已经指定了对应的ReplenishmentFunc为rq.replenishQuota,PodReplenishmentUpdateFunc和ObjectReplenishmentDeleteFunc最终都是调用ReplenishmentFunc(rq.replenishQuota)来进行quota recalculated。
pkg/controller/resourcequota/resource_quota_controller.go:330 // replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated func (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, namespace string, object runtime.Object) { ... for i := range resourceQuotas { resourceQuota := resourceQuotas[i].(*v1.ResourceQuota) internalResourceQuota := &api.ResourceQuota{} if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(resourceQuota, internalResourceQuota, nil); err != nil { glog.Error(err) continue } resourceQuotaResources := quota.ResourceNames(internalResourceQuota.Status.Hard) if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 { // 将该resourceQuota加入到队列queue rq.enqueueResourceQuota(resourceQuota) } } }
因此replenishmentController就是用来捕获对应资源的Update/Delete事件,将其对应的ResourceQuota加入到queue
中,然后worker再对其进行重新计算Usage。
总结
Kubernetes Controller Manager在初始化Controllers时执行startResourceQuotaController启动创建ResourceQuotaController并执行其Run方法来启动ResourceQuotaController。
ResourceQuotaController中包括两个队列:
queue:用来存放待sync和recalculate的ResourceQuota
missingUsageQueue:用来存放那些丢失Usage信息的ResourceQuota
ResourceQuotaController中有两种Controller:
rqController:通过List/Watch对应的资源及变化,根据情况,将ResourceQuota加入到queue和missingUsageQueue。
replenishmentControllers:通过监控资源的Update/Delete操作,将ResourceQuota加入到queue。
ResourceQuotaController中存在一个Registry对象,用来存放各种资源的Evaluator,包括:
PodEvaluator
ConfigMapEvaluator
PersistentVolumeClaimEvaluator
ResourceQuotaEvaluator
ReplicationControllerEvaluator
ServiceEvaluator
SecretEvaluator
ResourceQuotaController中的replenishmentControllers包含以下replenishmentController:
PodReplenishController
ConfigMapReplenishController
PersistentVolumeClaimReplenishController
ReplicationControllerReplenishController
ServiceReplenishController
SecretReplenishController
ResourceQuotaController中默认存在5个worker对queue中的ResourceQuota Item进行处理。可通过kube-controller-manager的
--concurrent-resource-quota-syncs
配置。ResourceQuotaController中默认存在5个worker对missingUsageQueue中的ResourceQuota Item进行处理。可通过kube-controller-manager的
--concurrent-resource-quota-syncs
配置。ResourceQuotaController默认5min会做一次全量的quota usage同步。可通过kube-controller-manager的
--resource-quota-sync-period
关于Kubernetes ResourceQuotaController内部实现原理及源码分析是怎样的问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注创新互联行业资讯频道了解更多相关知识。
分享标题:KubernetesResourceQuotaController内部实现原理及源码分析是怎样的
文章链接:http://scpingwu.com/article/jipjgj.html