kube-scheduler是集群中Master节点的重要组件,其功能是根据调度算法计算,将Pod合理bind到Kubernetes集群中的各个node节点上,scheduler是怎么调度工作的?没什么文档比看源码逻辑更直接了,由于能力有限且源码庞大复杂,如有错误之处还望指正。

1cad0751-f657-4766-986a-20032bd6f083.png

k8s版本:1.13
代码下载: go get k8s.io/kubernetes

scheduler代码入口位置:
kubernetes/cmd/kube-scheduler/scheduler.go

func main() {
    rand.Seed(time.Now().UnixNano())

    command := app.NewSchedulerCommand()

    pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)

    logs.InitLogs()
    defer logs.FlushLogs()

    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

Scheduler命令启动执行NewSchedulerCommand创建一个新的Scheduler调度,NewSchedulerCommand是干嘛的,接着看看,NewSchedulerCommand部分代码:
kubernetes/cmd/kube-scheduler/app/server.go

func NewSchedulerCommand() *cobra.Command {
    opts, err := options.NewOptions()//读取初始化参数
    if err != nil {
        klog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-scheduler",
        Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, args, opts); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }
    fs := cmd.Flags()
    namedFlagSets := opts.Flags()
    verflag.AddFlags(namedFlagSets.FlagSet("global"))
    globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
    for _, f := range namedFlagSets.FlagSets {
        fs.AddFlagSet(f)
    }

    usageFmt := "Usage:\n  %s\n"
    cols, _, _ := apiserverflag.TerminalSize(cmd.OutOrStdout())
    cmd.SetUsageFunc(func(cmd *cobra.Command) error {
        fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
        apiserverflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
        return nil
    })
    cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
        fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
        apiserverflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
    })
    cmd.MarkFlagFilename("config", "yaml", "yml", "json")

    return cmd
}

NewSchedulerCommand中
1、NewOptions进行读取参数实例化,其中参数缺省值在kubernetes/vendor/k8s.io/kube-scheduler/config/v1alpha1/types.go定义,

更多的默认参数可以通过命令kube-scheduler --help来查看。

2、runCommand来启动一个调度器,接着看runCommand的函数段:

func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
    verflag.PrintAndExitIfRequested()
    utilflag.PrintFlags(cmd.Flags())

    if len(args) != 0 {
        fmt.Fprint(os.Stderr, "arguments are not supported\n")
    }

    if errs := opts.Validate(); len(errs) > 0 {
        fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
        os.Exit(1)
    }

    if len(opts.WriteConfigTo) > 0 {
        if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
        klog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
    }

    c, err := opts.Config()
    if err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }

    stopCh := make(chan struct{})

    // Get the completed config
    cc := c.Complete()

    // To help debugging, immediately log version
    klog.Infof("Version: %+v", version.Get())

    // Apply algorithms based on feature gates.
    // TODO: make configurable?
    algorithmprovider.ApplyFeatureGates()

    // Configz registration.
    if cz, err := configz.New("componentconfig"); err == nil {
        cz.Set(cc.ComponentConfig)
    } else {
        return fmt.Errorf("unable to register configz: %s", err)
    }

    return Run(cc, stopCh)
}

runCommand主要是读取schedule的启动配置参数,最后根据配置参数运行Run,并且根据Run处理返回相应的错误err。再继续往下看看Run干了什么:

