Warning: mysqli_query(): (HY000/1030): Got error 28 from storage engine in /www/wwwroot/www.sreguide.com/wp-includes/wp-db.php on line 2007

Warning: mysqli_query(): (HY000/1030): Got error 28 from storage engine in /www/wwwroot/www.sreguide.com/wp-includes/wp-db.php on line 2007
kubernetes cronjob controller设计与源码分析 | 专注go语言|微服务|分布式|kubernetes系统研发与源码分析
Warning: mysqli_query(): (HY000/1030): Got error 28 from storage engine in /www/wwwroot/www.sreguide.com/wp-includes/wp-db.php on line 2007

Warning: mysqli_query(): (HY000/1030): Got error 28 from storage engine in /www/wwwroot/www.sreguide.com/wp-includes/wp-db.php on line 2007

kubernetes cronjob controller设计与源码分析

CronJob Controller

kubernetes cronjob controller设计与源码分析

CronJobController主要负责周期性任务的调度, 就跟linux中的crontab程序类似, 当达到时间就根据策略调用apiserver进行任务的创建或者状态的修改

当前CronJobController的问题

  • CronJobController并没有使用informer来进行时间监听apiserver, 而是直接进行周期循环
  • 单groutine进行调度

informer与CronJob两种模型

  • informer
kubernetes cronjob controller设计与源码分析
  • 定时任务逻辑
kubernetes cronjob controller设计与源码分析

定时任务为什么没有一开始就使用informer模型, 而要实现成现在这个样子(官方从2015年底就打算修复, 但是一直到现在, 都没有release), 问题是什么呢, 如果换成队列, 会有哪些问题?

  • 为什么syncAll中要获取所有的任务和定时任务
  • 定时任务为什么没使用informer、queue、worker这种结构
  • 当前controller有哪些问题
  • 如果要修复问题是什么
  • 有那些取舍

简单粗暴的实现

数据结构

type CronJobController struct {
    kubeClient clientset.Interface
    jobControl jobControlInterface
    // sjController 原来是scheduler job,后面才改名未cron job
    sjControl  sjControlInterface
    podControl podControlInterface
    recorder   record.EventRecorder
}

全部都是要操作apiserver的Control, 并没有像Job那样的expectations、队列等(就是直接操作apiserver)

启动

func (jm *CronJobController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    glog.Infof("Starting CronJob Manager")
    // Check things every 10 second.
    go wait.Until(jm.syncAll, 10*time.Second, stopCh)
    <-stopCh
    glog.Infof("Shutting down CronJob Manager")
}

启动也没有等待, 也没有启动多个groutine,就是单纯的10秒一次的循环

同步逻辑

func (jm *CronJobController) syncAll() {
    // List children (Jobs) before parents (CronJob).
    // This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
    // we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
    // Note that this only works because we are NOT using any caches here.
    // 获取所有namespace的所有job
    jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("can't list Jobs: %v", err))
        return
    }
    js := jl.Items
    glog.V(4).Infof("Found %d jobs", len(js))

    // 获取所有的cron job
    sjl, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("can't list CronJobs: %v", err))
        return
    }
    sjs := sjl.Items
    glog.V(4).Infof("Found %d cronjobs", len(sjs))

    // 根据cron job对job进行分组
    jobsBySj := groupJobsByParent(js)
    glog.V(4).Infof("Found %d groups", len(jobsBySj))

    for _, sj := range sjs {
        syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
        cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
    }
}

同步也没有多余的操作, 就是同步所有的Job和CronJob, 本地也不做缓存, 最终通过syncOne和cleanupFinishedJobs来进行CronJob的调度和清理工作

调度逻辑

kubernetes cronjob controller设计与源码分析

在调度逻辑中, CronJobController会获取所有的CronJob里面的job信息, 进行对比, 用于进行接下来业务逻辑的决策, 这个地方可能就是CronJobController为什么要获取所有任务的理由了

