This page looks best with JavaScript enabled

Kubernetes DeltaFIFO 承上啟下

 ·  ☕ 20 min read

首先本文所有的 source code 基於 kubernetes 1.19 版本,所有 source code 為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

前面講了很多 Indexer 的實作細節,不知道各位還不記不記得 kubernetes controller/operator 的架構,我們再來複習一下整個架構圖以及目前我們已經了解的元件~


圖片來源:How to Create a Kubernetes Custom Controller Using client-go

目前我們已經了解

  1. Work Queue
    • Reflector ,Delta Queue 為生產者
      產出 object key 放入 work queue 按照特定規則排序(如delaying等)
    • Controller/Handler 為消費者
      從 work queue 中拿出 object key
  2. Indexer
    • key / value 形態的 local cache
      存放 object key 跟反序列化後的 obejct value(Pod, Deployment e.t.c)

還有許多元件是我們不了解的,例如 Reflector 中的 list watch 元件、Delta FIFO 元件 以及最後 Handler 如何從 work queue 中取出 object Key 到 indexer 查找資料。

本章節先從 Reflector 中的 Delta FIFO 元件進行剖析,用以了解當 Reflector 中的 list watch 觀察到物件變化後如何存到 indexer 以及把 Object key 傳遞給 work queue,做到承上啟下的作用。

DeltaFIFO

kubernetes source code 設計得非常精美,我們可以先從 interface 定義了哪些方法來推敲實作這個 interface 的物件可能有什麼功能。

interface

由於 GO 語言判斷 struct 有沒有實作 interface 採用隱式的判斷,我先將DeltaFIFO 有實作的 Interface 列出,接再著一一剖析。

DeltaFIFO 首先實作了 Queue interface
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type Queue interface {
	Store                                        //上一篇 indexer 文章中有提過這裡不再贅述, DeltaFIFO 也實作了儲存。

	Pop(PopProcessFunc) (interface{}, error)     //用來把DeltaFIFO Object 彈出

	
	AddIfNotPresent(interface{}) error           //如果 Object 不再FIFO queue 就透過這個function 添加

	
	HasSynced() bool                             //todo

	
	Close()                                      //關閉 queue
}

DeltaFIFO 還實作了 KeyListerGetter interface ,對 ListKeys() 以及 GetByKey() 這兩個 function 還有印象的地方應該是在 Store interface 裡面,如果忘記了可以回去複習一下。
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
	KeyLister                            //繼承 KeyLister
	KeyGetter                            //繼承 KeyGetter
}

// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
	ListKeys() []string                    //回傳所有的 object key
}

// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
	// GetByKey returns the value associated with the key, or sets exists=false.
	GetByKey(key string) (value interface{}, exists bool, err error)                                    //透過object key 取的 object
}

看完了 DeltaFIFO 相關的 Interface 後我們來看一下 DeltaFIFO 的資料結構吧!

struct

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex                            //當物件被加入、更新、刪除操作時,需要保證所有對 queue 的操作都是 atomic 
                                                    //因此需要鎖
	cond sync.Cond                              //當 queue 中沒有資料 thread 會卡住,需要透過cond發送通知喚醒 thread                             

	items map[string]Deltas                     // object key對應object 的變化(等等會看到Deltas 是什麼)


        queue []string                              //儲存 Object key (fifo queue)

	
	populated bool                              //如果透過 Replace function 將第一批 object 放入 queue
                                                   //或是第一次使用Delete/Add/Update/AddIfNotPresent 標記這個數值為true
	
	initialPopulationCount int                  //第一次使用 Replace function 放入的 object 數量

	
	keyFunc KeyFunc                            //前一章有提過,計算Object key的方法

	
    knownObjects KeyListerGetter               //列出已知的 object key 或是列出已知的 object ,這裡應該是 Indexer 

	
	closed bool                                //判斷queue是否關閉

	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
	// DeltaType when Replace() is called (to preserve backwards compat).
	emitDeltaTypeReplaced bool                //決定使用替換或是同步
}

看完了資料結構接著來看看如何把一個 DeltaFIFO 建立起來吧

new function

