Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

kbebuilder CronJob

简介

CronJob controller 会控制 kubernetes 集群上的 job 每隔一段时间运行一次,它是基于 Job controller 实现的,Job controller 的 job 只会执行任务一次。

创建项目

1
2
3
mkdir project
cd project
kubebuilder init --domain tutorial.kubebuilder.io --repo tutorial.kubebuilder.io/project

--domain tutorial.kubebuilder.io:指定 创建的 API 的组名后缀

--repo tutorial.kubebuilder.io/project:指定 Go module 的路径,用于设置 go.mod 中的模块名。

apiVersion 决定了该资源属于哪个 API 组、使用哪个版本的 schema 来解析。完整的apiVersion:

1
apiVersion: <group>.<domain>/<version>

项目结构

go.mod: 包含最基本依赖关系的 go module 文件。

Makefile: 用于构建和部署 controller。

PROJECT: 用于创建新组件的 Kubebuilder 元数据。

config/目录:运行 operator 所需的所有配置文件。现在它只包含运行 controller 所需要的 Kustomize YAML 配置文件,后续编写 operator 时,这个目录还会包含 CustomResourceDefinitions(CRD)、RBAC 和 Webhook 等相关的配置文件。

  • config/default包含Kustomize base文件,用于以标准配置启动 controller。
  • config/manager: 包含在 k8s 集群中以 pod 形式运行 controller 的 YAML 配置文件
  • config/rbac: 包含运行 controller 所需最小权限的配置文件

main.go 解析

每组 controller 都需要一个 Scheme, Scheme 会提供 Kinds 与 Go types 之间的映射关系。

main.go 的功能相对来说比较简单:

  1. 为 metrics 绑定一些基本的 flags。
  2. 实例化一个 manager,用于跟踪运行的所有 controllers, 并设置 shared caches 和可以连接到 API server 的 k8s clients 实例,并将 Scheme 配置传入 manager。
  3. 运行manager, 而 manager 又运行所有的 controllers 和 webhook。 manager 会一直处于运行状态,直到收到正常关闭信号为止。

// +kubebuilder:scaffold:builder:告诉 Kubebuilder 工具,在执行代码生成命令时,把新代码插入到这个位置。

当运行这些命令时:

1
2
3
kubebuilder create api --group xxx --version v1 --kind MyKind
make generate
make manifests

Kubebuilder 会自动把新生成的代码 插入到有 +kubebuilder:scaffold:\* 的标记位置,比如:

  • 注册 SchemeBuilder
  • 添加 controller
  • 添加 webhook

创建API

1
kubebuilder create api --group batch --version v1 --kind CronJob

创建api/v1/个目录, 对应的 group-versionbatch.tutorial.kubebuilder.io/v1

创建 api/v1/cronjob_types.go 文件并添加 CronJob Kind

cronjob_types.go 解析

CronJobSpecCronJobStatus

Kubernetes 的功能是将期望状态(Spec)与集群实际状态(其他对象的Status)和外部状态进行协调。 然后记录下观察到的状态(Status)。

因此,每个具有功能的对象都包含 SpecStatus 。 但是 ConfigMap 之类的一些类型不遵循此模式,因为它们不会维护一种状态,但是大多数类型都需要。

1
2
3
4
5
6
7
8
9
10
type CronJobSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
}

// CronJobStatus defines the observed state of CronJob
type CronJobStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
}
CronJobCronJobList

定义了 Kinds 对应的结构体类型, CronJobroot type,用来描述 CronJob Kind。和所有 Kubernetes 对象一样, 它包含 TypeMeta (用来定义 API version 和 Kind) 和 ObjectMeta (用来定义 name、namespace 和 labels等一些字段)。

CronJobList 包含了一个 CronJob 的切片,它是用来批量操作 Kind 的,比如 LIST 操作。通常不会修改它们所有的修改都是在 Spec 和 Status 上进行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// +kubebuilder:object:root=true
// CronJob is the Schema for the cronjobs API
type CronJob struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec CronJobSpec `json:"spec,omitempty"`
Status CronJobStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true
// CronJobList contains a list of CronJob
type CronJobList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []CronJob `json:"items"`
}