func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
    nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

    childrenJobs := make(map[types.UID]bool)
    // 遍历UID等于当前cron job的job列表
    for _, j := range js {
        childrenJobs[j.ObjectMeta.UID] = true
        // 检查当前job是否在cron job的活动列表里面
        found := inActiveList(*sj, j.ObjectMeta.UID)
        // 如果没有找到, 并且当前job也未结束
        if !found && !IsJobFinished(&j) {
            recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
        } else if found && IsJobFinished(&j) {
            // 如果job已经结束, 就从当前cron job的活动列表中删除, 并且修改contab的活动列表
            deleteFromActiveList(sj, j.ObjectMeta.UID)
            // TODO: event to call out failure vs success.
            recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
        }
    }

    // 遍历cron job的活动列表,如果未找到任务就删除
    for _, j := range sj.Status.Active {
        if found := childrenJobs[j.UID]; !found {
            recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
            deleteFromActiveList(sj, j.UID)
        }
    }

    // 调用apiserver修改当前任务的状态
    updatedSJ, err := sjc.UpdateStatus(sj)
    if err != nil {
        glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
        return
    }
    *sj = *updatedSJ

    // 如果cron job删除直接返回
    if sj.DeletionTimestamp != nil {
        return
    }

    // 如果job已经挂起,
    if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
        glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
        return
    }

    // 获取接下来的调度时间
    times, err := getRecentUnmetScheduleTimes(*sj, now)
    if err != nil {
        recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
        glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
        return
    }
    // TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
    // 启动时间为0, 表示当前job不需要被调度
    if len(times) == 0 {
        glog.V(4).Infof("No unmet start times for %s", nameForLog)
        return
    }
    if len(times) > 1 {
        glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
    }

    // 通过计算获取了到现在为止所有调度时间的一个切片, 取到当前时间为止
    scheduledTime := times[len(times)-1]
    tooLate := false
    // 如果最后一次需要被调度的时间, 加上最后调度期限, 在now之前, 就直接退出
    if sj.Spec.StartingDeadlineSeconds != nil {
        tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
    }
    if tooLate {
        glog.V(4).Infof("Missed starting window for %s", nameForLog)
        recorder.Eventf(sj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z))
        return
    }
    // 如果当前不允许并发, 并且当前已经有job启动, 退出
    if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
        glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
        return
    }
    // 如果当前并发策略是替换, 就获取到当前的所有启动的任务, 并删除
    if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
        for _, j := range sj.Status.Active {
            glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)

            // 获取job删除
            job, err := jc.GetJob(j.Namespace, j.Name)
            if err != nil {
                recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
                return
            }
            if !deleteJob(sj, job, jc, pc, recorder, "") {
                return
            }
        }
    }

    // 获取job信息
    jobReq, err := getJobFromTemplate(sj, scheduledTime)
    if err != nil {
        glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
        return
    }
    // 创建Job
    jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
    if err != nil {
        recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
        return
    }
    glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
    recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

    // 添加创建的job到CronJob的Active中
    ref, err := getRef(jobResp)
    if err != nil {
        glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
    } else {
        sj.Status.Active = append(sj.Status.Active, *ref)
    }
    // 更新最后调度时间, 更新cron job状态
    sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
    if _, err := sjc.UpdateStatus(sj); err != nil {
        glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
    }

    return
}

总结

最开始的问题

  • 为什么syncAll中要获取所有的任务和定时任务
    • CronJob中要对比所有CronJob相关Job的信息, 主要原因可能是避免多次同步(调度策略里面,需要进行CronJob的并发控制)
    • controller故障后的恢复(对关联的job和pod进行回收逻辑处理)
  • 定时任务为什么没使用informer、queue、worker这种结构
    • 官方是有计划进行这个操作的, 但是并没有release, 主要是想解决同步job和 Cronjob的同步问题
    • 多个worker之间的竞态条件(多个job和CronJob重复操作)
  • 当前controller有哪些问题
    • 同步问题, 在k8s中,informer就相当于我们的实时同步的数据
    • 并发问题, 将CronJob和Job进行组合然后分配到对应的worker, 增加类似亲和性这种

我的设计

kubernetes cronjob controller设计与源码分析

Dispatcher其实需要负责两个事情

  • 定时轮训informer实现当前的定时调度策略
  • 接收事件, 同时按照亲和性(CronJob和Job两者的亲和性)分发到对应的worker

自己的思考

理想和现实总会有取舍, 没有说通用的设计解决模式,合适的场景,合适的取舍, 需要学习的嗨很多

原创文章,作者:baxiaoshi,如若转载,请注明出处:http://www.sreguide.com/uncategorized/kubernetes-cronjob-controller%e8%ae%be%e8%ae%a1%e4%b8%8e%e6%ba%90%e7%a0%81%e5%88%86%e6%9e%90.html

发表评论

电子邮件地址不会被公开。 必填项已用*标注

联系我们

QQ: 52866169