目前有兩種方式把 DeltaFIFO 的物件建立起來,第一種已經被 Deprecated 要使用的朋友需要注意一下。
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})
//該方法已經被棄用了,需要特別注意!
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ //透過Options 封裝建構請求,接下來會看到
		KeyFunction:  keyFunc,            //傳入計算 object key 的function 
		KnownObjects: knownObjects,       //傳入已知的有實作 KeyListerGetter 的物件,一般來說會傳入 indexer 
	})
}


//封裝 DeltaFIFO 的建構物件
type DeltaFIFOOptions struct {
	KeyFunction KeyFunc     //計算 object key 的方法預設採用MetaNamespaceKeyFunc

	KnownObjects KeyListerGetter    // 實作 KeyListerGetter  的物件

	EmitDeltaTypeReplaced bool      //支援 delta fifo 是否支援replace 方法
}

// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
// items. See also the comment on DeltaFIFO.
//目前都是使用 NewDeltaFIFOWithOptions 作為delta fifo 的初始化方法
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
    //如果 object key 計算方法沒有指定就用預設的 MetaNamespaceKeyFunc
	if opts.KeyFunction == nil {
		opts.KeyFunction = MetaNamespaceKeyFunc
	}
    // 建立 DeltaFIFO
	f := &DeltaFIFO{
		items:        map[string]Deltas{},    
		queue:        []string{},
		keyFunc:      opts.KeyFunction,
		knownObjects: opts.KnownObjects,

		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
	}
    // cond 復用 DeltaFIFO 的 sync lock ,提升效能?
	f.cond.L = &f.lock
	return f
}

var (
	_ = Queue(&DeltaFIFO{})                 //在編譯期間或是在寫code的時候就能確認 DeltaFIFO 有沒有實作 Queue 了
)

會看到 DeltaFIFO 的資料結構中有一個 items 的欄位資料型態為 Deltas ,我們先來看看 Deltas 是什麼

Deltas

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

//Deltas 是一個 slice 的資料結構,再往下看 slice 裡面存了什麼
type Deltas []Delta

// slice 裡面存了 DeltaType 以及 Object 
type Delta struct {
	Type   DeltaType
	Object interface{}
}

// DeltaType 表示 Object 的變化型態為 string
type DeltaType string

//事件總共會有五種,分別是 Add 事件,updated 事件, Deleted 事件 , Replaced 事件 以及 Sync事件。
const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// Replaced is emitted when we encountered watch errors and had to do a
	// relist. We don't know if the replaced object has changed.
	//
	// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
	// as well. Hence, Replaced is only emitted when the option
	// EmitDeltaTypeReplaced is true.
	Replaced DeltaType = "Replaced"
	// Sync is for synthetic events during a periodic resync.
	Sync DeltaType = "Sync"
)

//取出最舊一筆資料
func (d Deltas) Oldest() *Delta {
	if len(d) > 0 {        
		return &d[0]
	}
	return nil
}

//取出最新一筆資料
func (d Deltas) Newest() *Delta {
	if n := len(d); n > 0 {
		return &d[n-1]
	}
	return nil
}

//複製一組 deltas 的資料(因為是複製的所以就算更改了複製的資料也跟原先那組 deltas 沒有關係)
func copyDeltas(d Deltas) Deltas {
	d2 := make(Deltas, len(d))
	copy(d2, d)
	return d2
}

default 的 DeltaFIFO 是以 MetaNamespaceKeyFunc 作為 Object Key 的計算方式,因此我們需要來看看 Object key 是如何計算的

MetaNamespaceKeyFunc

這裏 source code 的註解寫得很棒,我直接引用原文,簡單來說 Object key 就是 namespace/name 如果沒有 namespace 的話 Object key 就只保留 name。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
// keys for API objects which implement meta.Interface.
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
// it's just <name>.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if key, ok := obj.(ExplicitKey); ok {
		return string(key), nil
	}
	meta, err := meta.Accessor(obj)
	if err != nil {
		return "", fmt.Errorf("object has no meta: %v", err)
	}
	if len(meta.GetNamespace()) > 0 {
		return meta.GetNamespace() + "/" + meta.GetName(), nil
	}
	return meta.GetName(), nil
}

impliment

Close

1
2
3
4
5
6
7
// Close the queue.
func (f *DeltaFIFO) Close() {
	f.lock.Lock()                        //鎖不解釋
	defer f.lock.Unlock()                //退出解鎖不解釋
	f.closed = true                      //設置關閉標記 
	f.cond.Broadcast()                   //通知所有等待的工人
}