// +kubebuilder:object:root=true:这个注释告诉 object 这是一种 root type Kind。 然后,object 生成器会为生成 runtime.Object 接口的实现, 这是所有 Kinds 必须实现的接口。

Register
1
2
3
func init() {
SchemeBuilder.Register(&CronJob{}, &CronJobList{})
}

Kinds 注册到 API group 中。这将此 API group 中的 Kind 添加到任何 Scheme 中。

其他文件解析

groupversion_info.go

包含和 group-version 有关的元数据:

定义一些全局变量帮助建立 Scheme。 由于需要在 controller 中使用此程序包中的所有 types, 因此需要一种方便的方法(或约定)可以将所有 types 添加到其他 Scheme 中。

1
2
3
4
5
6
7
8
9
var (
// GroupVersion is group version used to register these objects
GroupVersion = schema.GroupVersion{Group: "batch.tutorial.kubebuilder.io", Version: "v1"}

// SchemeBuilder is used to add go types to the GroupVersionKind scheme
SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}
// AddToScheme adds the types in this group-version to the given scheme.
AddToScheme = SchemeBuilder.AddToScheme
)
zz_generated.deepcopy.go

包含之前所说的由 +kubebuilder:object:root自动生成的 runtime.Object 接口的实现。

runtime.Object interface 的核心是一个 deep-copy 方法:DeepCopyObject

controller-tools 中的 object 生成器也为每个 root type(CronJob) 和他的 sub-types(CronJobList,CronJobSpec,CronJob1Status) 都生成了两个方法:DeepCopyDeepCopyInto

编写API

CronJobSpec

  • schedule :Cron 表达式,用来定义任务执行的时间,例如 */5 * * * * 表示每 5 分钟执行一次。
  • JobTemplate:每次执行会基于这个模板生成一个 Job。
  • StartingDeadlineSeconds:这个截止时间,将会等到下一个调度时间点调度。
  • ConcurrencyPolicy:并发策略
  • Suspend:暂停
  • SuccessfulJobsHistoryLimit:限制历史 Job 的数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type CronJobSpec struct {
Foo string `json:"foo,omitempty"`
// +kubebuilder:validation:MinLength=0
// Cron 表达式,用来定义任务执行的时间,例如 */5 * * * * 表示每 5 分钟执行一次。
Schedule string `json:"schedule"`
// +optional
// 调度开始之后在这个时间内,就可以执行job
// 如果任务错过了调度时间(比如因为 controller 崩溃、宕机),这个字段定义了一个“容忍窗口(秒)”,
// 允许它在这个时间内仍然启动,否则就认为“错过”,并标记为失败。
StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"`
// +optional
// 并行策略
ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"`
// +optional
// 是否暂停该 CronJob 的调度。
Suspend *bool `json:"suspend,omitempty"`
// CronJob 每次执行会基于这个模板生成一个 Job
JobTemplate batchv1.JobTemplateSpec `json:"jobTemplate"`
// +optional
// 保留多少个成功完成的 Job。
SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty"`
// +kubebuilder:validation:Minimum=0
// +optional
// 失败的job历史记录限制条数
FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"`
}

自定义了一个类型(ConcurrencyPolicy)来保存并发策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// +kubebuilder:validation:Enum=Allow;Forbid;Replace
type ConcurrencyPolicy string

const (
// AllowConcurrent allows CronJobs to run concurrently.
AllowConcurrent ConcurrencyPolicy = "Allow"

// ForbidConcurrent forbids concurrent runs, skipping next run if previous
// hasn't finished yet.
ForbidConcurrent ConcurrencyPolicy = "Forbid"

// ReplaceConcurrent cancels currently running job and replaces it with a new one.
ReplaceConcurrent ConcurrencyPolicy = "Replace"
)

CronJobstatus

它用来存储观察到的状态。它包含 controllers 能够获取的所有信息。

1
2
3
4
5
6
7
8
type CronJobStatus struct {
// +optional
Active []corev1.ObjectReference `json:"active,omitempty"`

// Information when was the last time the job was successfully scheduled.
// +optional
LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
}

保留一份正在运行的 Job 列表,以及最后一次成功运行 Job 的时间。

CronJob

1
2
3
4
5
6
7
8
9
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
type CronJob struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec CronJobSpec `json:"spec,omitempty"`
Status CronJobStatus `json:"status,omitempty"`
}