func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    // Create the scheduler.
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory.Core().V1().Nodes(),
        cc.PodInformer,
        cc.InformerFactory.Core().V1().PersistentVolumes(),
        cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
        cc.InformerFactory.Core().V1().ReplicationControllers(),
        cc.InformerFactory.Apps().V1().ReplicaSets(),
        cc.InformerFactory.Apps().V1().StatefulSets(),
        cc.InformerFactory.Core().V1().Services(),
        cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
        cc.InformerFactory.Storage().V1().StorageClasses(),
        cc.Recorder,
        cc.ComponentConfig.AlgorithmSource,
        stopCh,
        scheduler.WithName(cc.ComponentConfig.SchedulerName),
        scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
        scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
    if err != nil {
        return err
    }

    // Prepare the event broadcaster.
    if cc.Broadcaster != nil && cc.EventClient != nil {
        cc.Broadcaster.StartLogging(klog.V(6).Infof)
        cc.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.EventClient.Events("")})
    }

    // Setup healthz checks.
    var checks []healthz.HealthzChecker
    if cc.ComponentConfig.LeaderElection.LeaderElect {
        checks = append(checks, cc.LeaderElection.WatchDog)
    }

    // Start up the healthz server.
    if cc.InsecureServing != nil {
        separateMetrics := cc.InsecureMetricsServing != nil
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
        if err := cc.InsecureServing.Serve(handler, 0, stopCh); err != nil {
            return fmt.Errorf("failed to start healthz server: %v", err)
        }
    }
    if cc.InsecureMetricsServing != nil {
        handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
        if err := cc.InsecureMetricsServing.Serve(handler, 0, stopCh); err != nil {
            return fmt.Errorf("failed to start metrics server: %v", err)
        }
    }
    if cc.SecureServing != nil {
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
        if err := cc.SecureServing.Serve(handler, 0, stopCh); err != nil {
            // fail early for secure handlers, removing the old error loop from above
            return fmt.Errorf("failed to start healthz server: %v", err)
        }
    }

    // Start all informers.
    go cc.PodInformer.Informer().Run(stopCh)
    cc.InformerFactory.Start(stopCh)

    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(stopCh)
    controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)

    // Prepare a reusable runCommand function.
    run := func(ctx context.Context) {
        sched.Run()
        <-ctx.Done()
    }

    ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
    defer cancel()

    go func() {
        select {
        case <-stopCh:
            cancel()
        case <-ctx.Done():
        }
    }()

    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            OnStartedLeading: run,
            OnStoppedLeading: func() {
                utilruntime.HandleError(fmt.Errorf("lost master"))
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

    // Leader election is disabled, so runCommand inline until done.
    run(ctx)
    return fmt.Errorf("finished without leader elect")
}

Run中根据config配置:
1、scheduler.New实例化一个新的调度器实例并返回sched
2、配置广播和健康检查以及metric信息(普罗米修斯监控使用)的端口:cc.InsecureServing.Serve(默认InsecureServing启动的监听端口为10251)
3、启动所有informer,其中单独执行PodInformer:cc.PodInformer.Informer().Run(stopCh),PodInformer提供pods信息的lister的访问入口。
PS:Informer 是 Client-go 中的一个核心工具包,Informer 最基本 的功能就是 List/Get Kubernetes 中的 Object,Informer的详细信息可以参考这篇文章。
4、调度前先执行kube-controller的controller.WaitForCacheSync,等待 pod的 informer 同步。
5、scheduler的高可用leader选举(--leader-elect),集群高可用部署时scheduler必须选举leader,默认即true。
6、运行真正的调度 run 函数,sched.Run()执行调度pod。

接着来看,第一步的scheduler.New是怎么来工作的,然后再看下最后一步的Run是怎么调度的,先跳转到scheduler.New代码部分:
kubernetes/pkg/scheduler/scheduler.go

func New(client clientset.Interface,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer appsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    pdbInformer policyinformers.PodDisruptionBudgetInformer,
    storageClassInformer storageinformers.StorageClassInformer,
    recorder record.EventRecorder,
    schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
    stopCh <-chan struct{},
    opts ...func(o *schedulerOptions)) (*Scheduler, error) {

    options := defaultSchedulerOptions
    for _, opt := range opts {
        opt(&options)
    }

    // Set up the configurator which can create schedulers from configs.
    configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
        SchedulerName:                  options.schedulerName,
        Client:                         client,
        NodeInformer:                   nodeInformer,
        PodInformer:                    podInformer,
        PvInformer:                     pvInformer,
        PvcInformer:                    pvcInformer,
        ReplicationControllerInformer:  replicationControllerInformer,
        ReplicaSetInformer:             replicaSetInformer,
        StatefulSetInformer:            statefulSetInformer,
        ServiceInformer:                serviceInformer,
        PdbInformer:                    pdbInformer,
        StorageClassInformer:           storageClassInformer,
        HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
        DisablePreemption:              options.disablePreemption,
        PercentageOfNodesToScore:       options.percentageOfNodesToScore,
        BindTimeoutSeconds:             options.bindTimeoutSeconds,
    })
    var config *factory.Config
    source := schedulerAlgorithmSource
    switch {
    case source.Provider != nil:
        // Create the config from a named algorithm provider.
        sc, err := configurator.CreateFromProvider(*source.Provider)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
        }
        config = sc
    case source.Policy != nil:
        // Create the config from a user specified policy source.
        policy := &schedulerapi.Policy{}
        switch {
        case source.Policy.File != nil:
            if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
                return nil, err
            }
        case source.Policy.ConfigMap != nil:
            if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
                return nil, err
            }
        }
        sc, err := configurator.CreateFromConfig(*policy)
        if err != nil {
            return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
        }
        config = sc
    default:
        return nil, fmt.Errorf("unsupported algorithm source: %v", source)
    }
    // Additional tweaks to the config produced by the configurator.
    config.Recorder = recorder
    config.DisablePreemption = options.disablePreemption
    config.StopEverything = stopCh
    // Create the scheduler.
    sched := NewFromConfig(config)
    return sched, nil
}

