Kubelet 服务引导流程
admin
- 11 minutes read - 2322 words版本:v1.17.3
入口文件: /cmd/kubelet/kubelet.go
本文主要是为了通过阅读kubelet启动流程源码,实现对整个kubelet 组件及其服务有所了解,因此许多相关组件服务的运行机制并没有详细介绍,如果有时间的话,可以针对每个组件服务进行详细介绍。
在k8s中 kubelet 是一个极其重要的组件之一,也是 Kubernetes 里面第二个不可被替代的组件(第一个不可被替代的组件当然是 kube-apiserver)。也就是说,无论如何,都不太建议你对 kubelet 的代码进行大量的改动。保持 kubelet 跟上游基本一致的重要性,就跟保持 kube-apiserver 跟上游一致是一个道理。
kubelet 本身,也是按照“控制器”模式来工作的。它实际的工作原理,可以用如下所示的一幅示意图来表示清楚。
可以看到,kubelet 的工作核心,就是一个控制循环,即:SyncLoop(图中的大圆圈)。而驱动这个控制循环运行的事件,包括四种:
- Pod 更新事件;
- Pod 生命周期变化;
- kubelet 本身设置的执行周期;
- 定时的清理事件。
所以,跟其他控制器类似,kubelet 启动的时候,要做的第一件事情,就是设置 Listers,也就是注册它所关心的各种事件的 Informer。这些 Informer 就是 SyncLoop 需要处理的数据的来源。此外,kubelet 还负责维护着很多很多其他的子控制循环(也就是图中的小圆圈)。这些控制循环的名字,一般被称作某某 Manager,比如 Volume Manager、Image Manager、Node Status Manager 等等。
简单理解就是先通过建立各类 Informer 来建立与 APIServer 的通讯,这样当资源发生变化时,立刻就能感知到,做出相应的处理。
下面我们就从源码来看一下它的实现过程。在 kubelet 中有两个重要的数据结构,一个是 kubeletServer ,另一个是 kubeletDeps 。
配置项 kubeletServer
kubeltServer 封装了启动 kubelet 所需的所有参数,这些可以通过命令行设置,也可以直接设置。
type KubeletServer struct {
    KubeletFlags
    kubeletconfig.KubeletConfiguration
}
它内嵌了两个数据结构,分别为 KubeletFlags 和 KubeletConfiguration 。其中 kubeletFlags 主要是用来接收执行命令时手动指定的参数,而 KubeletConfiguration 则是从配置文件里读取配置信息,其 API 介绍参考 。
这两个内嵌的数据结构均表示输入,但它们使用时还是有一些区别的,我们看一下 kubeletFlags 的注释信息
// KubeletFlags contains configuration flags for the Kubelet.
// A configuration field should go in KubeletFlags instead of KubeletConfiguration if any of these are true:
//   - its value will never, or cannot safely be changed during the lifetime of a node, or
//   - its value cannot be safely shared between nodes at the same time (e.g. a hostname);
//     KubeletConfiguration is intended to be shared between nodes.
翻译过来:
KubeltFlags包含 Kubelet 的配置标志。如果以下任何一项为真,则配置字段应位于KubeltFlags中,而不是KubeltConfiguration中:
- 在节点的生存期内,其值永远不会或无法安全更改
- 其值不能在节点之间同时安全共享(例如主机名)
KubeltConfiguration旨在在节点之间共享。
可以看到对于一些全局不变的配置一般是通过 kubeletFlags实现,而对于一些其它应用配置,一般是通过 kubeletConfiguration 实现。
依赖注入器
另一个重要的数据结构就是kubeletDeps ,它是一个依赖注入器,在运行时构建的对象是运行Kubelet所必需的,其包含一些内置插件,如 VolumePlugins 、 clientSet、认证过滤器(auth filters) 、可观测性(testability)、云适配器(cloud provider)、追踪适配器(TraceProvider) 等这些都极其重要。
type Dependencies struct {
    Options []Option
    // Injected Dependencies
    // 认证
    Auth                     server.AuthInterface
    // 可观测性
    CAdvisorInterface        cadvisor.Interface
    // cloudProvider
    Cloud                    cloudprovider.Interface
    //容器管理器
    ContainerManager         cm.ContainerManager
    EventClient              v1core.EventsGetter
    // 心跳
    HeartbeatClient          clientset.Interface
    OnHeartbeatFailure       func()
    // clientSet
    KubeClient               clientset.Interface
    Mounter                  mount.Interface
    HostUtil                 hostutil.HostUtils
    // OOM 的监听器
    OOMAdjuster              *oom.OOMAdjuster
    // 收集系统级别的操作,可以在 test 使用
    OSInterface              kubecontainer.OSInterface
    // 重要,与pod的创建,修改,删除,调整有关,后面所有对Pod操作,将从这里观察到
    PodConfig                *config.PodConfig
    // 重要,pod 探针,检查容器健康情况
    ProbeManager             prober.Manager
    // 事件记录器
    Recorder                 record.EventRecorder
    Subpather                subpath.Interface
    TracerProvider           trace.TracerProvider
    // Volume
    VolumePlugins            []volume.VolumePlugin
    DynamicPluginProber      volume.DynamicPluginProber
    TLSOptions               *server.TLSOptions
    // container runtime
    RemoteRuntimeService     internalapi.RuntimeService
    // 容器镜像管理,如从远程仓库拦取镜像
    RemoteImageService       internalapi.ImageManagerService
    PodStartupLatencyTracker util.PodStartupLatencyTracker
    // remove it after cadvisor.UsingLegacyCadvisorStats dropped.
    useLegacyCadvisorStats bool
}
这个数据结构里包含有大量的接口,也就是说以后的扩展工作基本都可以从这里开始。
Dependencies.PodConfig 是后续 Pod 被处理的入口,任何Pod的创建或变更将从这里看到。
如 RemoteImageService 可以实现对镜像的一系列操作。
type ImageManagerService interface {
    // ListImages lists the existing images.
    ListImages(ctx context.Context, filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error)
    // ImageStatus returns the status of the image.
    ImageStatus(ctx context.Context, image *runtimeapi.ImageSpec, verbose bool) (*runtimeapi.ImageStatusResponse, error)
    // PullImage pulls an image with the authentication config.
    PullImage(ctx context.Context, image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
    // RemoveImage removes the image.
    RemoveImage(ctx context.Context, image *runtimeapi.ImageSpec) error
    // ImageFsInfo returns information of the filesystem that is used to store images.
    ImageFsInfo(ctx context.Context) ([]*runtimeapi.FilesystemUsage, error)
}
服务启动
首先初始化上面的两个数据结构.
初始化 kubeletFlags 和 kubeletConfiguration
// 第一个函数
func NewKubeletCommand() *cobra.Command {
    kubeletFlags := options.NewKubeletFlags()
    kubeletConfig, err := options.NewKubeletConfiguration()
    ...
}
// NewKubeletFlags will create a new KubeletFlags with default values
func NewKubeletFlags() *KubeletFlags {
    return &KubeletFlags{
        ContainerRuntimeOptions: *NewContainerRuntimeOptions(),
        CertDirectory:           "/var/lib/kubelet/pki",
        RootDirectory:           filepath.Clean(defaultRootDir),
        MaxContainerCount:       -1,
        MaxPerPodContainerCount: 1,
        MinimumGCAge:            metav1.Duration{Duration: 0},
        RegisterSchedulable:     true,
        NodeLabels:              make(map[string]string),
    }
}
kubeletConfig, err := options.NewKubeletConfiguration()
// NewKubeletConfiguration will create a new KubeletConfiguration with default values
func NewKubeletConfiguration() (*kubeletconfig.KubeletConfiguration, error) {
    scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
    if err != nil {
        return nil, err
    }
    versioned := &v1beta1.KubeletConfiguration{}
    scheme.Default(versioned)
    config := &kubeletconfig.KubeletConfiguration{}
    if err := scheme.Convert(versioned, config, nil); err != nil {
        return nil, err
    }
    applyLegacyDefaults(config)
    return config, nil
}
上面两个重要的数据结构初始化后, 还要创建一个重要的依赖数据结构,它是用来存储与外部通讯的,如 PodConfig、RemoteRuntimeService 、RemoteImageService、Auth 认证器,contrainerManager、kubeClient、Mounter、Cloud、CAdvisorInterface 等等。
func NewKubeletCommand() *cobra.Command {
			...
	cmd := &cobra.Command{
		Use: componentKubelet,
		DisableFlagParsing: true,
		SilenceUsage:       true,
		RunE: func(cmd *cobra.Command, args []string) error {
			// 1. construct a KubeletServer from kubeletFlags and kubeletConfig
			kubeletServer := &options.KubeletServer{
				KubeletFlags:         *kubeletFlags,
				KubeletConfiguration: *kubeletConfig,
			}
			// 2. 使用 kubeletServer 构建 KubeletDeps 依赖
			kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
			// 3. run the kubelet
			return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
		},
	}
}
现在开始启动服务,入口函数为 Run() ,其内部调用 run() 启动服务。
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
    if err := run(ctx, s, kubeDeps, featureGate); err != nil {
        return fmt.Errorf("failed to run Kubelet: %w", err)
    }
    return nil
}
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
    ...
}
这里的 featureGate 是一些特性开关。
下面看一下 run() 的实现,代码比较多。
cloudProvider 初始化
	if kubeDeps.Cloud == nil {
        if !cloudprovider.IsExternal(s.CloudProvider) {
            cloudprovider.DeprecationWarningForProvider(s.CloudProvider)
            cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
            if err != nil {
                return err
            }
            if cloud != nil {
                klog.V(2).InfoS("Successfully initialized cloud provider", "cloudProvider", s.CloudProvider, "cloudConfigFile", s.CloudConfigFile)
            }
            kubeDeps.Cloud = cloud
        }
    }
