This page looks best with JavaScript enabled

Kubernetes Indexers local cache 之美 (I)

 ·  ☕ 12 min read

首先本文所有的 source code 基於 kubernetes 1.19 版本,所有 source code 為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

上一篇Kubernetes Controller 設計挖掘之路 提到為什麼我們要了解 kubernetes 內部其他元件的運作方式,例如 work queue , indexer 或是informerindexer 作為 controller 內保存物件資料的地方執得我們去了解它的設計思維與架構。

Indexer

Indexer 位於 client-go 的 cache package,從字面上來看他就是一個索引器,位於 infromer 之中作為 informer 的 local cache,至於為什麼會是使用 indexer 作為命名的方法,我猜是透過某些條件查詢儲存的物件加快整體的速度吧?(如果有錯麻煩大大糾正)

interface

kubernetes source code 設計得非常精美,我們可以先從 interface 定義了哪些方法來推敲實作這個 interface 的物件可能有什麼功能。
source-code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Indexer extends Store with multiple indices and restricts each
// accumulator to simply hold the current object (and be empty after
// Delete).
//
// There are three kinds of strings here:
// 1. a storage key, as defined in the Store interface,
// 2. a name of an index, and
// 3. an "indexed value", which is produced by an IndexFunc and
//    can be a field value or any other string computed from the object.
type Indexer interface {
    //嵌入了stroe,擁有stroe全部的方法
	Store
	
    // 回傳在indexName內與給定的Object有交集的所有object
	Index(indexName string, obj interface{}) ([]interface{}, error)
	
    // 回傳在indexname與透過indexed取的所有關聯的Object  Key
	IndexKeys(indexName, indexedValue string) ([]string, error)
    
    // 回傳在indexName所有的indexed
	ListIndexFuncValues(indexName string) []string
    
    // 回傳在indexName與透過indexed回傳所有關聯的object
	ByIndex(indexName, indexedValue string) ([]interface{}, error)
    
    // 回傳indexer
	GetIndexers() Indexers

    // 新增indexer
	AddIndexers(newIndexers Indexers) error
}

研究到這裡我真的很痛苦一下Index 一下 Indexer 一下Obejct 一下Obejct key 一下Indexed!!!神煩xD

好AnyWay總之有五種東西分別是

  1. Indexer
  2. Indexed
  3. Obejct
  4. Obejct key
  5. Index

本系列會幫這些名稱進行一個分類與說明,希望能夠幫助到正在痛苦研究的夥伴….

回歸正題可以看到 Indexer 嵌入了 Store Interface ,就名稱來看 Indexer 有基礎的儲存能力,至於實際上有什麼儲存能力我們繼續看 Store Interface 定義了什麼方法。

source-code
可以從 Store interface 中簡單的看到有新增、刪除、修改等動作,由於操作的事Key Value store所以會有,列出Key 列出物件等動作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type Store interface {

    //新增物件
	Add(obj interface{}) error

    //更新物件
	Update(obj interface{}) error

    //刪除物件
	Delete(obj interface{}) error

    //列出所有物件
	List() []interface{}

    //列出所有物件的key
	ListKeys() []string

    // 透過物件取得object key
	Get(obj interface{}) (item interface{}, exists bool, err error)

    //透過object key取得object
	GetByKey(key string) (item interface{}, exists bool, err error)

    //一次性替換Object key 所代表的所有Object
	Replace([]interface{}, string) error

    //重新同步
	Resync() error
}

看完了抽象的定義之後,必須要回過來看 cache 實際物件定義了哪些屬性

struct

只有 cache 這個物件實作了 Indexer Interface,我們先來分析這傢伙!

cache

source-code

1
2
3
4
5
6
7
type cache struct {
	// cacheStorage bears the burden of thread safety for the cache
	cacheStorage ThreadSafeStore    //一個 thread safe 的 interface ,稍後會解釋。
	// keyFunc is used to make the key for objects stored in and retrieved from items, and
	// should be deterministic.
	keyFunc KeyFunc        //計算object key的方法,稍後會解釋
}

cache 組合 ThreadSafeStore 以及 KeyFunc ,我們繼續展開來看這兩個是什麼東西!