+kubebuilder:subresource:status 它的作用是: 告诉 Kubebuilder 为 CRD 启用 status 子资源支持。

在 Kubernetes 中,每个资源(包括 CRD)都可以有一个 status 字段,但:

  • 默认情况下,status 只是资源的普通字段,用户可以随意修改。
  • 如果希望 只有 Kubernetes 控制器能修改 status,用户不能手动改,需要开启它的 subresource 支持

Controllers简介

Controllers 是 operator 和 Kubernetes 的核心组件。

controller 的职责是确保实际的状态(包括群集状态,以及潜在的外部状态,例如正在运行的 Kubelet 容器和云提供商的 loadbalancers)与给定 object 期望的状态相匹配。 每个 controller 专注于一个 root Kind,但也可以与其他 Kinds 进行交互。

这种努力达到期望状态的过程,称之为 reconciling(调协)。

在 controller-runtime 库中,实现 Kind reconciling 的逻辑称为 Reconcilerreconciler 获取对象的名称并返回是否需要重试。

reconciler 结构体

每个 reconciler 都需要记录日志,并且需要能够获取对象,因此这个结构体是开箱即用的。

1
2
3
4
5
// CronJobReconciler reconciles a CronJob object
type CronJobReconciler struct {
client.Client
Scheme *runtime.Scheme
}

Reconcile函数

大多数 controllers 最终都会运行在 k8s 集群上,因此它们需要 RBAC 权限, 我们使用 controller-tools RBAC markers 指定了这些权限。 这是运行所需的最小权限。

1
2
// +kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=batch.tutorial.kubebuilder.io,resources=cronjobs/status,verbs=get;update;patch
1
func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)

Reconcile 方法对个某个单一的 object 执行 reconciling 动作,req内容是一个 NamespacedName,client 可以通过 NamespacedName 信息从 cache 中获取到对应的 object。

返回的 result 为空,且 error 为 nil, 这表明 controller-runtime 已经成功 reconciled 了这个 object,无需进行任何重试,直到这个 object 被更改。

SetupWithManager函数

1
2
3
4
5
func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&batchv1.CronJob{}).
Complete(r)
}

将此 reconciler 加到 manager,以便在启动 manager 时启动 reconciler。

编写Controllers

CronJob controller 的基本逻辑是:

  1. 加载 CronJob
  2. 列出所有 active jobs,并更新状态
  3. 根据历史记录清理 old jobs
  4. 检查 Job 是否已被 suspended(如果被 suspended,请不要执行任何操作)
  5. 获取到下一次要 schedule 的 Job
  6. 运行新的 Job, 确定新 Job 没有超过 deadline 时间,且不会被我们 concurrency 规则 block
  7. 如果 Job 正在运行或者它应该下次运行,请重新排队

CronJobReconciler添加计时器

模拟时钟:需要一个 Clock 字段,它在测试中伪装计时。

1
2
3
4
5
6
7
8
9
10
11
12
13
type CronJobReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Clock
}
type realClock struct{}

func (_ realClock) Now() time.Time { return time.Now() }

type Clock interface {
Now() time.Time
}

reconciler 的逻辑

按 namespace 加载 CronJob

使用 client 获取 CronJob。使用 NamespacedName作为中间参数获取CronJob对象。

1
2
3
4
5
6
7
8
var cronJob batch.CronJob
if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
log.Error(err, "unable to fetch CronJob")
// we'll ignore not-found errors, since they can't be fixed by an immediate
// requeue (we'll need to wait for a new notification), and we can get them
// on deleted requests.
return ctrl.Result{}, client.IgnoreNotFound(err)
}

列出所有 active jobs,并更新状态

需要列出此 namespace 中属于此 CronJob 的所有 Job。 与 Get 方法类似,可以使用 List 方法列出 Job。 注意,使用可变参数选项来设置 client.InNamespaceclient.MatchingFields

client.InNamespace(req.Namespace)这个表示:只查找指定命名空间下的资源

client.MatchingFields{jobOwnerKey: req.Name}它的意思是:只匹配 .metadata.ownerReferences.name == req.Name 的 Job,也就是属于当前 CronJob 的 Job

jobOwnerKey 是自定义的 index key,后面介绍。