kubelet 认证
if kubeDeps.Auth == nil {
        auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
        if err != nil {
            return err
        }
        kubeDeps.Auth = auth
        runAuthenticatorCAReload(ctx.Done())
    }
可观测性
if kubeDeps.CAdvisorInterface == nil {
        imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntimeEndpoint)
        kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntimeEndpoint), s.LocalStorageCapacityIsolation)
        if err != nil {
            return err
        }
    }
if kubeDeps.ContainerManager == nil {
        if s.CgroupsPerQOS && s.CgroupRoot == "" {
            s.CgroupRoot = "/"
        }
        machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
}
运行时服务 runtime service
err = kubelet.PreInitRuntimeService()
func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies) error {
    remoteImageEndpoint := kubeCfg.ImageServiceEndpoint
    if remoteImageEndpoint == "" && kubeCfg.ContainerRuntimeEndpoint != "" {
        remoteImageEndpoint = kubeCfg.ContainerRuntimeEndpoint
    }
    var err error
    // 1. 初始化 kubeDeps.RemoteRuntimeService
    if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(kubeCfg.ContainerRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
        return err
    }
    // 2. 初始化 kubeDeps.RemoteImageService
    if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
        return err
    }
    // 3. 初始化 kubeDeps.RemoteImageService
    kubeDeps.useLegacyCadvisorStats = cadvisor.UsingLegacyCadvisorStats(kubeCfg.ContainerRuntimeEndpoint)
    return nil
}
最后一步,启动 kubelet
	if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
		return err
	}
