因此,具有实际功能的对象通常都包含 Spec 和 Status。不过,ConfigMap 之类的少数类型不遵循此模式,因为它们并不描述期望状态(desired state);但大多数类型都遵循这一模式。
1 2 3 4 5 6 7 8 9 10
// CronJobSpec defines the desired state of CronJob
type CronJobSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file
}
// CronJobStatus defines the observed state of CronJob
type CronJobStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file
}
CronJobList 包含一个 CronJob 的切片,是用于批量操作(例如 LIST)的 Kind。通常我们不会修改 CronJob 和 CronJobList 这两个根类型本身,所有的修改都放在 Spec 或 Status 中进行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// +kubebuilder:object:root=true // CronJob is the Schema for the cronjobs API type CronJob struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"`
Spec CronJobSpec `json:"spec,omitempty"` Status CronJobStatus `json:"status,omitempty"` }
// +kubebuilder:object:root=true // CronJobList contains a list of CronJob type CronJobList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` Items []CronJob `json:"items"` }
var ( // GroupVersion is group version used to register these objects GroupVersion = schema.GroupVersion{Group: "batch.tutorial.kubebuilder.io", Version: "v1"}
// SchemeBuilder is used to add go types to the GroupVersionKind scheme SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} // AddToScheme adds the types in this group-version to the given scheme. AddToScheme = SchemeBuilder.AddToScheme )
// ConcurrencyPolicy describes how overlapping runs of a CronJob are handled.
// Only one of the policies below may be specified.
// +kubebuilder:validation:Enum=Allow;Forbid;Replace
type ConcurrencyPolicy string

const (
	// AllowConcurrent allows CronJobs to run concurrently.
	AllowConcurrent ConcurrencyPolicy = "Allow"

	// ForbidConcurrent forbids concurrent runs, skipping next run if previous
	// hasn't finished yet.
	ForbidConcurrent ConcurrencyPolicy = "Forbid"

	// ReplaceConcurrent cancels currently running job and replaces it with a new one.
	ReplaceConcurrent ConcurrencyPolicy = "Replace"
)
CronJobStatus
它用来存储观察到的状态(observed state)。它包含我们希望用户或其他 controller 能够方便获取的所有信息。
1 2 3 4 5 6 7 8
type CronJobStatus struct { // +optional Active []corev1.ObjectReference `json:"active,omitempty"`
// Information when was the last time the job was successfully scheduled. // +optional LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"` }
// Load the CronJob named by the reconcile request.
// NOTE(review): this snippet refers to the API package as `batch`, while the later
// snippets use `batchv1` — confirm the alias matches the file's imports.
var cronJob batch.CronJob
if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
	// NOTE(review): this logs even when the error is NotFound (e.g. the CronJob was
	// deleted), which IgnoreNotFound below treats as benign; consider logging only
	// when client.IgnoreNotFound(err) != nil.
	log.Error(err, "unable to fetch CronJob")
	// we'll ignore not-found errors, since they can't be fixed by an immediate
	// requeue (we'll need to wait for a new notification), and we can get them
	// on deleted requests.
	return ctrl.Result{}, client.IgnoreNotFound(err)
}
列出所有 active jobs,并更新状态
我们需要列出此 namespace 中属于此 CronJob 的所有 Job。与 Get 方法类似,可以使用 List 方法列出 Job。注意,这里通过可变参数选项来设置 client.InNamespace(限定命名空间)和 client.MatchingFields(按索引字段过滤)。
// Fragment: tail of the schedule helper (presumably getNextSchedule, called below); its signature is not shown.
	// for optimization purposes, cheat a bit and start from our last observed run time
	// we could reconstitute this here, but there's not much point, since we've
	// just updated it.
	// earliestTime is the base time from which missed and upcoming schedule times are computed.
	var earliestTime time.Time
	// Prefer the CronJob's LastScheduleTime (the last successful schedule) when it is set.
	if cronJob.Status.LastScheduleTime != nil {
		earliestTime = cronJob.Status.LastScheduleTime.Time
	} else {
		// Without a LastScheduleTime, start from the CronJob's creation time.
		earliestTime = cronJob.ObjectMeta.CreationTimestamp.Time
	}
	// Bound how far back we look for missed runs so that very old schedule points are not executed.
	// When StartingDeadlineSeconds is set, schedule times earlier than
	// now - StartingDeadlineSeconds are ignored.
	if cronJob.Spec.StartingDeadlineSeconds != nil {
		// controller is not going to schedule anything below this point
		schedulingDeadline := now.Add(-time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds))
		// If the deadline is later than earliestTime, move earliestTime forward to the deadline.
		if schedulingDeadline.After(earliestTime) {
			earliestTime = schedulingDeadline
		}
	}
	// earliestTime is still in the future, so nothing can have been missed:
	// return a zero missed time and the next run after now.
	if earliestTime.After(now) {
		return time.Time{}, sched.Next(now), nil
	}
	// starts counts how many schedule times were missed.
	starts := 0
	// Walk the schedule times from the first one after earliestTime up to and including now.
	// NOTE(review): lastMissed is not declared in this fragment — presumably a named
	// result or an earlier local of the enclosing function.
	for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
		// Ends up holding the most recent missed schedule time.
		lastMissed = t
		starts++
		if starts > 100 {
			// Too many missed times to enumerate cheaply: give up and return zero times plus an error.
			// NOTE(review): Go convention is a lowercase error string without trailing punctuation.
			return time.Time{}, time.Time{}, fmt.Errorf("Too many missed start times (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
		}
	}
	return lastMissed, sched.Next(now), nil
} // end of the schedule helper