KeyOf

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
// DeletedFinalStateUnknown objects.
// Keyof 就是把 DeltaFIFO 的私有變數 keyfunc 變形一個暴露,並且檢測 Object 轉換成Deltas 型態與 DeletedFinalStateUnknown 是否會出現問題
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
    //嘗試轉換型態
	if d, ok := obj.(Deltas); ok {
		if len(d) == 0 {
			return "", KeyError{obj, ErrZeroLengthDeltasObject}
		}
        //取得 Deltas 中最新一筆資料
		obj = d.Newest().Object
	}
    //嘗試轉換型態,如果是 DeletedFinalStateUnknown 型態的話,就直接回傳Object key,關於 DeletedFinalStateUnknown 是什麼後面會提到。
	if d, ok := obj.(DeletedFinalStateUnknown); ok {
		return d.Key, nil
	}
	return f.keyFunc(obj)
}

以下幾個 function 是對 store 的實作,因為 DeltaFIFO 同時也是一個儲存~

Add

1
2
3
4
5
6
7
8

// 把物件放入 FIFO queue 中
func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true                        //標記有 object inqueue
	return f.queueActionLocked(Added, obj)    //這個是重點,後面會看到
}

Update

1
2
3
4
5
6
7
// 更新物件
func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true                        //標記有 object inqueue
	return f.queueActionLocked(Updated, obj)  //這個是重點,後面會看到    
}

Delete

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//刪除物件
func (f *DeltaFIFO) Delete(obj interface{}) error {
	id, err := f.KeyOf(obj)                        //透過 keyfunc 取得 object key
	if err != nil {
		return KeyError{obj, err}
	}
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true                             //標記有 object inqueue   
	if f.knownObjects == nil {                     //如果沒有 indexer 的話就只要檢查該 object 在本地(detlas item map)有沒有資料
		if _, exists := f.items[id]; !exists {
			return nil
		}
	} else {                                       //有 indexer 的話就要確認 indexer 內有沒有資料
		_, exists, err := f.knownObjects.GetByKey(id)
		_, itemsExist := f.items[id]
		if err == nil && !exists && !itemsExist {
			return nil
		}
	}

	return f.queueActionLocked(Deleted, obj)      //這個是重點,後面會看到    
}

List

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
//列出所有 queue 的 object 
func (f *DeltaFIFO) List() []interface{} {
	f.lock.RLock()
	defer f.lock.RUnlock()
	return f.listLocked()                            //呼叫私有方法處理
}

func (f *DeltaFIFO) listLocked() []interface{} {
	list := make([]interface{}, 0, len(f.items))    //建立slice
	for _, item := range f.items {                  //遞迴 delta item maps   
		list = append(list, item.Newest().Object)   //取出 delta 最新的資料,組合成slice 
	}
	return list
}

ListKeys

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
//列出所有 queue 的 object key
func (f *DeltaFIFO) ListKeys() []string {
	f.lock.RLock()
	defer f.lock.RUnlock()
	list := make([]string, 0, len(f.items))    //建立slice
	for key := range f.items {                  //遞迴 delta item maps  
		list = append(list, key)                //取出 delta item maps   每一個 key ,組合成slice 
	}
	return list
}

Get

1
2
3
4
5
6
7
8
// 傳入 object ,透過 keyfunc 計算出 object key 再利用該 key 取的相對應 delta 的複製 slice 
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
	key, err := f.KeyOf(obj)                    //計算 object key 
	if err != nil {
		return nil, false, KeyError{obj, err}
	}
	return f.GetByKey(key)                      //獲取對應的 delta slice 
}

GetByKey

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
//以 object key 從 delta 紀錄中拿到相對的資料,取出的資料是 delta 的複製 slice 
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
	f.lock.RLock()
	defer f.lock.RUnlock()
	d, exists := f.items[key]            //從 deltas items map 中透過 object key 取的 deltas slice 
	if exists {
		// Copy item's slice so operations on this slice
		// won't interfere with the object we return.
		d = copyDeltas(d)
	}
	return d, exists, nil
}

queueActionLocked