主要的步骤如下:

  1. 列出所有关联的 Job
  2. 遍历当前 CronJob 的所有子 Job,对它们进行分类
  3. 遍历的同时,从注解提取调度时间,并记录最后一次执行的调度时间
  4. 更新 CronJob 资源中的 Status的LastScheduleTime,就是记录的最后一次执行的调度时间
  5. 更新 CronJob 资源中的 Status的Active,就是分类出的activeJobs
  6. 更新 CronJob 的状态信息(Status)到 Kubernetes 集群中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// 列出所有关联的 Job
var childJobs kbatch.JobList
if err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name}); err != nil {
log.Error(err, "unable to list child Jobs")
return ctrl.Result{}, err
}
// 分类 Job:成功、失败、运行中
var activeJobs []*kbatch.Job
var successfulJobs []*kbatch.Job
var failedJobs []*kbatch.Job
var mostRecentTime *time.Time // find the last run so we can update the status
// 判断一个 Job 是否已经“完成”,(成功或失败)
isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
for _, c := range job.Status.Conditions {
if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
// 找到符合条件的 Condition,就立即返回 (true, c.Type)
return true, c.Type
}
}
// 如果遍历完都没有找到符合条件的项
// 表示这个 Job 还没有完成,也就是仍在运行中或等待中。
return false, ""
}
// 从 Job 的注解中提取调度时间(ScheduledTime)并返回一个 time.Time 类型的时间戳。
getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
timeRaw := job.Annotations[scheduledTimeAnnotation]
if len(timeRaw) == 0 {
return nil, nil
}

timeParsed, err := time.Parse(time.RFC3339, timeRaw)
if err != nil {
return nil, err
}
return &timeParsed, nil
}
// 遍历当前 CronJob 的所有子 Job,对它们进行分类,
// 并提取每个 Job 的调度时间(从 annotation 中拿),以便控制器后续判断和处理。
for i, job := range childJobs.Items {
// 判断这个 Job 是否完成,并获取它的完成类型
_, finishedType := isJobFinished(&job)
switch finishedType {
// 还在运行的放入 activeJobs
case "": // ongoing
activeJobs = append(activeJobs, &childJobs.Items[i])
case kbatch.JobFailed:
failedJobs = append(failedJobs, &childJobs.Items[i])
case kbatch.JobComplete:
successfulJobs = append(successfulJobs, &childJobs.Items[i])
}

// We'll store the launch time in an annotation, so we'll reconstitute that from
// the active jobs themselves.
// 从 Job 的 annotation 中获取调度时间
scheduledTimeForJob, err := getScheduledTimeForJob(&job)
// 解析失败,打印日志并跳过这个 Job
if err != nil {
log.Error(err, "unable to parse schedule time for child job", "job", &job)
continue
}
// mostRecentTime 会记录该 CronJob 最近一次实际执行 Job 的时间
if scheduledTimeForJob != nil {
if mostRecentTime == nil || mostRecentTime.Before(*scheduledTimeForJob) {
mostRecentTime = scheduledTimeForJob
}
}
}
// 更新 CronJob 资源中的 Status.LastScheduleTime 字段,用来记录 CronJob 最后一次成功调度时间。
if mostRecentTime != nil {
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
} else {
cronJob.Status.LastScheduleTime = nil
}
cronJob.Status.Active = nil
// 将 当前活跃的 Job(即正在执行中的 Job)更新到 CronJob 的 Status.Active 字段。
for _, activeJob := range activeJobs {
// 将 activeJob 转换为 Kubernetes 对象引用(ObjectReference)。
jobRef, err := ref.GetReference(r.Scheme, activeJob)
// 生成 ObjectReference 时发生错误,则记录错误并跳过当前的 activeJob
if err != nil {
log.Error(err, "unable to make reference to active job", "job", activeJob)
continue
}
cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
}
// 帮助开发人员或者运维人员监控 CronJob 的执行状态,查看当前系统中各个 Job 的情况。
log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))
// 更新 CronJob 的状态信息到 Kubernetes 集群中
if err := r.Status().Update(ctx, &cronJob); err != nil {
log.Error(err, "unable to update CronJob status")
return ctrl.Result{}, err
}

根据历史记录清理过期 jobs

清理过期的失败作业(failed jobs),并根据 CronJob 中的 FailedJobsHistoryLimit 配置删除历史失败的 Job