1、根据New的传参构建factory.NewConfigFactory配置configurator,factory.NewConfigFactory为一系列Informer初始化了回调函数,其中最重要的是PodInformer的两个回调函数,将已调度和未调度的Pod分别存入缓存和队列中。

// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
    stopEverything := args.StopCh
    if stopEverything == nil {
        stopEverything = wait.NeverStop
    }
    schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)

    // storageClassInformer is only enabled through VolumeScheduling feature gate
    var storageClassLister storagelisters.StorageClassLister
    if args.StorageClassInformer != nil {
        storageClassLister = args.StorageClassInformer.Lister()
    }
    c := &configFactory{
        client:                         args.Client,
        podLister:                      schedulerCache,
        podQueue:                       internalqueue.NewSchedulingQueue(stopEverything),
        nodeLister:                     args.NodeInformer.Lister(),
        pVLister:                       args.PvInformer.Lister(),
        pVCLister:                      args.PvcInformer.Lister(),
        serviceLister:                  args.ServiceInformer.Lister(),
        controllerLister:               args.ReplicationControllerInformer.Lister(),
        replicaSetLister:               args.ReplicaSetInformer.Lister(),
        statefulSetLister:              args.StatefulSetInformer.Lister(),
        pdbLister:                      args.PdbInformer.Lister(),
        storageClassLister:             storageClassLister,
        schedulerCache:                 schedulerCache,
        StopEverything:                 stopEverything,
        schedulerName:                  args.SchedulerName,
        hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
        disablePreemption:              args.DisablePreemption,
        percentageOfNodesToScore:       args.PercentageOfNodesToScore,
    }

    c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
    // scheduled pod cache
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return assignedPod(pod)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering scheduledPod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object for filtering scheduledPod in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToCache,
                UpdateFunc: c.updatePodInCache,
                DeleteFunc: c.deletePodFromCache,
            },
        },
    )
    // unscheduled pod queue
    args.PodInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
                case cache.DeletedFinalStateUnknown:
                    if pod, ok := t.Obj.(*v1.Pod); ok {
                        return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
                    }
                    runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod for filtering unscheduledPod in %T", obj, c))
                    return false
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object for filtering unscheduledPod in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToSchedulingQueue,
                UpdateFunc: c.updatePodInSchedulingQueue,
                DeleteFunc: c.deletePodFromSchedulingQueue,
            },
        },
    )
    // ScheduledPodLister is something we provide to plug-in functions that
    // they may need to call.
    c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}

    args.NodeInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.addNodeToCache,
            UpdateFunc: c.updateNodeInCache,
            DeleteFunc: c.deleteNodeFromCache,
        },
    )

    args.PvInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
            AddFunc:    c.onPvAdd,
            UpdateFunc: c.onPvUpdate,
        },
    )

    // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
    args.PvcInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.onPvcAdd,
            UpdateFunc: c.onPvcUpdate,
        },
    )

    // This is for ServiceAffinity: affected by the selector of the service is updated.
    args.ServiceInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    c.onServiceAdd,
            UpdateFunc: c.onServiceUpdate,
            DeleteFunc: c.onServiceDelete,
        },
    )

    // Setup volume binder
    c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)

    args.StorageClassInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc: c.onStorageClassAdd,
        },
    )

    // Setup cache debugger
    debugger := cachedebugger.New(
        args.NodeInformer.Lister(),
        args.PodInformer.Lister(),
        c.schedulerCache,
        c.podQueue,
    )
    debugger.ListenForSignal(c.StopEverything)

    go func() {
        <-c.StopEverything
        c.podQueue.Close()
    }()

    return c
}

2、根据Provider 和 configMap 来创建对应的调度器以及初始化环境变量、配置文件策略。
3、然后调用NewFromConfig方法,通过这个Config创建一个scheduler实例并返回sched

