This page looks best with JavaScript enabled

Kubernetes kubelet 探測 pod 的生命症狀探針得如何出生-2

 ·  ☕ 18 min read

首先本文所有的 source code 基於 kubernetes 1.19 版本,所有 source code 為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

希望閱讀文章的朋友可以先去看前一篇Kubernetes kubelet 探測 pod 的生命症狀探針得如何出生-1
,了解一下前後文的關係比較好理解本篇要說明的重點。

另外想要了解kubernetes probe 有哪些型態以及底層是如何實作的可以參考以下三篇文章Kubernetes kubelet 探測 pod 的生命症狀 Http GetKubernetes kubelet 探測 pod 的生命症狀 tcp socket以及Kubernetes kubelet 探測 pod 的生命症狀 Exec,可以從這三篇文章了解 kubernetes probe 如何提供 tcp socket、 exec 以及 http get 三種方法的基本操作。

上一篇文章Kubernetes kubelet 探測 pod 的生命症狀探針得如何出生-1
,livenessManager readinessManager startupManager 以及 probeManager 的物件是從哪裡生成的,用在什麼地方,以及作用在哪裡。因此本篇文章會將重點聚焦在這些物件之後的相關的調用鍊。

這邊來回顧一下 proberesults.Manager 是怎麼產生的
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Kubelet is the main kubelet implementation.
type Kubelet struct 
	//用來控制哪些 pod 要加入到 probe manager
	// Handles container probing.
	probeManager prober.Manager

	// 用來探測 container liveness 、 readiness 、  startup probe 結果儲存
	// Manages container health check results.
	livenessManager  proberesults.Manager
	readinessManager proberesults.Manager
	startupManager   proberesults.Manager
	...
}

func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,...)(*Kubelet, error) {

	...
	//在這裡透過 proberesults.NewManager() 生成 並且設定 kubelet 的 livenessManager readinessManager startupManager,用來儲存 container 探測 liveness 、 readiness 、  startup prob 的結果
	//後面會繼續解析 proberesults.NewManager 得實作
	klet.livenessManager = proberesults.NewManager()
	klet.readinessManager = proberesults.NewManager()
	klet.startupManager = proberesults.NewManager()
    
	...
	// kubelet 的 probemanager 則是需要組合上面提到的三種 manager 以及 runner 與 recorder 透過 prober.NewManager 新增對應的物件,用以用來控制哪些 pod 要加入到 prob manager。
	//後面會繼續解析 prober.NewManager 得實作    
	klet.probeManager = prober.NewManager(
		klet.statusManager,
		klet.livenessManager,
		klet.readinessManager,
		klet.startupManager,
		klet.runner,
		kubeDeps.Recorder)
	...
} 

回顧完 proberesults.Manager 是怎麼產生的,我們就接著來了解一下上篇未講完的內容吧。
上一個章節中我們有談到 proberesults.Manager 是一個 interface 他定義了許多方法ref,我們在kubelet 的 syncLoopIteration 階段可以發現這邊會呼叫 proberesults.Manager 的 update function 等待 channel 把資料送來ref,就沒有看到其他地方有呼叫 proberesults.Manager 所定義的其他方法了。

我們循著線索找到 prober.Manager 是由 三種 manager 分別是 container liveness 、 readiness 、 startup 組合而成的(他們的型態都為 proberesults.Manager )。

所以說 proberesults.Manager 定義的其他 function 會在 prober.Manager 裡面被調用囉?是沒錯!所以我們需要進一步的來分析 prober.Manager 是什麼。

prober.Manager

我們了解完 livenessManager 、 readinessManager 以及 startupManager 之後,需要進一分析三個組再一起變成的 probeManager 到底是什麼東西,我們先來看他的型態 prober.Manager

interface

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type Manager interface {
	// 為每個 container probe 創建新的 probe worker,在建立 pod 時呼叫。
	AddPod(pod *v1.Pod)

	// 為刪除已存在的 container probe worker,在移除 pod 時呼叫。
	RemovePod(pod *v1.Pod)

	// CleanupPods handles cleaning up pods which should no longer be running.
	// It takes a map of "desired pods" which should not be cleaned up.
	CleanupPods(desiredPods map[types.UID]sets.Empty)

	// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
	// container based on container running status, cached probe results and worker states.
	UpdatePodStatus(types.UID, *v1.PodStatus)
}

struct