來到 store 最常出現的,我認為是比較重要的 queueActionLocked function ,可以看到上面許多function 如 Add Delete Update 都有用到這個 function ,我們就來好好看看他做了什麼事情吧!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    //計算 object key 
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
    //從 deltas item map 中取出舊的 deltas 資料
	oldDeltas := f.items[id]
    //把目標物件轉換成 delta 型態並且加入到既有的 deltas 資料後面
	newDeltas := append(oldDeltas, Delta{actionType, obj})
    //去除冗余的 deltas 資料
	newDeltas = dedupDeltas(newDeltas)

    //去除冗余的 deltass 資料後,還要判斷 deltas 資料是不是被清空了(不過soure code 的註解寫不會發生這種情況XD)
	if len(newDeltas) > 0 {
            //判斷 deltas item map 有沒有資料,如果沒有資料的話,就要加到 fifo queue 中
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
            // 如果 deltas item map 有資料表示 queue 裡面已經有了(還沒取走),只要更新 delta item map就好
		f.items[id] = newDeltas
            //發出果廣播告知卡在pop的人可以醒來取貨囉!
		f.cond.Broadcast()
	} else {
    
		// This never happens, because dedupDeltas never returns an empty list
		// when given a non-empty list (as it is here).
		// If somehow it happens anyway, deal with it but complain.
        //上面原先的註解我就不拿掉,表示去除冗余資料後不會把 deltas slice 清空~如果真的發生的清空的話舊回報給社群然後還是要做處理xD
		if oldDeltas == nil {
			klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
			return nil
		}
		klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
		f.items[id] = newDeltas
		return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
	}
	return nil
}

//去除 delta 冗余資料
func dedupDeltas(deltas Deltas) Deltas {
    //判斷 deltas slice 長度 ,若是小 2 沒什麼好去除的啊
	n := len(deltas)
	if n < 2 {
		return deltas
	}
    
    //拿出倒數兩個來處理
	a := &deltas[n-1]
	b := &deltas[n-2]
    //進行冗余比較
	if out := isDup(a, b); out != nil {
		// `a` and `b` are duplicates. Only keep the one returned from isDup().
		// TODO: This extra array allocation and copy seems unnecessary if all we do to dedup is compare the new delta with the last element in `items`, which could be done by mutating `items` directly.
		// Might be worth profiling and investigating if it is safe to optimize.
		d := append(Deltas{}, deltas[:n-2]...)
		return append(d, *out)
	}
	return deltas
}

//確認 a b delta 事件是否一樣,目前只能判斷 delete delta 事件
func isDup(a, b *Delta) *Delta {
    // 判斷 deletea delta 事件
	if out := isDeletionDup(a, b); out != nil {
		return out
	}
	// TODO: Detect other duplicate situations? Are there any?
	return nil
}

// 判斷 delete delta 事件
func isDeletionDup(a, b *Delta) *Delta {
    //只要兩個其中一個不是 delete delta 事件,那就不是一樣的事件
	if b.Type != Deleted || a.Type != Deleted {
		return nil
	}
	// 如果最後一個狀態是 DeletedFinalStateUnknown ,就回傳在奧數第二個狀態 a
	if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
		return a
	}
	return b
}

這裡有幾個部分直得我們思考

  1. 為什麼 deltas 要做冗余合併?

  2. 什麼樣的 deltas 資料可以進行冗余合併?

我自己認為上述兩個問題得答案分別是

  1. 由於加入 delta fifo 的速度與拿出 delta fifo 的速度是不一致的

    • watch kubernetes api (e.g. pod , deployment , service e.t.c)視為放入 delta fifo
    • 處理這些資源變動的 controller 視為從 delta fifo 取出

    如果某一個資源頻繁的更動 controller 還沒處理到,就會發生 object key 還在 queue 中,就要加入同一個 object key ,那這兩個動作就可以合併成一個。

  2. 什麼樣的動作可以合併,就剛剛追蹤 source code 來看只有刪除的事件可以合併成一個,但為什麼是刪除事件呢?

    • 時間軸(time)—————A—————B———-(time)
    • 時間點刪除物件,跟在B時間點刪除物件,可以將兩個事件聚合成如下圖所示
    • 時間軸(time)———————————-AB——-(time)
    • 就結論來看是一樣的

    那為什麼只刪除可以,更新不行嗎?

    • 我是這樣認為的,我知道正不正確
    • 時間軸(time)—————A—————B———-(time)
    • A時間點使用者改了replicas的數量,在B時間點使用者改了port number
    • 要把兩個事件的內容聚合再一起,一定要有兩個事件更新的內容才可以聚合,不是不能做是不好做xD

    你可能會問那新增呢,新增不香嗎,為什麼新增不能聚合?

    • 我認為可以,但就從 source code 來看 kubernetes 沒實作
    • 時間軸(time)—————A—————B———-(time)
    • 時間點新增物件,跟在B時間點新增物件,可以將兩個事件聚合成如下圖所示
    • 時間軸(time)———————————-AB——-(time)
    • 就結論來看是一樣的,我不知道為什麼 kubernetes 沒做 ,如果有知道的大大希望能說明一下原因

