Kubernetes Job Controller: Dissecting the Underlying Implementation

Job Controller

What to pay attention to when studying the job controller:

  • The controller's internal data structures (what, why, how, in detail)
  • The controller's internal business logic
  • The internal implementation of the different Job patterns
  • The algorithms the controller uses

Controller overview

A controller is a Kubernetes control loop; each resource type has its own controller. A controller does only two things:

  • Watch events produced by the apiserver
  • Based on the events and the resource's parameters, build the operations needed to move the resource toward its desired state

Controller components


Functionally, a controller can be split into the following parts (a minimal wiring sketch follows this list):

  • Event source: the informer syncs data from the apiserver and puts events into the queue
  • Queue: a delaying (rate-limited) queue that acts as a local event store
  • Worker: fetches events from the queue and runs the business logic
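To make the wiring concrete, here is a minimal sketch of this informer → queue → worker pattern built only on client-go primitives. It is illustrative, not the Job Controller itself: miniController and its sync function are invented names for this sketch.

// Package controllerdemo is a minimal sketch of the informer -> queue -> worker pattern
// described above. It is illustrative only; it is not part of the Job Controller.
package controllerdemo

import (
    "fmt"

    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

type miniController struct {
    queue    workqueue.RateLimitingInterface
    informer cache.SharedIndexInformer
}

func newMiniController(informer cache.SharedIndexInformer) *miniController {
    c := &miniController{
        queue:    workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
        informer: informer,
    }
    // Event source: the informer turns apiserver watch events into keys on the queue.
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
                c.queue.Add(key)
            }
        },
        UpdateFunc: func(old, cur interface{}) {
            if key, err := cache.MetaNamespaceKeyFunc(cur); err == nil {
                c.queue.Add(key)
            }
        },
        DeleteFunc: func(obj interface{}) {
            if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
                c.queue.Add(key)
            }
        },
    })
    return c
}

// runWorker drains the queue; several of these can run concurrently against the same queue.
func (c *miniController) runWorker() {
    for {
        key, quit := c.queue.Get()
        if quit {
            return
        }
        if err := c.sync(key.(string)); err != nil {
            c.queue.AddRateLimited(key) // failed: retry later with backoff
        } else {
            c.queue.Forget(key) // succeeded: clear the key's backoff history
        }
        c.queue.Done(key)
    }
}

// sync is where a real controller would reconcile actual state with desired state.
func (c *miniController) sync(key string) error {
    fmt.Println("reconciling", key)
    return nil
}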

Controller internal logic

Overview


The Job Controller is responsible for handling the Job resource in Kubernetes. A Job launches its tasks from a pod template and, driven by the configured parameters, keeps reconciling until the state defined by the Job is reached.

The internal processing flow is as follows:

  • The controller registers its ResourceEventHandler with the informer
  • The informer receives events from the apiserver and dispatches them to the ResourceEventHandler
  • The ResourceEventHandler puts the events into the queue
  • The worker fetches items from the queue and runs the business logic

queue

The queue stores the events the controller receives. Its main purposes are (a short illustration of the retry backoff follows this list):

  • Allowing multiple workers to fetch and process events concurrently
  • Providing delayed retries after a worker fails to process an event
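As a sketch of what "delayed retry" means in practice: the Job Controller builds its queue on an exponential-failure rate limiter (see NewJobController below). The 10-second and 360-second values here are my assumption for the DefaultJobBackOff and MaxJobBackOff constants; the exact numbers are not shown in this article.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // Every failed sync of the same key pushes its next retry further out,
    // up to the configured maximum delay.
    limiter := workqueue.NewItemExponentialFailureRateLimiter(10*time.Second, 360*time.Second)
    for i := 0; i < 7; i++ {
        // Prints 10s, 20s, 40s, 1m20s, 2m40s, 5m20s, 6m0s (capped at the max delay)
        fmt.Println(limiter.When("default/my-job"))
    }
    // A successful sync calls Forget, which resets the key's failure count.
    limiter.Forget("default/my-job")
}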

worker

A worker really does just one thing: it takes a Job key from the queue, checks whether the Job's current state matches the desired state, and if not, computes how many pods to add or delete and then updates the Job status.

syncHandler与updateHandler

The sync and update handlers are an instance of a programming pattern shared across controllers; every controller has these two handlers. Their roles are:

  • syncHandler fetches data from the informer and decides whether the current event needs handling (this logic differs for each controller). You can think of syncHandler as a *filter layer*: out of all events it filters the ones the worker actually needs to act on (it may also drop events from the queue), computes the operations to perform based on the event, and then calls update
  • updateHandler is much simpler: after syncHandler's computations we end up with one concrete operation, and updateHandler is responsible for writing the resulting state back to the apiserver, closing the loop

Controller internal implementation


The Job Controller's core business logic builds on the basic controller pattern and adds an Expectations component, which atomically records how many Create/Delete operations are expected for a given key.

The Job Controller first makes its decisions based on the counts stored in Expectations; once a decision is made, those target counts are fixed. If the informer subsequently observes the resulting changes, the corresponding expected counts are decremented and may even drop below zero.

Data structures

![](./_image/2018-10-11-17-10-13.png)
type JobController struct {
    // Client interface for talking to the apiserver
    kubeClient clientset.Interface
    // podControl calls the apiserver to create (and delete) the pods the Job needs
    podControl controller.PodControlInterface

    // Handler that calls the apiserver to update the Job
    updateHandler func(job *batch.Job) error
    // Reads the Job's current state from the queue and apiserver and decides what updateHandler should do
    syncHandler   func(jobKey string) (bool, error)

    // Report whether the Pod and Job informers have finished syncing with the apiserver
    podStoreSynced cache.InformerSynced
    jobStoreSynced cache.InformerSynced

    // Stores the expected Create/Delete counts per Job, with expiry handling (5 minutes)
    expectations controller.ControllerExpectationsInterface

    // Listers for looking up Jobs and their associated pods
    jobLister batchv1listers.JobLister
    podStore corelisters.PodLister

    // Delaying (rate-limited) queue holding the keys waiting to be processed
    queue workqueue.RateLimitingInterface
    // Records events
    recorder record.EventRecorder
}

Creating the Job Controller

// NewJobController
    jm := &JobController{
        kubeClient: kubeClient,
        podControl: controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),   // the Recorder tags each event with the component it came from
        },
        expectations: controller.NewControllerExpectations(),
        queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
        recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
    }

    // Register with the shared informer to watch Job objects; Add/Delete simply drop the object into the queue for the workers
    jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
        UpdateFunc: jm.updateJob,
        DeleteFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
    })
    // The lister reads Job objects from the local cache
    jm.jobLister = jobInformer.Lister()
    jm.jobStoreSynced = jobInformer.Informer().HasSynced

    // Watch pod events, resolve the pods that belong to Jobs, and enqueue the owning Job for the workers
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    jm.addPod,
        UpdateFunc: jm.updatePod,
        DeleteFunc: jm.deletePod,
    })
    jm.podStore = podInformer.Lister()
    jm.podStoreSynced = podInformer.Informer().HasSynced

    // Wire up the sync and update handler functions
    jm.updateHandler = jm.updateJobStatus
    jm.syncHandler = jm.syncJob

Starting the Job Controller

// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
    // Wait for the pod and job caches to finish syncing
    if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
        return
    }

    // Start the requested number of workers (ConcurrentJobSyncs, 5 by default)
    for i := 0; i < workers; i++ {
        go wait.Until(jm.worker, time.Second, stopCh)
    }

    <-stopCh
}
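For context, this is roughly how the controller gets constructed and started. Treat it as an approximate sketch, not the canonical kube-controller-manager wiring: the kubeconfig path is a placeholder, and the NewJobController argument order (pod informer, job informer, client) is my reading of the Kubernetes source of this era.

package main

import (
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/kubernetes/pkg/controller/job"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig") // placeholder path
    if err != nil {
        panic(err)
    }
    kubeClient := kubernetes.NewForConfigOrDie(cfg)

    // Shared informers feed both the pod and job caches used by the controller.
    factory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second)
    jm := job.NewJobController(
        factory.Core().V1().Pods(),
        factory.Batch().V1().Jobs(),
        kubeClient,
    )

    stopCh := make(chan struct{})
    factory.Start(stopCh) // start the shared informers so the caches can sync
    jm.Run(5, stopCh)     // 5 matches the default ConcurrentJobSyncs
}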

The Job Informer's ResourceEventHandler

  • ResourceEventHandler
    // Register with the shared informer to watch Job objects; Add/Delete simply drop the object into the queue for the workers
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
        UpdateFunc: jm.updateJob,
        DeleteFunc: func(obj interface{}) {
            jm.enqueueController(obj, true)
        },
    })
  • updateJob
    if curJob.Status.StartTime != nil {
        // If the job has already started, read its active deadline
        curADS := curJob.Spec.ActiveDeadlineSeconds
        if curADS == nil {
            return
        }
        oldADS := oldJob.Spec.ActiveDeadlineSeconds
        if oldADS == nil || *oldADS != *curADS {
            // Compute how much of the deadline has already been used up, derive the remaining delay, and enqueue the key after that delay
            now := metav1.Now()
            start := curJob.Status.StartTime.Time
            passed := now.Time.Sub(start)
            // Total time the job is allowed to stay active
            total := time.Duration(*curADS) * time.Second
            // AddAfter will handle total < passed
            // Enqueue the key after the remaining time
            jm.queue.AddAfter(key, total-passed)
            glog.V(4).Infof("job ActiveDeadlineSeconds updated, will rsync after %d seconds", total-passed)
        }
    }
}

I have omitted the part of updateJob that resolves the Job and enqueues it. The core of this handler is how it treats the ActiveDeadlineSeconds parameter: if a Job sets it and has already started, the controller proactively adds an item to the delayed queue, so that even if no further event occurs, a worker still receives the key once the deadline is reached and can terminate the Job. For example, a Job with ActiveDeadlineSeconds=300 that has already been running for 120 seconds is re-enqueued 180 seconds later.

The Pod Informer's ResourceEventHandler

  • addPod
func (jm *JobController) addPod(obj interface{}) {
    pod := obj.(*v1.Pod)
    // The pod is already marked for deletion
    if pod.DeletionTimestamp != nil {
        // On a controller restart a pod may already be pending deletion; treat it as a delete so its expectation is released
        jm.deletePod(pod)
        return
    }

    // The pod informer delivers pods created by every controller; use the ControllerRef to check whether this one belongs to a Job
    if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
        // Resolve the owning Job
        job := jm.resolveControllerRef(pod.Namespace, controllerRef)
        if job == nil {
            return
        }
        jobKey, err := controller.KeyFunc(job)
        if err != nil {
            return
        }
        // A creation has been observed, so decrement the expected Create count for this Job
        jm.expectations.CreationObserved(jobKey)
        jm.enqueueController(job, true)
        return
    }

    // If the pod has no ControllerRef (it is an orphan), find any Jobs whose selectors match it and enqueue them
    for _, job := range jm.getPodJobs(pod) {
        // Enqueue each matching Job
        jm.enqueueController(job, true)
    }
}

updatePod and deletePod follow a pattern similar to addPod: they check whether the pod was created by a Job and, if so, look up the owning Job and enqueue it. The difference is that deletePod and addPod also adjust the owning Job's expectations (the Delete and Add counts respectively).
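As a rough sketch of that pattern, deletePod looks approximately like the following (simplified: the real handler also recovers pods from cache.DeletedFinalStateUnknown tombstones before doing this):

func (jm *JobController) deletePod(obj interface{}) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        // The real handler unwraps a DeletedFinalStateUnknown tombstone here.
        return
    }

    controllerRef := metav1.GetControllerOf(pod)
    if controllerRef == nil {
        return
    }
    job := jm.resolveControllerRef(pod.Namespace, controllerRef)
    if job == nil {
        return
    }
    jobKey, err := controller.KeyFunc(job)
    if err != nil {
        return
    }
    // The deletion has been observed, so decrement the expected Delete count for this Job.
    jm.expectations.DeletionObserved(jobKey)
    jm.enqueueController(job, true)
}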

expectations


expectations is an auxiliary component of the Job Controller that records how many Create/Delete operations the controller expects for a given controller key. Its main functions are (a small usage sketch follows this list):

  • Storing the expected operations, of the two kinds Create and Delete, for a given resource
  • Deciding, from whether the expectations are fulfilled and how long ago they were set, whether the Job needs to be resynced from the apiserver
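A small sketch of the lifecycle, using the same ControllerExpectations methods that show up in the controller code below; the printed values reflect how I understand the behaviour.

package main

import (
    "fmt"

    "k8s.io/kubernetes/pkg/controller"
)

func main() {
    jobKey := "default/my-job"
    exp := controller.NewControllerExpectations()

    // syncJob decides to create 3 pods and records that expectation *before* creating them.
    exp.ExpectCreations(jobKey, 3)

    // While the expectation is unfulfilled (and not yet expired), SatisfiedExpectations
    // returns false and syncJob will not call manageJob again for this key.
    fmt.Println(exp.SatisfiedExpectations(jobKey)) // false

    // Each pod Add event seen by the pod informer decrements the expected Create count ...
    exp.CreationObserved(jobKey)
    exp.CreationObserved(jobKey)
    exp.CreationObserved(jobKey)

    // ... and once it reaches zero, the next sync is allowed to reconcile again.
    fmt.Println(exp.SatisfiedExpectations(jobKey)) // true
}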

worker

func (jm *JobController) processNextWorkItem() bool {
    // Take one key from the queue; mark it done once processing finishes
    key, quit := jm.queue.Get()
    if quit {
        return false
    }
    defer jm.queue.Done(key)

    // syncHandler reconciles the Job's state and, from the computed desired state, triggers updateHandler.
    // If it succeeds, the key's backoff can be forgotten; if it fails, the key is put back onto the rate-limited queue to be retried later
    forget, err := jm.syncHandler(key.(string))
    if err == nil {
        if forget {
            // Clear the key's backoff/retry history
            jm.queue.Forget(key)
        }
        return true
    }

    utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
    jm.queue.AddRateLimited(key)

    return true
}

syncHandler


I split syncHandler's business logic into two broad parts, top to bottom:

  • Fetch the current state and determine whether the Job has failed (syncing with the apiserver first if a sync is needed)
  • The business-logic part (adjust the current state based on the state fetched above and the desired state)

Finally, updateHandler is called to submit the modified Job to the apiserver, closing the loop.

  • Code
func (jm *JobController) syncJob(key string) (bool, error) {
    startTime := time.Now()
    defer func() {
        glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
    }()

    // Parse the Job key into namespace and name
    ns, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return false, err
    }
    if len(ns) == 0 || len(name) == 0 {
        return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
    }
    // Fetch the Job from the informer's local cache
    sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            glog.V(4).Infof("Job has been deleted: %v", key)
            jm.expectations.DeleteExpectations(key)
            return true, nil
        }
        return false, err
    }
    job := *sharedJob

    // If the Job has already finished, there is nothing to do
    if IsJobFinished(&job) {
        return true, nil
    }

    // Number of times this key has already been retried
    previousRetry := jm.queue.NumRequeues(key)

    // Check whether this key needs a sync: expectations stores the pending add/del counts for the key plus a timestamp;
    // once the expectations are fulfilled (or have expired), the Job must be reconciled against the apiserver
    jobNeedsSync := jm.expectations.SatisfiedExpectations(key)

    // Get all pods belonging to this Job
    pods, err := jm.getPodsForJob(&job)
    if err != nil {
        return false, err
    }

    // Filter out the pods that are still active
    activePods := controller.FilterActivePods(pods)
    // Number of active pods
    active := int32(len(activePods))
    // Numbers of succeeded and failed pods
    succeeded, failed := getStatus(pods)
    conditions := len(job.Status.Conditions)
    // job first start
    if job.Status.StartTime == nil {
        now := metav1.Now()
        job.Status.StartTime = &now
        // If the Job has an active deadline, enqueue the key again once that deadline expires
        if job.Spec.ActiveDeadlineSeconds != nil {
            glog.V(4).Infof("Job %s have ActiveDeadlineSeconds will sync after %d seconds",
                key, *job.Spec.ActiveDeadlineSeconds)
            jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
        }
    }

    var manageJobErr error
    jobFailed := false
    var failureReason string
    var failureMessage string

    // Whether any new pods have failed since the last recorded status
    jobHaveNewFailure := failed > job.Status.Failed
    // Whether the retry count now exceeds the backoff limit
    exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
        (int32(previousRetry)+1 > *job.Spec.BackoffLimit)

    // The Job fails when it exceeds its backoff limit or runs past its active deadline.
    // Note: when the pod restart policy is OnFailure, container restart counts are also checked against the backoff limit (pastBackoffLimitOnFailure)
    if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
        jobFailed = true
        failureReason = "BackoffLimitExceeded"
        failureMessage = "Job has reached the specified backoff limit"
    } else if pastActiveDeadline(&job) {
        jobFailed = true
        failureReason = "DeadlineExceeded"
        failureMessage = "Job was active longer than specified deadline"
    }

    if jobFailed {
        // The Job has failed: delete its remaining active pods
        errCh := make(chan error, active)
        jm.deleteJobPods(&job, activePods, errCh)
        select {
        case manageJobErr = <-errCh:
            if manageJobErr != nil {
                break
            }
        default:
        }
        failed += active
        active = 0
        // Append the failure condition to the Job status and record an event
        job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
        jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
    } else {
        // If the Job needs a sync and is not being deleted, call manageJob to reconcile the pod count
        if jobNeedsSync && job.DeletionTimestamp == nil {
            active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
        }
        // Number of completions so far
        completions := succeeded
        complete := false
        // Below is the completion logic for the work-queue style Job patterns
        if job.Spec.Completions == nil {
            // If Completions is nil, the Job is complete once at least one pod has succeeded and no pods are still active. This covers two of the Job patterns:
            // Job Template Expansion: one work item per Job, so a single success completes the Job
            // Queue with Variable Pod Count: the Job completes once one pod succeeds and active is 0; otherwise it waits for the remaining active pods to finish
            if succeeded > 0 && active == 0 {
                complete = true
            }
        } else {
            // If the number of succeeded pods has reached Completions, the Job is also complete
            if completions >= *job.Spec.Completions {
                complete = true
                if active > 0 {
                    jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
                }
                if completions > *job.Spec.Completions {
                    jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
                }
            }
        }
        if complete {
            // The Job completed: append the Complete condition and record the completion time
            job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
            now := metav1.Now()
            job.Status.CompletionTime = &now
        }
    }

    forget := false
    // Compensation: if the observed succeeded count is higher than the recorded one, clear the key's backoff.
    // With parallelism > 1, a few pods may have failed while the others succeeded; in that case the key should be
    // cleared from the rate-limited queue so the Job can finish quickly rather than wait out the backoff
    if job.Status.Succeeded < succeeded {
        forget = true
    }

    // no need to update the job if the status hasn't changed since last time
    if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
        // Set the status fields and push the change to the apiserver
        job.Status.Active = active
        job.Status.Succeeded = succeeded
        job.Status.Failed = failed

        if err := jm.updateHandler(&job); err != nil {
            return forget, err
        }

        if jobHaveNewFailure && !IsJobFinished(&job) {
            // returning an error will re-enqueue Job after the backoff period
            return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
        }

        forget = true
    }

    return forget, manageJobErr
}

updateHandler

updateHandler is straightforward: it calls the apiserver to update the current Job's status, retrying a few times if needed.

func (jm *JobController) updateJobStatus(job *batch.Job) error {
    jobClient := jm.kubeClient.BatchV1().Jobs(job.Namespace)
    var err error
    for i := 0; i <= statusUpdateRetries; i = i + 1 {
        var newJob *batch.Job
        newJob, err = jobClient.Get(job.Name, metav1.GetOptions{})
        if err != nil {
            break
        }
        newJob.Status = job.Status
        if _, err = jobClient.UpdateStatus(newJob); err == nil {
            break
        }
    }

    return err
}

manageJob

syncHandler ultimately calls updateHandler to modify the Job's status, but the concrete creation and deletion of pods happens in manageJob:

  • syncHandler passes in the active pods, the succeeded count, and the Job
  • manageJob computes, from the parallelism setting and the succeeded/active counts, how many pods to add or delete, and then performs those operations directly. The code is as follows
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
    // manageJob performs the concrete pod operations for a Job: it deletes pods when more than the configured parallelism are running, and creates pods otherwise.
    // It first records the expected operation counts for jobKey in expectations and then carries the operations out; once the flow reaches this point, the operations proceed even if further events for this Job arrive in the meantime
    var activeLock sync.Mutex
    active := int32(len(activePods))
    parallelism := *job.Spec.Parallelism
    jobKey, err := controller.KeyFunc(job)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
        return 0, nil
    }

    var errCh chan error
    if active > parallelism {
        diff := active - parallelism
        errCh = make(chan error, diff)
        // We expect to delete diff pods, so set the ControlleeExpectations delete count to diff
        jm.expectations.ExpectDeletions(jobKey, int(diff))
        glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
        // Sort the active pods (by readiness, phase, start time, etc.) so the least-advanced ones come first and are deleted first
        sort.Sort(controller.ActivePods(activePods))

        active -= diff
        wait := sync.WaitGroup{}
        wait.Add(int(diff))
        for i := int32(0); i < diff; i++ {
            go func(ix int32) {
                defer wait.Done()
                // Delete the pod
                if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
                    defer utilruntime.HandleError(err)
                    // Decrement the expected number of deletes because the informer won't observe this deletion
                    glog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
                    // The deletion failed, so the informer will never observe it; decrement the expected delete count by 1
                    jm.expectations.DeletionObserved(jobKey)
                    activeLock.Lock()
                    active++
                    activeLock.Unlock()
                    errCh <- err
                }
            }(i)
        }
        wait.Wait()

    } else if active < parallelism {
        wantActive := int32(0)
        if job.Spec.Completions == nil {
            if succeeded > 0 {
                wantActive = active
            } else {
                wantActive = parallelism
            }
        } else {
            wantActive = *job.Spec.Completions - succeeded
            if wantActive > parallelism {
                wantActive = parallelism
            }
        }
        // Number of pods we need to create
        diff := wantActive - active
        if diff < 0 {
            utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
            diff = 0
        }
        // Record that we expect to create diff pods: set the ControlleeExpectations add count to diff
        jm.expectations.ExpectCreations(jobKey, int(diff))
        // Channel for collecting creation errors
        errCh = make(chan error, diff)
        glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

        // diff pods are about to be created; optimistically count them as active
        active += diff
        wait := sync.WaitGroup{}

        // Create the pods in slow-start batches: the batch size doubles each round, capped at the remaining diff
        for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {

            errorCount := len(errCh)
            wait.Add(int(batchSize))
            // Launch one goroutine per pod in this batch; creation failures are handled below
            for i := int32(0); i < batchSize; i++ {
                go func() {
                    defer wait.Done()
                    err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
                    if err != nil && errors.IsTimeout(err) {
                        return
                    }
                    if err != nil {
                        defer utilruntime.HandleError(err)
                        // Decrement the expected number of creates because the informer won't observe this pod
                        glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
                        // The creation failed, so the informer will never observe this pod;
                        // decrement the expected create count for jobKey
                        jm.expectations.CreationObserved(jobKey)
                        activeLock.Lock()
                        // One fewer pod will be active than planned
                        active--
                        activeLock.Unlock()
                        // Report the error on errCh
                        errCh <- err
                    }
                }()
            }
            wait.Wait()
            // any skipped pods that we never attempted to start shouldn't be expected.
            // This batch is done; skippedPods is the number of pods we have not attempted to create yet
            skippedPods := diff - batchSize
            // If this batch produced errors and there are still unattempted pods, abandon the remaining creations and
            // decrement the expectations for the skipped pods, since they will never be created and thus never observed by the informer
            if errorCount < len(errCh) && skippedPods > 0 {
                glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
                active -= skippedPods
                for i := int32(0); i < skippedPods; i++ {
                    // Decrement the expected number of creates because the informer won't observe this pod
                    // The informer will never see these skipped pods
                    jm.expectations.CreationObserved(jobKey)
                }
                break
            }
            diff -= batchSize
        }
    }

    select {
    case err := <-errCh:
        // all errors have been reported before, we only need to inform the controller that there was an error and it should re-try this job once more next time.
        // If multiple errors occurred, only the first one needs to be reported; the controller will simply retry this Job next time
        if err != nil {
            return active, err
        }
    default:
    }

    return active, nil
}
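To make the slow-start creation loop concrete, here is the batch-size progression in isolation, assuming the initial batch size constant (controller.SlowStartInitialBatchSize) is 1: for diff = 20 the batches are 1, 2, 4, 8 and finally 5.

package main

import "fmt"

func min32(a, b int32) int32 {
    if a < b {
        return a
    }
    return b
}

func main() {
    // Same progression as the slow-start loop in manageJob, with an assumed
    // initial batch size of 1 (controller.SlowStartInitialBatchSize).
    diff := int32(20)
    for batchSize := min32(diff, 1); diff > 0; batchSize = min32(2*batchSize, diff) {
        fmt.Println("creating a batch of", batchSize, "pods") // 1, 2, 4, 8, 5
        diff -= batchSize
    }
}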

Reflections

The Job Controller's logic itself is not complicated, but reading carefully enough to understand each part was slow; I spent a long time on the delayed queue, expectations, ControllerRef and so on. Combining the questions raised at the top with the code, here is what can be read out of the Job Controller:

  • A general event-processing model
  • A delayed queue used to retry failed events later
  • The trade-off expectations strikes between the informer and syncHandler over when to resync
  • Recovery of the business logic after failures
  • A concrete implementation of a failure retry policy
  • How the three Job patterns are implemented

That is as far as I will go with the Job Controller for now; it took three mornings to get through. I will come back and review it later.

Original article by baxiaoshi. If you republish it, please credit the source: http://www.sreguide.com/go%e8%af%ad%e8%a8%80/kubernetes-job-controller-%e5%ba%95%e5%b1%82%e5%ae%9e%e7%8e%b0%e5%8e%9f%e7%90%86%e5%89%96%e6%9e%90.html