了解完 prober.Manager 所定義的 function 之後我們要來看實作的物件是誰囉!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
type manager struct {
	// 用來記錄哪個 container 對應哪一個 probe worker
	workers map[probeKey]*worker
	// 防止存取 worker 競爭
	workerLock sync.RWMutex

	// 提供 pod IP 和 container ID
	statusManager status.Manager

	// 用來存取 readiness probe 的結果
	readinessManager results.Manager

	// 用來存取 liveness probe 的結果
	livenessManager results.Manager

	// 用來存取 startup probe 的結果
	startupManager results.Manager

	// probe 實作層,可以參考[Kubernetes kubelet 探測 pod 的生命症狀 Http Get](https://blog.jjmengze.website/posts/kubernetes/source-code/kubelet/prob/kubernetes-kubelet-http-get/),[Kubernetes kubelet 探測 pod 的生命症狀 tcp socket](https://blog.jjmengze.website/posts/kubernetes/source-code/kubelet/prob/kubernetes-kubelet-tcp-socket/)以及[Kubernetes kubelet 探測 pod 的生命症狀 Exec](https://blog.jjmengze.website/posts/kubernetes/source-code/kubelet/prob/kubernetes-kubelet-exec/)
	prober *prober

	start time.Time
}

// probekey 包含 probe ,pod id 、 container name 、 以及目前在 probe 型態,用來識別 worker ,也可以視為 worker 唯一辨別方式。
type probeKey struct {
	podUID        types.UID
	containerName string
	probeType     probeType
}


//這個部分可以回顧[Kubernetes kubelet 探測 pod 的生命症狀 Http Get](https://blog.jjmengze.website/posts/kubernetes/source-code/kubelet/prob/kubernetes-kubelet-http-get/),[Kubernetes kubelet 探測 pod 的生命症狀 tcp socket](https://blog.jjmengze.website/posts/kubernetes/source-code/kubelet/prob/kubernetes-kubelet-tcp-socket/)以及[Kubernetes kubelet 探測 pod 的生命症狀 Exec](https://blog.jjmengze.website/posts/kubernetes/source-code/kubelet/prob/kubernetes-kubelet-exec/)
type prober struct {
	//注入 probe exec 實作對象
	exec execprobe.Prober
	//注入 http get 實作對象
	readinessHTTP httpprobe.Prober
	livenessHTTP  httpprobe.Prober
	startupHTTP   httpprobe.Prober
	//注入 tcp socket probe 實作對象
	tcp           tcpprobe.Prober
	//注入 CRI 控制器用來對 container 下達命令
	runner        kubecontainer.CommandRunner

	//注入事件紀錄收集器
	recorder record.EventRecorder
}

New function

看完了資料結構是如何定義後我們來看怎麼把這個物件建立起來。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func NewManager(
	statusManager status.Manager,
	livenessManager results.Manager,
	readinessManager results.Manager,
	startupManager results.Manager,
	runner kubecontainer.CommandRunner,
	recorder record.EventRecorder) Manager {

	//建立實際上各種執行 probe 的物件(Http get、Tcp socket、Exec)
	prober := newProber(runner, recorder)
    //剩下的是把輸入的物件進行簡單的組合
	return &manager{
		statusManager:    statusManager,
		prober:           prober,
		readinessManager: readinessManager,
		livenessManager:  livenessManager,
		startupManager:   startupManager,
		workers:          make(map[probeKey]*worker),
		start:            clock.RealClock{}.Now(),
	}
}

AddPod

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15

//當 kubelet 接收到有 pod 到加入到這個節點時,會觸發 HandlePodAdditions function ,並且傳入有哪些 pod 要加入。
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {

	...
	// for 迴圈地回所有要加入的 pod 	
	for _, pod := range pods {
    
		...
		//加入到 probeManager 中
		kl.probeManager.AddPod(pod)
        
		...
	}
}