ThreadSafeStore

先來看 ThreadSafeStore interface 定義了什麼東西,從中來了解實作他的物件可能有什麼行為。

interface
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
type ThreadSafeStore interface {
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string)
	Get(key string) (item interface{}, exists bool)
	List() []interface{}
	ListKeys() []string
	Replace(map[string]interface{}, string)
	Index(indexName string, obj interface{}) ([]interface{}, error)
	IndexKeys(indexName, indexKey string) ([]string, error)
	ListIndexFuncValues(name string) []string
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
	// Resync is a no-op and is deprecated
	Resync() error
}

快速地看過一次應該會覺得跟上面提到的 Store Interface 非常相似,我認為大致上的差異在於 ThreadSafeStroe 主要針對 Index 做處理,我們看一下UML會更加清楚。

在使用者面向的應該是 Store 這個 Interface , cache 把功能委任(委託)給 cacheStorage Interface 有Thread safe 特性且有索引的 storage 。

看完了抽象的定義之後,必須要回過來看 threadSafeMap 實際物件定義了哪些屬性

struct

只有 threadSafeMap 這個物件實作了 cacheStorage Interface,我們接著來分析threadSafeMap!

source-code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
	lock  sync.RWMutex                          //讀寫鎖(可能讀多寫少?)
        items map[string]interface{}               //儲存Object Key:Object

	// indexers maps a name to an IndexFunc
	indexers Indexers                           //用以計算indexed的function
	// indices maps a name to an Index
	indices Indices                             //indexed map透過這個表獲取Onject Key
}

我知道頭很痛,又出現 Index , indexed 這一坨東西,我會盡力講解些東西的關聯,先看看在 Indexers Indices 這些東西在 client go 裡面的定義吧,從中我們可以獲取一些靈感。

source-code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func(obj interface{}) ([]string, error)        //傳入Obaject會幫你算出多種[]indexed-name

// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String                             //每個indexed-name會對應到一個[set]object key set
                                                              //inedxed->object key set

// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc                            //計算 indexed 的 function 們(透過 name 去分類)
                                                              //index name->index function->index

// Indices maps a name to an Index
type Indices map[string]Index                                 //index name(function name)->index[indexed-name]->[set]object key

把上面的source code轉換成圖形應該會變成這樣子,

從最上面的部分往下看

  • Indexer 裡面有許多 index-name 分別對應著不同的 index function
    • index function 算出多個 indexed-name
  • 從 Indices 內透過 index-name 可以拿到一個 Index-name 所對應的 index
  • 從 Index 內透過 inexed-name 可以獲得一組 Object key set

好了解到這裡後,應該會有一點概念吧?XD
這一邊我讀了很久才看懂一點,如果分析有錯希望各位大大可以指出!感謝

接著我們來看 New Function 要怎麼把 ThreadSafeStore 建立出來

new Function

可以看到 ThreadSafeStore 要求傳入一個 indexer 以及 indeices,這邊可以對照著 client go 中有使用到 NewThreadSafeStore 的人傳入什麼進去做為參考。
source-code

1
2
3
4
5
6
7
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
	return &threadSafeMap{
		items:    map[string]interface{}{},
		indexers: indexers,
		indices:  indices,
	}
}

可以看到 NewStore 這個 function 有使用到 NewThreadSafeStore 方法,並且傳入 Indexers{} 以及 Indices{}
source-code

1
2
3
4
5
6
7
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
	return &cache{
		cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
		keyFunc:      keyFunc,
	}
}

看完了如何建立 ThreadSafeStore 的物件之後,我們接著來分析該物件每個function 實作了什麼事情吧!

implement function

單純的把 Object 放入 map,以 Object key 作為 map 的索引,重點在於 updateIndices 的動作。

Add

source-code

1
2
3
4
5
6
7
func (c *threadSafeMap) Add(key string, obj interface{}) {
	c.lock.Lock()                                //鎖不解釋
	defer c.lock.Unlock()                        //退出解鎖
	oldObject := c.items[key]                    //取出舊的物件
	c.items[key] = obj                           //寫入新的物件
	c.updateIndices(oldObject, obj, key)         //更新indexed索引 ,稍後會解釋更新 indexed 索引的動作
}
Update