Replace

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
// Replace atomically does two things: (1) it adds the given objects using the Sync or Replace DeltaType and then (2) it does some deletions.
// In particular: for every pre-existing key K that is not the key of
// an object in `list` there is the effect of
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
// of K.  If `f.knownObjects == nil` then the pre-existing keys are
// those in `f.items` and the current object of K is the `.Newest()`
// of the Deltas associated with K.  Otherwise the pre-existing keys
// are those listed by `f.knownObjects` and the current object of K is
// what `f.knownObjects.GetByKey(K)` returns.
// replace做的有兩件事:(1)使用 Sync 或 Replace DeltaType 事件(2)刪除一些就的物件。
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
	f.lock.Lock()
	defer f.lock.Unlock()
    //建立key set 
	keys := make(sets.String, len(list))

    // 使用 sync 標記    
	action := Sync
    //若是設定為可以用replace,那標記成replace
	if f.emitDeltaTypeReplaced {
		action = Replaced
	}

	// 為每個Object 轉換成 replace/sync delta 型態
	for _, item := range list {
            //計算 object key
		key, err := f.KeyOf(item)
		if err != nil {
			return KeyError{item, err}
		}
            //插入object key 
		keys.Insert(key)
            //處理 object key enqueue 以及設定 object ket map 的 delta object 對應
		if err := f.queueActionLocked(action, item); err != nil {
			return fmt.Errorf("couldn't enqueue object: %v", err)
		}
	}
    //如果沒有設定indexer的話,就要對自己儲存的進行檢查
	if f.knownObjects == nil {
		// Do deletion detection against our own list.
		queuedDeletions := 0
            //遞迴所有的 deltas items map 
		for k, oldItem := range f.items {
                   //如果新的 key set 裡面有 deltas map 所包含的 key 的話,就忽略。(deltas items map 有 新的 key set 的 key 下次只要更新就好)
			if keys.Has(k) {
				continue
			}
			// Delete pre-existing items not in the new list.
			// This could happen if watch deletion event was missed while
			// disconnected from apiserver.
                   //刪除不再新的 object key sey 內的 object ,並將其標記成 DeletedFinalStateUnknown。
			var deletedObj interface{}
			if n := oldItem.Newest(); n != nil {
				deletedObj = n.Object
			}
                    //累加不在新的 object key sey 內的 object,總共有多少個
			queuedDeletions++
                    //處理 object key enqueue 以及設定 object ket map 的 delta object 對應(設定成DeletedFinalStateUnknown)
			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
				return err
			}
		}
            //如果populated 為 false 表示還沒有人操作(add delete update)過 delta fifo,這是第一次 replace。
		if !f.populated {
                    //標記inqueue
			f.populated = true
			// While there shouldn't be any queued deletions in the initial population of the queue, it's better to be on the safe side.
                    //initialPopulationCount 表示初始化要pop多少數量是由queue的長度決定
                    //這邊比較弔詭的是如果沒有 delete 操作過()也就是 populated=false 那應該不會有 queuedDeletions(也就是queuedDeletions=0) ,註解是寫為了保險~
			f.initialPopulationCount = len(list) + queuedDeletions
		}

		return nil
	}

	// 從 indexer 中取得所有的 object key 
	knownKeys := f.knownObjects.ListKeys()
	queuedDeletions := 0
        // 遞迴 indexer 中所有的 object key 
	for _, k := range knownKeys {
            //如果新的object key set 有indexer object key 的話就忽略不處理(表示已經有delta item maps 已經有資料了)
		if keys.Has(k) {
			continue
		}
            // 新的 object key set 沒有 indexer object key 的話
            // 從 indexer 透過 object key 取的 object 
		deletedObj, exists, err := f.knownObjects.GetByKey(k)
		if err != nil {
			deletedObj = nil
			klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
		} else if !exists {
			deletedObj = nil
			klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
		}
            // 計數 indexer 有的 key ,但新的 object key set 沒有的 
		queuedDeletions++
           //處理 object key enqueue 以及設定 object ket map 的 delta object 對應(設定成DeletedFinalStateUnknown)
		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
			return err
		}
	}
    //如果populated 為 false 表示還沒有人操作(add delete update)過 delta fifo,這是第一次 replace。
	if !f.populated {
		f.populated = true
                //這邊比較弔詭的是如果沒有 delete 操作過()也就是 populated=false 那應該不會有 queuedDeletions(也就是queuedDeletions=0) ,註解是寫為了保險~
		f.initialPopulationCount = len(list) + queuedDeletions
	}

	return nil
}