處理 pod 是否需要 probe ,如果有需要則建立對應的 probe worker (worker 後面我們會看到是什麼,這邊先知道有這個事情發生即可)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
func (m *manager) AddPod(pod *v1.Pod) {
	//確保 map 一致性上鎖
	m.workerLock.Lock()
	defer m.workerLock.Unlock()

	//以傳入 pod uid 組合成 probeKey 物件
	key := probeKey{podUID: pod.UID}

	//遞迴 pod spec container 欄位
	for _, c := range pod.Spec.Containers {
		//設定 probeKey containerName 為 container name
		key.containerName = c.Name

		//如果 pod spec container 有設定 StartupProbe 的話
		if c.StartupProbe != nil {
			//將 key probeType 設定成 startup
			key.probeType = startup
            
			//透過 worker map 檢查是否曾經處理過相同的 key ,如果有表示已經有 worker 在處理了,直接略過不理。
			if _, ok := m.workers[key]; ok {
				klog.ErrorS(nil, "Startup probe already exists for container",
					"pod", klog.KObj(pod), "containerName", c.Name)
				return
			}
			
			//建立新的 worker,並且傳入 pod 資訊、 container 資訊以及 manager 本身,後續會看到 worker 到底是什麼這邊先了解有這個步驟就好。
			w := newWorker(m, startup, pod, c)
			
			//將 key 與對應的 worker 設定到 map 中 
			m.workers[key] = w
            
			//啟動 worker
			go w.run()
		}
        
		//如果 pod spec container 有設定 ReadinessProbe 的話
		if c.ReadinessProbe != nil {
			//將 key probeType 設定成 readiness
			key.probeType = readiness
            
			//透過 map 檢查是否曾經處理過相同的 key ,如果有表示已經處理過了。不再處理。
			if _, ok := m.workers[key]; ok {
				klog.ErrorS(nil, "Readiness probe already exists for container",
					"pod", klog.KObj(pod), "containerName", c.Name)
				return
			}
            
			//建立新的 worker並且傳入 pod 資訊、 container 資訊以及 manager 本身,後續會看到 worker 到底是什麼這邊先了解有這個步驟就好。
			w := newWorker(m, readiness, pod, c)
            
			//將 key 與對應的 worker 設定到 map 中 
			m.workers[key] = w
            
			//啟動 worker
			go w.run()
		}
        
		//如果 pod spec container 有設定 LivenessProbe 的話
		if c.LivenessProbe != nil {
			//將 key probeType 設定成 readiness
			key.probeType = liveness
            
			//透過 map 檢查是否曾經處理過相同的 key ,如果有表示已經處理過了。不再處理。
			if _, ok := m.workers[key]; ok {
				klog.ErrorS(nil, "Liveness probe already exists for container",
					"pod", klog.KObj(pod), "containerName", c.Name)
				return
			}
            
			//建立新的 worker並且傳入 pod 資訊、 container 資訊以及 manager 本身,後續會看到 worker 到底是什麼這邊先了解有這個步驟就好。
			w := newWorker(m, liveness, pod, c)
            
			//將 key 與對應的 worker 設定到 map 中 
			m.workers[key] = w
            
			//啟動 worker
			go w.run()
		}
	}
}

RemovePod

1
2
3
4
5
6
7
8
// 當 kubelet 接收到有哪些 pod 要從這個節點移除,會觸發 HandlePodRemoves function ,並且傳入有哪些 pod 要從節點移除。
func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		
		kl.probeManager.RemovePod(pod)
	}
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//當 pod 被移除的時候會觸發
func (m *manager) RemovePod(pod *v1.Pod) {
	//確保 map 一致性上鎖
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()

	//以 pod uid 組合成 probeKey 物件
	key := probeKey{podUID: pod.UID}
    
	//遞迴 pod spec container 欄位
	for _, c := range pod.Spec.Containers {
    
		//設定 probeKey containerName 為 container name
		key.containerName = c.Name
        
		//因為需要找到 map 裡面 key 對應 worker,現在 probeKey 物件已經有 podUID containerName 現在還缺少 probeType
		//我們需要用 for 迴圈跑 readiness, liveness, startup 組合 probeKey 物件物件
		//再透過組合出來的 probeKey 從 map 中找找看有沒有對應的 worker
		for _, probeType := range [...]probeType{readiness, liveness, startup} {
			
            //設定 probeType 為 readiness 或是 liveness 或是 startup
			key.probeType = probeType
            
			//再透過組合出來的 probeKey 從 map 中找找看有沒有對應的 worker,接著關閉 worker, 後續會看到 worker 到底是什麼這邊先了解有這個步驟就好。
			if worker, ok := m.workers[key]; ok {
				worker.stop()
			}
		}
	}
}

UpdatePodStatus

