This page looks best with JavaScript enabled

Kubernetes Reflector 我在盯著你 ( II )

 ·  ☕ 6 min read

首先本文所有的 source code 基於 kubernetes 1.19 版本,所有 source code 為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

幾本上這一塊相當的複雜與龐大,有些部分我認為不需要深入理解其運作的機制,我們就來看看一個 Reflector 是透過哪裡零件組裝起來的吧!

上一篇 Kubernetes Reflector 我在盯著你 ( I )一文章有提到 client go 的範例建立 informer 的過程,在 NewIndexerInformer 的過程中最後回傳的是 一個 indexer 以及一個 實作 Controller interface 的物件,本篇會展開討論 Controller 到底是什麼

先來回顧一下 client go 建立 controller 的範例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
    ...
}, cache.Indexers{})

func NewIndexerInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	indexers Indexers,
) (Indexer, Controller) {
	// This will hold the client state, as we know it.
	clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)

	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}

從上述的source code中可以看到透過 NewIndexerInformer function 最後會回傳 indexer 以及 controller , indexer 在先前的章節已經討論過,還不了解的小夥伴可以到本篇文章Kubernetes Indexers local cache 之美 (I)複習相關知識,以下會針對 controller 的實作進行琢磨。

Controller

我們先從 Controller 定義了什麼行為開始探討

interface

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// Controller is a low-level controller that is parameterized by a
// Config and used in sharedIndexInformer.
type Controller interface {
	// Run does two things.  One is to construct and run a Reflector
	// to pump objects/notifications from the Config's ListerWatcher
	// to the Config's Queue and possibly invoke the occasional Resync
	// on that Queue.  The other is to repeatedly Pop from the Queue
	// and process with the Config's ProcessFunc.  Both of these
	// continue until `stopCh` is closed.
	Run(stopCh <-chan struct{})           //執行一個函數,並且透過stopchannel來決定是否跳出

	// HasSynced delegates to the Config's Queue
	HasSynced() bool                    //檢查觀測到 object 是不是同步到 indexer 了

	// LastSyncResourceVersion delegates to the Reflector when there
	// is one, otherwise returns the empty string
	LastSyncResourceVersion() string  //觀測到最新的 object version
}

從上面的行為應該是看不出來 controller 要做什麼吧!沒關係我們繼續順藤摸瓜,看看這葫蘆裡賣的是什麼藥。

new function

我們透過 new function 來了解實作 controller interface 的物件需要什麼參數!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//傳入的參數我們在上一章節有看過,這邊再來複習一次
//傳入監控物件變化的觀察者
//傳入要觀察的物件資料型態
//傳入多久要同步一次
//傳入事件處理器
//傳入本地儲存器(local storage)
func newInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	clientState Store,
) Controller {
    
    //建立delta fifo queue ,之前章節有探討過delta fifo queue
    //不了解的讀者可以回去複習一次
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          clientState,
		EmitDeltaTypeReplaced: true,
	})
    
    //建立controller config的設定檔
	cfg := &Config{
		Queue:            fifo,                //使用delta fifo queue
               ListerWatcher:    lw,                  //使用觀測哪個物件的 listwatch(e.g. pod configmap e.t.c)
		ObjectType:       objType,             //觀測物件的資料型態(e.g. pod configmap e.t.c)
		FullResyncPeriod: resyncPeriod,        //多久要重同步一次
		RetryOnError:     false,                //錯誤是否要重試

		Process: func(obj interface{}) error {    //事件處理器
			// from oldest to newest
			...
		},
	}
	return New(cfg)                                //建立一個實作 controller interface 的物件
}


// New makes a new Controller from the given Config.
func New(c *Config) Controller {
	ctlr := &controller{                //建立一個實作 controller 的物件
		config: *c,
		clock:  &clock.RealClock{},
	}
	return ctlr
}

從上面看起來就是放入了一些設定(deltafifo , listerwatch, objecttype)接著把實作 controller interface 的物件產出,那實作 controller interface 的物件資料結構長怎麼樣呢?我們接著來看!

struct

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// `*controller` implements Controller
type controller struct {
	config         Config                //contrller 相關設定
	reflector      *Reflector            //下一章節會展開來看,本章節暫時用不到
	reflectorMutex sync.RWMutex          //Reflector 讀寫鎖,本章節暫時用不到
	clock          clock.Clock           //同步用
}

// Config contains all the settings for one of these low-level controllers.
type Config struct {
	
	Queue                        //這裡要設定 controller 用的 DeltaFIFO Queue
                                    //前幾個章節有詳細的說明,不了解的朋友可以回去複習。

	
	ListerWatcher                //前一個章節有帶到,實作監視以及列出特定的資源的物件

	
	Process ProcessFunc            //當物件從 DeltaFIFO Queue 彈出,處理事件的function

	// ObjectType is an example object of the type this controller is
	// expected to handle.  Only the type needs to be right, except
	// that when that is `unstructured.Unstructured` the object's
	// `"apiVersion"` and `"kind"` must also be right.
        ObjectType runtime.Object        //這個我認為很難理解,要告訴controller即將到來的物件是什麼例如 pod , deployment e.t.c.

	
	FullResyncPeriod time.Duration    //多久要 resync 一次

	
	ShouldResync ShouldResyncFunc    //reflector會定期透過ShouldResync function來確定是否重新同步queue,

	
        RetryOnError bool                //如果為true,則Process()返回錯誤時,需要requeue object。
                                        //看註解這是有爭議的,有些開發者認為要拉到更該高的層次決掉 error的處理方式

	
    
	WatchErrorHandler WatchErrorHandler    //每當ListAndWatch斷開連接並出現錯誤時會呼叫這個 function 處理。

	
    
	WatchListPageSize int64                //初始化時設定list watch 的 chunk size.
}

