This page looks best with JavaScript enabled

Kubernetes kubelet cache pod status 怎麼一回事

 ·  ☕ 5 min read

首先本文所有的 source code 基於 kubernetes 1.21 版本(終於版更了,這部分跟 1.19 沒什麼差別),所有 source code 為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

本篇文章將介紹 kubelet 如何 cache pod status 有助於我們後續在拆解 kubelet 的部分功能,cache 的實作也相當的簡單,如果對於 kubernetes 其他 cache 也感興趣的朋友歡迎看看筆者之前寫的文章Kubernetes util tool 使用 cache LRU 定格一瞬間Kubernetes Indexers local cache 之美 (I)Kubernetes Indexers local cache 之美 (II),其中包含 kubernetes 使用 cache 的各式情境。

interface

Cache interface 定義了兩種獲取 PodStatus 的方法:一種是非阻塞的 Get()另外一種為阻塞 GetNewerThan() 方法,分別用於不同的情境。
另外也定義了 Set 以及 Delete 的方法用來新增 pod status 以及刪除對應的 pod status 以及透過 UpdateTime 確保 cache 的新鮮度,我們就來看看 interface 實際上對應到的 code 。

1
2
3
4
5
6
7
8
9
type Cache interface {
	Get(types.UID) (*PodStatus, error)
	Set(types.UID, *PodStatus, error, time.Time)
	// GetNewerThan is a blocking call that only returns the status
	// when it is newer than the given time.
	GetNewerThan(types.UID, time.Time) (*PodStatus, error)
	Delete(types.UID)
	UpdateTime(time.Time)
}

了解 cache 定義的 interface 後就來看看哪個物件實作 interface 囉。

struct

cache struct 物件實作了 cache interface 。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

type cache struct {
	
	lock sync.RWMutex                    //確保資料的一致性讀多寫少,使用rw鎖
	
	pods map[types.UID]*data               //使用 map 儲存 key 為 types.UID value為 pod status data 
	
	timestamp *time.Time                    //透過權域的timestamp來確認資料的新鮮度

	subscribers map[types.UID][]*subRecord    //透過 map 紀錄哪一個 type.UID 目前是誰監聽
}


//data 資料結構儲存 pod status 
type data struct {
	
	status *PodStatus                        //儲存 pod status 
	// Error got when trying to inspect the pod.
	err error                                //儲存 pod inspect 錯誤
	
	modified time.Time                        //上一次 pod 被修改的時間
}


//透過 subRecord 可以回傳給監聽者
type subRecord struct {
	time time.Time
	ch   chan *data                            //透過 channel 回傳 pod status 
}

了解了資料結構後,我們來看一下怎麼把 cache 建立起來。

New function

1
2
3
4
5
// NewCache creates a pod cache.
func NewCache() Cache {
	//簡單的初始化 cache 會用到的 map 
	return &cache{pods: map[types.UID]*data{}, subscribers: map[types.UID][]*subRecord{}}
}

接著了解一下 cache 物件如何實作 cache interface 的需要吧!

impliment

Set 會設置 Pod 的 PodStatus 到 cache 中

set

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
	//防止競爭上鎖
	c.lock.Lock()
	defer c.lock.Unlock()
	//最後通知所有的訂閱者哪個 types.UID 發生了變化
	defer c.notify(id, timestamp)
	//設定 cache 對應的資料,透過 types.UID 對應到 PodStatus
	c.pods[id] = &data{status: status, err: err, modified: timestamp}
}

// 如果滿足要求,則通知為具有給定 id 的 pod 發送通知。請注意,調用者應該獲取鎖。
func (c *cache) notify(id types.UID, timestamp time.Time) {
	//取得監聽某個 types.UID 的 subRecord slice 物件
	//slice 的長度表示有多人在監聽這個 types.UID
	list, ok := c.subscribers[id]

	//如果在 map 中找不到對應的 subRecord slice 物件表示沒有人在監聽
	if !ok {
		// No one to notify.
		return
	}
    
	newList := []*subRecord{}

	//取出 subRecord slice 的每個元素(subRecord)
	for i, r := range list {
		//如果該 subRecord 要的資料在 cache 內是不新鮮(透過 timestamp 進行比對。當前 cache 的 timestamp 太舊、subRecord 想要的 timestamp 較新)    
		if timestamp.Before(r.time) {
			// 那些追蹤較新資料的追蹤者需要保留起來,下次 cache 更新 timestamp 與資料的時候可以通知那些追蹤者。
			newList = append(newList, list[i])
			continue
		}
		//反之 subRecord 追蹤的元素對於 cache 來說是新鮮的資料(透過 timestamp 進行比對。當前 cache 的 timestamp 比 subRecord 想要的 timestamp 要新),就透過 channel 回傳
		r.ch <- c.get(id)
		//關閉channel
		close(r.ch)
	}
	//檢查 newlist 長度,若長度為零代表沒有遺留追蹤較新資料的觀察者,可以把觀察這筆資料的所有觀察者清除。
	//反之更新 追蹤較新資料的觀察者到追蹤特定 types.UID 到 map 中
	if len(newList) == 0 {
		delete(c.subscribers, id)
	} else {
		c.subscribers[id] = newList
	}
}