成功的作业同理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 清理过期的失败作业(failed jobs),并根据 CronJob 中的 FailedJobsHistoryLimit 配置删除历史失败的 Job
if cronJob.Spec.FailedJobsHistoryLimit != nil {
// 按失败时间对失败作业排序, 最旧的失败作业排在前面。
sort.Slice(failedJobs, func(i, j int) bool {
if failedJobs[i].Status.StartTime == nil {
return failedJobs[j].Status.StartTime != nil
}
return failedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)
})
// 删除超出历史保留限制的失败作业
for i, job := range failedJobs {
// 检查当前遍历的失败作业是否超过了保留的最大历史失败作业数。
if int32(i) >= int32(len(failedJobs))-*cronJob.Spec.FailedJobsHistoryLimit {
break
}
// 删除作业
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
log.Error(err, "unable to delete old failed job", "job", job)
} else {
log.V(0).Info("deleted old failed job", "job", job)
}
}
}

if cronJob.Spec.SuccessfulJobsHistoryLimit != nil {
sort.Slice(successfulJobs, func(i, j int) bool {
if successfulJobs[i].Status.StartTime == nil {
return successfulJobs[j].Status.StartTime != nil
}
return successfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)
})
for i, job := range successfulJobs {
if int32(i) >= int32(len(successfulJobs))-*cronJob.Spec.SuccessfulJobsHistoryLimit {
break
}
if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
log.Error(err, "unable to delete old successful job", "job", job)
} else {
log.V(0).Info("deleted old successful job", "job", job)
}
}
}

检查 Job 是否已被 suspended

如果这个 object 已被 suspended,会立即 return。

1
2
3
4
if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
log.V(1).Info("cronjob suspended, skipping")
return ctrl.Result{}, nil
}

获取到下一次要 schedule 的 Job的时间

如果 Job 没有被暂停,则需要计算错过但仍然在允许范围内的执行时间(missedRun)和下一次调度的时间(nextRun)。

在这里需要StartingDeadlineSeconds来实现missedRun的计算。

[!NOTE]

假设上一次调度时间是12:00,调度规则是每小时调度 ,StartingDeadlineSeconds是30分钟。

当前时间是13:00,设置earliestTime为上一次调度时间12:00。当前时间13:00-30分钟(StartingDeadlineSeconds)=12:30,12:30在earliestTime之后,设置earliestTime为12:30。

earliestTime(12:30)在当前时间(13:00)之前,此时说明存在可以调度的时间,根据earliestTime和调度规则计算出调度时间为13:00。后续就会在13:00时调度。missedRun为13:00。

假设上一次调度时间是12:00,调度规则是每小时调度 ,StartingDeadlineSeconds是30分钟。

当前时间是13:28,假设调度出现问题,在13:00时没有调度,因此上一次调度时间是12:00。

设置earliestTime为上一次调度时间12:00。当前时间13:28-30分钟(StartingDeadlineSeconds)=12:58,12:58在earliestTime(12:00)之后,设置earliestTime为12:58。

earliestTime(12:58)在在当前时间(13:28)之前,此时说明存在可以调度的时间,根据earliestTime和调度规则计算出调度时间为13:00。后续就会在13:00时调度。missedRun为13:00。

这两项说明:当调度出现问题,在13:00时没有调度,一直到13:30这个范围内,都是可以调度的,这就是StartingDeadlineSeconds的作用。

假设上一次调度时间是12:00,调度规则是每小时调度 ,StartingDeadlineSeconds是3小时。

当前时间是18:11,假设调度出现问题,在到当前时间的范围内没有调度,因此上一次调度时间是12:00。

设置earliestTime为上一次调度时间12:00。当前时间18:11-3小时(StartingDeadlineSeconds)=15:11,

15:11在earliestTime(12:00)之后,设置earliestTime为15:11。

earliestTime(15:11)在在当前时间(18:11)之前,此时说明存在可以调度的时间,根据earliestTime和调度规则计算出调度时间为16:00、17:00和18:00,由于StartingDeadlineSeconds的过大,存在多个未调度的时间点的时候,遍历这些事件点找到离当前时间最近的时间点,该时间点为18:00。missedRun为18:00。

根据earliestTime和调度规则计算出调度时间