根據 container 運行狀態、probe 結果,針對每個 container 進行適當狀態修改。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 給定 pod 狀態,為 pod 建立最終的 API pod 狀態。這裡的狀態可以想像成 pod spec 最後的 pod status 欄位。
func (kl *Kubelet) generateAPIPodStatus(pod *v1.Pod, podStatus *kubecontainer.PodStatus) v1.PodStatus {
	...
	spec := &pod.Spec
    
	...
	//透過pod status 
	kl.probeManager.UpdatePodStatus(pod.UID, podStatus)
    
	...
	return *s
} 
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) {
	//透過傳入的 pod status 遞迴所有的 ContainerStatuses
	for i, c := range podStatus.ContainerStatuses {
		var started bool

		//看看 container 是否已經開始執行,已經開始執行的話需要判斷 startup probe 是否成功。
		if c.State.Running == nil {
			
			//如果還沒開始執行設定為 false
			started = false
            
		// 傳入的 container id 透過 kubecontainer.ParseContainerID function 轉成 containerId 物件
		// 透過 startupManager get function 傳入 containerId 物件得到 startup 探測結果。
		} else if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
        
			// 如果 startup 探測成功的話就設定 started 為 true   
			started = result == results.Success
		} else {
        
			// 透過 pod id 、container name 與 startup 從 getWorker 拿到 worker
			_, exists := m.getWorker(podUID, c.Name, startup)
            
			//如果找不到 worker ,就當作探測成功,因為沒有 startup worker 
			started = !exists
		}

		//依照幾種情況來修正 container 狀態
		//State.Runnin 判斷 container 是否啟動
		//startupManager.Get 探測 container startup 狀態
		//getWorker 判斷是否有 worker ,若沒有worker 表示沒有 startup probe
		//依照上述情況設定設定 Container 的 Statuses
		podStatus.ContainerStatuses[i].Started = &started

		// 若是確認 container 已經啟動
		if started {
			var ready bool
            
			// 再次確認 container 是否有啟動
			if c.State.Running == nil {
				ready = false
                
			// 如果 container 有啟動繼續透過 傳入的 container id 透過 kubecontainer.ParseContainerID function 轉成 containerId 物件
			// readinessManager.Get  傳入 containerId 物件得到 readiness 探測結果。
			} else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {

				// 如果 readiness 探測成功的話就設定 started 為 true
				ready = result == results.Success
			} else {
            
				// 透過 pod id 、container name 與 readiness 從 getWorker 拿到 readiness worker 
				w, exists := m.getWorker(podUID, c.Name, readiness)
                
				//如果找不到 worker ,就當作探測 readinessProbe 成功
				ready = !exists // no readinessProbe -> always ready
				
				//如果有找到我們需要進一步判斷 worker 狀態
				if exists {
					// 手動觸發探測下次就可以得知結果
					select {
					case w.manualTriggerCh <- struct{}{}:
					default: // Non-blocking.
						klog.InfoS("Failed to trigger a manual run", "probe", w.probeType.String())
					}
				}
			}
			//依照幾種情況來修正 container 狀態
			//State.Runnin 判斷 container 是否啟動
			//readinessManager.Get 探測 container readiness 狀態
			//getWorker 判斷是否有 worker 
			//依照上述情況設定設定 Container 的 Statuses
			podStatus.ContainerStatuses[i].Ready = ready
		}
	}
    
	// 如果 init container 為成功退出或就把 init container status ready 設定為成功。
	for i, c := range podStatus.InitContainerStatuses {
		//預設 ready 為 false
		var ready bool
		
		//如果 init container 狀態為 Terminated 並且退出狀態碼為 0 ,就把 ready 設定為 true
		if c.State.Terminated != nil && c.State.Terminated.ExitCode == 0 {
			ready = true
		}
		// init container status ready 設定為成功。
		podStatus.InitContainerStatuses[i].Ready = ready
	}
}

CleanupPods

執行一系列清理工作,包括終止 pod worker、殺死不需要的 pod 以及刪除 orphaned 的volume/pod directories。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// NOTE: This function is executed by the main sync loop, so it
// should not contain any blocking calls.
func (kl *Kubelet) HandlePodCleanups() error {
	...
	//這裡有點複雜我們先當作 desiredPods 就是那些已經不存在的 pod 就好
	//Stop the workers for no-longer existing pods.
	kl.probeManager.CleanupPods(desiredPods)
	...    
	return nil
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (m *manager) CleanupPods(desiredPods map[types.UID]sets.Empty) {
	//確保 map 一致性上鎖
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()
	//遞迴 manager 所有 worker 找到 預期要消失的 pod uid ,透過 worker stop 結束 probe 作業。
	for key, worker := range m.workers {
		if _, ok := desiredPods[key.podUID]; !ok {
			worker.stop()
		}
	}
}

support

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//透過 getworker 我們可以傳入 pod uid 、 container name 以及 probeType 獲得 probe worker
func (m *manager) getWorker(podUID types.UID, containerName string, probeType probeType) (*worker, bool) {
	//確保 map 一致性上鎖
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()

	//透過 pod uid 以及 container name 還有 probetype 組合成 probeKey 
	//再從 map 找到對應的 worker
	worker, ok := m.workers[probeKey{podUID, containerName, probeType}]
	return worker, ok
}

// 當不再需要某一個 probe worker 時候我們可以呼叫 removeWorker 傳入 pod uid 、 container name 以及 probeType 刪除 probe worker。
func (m *manager) removeWorker(podUID types.UID, containerName string, probeType probeType) {
	//確保 map 一致性上鎖
	m.workerLock.Lock()
	defer m.workerLock.Unlock()

	//透過 pod uid 以及 container name 還有 probetype 組合成 probeKey 
	//再從 map 刪除對應的 worker
	delete(m.workers, probeKey{podUID, containerName, probeType})
}

// workerCount 返回 probe worker 的總數,測試用。
func (m *manager) workerCount() int {
	//確保 map 一致性上鎖
	m.workerLock.RLock()
	defer m.workerLock.RUnlock()
    
	//看看目前現在有多少的 worker
	return len(m.workers)
}

整理一下

還記得之前為了找到在 proberesults.Manager 定義的卻沒被用到的這三個 function 嗎?我們現在來看看都在哪裡用上了吧!

1
2
3
4
5
6
	// 透過 container id 從 實作者身上得到 result 結果
	Get(kubecontainer.ContainerID) (Result, bool)
	// 透過 container id 設定 pod 探測的結果。實作者需要把結果儲存起來。
	Set(kubecontainer.ContainerID, Result, *v1.Pod)
	// 透過 container id 移除時實作者身上的對應的資料
	Remove(kubecontainer.ContainerID)

Get

透過 container id 從 實作者身上得到 result 結果
UpdatePodStatus

1
2
3
4
5
6
7
8
9
func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) {
	//透過傳入的 pod status 遞迴所有的 ContainerStatuses
	for i, c := range podStatus.ContainerStatuses {
		...
		} else if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
        
			// 如果 startup 探測成功的話就設定 started 為 true   
			started = result == results.Success
		} 