实现源码
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
	// Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
	hostnameOverridden := len(kubeServer.HostnameOverride) > 0
	// Setup event recorder if required.
	makeEventRecorder(kubeDeps, nodeName)
	nodeIPs, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider, utilfeature.DefaultFeatureGate.Enabled(features.CloudDualStackNodeIPs))
	capabilities.Initialize(capabilities.Capabilities{
		AllowPrivileged: true,
	})
	credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
	if kubeDeps.OSInterface == nil {
		kubeDeps.OSInterface = kubecontainer.RealOS{}
	}
	// 1. 初始化 kubelet  实例, 此时会创建一个 PodConfig(pod source)
	k, err := createAndInitKubelet(kubeServer,
		kubeDeps,
		hostname,
		hostnameOverridden,
		nodeName,
		nodeIPs)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %w", err)
	}
	// NewMainKubelet should have set up a pod source config if one didn't exist
	// when the builder was run. This is just a precaution.
	if kubeDeps.PodConfig == nil {
		return fmt.Errorf("failed to create kubelet, pod source config was nil")
	}
        // 所有对Pod的创建或变更,将通过 podCfg 读取到,可视为接口Pod的入口channel
	podCfg := kubeDeps.PodConfig
	if err := rlimit.SetNumFiles(uint64(kubeServer.MaxOpenFiles)); err != nil {
		klog.ErrorS(err, "Failed to set rlimit on max file handles")
	}
	// 2. 启动 kubelet
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %w", err)
		}
		klog.InfoS("Started kubelet as runonce")
	} else {
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
		klog.InfoS("Started kubelet")
	}
	return nil
}
整个过程
- 调用 createAndInitKubelet()``函数获取kubelet实例,其实现了[kubelet.Bootstrap](https://github.com/kubernetes/kubernetes/blob/v1.27.3/pkg/kubelet/kubelet.go#L232-L242)接口
- 调用 [startKubelet()](https://github.com/kubernetes/kubernetes/blob/v1.27.3/cmd/kubelet/app/server.go#L1173-L1187)函数启动kubelet服务
再看下 startKubelet() 的实现
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// 1. start the kubelet
	go k.Run(podCfg.Updates())
	// 2. start the kubelet server
	if enableServer {
		go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
	}
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
		go k.ListenAndServePodResources()
	}
}
这里的 PodConfig 表示将许多pod配置源合并到一个一致的结构中,然后按顺序向侦听器发送增量更改通知。而 podCfg.Updates() 则表示接收Pod更新通知类型的 channel。也就是说这个channel只能读取,而对于 往这个channel 的写入参考函数 [makePodSourceConfig()](https://github.com/kubernetes/kubernetes/blob/v1.27.3/pkg/kubelet/kubelet.go#L296-L312)
不能写入(那么在哪里往这个 channel 写入的呢?)。
这里这里只重点看一下 [k.Run()](https://github.com/kubernetes/kubernetes/blob/v1.27.3/pkg/kubelet/kubelet.go#L1509-L1605) 都干了什么
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	ctx := context.Background()
	// 1. 日志服务接口注册
	if kl.logServer == nil {
		...
	}
	// 2. Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}
	// 3. Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
	if kl.kubeClient != nil {
		// Start two go-routines to update the status.
		//
		// The first will report to the apiserver every nodeStatusUpdateFrequency and is aimed to provide regular status intervals,
		// while the second is used to provide a more timely status update during initialization and runs an one-shot update to the apiserver
		// once the node becomes ready, then exits afterwards.
		//
		// Introduce some small jittering to ensure that over time the requests won't start
		// accumulating at approximately the same time from the set of nodes due to priority and
		// fairness effect.
		go wait.JitterUntil(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, 0.04, true, wait.NeverStop)
		go kl.fastStatusUpdateOnce()
		// start syncing lease
		go kl.nodeLeaseController.Run(context.Background())
	}
	// 5. 调用容器运行时状态回调,每5s执行一次,状态字段 kl.runtimeState
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
	// 6. iptables 规则初始化
	// Set up iptables util rules
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}
	// 7. Start component sync loops.
	kl.statusManager.Start()
	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}
	// 8. Start the pod lifecycle event generator.
	kl.pleg.Start()
	// Start eventedPLEG only if EventedPLEG feature gate is enabled.
	if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
		kl.eventedPleg.Start()
	}
	// 9. 轮询pod更新,即上个函数中的 podCfg.Updates() 事件
	kl.syncLoop(ctx, updates, kl)
}
最后的最后就是调用 [kubelet.syncLoop()](https://github.com/kubernetes/kubernetes/blob/v1.27.3/pkg/kubelet/kubelet.go#L2251-L2296) 实现对 Pod 的变更监听,其中 updates 参数是一个 [types.PodUpdate](https://github.com/kubernetes/kubernetes/blob/v1.27.3/pkg/kubelet/types/pod_update.go#L70-L83) 类型的 channel, 第二个参数是处理 pod 变更的 handler, 其实现了 [SyncHandler](https://github.com/kubernetes/kubernetes/blob/v1.27.3/pkg/kubelet/kubelet.go#L219-L227) 接口, 这里的 handler 指的是 kubelet 这个结构体实例。
type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups(ctx context.Context) error
}
从接口里可以看到对Pod 的每一种操作类型都有相应的处理函数 handler。
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	// 1. syncTicker唤醒kubelet,检查是否有pod workers需要同步。检查周期为一秒一次,默认情况下是10秒一次
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	// 2. pleg专用channel
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	// 负责检查resolv.conf中的限制。限制与单个pod无关。
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}
	// 3. 轮询事件
	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.ErrorS(err, "Skipping pod synchronization")
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base
		kl.syncLoopMonitor.Store(kl.clock.Now())
		// 处理各种pod事件,重点
		if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