1
2
t := sched.Next(earliestTime)
// 假设earliestTime为11:10,规则为每小时调度,第一次调用t为12:00,依次分别为13:00、14:00.....

完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
getNextSchedule := func(cronJob *batchv1.CronJob, now time.Time) (lastMissed time.Time, next time.Time, err error) {
// 将 Cron 表达式字符串 (cronJob.Spec.Schedule) 解析成一个 Cron 表达式调度器 sched。
sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
if err != nil {
return time.Time{}, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err)
}

// for optimization purposes, cheat a bit and start from our last observed run time
// we could reconstitute this here, but there's not much point, since we've
// just updated it.
// earliestTime 是开始计算下一个调度时间的基础时间
var earliestTime time.Time
// 如果 CronJob 已经有 LastScheduleTime(上次成功调度的时间),则使用该时间。
if cronJob.Status.LastScheduleTime != nil {
earliestTime = cronJob.Status.LastScheduleTime.Time
} else {
// 如果没有 LastScheduleTime,则使用 CronJob 的创建时间作为开始时间
earliestTime = cronJob.ObjectMeta.CreationTimestamp.Time
}
// 限制我们调度“过去的漏掉的任务”的范围,避免太老的调度点被浪费资源去执行。
// 设置了 StartingDeadlineSeconds,表示在某个时间点之后的调度将被忽略
if cronJob.Spec.StartingDeadlineSeconds != nil {
// controller is not going to schedule anything below this point
schedulingDeadline := now.Add(-time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds))
// 如果 earliestTime 晚于 schedulingDeadline,则将 earliestTime 更新为 schedulingDeadline。
if schedulingDeadline.After(earliestTime) {
earliestTime = schedulingDeadline
}
}
if earliestTime.After(now) {
return time.Time{}, sched.Next(now), nil
}

starts := 0

for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
// 最终记录的是最近一次错过的时间
lastMissed = t
// 一共发现了多少个错过的时间点
starts++
if starts > 100 {
// We can't get the most recent times so just return an empty slice
return time.Time{}, time.Time{}, fmt.Errorf("Too many missed start times (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
}
}
return lastMissed, sched.Next(now), nil
}
// 错过的 Cron 执行时间, nextRun 下一次执行的时间
missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
if err != nil {
log.Error(err, "unable to figure out CronJob schedule")
// we don't really care about requeuing until we get an update that
// fixes the schedule, so don't return an error
return ctrl.Result{}, nil
}

如果missedRun为0,说明没有需要调度的时间点,直接返回下一次调度的时间。

1
2
3
4
5
6
7
8
scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())} // save this so we can re-use it elsewhere
// 日志添加当前时间(now)和下一次运行时间(next run)两个字段。
log = log.WithValues("now", r.Now(), "next run", nextRun)
// 没有错过任何调度。
if missedRun.IsZero() {
log.V(1).Info("no upcoming scheduled times, sleeping until next")
return scheduledResult, nil
}

运行新的 Job, 确定新 Job 没有超过 deadline 时间,且不会被 concurrency 规则 block

检查 Job运行的时间点是否还在 deadline 时间内,没有直接返回scheduledResult
1
2
3
4
5
6
7
8
9
10
11
12
13
log = log.WithValues("current run", missedRun)
// 定义一个标志变量,表示“是否已经太迟不能补救”
tooLate := false
if cronJob.Spec.StartingDeadlineSeconds != nil {
// 当前时间是否已经超过了 missedRun + startingDeadlineSeconds
tooLate = missedRun.Add(time.Duration(*cronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(r.Now())
}
if tooLate {
// “太迟”,就打印日志,什么都不做,等待下次调度
log.V(1).Info("missed starting deadline for last run, sleeping till next")
// TODO(directxman12): events
return scheduledResult, nil
}

[!NOTE]

个人理解:检查是否还在 deadline 时间内

missedRun在上一步骤是通过sched.Next(当前时间-StartingDeadlineSeconds)计算得出,若missedRun存在,那么条件missedRun.Add(time.Duration(*cronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(r.Now()),一定是可以满足的,所以这一步骤做了冗余检查。

controller 有延迟(reconcile 不是实时触发),从上次计算r.Now()和这次调用的r.Now()计算的时间不同,所以不能 100% 保证 missedRun 一定还在“合法的执行时间窗口”内。因此要做一次检查。

执行并发规则
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 如果当前 CronJob 的策略是禁止并发(ForbidConcurrent),并且还有未完成的 Job, 直接跳过本次调度
if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(activeJobs) > 0 {
log.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs))
return scheduledResult, nil
}

// ...or instruct us to replace existing ones...
// 替换运行中的 Job
if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
// 当前所有还在运行的 Job 全部删除
for _, activeJob := range activeJobs {
// we don't care if the job was already deleted
if err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
log.Error(err, "unable to delete active job", "job", activeJob)
return ctrl.Result{}, err
}
}
}