單純的更新物件,透過 object key 作為索引找到物件並且更新,重點在於 updateIndices 的動作。
source-code

1
2
3
4
5
6
7
8
//跟Add不是一模一樣...為什麼不重用add不明白
func (c *threadSafeMap) Update(key string, obj interface{}) {
	c.lock.Lock()                                //鎖不解釋
	defer c.lock.Unlock()                        //退出解鎖
	oldObject := c.items[key]                    //透過Object key 取出舊的物件     
	c.items[key] = obj                           //透過Object key寫入新的物件
	c.updateIndices(oldObject, obj, key)         //更新indexed索引
}
Delete

刪除物件,透過 Object key作為索引找到物件並且刪除,重點在於 deleteFromIndices 的動作。
source-code

1
2
3
4
5
6
7
8
func (c *threadSafeMap) Delete(key string) {
	c.lock.Lock()                                    //鎖不解釋
	defer c.lock.Unlock()                            //退出解鎖
	if obj, exists := c.items[key]; exists {         //如果物件存在
		c.deleteFromIndices(obj, key)               //刪除indexed索引
		delete(c.items, key)                        //刪除物件
	}
}
Get

取得物件,透過 Object key 作為索引取的物件
source-code

1
2
3
4
5
6
7

func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
	c.lock.RLock()                //讀操作,用讀鎖
	defer c.lock.RUnlock()        //解鎖
	item, exists = c.items[key]    //透過Object key拿取物件
	return item, exists            //回傳
}
List

列出所有物件
source-code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12

func (c *threadSafeMap) List() []interface{} {
	c.lock.RLock()                //讀操作,用讀鎖
	defer c.lock.RUnlock()        //解鎖
        //for跑一次map全部加入到slice就好
	list := make([]interface{}, 0, len(c.items))
	for _, item := range c.items {
		list = append(list, item)
	}
	return list                    //回傳slice
}

ListKeys

列出所有物件的 Object key
source-code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// ListKeys returns a list of all the keys of the objects currently
// in the threadSafeMap.
func (c *threadSafeMap) ListKeys() []string {
	c.lock.RLock()                //讀操作,用讀鎖
	defer c.lock.RUnlock()        //解鎖
        //for跑一次map全部加入到slice就好
	list := make([]string, 0, len(c.items))
	for key := range c.items {
		list = append(list, key)
	}
	return list                    //回傳slice
}
Replace

一次替換整個儲存庫,重點在於會重建整個indices以及更新index。
source-code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12

func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
	c.lock.Lock()                                    //鎖不解釋
	defer c.lock.Unlock()                            //退出解鎖
	c.items = items                                  //覆蓋整個紀錄

	// rebuild any index
	c.indices = Indices{}                            //indexed 索引重建
	for key, item := range c.items {                 //更新indexed索引列表
		c.updateIndices(nil, item, key)
	}
}

接下來的部分會著重在 indexed 的計算

Index

透過 index-name 取得indexer中的 object key 計算 function 算出對應的 indexed value , 透過對應的 indices-name 像 indices 取的對應的 index 。
使用對應的 index 以及計算出來的 indexed value取得 object key set ,遞迴跑一是 key set 從 map 中一一取得對應的物件。
source-code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Index returns a list of items that match the given object on the index function.
// Index is thread-safe so long as you treat all items as immutable.
//透過index name取得indexed後
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
	c.lock.Lock()                                    //鎖不解釋
	defer c.lock.Unlock()                            //退出解鎖
    
	indexFunc := c.indexers[indexName]               //透過index name從indexer中取得對應的infexed function
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}

	indexedValues, err := indexFunc(obj)            //透過indexed function 計算indexed
	if err != nil {
		return nil, err
	}
	index := c.indices[indexName]                  //透過index name 從 indices 拿對應的 index

	var storeKeySet sets.String                    //object key的集合
	if len(indexedValues) == 1 {                   //大多數情狂indexed 只會有一個
		// In majority of cases, there is exactly one value matching.
		// Optimize the most common path - deduping is not needed here.
		storeKeySet = index[indexedValues[0]]    //透過indexed從index拿到對應的 object key set
	} else {                                     //不知道什麼情況會跑進去這裡,我用dlv跑測試沒有跑進這裡過...以下是看註解猜測的
		// Need to de-dupe the return list.
		// Since multiple keys are allowed, this can happen.
		storeKeySet = sets.String{}
		for _, indexedValue := range indexedValues {    //遞迴index所有的indexed,並且將數值插入set中
			for key := range index[indexedValue] {
				storeKeySet.Insert(key)
			}
		}
	}

	list := make([]interface{}, 0, storeKeySet.Len())    //Objest set 取出所有的 Object key 
    //將map有對應到的 Object 放入list 回傳
	for storeKey := range storeKeySet {
		list = append(list, c.items[storeKey])
	}
	return list, nil
}