以上就是 store 的實作,說簡單不簡單說困難倒也還好。就是要花點時間整理思路了解到底儲存了什麼,怎麼存的已經批次取代要標記成DeletedFinalStateUnknown的狀態。

接下來要來說說要怎麼把物件彈出 fifo queue~

Pop

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
        //不斷循環等有資料可以取
	for {
            //如果 fifo queue 沒有資料了
		for len(f.queue) == 0 {
                    //需要先判斷 fifo queue 是不是已經關閉了
			if f.closed {
				return nil, ErrFIFOClosed
			}
                    //工人需要在這裡等有資料,在加入資料時工人會被喚醒
			f.cond.Wait()
		}
            //把第一筆資料pop出來
		id := f.queue[0]
		f.queue = f.queue[1:]
            //todo
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
            //從 deltas item map 找到對應的 delta slice 
            
		item, ok := f.items[id]
            //註解也寫了這不會發生xD,deltas items map一定找得到資料的意思(但也不處理錯誤讓他跳過去)
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
        //pop出一個 delta 資料後 需要把 delta 從 deltas items map刪除
		delete(f.items, id)
        //外部處理 delta 資料的 function
		err := process(item)
        //如果有錯就需要從新加入 queue 中
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring ownership to the caller.
		return item, err
	}
}

在 process delta 資料的時候若是發生錯誤,需要再把資料加入 fifo queue 中,我們來看一下怎麼再把資料重新加到 fifo queue 中。

AddIfNotPresent

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
// present in the set, it is neither enqueued nor added to the set.
//
// This is useful in a single producer/consumer scenario so that the consumer can
// safely retry items without contending with the producer and potentially enqueueing
// stale items.
//
// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
// different from the Add/Update/Delete functions.
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
    //把物件轉換成 deltas 型態
	deltas, ok := obj.(Deltas)
	if !ok {
		return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
	}
    //取得 deltas 物件最新的 object key 
	id, err := f.KeyOf(deltas.Newest().Object)
	if err != nil {
		return KeyError{obj, err}
	}
    
	f.lock.Lock()
	defer f.lock.Unlock()
    //透過私有方法重新enque物件
	f.addIfNotPresent(id, deltas)
	return nil
}

// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
// already holds the fifo lock.
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
	f.populated = true                        //標記有 object inqueue 
	if _, exists := f.items[id]; exists {     //需要先確認 object 使否存在於 deltas map 中,如果存在表示  fifo queue 裡面還有資料
		return
	}
    //重新加入 fifo queue
	f.queue = append(f.queue, id)
    //對應deltas 與 deltas map 關係
	f.items[id] = deltas
    //通知pop工人可以來取資料
	f.cond.Broadcast()
}

Resync

不是很確定用途,不過從 source code 分析結果來看是同步 indexer 裡面的資料?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()
        //如果沒有 indexer 就不需要處理
	if f.knownObjects == nil {
		return nil
	}
        //從 indexer 中取出每一個 object key set 
	keys := f.knownObjects.ListKeys()
        //遞迴 indexer object key set 
	for _, k := range keys {
            //取出 object key 透過 syncKeyLocked ,下面會看到~
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

//實作 object sync 的方法~
func (f *DeltaFIFO) syncKeyLocked(key string) error {
        //透過 indexer 使用 object 取得對應的 object key 
	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}

	// If we are doing Resync() and there is already an event queued for that object,
	// we ignore the Resync for it. This is to avoid the race, in which the resync
	// comes with the previous value of object (since queueing an event for the object
	// doesn't trigger changing the underlying store <knownObjects>.
    //透過key func 計算 object key
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
    //如果 deltas item map 已經有資料了就不處理
	if len(f.items[id]) > 0 {
		return nil
	}
    //標記已sync的方式加入fifo queue中
	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}