创建job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 创建实际 Job 的核心逻辑
constructJobForCronJob := func(cronJob *batchv1.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())

job := &kbatch.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: make(map[string]string),
Annotations: make(map[string]string),
Name: name,
Namespace: cronJob.Namespace,
},
Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
}
for k, v := range cronJob.Spec.JobTemplate.Annotations {
job.Annotations[k] = v
}
// 注解添加执行的时间
job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
for k, v := range cronJob.Spec.JobTemplate.Labels {
job.Labels[k] = v
}
// 表示这个 Job 是由哪个 CronJob 控制的
if err := ctrl.SetControllerReference(cronJob, job, r.Scheme); err != nil {
return nil, err
}

return job, nil
}
// actually make the job...
// 构造 Job
job, err := constructJobForCronJob(&cronJob, missedRun)
if err != nil {
log.Error(err, "unable to construct job from template")
// don't bother requeuing until we get a change to the spec
return scheduledResult, nil
}

// ...and create it on the cluster
// 集群创建job
if err := r.Create(ctx, job); err != nil {
log.Error(err, "unable to create Job for CronJob", "job", job)
return ctrl.Result{}, err
}

log.V(1).Info("created Job for CronJob run", "job", job)

return scheduledResult, nil

SetupWithManager 逻辑

初始化r.Clock

Reconciler 设置一个默认的“时钟实现”。

CronJob Controller 里,需要获取“当前时间”来决定下一次调度时间是否到了,比如在这个调用中:

1
missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())

看到的 r.Now(),其实就是:

1
2
3
4
5
6
7
8
9
type Clock interface {
Now() time.Time
}

type realClock struct{}

func (realClock) Now() time.Time {
return time.Now()
}

添加索引

还记得查找job时的函数吗?

1
r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name})

r.List:用来列出某种类型的 Kubernetes 对象。在这里是列出所有 Job 类型的对象,并将它们填充到 childJobs

client.MatchingFields{jobOwnerKey: req.Name}字段选择器(field selector),表示只匹配jobOwnerKey 字段等于 req.Name 的 Job。

jobOwnerKey 是哪里来的?

在 controller 初始化时,手动为 Job 设置的一个索引字段 key

1
2
3
4
var (
jobOwnerKey = ".metadata.controller"
apiGVStr = batchv1.GroupVersion.String()
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &kbatch.Job{}, jobOwnerKey,
func(rawObj client.Object) []string {
// grab the job object, extract the owner...
// 首先将原始对象转换为 kbatch.Job 类型
job := rawObj.(*kbatch.Job)
owner := metav1.GetControllerOf(job)
if owner == nil {
return nil
}
// ...make sure it's a CronJob...
if owner.APIVersion != apiGVStr || owner.Kind != "CronJob" {
return nil
}

// ...and if so, return it
return []string{owner.Name}
}); err != nil {
return err
}

mgr

mgr.GetFieldIndexer().IndexField(...)

  • 这是 controller-runtime 提供的方法,用来给某种资源(这里是 Job)建立字段索引。
  • jobOwnerKey 是索引字段的名字,通常是个字符串(比如 "metadata.owner" 或者自定义的名字)。
  • 后面那个函数是如何从 Job 中提取这个字段值的逻辑。
1
2
3
4
5
return ctrl.NewControllerManagedBy(mgr).
For(&batchv1.CronJob{}).
Owns(&kbatch.Job{}).
Named("cronjob").
Complete(r)

“告诉 Controller Manager:要管理 CronJob 对象(主资源),并且也需要关注由它控制的 Job 对象(子资源)。”

main.go