看完了Index的實作,我是這樣猜的透過一個物件算出 Indexed-name,在 Indices 中 index name 對應到一組 Index,最後再從這個 Index 中透過 Indexed name拿到關聯的Object。

dlv 實際拿到的數值 … todo

IndexKeys

透過 index name 從 indexer 中拿到計算 indexe value 的 function ,接著從 indices 使用 index name 拿到到對應的 index 。
使用算出來的 index value 從 index 拿到所有的 object key set 。
source-code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value.
// IndexKeys is thread-safe so long as you treat all items as immutable.
func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
	c.lock.RLock()                //讀操作,用讀鎖
	defer c.lock.RUnlock()        //解鎖

	indexFunc := c.indexers[indexName]        //透過 index name從indexer中取得對應的infexed function
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}

	index := c.indices[indexName]            //透過 index name 從 indices 拿對應的 index

	set := index[indexedValue]                //透過indexed 從 index 拿到所有的 Object key
	return set.List(), nil
}
ListIndexFuncValues

透過 index name 從 indices 取得對應的 index,遞迴跑過 index 取得所有的 index key。
source-code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
	c.lock.RLock()                //讀操作,用讀鎖
	defer c.lock.RUnlock()        //解鎖

	index := c.indices[indexName]            //透過 index name 從 indices 拿對應的 index
	names := make([]string, 0, len(index))
    //拿出 index 中所有的 indexed key
	for key := range index {
		names = append(names, key)
	}
	return names
}
ByIndex

透過 index name 從 indexer 取得計算 indexed value 的 function ,以及使用 index name 從 indices 中拿到對應的 index 。
使用對應的 index 以及計算出來的 indexed value 取得對應的 object key set 。
遞迴object key set 把 object key set 對應到的 object 回傳出來。
source-code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
	c.lock.RLock()                //讀操作,用讀鎖
	defer c.lock.RUnlock()        //解鎖

	indexFunc := c.indexers[indexName]        //透過index name從indexer 中取的 indexed function
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}

	index := c.indices[indexName]            //透過index name從Indices 中取得 index

	set := index[indexedValue]               //透過indexed name從index取的 Object key set
	list := make([]interface{}, 0, set.Len())   
    //遞迴儲存的object key set 對應的object倒出
	for key := range set {
		list = append(list, c.items[key])
	}

	return list, nil
}
GetIndexers

回傳 indexer
source-code

1
2
3
func (c *threadSafeMap) GetIndexers() Indexers {
	return c.indexers                //沒啥好說的回傳一個indexer 
}
AddIndexers

加入新的 indexer ,前提是不能有資料才可以加入 indexer 。

source-code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
	c.lock.Lock()                                    //鎖不解釋
	defer c.lock.Unlock()                            //退出解鎖

	if len(c.items) > 0 {                            //現在有存資料了,不能加入indexer 
		return fmt.Errorf("cannot add indexers to running index")
	}

	oldKeys := sets.StringKeySet(c.indexers)        //把舊的indexer轉乘一個可以比對的資料
	newKeys := sets.StringKeySet(newIndexers)        //把新的indexer轉乘一個可以比對的資料

	if oldKeys.HasAny(newKeys.List()...) {           //比對兩個indexer 是否有重複
		return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
	}
    //上述不引響核心邏輯,不展開解析
    

    //把新的indexer加入indexer中
	for k, v := range newIndexers {
		c.indexers[k] = v
	}
	return nil
}
Resync