这里 syncLoop 是处理变更事件的主要循环函数。它将观察 file、apiserver 和 http 三种类型的 channel, 然后将它们关联(与比上图中 sources 个数不符?)。
对于发现的新变更,将根据所需要的 描述状态 和 实际运行状态 运行同步。如果没有任何变更,则将根据固定的频率同步最后已知的 描述状态。此函数无返回值。
这里重点是 kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) 这个函数, 它的作用就是从多个 channel 中读取相关事件并分发给 handler 。注意每个函数参数的意义。
参数
- configCh: 从中读取配置事件的通道,这里指参数- updates
- handler: 将 pod 分发到的- SyncHandler
- syncCh: 读取周期性同步事件的通道
- housekeepingCh: 读取- housekeeping事件的通道
- plegCh: 读取 PLEG 更新的通道
用一句话概括的话,就是在一个 Loop 中不断的从其中一个channel 中读取事件,然后再视情况处理事件,最后再更新时间戳。
不同的通道处理如下:
configCh:将配置更改的pod调度到事件类型的适当的 handler callback
plegCh:更新 runtime cache;同步Pod
syncCh:同步所有等待同步的Pod
housekeepingCh:触发Pod清理
health manager:同步已失败或其中一个或多个容器未通过运行状况检查的pod
下面我们依次针对每一类channel的处理逻辑
configCh
func (kl *Kubelet) syncLoopIteration(...) {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}
		switch u.Op {
		case kubetypes.ADD:
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			klog.ErrorS(nil, "Kubelet does not support snapshot update")
		default:
			klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
		}
		kl.sourcesReady.AddSource(u.Source)
}
这里主要是指针对一个pod 操作类型做相应的处理,如创建新Pod,修改Pod、Remove 、 DELETE 或 RECONCILE操作。操作类型定义
// PodOperation defines what changes will be made on a pod configuration.
type PodOperation int
// These constants identify the PodOperations that can be made on a pod configuration.
const (
	// SET is the current pod configuration.
	SET PodOperation = iota
	// ADD signifies pods that are new to this source.
	ADD
	// DELETE signifies pods that are gracefully deleted from this source.
	DELETE
	// REMOVE signifies pods that have been removed from this source.
	REMOVE
	// UPDATE signifies pods have been updated in this source.
	UPDATE
	// RECONCILE signifies pods that have unexpected status in this source,
	// kubelet should reconcile status with this source.
	RECONCILE
)
针对每种操作类型调用不同的处理函数
kubetypes.ADD: handler.HandlePodAdditions(u.Pods)
kubetypes.UPDATE: handler.HandlePodUpdates(u.Pods)
kubetypes.REMOVE: handler.HandlePodRemoves(u.Pods)
kubetypes.RECONCILE: handler.HandlePodReconcile(u.Pods)
kubetypes.DELETE: handler.HandlePodUpdates(u.Pods)
这里将 kubetypes.DELETE 视为 kubetypes.UPDATE ,因为可以实现优雅的删除。
plegCh
读取 PodLifecycleEvent 事件。
case e := <-plegCh:
		// 过滤掉允许被忽略的事件 pleg.ContainerRemoved
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
			}
		}
		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
