This page looks best with JavaScript enabled

Kubernetes kubelet 探測 pod 的生命症狀 tcp socket

 ·  ☕ 10 min read

首先本文所有的 source code 基於 kubernetes 1.19 版本,所有 source code 為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

本篇文章基於Kubernetes kubelet 探測 pod 的生命症狀 Http Get以及Kubernetes kubelet 探測 pod 的生命症狀 Exec繼續往上蓋的違建(X),大部分的內容都差不多,如有看過前一篇的並且建立基礎觀念的朋友可以直接滑到最下面看 kubernetes kubelet 如何透過 tcp socket 探測 pod 的生命症狀。

使用 kubernetes 在建置自己的服務時,我們通常會透過 kubernetes 所提供的探針(probes) 來探測 pod 的特定服務是否正常運作。 probes 主要用來進行自我修復的功能,例如今天某一隻 process 因為業務邏輯或是程式寫錯造成死鎖的問題,我們就能透過 probes 來重新啟動 pod 來恢復程式的運行。或是假設今天 process 啟動到真正可以提供外部存取提供服務,所花費的時間需要比較長的時候我們也會透過 kubernetes 所提供的探針(probes) 來探測服務是不是可以提供外部使用。

綜上所述 probes 分成兩種

  1. liveness
    主要用來判斷 pod 是否正常運作,如果探測失敗的話 kubelet 會把 pod 殺掉再重新建置。
  2. readiness
    主要用來判斷 pod 是否可以提供給其他服務存取,如果探測失敗的話 kubelet 會把 pod 從 service backend 移除,這樣的話其他服務就無法從 service 存取到該服務。

今天主要跟大家分享是的 kubernetes 怎麼透過 liveness probes 的 tcp socket 去探測 pod 的生命狀態。

probes

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// probe probes the container.
func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
	var probeSpec *v1.Probe
	//首先判斷這次要執行探針的是哪一種類別,分別有readiness、liveness、startup
	switch probeType {
	//如果判斷是readiness就需要載入 container spec ReadinessProbe 寫的要求
	case readiness:
		probeSpec = container.ReadinessProbe
	//如果判斷是liveness就需要載入 container spec LivenessProbe 寫的要求
	case liveness:
		probeSpec = container.LivenessProbe
	//如果判斷是startup就需要載入 container spec StartupProbe 寫的要求
	case startup:
		probeSpec = container.StartupProbe
	//不是上述這三種的話 kubernetes 目前無法處理。    
	default:
		return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
	}
	
    
	ctrName := fmt.Sprintf("%s:%s", format.Pod(pod), container.Name)
	//如果 pod 裡面沒有定義 probe 的話就當作探測成功
	if probeSpec == nil {
		klog.Warningf("%s probe for %s is nil", probeType, ctrName)
		return results.Success, nil
	}
	//傳入探針型態,探針規格,pod狀態,pod spec,以及要探測哪一個 container,以及重試次次數。
	//接著會依照探測結果進行不同策略
	result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
	//如果 err 不是 nil 或是 result 不是 Success 同時不是 Warning,就要進行 log 處理
	if err != nil || (result != probe.Success && result != probe.Warning) {
		// Probe failed in one way or another.
		//簡單來說就是紀錄哪個 pod 哪個 container 發生了探針探測結果 ContainerUnhealthy,以及印一下 log 。
		if err != nil {
			klog.V(1).Infof("%s probe for %q errored: %v", probeType, ctrName, err)
			pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
		} else { // result != probe.Success
			klog.V(1).Infof("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
			pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
		}
		return results.Failure, err
	}
	//如果 result 是 Warning ,簡單來說就是紀錄哪個 pod 哪個 container 發生了探針探測結果 warning,以及印一下 log 。
	if result == probe.Warning {
		pb.recordContainerEvent(pod, &container, v1.EventTypeWarning, events.ContainerProbeWarning, "%s probe warning: %s", probeType, output)
		klog.V(3).Infof("%s probe for %q succeeded with a warning: %s", probeType, ctrName, output)
	} 
	//不然就是成功,發個 log 沒什麼其他的用途。
	else {
		klog.V(3).Infof("%s probe for %q succeeded", probeType, ctrName)
	}
	//回傳一下探測結果    
	return results.Success, nil
}