沒幹啥事
source-code

1
2
3
4
func (c *threadSafeMap) Resync() error {
	// Nothing to do
	return nil            //沒幹啥
}

最後幾個 function 加油!

updateIndices

更新 indices ,如果有就的物件就需要刪除舊的 index

遞迴所有 indexer 拿到計算 indexed value 的 function ,透過 indexed function 計算出 indexed value。
使用 index name 從 indices 拿到 index。

如果 index 不存在建立新的index,並且建立 index name 與 indices 的連結

透過 indexed function 計算出 indexed value
遞迴所有 indexed value ,從 index 中到對應的 Object key set ,若是找不到 Set ,則建立一個 Object ket set 並且讓 indexed value 與 Object key set 建立連結。

source-code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
//傳入舊的 object 為了確認有沒有這個 index (更新用)
//傳入新的 object 
//私有方法只能從add 或是 update呼叫,add/update 有上鎖,保證thread safe
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
	// if we got an old object, we need to remove it before we add it again
    //如果有代舊的object的話需要先刪除舊的index,下一個段落會說到如何刪除
	if oldObj != nil {
		c.deleteFromIndices(oldObj, key)
	}
    //將所有的indexer跑過一次,拿到所有的index name以及indexed function
	for name, indexFunc := range c.indexers {
            //透過indexed function 算出indexed
		indexValues, err := indexFunc(newObj)
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}
            //透過 index name 從 indices 拿到對應的 index
		index := c.indices[name]
            //確認 index 是否存在,不存在表示這個 indices 還沒有這個 index
            //建立 index 並且對應到 indecies 上
		if index == nil {
			index = Index{}
			c.indices[name] = index
		}
            
		for _, indexValue := range indexValues {
                //透過 indexed 檢查 index 對應的 Object key set
			set := index[indexValue]
                //若是set 為空表示 index 還沒建立object key set
                //建立 set 並且對應到 index 上
			if set == nil {
				set = sets.String{}
				index[indexValue] = set
			}
                    //object set set 插入 object key
			set.Insert(key)
		}
	}
}

deleteFromIndices

當更新物件或是刪除物件的時候會被呼叫為了更新 Indices

遞迴 indexer 取出所有的 indexed value function ,並且從 indices 確認 index 是否存在

若是不存在,表示物件已經被清理且 indices 已更新

若是存在,需要進一步確認 indexed value function 算出來的所有 indexed value 在 index 中 Object key set 的情況
若是 Object key set 存在就刪除 Object 內對應的 Object key,另外 Obeset set 若是為空 需要回收 Object 減少記憶體使用。
source-code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// deleteFromIndices removes the object from each of the managed indexes
// it is intended to be called from a function that already has a lock on the cache
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
    //將indexer 中全部 indexed function 取出
	for name, indexFunc := range c.indexers {
            //透過indexed function 算出 indexed 
		indexValues, err := indexFunc(obj)
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}
            //透過index name 從indices拿到對應的index
		index := c.indices[name]
            //有可能 index name 在indeces中找不到(不太可能...) 
		if index == nil {
			continue
		}
            //將indexed 全部跑一次
		for _, indexValue := range indexValues {
                    //從index拿到對應的object key set 
			set := index[indexValue]
                    //如果set 存在的話就刪除對應的 Object key
			if set != nil {
				set.Delete(key)
                        
				    //刪除 object key 為零的 set ,為了避免佔用空間(kubernetes 上有相關的issuekubernetes/kubernetes/issues/84959)
				if len(set) == 0 {
					delete(index, indexValue)
				}
			}
		}
	}
}

小結

我想先介紹到這邊把 threadSafeMap 的部分介紹完,讓大家先了解 cache 底層是怎麼做的,以及 indexerindexed-nameIndicesindex 以及 Object set 之間的關聯。
因為 cahce 是基於 threadSafeMap 往上蓋的,所以先建立最底層的概念是非常重要,下一篇我將會繼續介紹 cache 的部分。


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page