// missedRun is the most recent missed schedule time (zero if none); nextRun is the next scheduled run.
missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
if err != nil {
	log.Error(err, "unable to figure out CronJob schedule")
	// we don't really care about requeuing until we get an update that
	// fixes the schedule, so don't return an error
	return ctrl.Result{}, nil
}
如果 missedRun 为零值,说明没有错过任何调度时间点,此时无需创建 Job,直接返回,并在下一次调度时间到达时重新入队(RequeueAfter)。
1 2 3 4 5 6 7 8
// Requeue when the next scheduled run is due.
// NOTE(review): r.Now() is sampled separately here and in WithValues below, so the two
// instants can differ slightly; consider reading it once.
scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())} // save this so we can re-use it elsewhere
// Attach the current time ("now") and the next run time ("next run") to later log calls made through log.
log = log.WithValues("now", r.Now(), "next run", nextRun)
// No schedule time was missed: nothing to start now, so wait until the next run.
if missedRun.IsZero() {
	log.V(1).Info("no upcoming scheduled times, sleeping until next")
	return scheduledResult, nil
}
// ...or instruct us to replace existing ones...
// Replace policy: remove the Jobs that are still running before starting the new one.
if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
	// Delete every currently active Job. Background propagation deletes the Job object
	// immediately and lets the garbage collector remove its dependents (presumably
	// its Pods) afterwards.
	for _, activeJob := range activeJobs {
		// we don't care if the job was already deleted
		if err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
			log.Error(err, "unable to delete active job", "job", activeJob)
			return ctrl.Result{}, err
		}
	}
}
// constructJobForCronJob builds (but does not submit) the Job for one scheduled run.
// The cronJob parameter shadows the outer cronJob variable; the call below passes &cronJob.
constructJobForCronJob := func(cronJob *batchv1.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
	// We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
	// NOTE(review): appending the Unix timestamp may push long CronJob names past
	// Kubernetes name-length limits — confirm.
	name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())

	// The Job spec is a deep copy of the template so the CronJob object is not mutated.
	job := &kbatch.Job{
		ObjectMeta: metav1.ObjectMeta{
			Labels:      make(map[string]string),
			Annotations: make(map[string]string),
			Name:        name,
			Namespace:   cronJob.Namespace,
		},
		Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
	}
	for k, v := range cronJob.Spec.JobTemplate.Annotations {
		job.Annotations[k] = v
	}
	// Record the scheduled run time (RFC 3339). It is set after the template copy, so it
	// overrides a template annotation with the same key.
	job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
	for k, v := range cronJob.Spec.JobTemplate.Labels {
		job.Labels[k] = v
	}
	// Mark this CronJob as the Job's controlling owner.
	if err := ctrl.SetControllerReference(cronJob, job, r.Scheme); err != nil {
		return nil, err
	}

	return job, nil
}
// actually make the job...
job, err := constructJobForCronJob(&cronJob, missedRun)
if err != nil {
	log.Error(err, "unable to construct job from template")
	// don't bother requeuing until we get a change to the spec
	return scheduledResult, nil
}
// ...and create it on the cluster
// Returning a non-nil error makes controller-runtime requeue the request with backoff.
// NOTE(review): if a Job with this deterministic name already exists, Create fails and
// the error is returned here — confirm whether AlreadyExists should be tolerated.
if err := r.Create(ctx, job); err != nil {
	log.Error(err, "unable to create Job for CronJob", "job", job)
	return ctrl.Result{}, err
}

log.V(1).Info("created Job for CronJob run", "job", job)