run的代码段

// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }

    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

Scheduler 通过 wait.Until() 工具,不停得调用 Scheduler.scheduleOne() 方法直到收到停止信号。

看看scheduleOne代码段,这是最终调度Pod的最终函数:

// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
    plugins := sched.config.PluginSet
    // Remove all plugin context data at the beginning of a scheduling cycle.
    if plugins.Data().Ctx != nil {
        plugins.Data().Ctx.Reset()
    }

    pod := sched.config.NextPod()
    // pod could be nil when schedulerQueue is closed
    if pod == nil {
        return
    }
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }

    klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    scheduleResult, err := sched.schedule(pod)
    if err != nil {
        // schedule() may have failed because the pod would not fit on any host, so we try to
        // preempt, with the expectation that the next time the pod is tried for scheduling it
        // will fit due to the preemption. It is also possible that a different pod will schedule
        // into the resources that were preempted, but this is harmless.
        if fitError, ok := err.(*core.FitError); ok {
            if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
                klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
                    " No preemption is performed.")
            } else {
                preemptionStartTime := time.Now()
                sched.preempt(pod, fitError)
                metrics.PreemptionAttempts.Inc()
                metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
                metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
            }
            // Pod did not fit anywhere, so it is counted as a failure. If preemption
            // succeeds, the pod should get counted as a success the next time we try to
            // schedule it. (hopefully)
            metrics.PodScheduleFailures.Inc()
        } else {
            klog.Errorf("error selecting node for pod: %v", err)
            metrics.PodScheduleErrors.Inc()
        }
        return
    }
    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
    // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
    // This allows us to keep scheduling without waiting on binding to occur.
    assumedPod := pod.DeepCopy()

    // Assume volumes first before assuming the pod.
    //
    // If all volumes are completely bound, then allBound is true and binding will be skipped.
    //
    // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
    //
    // This function modifies 'assumedPod' if volume binding is required.
    allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        klog.Errorf("error assuming volumes: %v", err)
        metrics.PodScheduleErrors.Inc()
        return
    }

    // Run "reserve" plugins.
    for _, pl := range plugins.ReservePlugins() {
        if err := pl.Reserve(plugins, assumedPod, scheduleResult.SuggestedHost); err != nil {
            klog.Errorf("error while running %v reserve plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
            sched.recordSchedulingFailure(assumedPod, err, SchedulerError,
                fmt.Sprintf("reserve plugin %v failed", pl.Name()))
            metrics.PodScheduleErrors.Inc()
            return
        }
    }
    // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        klog.Errorf("error assuming pod: %v", err)
        metrics.PodScheduleErrors.Inc()
        return
    }
    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    go func() {
        // Bind volumes first before Pod
        if !allBound {
            err := sched.bindVolumes(assumedPod)
            if err != nil {
                klog.Errorf("error binding volumes: %v", err)
                metrics.PodScheduleErrors.Inc()
                return
            }
        }

        // Run "prebind" plugins.
        for _, pl := range plugins.PrebindPlugins() {
            approved, err := pl.Prebind(plugins, assumedPod, scheduleResult.SuggestedHost)
            if err != nil {
                approved = false
                klog.Errorf("error while running %v prebind plugin for pod %v: %v", pl.Name(), assumedPod.Name, err)
                metrics.PodScheduleErrors.Inc()
            }
            if !approved {
                sched.Cache().ForgetPod(assumedPod)
                var reason string
                if err == nil {
                    msg := fmt.Sprintf("prebind plugin %v rejected pod %v.", pl.Name(), assumedPod.Name)
                    klog.V(4).Infof(msg)
                    err = errors.New(msg)
                    reason = v1.PodReasonUnschedulable
                } else {
                    reason = SchedulerError
                }
                sched.recordSchedulingFailure(assumedPod, err, reason, err.Error())
                return
            }
        }

        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: scheduleResult.SuggestedHost,
            },
        })
        metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
        if err != nil {
            klog.Errorf("error binding pod: %v", err)
            metrics.PodScheduleErrors.Inc()
        } else {
            klog.V(2).Infof("pod %v/%v is bound successfully on node %v, %d nodes evaluated, %d nodes were found feasible", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes)
            metrics.PodScheduleSuccesses.Inc()
        }
    }()
}