runProbeWithRetries 主要傳入探針型態,探針規格,pod狀態,pod spec,以及要探測哪一個 container,接著透過 runProbe function 去執行探測。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// runProbeWithRetries tries to probe the container in a finite loop, it returns the last result
// if it never succeeds.
func (pb *prober) runProbeWithRetries(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
	//探針錯誤訊息
	var err error
	//探針結果
	var result probe.Result
	//探針結果
	var output string
	//若是失敗需要探測的總次數
	for i := 0; i < retries; i++ {
		//開始探測,帶入探針型態,探針規格,pod狀態,pod spec,以及要探測哪一個 container。    
		result, output, err = pb.runProbe(probeType, p, pod, status, container, containerID)
		//如果探測成功直接回傳
		if err == nil {
			return result, output, nil
		}
	}
	//如果探測失敗達到重試次數
	return result, output, err
}

runProbe function 主要是執行探針探測的動作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
	//設定探測多久會 timeout
	timeout := time.Duration(p.TimeoutSeconds) * time.Second
    
	//如果 pod 有設定 exec 的話,就會透過 pb.exec.Probe 進行探測,上一篇主要在探討這一塊[Kubernetes kubelet 探測 pod 的生命症狀 Exec]([https://blog.jjmengze.website/posts/kubernetes/source-code/kubelet/prob/kubernetes-kubelet-http-get/](https://blog.jjmengze.website/posts/kubernetes/source-code/kubelet/prob/kubernetes-kubelet-exec/))
	if p.Exec != nil {
		//先打個要執行 exec 的 log     
		klog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod.Name, container.Name, p.Exec.Command)
        
		//組合要執行的 command 與 container 環境變數
		command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
        
		//回傳執行結果
		return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
	}
    
    
	//如果 pod 有設定 HTTPGet 的話,就會透過 pb.HTTPGet.Probe 進行探測,上上篇[Kubernetes kubelet 探測 pod 的生命症狀 Http Get](https://blog.jjmengze.website/posts/kubernetes/source-code/kubelet/prob/kubernetes-kubelet-http-get/)主要在討論這一塊。
	if p.HTTPGet != nil {
    
    
		//先把  HTTPGet.Scheme 轉成小寫,一般來說就是 http 或是 https
		scheme := strings.ToLower(string(p.HTTPGet.Scheme))
        
		//取出目標 host 位置
		host := p.HTTPGet.Host
		//如果目標 host 位置為空,預設用 pod 本身的 ip
		if host == "" {
			host = status.PodIP
		}
        
        //取出 pod 裡面指定 prob 的 port 號,有可能有人寫成 port: "http"或是寫成 port: 80 又或是 port : "80"
        //因此不能做簡單的提取
		port, err := extractPort(p.HTTPGet.Port, container)
		if err != nil {
			return probe.Unknown, "", err
		}
        
		//取出目標探測目標位置的路徑
		path := p.HTTPGet.Path
        
		klog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
        
		//把 scheme 、 host 、 port 、 path 組成  url 物件
		url := formatURL(scheme, host, port, path)
        
		//填充這次要探測的 http header
		headers := buildHeader(p.HTTPGet.HTTPHeaders)
		klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
        
		//本次要探測的型態,依照不同的探測型態去進行探測。
		switch probeType {
		//若為 liveness 就透過 liveness Probe function 去檢測
		case liveness:
			return pb.livenessHTTP.Probe(url, headers, timeout)
            
		//若為 startupHTTP 就透過 startupHTTP Probe function 去檢測
		case startup:
			return pb.startupHTTP.Probe(url, headers, timeout)
            
		//若為 readinessHTTP 就透過 readinessHTTP Probe function 去檢測
		default:
			return pb.readinessHTTP.Probe(url, headers, timeout)
		}
	}
	//如果有 pod 定義 tcp socket 的話,就會透過 pb.tcp.Probe 進行探測,本篇主要探討的部分。
	if p.TCPSocket != nil {
    
		//取出 pod 裡面指定 prob 的 port 號,有可能有人寫成 port: "http"或是寫成 port: 80 又或是 port : "80"
		//因此不能做簡單的提取
		port, err := extractPort(p.TCPSocket.Port, container)
		if err != nil {
			return probe.Unknown, "", err
		}
        
		//取出目標 host 的位置
		host := p.TCPSocket.Host
		//如果目標 host 位置為空,預設用 pod 本身的 ip
		if host == "" {
			host = status.PodIP
		}
        
		klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
        //實際執行 tcp prob 的部分這等等會看到如何處理。
		return pb.tcp.Probe(host, port, timeout)
	}
	klog.Warningf("Failed to find probe builder for container: %v", container)
    
	//不屬於以上三種的 kubernetes 目前不支援呦,所以會還傳結果probe.Unknown,以及不支援 probe 的錯誤。
	return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
}

