Container Health Checks: The Probe Mechanism
Probes
Kubernetes provides three kinds of probes to verify that containers are actually running after deployment: the readiness probe (readinessProbe), the liveness probe (livenessProbe), and the startup probe (startupProbe).
- livenessProbe checks whether the container is healthy; if it is not, the container is killed and recreated.
- readinessProbe checks, during startup and beyond, whether the container is ready; only once it reports ready does the container start receiving request traffic, otherwise the Pod is removed from the corresponding Endpoints list.
- startupProbe runs when the container starts. If a startupProbe is configured, livenessProbe and readinessProbe are executed only after it succeeds, which prevents a slow-starting application from being killed before it finishes booting. Once the startupProbe succeeds, it never runs again.
Every probe has one of three result states: Success, Failure, or Unknown, and only Success counts as a passing check.
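The result type is defined in pkg/kubelet/prober/results/results_manager.go; as of the kubernetes 1.20 sources it reads roughly as follows:

```go
// pkg/kubelet/prober/results/results_manager.go

// Result is the type for probe results.
type Result int

const (
    // Unknown is encoded as -1 (type Result)
    Unknown Result = iota - 1

    // Success is encoded as 0 (type Result)
    Success

    // Failure is encoded as 1 (type Result)
    Failure
)
```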
Kubernetes supports three check mechanisms for every kind of probe:
- HTTP GET Action: performs an HTTP GET request against the container's IP address on a specified port and path. The check succeeds if a response arrives whose status code does not indicate an error (2xx or 3xx); it fails if no response arrives or an error status code is returned.
- TCP Socket Action: attempts to open a TCP connection to the specified container port. The check succeeds if the connection is established, and fails otherwise.
- Exec Action: executes a user-defined command inside the container and inspects its exit code. Exit code 0 means success; any other exit code counts as failure. Because the exec'd command consumes resources inside the container, it should be simple and lightweight.
Every probe shares a common set of configurable fields (combined in the sketch after this list):
- initialDelaySeconds: how long to wait after the container starts before running the first probe; shown as the delay attribute. Defaults to 0 seconds.
- periodSeconds: how often to probe. Defaults to 10 seconds; minimum 1 second. Probing too frequently adds noticeable overhead to the Pod, while probing too rarely delays the reaction to failures.
- successThreshold: while in a failed state, the minimum number of consecutive successes required for the check to be considered passing. Defaults to 1; minimum 1 (and must be 1 for liveness and startup probes).
- failureThreshold: while in a successful state, the minimum number of consecutive failures required for the check to be considered failing. Defaults to 3; minimum 1.
- timeoutSeconds: probe timeout. Defaults to 1 second; minimum 1 second.
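To make these fields concrete, here is a minimal sketch of how they combine on a single probe; the image, port, and path are illustrative placeholders, not values from any particular deployment:

```yaml
spec:
  containers:
  - name: app
    image: example.com/app:latest    # placeholder image
    livenessProbe:
      httpGet:
        path: /healthz               # hypothetical health endpoint
        port: 8080
      initialDelaySeconds: 5         # wait 5s after the container starts (delay)
      periodSeconds: 10              # probe every 10s
      timeoutSeconds: 1              # each probe must respond within 1s
      successThreshold: 1            # one success marks the probe passing again
      failureThreshold: 3            # three consecutive failures mark it failing
```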
Readiness probe (readinessProbe)
Defining a readiness probe means the Pod receives no traffic during its startup phase and only starts receiving traffic once the probe succeeds.
The readiness probe keeps probing throughout the Pod's entire lifecycle and determines whether the Pod can accept client requests. When the readiness probe returns Success, the container is ready to receive requests. A container that fails its readinessProbe check is not terminated or restarted; instead, it is marked not ready, and the Endpoints Controller removes its IP from the Endpoints list of every Service object that matches the Pod.
The readiness probe's state starts out as Failure. If a container defines no readiness probe, its state defaults to Success.
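For illustration, a readinessProbe that gates traffic on a hypothetical /ready endpoint might look like this (all values are placeholders):

```yaml
spec:
  containers:
  - name: app
    image: example.com/app:latest   # placeholder image
    readinessProbe:
      httpGet:
        path: /ready                # hypothetical readiness endpoint
        port: 8080
      periodSeconds: 5
      failureThreshold: 3           # after 3 failures the Pod's IP leaves the Endpoints list
```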

Liveness probe (livenessProbe)
Checks whether the container is still running. A liveness probe can be specified individually for each container in a Pod. The kubelet executes the probe periodically, and if the check fails, the container is restarted. If a container defines no liveness probe, its state defaults to Success.
Startup probe (startupProbe)
A startup check mechanism aimed at slow-starting applications, so that a long boot phase does not get the container killed by the two probes above. (Stretching readinessProbe's and livenessProbe's initialDelaySeconds can achieve a similar effect.)
The startup probe runs when the container starts. If a startup probe is defined, no other probe runs until it succeeds. If the startup probe fails, the kubelet kills the container, and the container is restarted according to its Restart Policy. If no startupProbe is defined, the state defaults to Success.
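Because the startup probe blocks the other probes until it succeeds, a common pattern is to give it a generous failure budget: the application gets up to failureThreshold × periodSeconds to come up before the kubelet kills it. A sketch with illustrative values:

```yaml
spec:
  containers:
  - name: slow-app
    image: example.com/slow-app:latest   # placeholder image
    startupProbe:
      httpGet:
        path: /healthz                   # hypothetical health endpoint
        port: 8080
      periodSeconds: 10
      failureThreshold: 30               # up to 30 * 10s = 300s to finish starting
    livenessProbe:                       # begins only after the startupProbe succeeds
      httpGet:
        path: /healthz
        port: 8080
      periodSeconds: 10
```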
Examples
The rest of each Pod's YAML is omitted; only the parts related to spec.containers.livenessProbe are kept.
exec probe
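A minimal sketch of an exec liveness probe (the image, command, and file path are illustrative placeholders):

```yaml
spec:
  containers:
  - name: liveness-exec
    image: busybox                  # placeholder image
    args:
    - /bin/sh
    - -c
    - touch /tmp/healthy; sleep 3600
    livenessProbe:
      exec:
        command:                    # exit code 0 means the check passes
        - cat
        - /tmp/healthy
      initialDelaySeconds: 5
      periodSeconds: 5
```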
tcpSocket probe
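A minimal sketch of a tcpSocket liveness probe (port and image are illustrative placeholders):

```yaml
spec:
  containers:
  - name: liveness-tcp
    image: example.com/app:latest   # placeholder image
    livenessProbe:
      tcpSocket:
        port: 8080                  # passes if a TCP connection can be opened
      initialDelaySeconds: 15
      periodSeconds: 20
```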
httpGet probe
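A minimal sketch of an httpGet liveness probe (path, port, and header are illustrative placeholders):

```yaml
spec:
  containers:
  - name: liveness-http
    image: example.com/app:latest   # placeholder image
    livenessProbe:
      httpGet:
        path: /healthz              # hypothetical health endpoint
        port: 8080
        httpHeaders:                # optional custom request headers
        - name: X-Custom-Header
          value: probe
      initialDelaySeconds: 3
      periodSeconds: 3
```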
Probe creation and operation: a source-code walkthrough
The code below is based on kubernetes 1.20.4.
Creating the probeManager object, which manages probes and their results
- When the kubelet starts, it creates a probeManager object and enters a loop, syncLoop(), that keeps Pod state in sync. results.Manager is a manager that stores container probe results in a hashmap.
```go
// pkg/kubelet/kubelet.go
klet.probeManager = prober.NewManager(
    klet.statusManager,
    klet.livenessManager,
    klet.startupManager,
    klet.runner,
    kubeDeps.Recorder)

// pkg/kubelet/prober/prober_manager.go
// NewManager creates the Manager that owns every probe of every pod.
func NewManager(
    statusManager status.Manager,
    livenessManager results.Manager,
    startupManager results.Manager,
    runner kubecontainer.CommandRunner,
    recorder record.EventRecorder) Manager {

    prober := newProber(runner, recorder)
    readinessManager := results.NewManager()
    return &manager{
        statusManager:    statusManager,
        prober:           prober,
        readinessManager: readinessManager,
        livenessManager:  livenessManager,
        startupManager:   startupManager,
        workers:          make(map[probeKey]*worker),
    }
}
```

```go
// pkg/kubelet/prober/results/results_manager.go
type manager struct {
    // guards the cache
    sync.RWMutex
    // map of container ID -> probe Result
    cache map[kubecontainer.ContainerID]Result
    // channel of updates
    updates chan Update
}
```
The kubelet's syncLoop() listens for Pod creation events; when a Pod is created, the Pod's information is handed to the probeManager.
```go
// the syncLoopIteration method called from syncLoop() in kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    ......
    case kubetypes.ADD:
        klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
        handler.HandlePodAdditions(u.Pods)
    ......
    }
    .......
}

// the HandlePodAdditions() method in kubelet.go
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    ......
    for _, pod := range pods {
        existingPods := kl.podManager.GetPods()
        kl.podManager.AddPod(pod)
        ......
        // create the probes
        kl.probeManager.AddPod(pod)
    }
}
```
Creating a Worker for each of the Container's probes
- When a Pod is created, the kubelet iterates over its containers and checks whether each one defines a StartupProbe, ReadinessProbe, or LivenessProbe. For every probe defined, a Worker object is created, and that Worker performs its checks in its own goroutine.
```go
// pkg/kubelet/prober/prober_manager.go
func (m *manager) AddPod(pod *v1.Pod) {
    // read-write lock
    m.workerLock.Lock()
    defer m.workerLock.Unlock()

    key := probeKey{podUID: pod.UID}
    for _, c := range pod.Spec.Containers {
        key.containerName = c.Name

        if c.StartupProbe != nil {
            key.probeType = startup
            if _, ok := m.workers[key]; ok {
                klog.Errorf("Startup probe already exists! %v - %v",
                    format.Pod(pod), c.Name)
                return
            }
            w := newWorker(m, startup, pod, c)
            m.workers[key] = w
            go w.run()
        }

        if c.ReadinessProbe != nil {
            key.probeType = readiness
            if _, ok := m.workers[key]; ok {
                klog.Errorf("Readiness probe already exists! %v - %v",
                    format.Pod(pod), c.Name)
                return
            }
            w := newWorker(m, readiness, pod, c)
            m.workers[key] = w
            go w.run()
        }

        if c.LivenessProbe != nil {
            key.probeType = liveness
            if _, ok := m.workers[key]; ok {
                klog.Errorf("Liveness probe already exists! %v - %v",
                    format.Pod(pod), c.Name)
                return
            }
            w := newWorker(m, liveness, pod, c)
            m.workers[key] = w
            go w.run()
        }
    }
}
```
What each probe's Worker is
- The newWorker call creates a probe worker (definition below). When a worker is initialized, it is given the relevant pod and container information, and its initial state is set according to the probe type: readinessProbe starts as Failure, livenessProbe starts as Success, and startupProbe starts as Unknown.
```go
// pkg/kubelet/prober/worker.go
func newWorker(
    m *manager,
    probeType probeType,
    pod *v1.Pod,
    container v1.Container) *worker {

    w := &worker{
        stopCh:       make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
        pod:          pod,
        container:    container,
        probeType:    probeType,
        probeManager: m,
    }

    switch probeType {
    case readiness:
        w.spec = container.ReadinessProbe
        w.resultsManager = m.readinessManager
        w.initialValue = results.Failure
    case liveness:
        w.spec = container.LivenessProbe
        w.resultsManager = m.livenessManager
        w.initialValue = results.Success
    case startup:
        w.spec = container.StartupProbe
        w.resultsManager = m.startupManager
        w.initialValue = results.Unknown
    }

    basicMetricLabels := metrics.Labels{
        "probe_type": w.probeType.String(),
        "container":  w.container.Name,
        "pod":        w.pod.Name,
        "namespace":  w.pod.Namespace,
        "pod_uid":    string(w.pod.UID),
    }

    // label sets in the format Prometheus expects when scraping
    w.proberResultsSuccessfulMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
    w.proberResultsSuccessfulMetricLabels["result"] = probeResultSuccessful

    w.proberResultsFailedMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
    w.proberResultsFailedMetricLabels["result"] = probeResultFailed

    w.proberResultsUnknownMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
    w.proberResultsUnknownMetricLabels["result"] = probeResultUnknown

    return w
}

type worker struct {
    // used to stop the probe; newWorker() initializes it as make(chan struct{}, 1)
    stopCh chan struct{}
    // read-only
    pod *v1.Pod
    // read-only
    container v1.Container
    spec      *v1.Probe
    probeType probeType
    initialValue results.Result
    // stores the probe results
    resultsManager results.Manager
    probeManager   *manager
    // the container ID this worker probed most recently
    containerID kubecontainer.ContainerID
    // the most recent probe result
    lastResult results.Result
    // how many times in a row the same result has been observed
    resultRun int
    // whether to skip probing
    onHold bool
    // metric labels for probe results
    proberResultsSuccessfulMetricLabels metrics.Labels
    proberResultsFailedMetricLabels     metrics.Labels
    proberResultsUnknownMetricLabels    metrics.Labels
}
```
What the worker actually does
- In worker.run(), the probe period is read from the probe defined on the Container, and doProbe() is then executed in a loop at that interval.
```go
// pkg/kubelet/prober/worker.go
func (w *worker) run() {
    probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second

    // If kubelet restarted the probes could be started in rapid succession.
    // Let the worker wait for a random portion of tickerPeriod before probing.
    time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

    probeTicker := time.NewTicker(probeTickerPeriod)

    defer func() {
        // Clean up.
        probeTicker.Stop()
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }
        w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
        ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
        ProberResults.Delete(w.proberResultsFailedMetricLabels)
        ProberResults.Delete(w.proberResultsUnknownMetricLabels)
    }()

probeLoop:
    for w.doProbe() {
        // Wait for next probe tick.
        select {
        case <-w.stopCh:
            break probeLoop
        case <-probeTicker.C:
            // continue
        }
    }
}
```
Probing
- doProbe() probes the container once, reports the result to the results Manager, and uses the outcome to decide whether probing should continue. The Pod status it works from comes from the kubelet's status Manager, which uses the ClientSet to fetch the Pod object so that it always holds the Pod's latest status, and which syncs status updates back to the apiserver.
```go
// pkg/kubelet/prober/worker.go
func (w *worker) doProbe() (keepGoing bool) {
    // swallow any panic: recover it and do nothing
    defer func() { recover() }()
    defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

    // fetch the Pod's status from the kubelet's status Manager
    status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
    if !ok {
        // the Pod has not been created yet, or has already been deleted
        klog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
        return true
    }

    // once the Pod has terminated, the worker is no longer needed
    if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
        klog.V(3).Infof("Pod %v %v, exiting probe worker",
            format.Pod(w.pod), status.Phase)
        return false
    }

    // look up this container's status in the Pod's ContainerStatuses
    c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
    if !ok || len(c.ContainerID) == 0 {
        // Either the container has not been created yet, or it was deleted.
        klog.V(3).Infof("Probe target container not found: %v - %v",
            format.Pod(w.pod), w.container.Name)
        return true // Wait for more information.
    }

    // when the container has changed, register the new container with the resultsManager
    if w.containerID.String() != c.ContainerID {
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }
        w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
        w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
        // We've got a new container; resume probing.
        w.onHold = false
    }

    if w.onHold {
        // Worker is on hold until there is a new container.
        return true
    }

    // if the container is not running, record the result as Failure
    if c.State.Running == nil {
        klog.V(3).Infof("Non-running container probed: %v - %v",
            format.Pod(w.pod), w.container.Name)
        if !w.containerID.IsEmpty() {
            w.resultsManager.Set(w.containerID, results.Failure, w.pod)
        }
        // Abort if the container will not be restarted.
        return c.State.Terminated == nil ||
            w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
    }

    // if the container has been running for less than the probe's initial delay,
    // let the Worker try again on the next tick
    if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
        return true
    }

    // While the container has not started, only the startupProbe runs; every other
    // probe worker keeps looping and waiting.
    // Once the container has started, the startupProbe simply returns true without
    // running probe() again.
    if c.Started != nil && *c.Started {
        if w.probeType == startup {
            return true
        }
    } else {
        if w.probeType != startup {
            return true
        }
    }

    // probe the container and obtain its state
    result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
    if err != nil {
        // Prober error, throw away the result.
        return true
    }

    // metrics collection
    switch result {
    case results.Success:
        ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
    case results.Failure:
        ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
    default:
        ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
    }

    if w.lastResult == result {
        w.resultRun++
    } else {
        w.lastResult = result
        w.resultRun = 1
    }

    if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
        (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
        // Success or failure is below threshold - leave the probe state unchanged.
        return true
    }

    // record the probe result
    w.resultsManager.Set(w.containerID, result, w.pod)

    if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
        // A failed liveness or startup check means the container needs to be
        // restarted. Stop probing until there is a new containerID.
        w.onHold = true
        w.resultRun = 0
    }

    return true
}
```
Dispatching on the probe type
- w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID) calls down into runProbe(), which handles the request according to probeType.
```go
// pkg/kubelet/prober/prober.go
func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
    timeout := time.Duration(p.TimeoutSeconds) * time.Second

    if p.Exec != nil {
        klog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod.Name, container.Name, p.Exec.Command)
        command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
        return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
    }

    if p.HTTPGet != nil {
        scheme := strings.ToLower(string(p.HTTPGet.Scheme))
        host := p.HTTPGet.Host
        if host == "" {
            host = status.PodIP
        }
        port, err := extractPort(p.HTTPGet.Port, container)
        if err != nil {
            return probe.Unknown, "", err
        }
        path := p.HTTPGet.Path
        klog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
        url := formatURL(scheme, host, port, path)
        headers := buildHeader(p.HTTPGet.HTTPHeaders)
        klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
        switch probeType {
        case liveness:
            return pb.livenessHTTP.Probe(url, headers, timeout)
        case startup:
            return pb.startupHTTP.Probe(url, headers, timeout)
        default:
            return pb.readinessHTTP.Probe(url, headers, timeout)
        }
    }

    if p.TCPSocket != nil {
        port, err := extractPort(p.TCPSocket.Port, container)
        if err != nil {
            return probe.Unknown, "", err
        }
        host := p.TCPSocket.Host
        if host == "" {
            host = status.PodIP
        }
        klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
        return pb.tcp.Probe(host, port, timeout)
    }

    klog.Warningf("Failed to find probe builder for container: %v", container)
    return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
}
```
The concrete probe implementations
- Depending on probeType, the matching execProber.Probe(), httpProber.Probe(), or tcpProber.Probe() is executed. These methods are defined under pkg/probe.
- For exec, the exec.command is resolved and then run through k8s.io/utils/exec; if the command exits with code 0 the result is probe.Success, any other exit code gives probe.Failure, and a timeout while running the command gives probe.Unknown.

```go
// pkg/probe/exec/exec.go
func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) {
    ......
    writer := ioutils.LimitWriter(&dataBuffer, maxReadLength)
    ......
}
```

- For HTTPGet, an HTTP GET request is sent; if an error occurs, the result is probe.Failure. Health is then judged from the HTTP status code: 2xx gives probe.Success, while 3xx gives probe.Warning. probe.Warning means the check logically succeeded but carries extra diagnostic information.
```go
// pkg/probe/http/http.go
func DoHTTPProbe(url *url.URL, headers http.Header, client GetHTTPInterface) (probe.Result, string, error) {
    req, err := http.NewRequest("GET", url.String(), nil)
    if err != nil {
        return probe.Failure, err.Error(), nil
    }
    // a series of header manipulations
    .....
    // send the request via the GetHTTPInterface client
    res, err := client.Do(req)
    .....
    b, err := utilio.ReadAtMost(res.Body, maxRespBodyLength)
    body := string(b)
    // judge health from the HTTP status code: 2xx/3xx count as success
    if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
        if res.StatusCode >= http.StatusMultipleChoices { // Redirect
            klog.V(4).Infof("Probe terminated redirects for %s, Response: %v", url.String(), *res)
            return probe.Warning, body, nil
        }
        klog.V(4).Infof("Probe succeeded for %s, Response: %v", url.String(), *res)
        return probe.Success, body, nil
    }
    return probe.Failure, fmt.Sprintf("HTTP probe failed with statuscode: %d", res.StatusCode), nil
}
```

- For TCPSocket, the prober checks whether a TCP Socket can be opened to host:port; if it can, the result is Success, otherwise Failure.
```go
// pkg/probe/tcp/tcp.go
func (pr tcpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) {
    // join host and port into an address
    return DoTCPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout)
}

// DoTCPProbe checks whether a TCP Socket to the address can be opened.
// It returns Success if the socket opens, Failure otherwise.
func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
    conn, err := net.DialTimeout("tcp", addr, timeout)
    if err != nil {
        return probe.Failure, err.Error(), nil
    }
    err = conn.Close()
    if err != nil {
        klog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err)
    }
    return probe.Success, "", nil
}
```
Stopping probes
- Deleting a Pod: when the kubelet starts, syncLoop() listens for events arriving on the channel; on a remove event:

```go
// pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    ......
    case kubetypes.REMOVE:
        klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
        handler.HandlePodRemoves(u.Pods)
    }
    ......
}
```

- This ends up calling manager.RemovePod(), which iterates over the pod's containers and stops the doProbe() loop via worker.stop(). worker.stop() notifies the worker to stop probing by sending on its channel:
```go
// pkg/kubelet/prober/prober_manager.go
func (m *manager) RemovePod(pod *v1.Pod) {
    m.workerLock.RLock()
    defer m.workerLock.RUnlock()

    key := probeKey{podUID: pod.UID}
    for _, c := range pod.Spec.Containers {
        key.containerName = c.Name
        for _, probeType := range [...]probeType{readiness, liveness, startup} {
            key.probeType = probeType
            if worker, ok := m.workers[key]; ok {
                worker.stop()
            }
        }
    }
}
```

```go
// pkg/kubelet/prober/worker.go
func (w *worker) stop() {
    select {
    case w.stopCh <- struct{}{}:
    default: // Non-blocking.
    }
}
```