[TOC]
本次使用的版本为:
kubernetes-1.17.0/client-go v0.17.0(2019.12.08)
不要直接go get k8s.io/client-go
(以前发布过v12.0.0的版本)
使用go get k8s.io/client-go@v0.17.0
Kubernetes官方从2016年8月份开始,将Kubernetes资源操作相关的核心源码抽取出来,独立出来一个项目Client-go,作为官方提供的Go client。Kubernetes的部分代码也是基于这个client实现的,所以对这个client的质量、性能等方面还是非常有信心的。
client-go是一个调用kubernetes集群资源对象API的客户端,即通过client-go实现对kubernetes集群中资源对象(包括deployment、service、ingress、replicaSet、pod、namespace、node等)的增删改查等操作。大部分对kubernetes进行前置API封装的二次开发都通过client-go这个第三方包来实现。
RESTClient是最基础的,相当于的底层基础结构,可以直接通过 是RESTClient提供的RESTful方法如Get(),Put(),Post(),Delete()进行交互
RESTClient 封装了指定资源URL的通用Kubernetes API的访问姿势
Clientset是调用Kubernetes资源对象最常用的client,可以操作所有的资源对象,包含RESTClient。需要指定Group、指定Version,然后根据Resource获取
Clientset 是一系列的clients的group组合,注意每个group在一个Clientset中只包含一个版本。
Clientset包含了appsV1、coreV1,这中间包含了RESTClient,因此Clientset是基于RESTClient的。
Dynamic client 是一种动态的 client,它能处理 kubernetes 所有的资源。不同于 clientset,dynamic client 返回的对象是一个 map[string]interface{},如果一个 controller 中需要控制所有的 API,可以使用dynamic client,目前它在 garbage collector 和 namespace controller中被使用。
dynamic client针对的是所有资源,但是只支持Json;
# dynamic/interface.go
type ResourceInterface interface {
Create(obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error)
Update(obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error)
UpdateStatus(obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error)
Delete(name string, options *metav1.DeleteOptions, subresources ...string) error
DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error
Get(name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error)
List(opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
Watch(opts metav1.ListOptions) (watch.Interface, error)
Patch(name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error)
}
默认的每一种资源对象都有一个interface,封装了对象的CURD方法和list/watch方法
如 Deployment(kubernetes/typed/apps/v1/deployment.go):
// DeploymentInterface has methods to work with Deployment resources.
type DeploymentInterface interface {
Create(*v1.Deployment) (*v1.Deployment, error)
Update(*v1.Deployment) (*v1.Deployment, error)
UpdateStatus(*v1.Deployment) (*v1.Deployment, error)
Delete(name string, options *metav1.DeleteOptions) error
DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error
Get(name string, options metav1.GetOptions) (*v1.Deployment, error)
List(opts metav1.ListOptions) (*v1.DeploymentList, error)
Watch(opts metav1.ListOptions) (watch.Interface, error)
Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.Deployment, err error)
GetScale(deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
UpdateScale(deploymentName string, scale *autoscalingv1.Scale) (*autoscalingv1.Scale, error)
DeploymentExpansion
}
在Kubernetes中,所有对象资源的操作方式都是统一的,有个interface当做虚基类,包含资源的所有操作方法,然后各个子类继承然后实现它们,子类中的实现定义会针对不同的资源有不同诠释
自己结合源码的理解如下:
在包缓存内的类型reflector中定义的Reflector监视指定资源类型(kind)的Kubernetes API。执行此操作的函数是ListAndWatch。监视可以是内置资源,也可以是自定义资源。当反射器通过监视API接收关于新资源实例的存在的通知时,它使用相应的列表API获取新创建的对象并将其放入WETCHANDER函数内的Delta FIFO队列中。
reflector使用listerWatcher获取资源,并将其保存在store中,此处的store就是DeltaFIFO,Reflector核心处理函数为ListAndWatch(client-go/tools/cache/reflector.go)
// client-go/tools/cache/reflector.go
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
// metrics tracks basic metric information about the reflector
metrics *reflectorMetrics// 但凡遇到metrics多半是用于做监控的,可以忽略
// The type of object we expect to place in the store.
expectedType reflect.Type// 反射的类型,也就是要监控的对象类型,比如Pod
// The destination to sync up with the watch source
store Store// 存储,就是DeltaFIFO,为什么,后面会有代码证明
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher// 这个是用来从apiserver获取资源用的
// period controls timing between one watch ending and
// the beginning of the next one.
period time.Duration// 反射器在List和Watch的时候理论上是死循环,只有出现错误才会退出
//period这个变量用在出错后多长时间再执行List和Watch,默认值是1秒钟
resyncPeriod time.Duration// 重新同步的周期,很多人肯定认为这个同步周期指的是从apiserver的同步周期
// resyncPeriod其实这里面同步指的是shared_informer使用者需要定期同步全量对象
ShouldResync func() bool// 如果需要同步,调用这个函数问一下,当然前提是该函数指针不为空
// clock allows tests to manipulate time
clock clock.Clock// 时钟
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string// 最后一次同步的资源版本
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex// 还专门为最后一次同步的资源版本弄了个锁
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// Defaults to pager.PageSize.
WatchListPageSize int64
}
ListAndWatch在Reflector.Run函数中启动,并以Reflector.period周期性进行调度。ListAndWatch使用resourceVersion来获取资源的增量变化:在List时会获取资源的首个resourceVersion值,在Watch的时候会使用List获取的resourceVersion来获取资源的增量变化,然后将获取到的资源的resourceVersion保存起来,作为下一次Watch的基线。
// 代码源自client-go/tools/cache/reflector.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
// func Until(f func(), period time.Duration, stopCh <-chan struct{})是下面函数的声明
// 这里面我们不用关心wait.Until是如何实现的,只要知道他调用函数f会被每period周期执行一次
// 意思就是f()函数执行完毕再等period时间后在执行一次,也就是r.ListAndWatch()会被周期性的调用
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}
// 代码源自client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
var resourceVersion string
// 很多存储类的系统都是这样设计的,数据采用版本的方式记录,数据每变化(添加、删除、更新)都会触发版本更新,
// 这样的做法可以避免全量数据访问。以apiserver资源监控为例,只要监控比缓存中资源版本大的对象就可以了,
// 把变化的部分更新到缓存中就可以达到与apiserver一致的效果,一般资源的初始版本为0,从0版本开始列举就是全量的对象了
options := metav1.ListOptions{ResourceVersion: "0"}
// 与监控相关的内容不多解释
r.metrics.numberOfLists.Inc()
start := r.clock.Now()
// 列举资源,这部分是apimachery相关的内容,读者感兴趣可以自己了解
list, err := r.listerWatcher.List(options)
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
}
// 还是监控相关的
r.metrics.listDuration.Observe(time.Since(start).Seconds())
// 下面的代码主要是利用apimachinery相关的函数实现,就是把列举返回的结果转换为对象数组
// 下面的代码大部分来自apimachinery,此处不做过多说明,读者只要知道实现什么功能就行了
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
// 和监控相关的内容
r.metrics.numberOfItemsInList.Observe(float64(len(items)))
// 以上部分都是对象实例化的过程,可以称之为反射,也是Reflector这个名字的主要来源,本文不是讲解反射原理的,
// 而是作为SharedInformer的前端,所以我们重点介绍的是对象在SharedInformer中流转过程,所以反射原理部分不做为重点讲解
// 这可是真正从apiserver同步过来的全量对象,所以要同步到DeltaFIFO中
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
// 设置最新的同步的对象版本
r.setLastSyncResourceVersion(resourceVersion)
// 下面要启动一个后台协程实现定期的同步操作,这个同步就是将SharedInformer里面的对象全量以同步事件的方式通知使用者
// 我们暂且称之为“后台同步协程”,Run()函数退出需要后台同步协程退出,所以下面的cancelCh就是干这个用的
// 利用defer close(cancelCh)实现的,而resyncerrc是后台同步协程反向通知Run()函数的报错通道
// 当后台同步协程出错,Run()函数接收到信号就可以退出了
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
// 下面这个匿名函数就是后台同步协程的函数了
go func() {
// resyncCh返回的就是一个定时器,如果resyncPeriod这个为0那么就会返回一个永久定时器,cleanup函数是用来清理定时器的
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup()
}()
// 死循环等待各种信号
for {
// 只有定时器有信号才继续处理,其他的都会退出
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
// ShouldResync是个函数地址,创建反射器对象的时候传入,即便时间到了,也要通过函数问问是否需要同步
if r.ShouldResync == nil || r.ShouldResync() {
// 我们知道这个store是DeltaFIFO,DeltaFIFO.Resync()做了什么,读者自行温习相关的文章~
// 就在这里实现了我们前面提到的同步,从这里看所谓的同步就是以全量对象同步事件的方式通知使用者
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
// 清理掉当前的计时器,获取下一个同步时间定时器
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
// 前面已经列举了全量对象,接下来就是watch的逻辑了
for {
// 如果有退出信号就立刻返回,否则就会往下走,因为有default.
select {
case <-stopCh:
return nil
default:
}
// 计算watch的超时时间
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
// 设置watch的选项,因为前期列举了全量对象,从这里只要监听最新版本以后的资源就可以了
// 如果没有资源变化总不能一直挂着吧?也不知道是卡死了还是怎么了,所以有一个超时会好一点
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
TimeoutSeconds: &timeoutSeconds,
}
// 监控相关
r.metrics.numberOfWatches.Inc()
// 开始监控对象
w, err := r.listerWatcher.Watch(options)
// watch产生错误了,大部分错误就要退出函数然后再重新来一遍流程
if err != nil {
switch err {
case io.EOF:
case io.ErrUnexpectedEOF:
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
}
// 类似于网络拒绝连接的错误要等一会儿再试,因为可能网络繁忙
if urlError, ok := err.(*url.Error); ok {
if opError, ok := urlError.Err.(*net.OpError); ok {
if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
time.Sleep(time.Second)
continue
}
}
}
return nil
}
// watch返回是流,apiserver会将变化的资源通过这个流发送出来,client-go最终通过chan实现的
// 所以watchHandler()是一个需要持续从chan读取数据的流程,所以需要传入resyncerrc和stopCh
// 用于异步通知退出或者后台同步协程错误
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
}
return nil
}
}
}
上面的函数中,调用了两个私有函数,分别为syncWith()和watchHandler()。syncWith()用于实现一次从apiserver全量对象的同步,这里的同步和我们上面提到的同步不是一回事,这里指的是从apiserver的同步。watchHandler是实现监控apiserver资源变化的处理过程,主要就是把apiserver的资源变化转换为DeltaFIFO调用。我们接下来就看这两个函数的具体实现
接下来我们就要看看watchHandler做了什么?
// 代码源自client-go/tools/cache/reflector.go
// 实现apiserver全量对象的同步
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
// 做一次slice类型转换
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
// 直接调用了DeltaFIFO的Replace()接口,这个接口就是用于同步全量对象的
return r.store.Replace(found, resourceVersion)
}
// 实现从watch返回的chan中持续读取变化的资源,并转换为DeltaFIFO相应的调用
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
start := r.clock.Now()
eventCount := 0
// 监控相关
defer func() {
r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
r.metrics.watchDuration.Observe(time.Since(start).Seconds())
}()
// 这里就开始无限循环的从chan中读取资源的变化,也可以理解为资源的增量变化,同时还要监控各种信号
loop:
for {
select {
// 系统退出信号
case <-stopCh:
return errorStopRequested
// 后台同步协程出错信号
case err := <-errc:
return err
// watch函数返回的是一个chan,通过这个chan持续的读取对象
case event, ok := <-w.ResultChan():
// 如果不OK,说明chan关闭了,就要重新获取,这里面我们可以推测这个chan可能会运行过程中重新创建
// 否则就应该退出而不是继续循环
if !ok {
break loop
}
// 看来event可以作为错误的返回值,挺有意思,而不是通过关闭chan,这种方式可以传递错误信息,关闭chan做不到
if event.Type == watch.Error {
return apierrs.FromObject(event.Object)
}
// 这里面就是利用反射实例化对象了,而且判断了对象类型是我们设定的类型
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
// 和list操作相似,也要获取对象的版本,要更新缓存中的版本,下次watch就可以忽略这些资源了
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
continue
}
newResourceVersion := meta.GetResourceVersion()
// 根据事件的类型做不同的DeltaFIFO的操作
switch event.Type {
// 向DeltaFIFO添加一个添加的Delta
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
// 更新对象,向DeltaFIFO添加一个更新的Delta
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
// 删除对象,向DeltaFIFO添加一个删除的Delta
case watch.Deleted:
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
// 其他类型就不知道干什么了,只能报错
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
// 更新最新资源版本
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
// watch返回时间非常短而且没有任何事件要处理,这个属于异常现象,因为我们watch是设置了超时的
watchDuration := r.clock.Now().Sub(start)
if watchDuration < 1*time.Second && eventCount == 0 {
r.metrics.numberOfShortWatches.Inc()
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
return nil
}
// 代码源自client-go/tools/cache/reflector.go
func (r *Reflector) setLastSyncResourceVersion(v string) {
// 设置已经获取到资源的最新版本
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.lastSyncResourceVersion = v
rv, err := strconv.Atoi(v)
if err == nil {
r.metrics.lastResourceVersion.Set(float64(rv))
}
}
// 获取resync定时器,叫定时器比较好理解,叫chan很难和定时关联起来
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
// 如果resyncPeriod说明就不用定时同步,返回的是永久超时的定时器
if r.resyncPeriod == 0 {
return neverExitWatch, func() bool { return false }
}
// 构建定时起
t := r.clock.NewTimer(r.resyncPeriod)
return t.C(), t.Stop
}
如可以使用如下命令获取Pod的resourceVersion
kubectl get pod podname -o yaml|grep resourceVersion
上图中的Resync触发的Sync动作,其作用与Replace中的第三步相同,用于将knowObject中的对象与DeltaFIFO中同步。这种操作是有必要的
通过Kubernetes API监控Kubernetes的资源类型
DeltaFIFO的源码注释写的比较清楚,它是一个生产者-消费者队列(先入先出),生产者为Reflector,消费者为Pop()函数,从架构图中可以看出DeltaFIFO的数据来源为Reflector,通过Pop操作消费数据,消费的数据一方面存储到Indexer中,另一方面可以通过informer的handler进行处理(见下文)。informer的handler处理的数据需要与存储在Indexer中的数据匹配。需要注意的是,Pop的单位是一个Deltas,而不是Delta。
DeltaFIFO同时实现了Queue和Store接口。DeltaFIFO使用Deltas保存了对象状态的变更(Add/Delete/Update)信息(如Pod的删除添加等),Deltas缓存了针对相同对象的多个状态变更信息,如Pod的Deltas[0]可能更新了标签,Deltas[1]可能删除了该Pod。最老的状态变更信息为Newest(),最新的状态变更信息为Oldest()。使用中,获取DeltaFIFO中对象的key以及获取DeltaFIFO都以最新状态为准。
# tools/cache/delta_fifo.go
// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string // Delta的类型用字符串表达
// Change type definition
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
// The other types are obvious. You'll get Sync deltas when:
// * A watch expires/errors out and a new list/watch cycle is started.
// * You've turned on periodic syncs.
// (Anything that trigger's DeltaFIFO's Replace() method.)
Sync DeltaType = "Sync" // 同步
)
// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
Type DeltaType // Delta类型,比如增、减,后面有详细说明
Object interface{} // 对象,Delta的粒度是一个对象
}
// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta // Delta数组
我们再说一说如下几个类型,因为他们定义在DeltaFIFO的文件中,而且在很多地方应用:
# tools/cache/delta_fifo.go
// 这个接口类型就是下面面两个接口类型的组合了
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
KeyLister
KeyGetter
}
// 这是一个非常通用的接口类型,只定义了一个接口函数,就是返回所有的keys
// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
ListKeys() []string
}
// 这也是一个非常通用的接口类型,只定义了一个接口函数,就是通过key获取对象
// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
GetByKey(key string) (interface{}, bool, error)
}
有没有发现上面两个接口在client-go.tools.cache.Store这个接口类型中也存在,也就是说实现了Store接口的类型同时也实现了上面三个接口
接下来再来认识一个类型:
# tools/cache/fifo.go
// 这个才是FIFO的抽象,DeltaFIFO只是FIFO的一种实现。
// Queue is exactly like a Store, but has a Pop() method too.
type Queue interface {
Store // 实现了存储接口,这个很好理解,FIFO也是一种存储
// 在存储的基础上增加了Pop接口,用于弹出对象
// Pop blocks until it has something to process.
// It returns the object that was process and the result of processing.
// The PopProcessFunc may return an ErrRequeue{...} to indicate the item
// should be requeued before releasing the lock on the queue.
Pop(PopProcessFunc) (interface{}, error)
// 对象如果不在队列中就添加
// AddIfNotPresent adds a value previously
// returned by Pop back into the queue as long
// as nothing else (presumably more recent)
// has since been added.
AddIfNotPresent(interface{}) error
// 通过Replace()放入第一批对象到队列中并且已经被Pop()全部取走
// HasSynced returns true if the first batch of items has been popped
HasSynced() bool
// 关闭队列
// Close queue
Close()
}
我们先来看看DeltaFIFO的类型定义:
# tools/cache/delta_fifo.go
// DeltaFIFO is like FIFO, but allows you to process deletes.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
// * You want to process every object change (delta) at most once.
// * When you process an object, you want to see everything
// that's happened to it since you last processed it.
// * You want to process the deletion of objects.
// * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but it
// will always return an object of type Deltas.
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
//
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
// to list keys that are "known", for the purpose of figuring out which
// items have been deleted when Replace() or Delete() are called. The deleted
// object will be included in the DeleteFinalStateUnknown markers. These objects
// could be stale.
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex// 读写锁,因为涉及到同时读写,读写锁性能要高
cond sync.Cond// 给Pop()接口使用,在没有对象的时候可以阻塞,内部锁复用读写锁
// 这个应该是Store的本质了,按照kv的方式存储对象,但是存储的是对象的Deltas数组
// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
items map[string]Deltas
queue []string// 这个是为先入先出实现的,存储的就是对象的键
// 通过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int// 通过Replace()接口将第一批对象放入队列的对象数量
// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc// 对象键计算函数,在Indexer那篇文章介绍过
// 前面介绍就是为了这是用,该对象指向的就是Indexer,
// knownObjects list keys that are "known", for the
// purpose of figuring out which items have been deleted
// when Replace() or Delete() is called.
knownObjects KeyListerGetter
// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool// 是否已经关闭的标记
closedLock sync.Mutex// 专为关闭设计的锁
}
var (
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
)
queueActionLocked函数
// 代码源自client-go/tools/cache/delta_fifo.go
// 从函数名称来看把“动作”放入队列中,这个动作就是DeltaType,而且已经加锁了
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// 前面提到的计算对象键的函数
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 如果是同步,并且对象未来会被删除,那么就直接返回,没必要记录这个动作了
// 肯定有人会问为什么Add/Delete/Update这些动作可以,因为同步对于已经删除的对象是没有意义的
// 已经删除的对象后续跟添加、更新有可能,因为同名的对象又被添加了,删除也是有可能
// 删除有些复杂,后面会有说明
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
// 同一个对象的多次操作,所以要追加到Deltas数组中
newDeltas := append(f.items[id], Delta{actionType, obj})
// 合并操作,去掉冗余的delta
newDeltas = dedupDeltas(newDeltas)
// 判断对象是否已经存在
_, exists := f.items[id]
// 合并后操作有可能变成没有Delta么?后面的代码分析来看应该不会,所以暂时不知道这个判断目的
if len(newDeltas) > 0 {
// 如果对象没有存在过,那就放入队列中,如果存在说明已经在queue中了,也就没必要再添加了
if !exists {
f.queue = append(f.queue, id)
}
// 更新Deltas数组,通知所有调用Pop()的人
f.items[id] = newDeltas
f.cond.Broadcast()
} else if exists {
// 直接把对象删除,这段代码我不知道什么条件会进来,因为dedupDeltas()肯定有返回结果的
// 后面会有dedupDeltas()详细说明
delete(f.items, id)
}
return nil
}
合并操作的具体实现:
// 代码源自client-go/tools/cache/delta_fifo.go
func dedupDeltas(deltas Deltas) Deltas {
// 小于2个delta,那就是1个呗,没啥好合并的
n := len(deltas)
if n < 2 {
return deltas
}
// 取出最后两个
a := &deltas[n-1]
b := &deltas[n-2]
// 判断如果是重复的,那就删除这两个delta把合并后的追加到Deltas数组尾部
if out := isDup(a, b); out != nil {
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
return deltas
}
// 判断两个Delta是否是重复的
func isDup(a, b *Delta) *Delta {
// 只有一个判断,只能判断是否为删除类操作,和我们上面的判断相同
// 这个函数的本意应该还可以判断多种类型的重复,当前来看只能有删除这一种能够合并
if out := isDeletionDup(a, b); out != nil {
return out
}
return nil
}
// 判断是否为删除类的重复
func isDeletionDup(a, b *Delta) *Delta {
// 二者都是删除那肯定有一个是重复的
if b.Type != Deleted || a.Type != Deleted {
return nil
}
// 理论上返回最后一个比较好,但是对象已经不再系统监控范围,前一个删除状态是好的
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a
}
return b
}
因为系统对于删除的对象有DeletedFinalStateUnknown这个状态,所以会存在两次删除的情况,但是两次添加同一个对象由于apiserver可以保证对象的唯一性,所以处理中就没有考虑合并两次添加操作。
接下来我们来看看Replace()函数的实现,这个也是Store定义的接口:
// 代码源自client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// 遍历所有的输入目标
for _, item := range list {
// 计算目标键
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
// 记录处理过的目标键,采用set存储,是为了后续快速查找
keys.Insert(key)
// 因为输入是目标全量,所以每个目标相当于重新同步了一次
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// 如果没有存储的话,自己存储的就是所有的老对象,目的要看看那些老对象不在全量集合中,那么就是删除的对象了
if f.knownObjects == nil {
// 遍历所有的元素
for k, oldItem := range f.items {
// 这个目标在输入的对象中存在就可以忽略
if keys.Has(k) {
continue
}
// 输入对象中没有,说明对象已经被删除了。
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
// 终于看到哪里用到DeletedFinalStateUnknown了,队列中存储对象的Deltas数组中
// 可能已经存在Delete了,避免重复,采用DeletedFinalStateUnknown这种类型
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 如果populated还没有设置,说明是第一次并且还没有任何修改操作执行过
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) // 记录第一次通过来的对象数量
}
return nil
}
// 下面处理的就是检测某些目标删除但是Delta没有在队列中
// 从存储中获取所有对象键
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
// 对象还存在那就忽略
if keys.Has(k) {
continue
}
// 获取对象
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
// 累积删除的对象数量
queuedDeletions++
// 把对象删除的Delta放入队列
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 和上面的代码差不多,只是计算initialPopulationCount值的时候增加了删除对象的数量
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
从Replace()的实现来看,主要用于实现对象的全量更新。这个可以理解为DeltaFIFO在必要的时刻做一次全量更新,这个时刻可以是定期的,也可以是事件触发的。由于DeltaFIFO对外输出的就是所有目标的增量变化,所以每次全量更新都要判断对象是否已经删除,因为在全量更新前可能没有收到目标删除的请求。这一点与cache不同,cache的Replace()相当于重建,因为cache就是对象全量的一种内存映射,所以Replace()就等于重建。
那我来问题一个非常有水平的问题,为什么knownObjects为nil时需要对比队列和对象全量来判断对象是否删除,而knownObjects不为空的时候就不需要了?如果读者想判断自己是否已经全部理解可以不看下面自己想想。
我们前面说过,knownObjects就是Indexer(具体实现是cache),而开篇的那副图已经非常明确的描述了二者以及使用之间的关系。也就是说knownObjects有的对象就是使用者知道的所有对象,此时即便队列(DeltaFIFO)中有相应的对象,在更新的全量对象中又被删除了,那就没必要通知使用者对象删除了,这种情况可以假想为系统短时间添加并删除了对象,对使用者来说等同于没有这个对象。
现在,我们来看看Queue相对于Stored扩展的Pop接口:
// 代码源自client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 队列中有数据么?
for len(f.queue) == 0 {
// 看来是先判断的是否有数据,后判断是否关闭,这个和chan像
if f.IsClosed() {
return nil, FIFOClosedError
}
// 没数据那就等待把
f.cond.Wait()
}
// 取出第一个对象
id := f.queue[0]
// 数组缩小,相当于把数组中的第一个元素弹出去了,这个不多解释哈
f.queue = f.queue[1:]
// 取出对象,因为queue中存的是对象键
item, ok := f.items[id]
// 同步对象计数减一,当减到0就说明外部已经全部同步完毕了
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
// 对象不存在,这个是什么情况?貌似我们在合并对象的时候代码上有这个逻辑,估计永远不会执行
if !ok {
continue
}
// 把对象删除
delete(f.items, id)
// Pop()需要传入一个回调函数,用于处理对象
err := process(item)
// 如果需要重新入队列,那就重新入队列
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
上面分析的函数基本上就算是把DeltaFIFO核心逻辑分析完毕了,下面我们就把其他的接口函数简单过一下
// 代码源自client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
// 这里就比较明白了,一次同步全量对象后,并且全部Pop()出去才能算是同步完成
// 其实这里所谓的同步就是全量内容已经进入Indexer,Indexer已经是系统中对象的全量快照了
return f.populated && f.initialPopulationCount == 0
}
// 添加不存在的对象
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
// 这个要求放入的必须是Deltas数组,就是通过Pop()弹出的对象
deltas, ok := obj.(Deltas)
if !ok {
return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
}
// 多个Delta都是一个对象,所以用最新的就可以了
id, err := f.KeyOf(deltas.Newest().Object)
if err != nil {
return KeyError{obj, err}
}
// 后面有实现
f.lock.Lock()
defer f.lock.Unlock()
f.addIfNotPresent(id, deltas)
return nil
}
// 这个是添加不存在对象的实现
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
f.populated = true
// 这里判断的对象是否存在
if _, exists := f.items[id]; exists {
return
}
// 放入队列中
f.queue = append(f.queue, id)
f.items[id] = deltas
f.cond.Broadcast()
}
// 重新同步,这个在cache实现是空的,这里面有具体实现
func (f *DeltaFIFO) Resync() error {
f.lock.Lock()
defer f.lock.Unlock()
// 如果没有Indexer那么重新同步是没有意义的,因为连同步了哪些对象都不知道
if f.knownObjects == nil {
return nil
}
// 列举Indexer里面所有的对象键
keys := f.knownObjects.ListKeys()
// 遍历对象键,为每个对象产生一个同步的Delta
for _, k := range keys {
// 具体实现后面有介绍
if err := f.syncKeyLocked(k); err != nil {
return err
}
}
return nil
}
// 具体对象同步实现接口
func (f *DeltaFIFO) syncKeyLocked(key string) error {
// 获取对象
obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil {
glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
return nil
} else if !exists {
glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
return nil
}
// 计算对象的键值,有人会问对象键不是已经传入了么?那个是存在Indexer里面的对象键,可能与这里的计算方式不同
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 对象已经在存在,说明后续会通知对象的新变化,所以再加更新也没意义
if len(f.items[id]) > 0 {
return nil
}
// 添加对象同步的这个Delta
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}
判断是否已同步populated和initialPopulationCount这两个变量存在的目的是什么?我的理解是否已同步指的是第一次从apiserver获取全量对象是否已经全部通知到外部,也就是通过Pop()被取走。所谓的同步就是指apiserver的状态已经同步到缓存中了,也就是Indexer中;
接口AddIfNotPresent()存在的目的是什么,只有在Pop()函数中使用了一次,但是在调用这个接口的时候已经从map中删除了,所以肯定不存在。这个接口在我看来主要用来保险的,因为Pop()本身就存在重入队列的可能,外部如果判断返回错误重入队列就可能会重复;
DeltaFIFO结构中比较难以理解的是knownObjects,它的类型为KeyListerGetter。其接口中的方法ListKeys和GetByKey也是Store接口中的方法,因此knownObjects能够被赋值为实现了Store的类型指针;同样地,由于Indexer继承了Store方法,因此knownObjects能够被赋值为实现了Indexer的类型指针。
DeltaFIFO.knownObjects.GetByKey就是执行的store.go中的GetByKey函数,用于获取Indexer中的对象键。
initialPopulationCount用于表示是否完成全量同步,initialPopulationCount在Replace函数中增加,在Pop函数中减小,当initialPopulationCount为0且populated为true时表示Pop了所有Replace添加到DeltaFIFO中的对象,populated用于判断是DeltaFIFO中是否为初始化状态(即没有处理过任何对象)。
在NewSharedIndexInformer(client-go/tools/cache/shared_informer.go)函数中使用下面进行初始化一个sharedIndexInformer,即使用函数DeletionHandlingMetaNamespaceKeyFunc初始化indexer,并在sharedIndexInformer.Run中将该indexer作为knownObjects入参,最终初始化为一个DeltaFIFO。
NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) //NewDeltaFIFO
---
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) //sharedIndexInformer.Run
knownObjects实际使用时为Indexer,它对应图中的localStore,DeltaFIFO根据其保存的对象状态变更消息处理(增/删/改/同步)knownObjects中相应的对象。其中同步(Sync)Detals中即将被删除的对象是没有意义的(参见willObjectBeDeletedLocked函数)。
ListWatch的list步骤中会调用Replace(client-go/tools/cache/delta_fifo.go)函数来对DeltaFIFO进行全量更新,包括3个步骤:
第2步好理解,knownObjects为空,只需要更新DeltaFIFO即可。第3步中,当knownObjects非空时,需要以knowObjects为基准进行对象的删除,否则会造成indexer中的数据与apiserver的数据不一致,举个例子,比如knownObjects中的对象为{obj1, obj2, obj3},而DeltaFIFO中待处理的对象为{obj2, obj3,obj4},如果仅按照2步骤进行处理,会导致knownObjects中残留obj1,因此需要在DeltaFIFO中添加删除obj1变更消息。从下面ShareInformer章节的图中可以看出,knownObjects(即Indexer)的数据只能通过DeltaFIFO变更。
下图为SharedInformer的运行图。可以看出SharedInformer启动了controller,reflector,并将其与Indexer结合起来。
注:不同颜色表示不同的chan,相同颜色表示在同一个chan中的处理
SharedInformer.Run启动了两个chan,s.c.Run为controller的入口,s.c.Run函数中会Pop DeltaFIFO中的元素,并根据DeltaFIFO的元素的类型(Sync/Added/Updated/Deleted)进两类处理,一类会使用indexer.Update,indexer,Add,indexer.Delete对保存的在Store中的数据进行处理;另一类会根据DeltaFIFO的元素的类型将其封装为sharedInformer内部类型updateNotification,addNotification,deleteNotification,传递给s.processor.Listeners.addCh,后续给注册的pl.handler处理。
s.processor.run主要用于处理注册的handler,processorListener.run函数接受processorListener.nextCh中的值,将其作为参数传递给handler进行处理。而processorListener.pop负责将processorListener.addCh中的元素缓存到p.pendingNotifications,并读取p.pendingNotifications中的元素,将其传递到processorListener.nextCh。即processorListener.pop负责管理数据,processorListener.run负责使用processorListener.pop管理的数据进行处理。
// client-go/tools/cache/controller.go
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
sharedIndexInformer有3个状态:启动前,启动后,停止后,由started, stopped两个bool值表示。
stopped=true表示inforer不再运作且不能添加新的handler(因为即使添加了也不会运行)
informer启动前和停止后允许添加新的indexer(sharedIndexInformer.AddIndexers),但不能在informer运行时添加,因为此时需要通过listwatch以及handler等一系列处理来操作sharedIndexInformer.inxder。如果允许同时使用sharedIndexInformer.AddIndexers,可能会造成数据不一致。
还有一个状态sharedProcessor.listenersStarted,用于表示是否所有的s.processor.Listeners都已经启动,如果已经启动,则在添加新的processorListener时,需要运行新添加的processorListener,否则仅仅添加即可(添加后同样会被sharedProcessor.run调度)
// 代码源自client-go/tools/cache/shared_informer.go
type SharedInformer interface {
// 添加资源事件处理器,关于ResourceEventHandler的定义在下面
// 相当于注册回调函数,当有资源变化就会通过回调通知使用者,是不是能和上面介绍的Controller可以联系上了?
// 为什么是Add不是Reg,说明可以支持多个handler
AddEventHandler(handler ResourceEventHandler)
// 上面添加的是不需要周期同步的处理器,下面的接口添加的是需要周期同步的处理器,周期同步上面提了好多遍了,不赘述
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// Store这个有专门的文章介绍,这个函数就是获取Store的接口,说明SharedInformer内有Store对象
GetStore() Store
// Controller在上面的章节介绍了,说明SharedInformer内有Controller对象
GetController() Controller
// 这个应该是SharedInformer的核心逻辑实现的地方
Run(stopCh <-chan struct{})
// 因为有Store,这个函数就是告知使用者Store里面是否已经同步了apiserver的资源,这个接口很有用
// 当创建完SharedInformer后,通过Reflector从apiserver同步全量对象,然后在通过DeltaFIFO一个一个的同志到cache
// 这个接口就是告知使用者,全量的对象是不是已经同步到了cache,这样就可以从cache列举或者查询了
HasSynced() bool
// 最新同步资源的版本,这个就不多说了,通过Controller(Controller通过Reflector)实现
LastSyncResourceVersion() string
}
// 扩展了SharedInformer类型,从类型名字上看共享的是Indexer,Indexer也是一种Store的实现
type SharedIndexInformer interface {
// 继承了SharedInformer
SharedInformer
// 扩展了Indexer相关的接口
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
// 代码源自client-go/tools/cache/controller.go,SharedInformer使用者如果需要处理资源的事件
// 那么就要自己实现相应的回调函数
type ResourceEventHandler interface {
// 添加对象回调函数
OnAdd(obj interface{})
// 更新对象回调函数
OnUpdate(oldObj, newObj interface{})
// 删除对象回调函数
OnDelete(obj interface{})
}
// 代码源自client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
// Indexer也是一种Store,这个我们知道的,Controller负责把Reflector和FIFO逻辑串联起来
// 所以这两个变量就涵盖了开篇那张图里面的Reflector、DeltaFIFO和LocalStore(cache)
indexer Indexer
controller Controller
// sharedIndexInformer把上面提到的ResourceEventHandler进行了在层封装,并统一由sharedProcessor管理,后面章节专门介绍
processor *sharedProcessor
// CacheMutationDetector其实没啥用,我理解是开发者自己实现的一个调试工具,用来发现对象突变的
// 实现方法也比较简单,DeltaFIFO弹出的对象在处理前先备份(深度拷贝)一份,然后定期比对两个对象是否相同
// 如果不同那就报警,说明处理过程中有人修改过对象,这个功能默认是关闭,所以我说没啥用
cacheMutationDetector CacheMutationDetector
// 这两个变量是给Reflector用的,我们知道Reflector是在Controller创建的
listerWatcher ListerWatcher
objectType runtime.Object
// 定期同步的周期,因为可能存在多个ResourceEventHandler,就有可能存在多个同步周期,sharedIndexInformer采用最小的周期
// 这个周期值就存储在resyncCheckPeriod中,通过AddEventHandler()添加的处理器都采用defaultEventHandlerResyncPeriod
resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
// 时钟
clock clock.Clock
// 启动、停止标记,肯定有人会问为啥用两个变量,一个变量不就可以实现启动和停止了么?
// 其实此处是三个状态,启动前,已启动和已停止,start表示了两个状态,而且为启动标记专门做了个锁
// 说明启动前和启动后有互斥的资源操作
started, stopped bool
startedLock sync.Mutex
// 这个名字起的也是够了,因为DeltaFIFO每次Pop()的时候需要传入一个函数用来处理Deltas
// 处理Deltas也就意味着要把消息通知给处理器,如果此时调用了AddEventHandler()
// 就会存在崩溃的问题,所以要有这个锁,阻塞Deltas....细想名字也没毛病~
blockDeltas sync.Mutex
}
CacheMutationDetector这个就是检测对象在过程中突变的,何所谓突变呢?突变就是莫名其妙的修改了,如何实现突变检测,也是比较简单的。CacheMutationDetector对所有的对象做了一次深度拷贝(DeepCopy),然后定期比较两个对象是否一致,当发现有不同时说明对象突变了,然后就panic。我认为CacheMutationDetector是用来调试的,因为代码默认是关闭的:
// 代码源自client-go/tools/cache/mutation_detector.go
// 默认关闭突变检测
var mutationDetectionEnabled = false
// 但是可以通过环境变量的KUBE_CACHE_MUTATION_DETECTOR开启
func init() {
mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
}
// 这个是突变检测的类型抽象
type CacheMutationDetector interface {
AddObject(obj interface{}) // 用于记录所有的对象
Run(stopCh <-chan struct{}) // 开启协程定期比对
}
// 创建CacheMutationDetector对象
func NewCacheMutationDetector(name string) CacheMutationDetector {
// 如果没有开启选项就构造一个什么都不做的对象
if !mutationDetectionEnabled {
return dummyMutationDetector{}
}
// 如果开启了选项,那么就构造一个默认的突变检测器
glog.Warningln("Mutation detector is enabled, this will result in memory leakage.")
return &defaultCacheMutationDetector{name: name, period: 1 * time.Second}
}
// 这就是什么都不做的突变检测器
type dummyMutationDetector struct{}
func (dummyMutationDetector) Run(stopCh <-chan struct{}) {
}
func (dummyMutationDetector) AddObject(obj interface{}) {
}
有没有感觉shared这个词被kubernetes玩儿坏了(继controller之后有一个背玩儿坏的单词),sharedProcessor这又shared啥了?首先需要知道Processor的定义,这里定义的Processor就是处理事件的东西。什么事件,就是SharedInformer向外部通知的事件。因为官方代码没有注释,我猜是shared是同一个SharedInformer,有没有很绕嘴?还有更绕的在后面呢,我们还要了解一个新的类型,那就是processorListener,processor刚说完,又来了个Listener!
通过SharedInformer.AddEventHandler()添加的处理器最终就会封装成processorListener,然后通过sharedProcessor管理起来,通过processorListener的封装就可以达到所谓的有事处理,没事挂起。
processorListener可以理解为两个核心功能,一个是processor,一个是listener,用一句话概括,有事做事没事挂起。先看看processorListener的定义:
// 代码源自clien-go/tools/cache/shared_informer.go
type processorListener struct {
// nextCh、addCh、handler、pendingNotifications的用法请参看我的《golang的chan有趣用法》里面有相关的例子
// 总结这四个变量实现了事件的输入、缓冲、处理,事件就是apiserver资源的变化
nextCh chan interface{}
addCh chan interface{}
handler ResourceEventHandler
pendingNotifications buffer.RingGrowing
// 下面四个变量就是跟定时同步相关的了,requestedResyncPeriod是处理器设定的定时同步周期
// resyncPeriod是跟sharedIndexInformer对齐的同步时间,因为sharedIndexInformer管理了多个处理器
// 最终所有的处理器都会对齐到一个周期上,nextResync就是下一次同步的时间点
requestedResyncPeriod time.Duration
resyncPeriod time.Duration
nextResync time.Time
resyncLock sync.Mutex
}
// 代码源自client-go/tools/cache/shared_informer.go
// 对,就这么简单,通过addCh传入,这里面的notification就是我们所谓的事件
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
// 代码源自client-go/tools/cache/shared_informer.go
// 这个函数是通过sharedProcessor利用wait.Group启动的,读者可以自行查看wait.Group
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
// nextCh是在这里,函数退出前析构的
defer close(p.nextCh)
// 临时变量,下面会用到
var nextCh chan<- interface{}
var notification interface{}
// 进入死循环啦
for {
select {
// 有两种情况,nextCh还没有初始化,这个语句就会被阻塞,这个我在《深入浅出golang之chan》说过
// nextChan后面会赋值为p.nextCh,因为p.nextCh也是无缓冲的chan,数据不发送成功就阻塞
case nextCh <- notification:
// 如果发送成功了,那就从缓冲中再取一个事件出来
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok {
// 如果没有事件,那就把nextCh再次设置为nil,接下来对于nextCh操作还会被阻塞
nextCh = nil
}
// 从p.addCh读取一个事件出来,这回看到消费p.addCh的地方了
case notificationToAdd, ok := <-p.addCh:
// 说明p.addCh关闭了,只能退出
if !ok {
return
}
// notification为空说明当前还没发送任何事件给处理器
if notification == nil {
// 那就把刚刚获取的事件通过p.nextCh发送个处理器
notification = notificationToAdd
nextCh = p.nextCh
} else {
// 上一个事件还没有发送成功,那就先放到缓存中
// pendingNotifications可以想象为一个slice,这样方便理解,是一个动态的缓存,
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
pop()函数实现的非常巧妙,利用一个协程就把接收、缓冲、发送全部解决了。它充分的利用了golang的select可以同时操作多个chan的特性,同时从addChd读取数据从nextCh发送数据,这两个chan任何一个完成都可以激活协程。对于C/C++程序猿理解起来有点费劲,但这就是GO的魅力所在。接下来,我们看看从nextCh读取事件后是如何处理的:
// 代码源自client-go/tools/cache/shared_informer.go
// 这个也是sharedProcessor通过wait.Group启动的
func (p *processorListener) run() {
// 因为wait.Until需要传入退出信号的chan
stopCh := make(chan struct{})
// wait.Until不多说了,我在前期不点的文章中说过了,只要没有收到退出信号就会周期的执行传入的函数
wait.Until(func() {
// wait.ExponentialBackoff()和wait.Until()类似,wait.Until()是无限循环
// wait.ExponentialBackoff()是尝试几次,每次等待时间会以指数上涨
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
// 这也是chan的range用法,可以参看我的《深入浅出golang的chan》了解细节
for next := range p.nextCh {
// 判断事件类型,这里面的handler就是调用SharedInfomer.AddEventHandler()传入的
// 理论上处理的不是Deltas么?怎么变成了其他类型,这是SharedInformer做的二次封装,后面会看到
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
}
}
return true, nil
})
// 执行到这里只能是nextCh已经被关闭了,所以关闭stopCh,通知wait.Until()退出
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
因为processorListener其他函数没啥大用,上面两个函数就就已经把核心功能都实现了。processorListener就是实现了事件的缓冲和处理,此处的处理就是使用者传入的函数。在没有事件的时候可以阻塞处理器,当事件较多是可以把事件缓冲起来,实现了事件分发器与处理器的异步处理。
processorListener的run()和pop()函数是sharedProcessor启动的协程调用的,所以下面就要对sharedProcessor进行分析了。
sharedProcessor的定义如下:
// client-go/tools/cache/shared_informer.go
// sharedProcessor是通过数组组织处理器的,只是分了需要定时同步和不需要要同步两类
type sharedProcessor struct {
listenersStarted bool // 所有处理器是否已经启动的标识
listenersLock sync.RWMutex // 读写锁
listeners []*processorListener // 通用的处理器
syncingListeners []*processorListener // 需要定时同步的处理器
clock clock.Clock // 时钟
wg wait.Group // 前面讲过了processorListener每个需要两个协程,
// 用wait.Group来管理所有处理器的携程,保证他们都能退出
}
// 代码源自client-go/tools/cache/shared_informer.go
// 添加处理器,sharedIndexInformer.AddEventHandler()就会调用这个函数实现处理器的添加
func (p *sharedProcessor) addListener(listener *processorListener) {
// 加锁,这个很好理解
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
// 把处理器添加到数组中
p.addListenerLocked(listener)
// 通过wait.Group启动两个协程,做的事情我们在processorListener说过了,这里就是我们上面提到的启动两个协程的地方
// 这个地方判断了listenersStarted,这说明sharedProcessor在启动前、后都可以添加处理器
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
// 把处理器添加到数组中
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
// 两类(定时同步和不同步)的处理器数组都添加了,这是因为没有定时同步的也会用默认的时间,后面我们会看到
// 那么问题来了,那还用两个数组干什么呢?
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
// 代码源自client-go/tools/cache/shared_informer.go
// 通过函数名称也能感觉到分发的感觉~sync表示obj对象是否为同步事件对象
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
// 加锁没毛病
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 无论是否为sync,添加处理器的代码中我们知道两个数组都会被添加,所以判断不判断没啥区别~
// 所以我的猜测是代码以前实现的是明显区分两类的,但随着代码的更新二者的界限已经没那么明显了
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
// 代码源自client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
// 启动前、后对于添加处理器的逻辑是不同,启动前的处理器是不会立刻启动连个协程执行处理器的pop()和run()函数的
// 而是在这里统一的启动
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 遍历所有的处理器,然后为处理器启动两个后台协程
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
// 等待退出信号
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 关闭addCh,processorListener.pop()这个协程就会退出,不明白的可以再次回顾代码
// 因为processorListener.pop()会关闭processorListener.nextCh,processorListener.run()就会退出
// 所以这里只要关闭processorListener.addCh就可以自动实现两个协程的退出,不得不说设计的还是挺巧妙的
for _, listener := range p.listeners {
close(listener.addCh)
}
// 等待所有的协程退出,这里指的所有协程就是所有处理器的那两个协程
p.wg.Wait()
}
// 代码源自client-go/tools/cache/shared_informer.go
// lw:这个是apiserver客户端相关的,用于Reflector从apiserver获取资源,所以需要外部提供
// objType:这个SharedInformer监控的对象类型
// resyncPeriod:同步周期,SharedInformer需要多长时间给使用者发送一次全量对象的同步时间
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
// 还是用SharedIndexInformer实现的
return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
}
// 创建SharedIndexInformer对象,其中大部分参数再上面的函数已经介绍了
// indexers:需要外部提供计算对象索引键的函数,也就是这里面的对象需要通过什么方式创建索引
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
// 管理所有处理器用的,这个上面的章节解释了
processor: &sharedProcessor{clock: realClock},
// 其实就是在构造cache,读者可以自行查看NewIndexer()的实现,
// 在cache中的对象用DeletionHandlingMetaNamespaceKeyFunc计算对象键,用indexers计算索引键
// 可以想象成每个对象键是Namespace/Name,每个索引键是Namespace,即按照Namesapce分类
// 因为objType决定了只有一种类型对象,所以Namesapce是最大的分类
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
// 下面这两主要就是给Controller用,确切的说是给Reflector用的
listerWatcher: lw,
objectType: objType,
// 无论是否需要定时同步,SharedInformer都提供了一个默认的同步时间,当然这个是外部设置的
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
// 默认没有开启的对象突变检测器,没啥用,也不多介绍
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock: realClock,
}
return sharedIndexInformer
}
// 代码源自client-go/tools/cache/shared_informer.go
// 添加没有指定同步周期的事件处理器
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
// defaultEventHandlerResyncPeriod是默认的同步周期,在创建SharedInformer的时候设置的
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
// 添加需要定期同步的事件处理器
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
// 因为是否已经开始对于添加事件处理器的方式不同,后面会有介绍,所以此处加了锁
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 如果已经结束了,那就可以直接返回了
if s.stopped {
return
}
// 如果有同步周期,==0就是永远不用同步
if resyncPeriod > 0 {
// 同步周期不能太短,太短对于系统来说反而是个负担,大量的无效计算浪费在这上面
if resyncPeriod < minimumResyncPeriod {
resyncPeriod = minimumResyncPeriod
}
// SharedInformer管理了很多处理器,每个处理器都有自己的同步周期,所以此处要统一成一个,称之为对齐
// SharedInformer会选择所有处理器中最小的那个作为所有处理器的同步周期,称为对齐后的同步周期
// 此处就要判断是不是比当前对齐后的同步周期还要小
if resyncPeriod < s.resyncCheckPeriod {
// 如果已经启动了,那么只能用和大家一样的周期
if s.started {
resyncPeriod = s.resyncCheckPeriod
// 如果没启动,那就让大家都用最新的对齐同步周期
} else {
s.resyncCheckPeriod = resyncPeriod
s.processor.resyncCheckPeriodChanged(resyncPeriod)
}
}
}
// 创建处理器,代码一直用listener,可能想强调没事件就挂起把,我反而想用处理器这个名词
// determineResyncPeriod()这个函数读者自己分析把,非常简单,这里面只要知道创建了处理器就行了
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
// 如果没有启动,那么直接添加处理器就可以了
if !s.started {
s.processor.addListener(listener)
return
}
// 这个锁就是暂停再想所有的处理器分发事件用的,因为这样会遍历所有的处理器,此时添加会有风险
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// 添加处理器
s.processor.addListener(listener)
// 这里有意思啦,遍历缓冲中的所有对象,通知处理器,因为SharedInformer已经启动了,可能很多对象已经让其他的处理器处理过了,
// 所以这些对象就不会再通知新添加的处理器,此处就是解决这个问题的
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
}
// 代码源自client-go/tools/cache/shared_informer.go
// sharedIndexInformer的核心逻辑函数
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 在此处构造的DeltaFIFO
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
// 这里的Config是我们介绍Reflector时介绍的那个Config
cfg := &Config{
// 我前面一直在说Reflector输入到DeltaFIFO,这里算是直接证明了
Queue: fifo,
// 下面这些变量我们在Reflector都说了,这里赘述
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
// 这个才是重点,Controller调用DeltaFIFO.Pop()接口传入的就是这个回调函数,也是我们后续重点介绍的
Process: s.HandleDeltas,
}
// 创建Controller,这个不用多说了
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// 这个processorStopCh 是给sharedProcessor和cacheMutationDetector传递退出信号的
// 因为这里要创建两个协程运行sharedProcessor和cacheMutationDetector的核心函数
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
// Run()函数都退出了,也就应该设置结束的标识了
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true
}()
// 启动Controller,Controller一旦运行,整个流程就开始启动了,所以叫Controller也不为过
// 毕竟Controller是SharedInformer的发动机嘛
s.controller.Run(stopCh)
}
sharedIndexInformer通过Run()函数启动了Controller和sharedProcess(),Controller通过DeltaFIFO.Pop()函数弹出Deltas,并调用函数处理,这个处理函数就是sharedIndexInformer.HandleDeltas(),这个函数是衔接Controller和sharedProcess的关键点,他把Deltas转换为sharedProcess需要的各种Notification类型。下面我们就对这个函数进行代码分析:
// 代码源自client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
// 看到这里就知道为啥起名为blockDeltas了,这是阻塞处理器Deltas啊~因为分发事件到处理器,所以要加锁
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// Deltas里面包含了一个对象的多个增量操作,所以要从最老的Delta到最先的Delta遍历处理
for _, d := range obj.(Deltas) {
// 根据不同的Delta做不同的操作,但是大致分为对象添加、删除两大类操作
// 所有的操作都要先同步到cache在通知处理器,这样保持处理器和cache的状态是一致的
switch d.Type {
// 同步、添加、更新都是对象添加类的造作,至于是否是更新还要看cache是否有这个对象
case Sync, Added, Updated:
// 看看对象是不是有定时同步产生的事件
isSync := d.Type == Sync
// 检测突变,没啥用
s.cacheMutationDetector.AddObject(d.Object)
// 如果cache中有的对象,一律看做是更新事件
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// 把对象更新到cache中
if err := s.indexer.Update(d.Object); err != nil {
return err
}
// 通知处理器处理事件
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
// cache中没有的对象,一律看做是新增事件
} else {
// 把对象添加到cache中
if err := s.indexer.Add(d.Object); err != nil {
return err
}
// 通知处理器处理器事件
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
// 对象被删除
case Deleted:
// 从cache中删除对象
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
// 通知所有的处理器对象被删除了
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
利用apiserver的api实现资源的列举和监控(Reflector实现);
利用cache存储apiserver中的部分对象,通过对象类型进行制定,并在cache中采用Namespace做对象的索引
先通过apiserver的api将对象的全量列举出来存储在cache中,然后再watch资源,一旦有变化就更新cache中;
更新到cache中的过程通过DeltaFIFO实现的有顺序的更新,因为资源状态是通过全量+增量方式实现同步的,所以顺序错误会造成状态不一致;
使用者可以注册回调函数(类似挂钩子),在更新到cache的同时通知使用者处理,为了保证回调处理不被某一个处理器阻塞,SharedInformer实现了processorListener异步缓冲处理;
整个过程是Controller是发动机,驱动整个流程运转;
用一幅图来总结SharedInformer(其中Reflector.resync()因为是个匿名函数,所以用斜体,其实是不存在这个函数的)
Lister用于获取某个资源(如Pod)的全量,Watcher用于获取某个资源的增量变化。实际使用中Lister和Watcher都从apiServer获取资源信息,Lister一般用于首次获取某资源(如Pod)的全量信息,而Watcher用于持续获取该资源的增量变化信息。Lister和Watcher的接口定义如下,使用NewListWatchFromClient函数来初始化ListerWatcher
// client-go/tools/cache/listwatch.go
type Lister interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
}
// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
Lister
Watcher
}
在workqueue的例子中可以看到调用NewListWatchFromClient的地方,该例子会从clientset.CoreV1().RESTClient()获取"pods"的相关信息。
ListerWatcher是针对某一类对象的,比如Pod,不是所有对象的,这个在构造ListerWatcher对象的时候由apiserver的client类型决定了。
// client-go/examples/workqueue/main.go
// create the pod watcher
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
cache.NewListWatchFromClient函数中的资源名称可以从types.go中获得
/ k8s.io/api/core/v1/types.go
const (
// Pods, number
ResourcePods ResourceName = "pods"
// Services, number
ResourceServices ResourceName = "services"
// ReplicationControllers, number
ResourceReplicationControllers ResourceName = "replicationcontrollers"
// ResourceQuotas, number
ResourceQuotas ResourceName = "resourcequotas"
// ResourceSecrets, number
ResourceSecrets ResourceName = "secrets"
// ResourceConfigMaps, number
ResourceConfigMaps ResourceName = "configmaps"
// ResourcePersistentVolumeClaims, number
ResourcePersistentVolumeClaims ResourceName = "persistentvolumeclaims"
// ResourceServicesNodePorts, number
ResourceServicesNodePorts ResourceName = "services.nodeports"
// ResourceServicesLoadBalancers, number
ResourceServicesLoadBalancers ResourceName = "services.loadbalancers"
// CPU request, in cores. (500m = .5 cores)
ResourceRequestsCPU ResourceName = "requests.cpu"
// Memory request, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
ResourceRequestsMemory ResourceName = "requests.memory"
// Storage request, in bytes
ResourceRequestsStorage ResourceName = "requests.storage"
// Local ephemeral storage request, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
ResourceRequestsEphemeralStorage ResourceName = "requests.ephemeral-storage"
// CPU limit, in cores. (500m = .5 cores)
ResourceLimitsCPU ResourceName = "limits.cpu"
// Memory limit, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
ResourceLimitsMemory ResourceName = "limits.memory"
// Local ephemeral storage limit, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
ResourceLimitsEphemeralStorage ResourceName = "limits.ephemeral-storage"
)
除了可以从CoreV1版本的API group获取RESTClient信息外,还可以从下面Clientset结构体定义的API group中获取信息
// client-go/kubernetes/clientset.go
type Clientset struct {
*discovery.DiscoveryClient
admissionregistrationV1beta1 *admissionregistrationv1beta1.AdmissionregistrationV1beta1Client
appsV1 *appsv1.AppsV1Client
appsV1beta1 *appsv1beta1.AppsV1beta1Client
appsV1beta2 *appsv1beta2.AppsV1beta2Client
auditregistrationV1alpha1 *auditregistrationv1alpha1.AuditregistrationV1alpha1Client
authenticationV1 *authenticationv1.AuthenticationV1Client
authenticationV1beta1 *authenticationv1beta1.AuthenticationV1beta1Client
authorizationV1 *authorizationv1.AuthorizationV1Client
authorizationV1beta1 *authorizationv1beta1.AuthorizationV1beta1Client
autoscalingV1 *autoscalingv1.AutoscalingV1Client
autoscalingV2beta1 *autoscalingv2beta1.AutoscalingV2beta1Client
autoscalingV2beta2 *autoscalingv2beta2.AutoscalingV2beta2Client
batchV1 *batchv1.BatchV1Client
batchV1beta1 *batchv1beta1.BatchV1beta1Client
batchV2alpha1 *batchv2alpha1.BatchV2alpha1Client
certificatesV1beta1 *certificatesv1beta1.CertificatesV1beta1Client
coordinationV1beta1 *coordinationv1beta1.CoordinationV1beta1Client
coordinationV1 *coordinationv1.CoordinationV1Client
coreV1 *corev1.CoreV1Client
eventsV1beta1 *eventsv1beta1.EventsV1beta1Client
extensionsV1beta1 *extensionsv1beta1.ExtensionsV1beta1Client
networkingV1 *networkingv1.NetworkingV1Client
networkingV1beta1 *networkingv1beta1.NetworkingV1beta1Client
nodeV1alpha1 *nodev1alpha1.NodeV1alpha1Client
nodeV1beta1 *nodev1beta1.NodeV1beta1Client
policyV1beta1 *policyv1beta1.PolicyV1beta1Client
rbacV1 *rbacv1.RbacV1Client
rbacV1beta1 *rbacv1beta1.RbacV1beta1Client
rbacV1alpha1 *rbacv1alpha1.RbacV1alpha1Client
schedulingV1alpha1 *schedulingv1alpha1.SchedulingV1alpha1Client
schedulingV1beta1 *schedulingv1beta1.SchedulingV1beta1Client
schedulingV1 *schedulingv1.SchedulingV1Client
settingsV1alpha1 *settingsv1alpha1.SettingsV1alpha1Client
storageV1beta1 *storagev1beta1.StorageV1beta1Client
storageV1 *storagev1.StorageV1Client
storageV1alpha1 *storagev1alpha1.StorageV1alpha1Client
}
RESTClient()的返回值为Interface接口类型,该类型中包含如下对资源的操作方法,如Get()就封装了HTTP的Get方法。NewListWatchFromClient初始化ListWatch的时候使用了Get方法
// client-go/rest/client.go
type Interface interface {
GetRateLimiter() flowcontrol.RateLimiter
Verb(verb string) *Request
Post() *Request
Put() *Request
Patch(pt types.PatchType) *Request
Get() *Request
Delete() *Request
APIVersion() schema.GroupVersion
}
controller的结构如下,其包含一个配置变量config,在注释中可以看到Config.Queue就是DeltaFIFO。controller定义了如何调度Reflector。
此controller非我们比较熟悉的controller-manager管理的各种各样的controller,kubernetes里面controller简直是泛滥啊。这里的controller定义在client-go/tools/cache/controller.go中,目的是用来把Reflector、DeltaFIFO组合起来形成一个相对固定的、标准的处理流程。理解了Controller,基本就算把SharedInfomer差不多搞懂了。话不多说,先上代码:
// 代码源自client-go/tools/cache/controller.go
type Config struct {
Queue // SharedInformer使用DeltaFIFO
ListerWatcher // 这个用来构造Reflector
Process ProcessFunc // 这个在调用DeltaFIFO.Pop()使用,弹出对象要如何处理
ObjectType runtime.Object // 对象类型,这个肯定是Reflector使用
FullResyncPeriod time.Duration // 全量同步周期,这个在Reflector使用
ShouldResync ShouldResyncFunc // Reflector在全量更新的时候会调用该函数询问
RetryOnError bool // 错误是否需要尝试
}
// 这是一个Controller的抽象
type Controller interface {
Run(stopCh <-chan struct{}) // 核心流程函数
HasSynced() bool // apiserver中的对象是否已经同步到了Store中
LastSyncResourceVersion() string // 最新的资源版本号
}
// 代码源自client-go/tools/cache/controller.go
// controller是Controller的实现类型
type controller struct {
config Config // 配置,上面有讲解
reflector *Reflector // 反射器
reflectorMutex sync.RWMutex // 反射器的锁
clock clock.Clock // 时钟
}
// 核心业务逻辑实现
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 创建一个协程,如果收到系统退出的信号就关闭队列,相当于在这里析构的队列
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 创建Reflector,传入的参数都是我们上一个章节解释过的,这里不赘述
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
// r.ShouldResync的存在就是为了以后使用少些一点代码?否则直接使用c.config.ShouldResync不就完了么?不明白用意
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
// 记录反射器
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
// wait.Group不是本章的讲解内容,只要把它理解为类似barrier就行了
// 被他管理的所有的协程都退出后调用Wait()才会退出,否则就会被阻塞
var wg wait.Group
defer wg.Wait()
// StartWithChannel()会启动协程执行Reflector.Run(),同时接收到stopCh信号就会退出协程
wg.StartWithChannel(stopCh, r.Run)
// wait.Until()在前面的章节讲过了,周期性的调用c.processLoop(),这里来看是1秒
// 不用担心调用频率太高,正常情况下c.processLoop是不会返回的,除非遇到了解决不了的错误,因为他是个循环
wait.Until(c.processLoop, time.Second, stopCh)
}
// 代码源自client-go/tools/cache/controller.go
func (c *controller) processLoop() {
for {
// 从队列中弹出一个对象,然后处理它,这才是最主要的部分,这个c.config.Process是构造Controller的时候通过Config传进来的
// 所以这个读者要特别注意了,这个函数其实是ShareInformer传进来的,所以在分析SharedInformer的时候要重点分析的
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
// 如果FIFO关闭了那就退出
if err == FIFOClosedError {
return
}
// 如果错误可以再试试
if c.config.RetryOnError {
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
// 代码源自client-go/tools/cache/controller.go
// HasSynced() 调用的就是DeltaFIFO.HasSynced()实现的
func (c *controller) HasSynced() bool {
return c.config.Queue.HasSynced()
}
// LastSyncResourceVersion() 是利用Reflector实现的
func (c *controller) LastSyncResourceVersion() string {
if c.reflector == nil {
return ""
}
return c.reflector.LastSyncResourceVersion()
}
controller的框架比较简单它使用wg.StartWithChannel启动Reflector.Run,相当于启动了一个DeltaFIFO的生产者(wg.StartWithChannel(stopCh, r.Run)表示可以将r.Run放在独立的协程运行,并可以使用stopCh来停止r.Run);使用wait.Until来启动一个消费者(wait.Until(c.processLoop, time.Second, stopCh)表示每秒会触发一次c.processLoop,但如果c.processLoop在1秒之内没有结束,则运行c.processLoop继续运行,不会结束其运行状态)
processLoop的框架也很简单,它运行了DeltaFIFO.Pop函数,用于消费DeltaFIFO中的对象,并在DeltaFIFO.Pop运行失败后可能重新处理该对象(AddIfNotPresent)
SharedInformerFactory就是构造各种Informer的地方,每个SharedInformer其实只负责一种对象,在构造SharedInformer的时候指定了对象类型。SharedInformerFactory可以构造Kubernetes里所有对象的Informer,而且主要用在controller-manager这个服务中。因为controller-manager负责管理绝大部分controller,每类controller不仅需要自己关注的对象的informer,同时也可能需要其他对象的Informer(比如ReplicationController也需要PodInformer,否则他无法感知Pod的启动和关闭,也就达不到监控的目的了),所以一个SharedInformerFactory可以让所有的controller共享使用同一个类对象的Informer。
虽然有同名的,但是他们在不同的包中,虽然代码上有很多相似的地方,但是确实是完全独立的对象。
// 代码源自client-go/informers/factory.go
// SharedInformerFactory是个interfaces,所以肯定有具体的实现类
type SharedInformerFactory interface {
// 在informers这个包中又定义了一个SharedInformerFactory,这个主要是包内抽象,所以此处继承了这个接口
internalinterfaces.SharedInformerFactory
// 这个暂时不知道干啥用,所以我也不介绍他了
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
// 等待所有的Informer都已经同步完成,这里同步其实就是遍历调用SharedInformer.HasSynced()
// 所以函数需要周期性的调用指导所有的Informer都已经同步完毕
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Admissionregistration() admissionregistration.Interface // 返回admissionregistration相关的Informer组
Apps() apps.Interface // 返回app相关的Informer组
Autoscaling() autoscaling.Interface // 返回autoscaling相关的Informer组
Batch() batch.Interface // 返回job相关的Informer组
Certificates() certificates.Interface // 返回certificates相关的Informer组
Coordination() coordination.Interface // 返回coordination相关的Informer组
Core() core.Interface // 返回core相关的Informer组
Events() events.Interface // 返回event相关的Informer组
Extensions() extensions.Interface // 返回extension相关的Informer组
Networking() networking.Interface // 返回networking相关的Informer组
Policy() policy.Interface // 返回policy相关的Informer组
Rbac() rbac.Interface // 返回rbac相关的Informer组
Scheduling() scheduling.Interface // 返回scheduling相关的Informer组
Settings() settings.Interface // 返回settings相关的Informer组
Storage() storage.Interface // 返回storage相关的Informer组
}
// 代码源自client-go/informers/internalinterfaces/factory_interfaces.go
type SharedInformerFactory interface {
// 核心逻辑函数,类似于很多类的Run()函数
Start(stopCh <-chan struct{})
// 这个很关键,通过对象类型,返回SharedIndexInformer,这个SharedIndexInformer管理的就是指定的对象
// NewInformerFunc用于当SharedInformerFactory没有这个类型的Informer的时候创建使用
InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}
// 创建Informer的函数定义,这个函数需要apiserver的客户端以及同步周期,这个同步周期在SharedInformers反复提到
type NewInformerFunc func(kubernetes.Interface, time.Duration) cache.SharedIndexInformer
// 代码源自client-go/informers/factory.go
type sharedInformerFactory struct {
// apiserver的客户端,暂时不用关心怎么实现的,只要知道他能列举和监听资源就可以了
client kubernetes.Interface
// 哈哈,这样看来每个namesapce需要一个SharedInformerFactory,那cache用namespace建索引还有啥用呢?
// 并不是所有的使用者都需要指定namesapce,比如kubectl,他就可以列举所有namespace的资源,所以他没有指定namesapce
namespace string
// 这是个函数指针,用来调整列举选项的,这个选项用来client列举对象使用
tweakListOptions internalinterfaces.TweakListOptionsFunc
// 互斥锁
lock sync.Mutex
// 默认的同步周期,这个在SharedInformer需要用
defaultResync time.Duration
// 每个类型的Informer有自己自定义的同步周期
customResync map[reflect.Type]time.Duration
// 每类对象一个Informer,但凡使用SharedInformerFactory构建的Informer同一个类型其实都是同一个Informer
informers map[reflect.Type]cache.SharedIndexInformer
// 各种Informer启动的标记
startedInformers map[reflect.Type]bool
}
// 代码源自client-go/tools/cache/shared_informer.go
// 这是一个通用的构造SharedInformerFactory的接口函数,没有任何其他的选项,只包含了apiserver的client以及同步周期
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
// 最终是调用NewSharedInformerFactoryWithOptions()实现的,无非没有选项而已
return NewSharedInformerFactoryWithOptions(client, defaultResync)
}
// 相比于上一个通用的构造函数,这个构造函数增加了namesapce过滤和调整列举选项
func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
// 最终是调用NewSharedInformerFactoryWithOptions()实现的,无非选项是2个
// WithNamespace()和WithTweakListOptions()会在后文讲解
return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
}
// 到了构造SharedInformerFactory核心函数了,其实SharedInformerOption是个有意思的东西
// 我们写程序喜欢Option是个结构体,但是这种方式的扩展很麻烦,这里面用的是回调函数,这个让我get到新技能了
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
// 默认只有apiserver的client以及同步周期是需要外部提供的其他的都是可以有默认值的
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
//逐一遍历各个选项函数,opt是选项函数,下面面有详细介绍
for _, opt := range options {
factory = opt(factory)
}
return factory
}
// 代码源自client-go/informers/factory.go
// 这个是SharedInformerFactory构造函数的选项,是一个函数指针,传入的是工厂指针,返回也是工厂指针
// 很明显,选项函数直接修改工厂对象,然后把修改的对象返回就可以了
type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory
// 把每个对象类型的同步周期这个参数转换为SharedInformerOption类型
func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption {
// 这个实现很简单了,我就不多解释了
return func(factory *sharedInformerFactory) *sharedInformerFactory {
for k, v := range resyncConfig {
factory.customResync[reflect.TypeOf(k)] = v
}
return factory
}
}
// 这个选项函数用于使用者自定义client的列举选项的
func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption {
return func(factory *sharedInformerFactory) *sharedInformerFactory {
factory.tweakListOptions = tweakListOptions
return factory
}
}
// 这个选项函数用来设置namesapce过滤的
func WithNamespace(namespace string) SharedInformerOption {
return func(factory *sharedInformerFactory) *sharedInformerFactory {
factory.namespace = namespace
return factory
}
}
// 代码源自client-go/informers/factory.go
// 其实sharedInformerFactory的Start()函数就是启动所有具体类型的Informer的过程
// 因为每个类型的Informer都是SharedIndexInformer,需要需要把每个SharedIndexInformer都要启动起来
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
// 加锁操作
f.lock.Lock()
defer f.lock.Unlock()
// 遍历informers这个map
for informerType, informer := range f.informers {
// 看看这个Informer是否已经启动过
if !f.startedInformers[informerType] {
// 如果没启动过,那就启动一个协程执行SharedIndexInformer的Run()函数,我们在分析SharedIndexInformer的时候
// 我们知道知道Run()是整个Informer的启动入口点,看了《深入浅出kubernetes之client-go的SharedInformer》
// 的同学应该会想Run()是谁调用的呢?这里面应该给你们答案了吧?
go informer.Run(stopCh)
// 设置Informer已经启动的标记
f.startedInformers[informerType] = true
}
}
}
// 代码源自client-go/informers/factory.go
// InformerFor()相当于每个类型Informer的构造函数了,即便具体实现构造的地方是使用者提供的
// 这个函数需要使用者传入对象类型,因为在sharedInformerFactory里面是按照对象类型组织的Informer
// 更有趣的是这些Informer不是sharedInformerFactory创建的,需要使用者传入构造函数
// 这样做既保证了每个类型的Informer只构造一次,同时又保证了具体Informer构造函数的私有化能力
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
// 加锁操作
f.lock.Lock()
defer f.lock.Unlock()
// 通过反射获取obj的类型
informerType := reflect.TypeOf(obj)
// 看看这个类型的Informer是否已经创建了?
informer, exists := f.informers[informerType]
// 如果Informer已经创建,那么就复用这个Informer
if exists {
return informer
}
// 获取这个类型定制的同步周期,如果定制的同步周期那就用统一的默认周期
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
// 调用使用者提供构造函数,然后把创建的Informer保存起来
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
// 代码源自client-go/informers/internalinterfaces/factory_interfaces.go
// 这个函数定义就是具体类型Informer的构造函数,后面会有地方说明如何使用
type NewInformerFunc func(kubernetes.Interface, time.Duration) cache.SharedIndexInformer
// 代码源自client-go/informers/core/v1/pod.go
// 这个PodInformer是抽象类,Informer()就是获取SharedIndexInformer的接口函数
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
// 这个是PodInformer的实现类,看到了没,他需要工厂对象的指针,貌似明细了很多把?
type podInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
// 这个就是要传入工厂的构造函数了
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
// 这个是实现Informer()的地方,看到了把,这里面调用了工厂的InformerFor把自己注册进去
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
也就是说SharedInformerFactory的使用者使用Core().Pod() 这个接口构造了PodInformer,但是需要调用PodInformer.Informer()才能获取到的SharedIndexInformer,而正是这个接口实现了向工厂注册自己。既然已经涉及到了具体的Informer,我们就开始看看每个都是干啥的吧?
client-go为了方便管理,把Informer分类管理,具体的分类在开篇那个图里面已经展示了
// 代码源自client-go/informers/factory.go
func (f *sharedInformerFactory) Core() core.Interface {
// 调用了内核包里面的New()函数,详情见下文分析
return core.New(f, f.namespace, f.tweakListOptions)
}
// 代码源自client-go/informers/core/interface.go
// Interface又是一个被玩坏的名字,如果没有报名,根本不知道干啥的
type Interface interface {
V1() v1.Interface // 只有V1一个版本
}
// 这个是Interface的实现类,从名字上没任何关联吧?其实开发者命名也是挺有意思的,Interface定义的是接口
// 供外部使用,group也有意义,因为Core确实是内核Informer的分组
type group struct {
// 需要工厂对象的指针
factory internalinterfaces.SharedInformerFactory
// 这两个变量决定了Core这个分组对于SharedInformerFactory来说只有以下两个选项
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
}
// 构造Interface的接口
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
// 代码也挺简单的,不多说了
return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// 实现V1()这个接口的函数
func (g *group) V1() v1.Interface {
// 通过调用v1包的New()函数实现的,下面会有相应代码的分析
return v1.New(g.factory, g.namespace, g.tweakListOptions)
}
// 代码源自client-go/informers/core/v1/interface.go
// 还是抽象类
type Interface interface {
// 获取ComponentStatusInformer
ComponentStatuses() ComponentStatusInformer
// 获取ConfigMapInformer
ConfigMaps() ConfigMapInformer
// 获取EndpointsInformer
Endpoints() EndpointsInformer
// 获取EventInformer
Events() EventInformer
// 获取LimitRangeInformer
LimitRanges() LimitRangeInformer
// 获取NamespaceInformer
Namespaces() NamespaceInformer
// 获取NodeInformer
Nodes() NodeInformer
// 获取PersistentVolumeInformer
PersistentVolumes() PersistentVolumeInformer
// 获取PersistentVolumeClaimInformer
PersistentVolumeClaims() PersistentVolumeClaimInformer
// 获取PodInformer
Pods() PodInformer
// 获取PodTemplateInformer
PodTemplates() PodTemplateInformer
// 获取ReplicationControllerInformer
ReplicationControllers() ReplicationControllerInformer
// 获取ResourceQuotaInformer
ResourceQuotas() ResourceQuotaInformer
// 获取SecretInformer
Secrets() SecretInformer
// 获取ServiceInformer
Services() ServiceInformer
// 获取ServiceAccountInformer
ServiceAccounts() ServiceAccountInformer
}
// 这个就是上面抽象类的实现了,这个和Core分组的命名都是挺有意思,分组用group作为实现类名
// 这个用version作为实现类名,确实这个是V1版本
type version struct {
// 工厂的对象指针
factory internalinterfaces.SharedInformerFactory
// 两个选项,不多说了,说了好多遍了
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
}
// 这个就是Core分组V1版本的构造函数啦
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
// 应该好理解吧?
return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
Core分组有管理了很多Informer,每一个Informer负责获取相应类型的对象。
PodInformer是通过Core分组Pods()创建的
// 代码源自client-go/informers/core/v1/interface.go
// 上面我们已经说过了version是v1.Interface的实现
func (v *version) Pods() PodInformer {
// 返回了podInformer的对象,说明podInformer是PodInformer 实现类
return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
// 代码源自client-go/informers/core/v1/pod.go
// PodInformer定义了两个接口,分别为Informer()和Lister(),Informer()用来获取SharedIndexInformer对象
// Lister()用来获取PodLister对象,这个后面会有说明,当前可以不用关心
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
// PodInformer的实现类,参数都是上面层层传递下来的,这里不说了
type podInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
// 这个就是需要传递给SharedInformerFactory的构造函数啦,前面也提到了
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
// NewFilteredPodInformer下面有代码注释
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
// 实现了PodInformer.Informer()接口函数
func (f *podInformer) Informer() cache.SharedIndexInformer {
// 此处调用了工厂实现了Informer的创建
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
// 实现了PodInformer.Lister()接口函数
func (f *podInformer) Lister() v1.PodLister {
return v1.NewPodLister(f.Informer().GetIndexer())
}
// 真正创建PodInformer的函数
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
// 还有谁记得构造SharedIndexInformer需要写啥?自己温习《深入浅出kubernetes之client-go的SharedInformer》
return cache.NewSharedIndexInformer(
// 需要ListWatch两个函数,就是用apiserver的client实现的,此处不重点解释每个代码什么意思
// 读者应该能够看懂是利用client实现了Pod的List和Watch
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(options)
},
},
// 这个是要传入对象的类型,肯定是Pod了
&corev1.Pod{},
// 同步周期
resyncPeriod,
// 对象键的计算函数
indexers,
)
}
下面以(Core,v1,podInformer)为例结合client-go中提供的代码进行讲解。代码如下,在调用informers.Core().V1().Pods().Informer()的时候会同时调用informers.InformerFor注册到sharedInformerFactory,后续直接调用informers.Start启动注册的informer。
// client-go/examples/fake-client/main_test.go
func TestFakeClient(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create the fake client.
client := fake.NewSimpleClientset()
// We will create an informer that writes added pods to a channel.
pods := make(chan *v1.Pod, 1)
informers := informers.NewSharedInformerFactory(client, 0) //创建一个新的shareInformerFactory
podInformer := informers.Core().V1().Pods().Informer() //创建一个podInformer,并调用InformerFor函数进行注册
podInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
t.Logf("pod added: %s/%s", pod.Namespace, pod.Name)
pods <- pod
},
})
// Make sure informers are running.
informers.Start(ctx.Done()) //启动所有的informer
...
包缓存中的基本控制器中定义的一个Informer从Delta Fifo队列中弹出对象。执行此操作的函数是processLoop。这个基本控制器的工作是保存对象以便以后检索,并调用我们的控制器将对象传递给它。
controller机制的基础
Client-go包中一个相对较为高端的设计在于Informer的设计,我们知道我们可以直接通过Kubernetes API交互,但是考虑一点就是交互的形式,Informer设计为List/Watch的方式。Informer在初始化的时先通过List去从Kubernetes API中取出资源的全部object对象,并同时缓存,然后后面通过Watch的机制去监控资源,这样的话,通过Informer及其缓存,我们就可以直接和Informer交互而不是每次都和Kubernetes API交互。
Informer另外一块内容在于提供了事件handler机制,并会触发回调,这样上层应用如Controller就可以基于回调处理具体业务逻辑。因为Informer通过List、Watch机制可以监控到所有资源的所有事件,因此只要给Informer添加ResourceEventHandler 实例的回调函数实例取实现OnAdd(obj interface{}) OnUpdate(oldObj, newObj interface{}) 和 OnDelete(obj interface{})这三个方法,就可以处理好资源的创建、更新和删除操作
Kubernetes中都是各种controller的实现,各种controller都会用到Informer。
# tools/cache/controller.go
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
Indexer为对象提供索引功能。它是在包缓存中的类型索引器中定义的。一个典型的索引用例是基于对象标签创建索引。索引器可以基于多个索引函数维护索引。索引器使用线程安全的数据存储来存储对象及其密钥。包缓存中的类型存储中定义了一个名为MetaNamespaceKeyFunc的默认函数,该函数将对象的键生成为该对象的<namespace>/<name>
组合。
提供object对象的索引,是线程安全的,缓存对象信息
# tools/cache/index.go
// 索引器是一个存储界面,使您可以使用多个索引功能列出对象。
// 这里有三种字符串。
// 一个是存储密钥,如存储界面中所定义。
// 另一种是索引的名称。
// 第三类字符串是“索引值”,它是由
// IndexFunc,可以是字段值或从对象计算出的任何其他字符串。
type Indexer interface {
Store // 此处继承了Store这个interface,定义在cliet-go/tool/cache/store.go中
// Index returns the stored objects whose set of indexed values
// intersects the set of indexed values of the given object, for
// the named index
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys returns the storage keys of the stored objects whose
// set of indexed values for the named index includes the given
// indexed value
IndexKeys(indexName, indexedValue string) ([]string, error)
// ListIndexFuncValues returns all the indexed values of the given index
ListIndexFuncValues(indexName string) []string
// ByIndex returns the stored objects whose set of indexed values
// for the named index includes the given indexed value
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
}
// Indexer在Store基础上扩展了索引能力,那Indexer是如何实现索引的呢?让我们来看看几个非常关键的类型:
// IndexFunc知道如何计算对象的一组索引值。计算索引的函数,传入对象,输出字符串索引
type IndexFunc func(obj interface{}) ([]string, error)
// IndexFuncToKeyFuncAdapter adapts an indexFunc to a keyFunc. This is only useful if your index function returns
// unique values for every object. This conversion can create errors when more than one key is found. You
// should prefer to make proper key and index functions.
func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {
return func(obj interface{}) (string, error) {
indexKeys, err := indexFunc(obj)
if err != nil {
return "", err
}
if len(indexKeys) > 1 {
return "", fmt.Errorf("too many keys: %v", indexKeys)
}
if len(indexKeys) == 0 {
return "", fmt.Errorf("unexpected empty indexKeys")
}
return indexKeys[0], nil
}
}
const (
// NamespaceIndex is the lookup name for the most comment index function, which is to index by the namespace field.
NamespaceIndex string = "namespace"
)
// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{meta.GetNamespace()}, nil
}
// 每种计算索引的方式会输出多个索引(数组),而多个目标可能会算出相同索引,所以就有了这个类型
// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String
// 计算索引的函数有很多,用名字分类
// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc
// 由于有多种计算索引的方式,那就又要按照计算索引的方式组织索引
// Indices maps a name to an Index
type Indices map[string]Index
定义这些类型的目的。何所谓索引,索引目的就是为了快速查找。比如,我们需要查找某个节点上的所有Pod,那就要Pod按照节点名称排序,对应上面的Index类型就是map[nodename]sets.podname。我们可能有很多种查找方式,这就是Indexers这个类型作用了。下面的图帮助读者理解,不代表真正实现:
Indexers和Indices都是按照IndexFunc(名字)分组, 每个IndexFunc输出多个IndexKey,产生相同IndexKey的多个对象存储在一个集合中。注意:上图中不代表Indexers和Indices都指向了同一个数据,只是示意都用相同的IndexFunc的名字。
为了方便后面内容的开展,我们先统一一些概念:
有了上面的基础,我们再来看看Indexer与索引相关的接口都定义了哪些?
// indexName索引类,obj是对象,计算obj在indexName索引类中的索引键,通过索引键把所有的对象取出来
// 基本就是获取符合obj特征的所有对象,所谓的特征就是对象在索引类中的索引键
Index(indexName string, obj interface{}) ([]interface{}, error)
// indexKey是indexName索引类中一个索引键,函数返回indexKey指定的所有对象键
// 这个对象键是Indexer内唯一的,在添加的时候会计算,后面讲具体Indexer实例的会讲解
IndexKeys(indexName, indexKey string) ([]string, error)
// 获取indexName索引类中的所有索引键
ListIndexFuncValues(indexName string) []string
// 这个函数和Index类似,只是返回值不是对象键,而是所有对象
ByIndex(indexName, indexKey string) ([]interface{}, error)
// 返回Indexers
GetIndexers() Indexers
// 添加Indexers,就是增加更多的索引分类
AddIndexers(newIndexers Indexers) error
我相信通过我的注释很多人已经对Indexer有了初步认识,我们再来看看Store这个interface有哪些接口:
# tools/cache/store.go
// Store is a generic object storage interface. Reflector knows how to watch a server
// and update a store. A generic store is provided, which allows Reflector to be used
// as a local caching system, and an LRU store, which allows Reflector to work like a
// queue of items yet to be processed.
//
// Store makes no assumptions about stored object identity; it is the responsibility
// of a Store implementation to provide a mechanism to correctly key objects and to
// define the contract for obtaining objects by some arbitrary key type.
type Store interface {
Add(obj interface{}) error// 添加对象
Update(obj interface{}) error// 更新对象
Delete(obj interface{}) error// 删除对象
List() []interface{}// 列举对象
ListKeys() []string// 列举对象键
// 返回obj相同对象键的对象,对象键是通过对象计算出来的字符串
Get(obj interface{}) (item interface{}, exists bool, err error)
// 通过对象键获取对象
GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
// 用[]interface{}替换Store存储的所有对象,等同于删除全部原有对象在逐一添加新的对象
Replace([]interface{}, string) error
Resync() error// 重新同步
}
从Store的抽象来看,要求每个对象都要有唯一的键,至于键的计算方式就看具体实现了。我们看了半天的各种抽象,是时候讲解一波具体实现了
cache就是在ThreadSafeStore的再封装
cache是Indexer的一种非常经典的实现,所有的对象缓存在内存中,而且从cache这个类型的名称来看属于包内私有类型,外部无法直接使用,只能通过专用的函数创建。其实cache的定义非常简单,如下所以:
# tools/cache/store.go
// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
// 计算对象键的函数
type KeyFunc func(obj interface{}) (string, error)
...
// cache responsibilities are limited to:
// 1. Computing keys for objects via keyFunc
// 2. Invoking methods of a ThreadSafeStorage interface
type cache struct {
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore// 线程安全的存储
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFunc KeyFunc // 计算对象键的函数
}
var _ Store = &cache{}
这里可以看出来cache有一个计算对象键的函数,创建cache对象的时候就要指定这个函数了。
# tools/cache/thread_safe_store.go
// ThreadSafeStore is an interface that allows concurrent access to a storage backend.
// TL;DR caveats: you must not modify anything returned by Get or List as it will break
// the indexing feature in addition to not being thread safe.
//
// The guarantees of thread safety provided by List/Get are only valid if the caller
// treats returned items as read-only. For example, a pointer inserted in the store
// through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
// on the same key and modify the pointer in a non-thread-safe way. Also note that
// modifying objects stored by the indexers (if any) will *not* automatically lead
// to a re-index. So it's not a good idea to directly modify the objects returned by
// Get/List, in general.
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
Resync() error
}
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex // 读写锁,毕竟读的多写的少,读写锁性能要更好
items map[string]interface{} // 存储对象的map,对象键:对象
// indexers maps a name to an IndexFunc
indexers Indexers// 这个不用多解释了把,用于计算索引键的函数map
// indices maps a name to an Index
indices Indices// 快速索引表,通过索引可以快速找到对象键,然后再从items中取出对象
}
// 添加对象函数
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()// 加锁,因为是写操作,所以是全部互斥的那种
defer c.lock.Unlock()
// 把老的对象取出来
oldObject := c.items[key]
// 写入新的对象
c.items[key] = obj
// 由于对象的添加就要更新索引
c.updateIndices(oldObject, obj, key)
}
// 更新对象函数,和添加对象一模一样,所以就不解释了,为啥Add函数不直接调用Update呢?
func (c *threadSafeMap) Update(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
}
// 删除对象
func (c *threadSafeMap) Delete(key string) {
c.lock.Lock()// 加锁,因为是写操作,所以是全部互斥的那种
defer c.lock.Unlock()
// 判断对象是否存在?
if obj, exists := c.items[key]; exists {
// 删除对象的索引
c.deleteFromIndices(obj, key)
// 删除对象本身
delete(c.items, key)
}
}
// 获取对象
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
c.lock.RLock()// 此处只用了读锁
defer c.lock.RUnlock()
// 利用对象键取出对象
item, exists = c.items[key]
return item, exists
}
// 列举对象
func (c *threadSafeMap) List() []interface{} {
c.lock.RLock()// 此处只用了读锁
defer c.lock.RUnlock()
// 直接遍历对象map就可以了
list := make([]interface{}, 0, len(c.items))
for _, item := range c.items {
list = append(list, item)
}
return list
}
// 列举对象键
// ListKeys returns a list of all the keys of the objects currently
// in the threadSafeMap.
func (c *threadSafeMap) ListKeys() []string {
c.lock.RLock()// 此处只用了读锁
defer c.lock.RUnlock()
// 同样是遍历对象map,但是只输出对象键
list := make([]string, 0, len(c.items))
for key := range c.items {
list = append(list, key)
}
return list
}
// 取代所有对象,相当于重新构造了一遍threadSafeMap
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
c.lock.Lock()// 此处必须要用全局锁,因为有写操作
defer c.lock.Unlock()
// 直接覆盖以前的对象
c.items = items
// rebuild any index
c.indices = Indices{}// 重建索引
for key, item := range c.items {
c.updateIndices(nil, item, key)
}
// 发现没有,resourceVersion此处没有用到,估计是其他的Indexer实现有用
}
// 这个函数就是通过指定的索引函数计算对象的索引键,然后把索引键的对象全部取出来
// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
c.lock.RLock()// 只读,所以用读锁
defer c.lock.RUnlock()
// 取出indexName这个分类索引函数
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 计算对象的索引键
indexKeys, err := indexFunc(obj)
if err != nil {
return nil, err
}
// 取出indexName这个分类所有索引
index := c.indices[indexName]
// 返回对象的对象键的集合
var returnKeySet sets.String
if len(indexKeys) == 1 {
// In majority of cases, there is exactly one value matching.
// Optimize the most common path - deduping is not needed here.
returnKeySet = index[indexKeys[0]]
} else {
// Need to de-dupe the return list.
// Since multiple keys are allowed, this can happen.
returnKeySet = sets.String{}
// 遍历刚刚计算出来的所有索引键
for _, indexKey := range indexKeys {
// 把所有的对象键输出到对象键的集合中
for key := range index[indexKey] {
returnKeySet.Insert(key)
}
}
}
// 通过对象键逐一的把对象取出
list := make([]interface{}, 0, returnKeySet.Len())
for absoluteKey := range returnKeySet {
list = append(list, c.items[absoluteKey])
}
return list, nil
}
// 这个函数和上面的函数基本一样,只是索引键不用再计算了,使用者提供
// ByIndex returns a list of items that match an exact value on the index function
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
c.lock.RLock()// 同样是读锁
defer c.lock.RUnlock()
// 判断indexName这个索引分类是否存在
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 取出索引分类的所有索引
index := c.indices[indexName]
// 再出去索引键的所有对象键
set := index[indexKey]
// 遍历对象键输出
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
}
return list, nil
}
// 你会发现这个函数和ByIndex()基本一样,只是输出的是对象键
// IndexKeys returns a list of keys that match on the index function.
// IndexKeys is thread-safe so long as you treat all items as immutable.
func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
c.lock.RLock()// 同样是读锁
defer c.lock.RUnlock()
// 判断indexName这个索引分类是否存在
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 取出索引分类的所有索引
index := c.indices[indexName]
// 直接输出索引键内的所有对象键
set := index[indexKey]
return set.List(), nil
}
// 这个函数用来获取索引分类内的所有索引键的
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
c.lock.RLock()// 依然是读锁
defer c.lock.RUnlock()
// 获取索引分类的所有索引
index := c.indices[indexName]
// 直接遍历后输出索引键
names := make([]string, 0, len(index))
for key := range index {
names = append(names, key)
}
return names
}
func (c *threadSafeMap) GetIndexers() Indexers {
return c.indexers
}
func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
c.lock.Lock()
defer c.lock.Unlock()
if len(c.items) > 0 {
return fmt.Errorf("cannot add indexers to running index")
}
oldKeys := sets.StringKeySet(c.indexers)
newKeys := sets.StringKeySet(newIndexers)
if oldKeys.HasAny(newKeys.List()...) {
return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
}
for k, v := range newIndexers {
c.indexers[k] = v
}
return nil
}
// 当有对象添加或者更新是,需要更新索引,因为代用该函数的函数已经加锁了,所以这个函数没有加锁操作
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// if we got an old object, we need to remove it before we add it again
// 在添加和更新的时候都会获取老对象,如果存在老对象,那么就要删除老对象的索引,后面有说明
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
}
// 遍历所有的索引函数,因为要为对象在所有的索引分类中创建索引键
for name, indexFunc := range c.indexers {
// 计算索引键
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 获取索引分类的所有索引
index := c.indices[name]
if index == nil {
// 为空说明这个索引分类还没有任何索引
index = Index{}
c.indices[name] = index
}
// 遍历对象的索引键,上面刚刚用索引函数计算出来的
for _, indexValue := range indexValues {
// 找到索引键的对象集合
set := index[indexValue]
// 为空说明这个索引键下还没有对象
if set == nil {
// 创建对象键集合
set = sets.String{}
index[indexValue] = set
}
// 把对象键添加到集合中
set.Insert(key)
}
}
}
// 这个函数用于删除对象的索引的
// deleteFromIndices removes the object from each of the managed indexes
// it is intended to be called from a function that already has a lock on the cache
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
// 遍历索引函数,也就是把所有索引分类
for name, indexFunc := range c.indexers {
// 计算对象的索引键
indexValues, err := indexFunc(obj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 获取索引分类的所有索引
index := c.indices[name]
if index == nil {
continue
}
// 遍历对象的索引键
for _, indexValue := range indexValues {
// 把对象从索引键指定对对象集合删除
set := index[indexValue]
if set != nil {
set.Delete(key)
// If we don't delete the set when zero, indices with high cardinality
// short lived resources can cause memory to increase over time from
// unused empty sets. See `kubernetes/kubernetes/issues/84959`.
if len(set) == 0 {
delete(index, indexValue)
}
}
}
}
}
func (c *threadSafeMap) Resync() error {
// Nothing to do
return nil
}
索引键(indexers)和对象键(indices)是两个重要概念,索引键是用于对象快速查找的,经过索引建在map中排序查找会更快;对象键是为对象在存储中的唯一命名的,对象是通过名字+对象的方式存储的。
Indexer保存了来自apiServer的资源。使用listWatch方式来维护资源的增量变化。通过这种方式可以减小对apiServer的访问,减轻apiServer端的压力
cache实现了Indexer接口,但cache是包内私有的(首字母小写),只能通过包内封装的函数进行调用。
可以通过NewStore和NewIndexer初始化cache来返回一个Store或Indexer指针(cache实现了Store和Indexer接口)。NewStore和NewIndexer返回的Store和Indexer接口的数据载体为threadSafeMap,threadSafeMap通过NewThreadSafeStore函数初始化。
// tools/cache/store.go
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}
// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
index.go中给出了索引相关的操作(接口);store.go中给出了与操作存储相关的接口,并提供了一个cache实现,当然也可以实现自行实现Store接口;thread_safe_store.go为cache的私有实现。
client-go的indexer实际操作的还是threadSafeMap中的方法和数据,调用关系如下:
可以通过下图理解threadSafeMap中各种索引之间的关系
以namespace作为索引类型为例来讲,首先从indexers获取计算namespace的indexFunc,然后使用该indexFunc计算出与入参对象相关的所有namespaces。indices中保存了所有namespaces下面的对象键,可以获取特定namespace下面的所有对象键,在items中输入特定的对象键就可以得出特定的对象。indexers用于找出与特定对象相关的资源,如找出某Pod相关的secrets。
默认的indexFunc如下,根据对象的namespace进行分类
# tools/cache/index.go
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{meta.GetNamespace()}, nil
}
cache结构中的keyFunc用于生成objectKey,下面是默认的keyFunc。
# tools/cache/thread_safe_store.go
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
}
return meta.GetName(), nil
}
这是对Informer实例的引用,该实例知道如何使用自定义资源对象。您的自定义控制器代码需要创建适当的Informer。
controller需要创建合适的Informer才能通过Informer reference操作资源对象
这是对Indexer实例的引用,该实例知道如何使用自定义资源对象。您的自定义控制器代码需要创建它。您将使用此参考来检索对象以供以后处理。
controller创建Indexer reference然后去利用索引做相关处理
client-go中的基本控制器提供NewIndexerInformer函数来创建Informer和Indexer。在您的代码中,您可以直接调用此函数,也可以使用工厂方法创建通知程序。
# kubernetes/client-go/blob/master/examples/workqueue/main.go#L174
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
# kubernetes/sample-controller/blob/master/main.go#L61
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
这些是回调函数,当Informer想要将一个对象传递到您的controller时,它将由Informer调用。编写这些功能的典型模式是获取调度对象的密钥,并将该密钥排入work queue以进行进一步处理。
Informer会回调这些handlers
这是您在控制器代码中创建的队列,用于将对象的交付与其处理分离。编写资源事件处理程序(Resource event handler)函数是为了提取交付的对象的键并将其添加到工作队列中。
Resource Event Handlers被回调后将key写到工作队列,这里的key相当于事件通知,后面根据取出事件后,做后续的处理
indexer用于保存apiserver的资源信息,而workqueue用于保存informer中的handler处理之后的数据。
workqueue的接口定义如下:
// 代码源自client-go/util/workqueue/queue.go
// 这是一个interface类型,说明有其他的各种各样的实现
type Interface interface {
Add(item interface{}) // 向队列中添加一个元素,interface{}类型,说明可以添加任何类型的元素
Len() int // 队列长度,就是元素的个数
Get() (item interface{}, shutdown bool) // 从队列中获取一个元素,双返回值,这个和chan的<-很像,第二个返回值告知队列是否已经关闭了
Done(item interface{}) // 告知队列该元素已经处理完了
ShutDown() // 关闭队列
ShuttingDown() bool // 查询队列是否正在关闭
}
参见上图可以看到真正处理的元素来自queue,dirty和queue中的元素可能不一致,不一致点来自于当Get一个元素后且Done执行前,此时Get操作会删除dirty中的该元素,如果此时发生了Add正在处理的元素的操作,由于此时dirty中没有该元素且processing中存在该元素,会发生dirty中的元素大于queue中元素的情况。但对某一元素的不一致会在Done完成后消除,即Done函数中会判断该元素是否在dirty中,如果存在则会将该元素append到queue中。总之,dirty中的数据都会被append到queue中,后续queue中的数据会insert到processing中进行处理()
dType实现了Interface接口。包含下面几个变量:
// 代码源于client-go/util/workqueue/queue.go
type Type struct {
queue []t // 元素数组
dirty set // dirty的元素集合
processing set // 正在处理的元素集合
cond *sync.Cond // 与pthread_cond_t相同,条件同步
shuttingDown bool // 关闭标记
metrics queueMetrics // 这个metrics和prometheus的metrics概念相同,此处不做过多说明,知道功能就行
}
// 以下的这些类型定义也是够了,对于C/C++程序猿来说不能忍~
type empty struct{} // 空类型,因为sizeof(struct{})=0
type t interface{} // 元素类型是泛型
type set map[t]empty // 用map实现的set,所有的value是空数据就行了
// 代码源自client-go/util/workqueue/queue.go
func (q *Type) Add(item interface{}) {
// 和pthread_cond_t不同的是golang的cond自带了互斥锁
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 队列正在关闭,直接返回
if q.shuttingDown {
return
}
// 已经标记为脏的数据,也直接返回,因为存储在了脏数据的集合中
if q.dirty.has(item) {
return
}
// 告知metrics添加了元素
q.metrics.add(item)
// 添加到脏数据集合中
q.dirty.insert(item)
// 元素刚被拿走处理,那就直接返回
if q.processing.has(item) {
return
}
// 追加到元素数组的尾部
q.queue = append(q.queue, item)
// 通知有新元素到了,此时有协程阻塞就会被唤醒
q.cond.Signal()
}
1. 队列关闭了,所以不接受任何数据,上面代码也是这么实现的;
2. 队列中没有该元素,那就直接存储在队列中;
3. 队列中已经有了该元素,这个改如何判断?set类型肯定最快,数组需要遍历效率太低,这也是dirty存在的价值之一,上面的代码也通过dirty判断元素是否存在的;
4. 队列曾经存储过该元素,但是已经被拿走还没有调用Done()时,也就是正在处理中的元素,此时再添加当前的元素应该是最新鲜的,处理中的应该是过时的,也就是脏的,我认为dirty的来源就是这个吧~
// 代码源自client-go/util/workqueue/queue.go
func (q *Type) Get() (item interface{}, shutdown bool) {
// 加锁解锁不解释
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 没有数据,阻塞协程
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
// 协程被激活但还没有数据,说明队列被关闭了,这个和chan一样
if len(q.queue) == 0 {
return nil, true
}
// 弹出第一个元素,我一直感觉golang的slice[1:]这种操作性能不太高~以后有时间看看代码实现
item, q.queue = q.queue[0], q.queue[1:]
// 通知metrics元素被取走了
q.metrics.get(item)
// 从dirty集合中移除,加入到processing集合,经过前面的分析这里就很好理解了
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
// 代码源自client-go/util/workqueue/queue.go
func (q *Type) Done(item interface{}) {
// 加锁解锁不解释
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 通知metrics元素处理完了
q.metrics.done(item)
// 从processing集合中删除
q.processing.delete(item)
// 重点来啦,此处判断脏元素集合,看看处理期间是不是又被添加,如果是那就在放到队列中,完全符合我们的分析
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
延时队列接口继承了queue的Interface接口,仅新增了一个AddAfter方法,它用于在duration时间之后将元素添加到queue中。
// client-go/util/workqueue/delaying_queue.go
type DelayingInterface interface {
Interface// 继承了通用队列所有接口
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)// 增加了延迟添加的接口
}
从延时队列的抽象来看,和通用队列基本一样,只是多了延迟添加的接口, 也就增加了一些机制实现元素的延迟添加,这一点可以从延时队列的实现类型上可以看出:
delayingType实现了DelayingInterface接口使用waitingForAddCh来传递需要添加到queue的元素,
// 代码源自client-go/util/workqueue/delaying_queue.go
type delayingType struct {
Interface // 这参数不出意外是通用队列的实现
clock clock.Clock // 时钟,用于获取时间
stopCh chan struct{} // 延时就意味着异步,就要有另一个协程处理,所以需要退出信号
heartbeat clock.Ticker // 定时器,在没有任何数据操作时可以定时的唤醒处理协程,定义为心跳没毛病
waitingForAddCh chan *waitFor // 所有延迟添加的元素封装成waitFor放到chan中
metrics retryMetrics // 和通用队列中的metrics功能类似
}
//
type waitFor struct {
data t // 元素数据,这个t就是在通用队列中定义的类型interface{}
readyAt time.Time // 在什么时间添加到队列中
index int // 这是个索引,后面会详细说明
}
delayingType.waitingForAddCh中的元素如果没有超过延时时间会添加到waitForPriorityQueue中,否则直接加入queue中。
// 代码源自client-go/util/workqueue/delaying_queue.go
// waitFor的定义上面有,是需要延时添加的元素都要封装成这个类型
// waitForPriorityQueue就把需要延迟的元素形成了一个队列,队列按照元素的延时添加的时间(readyAt)从小到大排序
// 实现的策略就是实现了go/src/container/heap/heap.go中的Interface类型,读者可以自行了解heap
// 这里只需要知道waitForPriorityQueue这个数组是有序的,排序方式是按照时间从小到大
type waitForPriorityQueue []*waitFor
// heap需要实现的接口,告知队列长度
func (pq waitForPriorityQueue) Len() int {
return len(pq)
}
// heap需要实现的接口,告知第i个元素是否比第j个元素小
func (pq waitForPriorityQueue) Less(i, j int) bool {
return pq[i].readyAt.Before(pq[j].readyAt) // 此处对比的就是时间,所以排序按照时间排序
}
// heap需要实现的接口,实现第i和第j个元素换
func (pq waitForPriorityQueue) Swap(i, j int) {
// 这种语法好牛逼,有没有,C/C++程序猿没法理解~
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i // 因为heap没有所以,所以需要自己记录索引,这也是为什么waitFor定义索引参数的原因
pq[j].index = j
}
// heap需要实现的接口,用于向队列中添加数据
func (pq *waitForPriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*waitFor)
item.index = n // 记录索引值
*pq = append(*pq, item) // 放到了数组尾部
}
// heap需要实现的接口,用于从队列中弹出最后一个数据
func (pq *waitForPriorityQueue) Pop() interface{} {
n := len(*pq)
item := (*pq)[n-1]
item.index = -1
*pq = (*pq)[0:(n - 1)] // 缩小数组,去掉了最后一个元素
return item
}
// 返回第一个元素
func (pq waitForPriorityQueue) Peek() interface{} {
return pq[0]
}
因为延时队列利用waitForPriorityQueue管理所有延时添加的元素,所有的元素在waitForPriorityQueue中按照时间从小到大排序,对于延时队列的处理就会方便很多了。
延时队列实现逻辑比较简单,需要注意的是waitingForQueue是以heap方式实现的队列,队列的pop和push等操作使用的是heap.pop和heap.push
接下来我们就可以分析延时队列的实现了,因为延时队列集成通用队列,所以这里只对新增的函数做说明:
// 代码源自client-go/util/workqueue/delaying_queue.go
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// 如果队列关闭就直接退出
if q.ShuttingDown() {
return
}
// 记录metrics
q.metrics.retry()
// 不需要延迟,那就直接像通用队列一样添加
if duration <= 0 {
q.Add(item)
return
}
// 把元素封装成waitFor传入chan,切记select没有default,所以可能会被阻塞
// 这里面用到了stopChan,因为有阻塞的可能,所以用stopChan可以保证退出
select {
case <-q.stopCh:
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
// 代码源自client-go/util/workqueue/delaying_queue.go
// 这部分就是演示队列的核心代码
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
// 这个变量后面会用到,当没有元素需要延时添加的时候利用这个变量实现长时间等待
never := make(<-chan time.Time)
// 构造我们上面提到的有序队列了,并且初始化
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
// 这个map是用来避免对象重复添加的,如果重复添加就只更新时间
waitingEntryByData := map[t]*waitFor{}
// 开始无限循环
for {
// 队列关闭了,就可以返回了
if q.Interface.ShuttingDown() {
return
}
// 获取当前时间
now := q.clock.Now()
// 有序队列中是否有元素,有人肯定会问还没向有序队列里添加呢判断啥啊?后面会有添加哈
for waitingForQueue.Len() > 0 {
// Peek函数我们前面注释了,获取第一个元素,注意:不会从队列中取出哦
entry := waitingForQueue.Peek().(*waitFor)
// 元素指定添加的时间过了么?如果没有过那就跳出循环
if entry.readyAt.After(now) {
break
}
// 既然时间已经过了,那就把它从有序队列拿出来放入通用队列中,这里面需要注意几点:
// 1.heap.Pop()弹出的是第一个元素,waitingForQueue.Pop()弹出的是最后一个元素
// 2.从有序队列把元素弹出,同时要把元素从上面提到的map删除,因为不用再判断重复添加了
// 3.此处是唯一一个地方把元素从有序队列移到通用队列,后面主要是等待时间到过程
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
// 如果有序队列中没有元素,那就不用等一段时间了,也就是永久等下去
// 如果有序队列中有元素,那就用第一个元素指定的时间减去当前时间作为等待时间,逻辑挺简单
// 有序队列是用时间排序的,后面的元素需要等待的时间更长,所以先处理排序靠前面的元素
nextReadyAt := never
if waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
}
// 进入各种等待
select {
// 有退出信号么?
case <-q.stopCh:
return
// 定时器,没过一段时间没有任何数据,那就再执行一次大循环,从理论上讲这个没用,但是这个具备容错能力,避免BUG死等
case <-q.heartbeat.C():
// 这个就是有序队列里面需要等待时间信号了,时间到就会有信号
case <-nextReadyAt:
// 这里是从chan中获取元素的,AddAfter()放入chan中的元素
case waitEntry := <-q.waitingForAddCh:
// 如果时间已经过了就直接放入通用队列,没过就插入到有序队列
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
// 下面的代码看似有点多,目的就是把chan中的元素一口气全部取干净,注意用了default意味着chan中没有数据就会立刻停止
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
// 下面的代码是把元素插入有序队列的实现
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
// 看看元素是不是被添加过?如果添加过看谁的时间靠后就用谁的时间
existing, exists := knownEntries[entry.data]
if exists {
if existing.readyAt.After(entry.readyAt) {
existing.readyAt = entry.readyAt
heap.Fix(q, existing.index)
}
return
}
// 把元素放入有序队列中,并记录在map里面,这个map就是上面那个用于判断对象是否重复添加的map
// 注意,这里面调用的是heap.Push,不是waitForPriorityQueue.Push
heap.Push(q, entry)
knownEntries[entry.data] = entry
}
到这里延时队列核心代码基本分析完了,其重要的一点就是golang的heap,他辅助实现了元素按时间先后进行排序,这样延时队列就可以一个一个的等待超时添加了。heap的排序算法实现非常有意思
限速队列应用非常广泛,比如我们做某些操作失败时希望重试几次,但是立刻重试很有可能还会失败,我们希望延迟一段时间在重试,而且失败次数越多延迟时间越长,这个时候就有限速的概念在里面了。在分析限速队列前,我们需要知道限速器。
限速队列实现了RateLimiter的3个接口,When用于返回元素的重试时间,Forget用于清除元素的重试记录,NumRequeues返回元素的重试次数
// 代码源自client-go/util/workqueue/default_rate_limiter.go
type RateLimiter interface {
When(item interface{}) time.Duration // 返回元素需要等待多长时间
Forget(item interface{}) // 抛弃该元素,意味着该元素已经被处理了
NumRequeues(item interface{}) int // 元素放入队列多少次了
}
他会根据元素错误次数逐渐累加等待时间,具体实现如下:
ItemExponentialFailureRateLimiter对使用指数退避的方式进行失败重试,当failures增加时,下次重试的时间就变为了baseDelay.Nanoseconds()) * math.Pow(2, float64(exp),maxDelay用于限制重试时间的最大值,当计算的重试时间超过maxDelay时则采用maxDelay
// 代码源自client-go/util/workqueue/default_rate_limiters.go
// 限速器的定义
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex // 互斥锁
failures map[interface{}]int // 记录每个元素错误次数,每调用一次When累加一次
baseDelay time.Duration // 元素延迟基数,算法后面会有说明
maxDelay time.Duration // 元素最大的延迟时间
}
// 实现限速器的When接口
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 累加错误计数,比较好理解
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// 通过错误次数计算延迟时间,公式是2^i * baseDelay,按指数递增,符合Exponential名字
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
// 计算后的延迟值和最大延迟值二者取最小值
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
// 实现限速器的NumRequeues接口,很简单,没什么好说的
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
// 实现限速器的Forget接口,也很简单,没什么好说的
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
ItemFastSlowRateLimiter针对失败次数采用不同的重试时间。当重试次数小于maxFastAttempts时,重试时间为fastDelay,否则我为slowDelay。
ItemFastSlowRateLimiter的限速策略是尝试次数超过阈值用长延迟,否则用短延迟。
// 代码源自client-go/util/workqueue/default_rate_limiters.go
// 限速器定义
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex // 互斥锁
failures map[interface{}]int // 错误次数计数
maxFastAttempts int // 错误尝试阈值
fastDelay time.Duration // 短延迟时间
slowDelay time.Duration // 长延迟时间
}
// 限速器实现When接口
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
// 累加错误计数
r.failures[item] = r.failures[item] + 1
// 错误次数超过阈值用长延迟,否则用短延迟
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
// 限速器实现NumRequeues接口,比较简单不多解释
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
// 限速器实现Forget接口,比较简单不多解释
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
MaxOfRateLimiter为一个限速队列列表,它的实现中返回列表中重试时间最长的限速队列的值。
MaxOfRateLimiter是一个非常有意思的限速器,他内部有多个限速器,每次返回最悲观的。何所谓最悲观的,比如内部有三个限速器,When()接口返回的就是三个限速器里面延迟最大的。让我们看看具体实现:
// 代码源自client-go/util/workqueue/default_rate_limiters.go
type MaxOfRateLimiter struct {
limiters []RateLimiter // 限速器数组,创建该限速器需要提供一个限速器数组
}
// 限速器实现When接口
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
// 这里在获取所有限速里面时间最大的
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
// 限速器实现NumRequeues接口
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
ret := 0
// Requeues也是取最大值
for _, limiter := range r.limiters {
curr := limiter.NumRequeues(item)
if curr > ret {
ret = curr
}
}
return ret
}
// 限速器实现Forget接口
func (r *MaxOfRateLimiter) Forget(item interface{}) {
// 逐一遍历Forget就行了,比较简单
for _, limiter := range r.limiters {
limiter.Forget(item)
}
}
使用令牌桶实现一个固定速率的限速器
BucketRateLimiter是利用golang.org.x.time.rate.Limiter实现固定速率(qps)的限速器,至于golang.org.x.time.rate.Limiter的实现原理读者可以自行分析,此处只对BucketRateLimiter做说明。
// 代码源自client-go/util/workqueue/default_rate_limiters.go
type BucketRateLimiter struct {
*rate.Limiter // 这个就是golang.org.x.time.rate.Limiter
}
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Limiter.Reserve().Delay() // 获取延迟,这个延迟会是个相对固定的周期
}
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
return 0 // 因为固定频率的,也就不存在重试什么的了
}
func (r *BucketRateLimiter) Forget(item interface{}) {
}
所有的限速队列实际上就是根据不同的需求,最终提供一个延时时间,在延时时间到后通过AddAfter函数将元素添加添加到队列中。在queue.go中给出了workqueue的基本框架,delaying_queue.go扩展了workqueue的功能,提供了限速的功能,而default_rate_limiters.go提供了多种限速队列,用于给delaying_queue.go中的AddAfter提供延时参数,最后rate_limiting_queue.go给出了使用使用限速队列的入口。
// 代码源自client-go/util/workqueue/rate_limiting_queue.go
type RateLimitingInterface interface {
DelayingInterface // 继承了延时队列
AddRateLimited(item interface{}) // 按照限速方式添加元素的接口
Forget(item interface{}) // 丢弃指定元素
NumRequeues(item interface{}) int // 查询元素放入队列的次数
}
// 这个是限速队列的实现
type rateLimitingType struct {
DelayingInterface // 同样要继承延迟队列
rateLimiter RateLimiter // 哈哈,这就对了嘛,加一个限速器就可以了
}
// 代码源自client-go/util/workqueue/rate_limitting_queue.go
func (q *rateLimitingType) AddRateLimited(item interface{}) {
// 通过限速器获取延迟时间,然后加入到延时队列
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item) // 太简单了,不解释了
}
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item) // 太简单了,不解释了
}
// client-go/util/workqueue/rate_limiting_queue_test.go
func TestRateLimitingQueue(t *testing.T) {
limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
fakeClock := clock.NewFakeClock(time.Now())
delayingQueue := &delayingType{
Interface: New(),
clock: fakeClock,
heartbeat: fakeClock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(""),
deprecatedMetrics: newDeprecatedRetryMetrics(""),
}
queue.DelayingInterface = delayingQueue
queue.AddRateLimited("one")
waitEntry := <-delayingQueue.waitingForAddCh
if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
t.Errorf("expected %v, got %v", e, a)
}
queue.Forget("one")
if e, a := 0, queue.NumRequeues("one"); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}
这是您在代码中创建的用于处理工作队列中项目的功能。可以有一个或多个其他函数进行实际处理。这些函数通常将使用索引器引用(Indexer reference)或列表包装器来检索与键相对应的对象。
从工作队列中取出key后进行后续处理,具体处理可以通过Indexer reference
# kubernetes/client-go/blob/master/examples/workqueue/main.go#L73
obj, exists, err := c.indexer.GetByKey(key)