針對上述用到的 function 進行一些補充~

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
//仔細觀察參數的話,第一個輸入的 param 型態為 intstr.IntOrString,這個型態是什麼東西呢?
//依照文件的註解為IntOrString是可以含有 int32 或  string 的類型。在 JSON / YAML marshalling and unmarshalling 時使用,簡單來說使用者可以傳入 string 或是 int 的型態進來。
func extractPort(param intstr.IntOrString, container v1.Container) (int, error) {
	port := -1
	var err error
	//第一步我們需要先去解析傳入的 port 是什麼型態來做對應的解析。
	switch param.Type {
	//如果是 INT 的話,就把 port 以 int 的方式對出
	case intstr.Int:
		port = param.IntValue()
	//如果是 INT 的話,就把 port 以 int 的方式對出
	case intstr.String:
		//通過名稱查找 container 中的 Port。
		if port, err = findPortByName(container, param.StrVal); err != nil {
			// 覺得註解很有趣,保留下來,最後一搏,嘗試將 string 轉成 int 有可能使用者定義 port : "8080",試試看這樣可不可以轉成功
			// Last ditch effort - maybe it was an int stored as string?
			if port, err = strconv.Atoi(param.StrVal); err != nil {
				return port, err
			}
		}
	// Type 無法處理
	default:
		return port, fmt.Errorf("intOrString had no kind: %+v", param)
	}
	// port 在 0 ~ 65536 這個區間內才有效    
	if port > 0 && port < 65536 {
		return port, nil
	}
	//回傳解析的 port 為多少
	return port, fmt.Errorf("invalid port number: %v", port)
}

//上面有看到透過 param.IntValue() 把 intstr.IntOrString 為 int type 的轉換成 int 是透過這個方法
func (intstr *IntOrString) IntValue() int {
	//應該不會跑到這個方法,外面已經判斷過了,可能多一層做保障?
	if intstr.Type == String {
		i, _ := strconv.Atoi(intstr.StrVal)
		return i
	}
	return int(intstr.IntVal)
}

// 通過名稱查找 container 中的 Port。
func findPortByName(container v1.Container, portName string) (int, error) {
	//透過傳入的 container port 透過迴圈找尋 port 名稱對應到的實際 port 號,以 int 的方式回傳。
	for _, port := range container.Ports {
		if port.Name == portName {
			return int(port.ContainerPort), nil
		}
	}
	return 0, fmt.Errorf("port %s not found", portName)
}



// formatURL 格式化 args 中的 URL。
func formatURL(scheme string, host string, port int, path string) *url.URL {
	// 透過 url package 的 parse function 將 url 去解析。
	u, err := url.Parse(path)
	//不知道這個錯誤什麼時候會出現...先保留註解,求大大幫看xD
	// Something is busted with the path, but it's too late to reject it. Pass it along as is.
	if err != nil {
		u = &url.URL{
			Path: path,
		}
	}
	// url 加上 scheme
	u.Scheme = scheme
	// url host 加上 host:port 
	u.Host = net.JoinHostPort(host, strconv.Itoa(port))
	return u
}

//把 pod spec prob header 加入到 prob 的請求中。
func buildHeader(headerList []v1.HTTPHeader) http.Header {
	//建立一個 head slice 
	headers := make(http.Header)
	//把 pod spec prob header 透過 for rnge 的方式加入到 prob 的請求中。
	for _, header := range headerList {
		headers[header.Name] = append(headers[header.Name], header.Value)
	}
	return headers
}

tcp socket

kubernetes worker 上的 kubelet 會定期發送一個 command exec request 給 pod 內的 container ,如果 coomand exec status code 回傳成功 0 ,判斷目前 container 是否正常運作運作,若是不在這個 status code 範圍就會把 pod 刪掉。

pods/probe/tcp-liveness-readiness.yaml
範例是擷取自 kubernetes 官方網站,撰寫一個 yaml 檔送給 kubernetes 告訴 kubernetes 幫忙啟動一個 pod 並且建立一個 livenessProbe , livenessProbe 會透過 tcp socket 方法去判斷 container 的 8080 port 是否正常。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
apiVersion: v1
kind: Pod
metadata:
  name: goproxy
  labels:
    app: goproxy
spec:
  containers:
  - name: goproxy
    image: k8s.gcr.io/goproxy:0.1
    ports:
    - containerPort: 8080
    livenessProbe:
      tcpSocket:
        port: 8080
      initialDelaySeconds: 15
      periodSeconds: 20