使用任何其他 CRD,则必须以相同的方式添加其 scheme。

1
2
3
4
5
6
7
8
9
10
11
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)

func init() {
_ = clientgoscheme.AddToScheme(scheme)

_ = batchv1.AddToScheme(scheme)
// +kubebuilder:scaffold:scheme
}

更改是 kubebuilder 添加了一个块代码, 该代码调用了我们的 CronJob controller 的 SetupWithManager 方法。

1
2
3
4
5
6
7
if err = (&controller.CronJobReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CronJob")
os.Exit(1)
}

设置 webhook,只需要将它们添加到管理器中即可。可能希望单独运行 webhook,或者在本地测试控制器时不运行它们,因此将它们放在环境变量后面。

1
2
3
4
5
6
if os.Getenv("ENABLE_WEBHOOKS") != "false" {
if err = webhookbatchv1.SetupCronJobWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "CronJob")
os.Exit(1)
}
}

其他详解

Reconcile返回值

Reconcile() 的签名是这样的:

1
Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
返回值 含义
ctrl.Result{} 不需要立即重新调度
ctrl.Result{Requeue: true} 立刻重新调度
ctrl.Result{RequeueAfter: time.Duration} 过一段时间后自动调度
error != nil 表示出错,控制器会立即重试

controller-runtime 框架内部根据返回的 ctrl.Result 自动决定是否/何时重新执行 Reconcile() 的。不需要自己管理这些调度行为。

框架源码示意(仅供理解)

在 controller-runtime 源码中(比如在 pkg/internal/controller/controller.go 里)有类似的逻辑:

1
2
3
4
5
if result.Requeue {
queue.AddRateLimited(req)
} else if result.RequeueAfter > 0 {
queue.AddAfter(req, result.RequeueAfter)
}

controller 会把 req(也就是那个 CronJob 的名字)重新塞到队列里,等时间到了就再次调用 Reconcile

执行流程

SetupWithManager 先执行,Reconcile 只有在资源变更或手动触发后才执行。

启动流程详细解释

main.go 中写了这样的代码:

1
2
3
4
5
6
if err := (&controllers.CronJobReconciler{
Client: mgr.GetClient(),
// ...
}).SetupWithManager(mgr); err != nil {
// handle error
}

这段代码注册了自定义的 Reconcile 逻辑到控制器运行框架中(controller-runtime 的 manager),这个 SetupWithManager() 的作用是:告诉 controller-runtime:这个控制器要 watch 哪些资源、怎么处理它们。

1
2
3
4
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}

这是一个阻塞操作,一旦调用:

  • 所有注册的 controller 都会启动控制循环。
  • 各 controller 会开始监听它们所关心的 Kubernetes 资源变更(通过 informer)。

SetupWithManager 什么时候执行?

SetupWithManager它在 mgr.Start() 之前执行,仅执行 一次。做的事情包括:

  • 通过 .For().Owns() 设置要 watch 的 GVK(资源类型)
  • 构造 controller 对象,并注入 Reconcile 逻辑
  • 注册到 manager 中的控制器队列里

它是注册阶段的配置函数,而不是控制循环的一部分。

Reconcile 什么时候执行?

  • manager 启动后
  • 当 Kubernetes 中的资源(例如 CronJob、Job)发生变化时(增删改)
  • 或你手动设置了 .Result{Requeue: true}.RequeueAfter
  • 或用 event recorder 或 channel 明确触发它
1
2
3
4
5
6
7
8
9
main() 开始

SetupWithManager() 执行(注册控制器)

mgr.Start() 启动 manager,控制循环开始

监听资源变更(informer)

当有资源变化 -> 调用 Reconcile()

webhook

只需要实现 Defaulter 和 (或) Validator 接口

其余的东西 Kubebuilder 会自动实现,比如:

  1. 创建一个 webhook server
  2. 确保这个 server 被添加到 manager 中
  3. 为你的 webhooks 创建一个 handlers
  4. 将每个 handler 以 path 形式注册到你的 server 中

首先,让我们为 CRD(CronJob)创建 webhooks

1
2
kubebuilder create webhook --group batch --version v1 --kind CronJob --defaulting --programmatic-validation

这将创建 Webhook 功能相关的方法,并在 main.go 中注册 Webhook 到你的 manager 中。

评论