scheduleOne是串行调度的,每次调度一个pod到合适的node节点上,
1、sched.config.NextPod从podQueue队列中取出下一个Pod。如果这个Pod正在删除,则跳过。
2.1、sched.schedule进行算法调度这个Pod,如果调度分配失败(比如没有node资源可调度)则进行记录,并且如果开启抢占配置(preemption),失败调度的pod会尝试抢占记录,下次调度优先调度,然后记录调度成功和调度失败的次数,以及调度分配耗时。

// schedule implements the scheduling algorithm and returns the suggested result(host,
// evaluated nodes number,feasible nodes number).
func (sched *Scheduler) schedule(pod *v1.Pod) (core.ScheduleResult, error) {
    result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
    if err != nil {
        pod = pod.DeepCopy()
        sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
        return core.ScheduleResult{}, err
    }
    return result, err
}

2.2 sched.config.Algorithm.Schedule对node列表进行调度,如果调度成功则返回一个调度节点的name,否则返回相应的错误信息。

// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
    trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
    defer trace.LogIfLong(100 * time.Millisecond)

    if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
        return result, err
    }

    nodes, err := nodeLister.List()
    if err != nil {
        return result, err
    }
    if len(nodes) == 0 {
        return result, ErrNoNodesAvailable
    }

    if err := g.snapshot(); err != nil {
        return result, err
    }

    trace.Step("Computing predicates")
    startPredicateEvalTime := time.Now()
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    if err != nil {
        return result, err
    }

    if len(filteredNodes) == 0 {
        return result, &FitError{
            Pod:              pod,
            NumAllNodes:      len(nodes),
            FailedPredicates: failedPredicateMap,
        }
    }
    metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
    metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))

    trace.Step("Prioritizing")
    startPriorityEvalTime := time.Now()
    // When only one node after predicate, just use it.
    if len(filteredNodes) == 1 {
        metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
        return ScheduleResult{
            SuggestedHost:  filteredNodes[0].Name,
            EvaluatedNodes: 1 + len(failedPredicateMap),
            FeasibleNodes:  1,
        }, nil
    }

    metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    if err != nil {
        return result, err
    }
    metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
    metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))

    trace.Step("Selecting host")

    host, err := g.selectHost(priorityList)
    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
        FeasibleNodes:  len(filteredNodes),
    }, err
}

上面Schedule调度中:
2.2.1.获取可用的node列表
2.2.2.先进行Predicates预选(findNodesThatFit)调度算法计算,刷选出合适的filteredNodes,看看预选的代码段

func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
    var filtered []*v1.Node
    failedPredicateMap := FailedPredicateMap{}

    if len(g.predicates) == 0 {
        filtered = nodes
    } else {
        allNodes := int32(g.cache.NodeTree().NumNodes())
        numNodesToFind := g.numFeasibleNodesToFind(allNodes)

        // Create filtered list with enough space to avoid growing it
        // and allow assigning.
        filtered = make([]*v1.Node, numNodesToFind)
        errs := errors.MessageCountMap{}
        var (
            predicateResultLock sync.Mutex
            filteredLen         int32
        )

        ctx, cancel := context.WithCancel(context.Background())

        // We can use the same metadata producer for all nodes.
        meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)

        checkNode := func(i int) {
            nodeName := g.cache.NodeTree().Next()
            fits, failedPredicates, err := podFitsOnNode(
                pod,
                meta,
                g.cachedNodeInfoMap[nodeName],
                g.predicates,
                g.schedulingQueue,
                g.alwaysCheckAllPredicates,
            )
            if err != nil {
                predicateResultLock.Lock()
                errs[err.Error()]++
                predicateResultLock.Unlock()
                return
            }
            if fits {
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
                    cancel()
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                }
            } else {
                predicateResultLock.Lock()
                failedPredicateMap[nodeName] = failedPredicates
                predicateResultLock.Unlock()
            }
        }

        // Stops searching for more nodes once the configured number of feasible nodes
        // are found.
        workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

        filtered = filtered[:filteredLen]
        if len(errs) > 0 {
            return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
        }
    }

    if len(filtered) > 0 && len(g.extenders) != 0 {
        for _, extender := range g.extenders {
            if !extender.IsInterested(pod) {
                continue
            }
            filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
            if err != nil {
                if extender.IsIgnorable() {
                    klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
                        extender, err)
                    continue
                } else {
                    return []*v1.Node{}, FailedPredicateMap{}, err
                }
            }

            for failedNodeName, failedMsg := range failedMap {
                if _, found := failedPredicateMap[failedNodeName]; !found {
                    failedPredicateMap[failedNodeName] = []predicates.PredicateFailureReason{}
                }
                failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
            }
            filtered = filteredList
            if len(filtered) == 0 {
                break
            }
        }
    }
    return filtered, failedPredicateMap, nil
}

