This page looks best with JavaScript enabled

Kubernetes Indexers localcache 之美(II)

 ·  ☕ 6 min read

首先本文所有的 source code 基於 kubernetes 1.19 版本,所有 source code 為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

cache

重新複習一次前一章Kubernetes Indexers localcache 之美(I)提到的 cache 相關的內容

struct

先來看他的 cache 的資料結構, cache 組合了 ThreadSafeStore 以及 KeyFunc

1
2
3
4
5
6
7
8
9
// `*cache` implements Indexer in terms of a ThreadSafeStore and an
// associated KeyFunc.
type cache struct {
	// cacheStorage bears the burden of thread safety for the cache
	cacheStorage ThreadSafeStore    //一個 thread safe 的 interface 
	// keyFunc is used to make the key for objects stored in and retrieved from items, and
	// should be deterministic.
	keyFunc KeyFunc                //計算object key的方法,稍後會解釋
}

複習完資料結構後,看看如何新增一個有實作 Store Interface 的 cache 物件。

new function

分別有兩種 function

  • 第一種為傳入 KeyFunc 也就是只傳入 Object Key 如何計算告訴 cache object key 如何計算,其他的 Indexers 以及 Indices 使用預設的。

  • 第二種為傳入 KeyFunc 以及 Indexers ,也就是告訴 cache Object Key 如何計算方法以及儲存 KeyFunc 的 Indexers 採用哪一種。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
	return &cache{
		cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
		keyFunc:      keyFunc,
	}
}

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc:      keyFunc,
	}
}

我們先來看看 cache 實作了哪些方法再來探討怎麼使用 cache

impliment

這裡的 Function 幾乎依賴 ThreadSafeStore 的實作,如果不了解 ThreadSafeStore 做了什麼的朋友可以回到前一章Kubernetes Indexers localcache 之美(I)複習一下!

Add

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// Add inserts an item into the cache.
func (c *cache) Add(obj interface{}) error {
    //透過 key function 計算出 object 所對應的 key
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
    //委任給 cacheStorage 也就是 threadSafeStore 去做處理,threadSafeStore Add function 這裡的細節可以去上章節複習。
    //簡單來說就更新 local cache 的物件以及更新 indices 以及 index
	c.cacheStorage.Add(key, obj)
	return nil
}

Update

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// Update sets an item in the cache to its updated state.
func (c *cache) Update(obj interface{}) error {
    //透過 key function 計算出 object 所對應的 key
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
    //委任給 cacheStorage 也就是 threadSafeStore 去做處理,threadSafeStore Update function 這裡的細節可以去上一章節複習。
    //簡單來說就更新 local cache 的物件以及更新 indices 以及 index
	c.cacheStorage.Update(key, obj)
	return nil
}

Delete

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// Delete removes an item from the cache.
func (c *cache) Delete(obj interface{}) error {
    //透過 key function 計算出 object 所對應的 key
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
    //委任給 cacheStorage 也就是 threadSafeStore 去做處理,threadSafeStore Delete 這裡的細節可以去上一章節複習。
    //簡單來說就刪除 local cache 的物件以及更新 indices 以及 index
	c.cacheStorage.Delete(key)
	return nil
}

List

1
2
3
4
5
6
7
// List returns a list of all the items.
// List is completely threadsafe as long as you treat all items as immutable.
func (c *cache) List() []interface{} {
    //委任給 cacheStorage 也就是 threadSafeStore 去做處理,threadSafeStore List 這裡的細節可以去上一章節複習。
    //簡單來說就是列出 local cache 所有 Object 
	return c.cacheStorage.List()
}

ListKeys

1
2
3
4
5
6
7
// ListKeys returns a list of all the keys of the objects currently
// in the cache.
func (c *cache) ListKeys() []string {
    //委任給 cacheStorage 也就是 threadSafeStore 去做處理,threadSafeStore ListKeys 這裡的細節可以去上一章節複習。
    //簡單來說就是列出 local cache 所有 Object Key
	return c.cacheStorage.ListKeys()
}

GetIndexers

1
2
3
4
5
6
// GetIndexers returns the indexers of cache
func (c *cache) GetIndexers() Indexers {
    //委任給 cacheStorage 也就是 threadSafeStore 去做處理,threadSafeStore GetIndexers 這裡的細節可以去上一章節複習。
    //簡單來說就是列出 loacl cache 目前有的所有 Indexer
	return c.cacheStorage.GetIndexers()
}

Index

1
2
3
4
5
6
7
// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {
    //委任給 cacheStorage 也就是 threadSafeStore 去做處理,threadSafeStore Index 這裡的細節可以去上一章節複習。
    //簡單來說就是透過以知道的index name 以及 Object 合作幫忙分類列出 loacl cache 相關的 Object  
	return c.cacheStorage.Index(indexName, obj)
}

List

1
2
3
4
5
func (c *cache) IndexKeys(indexName, indexKey string) ([]string, error) {
    //委任給 cacheStorage 也就是 threadSafeStore 去做處理,threadSafeStore IndexKeys 這裡的細節可以去上一章節複習。
    //簡單來說就是透過已知道 index name 以及 indexkey 合作幫忙分類列出 loacl cache 上相關的 Objecy Key
	return c.cacheStorage.IndexKeys(indexName, indexKey)
}

List

1
2
3
4
5
6
// ListIndexFuncValues returns the list of generated values of an Index func
func (c *cache) ListIndexFuncValues(indexName string) []string {
    //委任給 cacheStorage 也就是 threadSafeStore 去做處理,threadSafeStore ListIndexFuncValues 這裡的細節可以去上一章節複習。
    //簡單來說就是透過已知道 index name 幫忙分類列出 loacl cache 上負責計算Object key 的名字也就是 indexed function name
	return c.cacheStorage.ListIndexFuncValues(indexName)
}

