Container Health Checks: The Probe Mechanism

Probes

Kubernetes provides three kinds of probes to make sure containers are up and running after deployment: the readiness probe (readinessProbe), the liveness probe (livenessProbe), and the startup probe (startupProbe).

  • livenessProbe checks whether the container is healthy; if it is not, the container is killed and recreated.
  • readinessProbe checks, starting at startup, whether the container is ready; only once the container is ready does it begin to receive request traffic, otherwise the Pod is removed from the corresponding Endpoints lists.
  • startupProbe runs when the container starts. If a startupProbe is configured, livenessProbe and readinessProbe only begin executing after it succeeds, which prevents the application inside the container from being killed before it has finished starting. Once the startupProbe has succeeded, it is never run again. A sketch showing all three probes on one container follows this list.
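
All three probes can be declared side by side on a single container. A minimal sketch; the image name, port, and paths are illustrative only:

spec:
  containers:
  - name: app
    image: my-app:latest          # illustrative image
    startupProbe:                 # runs first and gates the other two probes
      httpGet:
        path: /healthz
        port: 8080
    livenessProbe:                # on failure the container is killed and restarted
      httpGet:
        path: /healthz
        port: 8080
    readinessProbe:               # on failure the Pod is removed from Endpoints lists
      httpGet:
        path: /ready
        port: 8080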

Every probe has one of three result states: Success, Failure, and Unknown; only Success means the check passed. The result type is defined as:

// pkg/kubelet/prober/results/results_manager.go
const (
    Unknown Result = iota - 1
    Success
    Failure
)

func (r Result) String() string {
    switch r {
    case Success:
        return "Success"
    case Failure:
        return "Failure"
    default:
        return "UNKNOWN"
    }
}

Kubernetes provides three check mechanisms for every probe type (a compact sketch follows this list):

  • HTTP GET Action: performs an HTTP GET request against the container's IP address on a given port and path. If the prober receives a response and the status code does not indicate an error (2xx or 3xx), the check succeeds. No response, or an error status code, means the check fails.
  • TCP Socket Action: attempts to open a TCP connection to the specified container port; the check succeeds if the connection is established, and fails otherwise.
  • Exec Action: runs a user-defined command inside the container and inspects its exit code. Exit code 0 means success; any other exit code means failure. Because the command executed by exec consumes resources inside the container, it should be simple and lightweight.
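
Inside a probe spec, the three mechanisms correspond to three mutually exclusive handler stanzas. A compact sketch with illustrative ports and paths; complete Pod examples follow in the Examples section:

livenessProbe:
  httpGet:              # HTTP GET Action
    path: /healthz
    port: 8080
# or
livenessProbe:
  tcpSocket:            # TCP Socket Action
    port: 3306
# or
livenessProbe:
  exec:                 # Exec Action
    command: ["test", "-e", "/tmp/healthy"]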

Every probe type shares a set of common configurable fields (a combined sketch follows this list):

  • initialDelaySeconds: the probe delay, i.e. how long after the container starts the first probe runs; displayed as the delay attribute; defaults to 0 seconds.
  • periodSeconds: how often to probe; defaults to 10 seconds, minimum 1 second. Probing too frequently adds noticeable extra overhead to the Pod, while probing too rarely delays the reaction to failures.
  • successThreshold: the minimum number of consecutive successes, after having failed, for the check to be considered passing; defaults to 1, minimum 1 (for liveness and startup probes it must be 1).
  • failureThreshold: the minimum number of consecutive failures, after having succeeded, for the check to be considered failing; defaults to 3, minimum 1.
  • timeoutSeconds: the probe timeout; defaults to 1 second, minimum 1 second.
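
Putting the common fields together on a single probe. The numbers are illustrative, not a recommended tuning; with these values the first probe fires 15 seconds after the container starts, and three consecutive failures, checked every 20 seconds, mark the probe as failed:

livenessProbe:
  httpGet:
    path: /healthz
    port: 8080
  initialDelaySeconds: 15   # wait 15s after container start before the first probe
  periodSeconds: 20         # probe every 20s
  timeoutSeconds: 2         # each attempt times out after 2s
  failureThreshold: 3       # 3 consecutive failures -> Failure
  successThreshold: 1       # must be 1 for liveness and startup probes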

Readiness Probe (readinessProbe)

Defining a readiness probe means the Pod receives no traffic during its startup phase, and only starts receiving traffic once the probe succeeds.

The readiness probe keeps running throughout the Pod's entire lifecycle, determining whether the Pod may receive client requests. When the readiness probe returns Success, the container is ready to accept requests. If a container fails its readinessProbe check, it is not terminated or restarted; instead it is marked as not ready, and the Endpoints controller removes the Pod's IP from the Endpoints lists of every Service that matches the Pod.

The readiness probe's initial state is Failure. If a container defines no readiness probe, its state defaults to Success.

Liveness Probe (livenessProbe)

Checks whether the container is still running. A liveness probe can be specified individually for each container in a Pod. Kubernetes executes the probe periodically and restarts the container when the probe fails. If a container defines no liveness probe, its state defaults to Success.

Startup Probe (startupProbe)

A startup check mechanism for slow-starting applications: it prevents a container whose application takes a long time to start from being killed by the two probes above. (Stretching the initialDelaySeconds of readinessProbe and livenessProbe can achieve a similar effect.)

The startup probe runs when the container starts. If one is defined, no other probe runs until the startup probe succeeds. If the startup probe fails, the kubelet kills the container, and the container is restarted according to its Restart Policy. If no startupProbe is defined, its state defaults to Success.
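
For example, a startupProbe that gives a slow application up to 300 seconds to come up; the endpoint and numbers are illustrative:

startupProbe:
  httpGet:
    path: /healthz
    port: 8080
  failureThreshold: 30   # tolerate up to 30 consecutive failures
  periodSeconds: 10      # 30 * 10s = a 300s startup budget before the container is killed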

Examples

The rest of the Pod's YAML is omitted in the examples below; only the configuration related to spec.containers livenessProbe (and readinessProbe) is kept.

exec probe

spec:
  containers:
  - name: liveness-demo
    image: busybox
    args: ["/bin/sh", "-c", "touch /tmp/healthy; sleep 60; rm -rf /tmp/healthy; sleep 600"]
    livenessProbe:
      exec:
        command: ["test", "-e", "/tmp/healthy"]

tcpSocket probe

spec:
  containers:
  - name: liveness-demo
    image: busybox
    livenessProbe:
      tcpSocket:
        port: 80

httpGet probe

spec:
  containers:
  - name: liveness-demo
    image: busybox
    livenessProbe:
      httpGet:
        path: /healthz
        port: 80
        scheme: HTTP
      initialDelaySeconds: 10
      successThreshold: 1
      timeoutSeconds: 1
    readinessProbe:
      httpGet:
        path: /ping
        port: 80
      initialDelaySeconds: 10
      successThreshold: 1
      timeoutSeconds: 1

How Probes Are Created and Run: A Source Code Walkthrough

The code below is based on Kubernetes 1.20.4.

Creating the probeManager object that manages probes and their results

  1. When the kubelet starts, it creates the probeManager object and also starts a loop, syncLoop(), that keeps Pod state in sync.
    klet.probeManager = prober.NewManager(
        klet.statusManager,
        klet.livenessManager,
        klet.startupManager,
        klet.runner,
        kubeDeps.Recorder)

    // NewManager creates the Manager that manages the pod's various probes.
    func NewManager(
        statusManager status.Manager,
        livenessManager results.Manager,
        startupManager results.Manager,
        runner kubecontainer.CommandRunner,
        recorder record.EventRecorder) Manager {

        prober := newProber(runner, recorder)
        readinessManager := results.NewManager()
        return &manager{
            statusManager:    statusManager,
            prober:           prober,
            readinessManager: readinessManager,
            livenessManager:  livenessManager,
            startupManager:   startupManager,
            workers:          make(map[probeKey]*worker),
        }
    }
    results.Manager is a manager that keeps each container probe's result in a hash map:
    type manager struct {
        // guards the cache
        sync.RWMutex
        // map of container ID -> probe Result
        cache map[kubecontainer.ContainerID]Result
        // channel of updates
        updates chan Update
    }
The kubelet's syncLoop() listens for Pod creation events; when a Pod is created, its information is handed to the probeManager:
// syncLoopIteration, called from syncLoop() in kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    ......
    case kubetypes.ADD:
        klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
        handler.HandlePodAdditions(u.Pods)
    ......
    }
    .......
}

// HandlePodAdditions in kubelet.go
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    ......
    for _, pod := range pods {
        existingPods := kl.podManager.GetPods()
        kl.podManager.AddPod(pod)
        ......
        // Create the probe workers for this pod.
        kl.probeManager.AddPod(pod)
    }
}

Creating a worker for each probe defined on a container

  1. When a Pod is created, the manager iterates over the Pod's containers and checks whether each one has a StartupProbe, ReadinessProbe, or LivenessProbe configured. For every probe that is present, a worker object is created for it, and each worker performs its checks in its own goroutine.
    // pkg/kubelet/prober/prober_manager.go
    func (m *manager) AddPod(pod *v1.Pod) {
        // Take the write lock on the workers map.
        m.workerLock.Lock()
        defer m.workerLock.Unlock()

        key := probeKey{podUID: pod.UID}
        for _, c := range pod.Spec.Containers {
            key.containerName = c.Name

            if c.StartupProbe != nil {
                key.probeType = startup
                if _, ok := m.workers[key]; ok {
                    klog.Errorf("Startup probe already exists! %v - %v",
                        format.Pod(pod), c.Name)
                    return
                }
                w := newWorker(m, startup, pod, c)
                m.workers[key] = w
                go w.run()
            }

            if c.ReadinessProbe != nil {
                key.probeType = readiness
                if _, ok := m.workers[key]; ok {
                    klog.Errorf("Readiness probe already exists! %v - %v",
                        format.Pod(pod), c.Name)
                    return
                }
                w := newWorker(m, readiness, pod, c)
                m.workers[key] = w
                go w.run()
            }

            if c.LivenessProbe != nil {
                key.probeType = liveness
                if _, ok := m.workers[key]; ok {
                    klog.Errorf("Liveness probe already exists! %v - %v",
                        format.Pod(pod), c.Name)
                    return
                }
                w := newWorker(m, liveness, pod, c)
                m.workers[key] = w
                go w.run()
            }
        }
    }

What a probe worker is

  1. newWorker() creates a probe worker (the worker struct is shown below the function). When a worker is initialized, it is given the relevant pod and container information, and its initial result value is set according to the probe type: Failure for a readinessProbe, Success for a livenessProbe, and Unknown for a startupProbe.
    // pkg/kubelet/prober/worker.go
    func newWorker(
        m *manager,
        probeType probeType,
        pod *v1.Pod,
        container v1.Container) *worker {

        w := &worker{
            stopCh:       make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
            pod:          pod,
            container:    container,
            probeType:    probeType,
            probeManager: m,
        }

        switch probeType {
        case readiness:
            w.spec = container.ReadinessProbe
            w.resultsManager = m.readinessManager
            w.initialValue = results.Failure
        case liveness:
            w.spec = container.LivenessProbe
            w.resultsManager = m.livenessManager
            w.initialValue = results.Success
        case startup:
            w.spec = container.StartupProbe
            w.resultsManager = m.startupManager
            w.initialValue = results.Unknown
        }

        basicMetricLabels := metrics.Labels{
            "probe_type": w.probeType.String(),
            "container":  w.container.Name,
            "pod":        w.pod.Name,
            "namespace":  w.pod.Namespace,
            "pod_uid":    string(w.pod.UID),
        }

        // Label sets in the format Prometheus expects when collecting metrics.
        w.proberResultsSuccessfulMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
        w.proberResultsSuccessfulMetricLabels["result"] = probeResultSuccessful

        w.proberResultsFailedMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
        w.proberResultsFailedMetricLabels["result"] = probeResultFailed

        w.proberResultsUnknownMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
        w.proberResultsUnknownMetricLabels["result"] = probeResultUnknown

        return w
    }

    type worker struct {
        // Used to stop the probe; newWorker() initializes it as make(chan struct{}, 1).
        stopCh chan struct{}
        // Read-only.
        pod *v1.Pod
        // Read-only.
        container v1.Container

        spec         *v1.Probe
        probeType    probeType
        initialValue results.Result

        // Stores the probe results.
        resultsManager results.Manager
        probeManager   *manager

        // The container ID most recently probed by this worker.
        containerID kubecontainer.ContainerID
        // The most recent probe result.
        lastResult results.Result
        // How many times in a row the same result has been observed.
        resultRun int

        // Whether probing is currently suspended.
        onHold bool
        // Metric label sets for the probe results.
        proberResultsSuccessfulMetricLabels metrics.Labels
        proberResultsFailedMetricLabels     metrics.Labels
        proberResultsUnknownMetricLabels    metrics.Labels
    }

What the worker actually does

  1. worker.run() reads the probe period from the container's probe spec, then executes doProbe() in a loop at that interval.
    func (w *worker) run() {
        probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second

        // If kubelet restarted the probes could be started in rapid succession.
        // Let the worker wait for a random portion of tickerPeriod before probing.
        time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

        probeTicker := time.NewTicker(probeTickerPeriod)

        defer func() {
            // Clean up.
            probeTicker.Stop()
            if !w.containerID.IsEmpty() {
                w.resultsManager.Remove(w.containerID)
            }

            w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
            ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
            ProberResults.Delete(w.proberResultsFailedMetricLabels)
            ProberResults.Delete(w.proberResultsUnknownMetricLabels)
        }()

    probeLoop:
        for w.doProbe() {
            // Wait for next probe tick.
            select {
            case <-w.stopCh:
                break probeLoop
            case <-probeTicker.C:
                // continue
            }
        }
    }

Probing

  1. doProbe() performs a single probe of the container, records the result with the results manager, and uses the outcome to decide whether probing should continue. It reads the Pod's status from the kubelet's status manager.
    The kubelet's status manager keeps the Pod's latest status, obtaining Pod objects through the clientset, and syncs status updates back to the apiserver.
    func (w *worker) doProbe() (keepGoing bool) {
        defer func() { recover() }() // Swallow any panic: recover and move on without handling it.

        defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

        // Fetch the Pod's status from the kubelet's status manager.
        status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
        if !ok {
            // The pod has not been created yet, or it has already been deleted.
            klog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
            return true
        }

        // If the pod has terminated, the worker is no longer needed.
        if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
            klog.V(3).Infof("Pod %v %v, exiting probe worker",
                format.Pod(w.pod), status.Phase)
            return false
        }

        // Look up this container's status within the Pod status.
        c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
        if !ok || len(c.ContainerID) == 0 {
            // Either the container has not been created yet, or it was deleted.
            klog.V(3).Infof("Probe target container not found: %v - %v",
                format.Pod(w.pod), w.container.Name)
            return true // Wait for more information.
        }

        // When the container has changed, register the new container with the
        // resultsManager, seeded with the probe's initial value.
        if w.containerID.String() != c.ContainerID {
            if !w.containerID.IsEmpty() {
                w.resultsManager.Remove(w.containerID)
            }
            w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
            w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
            // We've got a new container; resume probing.
            w.onHold = false
        }

        if w.onHold {
            // Worker is on hold until there is a new container.
            return true
        }

        // If the container is not running, record the result as Failure.
        if c.State.Running == nil {
            klog.V(3).Infof("Non-running container probed: %v - %v",
                format.Pod(w.pod), w.container.Name)
            if !w.containerID.IsEmpty() {
                w.resultsManager.Set(w.containerID, results.Failure, w.pod)
            }
            // Abort if the container will not be restarted.
            return c.State.Terminated == nil ||
                w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
        }

        // If the container has been running for less than the probe's configured
        // initial delay, skip this round and wait for the next tick.
        if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
            return true
        }

        // Until the container has started, only the startupProbe actually probes;
        // the other probe workers just keep looping and waiting.
        // Once the container has started, the startupProbe returns true right away
        // and never calls probe() again.
        if c.Started != nil && *c.Started {
            if w.probeType == startup {
                return true
            }
        } else {
            if w.probeType != startup {
                return true
            }
        }

        // Probe the container and obtain the result.
        result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
        if err != nil {
            // Prober error, throw away the result.
            return true
        }

        // Update the probe-result metrics.
        switch result {
        case results.Success:
            ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
        case results.Failure:
            ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
        default:
            ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
        }

        if w.lastResult == result {
            w.resultRun++
        } else {
            w.lastResult = result
            w.resultRun = 1
        }

        if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
            (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
            // Success or failure is below threshold - leave the probe state unchanged.
            return true
        }

        // Record the probe result.
        w.resultsManager.Set(w.containerID, result, w.pod)

        if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
            // The container failed its liveness or startup check and needs to be
            // restarted. Suspend probing until a new containerID is observed.
            w.onHold = true
            w.resultRun = 0
        }

        return true
    }

Dispatching to the right handler for each probe

  1. w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID) ends up calling runProbe(), which dispatches on the configured handler (exec, httpGet, or tcpSocket) and, for HTTP, on the probeType:
    func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
        timeout := time.Duration(p.TimeoutSeconds) * time.Second
        if p.Exec != nil {
            klog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod.Name, container.Name, p.Exec.Command)
            command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
            return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
        }
        if p.HTTPGet != nil {
            scheme := strings.ToLower(string(p.HTTPGet.Scheme))
            host := p.HTTPGet.Host
            if host == "" {
                host = status.PodIP
            }
            port, err := extractPort(p.HTTPGet.Port, container)
            if err != nil {
                return probe.Unknown, "", err
            }
            path := p.HTTPGet.Path
            klog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
            url := formatURL(scheme, host, port, path)
            headers := buildHeader(p.HTTPGet.HTTPHeaders)
            klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
            switch probeType {
            case liveness:
                return pb.livenessHTTP.Probe(url, headers, timeout)
            case startup:
                return pb.startupHTTP.Probe(url, headers, timeout)
            default:
                return pb.readinessHTTP.Probe(url, headers, timeout)
            }
        }
        if p.TCPSocket != nil {
            port, err := extractPort(p.TCPSocket.Port, container)
            if err != nil {
                return probe.Unknown, "", err
            }
            host := p.TCPSocket.Host
            if host == "" {
                host = status.PodIP
            }
            klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
            return pb.tcp.Probe(host, port, timeout)
        }
        klog.Warningf("Failed to find probe builder for container: %v", container)
        return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
    }
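
    Note that runProbe() forwards any custom headers defined on the probe through buildHeader(p.HTTPGet.HTTPHeaders). On the YAML side this corresponds to the httpGet.httpHeaders field; a minimal sketch, where the header name and value are purely illustrative:

    livenessProbe:
      httpGet:
        path: /healthz
        port: 8080
        httpHeaders:
        - name: X-Probe-Source   # illustrative custom header, passed through buildHeader()
          value: kubelet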

The concrete probe implementations

  1. Depending on the handler, the corresponding execProber.Probe(), httpProber.Probe(), or tcpProber.Probe() is executed. These methods are defined under pkg/probe.
  • For exec, the exec.command is resolved and the command is run via k8s.io/utils/exec, with its output captured through an ioutils.LimitWriter; the exit status decides the result: exit code 0 returns probe.Success, any other exit code returns probe.Failure, and a timeout while running the command returns probe.Unknown.
    func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) {
        ......
        // Cap the captured output so a noisy command cannot consume unbounded memory.
        writer := ioutils.LimitWriter(&dataBuffer, maxReadLength)
        ......
        // Exit code 0 -> Success; any other exit code -> Failure; timeout -> Unknown.
    }
  • For HTTPGet, an HTTP GET request is sent; if an error occurs, the result is probe.Failure. Health is judged by the HTTP status code: 2xx returns probe.Success, while 3xx returns probe.Warning. probe.Warning means the check logically succeeded but carries some extra diagnostic information.
    func DoHTTPProbe(url *url.URL, headers http.Header, client GetHTTPInterface) (probe.Result, string, error) {
        req, err := http.NewRequest("GET", url.String(), nil)
        if err != nil {
            return probe.Failure, err.Error(), nil
        }
        // A series of header manipulations.
        .....

        // Send the request through the GetHTTPInterface.
        res, err := client.Do(req)
        .....

        b, err := utilio.ReadAtMost(res.Body, maxRespBodyLength)
        // Judge health by the HTTP status code: 2xx is Success, 3xx is Warning.
        body := string(b)
        if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
            if res.StatusCode >= http.StatusMultipleChoices { // Redirect
                klog.V(4).Infof("Probe terminated redirects for %s, Response: %v", url.String(), *res)
                return probe.Warning, body, nil
            }
            klog.V(4).Infof("Probe succeeded for %s, Response: %v", url.String(), *res)
            return probe.Success, body, nil
        }

        return probe.Failure, fmt.Sprintf("HTTP probe failed with statuscode: %d", res.StatusCode), nil
    }
  • For TCPSocket, the prober checks whether a TCP socket can be opened to the given host and port: if it can, the result is Success; otherwise it is Failure.
    func (pr tcpProber) Probe(host string, port int, timeout time.Duration) (probe.Result, string, error) {
        // Join host and port into a dialable address.
        return DoTCPProbe(net.JoinHostPort(host, strconv.Itoa(port)), timeout)
    }

    // DoTCPProbe checks whether a TCP socket to the address can be opened.
    // It returns Success if it can, Failure otherwise.
    func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
        conn, err := net.DialTimeout("tcp", addr, timeout)
        if err != nil {
            return probe.Failure, err.Error(), nil
        }
        err = conn.Close()
        if err != nil {
            klog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err)
        }
        return probe.Success, "", nil
    }
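
    Both the httpGet and tcpSocket branches of runProbe() resolve the port through extractPort(), so a probe can also reference a named container port instead of a number. A minimal sketch; the port name is illustrative:

    ports:
    - name: redis-port
      containerPort: 6379
    livenessProbe:
      tcpSocket:
        port: redis-port   # resolved against the container's named ports by extractPort()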

Stopping probes

  1. Deleting a Pod. When the kubelet starts, its syncLoop() listens for events arriving on its channels; on a delete event:
    // pkg/kubelet/kubelet.go
    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
        syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
        select {
        ......
        case kubetypes.REMOVE:
            klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        }
        ......
    }
  2. This calls manager.RemovePod(), which iterates over the Pod's containers and stops each worker's doProbe() loop via worker.stop().
    func (m *manager) RemovePod(pod *v1.Pod) {
        m.workerLock.RLock()
        defer m.workerLock.RUnlock()

        key := probeKey{podUID: pod.UID}
        for _, c := range pod.Spec.Containers {
            key.containerName = c.Name
            for _, probeType := range [...]probeType{readiness, liveness, startup} {
                key.probeType = probeType
                if worker, ok := m.workers[key]; ok {
                    worker.stop()
                }
            }
        }
    }
    worker.stop() notifies the worker to stop probing by sending on its stop channel:
    func (w *worker) stop() {
        select {
        case w.stopCh <- struct{}{}:
        default: // Non-blocking.
        }
    }