typeManagerinterface{// 為每個 container probe 創建新的 probe worker,在建立 pod 時呼叫。
AddPod(pod*v1.Pod)// 為刪除已存在的 container probe worker,在移除 pod 時呼叫。
RemovePod(pod*v1.Pod)// CleanupPods handles cleaning up pods which should no longer be running.
// It takes a map of "desired pods" which should not be cleaned up.
CleanupPods(desiredPodsmap[types.UID]sets.Empty)// UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
// container based on container running status, cached probe results and worker states.
UpdatePodStatus(types.UID,*v1.PodStatus)}
//當 kubelet 接收到有 pod 到加入到這個節點時,會觸發 HandlePodAdditions function ,並且傳入有哪些 pod 要加入。
func(kl*Kubelet)HandlePodAdditions(pods[]*v1.Pod){...// for 迴圈地回所有要加入的 pod
for_,pod:=rangepods{...//加入到 probeManager 中
kl.probeManager.AddPod(pod)...}}
處理 pod 是否需要 probe ,如果有需要則建立對應的 probe worker (worker 後面我們會看到是什麼,這邊先知道有這個事情發生即可)
// 當 kubelet 接收到有哪些 pod 要從這個節點移除,會觸發 HandlePodRemoves function ,並且傳入有哪些 pod 要從節點移除。
func(kl*Kubelet)HandlePodRemoves(pods[]*v1.Pod){start:=kl.clock.Now()for_,pod:=rangepods{kl.probeManager.RemovePod(pod)}}
// 給定 pod 狀態,為 pod 建立最終的 API pod 狀態。這裡的狀態可以想像成 pod spec 最後的 pod status 欄位。
func(kl*Kubelet)generateAPIPodStatus(pod*v1.Pod,podStatus*kubecontainer.PodStatus)v1.PodStatus{...spec:=&pod.Spec...//透過pod status
kl.probeManager.UpdatePodStatus(pod.UID,podStatus)...return*s}
func(m*manager)UpdatePodStatus(podUIDtypes.UID,podStatus*v1.PodStatus){//透過傳入的 pod status 遞迴所有的 ContainerStatuses
fori,c:=rangepodStatus.ContainerStatuses{varstartedbool//看看 container 是否已經開始執行,已經開始執行的話需要判斷 startup probe 是否成功。
ifc.State.Running==nil{//如果還沒開始執行設定為 false
started=false// 傳入的 container id 透過 kubecontainer.ParseContainerID function 轉成 containerId 物件
// 透過 startupManager get function 傳入 containerId 物件得到 startup 探測結果。
}elseifresult,ok:=m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID));ok{// 如果 startup 探測成功的話就設定 started 為 true
started=result==results.Success}else{// 透過 pod id 、container name 與 startup 從 getWorker 拿到 worker
_,exists:=m.getWorker(podUID,c.Name,startup)//如果找不到 worker ,就當作探測成功,因為沒有 startup worker
started=!exists}//依照幾種情況來修正 container 狀態
//State.Runnin 判斷 container 是否啟動
//startupManager.Get 探測 container startup 狀態
//getWorker 判斷是否有 worker ,若沒有worker 表示沒有 startup probe
//依照上述情況設定設定 Container 的 Statuses
podStatus.ContainerStatuses[i].Started=&started// 若是確認 container 已經啟動
ifstarted{varreadybool// 再次確認 container 是否有啟動
ifc.State.Running==nil{ready=false// 如果 container 有啟動繼續透過 傳入的 container id 透過 kubecontainer.ParseContainerID function 轉成 containerId 物件
// readinessManager.Get 傳入 containerId 物件得到 readiness 探測結果。
}elseifresult,ok:=m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID));ok{// 如果 readiness 探測成功的話就設定 started 為 true
ready=result==results.Success}else{// 透過 pod id 、container name 與 readiness 從 getWorker 拿到 readiness worker
w,exists:=m.getWorker(podUID,c.Name,readiness)//如果找不到 worker ,就當作探測 readinessProbe 成功
ready=!exists// no readinessProbe -> always ready
//如果有找到我們需要進一步判斷 worker 狀態
ifexists{// 手動觸發探測下次就可以得知結果
select{casew.manualTriggerCh<-struct{}{}:default:// Non-blocking.
klog.InfoS("Failed to trigger a manual run","probe",w.probeType.String())}}}//依照幾種情況來修正 container 狀態
//State.Runnin 判斷 container 是否啟動
//readinessManager.Get 探測 container readiness 狀態
//getWorker 判斷是否有 worker
//依照上述情況設定設定 Container 的 Statuses
podStatus.ContainerStatuses[i].Ready=ready}}// 如果 init container 為成功退出或就把 init container status ready 設定為成功。
fori,c:=rangepodStatus.InitContainerStatuses{//預設 ready 為 false
varreadybool//如果 init container 狀態為 Terminated 並且退出狀態碼為 0 ,就把 ready 設定為 true
ifc.State.Terminated!=nil&&c.State.Terminated.ExitCode==0{ready=true}// init container status ready 設定為成功。
podStatus.InitContainerStatuses[i].Ready=ready}}
CleanupPods
執行一系列清理工作,包括終止 pod worker、殺死不需要的 pod 以及刪除 orphaned 的volume/pod directories。
1
2
3
4
5
6
7
8
9
10
// NOTE: This function is executed by the main sync loop, so it
// should not contain any blocking calls.
func(kl*Kubelet)HandlePodCleanups()error{...//這裡有點複雜我們先當作 desiredPods 就是那些已經不存在的 pod 就好
//Stop the workers for no-longer existing pods.
kl.probeManager.CleanupPods(desiredPods)...returnnil}
//透過 getworker 我們可以傳入 pod uid 、 container name 以及 probeType 獲得 probe worker
func(m*manager)getWorker(podUIDtypes.UID,containerNamestring,probeTypeprobeType)(*worker,bool){//確保 map 一致性上鎖
m.workerLock.RLock()deferm.workerLock.RUnlock()//透過 pod uid 以及 container name 還有 probetype 組合成 probeKey
//再從 map 找到對應的 worker
worker,ok:=m.workers[probeKey{podUID,containerName,probeType}]returnworker,ok}// 當不再需要某一個 probe worker 時候我們可以呼叫 removeWorker 傳入 pod uid 、 container name 以及 probeType 刪除 probe worker。
func(m*manager)removeWorker(podUIDtypes.UID,containerNamestring,probeTypeprobeType){//確保 map 一致性上鎖
m.workerLock.Lock()deferm.workerLock.Unlock()//透過 pod uid 以及 container name 還有 probetype 組合成 probeKey
//再從 map 刪除對應的 worker
delete(m.workers,probeKey{podUID,containerName,probeType})}// workerCount 返回 probe worker 的總數,測試用。
func(m*manager)workerCount()int{//確保 map 一致性上鎖
m.workerLock.RLock()deferm.workerLock.RUnlock()//看看目前現在有多少的 worker
returnlen(m.workers)}
整理一下
還記得之前為了找到在 proberesults.Manager 定義的卻沒被用到的這三個 function 嗎?我們現在來看看都在哪裡用上了吧!
1
2
3
4
5
6
// 透過 container id 從 實作者身上得到 result 結果
Get(kubecontainer.ContainerID)(Result,bool)// 透過 container id 設定 pod 探測的結果。實作者需要把結果儲存起來。
Set(kubecontainer.ContainerID,Result,*v1.Pod)// 透過 container id 移除時實作者身上的對應的資料
Remove(kubecontainer.ContainerID)
func(m*manager)UpdatePodStatus(podUIDtypes.UID,podStatus*v1.PodStatus){//透過傳入的 pod status 遞迴所有的 ContainerStatuses
fori,c:=rangepodStatus.ContainerStatuses{...}elseifresult,ok:=m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID));ok{// 如果 startup 探測成功的話就設定 started 為 true
started=result==results.Success}
// run periodically probes the container.
func(w*worker)run(){//透過 pod spec 設定 probe 的間隔時間
probeTickerPeriod:=time.Duration(w.spec.PeriodSeconds)*time.Second//我猜的...不確定為什麼會這樣設計,如果有知道的大大希望能不吝嗇告知
//依照現在的時間點減去 kubelet 的啟動時間,得到 kubelet 存活的時間。
//如果 PeriodSeconds 大於 kubelet 存活的時間的話,讓worker 睡一下等等在 probe 。
//我猜可能是因為kubelet 還沒完全準備好(吧?)
ifprobeTickerPeriod>time.Since(w.probeManager.start){time.Sleep(time.Duration(rand.Float64()*float64(probeTickerPeriod)))}//設定 ticker 多久要觸發一次
probeTicker:=time.NewTicker(probeTickerPeriod)//clean up function
deferfunc(){// 關閉ticker
probeTicker.Stop()//如果 container id 還在的話就移除 resultsManager 跟這個 container 有關的資料
if!w.containerID.IsEmpty(){w.resultsManager.Remove(w.containerID)}//移除map中紀錄的 worker 資訊
w.probeManager.removeWorker(w.pod.UID,w.container.Name,w.probeType)//metric 不在記錄這個 worker 所發生的 metric
ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)ProberResults.Delete(w.proberResultsFailedMetricLabels)ProberResults.Delete(w.proberResultsUnknownMetricLabels)}()probeLoop://每次 prob 完成後如果 probe 結果為 true ,需要等待 probeTicker ,或是 manualTriggerCh 再進行下一次的 probe trigger
//如果是收到 stopCh 或是 probe 結果為 flase 則關閉 worker 。
forw.doProbe(){// Wait for next probe tick.
select{case<-w.stopCh:breakprobeLoopcase<-probeTicker.C:case<-w.manualTriggerCh:// continue
}}}//用來關閉 worker,設計成 Non-blocking的,簡單來看就是往 stop channel 送 stop 訊號。
func(w*worker)stop(){select{casew.stopCh<-struct{}{}:default:// Non-blocking.
}}// probe container 並且回傳 probe 結果。如果 probe 過程中有錯會回傳 false 呼叫者需要關閉 worker。
func(w*worker)doProbe()(keepGoingbool){//無腦回復狀態 panic 。
deferfunc(){recover()}()// runtime.HandleCrash 紀錄一下而已。
deferruntime.HandleCrash(func(_interface{}){keepGoing=true})//透過 statusManager 跟著 UID 拿到 pod status(statusManager 在這裡不太重要,知道可以拿到 pod status 就好了)
status,ok:=w.probeManager.statusManager.GetPodStatus(w.pod.UID)if!ok{// Pod 尚未創建,或者已被刪除。
klog.V(3).InfoS("No status for pod","pod",klog.KObj(w.pod))returntrue}// 如果 pod 處於 PodFailed 跟 PodSucceeded 狀態,這個 worker 就可以關閉了
ifstatus.Phase==v1.PodFailed||status.Phase==v1.PodSucceeded{klog.V(3).InfoS("Pod is terminated, exiting probe worker","pod",klog.KObj(w.pod),"phase",status.Phase)returnfalse}//遞迴 container status ,判斷所有的 container status 對應到輸入的 ccontainer name 回傳 status 狀態
//如果找不到對應的 container status 就等待下一輪
c,ok:=podutil.GetContainerStatus(status.ContainerStatuses,w.container.Name)if!ok||len(c.ContainerID)==0{// 容器尚未創建,或者已被刪除。
klog.V(3).InfoS("Probe target container not found","pod",klog.KObj(w.pod),"containerName",w.container.Name)returntrue// Wait for more information.
}//判斷 worker 負責的 container id 是不是跟 status 的 container 可以對上,如果對不上這種狀況可能發生在 container 被刪除或是 container 改變了。
ifw.containerID.String()!=c.ContainerID{//如果 container id 不是空的
if!w.containerID.IsEmpty(){//從 resultsManager 刪除關於 worker container 的結果
w.resultsManager.Remove(w.containerID)}//設定新的 container id
w.containerID=kubecontainer.ParseContainerID(c.ContainerID)//設定 resultsManager 要接收新的 worker container id ,並且給他初始化的 value 與 pod spec (這裡要注意根據 probe 形式不同他們初始化得數值也不一樣 例如readiness :faill liveness :Success startup :Unknown)
w.resultsManager.Set(w.containerID,w.initialValue,w.pod)// 因為有新的 container 我們繼續 prob 流程
w.onHold=false}//判斷是否要繼續 probe 流程
ifw.onHold{// Worker is on hold until there is a new container.
returntrue}//判斷 container 的狀態是否正在Running,如果不是正在 Running 有可能在做waiting 、有可能在 Terminated 。
ifc.State.Running==nil{klog.V(3).InfoS("Non-running container probed","pod",klog.KObj(w.pod),"containerName",w.container.Name)//如果 container id 不是空的
if!w.containerID.IsEmpty(){//從 resultsManager 刪除關於 worker container 的結果
w.resultsManager.Set(w.containerID,results.Failure,w.pod)}// 如果 RestartPolicy 為 不重新啟動,則中止 worker。
returnc.State.Terminated==nil||w.pod.Spec.RestartPolicy!=v1.RestartPolicyNever}// 這邊我們要先了解一點!!!非長重要如果不了解建議先看之前我這篇文章[學習Kubernetes Garbage Collection機制](https://blog.jjmengze.website/posts/kubernetes/kubernetes-garbage-collection/)
// 簡單來說,pod 被刪掉有可能先出現 DeletionTimestamp 的狀態
// 在這個狀態之下 pod 會在 BackGround 狀態被回收
// 可以把它想像成處於 Deletion 狀態的 pod ,且有設定 probe liveness 或是 startup,透過 resultsManager 設定成 probe 成功,不然會把 container 刪掉(重啟)。
// 最後停止 worker ,因為 pod 已經要被刪掉了 worker 就沒用處囉。
ifw.pod.ObjectMeta.DeletionTimestamp!=nil&&(w.probeType==liveness||w.probeType==startup){klog.V(3).InfoS("Pod deletion requested, setting probe result to success","probeType",w.probeType,"pod",klog.KObj(w.pod),"containerName",w.container.Name)ifw.probeType==startup{klog.InfoS("Pod deletion requested before container has fully started","pod",klog.KObj(w.pod),"containerName",w.container.Name)}// Set a last result to ensure quiet shutdown.
w.resultsManager.Set(w.containerID,results.Success,w.pod)// Stop probing at this point.
returnfalse}// 判斷 probe 的初始 Delay 時間是否到了,如果還沒到就需要等下一次觸發
ifint32(time.Since(c.State.Running.StartedAt.Time).Seconds())<w.spec.InitialDelaySeconds{returntrue}//判斷 (過去) startup probe 是否已經成功,如果已經成功就可以關閉 startup worker,其他種類的 woker 保留。
//如果是還沒 startup probe 失敗 其他 probe 都不用談直接退回去重新等待下一次觸發。
ifc.Started!=nil&&*c.Started{// Stop probing for startup once container has started.
ifw.probeType==startup{returnfalse}}else{// Disable other probes until container has started.
ifw.probeType!=startup{returntrue}}//實際執行各種 probe 的地方,如果 probe 有 error 直接停止 worker
//這裡依賴之前注入的 probeManager 的 probe 實作,藉由我們丟入的 probe 型態決定要怎麼執行 probe 。
result,err:=w.probeManager.prober.probe(w.probeType,w.pod,status,w.container,w.containerID)iferr!=nil{// Prober error, throw away the result.
returntrue}//如果 probe 沒有 error 透過 ProberResults.With 去觸發 metric 以提供後續監控服務
switchresult{caseresults.Success:ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()caseresults.Failure:ProberResults.With(w.proberResultsFailedMetricLabels).Inc()default:ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()}//用來判斷同一個 probe 結果,並且計算執行 probe 次數用
ifw.lastResult==result{w.resultRun++}else{w.lastResult=resultw.resultRun=1}//如果 probe 錯誤或是成功 低於閥值就直接安排下一次的 probe
if(result==results.Failure&&w.resultRun<int(w.spec.FailureThreshold))||(result==results.Success&&w.resultRun<int(w.spec.SuccessThreshold)){// Success or failure is below threshold - leave the probe state unchanged.
returntrue}//透過 resultsManager 設定哪個 container 的 prob 結果是什麼,給外面的人做事(重啟、刪除之類的)
w.resultsManager.Set(w.containerID,result,w.pod)//如果 worker 型態為 ( liveness 或是 startup )並且本次 prob 結果為失敗。 container 會重啟所以需要把 resultRun 重置
//並且設定 onHold=true ,因為 container 重啟了。前面要重新獲取 container id 不需要。
if(w.probeType==liveness||w.probeType==startup)&&result==results.Failure{w.onHold=truew.resultRun=0}//worker繼續 執行
returntrue}
整理一下
還記得之前為了找到在 proberesults.Manager 定義的卻沒被用到的這三個 function 嗎?我們現在來看看是不是都用上了!
1
2
3
4
5
6
// 透過 container id 從 實作者身上得到 result 結果
Get(kubecontainer.ContainerID)(Result,bool)// 透過 container id 設定 pod 探測的結果。實作者需要把結果儲存起來。
Set(kubecontainer.ContainerID,Result,*v1.Pod)// 透過 container id 移除時實作者身上的對應的資料
Remove(kubecontainer.ContainerID)
Set
Worker - run
透過 container id 設定 pod 探測的結果。實作者需要把結果儲存起來。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func(w*worker)doProbe()(keepGoingbool){...ifw.containerID.String()!=c.ContainerID{if!w.containerID.IsEmpty(){w.resultsManager.Remove(w.containerID)}w.containerID=kubecontainer.ParseContainerID(c.ContainerID)w.resultsManager.Set(w.containerID,w.initialValue,w.pod)// We've got a new container; resume probing.
w.onHold=false}result,err:=w.probeManager.prober.probe(w.probeType,w.pod,status,w.container,w.containerID)...w.resultsManager.Set(w.containerID,result,w.pod)...