这里分两种情况:
- 如果事件为 !pleg.ContainerRemoved,则调用[kl.podManager.GetPodByUID(e.ID)](https://github.com/kubernetes/kubernetes/blob/v1.27.3/pkg/kubelet/pod/pod_manager.go#L225-L230)来进行检查Pod是否存在;如果存在,则接着调用handler.HandlePodSyncs([]*v1.Pod{pod})同步它。
- 如果事件为 pleg.ContainerRemoved,则调用[kl.cleanUpContainersInPod(e.ID, containerID)](https://github.com/kubernetes/kubernetes/blob/v1.27.3/pkg/kubelet/kubelet.go#L2825-L2832)将其容器删除。
syncCh
case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjSlice(podsToSync))
		handler.HandlePodSyncs(podsToSync)
调用 handler.HandlePodSyncs() 同步Pod。
housekeepingCh
case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
		} else {
			start := time.Now()
			if err := handler.HandlePodCleanups(ctx); err != nil {
				klog.ErrorS(err, "Failed cleaning pods")
			}
			duration := time.Since(start)
			if duration > housekeepingWarningDuration {
				klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than expected", "expected", housekeepingWarningDuration, "actual", duration.Round(time.Millisecond))
			}
		}
	}
调用 [handler.HandlePodCleanups(ctx)](https://github.com/kubernetes/kubernetes/blob/v1.27.3/pkg/kubelet/kubelet_pods.go#L1006-L1250) 清理 Pod。这里 HandlePodCleanups 将执行一系列清理工作,包括终止pod工作程序、杀死不需要的pod以及删除孤立卷pod目录。在执行此方法时,不会向pod工作者发送任何配置更改,这意味着不会出现新的pod。在这个方法完成后,kubelet的期望状态应该与pod worker和其他pod相关组件中的实际状态相协调。
healthy manager
健康检查事件
case update := <-kl.livenessManager.Updates():
		// 存活状态失败
		if update.Result == proberesults.Failure {
			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
		}
	case update := <-kl.readinessManager.Updates():
		// 准备就绪
		ready := update.Result == proberesults.Success
		kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
		status := ""
		if ready {
			status = "ready"
		}
		handleProbeSync(kl, update, handler, "readiness", status)
	case update := <-kl.startupManager.Updates():
		started := update.Result == proberesults.Success
		kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
		status := "unhealthy"
		if started {
			status = "started"
		}
		handleProbeSync(kl, update, handler, "startup", status)
对于服务健康检查的处理结果大致可以分为两步:
- 如有必要,调用 [kl.statusManager.SetContainerReadiness()](https://github.com/kubernetes/kubernetes/blob/v1.27.3/pkg/kubelet/status/status_manager.go#L286-L345)或[kl.statusManager.SetContainerStartup()](https://github.com/kubernetes/kubernetes/blob/v1.27.3/pkg/kubelet/status/status_manager.go#L347-L387)来设置容器状态,
- 然后调用 handleProbeSync()函数同步probe状态,。通过源码发现,其实最终调用的还是上面的[handler.HandlePodSyncs()](https://github.com/kubernetes/kubernetes/blob/v1.27.3/pkg/kubelet/kubelet.go#L2601-L2609)函数。
我们看下Pod同步实现
// HandlePodSyncs is the callback in the syncHandler interface for pods
// that should be dispatched to pod workers for sync.
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		// 根据 RegularPod 获取对应的 mirrorPod
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
	}
}
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// 关注重点: Run the sync in an async worker.
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}
最后调用 [kl.podWorkers.UpdatePod()](https://github.com/kubernetes/kubernetes/blob/v1.27.3/pkg/kubelet/pod_workers.go#L734-L976) 在 asyner worker 进行同步操作,这一步实现逻辑比较多,也是我们关注的重点。由于本篇主要是为了熟悉 kubelet 的整个启动流程以及相关组件,所以不再详细介绍, 可参考 《创建Pod源码解析》 。
注意 RegularPod 与 mirrorPod 的区别
在Kubernetes中,
mirrorPod和RegularPod是两种不同类型的Pod。以下是它们之间的主要区别:
用途:
RegularPod是正常的工作Pod,用于运行应用程序或服务。而mirrorPod是RegularPod的一种特殊类型,用于支持调试和监视目的。
生命周期:
RegularPod是由用户定义和创建的,可以具有任意的生命周期,包括创建、修改、删除等。而mirrorPod是由Kubernetes控制平面自动创建和管理的,它是根据关联的RegularPod创建的,会在RegularPod删除后自动删除。
资源配置:
MirrorPod是一个只读镜像,与关联的RegularPod共享相同的容器镜像和配置。它不会被调度到任何节点上执行,而是在控制平面上以被动模式运行。这意味着MirrorPod不占用实际的计算资源,并不处理任何网络请求。
监控调试:
MirrorPod被用于支持对关联的RegularPod进行监控和调试。通过kubectl attach命令,可以将kubectl attach到关联的RegularPod上,并在MirrorPod中查看它的日志、执行命令等。这为开发人员和运维人员提供了一种方便的方式来检查和诊断正在运行的应用程序。总而言之,
mirrorPod是一种特殊类型的Pod,主要用于支持调试和监控RegularPod的目的。它是由Kubernetes自动创建和删除的,共享相同的容器配置,但不执行任何实际的计算工作。
参考资料
- https://www.cnblogs.com/HopeGi/p/15351158.html
- PLEG( pod lifecycle event generator )
- https://time.geekbang.org/column/article/71056