其他的好像都還沒用上,沒關係等等我們還會看到!

接著來談談 prober.Manager 的實作者 manager 一直有用到的 probe worker 到底是什麼勒~

worker

這裡的 worker 做的事情就是定期地去執行 probe 的工作,一個 worker 只會只能一種工作 startup 、 readliness 或是 liveness。

struct

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
type worker struct {
	// 停止 worker 的 channel
	stopCh chan struct{}

	// 手動觸發 probe 的 channel
	manualTriggerCh chan struct{}

	// pod spec 
	pod *v1.Pod

	// container spec
	container v1.Container

	// container probe spec
	spec *v1.Probe

	// worker 目前執行什麼種類的 probe(liveness, readiness or startup)
	probeType probeType

	// 一開始 worker 處於什麼狀態(Unknown、Success、Failure)
	initialValue results.Result

	// 用來儲存 worker probe 後的結果
	resultsManager results.Manager
    
	//worker 會用到上層 manager 一些方法    
	probeManager   *manager

	// worker 處理的 container id 
	containerID kubecontainer.ContainerID
	// worker 的最後一次探測結果。
	lastResult results.Result
	// worker probe 連續返回多少次相同的結果。
	resultRun int

	// 如果有設定,worker在 probe 探測時會略過本次探測。
	onHold bool

	// promethus metric 紀錄 probe metric 用
	proberResultsSuccessfulMetricLabels metrics.Labels
	proberResultsFailedMetricLabels     metrics.Labels
	proberResultsUnknownMetricLabels    metrics.Labels
}