ByIndex

1
2
3
4
5
func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
    //委任給 cacheStorage 也就是 threadSafeStore 去做處理,threadSafeStore ByIndex 這裡的細節可以去上一章節複習。
    //簡單來說就是透過已知道 index name 以及 index key 合作幫忙分類列出 loacl cache 上所有有關的 Object 
	return c.cacheStorage.ByIndex(indexName, indexKey)
}

AddIndexers

1
2
3
4
5
func (c *cache) AddIndexers(newIndexers Indexers) error {
    //委任給 cacheStorage 也就是 threadSafeStore 去做處理,threadSafeStore AddIndexers 這裡的細節可以去上一章節複習。
    //簡單來說就是新增一個 Indexer
	return c.cacheStorage.AddIndexers(newIndexers)
}

Get

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// Get returns the requested item, or sets exists=false.
// Get is completely threadsafe as long as you treat all items as immutable.
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
    //透過 key function 計算出 Object 所代表的 object key
	key, err := c.keyFunc(obj)
	if err != nil {
		return nil, false, KeyError{obj, err}
	}
    //使用該 Object key  來取得 Object 
	return c.GetByKey(key)
}

GetByKey

1
2
3
4
5
6
7
8
// GetByKey returns the request item, or exists=false.
// GetByKey is completely threadsafe as long as you treat all items as immutable.
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
    //委任給 cacheStorage 也就是 threadSafeStore 去做處理,threadSafeStore Get 這裡的細節可以去上一章節複習。
    //簡單來說就是使用 Object key 從 local stoarge 取得相關的 Object 
	item, exists = c.cacheStorage.Get(key)
	return item, exists, nil
}

ReplaceList

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// Replace will delete the contents of 'c', using instead the given list.
// 'c' takes ownership of the list, you should not reference the list again
// after calling this function.
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
    // 傳入一堆 object 以及這批物件的版本
    //建立一個 item slice 進行遞迴計算他們的Object key
	items := make(map[string]interface{}, len(list))
	for _, item := range list {
		key, err := c.keyFunc(item)
		if err != nil {
			return KeyError{item, err}
		}
		items[key] = item
	}
    //委任給 cacheStorage 也就是 threadSafeStore 去做處理,threadSafeStore Replace 這裡的細節可以去上一章節複習。
    //簡單來說就是使用 這一坨 Object 直接對 原有的進行取代
	c.cacheStorage.Replace(items, resourceVersion)
	return nil
}

Resync

1
2
3
4
5
// Resync is meaningless for one of these
func (c *cache) Resync() error {
    //沒做事xD
	return nil
}

快速地看完 cache 實作了什麼,絕大多數的 function 都是依賴著更加底層的 threadSafeStore 去完成,有了上一章節的邏輯整理 cache 實作的部分很快地就可以掃過去!
接著讓我來看看外面怎麼來使用 cache 幫忙儲存以及取出資料吧!

how to use

client go 官方有範例其中有一段用到 cache 的初始化 function NewIndexer ,我們先來看看怎麼用,以及他傳入什麼參數。
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
//cache package 中的 NewIndexerInformer是我們要關注的重點
//傳入的參數我們先不要管那麼多菜後續的章節會慢慢的頗析到
indexer, informer := cache.NewIndexerInformer(
    podListWatcher,
    &v1.Pod{},
    0,
    cache.ResourceEventHandlerFuncs{
    	...
    },
    cache.Indexers{})

上段 source code 中提到的 cache package 中的 NewIndexerInformer 是我們要關注的重點是一個重要的 function ,我們先來看看這個 function 裡面寫了什麼吧

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
//傳入數值目前不是很重要,我們需要把重要放到 NewIndexer 來
//可以看到 NewIndexer 傳入了 DeletionHandlingMetaNamespaceKeyFunc 以及 cache package 中預設的 indexers。
//表示所有的物件都會透過 DeletionHandlingMetaNamespaceKeyFunc 來計算Object key 並且存放在 local cache 中。
func NewIndexerInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	indexers Indexers,
) (Indexer, Controller) {
	// This will hold the client state, as we know it.
	clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)

	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}

我們再來看看其他用法,雖然這個方法在 Kubernetes 中非常少被用到但值得我們去了解。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
//傳入數值目前不是很重要,我們需要把重要放到 NewStore 來
//一樣這邊傳入DeletionHandlingMetaNamespaceKeyFunc 來處理 Object  Key 的計算
func NewInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
) (Store, Controller) {
	// This will hold the client state, as we know it.
	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}

小結

大家可能比較不了解的地方在於 Object key 計算的方法,我在 Kubernetes source code 中挖掘了很久找到一個實作 keyFunc 的 function ,不確定還有沒有其他實作 keyFunc 的 function 。

  1. MetaNamespaceIndexFunc
    source code
1
2
3
4
5
6
7
8
// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
	meta, err := meta.Accessor(obj)
	if err != nil {
		return []string{""}, fmt.Errorf("object has no meta: %v", err)
	}
	return []string{meta.GetNamespace()}, nil
}

我就不展開來細看裡面的實作,不就大致上能看出從 Object 拿到 metadata 中的 namespace ,以 namespace 作為 indexed name 非常簡單

了解了 cache 的實作以及底層的 threadSafeMap 與如何計算 Object key 的 KeyFunc 後,下一篇我想探討 DeltaFIFO 的區塊, 可以看到 DeltaFiFo如何跟 cache 進行資料的交換,文章若寫錯誤的地方還請各位大大提出,感謝!


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page