//reflector會定期透過ShouldResync function來確定是否重新同步queue,
type ShouldResyncFunc func() bool

//每當ListAndWatch斷開連接並出現錯誤時調用。
type WatchErrorHandler func(r *Reflector, err error)

Config 資料結構內有些資料結構是提供給 reflector 做使用,關於 reflector 的細節我想在未來的章節在展開來討論,本篇會專注於 controller 會用到的參數。

從上面的資料結構大致上可以看出來 一個 controller 需要這幾個東西

  1. DeltaFIFO
    負責將觀測到的資料放入 FIFO queue,並且標記變化量(Add , Delete , Update e.t.c)
    會把資料存進 localcache (indexer)

  2. ProcessFunc
    處理 DeltaFIFO 的事件變化,例如當 Delta Pop 出一個 Add 事件,會由 ProcessFunc 處理, Pop 出 Update 會由 ProcessFunc 處理依此類推。

  3. runtime.Object
    Controller 知道等等pop出來的是什麼物件要如何反序列化

  4. ListerWatcher
    列出與監控某一個物件

看到這裡我們還要了解 controller 底層實作了什麼,怎麼把上面提到的這四個元素組合再一起使用。

impliment

Controller 實作以下幾個 function ,我們一個一個來看!

Run

Run function 是最主要的 function

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
    //錯誤處理,有機會再來談,先不理他
	defer utilruntime.HandleCrash()
    
    //當收到stop 訊號關閉 delta fifof queue
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
    
    //建立一個 Reflector ,下一章節會展開討論 Reflector 先不理他
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
    
    //Reflector 設定 resync 時間
	r.ShouldResync = c.config.ShouldResync
    
    //Reflector 設定list watch 的 chunk size.
	r.WatchListPageSize = c.config.WatchListPageSize
    
    //Reflector 設定時間
	r.clock = c.clock
    
    //Reflector 套用錯誤處理
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}

    // todo 我不知道為什麼 controller 綁定 reflector 的時候要加鎖
	c.reflectorMutex.Lock()
    // controller 綁定 Reflector
	c.reflector = r
	c.reflectorMutex.Unlock()

    //wait.Group 預計未來還會拉出來再講一篇
    //簡單來就是被這個 wg 管理的 thread 全部都 done 了之後才會退出 wait 
	var wg wait.Group

    //這個function 會啟動一個 thread 並且在裡面呼叫 剛剛建立的 reflector.run 並且傳入 stop channel 
    //stop channel用來終止 thread 
	wg.StartWithChannel(stopCh, r.Run)
    
    //規律性的呼叫processLoop(),若是收到 stop channel 的訊號就退出
	wait.Until(c.processLoop, time.Second, stopCh)
    
    //等待所有 wait.Group 的 thread done 才能離開,不然會一直卡在這裡~
	wg.Wait()
}


//會被wait.Until 規律性的呼叫
func (c *controller) processLoop() {
	for {
            //從 deltafifo pop 出物件,
            //pop 出的事件會交給 config.Process function 處理
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
                //如果 deltafifo queue關閉就退出
			if err == ErrFIFOClosed {
				return
			}
                //如果處理發生錯誤就重新加回 delta fifo queue 中
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

HasSynced

1
2
3
4
5
6
7
// Returns true once this controller has completed an initial resource listing
//簡單來說就是看Delta FIFO 是不是把資料同步完了
func (c *controller) HasSynced() bool {
    //委任給Delta FIFO QUEUE的HasSynced()
    //不了解的部分可以到前面的章節看一下 Delta FIFO Queue 是怎麼做的
	return c.config.Queue.HasSynced()
}

LastSyncResourceVersion

透過 reflector 得到資源最新的版本

1
2
3
4
5
6
7
8
9
func (c *controller) LastSyncResourceVersion() string {
	c.reflectorMutex.RLock()
	defer c.reflectorMutex.RUnlock()
    // 透過 reflector  得到資源最新的版本 下一章節會看到!
	if c.reflector == nil {
		return ""
	}
	return c.reflector.LastSyncResourceVersion()
}

小結

有了前面幾個章節講述的背景知識如 Delta FIFO Queue 、 Indexer 等等作為鋪墊,讓我們在看 Controller 的時候變得相當的容易, Controller 目前還有 reflector 這個重要的元件還不清楚他的底層是如何實作的。

下一個篇章將會針對 Controller 的 reflector 元件進行解析,文中若有解釋錯誤的地方歡迎各位大大們提出討論,感謝!


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page