get

輸入 type id 取的某一個 pod status 當前狀態 ,要注意這是一個 nonblock 操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (c *cache) Get(id types.UID) (*PodStatus, error) {
	//防止競爭上鎖
	c.lock.RLock()
	defer c.lock.RUnlock()
    
	//透過 types.UID 取得對應的 pod status    
	d := c.get(id)
	return d.status, d.err
}

func (c *cache) get(id types.UID) *data {
	//從 map 中透過 types.UID 找到對應的資料
	d, ok := c.pods[id]
	//如果沒有找到的話就建立一個預設的資料回傳		
	if !ok {
		return makeDefaultData(id)
	}
    //如果有找到直接回傳 map 中對應的資料	
    return d
}
//建立一個空資料的 pod status
func makeDefaultData(id types.UID) *data {
	return &data{status: &PodStatus{ID: id}, err: nil}
}

GetNewerThan

透過 GetNewerThan function 我們可以傳入 types.UID 以及 minTime (用來對比新鮮度)可以得到對應的 pod status ,要注意它是一個 block 操作(直到取的 pod status 為止)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) {
	//得到某一個 types.UID 對應的資料變化的 channel
	ch := c.subscribe(id, minTime)
	//從 channel 取得資料變化    
	d := <-ch
	//回傳資料狀態,以及錯誤訊息    
	return d.status, d.err
}

func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data {
	//建立 channel 以供後續傳輸資料變化
	ch := make(chan *data, 1)
	//防止競爭上鎖
	c.lock.Lock()
	defer c.lock.Unlock()
    
	//透過 id 以及 timestamp 查詢 cache 中對應的資料 ,若當時 cache 的新限度滿足客戶端的需求,就能早到資料
	d := c.getIfNewerThan(id, timestamp)
	//如果找得到資料就回傳給 channel    
	if d != nil {
		ch <- d
		return ch
	}
	// 找不到資料就會加入 cache 某一 types.UID 的 subscribers 行列,等到有資料後可以透過 channel 通知使用者。
	c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch})
    
	//回傳channel以供後續資料傳輸使用    
	return ch
}

func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
	//透過 type uid 從 map 取處得對應的 pod status 
	d, ok := c.pods[id]

	//透過 timestamp 檢查 cache 是否準備好提供服務,以及透過 cache timestamp 對比輸入物件的 timestamp 檢查當前的 cache 新鮮度
	//情境1:假設沒有 cache 沒有設定timestamp就代表無法比對新鮮度。globalTimestampIsNewer 為 false
	//情境2:假設 cache 最後一次捕捉到的資料是 10:00 的資料,使用者要求 11:00 的資料,表示 cache 不新鮮。globalTimestampIsNewer 為 false
	//情境3:假設 cache 最後一次捕捉到的資料是 11:00 的資料,使用者要求 10:00 的資料,表示 cache 目前儲存的資料是新鮮的。globalTimestampIsNewer 為 true
	globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))

	//判斷 cache 中是否有對應的 pod 以及 cache 新不新鮮
	//如果 cache 沒有對應的 pod 但是 cache 的保存資料是新鮮的 ,就回傳一個 default 的資料
	if !ok && globalTimestampIsNewer {
		return makeDefaultData(id)
	}
	
	//如果 cache 有對應的 pod 同時 cache 保存的新鮮度是足夠的話就回傳資料
	if ok && (d.modified.After(minTime) || globalTimestampIsNewer) {
		return d
	}

	//cache 不存在以及 cache 保存的不夠新鮮度回傳 nil
	return nil
}

Delete

某個 podstatus 已經不需要了,所以透過 delete function 清理掉。

1
2
3
4
5
6
7
func (c *cache) Delete(id types.UID) {
	//防止競爭加鎖
	c.lock.Lock()
	defer c.lock.Unlock()
	//透過 types.UID 刪除 map 中對應的資料
	delete(c.pods, id)
}

UpdateTime

更改 cache 的 timestamp 並通知所有的訂閱者,可以更新 pod status 給訂閱者。

1
2
3
4
5
6
7
8
9
func (c *cache) UpdateTime(timestamp time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.timestamp = &timestamp
	// Notify all the subscribers if the condition is met.
	for id := range c.subscribers {
		c.notify(id, *c.timestamp)
	}
}

小結

kubelet 使用 cache 來儲存 pod status 的狀態,並且透過 timestamp 確保 cache 的新鮮度,若是 cache 更新 timestamp 也會通知 subscribers 訂閱的 pod status 當前的狀態。
此外也提供兩種獲取 PodStatus 的方法:一種是非阻塞的 Get()另外一種為阻塞 GetNewerThan() 方法,分別用於不同的情境。

本篇文章雖然篇幅不長知識量也不大,但作為後續分析系統每個元件都是重要的一份子呢xD,如果文中有錯希望大家不吝嗇提出,讓我們互相交流學習。


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page