new function

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// 建立 work 去檢測 container 狀態
func newWorker(
	m *manager,
	probeType probeType,
	pod *v1.Pod,
	container v1.Container) *worker {

	//初始化 worker 
	w := &worker{
		//stop channel     呼叫的時候不會有 block (其實還是會拉xD),用來停止worker用
		stopCh:          make(chan struct{}, 1), 
		//manualTriggerCh  呼叫的時候不會有 block (其實還是會拉xD),用來觸發 worker probe 用
		manualTriggerCh: make(chan struct{}, 1), 
		//傳入 pod spec        
		pod:             pod,
		//要 prob 的 container 
		container:       container,
		//要 prob 的型態,有可能是liveness, readiness or startup
		probeType:       probeType,
		//傳入 manager 因為 worker 會操作 manger 物件
		probeManager:    m,
	}
    
	//判定 probeType
	switch probeType {
	//如果是 readiness
	//設定 worker spec 為 container 的 readiness 區塊
	//設定 worker 的 resultsManager 為外部 manger 的 readinessManager ,用以做 readiness probe
	//設定初始狀態為 Failure ,因為 readiness 為 faill 的話就不會把 pod 加到服務( service )上。
	case readiness:
		w.spec = container.ReadinessProbe
		w.resultsManager = m.readinessManager
		w.initialValue = results.Failure
	//如果是 liveness
	//設定 worker spec 為 container 的 LivenessProbe 區塊
	//設定 worker 的 resultsManager 為外部 manger 的 livenessManager ,用以做 livenessManager probe
	//設定初始狀態為Success ,因為 liveness 為 Success 的話一開始就不會把 pod 刪掉/重啟。
	case liveness:
		w.spec = container.LivenessProbe
		w.resultsManager = m.livenessManager
		w.initialValue = results.Success
	//如果是 startup
	//設定 worker spec 為 container 的 startup 區塊
	//設定 worker 的 resultsManager 為外部 manger 的 startupManager ,用以做 startup probe
	//設定初始狀態為Unknown 
	case startup:
		w.spec = container.StartupProbe
		w.resultsManager = m.startupManager
		w.initialValue = results.Unknown
	}

	//設定 promethus metric 結構
	basicMetricLabels := metrics.Labels{
		//worker 在 prob type    
		"probe_type": w.probeType.String(),
		//worker prob 的 container 對象
		"container":  w.container.Name,
		//worker prob 的 pod 對象
		"pod":        w.pod.Name,
		//worker prob 的 namespace 
		"namespace":  w.pod.Namespace,
		//worker prob 的 pod id 對象
		"pod_uid":    string(w.pod.UID),
	}

    //透過 deepCopyPrometheusLabels 把 basicMetricLabels 複製一份並且	
    //設定 worker proberResultsSuccessfulMetricLabels 
    //proberResultsSuccessfulMetricLabels 這邊 metric label 設定成 probeResultSuccessful
	w.proberResultsSuccessfulMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
	w.proberResultsSuccessfulMetricLabels["result"] = probeResultSuccessful

    //透過 deepCopyPrometheusLabels 把 basicMetricLabels 複製一份並且	
    //設定 worker proberResultsFailedMetricLabels 
    //proberResultsFailedMetricLabels 這邊 metric label 設定成 probeResultSuccessful
	w.proberResultsFailedMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
	w.proberResultsFailedMetricLabels["result"] = probeResultFailed

    //透過 deepCopyPrometheusLabels 把 basicMetricLabels 複製一份並且	
    //設定 worker proberResultsUnknownMetricLabels 
    //proberResultsUnknownMetricLabels 這邊 metric label 設定成 probeResultSuccessful
	w.proberResultsUnknownMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
	w.proberResultsUnknownMetricLabels["result"] = probeResultUnknown

	//回傳worker
	return w
}

run

worker 開始定時執行 probe 作業

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
// run periodically probes the container.
func (w *worker) run() {
	//透過 pod spec 設定 probe 的間隔時間 
	probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second

	//我猜的...不確定為什麼會這樣設計,如果有知道的大大希望能不吝嗇告知
	//依照現在的時間點減去 kubelet 的啟動時間,得到 kubelet 存活的時間。
	//如果 PeriodSeconds 大於 kubelet 存活的時間的話,讓worker 睡一下等等在 probe 。
	//我猜可能是因為kubelet 還沒完全準備好(吧?)
	if probeTickerPeriod > time.Since(w.probeManager.start) {
		time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))
	}

	//設定 ticker 多久要觸發一次
	probeTicker := time.NewTicker(probeTickerPeriod)

	//clean up function 
	defer func() {
		// 關閉ticker
		probeTicker.Stop()
		//如果 container id 還在的話就移除 resultsManager 跟這個 container 有關的資料
		if !w.containerID.IsEmpty() {
			w.resultsManager.Remove(w.containerID)
		}
		
		//移除map中紀錄的 worker 資訊
		w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
		//metric 不在記錄這個 worker 所發生的 metric 
		ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
		ProberResults.Delete(w.proberResultsFailedMetricLabels)
		ProberResults.Delete(w.proberResultsUnknownMetricLabels)
	}()

probeLoop:
	//每次 prob 完成後如果 probe 結果為 true ,需要等待 probeTicker ,或是 manualTriggerCh 再進行下一次的 probe trigger
	//如果是收到 stopCh 或是 probe 結果為 flase 則關閉 worker 。
	for w.doProbe() {
		// Wait for next probe tick.
		select {
		case <-w.stopCh:
			break probeLoop
		case <-probeTicker.C:
		case <-w.manualTriggerCh:
			// continue
		}
	}
}

//用來關閉 worker,設計成 Non-blocking的,簡單來看就是往 stop channel 送 stop 訊號。
func (w *worker) stop() {
	select {
	case w.stopCh <- struct{}{}:
	default: // Non-blocking.
	}
}