我們就以這個範例 kubelet livenessProbe 的 tcp socket 底層是如何實現的吧,先從觸發點來看

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
	
	...
    
	//如果 pod 有設定 exec 的話,就會透過 pb.exec.Probe 進行探測
	if p.Exec != nil {
	...    
    
    
	//如果有 pod 定義 tcp socket 的話,就會透過 pb.tcp.Probe 進行探測,本篇主要探討的部分。
	if p.TCPSocket != nil {
    
		//取出 pod 裡面指定 prob 的 port 號,有可能有人寫成 port: "http"或是寫成 port: 80 又或是 port : "80"
		//因此不能做簡單的提取
		port, err := extractPort(p.TCPSocket.Port, container)
		if err != nil {
			return probe.Unknown, "", err
		}
        
		//取出目標 host 的位置
		host := p.TCPSocket.Host
		//如果目標 host 位置為空,預設用 pod 本身的 ip
		if host == "" {
			host = status.PodIP
		}
        
		klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
        //實際執行 tcp prob 的部分這等等會看到如何處理。
		return pb.tcp.Probe(host, port, timeout)
	}
    ...

如果探針型態為 TCPSocket 的話,就會透過(非常非常非常外面注入的)tcp 物件去處理探針,至於怎麼注入的…之後再找時間整理xD

我們來看怎麼一下透過 pb.tcp.Probe(host, port, timeout) 這個 function 做到 tcp socket 探測吧。
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Probe returns a ProbeRunner capable of running an TCP check.
func (pr tcpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) 

	//組合 host 位址與 port 號,例如 hostip:port,傳入給 DoTCPProbe 處理
	return DoTCPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout)
}

// DoTCPProbe checks that a TCP socket to the address can be opened.
// If the socket can be opened, it returns Success
// If the socket fails to open, it returns Failure.
// This is exported because some other packages may want to do direct TCP probes.
func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {

	//透過 go 內建的 net package 透過 DialTimeout 建立 tcp 連線
	conn, err := net.DialTimeout("tcp", addr, timeout)
	//如果建立連線失敗或是 time out 就噴錯,並且回傳錯誤訊息。
	if err != nil {
		// Convert errors to failures to handle timeouts.
		return probe.Failure, err.Error(), nil
	}
	//建立連線成功後,就代表探測成功就可以關閉連線了
	err = conn.Close()
	//若是關閉失敗,會印出做錯誤紀錄。
	if err != nil {
		klog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err)
	}
    //回傳探測成功
	return probe.Success, "", nil
}

由於 tcp socket 的篇幅有點短,附上 tcp socket 的測試給大家參考xD
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func TestTcpHealthChecker(t *testing.T) {
	// 透過 go 內建的 http test package 建立 http server ,以及設定好 handler 要做什麼。
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
    
	// function 結束後關閉 server 
	defer server.Close()

	//把 hostip:port,切割成 hostip 與 port 
	tHost, tPortStr, err := net.SplitHostPort(server.Listener.Addr().String())
	if err != nil {
		t.Errorf("unexpected error: %v", err)
	}
    
	// port 是 string 型態轉換成 int 型態
	tPort, err := strconv.Atoi(tPortStr)
	if err != nil {
		t.Errorf("unexpected error: %v", err)
	}

	// table driven
	//設定兩種測試情境
	//第一種為測試成功。
	//第二種為測試失敗
	tests := []struct {
		host string
		port int

		expectedStatus probe.Result
		expectedError  error
	}{
		// A connection is made and probing would succeed
		{tHost, tPort, probe.Success, nil},
		// No connection can be made and probing would fail
		{tHost, -1, probe.Failure, nil},
	}

	//建立 tcp prob
	prober := New()

	//for 迴圈遞迴 table deiven 的測試情境
	for i, tt := range tests {
		// 直接呼叫 Probe function 給定目標主機 port號 與  timeout 時間
		status, _, err := prober.
			Probe(tt.host, tt.port, 1*time.Second)
            
		//觀察結果是否是符合測試案例
		if status != tt.expectedStatus {
			t.Errorf("#%d: expected status=%v, get=%v", i, tt.expectedStatus, status)
		}
		if err != tt.expectedError {
			t.Errorf("#%d: expected error=%v, get=%v", i, tt.expectedError, err)
		}
	}
}

小結

以上為 kubelet 探測 pod 的生命症狀 - tcp socket 簡易分析,簡單來說 kubernetes worker node 上的 kubelet process 會有一隻 worker 的 thread 建立一個探針,該 worker 會把 pod prob spec 解析出來並建立對應的探針,本篇以 prob 為 tcp socket 為例。

我們看到了 net.DialTimeout 執行的結果沒有 error 的話就當作當作成功,其他結果都回報 Failure 。

文章中若有出現錯誤的見解希望各位在觀看文章的大大們可以指出哪裡有問題,讓我學習改進,謝謝。


Meng Ze Li
WRITTEN BY
Meng Ze Li
Kubernetes / DevOps / Backend

What's on this Page