2.2.3 numFeasibleNodesToFind刷选出可用的node数量
2.2.4 podFitsOnNode判断node是否合适在node节点上。TODO:kubernetes的extender的使用了解。
2.2.5.如果filteredNodes数量大于2,进行权重优选(PrioritizeNodes)调度算法计算刷选

func PrioritizeNodes(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
    meta interface{},
    priorityConfigs []algorithm.PriorityConfig,
    nodes []*v1.Node,
    extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
    // If no priority configs are provided, then the EqualPriority function is applied
    // This is required to generate the priority list in the required format
    if len(priorityConfigs) == 0 && len(extenders) == 0 {
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
        for i := range nodes {
            hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
            if err != nil {
                return nil, err
            }
            result = append(result, hostPriority)
        }
        return result, nil
    }

    var (
        mu   = sync.Mutex{}
        wg   = sync.WaitGroup{}
        errs []error
    )
    appendError := func(err error) {
        mu.Lock()
        defer mu.Unlock()
        errs = append(errs, err)
    }

    results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

    // DEPRECATED: we can remove this when all priorityConfigs implement the
    // Map-Reduce pattern.
    for i := range priorityConfigs {
        if priorityConfigs[i].Function != nil {
            wg.Add(1)
            go func(index int) {
                defer wg.Done()
                var err error
                results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
                if err != nil {
                    appendError(err)
                }
            }(i)
        } else {
            results[i] = make(schedulerapi.HostPriorityList, len(nodes))
        }
    }

    workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        for i := range priorityConfigs {
            if priorityConfigs[i].Function != nil {
                continue
            }

            var err error
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
            if err != nil {
                appendError(err)
                results[i][index].Host = nodes[index].Name
            }
        }
    })

    for i := range priorityConfigs {
        if priorityConfigs[i].Reduce == nil {
            continue
        }
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
                appendError(err)
            }
            if klog.V(10) {
                for _, hostPriority := range results[index] {
                    klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
                }
            }
        }(i)
    }
    // Wait for all computations to be finished.
    wg.Wait()
    if len(errs) != 0 {
        return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
    }

    // Summarize all scores.
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))

    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }

    if len(extenders) != 0 && nodes != nil {
        combinedScores := make(map[string]int, len(nodeNameToInfo))
        for i := range extenders {
            if !extenders[i].IsInterested(pod) {
                continue
            }
            wg.Add(1)
            go func(extIndex int) {
                defer wg.Done()
                prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
                if err != nil {
                    // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
                    return
                }
                mu.Lock()
                for i := range *prioritizedList {
                    host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
                    if klog.V(10) {
                        klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
                    }
                    combinedScores[host] += score * weight
                }
                mu.Unlock()
            }(i)
        }
        // wait for all go routines to finish
        wg.Wait()
        for i := range result {
            result[i].Score += combinedScores[result[i].Host]
        }
    }

    if klog.V(10) {
        for i := range result {
            klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
        }
    }
    return result, nil
}

2.2.6 selectHost() 如果优选出的多个得分相同的 Node,则随机选取一个 Node。
3、调度分配好后pod.DeepCopy()更新到缓存已分配记录(assumedPod)。
4、如果有volume的设置,在bind pod之前先进行volume的绑定。
5、异步bind pod到分配好的node节点上,调用 kube-apiserver API,将 Pod 绑定到选出的 Node,之后 kube-apiserver 会将元数据写入 etcd 中。
大致刷选流程图:
绘图1.png

总结:
kube-scheduler 作为 Kubernetes master上一个单独的进程提供调度服务,通过informer的list-watch机制,从apiserver端获取数据并缓存。

获取到待调度的 Pod 后,执行Schedule 方法进行调度,整个调度过程分两个关键步骤:Predicates 和 Priorities (其中刷选的调度策略policy可以通过启动参数--policy-config-file进行json格式自定义调度策略,如上流程图的policy摘至官方代码给的示例参数,默认调度策略是defaultProvider, defaultPredicates, defaultPriorities ),最终选出一个最适合该 Pod 的 Node,更新 SchedulerCache 中 Pod 的状态 (AssumePod),标志该 Pod 为 scheduled,并更新到 NodeInfo 中。