// probe container 並且回傳 probe 結果。如果 probe 過程中有錯會回傳 false 呼叫者需要關閉 worker。
func (w *worker) doProbe() (keepGoing bool) {
	//無腦回復狀態 panic 。
	defer func() { recover() }() 
	// runtime.HandleCrash 紀錄一下而已。
	defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

	//透過 statusManager 跟著 UID 拿到 pod status(statusManager 在這裡不太重要,知道可以拿到 pod status 就好了)
	status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
	if !ok {
		// Pod 尚未創建,或者已被刪除。
		klog.V(3).InfoS("No status for pod", "pod", klog.KObj(w.pod))
		return true
	}

	// 如果 pod 處於 PodFailed 跟 PodSucceeded 狀態,這個 worker 就可以關閉了
	if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
		klog.V(3).InfoS("Pod is terminated, exiting probe worker",
			"pod", klog.KObj(w.pod), "phase", status.Phase)
		return false
	}

	//遞迴 container status ,判斷所有的 container status 對應到輸入的 ccontainer name 回傳 status 狀態
	//如果找不到對應的 container status 就等待下一輪
	c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
	if !ok || len(c.ContainerID) == 0 {
		// 容器尚未創建,或者已被刪除。
		klog.V(3).InfoS("Probe target container not found",
			"pod", klog.KObj(w.pod), "containerName", w.container.Name)
		return true // Wait for more information.
	}

	//判斷 worker 負責的 container id 是不是跟 status 的 container 可以對上,如果對不上這種狀況可能發生在 container 被刪除或是 container 改變了。
	if w.containerID.String() != c.ContainerID {
		//如果 container id 不是空的    
		if !w.containerID.IsEmpty() {
			//從 resultsManager 刪除關於 worker container 的結果        
			w.resultsManager.Remove(w.containerID)
		}
		//設定新的 container id         
		w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
        //設定 resultsManager 要接收新的 worker container id ,並且給他初始化的 value 與 pod  spec (這裡要注意根據 probe 形式不同他們初始化得數值也不一樣 例如readiness :faill  liveness :Success  startup :Unknown)
        w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
		// 因為有新的 container 我們繼續 prob 流程
		w.onHold = false
	}

	//判斷是否要繼續 probe 流程
	if w.onHold {
		// Worker is on hold until there is a new container.
		return true
	}

	//判斷 container 的狀態是否正在Running,如果不是正在 Running 有可能在做waiting 、有可能在 Terminated 。
	if c.State.Running == nil {
		klog.V(3).InfoS("Non-running container probed",
			"pod", klog.KObj(w.pod), "containerName", w.container.Name)
		//如果 container id 不是空的   
		if !w.containerID.IsEmpty() {
			//從 resultsManager 刪除關於 worker container 的結果        
			w.resultsManager.Set(w.containerID, results.Failure, w.pod)
		}
		// 如果 RestartPolicy 為 不重新啟動,則中止 worker。
		return c.State.Terminated == nil ||
			w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
	}


	// 這邊我們要先了解一點!!!非長重要如果不了解建議先看之前我這篇文章[學習Kubernetes Garbage Collection機制](https://blog.jjmengze.website/posts/kubernetes/kubernetes-garbage-collection/)
	// 簡單來說,pod 被刪掉有可能先出現 DeletionTimestamp 的狀態
	// 在這個狀態之下 pod 會在 BackGround 狀態被回收
	
    
	// 可以把它想像成處於 Deletion 狀態的 pod ,且有設定 probe liveness 或是 startup,透過 resultsManager 設定成 probe 成功,不然會把 container 刪掉(重啟)。
	// 最後停止 worker ,因為 pod 已經要被刪掉了 worker 就沒用處囉。
	if w.pod.ObjectMeta.DeletionTimestamp != nil && (w.probeType == liveness || w.probeType == startup) {
		klog.V(3).InfoS("Pod deletion requested, setting probe result to success",
			"probeType", w.probeType, "pod", klog.KObj(w.pod), "containerName", w.container.Name)
		if w.probeType == startup {
			klog.InfoS("Pod deletion requested before container has fully started",
				"pod", klog.KObj(w.pod), "containerName", w.container.Name)
		}
		// Set a last result to ensure quiet shutdown.
		w.resultsManager.Set(w.containerID, results.Success, w.pod)
		// Stop probing at this point.
		return false
	}
    
    

	// 判斷 probe 的初始 Delay 時間是否到了,如果還沒到就需要等下一次觸發
	if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
		return true
	}

	//判斷 (過去) startup probe 是否已經成功,如果已經成功就可以關閉 startup worker,其他種類的 woker 保留。
	//如果是還沒 startup probe 失敗 其他 probe 都不用談直接退回去重新等待下一次觸發。
	if c.Started != nil && *c.Started {
		// Stop probing for startup once container has started.
		if w.probeType == startup {
			return false
		}
	} else {
		// Disable other probes until container has started.
		if w.probeType != startup {
			return true
		}
	}

	
	//實際執行各種 probe 的地方,如果 probe 有 error 直接停止 worker 
	//這裡依賴之前注入的 probeManager 的 probe 實作,藉由我們丟入的 probe 型態決定要怎麼執行 probe 。
	result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
	if err != nil {
		// Prober error, throw away the result.
		return true
	}

	//如果 probe 沒有 error 透過 ProberResults.With 去觸發 metric 以提供後續監控服務
	switch result {
	case results.Success:
		ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
	case results.Failure:
		ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
	default:
		ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
	}

	//用來判斷同一個 probe 結果,並且計算執行 probe 次數用
	if w.lastResult == result {
		w.resultRun++
	} else {
		w.lastResult = result
		w.resultRun = 1
	}

	//如果 probe 錯誤或是成功 低於閥值就直接安排下一次的 probe
	if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
		(result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
		// Success or failure is below threshold - leave the probe state unchanged.
		return true
	}

	//透過 resultsManager 設定哪個 container 的 prob 結果是什麼,給外面的人做事(重啟、刪除之類的)
	w.resultsManager.Set(w.containerID, result, w.pod)

	//如果 worker 型態為 ( liveness 或是 startup )並且本次 prob 結果為失敗。 container 會重啟所以需要把 resultRun 重置
	//並且設定 onHold=true ,因為 container 重啟了。前面要重新獲取 container id 不需要。
	if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
		w.onHold = true
		w.resultRun = 0
	}

	//worker繼續 執行
	return true
}