HasSynced

1
2
3
4
5
6
//主要是 object 放放入 DeltaFIFO 後,並且全部的 object 都被 Pop() , HasSynced fucntion 才會回傳 true 
func (f *DeltaFIFO) HasSynced() bool {
	f.lock.Lock()
	defer f.lock.Unlock()
	return f.populated && f.initialPopulationCount == 0
}

小整理

可以把使用者在呼叫 Delta FIFO 的 Add() 、 Delete() 、 Update() 、 Replaced() 、 Sync() 簡化成下圖的結構,應該會比較方便理解。

而使用者呼叫 Delta FIFO 的 Pop()則可以簡化成下圖的結構

怎麼使用

真正的 Operator / Controller 使用 DeltaFIFO 還需要知道其他背景知識,我先以幾個簡單的 DeltaFIFO 的 unit test code 來做範例,讓大家了解delta fifo 使用起來大概是怎麼一回事!

pop

簡單的物件彈出,但是彈出之前 deltas fifo 內必須要有物件才行,接下來看看 test 是怎麼做的吧!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func TestDeltaFIFO_requeueOnPop(t *testing.T) {
    //建立一個delta FIFO queue ,怎麼產生 object key 的不是很重要~
	f := NewDeltaFIFO(testFifoObjectKeyFunc, nil)
    //加入了一個 object ,可以想像成 key = foo , value =10 
	f.Add(mkFifoObj("foo", 10))
    //加入之後,就直接pop出來用~
	_, err := f.Pop(func(obj interface{}) error {
            //只是簡單看一下物件是不是正常而已
		if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" {
			t.Fatalf("unexpected object: %#v", obj)
		}
            //這裡在處理過程故意讓他產生錯誤,再次重新放入 delta fifo (注意這裡的error一定回傳ErrRequeue,才有辦法requeued)
		return ErrRequeue{Err: nil}
	})
    //檢查一下pop有沒有出現問題
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
    //透過 KeyGetter interface 的 GetByKey 透過 foo key  拿物件
    //這時候應該要拿的到,因為物件剛剛處理錯誤又重新 requeued 了(一般來說只要被pop出來 deltas item map 對應的 key 理論上要被清空)
	if _, ok, err := f.GetByKey("foo"); !ok || err != nil {
		t.Fatalf("object should have been requeued: %t %v", ok, err)
	}
    
    //再次pop出來
	_, err = f.Pop(func(obj interface{}) error {
		if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" {
			t.Fatalf("unexpected object: %#v", obj)
		}
            //這裡在處理過程故意讓他產生錯誤,再次重新放入 delta fifo (注意這裡的error一定回傳ErrRequeue,才有辦法requeued)
		return ErrRequeue{Err: fmt.Errorf("test error")}
	})
    // 檢查 pop 出來的錯誤訊息
	if err == nil || err.Error() != "test error" {
		t.Fatalf("unexpected error: %v", err)
	}
    
    //透過 KeyGetter interface 的 GetByKey 透過 foo key  拿物件
    //這時候應該要拿的到,因為物件剛剛處理錯誤又重新 requeued 了(一般來說只要被pop出來 deltas item map 對應的 key 理論上要被清空)
	if _, ok, err := f.GetByKey("foo"); !ok || err != nil {
		t.Fatalf("object should have been requeued: %t %v", ok, err)
	}

    //再次pop出來
	_, err = f.Pop(func(obj interface{}) error {
		if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" {
			t.Fatalf("unexpected object: %#v", obj)
		}
        // 這次順利地處理完成~就不觸發 requeue
		return nil
	})
    // 檢查 pop 出來的錯誤訊息
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
    //這時候應該要拿不到,一般來說只要被pop出來 deltas item map 對應的 key 理論上要被清空。
	if _, ok, err := f.GetByKey("foo"); ok || err != nil {
		t.Fatalf("object should have been removed: %t %v", ok, err)
	}
}