整理一下

還記得之前為了找到在 proberesults.Manager 定義的卻沒被用到的這三個 function 嗎?我們現在來看看是不是都用上了!

1
2
3
4
5
6
	// 透過 container id 從 實作者身上得到 result 結果
	Get(kubecontainer.ContainerID) (Result, bool)
	// 透過 container id 設定 pod 探測的結果。實作者需要把結果儲存起來。
	Set(kubecontainer.ContainerID, Result, *v1.Pod)
	// 透過 container id 移除時實作者身上的對應的資料
	Remove(kubecontainer.ContainerID)
Set

Worker - run
透過 container id 設定 pod 探測的結果。實作者需要把結果儲存起來。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (w *worker) doProbe() (keepGoing bool) {
	...
    
	if w.containerID.String() != c.ContainerID {
		if !w.containerID.IsEmpty() {
			w.resultsManager.Remove(w.containerID)
		}
		w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
		w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
		// We've got a new container; resume probing.
		w.onHold = false
	}
    
	result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
	...
    
	w.resultsManager.Set(w.containerID, result, w.pod)
    
	...
Remove

Worker - run
透過 container id 移除時實作者身上的對應的資料

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (w *worker) run() {
	...
    
	probeTicker := time.NewTicker(probeTickerPeriod)

	defer func() {
		// Clean up.
		probeTicker.Stop()
		if !w.containerID.IsEmpty() {
			w.resultsManager.Remove(w.containerID)
		}

		w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
		...
	}()
	...
}   
func (w *worker) doProbe() (keepGoing bool) {
	...
   
	if w.containerID.String() != c.ContainerID {
		if !w.containerID.IsEmpty() {
			w.resultsManager.Remove(w.containerID)
		}
		w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
		w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
		// We've got a new container; resume probing.
		w.onHold = false
	}
    
	...

小結

總結一下 kubernetes kubelet 如何做 probe 的這幾篇文章,我先從最常使用到的 tcp socket、 exec 以及 http get 三種不同 probe 的基本操作相關文章分別是Kubernetes kubelet 探測 pod 的生命症狀 Http GetKubernetes kubelet 探測 pod 的生命症狀 tcp socket以及Kubernetes kubelet 探測 pod 的生命症狀 Exec,可以從這三篇文章了解 kubernetes probe 底層是如何實作這三種 probe 的,由於文章中沒有探討 probe 的物件是如何生成的。

因此在接下來的Kubernetes kubelet 探測 pod 的生命症狀探針得如何出生-1的文章中,探討了 probe 到底怎麼誕生的。首先我們先觀察 proberesults.Manager 經過分析之後得知原來是用來儲存 container liveness 、 readiness 以及 startup 的 probe 結果。

在 kubelet syncLoopIteration 的生命中週期中嘗試獲取 proberesults.Manager 的結果,再依照結果執行不同的動作。


當我們分析完 proberesults.Manager 發現有許多方法沒被呼叫,因此我們順藤摸瓜找到 prober.Manager 主要都本篇文章分析的對象。經過本篇文章包丁解牛後發現原來 prober.Manager 是用來管理 container 是否要加入 probe 以及什麼時候要移除 probe 。

其中我們發現了 prober.Manager 的實作對象透過 worker 的方式,將 container 身上的 probe 任務分配到不同的 worker 上執行,使得管理與實作的職責分離。


以上為 kubernetes kubelet 如何做 probe 的簡易分析,文章中若有出現錯誤的見解希望各位在觀看文章的大大們可以指出哪裡有問題,讓我學習改進,謝謝。


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page