resync

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func TestDeltaFIFO_ResyncNonExisting(t *testing.T) {
    //先建立一個 delta fifo queue,怎麼計算 object key 的不重要,另外還放入了類似 indexer 的物件(實作 KeyListerGetter interface),這裡可以來看一下 literalListerGetter 這是什麼東西(幫我滾輪滾到下面一點看xD)
	f := NewDeltaFIFO(
		testFifoObjectKeyFunc,
		literalListerGetter(func() []testFifoObject {
			return []testFifoObject{mkFifoObj("foo", 5)}
		}),
	)
    //這裡加入一個刪除的事件
	f.Delete(mkFifoObj("foo", 10))
    //觸發sync
	f.Resync()
    //看看 deltas item map foo 目前所代表的數值是什麼,這時候因為出發過resync 
    //deltas 資料應該會是 event Sync ,key foo ,value 5 
	deltas := f.items["foo"]
	if len(deltas) != 1 {
		t.Fatalf("unexpected deltas length: %v", deltas)
	}
	if deltas[0].Type != Deleted {
		t.Errorf("unexpected delta: %v", deltas[0])
	}
}

//定義了一個 function type ,並且指名要回傳一個 testFifoObject 型態的slice,testFifoObject前面有提到他就是一個 key value。
type literalListerGetter func() []testFifoObject
//在coding 階段就可以確認 literalListerGetter 有沒有實作 KeyListerGetter
var _ KeyListerGetter = literalListerGetter(nil)

//實作 literalListerGetter 就簡單的列出key 
func (kl literalListerGetter) ListKeys() []string {
	result := []string{}
	for _, fifoObj := range kl() {
		result = append(result, fifoObj.name)
	}
	return result
}

//實作 literalListerGetter 就透過 key簡單的列出value 
func (kl literalListerGetter) GetByKey(key string) (interface{}, bool, error) {
	for _, v := range kl() {
		if v.name == key {
			return v, true, nil
		}
	}
	return nil, false, nil
}

replace

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func TestDeltaFIFO_ReplaceMakesDeletionsReplaced(t *testing.T) {
    //先建立一個 delta fifo queue,怎麼計算 object key 的不重要,另外還放入了類似 indexer 的物件(實作 KeyListerGetter interface)
    //上面有提過可以再回去看一下
    //另外這邊打開了 EmitDeltaTypeReplaced 
	f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KeyFunction: testFifoObjectKeyFunc,
		KnownObjects: literalListerGetter(func() []testFifoObject {
			return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
		}),
		EmitDeltaTypeReplaced: true,
	})
    //加入一個刪除的事件
	f.Delete(mkFifoObj("baz", 10))
    //觸發replace事件,帶入要replace的物件
	f.Replace([]interface{}{mkFifoObj("foo", 6)}, "0")
    //預期replace後的結果,基本上就是把指定要replace的物件 inqueue
    //indexer 內 replace 有的就標記成replaced
    //indexer 內 replace 沒有的就標記成deleted 
    //中間如有 add 或是 update 過就會有兩筆資料,一筆是 add/update 一筆是 deleted~
	expectedList := []Deltas{
		{{Deleted, mkFifoObj("baz", 10)}},
		{{Replaced, mkFifoObj("foo", 6)}},
		// Since "bar" didn't have a delete event and wasn't in the Replace list
		// it should get a tombstone key with the right Obj.
		{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},
	}
    
	for _, expected := range expectedList {
		cur := Pop(f).(Deltas)
		if e, a := expected, cur; !reflect.DeepEqual(e, a) {
			t.Errorf("Expected %#v, got %#v", e, a)
		}
	}
}

從上述幾個 test case ,可以看到 DeltaFIFO 基本上怎麼使用,需要承上啟下了解 DeltaFIFO 在 controler/operator 的作用還需要了解其他元件如 Reflector怎麼把資料送進來的,所以這裡怎麼使用 Delta FIFO 使用 test case 來呈現。

小結

在本章節我們快速的了解了 Delta FIFO 的作用,除了可以作為一個 FIFO 的 Queue 之外,還有本地儲存的功能(儲存了物件的變化事件,如Add、Delete、Update等),一般來說還會把資料拋給 indexer 做儲存,大致上就是這些特性。
了解了 Delta FIFO的這些特性後我們要繼續往上看Reflector怎麼把資料送進來,我認為這邊有點複雜牽扯到反序列化的過程,中間可能會有錯誤的見解希望各位在觀看文章的大大們可以指出哪裡有問題,讓我學習改進,謝謝。


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page