diff --git a/Makefile b/Makefile index d84f2b21c..9517a3725 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ generate-code: rel_bins: go get github.com/mitchellh/gox - gox -osarch=${REL_OSARCH} -ldflags ${LD_FLAGS} \ + CGO_ENABLED=0 gox -osarch=${REL_OSARCH} -ldflags ${LD_FLAGS} \ -output=${BIN_DIR}/{{.OS}}/{{.Arch}}/kube-batch ./cmd/kube-batch images: rel_bins diff --git a/cmd/kube-batch/app/options/options.go b/cmd/kube-batch/app/options/options.go index 6fc4d636c..dd7c24fff 100644 --- a/cmd/kube-batch/app/options/options.go +++ b/cmd/kube-batch/app/options/options.go @@ -23,13 +23,20 @@ import ( "github.com/spf13/pflag" ) +const ( + defaultSchedulerName = "kube-batch" + defaultSchedulerPeriod = time.Second + defaultQueue = "default" + defaultListenAddress = ":8080" +) + // ServerOption is the main context object for the controller manager. type ServerOption struct { Master string Kubeconfig string SchedulerName string SchedulerConf string - SchedulePeriod string + SchedulePeriod time.Duration EnableLeaderElection bool LockObjectNamespace string DefaultQueue string @@ -48,25 +55,22 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)") fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information") // kube-batch will ignore pods with scheduler names other than specified with the option - fs.StringVar(&s.SchedulerName, "scheduler-name", "kube-batch", "kube-batch will handle pods with the scheduler-name") + fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "kube-batch will handle pods with the scheduler-name") fs.StringVar(&s.SchedulerConf, "scheduler-conf", "", "The absolute path of scheduler configuration file") - fs.StringVar(&s.SchedulePeriod, "schedule-period", "1s", "The period between each scheduling cycle") - fs.StringVar(&s.DefaultQueue, "default-queue", "default", "The default queue name of the job") + fs.DurationVar(&s.SchedulePeriod, "schedule-period", defaultSchedulerPeriod, "The period between each scheduling cycle") + fs.StringVar(&s.DefaultQueue, "default-queue", defaultQueue, "The default queue name of the job") fs.BoolVar(&s.EnableLeaderElection, "leader-elect", s.EnableLeaderElection, "Start a leader election client and gain leadership before "+ "executing the main loop. 
Enable this when running replicated kube-batch for high availability") fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit") fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object") - fs.StringVar(&s.ListenAddress, "listen-address", ":8080", "The address to listen on for HTTP requests.") + fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.") } func (s *ServerOption) CheckOptionOrDie() error { if s.EnableLeaderElection && s.LockObjectNamespace == "" { return fmt.Errorf("lock-object-namespace must not be nil when LeaderElection is enabled") } - if _, err := time.ParseDuration(s.SchedulePeriod); err != nil { - return fmt.Errorf("failed to parse --schedule-period: %v", err) - } return nil } diff --git a/cmd/kube-batch/app/options/options_test.go b/cmd/kube-batch/app/options/options_test.go new file mode 100644 index 000000000..d634e6288 --- /dev/null +++ b/cmd/kube-batch/app/options/options_test.go @@ -0,0 +1,48 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "reflect" + "testing" + "time" + + "github.com/spf13/pflag" +) + +func TestAddFlags(t *testing.T) { + fs := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError) + s := NewServerOption() + s.AddFlags(fs) + + args := []string{ + "--schedule-period=5m", + } + fs.Parse(args) + + // This is a snapshot of expected options parsed by args. + expected := &ServerOption{ + SchedulerName: defaultSchedulerName, + SchedulePeriod: 5 * time.Minute, + DefaultQueue: defaultQueue, + ListenAddress: defaultListenAddress, + } + + if !reflect.DeepEqual(expected, s) { + t.Errorf("Got different run options than expected.\nGot: %+v\nExpected: %+v\n", s, expected) + } +} diff --git a/deployment/images/Dockerfile b/deployment/images/Dockerfile index cbdd9165c..d804c4f3e 100644 --- a/deployment/images/Dockerfile +++ b/deployment/images/Dockerfile @@ -1,4 +1,4 @@ -From ubuntu:18.04 +From alpine:3.9 ADD kube-batch /usr/local/bin diff --git a/doc/usage/tutorial.md b/doc/usage/tutorial.md index 84314dc20..9d6e30421 100644 --- a/doc/usage/tutorial.md +++ b/doc/usage/tutorial.md @@ -35,6 +35,8 @@ Run the `kube-batch` as kubernetes scheduler # helm install $GOPATH/src/github.com/kubernetes-sigs/kube-batch/deployment/kube-batch --namespace kube-system ``` +Note: If there is an error `Error: apiVersion "scheduling.incubator.k8s.io/v1alpha1" in kube-batch/templates/default.yaml is not available`, please update your helm to latest version and try it again. 
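A quick illustration of why the explicit `time.ParseDuration` check could be dropped from `CheckOptionOrDie`: once `SchedulePeriod` is a `time.Duration` registered through `fs.DurationVar`, pflag itself rejects malformed values at parse time. The sketch below is not part of the change set; it only assumes the `options` package and the `NewServerOption` constructor used by `TestAddFlags` above.

```go
package options

import (
	"testing"

	"github.com/spf13/pflag"
)

// Sketch only: with --schedule-period typed as time.Duration, a malformed value
// now fails inside fs.Parse instead of needing a check in CheckOptionOrDie.
func TestSchedulePeriodRejectsMalformedValue(t *testing.T) {
	fs := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError)
	s := NewServerOption()
	s.AddFlags(fs)

	if err := fs.Parse([]string{"--schedule-period=not-a-duration"}); err == nil {
		t.Error("expected parse error for a non-duration --schedule-period value")
	}
}
```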
+ Verify the release ```bash diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 753ad8bbb..912d20bb7 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -46,12 +46,16 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { jobsMap := map[api.QueueID]*util.PriorityQueue{} for _, job := range ssn.Jobs { - if _, found := jobsMap[job.Queue]; !found { - jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn) - } - if queue, found := ssn.Queues[job.Queue]; found { queues.Push(queue) + } else { + glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found", + job.Namespace, job.Name, job.Queue) + continue + } + + if _, found := jobsMap[job.Queue]; !found { + jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn) } glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue) @@ -143,12 +147,12 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { selectedNodes := util.SelectBestNode(nodeScores) for _, node := range selectedNodes { // Allocate idle resource to the task. - if task.Resreq.LessEqual(node.Idle) { + if task.InitResreq.LessEqual(node.Idle) { glog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) if err := ssn.Allocate(task, node.Name); err != nil { - glog.Errorf("Failed to bind Task %v on %v in Session %v", - task.UID, node.Name, ssn.UID) + glog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v", + task.UID, node.Name, ssn.UID, err) continue } assigned = true @@ -162,9 +166,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { } // Allocate releasing resource to the task if any. - if task.Resreq.LessEqual(node.Releasing) { + if task.InitResreq.LessEqual(node.Releasing) { glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>", - task.Namespace, task.Name, node.Name, task.Resreq, node.Releasing) + task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing) if err := ssn.Pipeline(task, node.Name); err != nil { glog.Errorf("Failed to pipeline Task %v on %v in Session %v", task.UID, node.Name, ssn.UID) diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 85aa7ce31..920270351 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -44,7 +44,7 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) { // TODO (k82cn): When backfill, it's also need to balance between Queues. for _, job := range ssn.Jobs { for _, task := range job.TaskStatusIndex[api.Pending] { - if task.Resreq.IsEmpty() { + if task.InitResreq.IsEmpty() { // As task did not request resources, so it only need to meet predicates. // TODO (k82cn): need to prioritize nodes to avoid pod hole. for _, node := range ssn.Nodes { diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 9b2132549..57e0dd417 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -203,7 +203,7 @@ func preempt( var preemptees []*api.TaskInfo preempted := api.EmptyResource() - resreq := preemptor.Resreq.Clone() + resreq := preemptor.InitResreq.Clone() for _, task := range node.Tasks { if filter == nil { @@ -220,8 +220,15 @@ func preempt( continue } - // Preempt victims for tasks. 
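The allocate changes above switch the fit checks from `Resreq` to `InitResreq` (the request including init containers) and fall back to pipelining on releasing resources when idle capacity is not enough. A condensed, self-contained sketch of that decision; the `res` type and its fields are stand-ins for `api.Resource`, not the real API:

```go
package main

import "fmt"

// res is a tiny stand-in for api.Resource, just enough to show the fit checks.
type res struct {
	MilliCPU float64
	Memory   float64
}

func (r res) LessEqual(o res) bool {
	return r.MilliCPU <= o.MilliCPU && r.Memory <= o.Memory
}

func main() {
	// InitResreq (request including init containers) is what must fit on a node.
	initResreq := res{MilliCPU: 3000, Memory: 3e9}
	idle := res{MilliCPU: 2000, Memory: 4e9}
	releasing := res{MilliCPU: 4000, Memory: 4e9}

	switch {
	case initResreq.LessEqual(idle):
		fmt.Println("bind: enough idle resources on the node")
	case initResreq.LessEqual(releasing):
		fmt.Println("pipeline: wait for resources that are being released")
	default:
		fmt.Println("try the next node")
	}
}
```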
- for _, preemptee := range victims { + victimsQueue := util.NewPriorityQueue(func(l, r interface{}) bool { + return !ssn.TaskOrderFn(l, r) + }) + for _, victim := range victims { + victimsQueue.Push(victim) + } + // Preempt victims for tasks, pick lowest priority task first. + for !victimsQueue.Empty() { + preemptee := victimsQueue.Pop().(*api.TaskInfo) glog.Errorf("Try to preempt Task <%s/%s> for Tasks <%s/%s>", preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name) if err := stmt.Evict(preemptee, "preempt"); err != nil { @@ -231,17 +238,16 @@ func preempt( } preempted.Add(preemptee.Resreq) // If reclaimed enough resources, break loop to avoid Sub panic. - if resreq.LessEqual(preemptee.Resreq) { + if resreq.LessEqual(preempted) { break } - resreq.Sub(preemptee.Resreq) } metrics.RegisterPreemptionAttempts() glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.", - preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq) + preempted, preemptor.Namespace, preemptor.Name, preemptor.InitResreq) - if preemptor.Resreq.LessEqual(preempted) { + if preemptor.InitResreq.LessEqual(preempted) { if err := stmt.Pipeline(preemptor, node.Name); err != nil { glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>", preemptor.Namespace, preemptor.Name, node.Name) diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 0bc2412fc..83ae3a1af 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -110,14 +110,13 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) { } assigned := false - for _, n := range ssn.Nodes { // If predicates failed, next node. if err := ssn.PredicateFn(task, n); err != nil { continue } - resreq := task.Resreq.Clone() + resreq := task.InitResreq.Clone() reclaimed := api.EmptyResource() glog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", @@ -165,16 +164,15 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) { } reclaimed.Add(reclaimee.Resreq) // If reclaimed enough resources, break loop to avoid Sub panic. - if resreq.LessEqual(reclaimee.Resreq) { + if resreq.LessEqual(reclaimed) { break } - resreq.Sub(reclaimee.Resreq) } glog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.", - reclaimed, task.Namespace, task.Name, task.Resreq) + reclaimed, task.Namespace, task.Name, task.InitResreq) - if task.Resreq.LessEqual(reclaimed) { + if task.InitResreq.LessEqual(reclaimed) { if err := ssn.Pipeline(task, n.Name); err != nil { glog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>", task.Namespace, task.Name, n.Name) diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go index 68799bcb4..4ae0134e9 100644 --- a/pkg/scheduler/api/job_info.go +++ b/pkg/scheduler/api/job_info.go @@ -38,7 +38,10 @@ type TaskInfo struct { Name string Namespace string + // Resreq is the resource that used when task running. Resreq *Resource + // InitResreq is the resource that used to launch a task. + InitResreq *Resource NodeName string Status TaskStatus @@ -61,25 +64,22 @@ func getJobID(pod *v1.Pod) JobID { } func NewTaskInfo(pod *v1.Pod) *TaskInfo { - req := EmptyResource() - - // TODO(k82cn): also includes initContainers' resource. 
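Two things change in the preemption loop above: victims are evicted lowest-priority first (via a priority queue built on the reversed `TaskOrderFn`), and the loop now stops once the *accumulated* preempted amount covers the request, rather than comparing the request against the last victim alone. A single-dimension sketch of that flow, with made-up names and numbers:

```go
package main

import (
	"fmt"
	"sort"
)

type victim struct {
	name     string
	priority int32
	req      float64 // resource request collapsed to one number for the sketch
}

func main() {
	need := 2.5 // stands in for the preemptor's InitResreq
	victims := []victim{
		{"high-prio", 100, 1.0},
		{"low-a", 1, 1.5},
		{"low-b", 1, 1.5},
	}

	// Evict the lowest-priority victims first (the role of victimsQueue).
	sort.Slice(victims, func(i, j int) bool { return victims[i].priority < victims[j].priority })

	preempted := 0.0
	for _, v := range victims {
		fmt.Println("evict", v.name)
		preempted += v.req
		// Stop once the accumulated total covers the request; comparing against
		// the running total (not the last victim) is what avoids over-evicting.
		if need <= preempted {
			break
		}
	}
	fmt.Printf("preempted %.1f for a request of %.1f; high-prio task untouched\n", preempted, need)
}
```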
- for _, c := range pod.Spec.Containers { - req.Add(NewResource(c.Resources.Requests)) - } + req := GetPodResourceWithoutInitContainers(pod) + initResreq := GetPodResourceRequest(pod) jobID := getJobID(pod) ti := &TaskInfo{ - UID: TaskID(pod.UID), - Job: jobID, - Name: pod.Name, - Namespace: pod.Namespace, - NodeName: pod.Spec.NodeName, - Status: getTaskStatus(pod), - Priority: 1, - Pod: pod, - Resreq: req, + UID: TaskID(pod.UID), + Job: jobID, + Name: pod.Name, + Namespace: pod.Namespace, + NodeName: pod.Spec.NodeName, + Status: getTaskStatus(pod), + Priority: 1, + Pod: pod, + Resreq: req, + InitResreq: initResreq, } if pod.Spec.Priority != nil { @@ -100,6 +100,7 @@ func (ti *TaskInfo) Clone() *TaskInfo { Priority: ti.Priority, Pod: ti.Pod, Resreq: ti.Resreq.Clone(), + InitResreq: ti.InitResreq.Clone(), VolumeReady: ti.VolumeReady, } } diff --git a/pkg/scheduler/api/pod_info.go b/pkg/scheduler/api/pod_info.go new file mode 100644 index 000000000..bae26a751 --- /dev/null +++ b/pkg/scheduler/api/pod_info.go @@ -0,0 +1,71 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "k8s.io/api/core/v1" +) + +// Refer k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go#GetResourceRequest. +// +// GetResourceRequest returns a *Resource that covers the largest width in each resource dimension. +// Because init-containers run sequentially, we collect the max in each dimension iteratively. +// In contrast, we sum the resource vectors for regular containers since they run simultaneously. +// +// To be consistent with kubernetes default scheduler, it is only used for predicates of actions(e.g. +// allocate, backfill, preempt, reclaim), please use GetPodResourceWithoutInitContainers for other cases. +// +// Example: +// +// Pod: +// InitContainers +// IC1: +// CPU: 2 +// Memory: 1G +// IC2: +// CPU: 2 +// Memory: 3G +// Containers +// C1: +// CPU: 2 +// Memory: 1G +// C2: +// CPU: 1 +// Memory: 1G +// +// Result: CPU: 3, Memory: 3G +func GetPodResourceRequest(pod *v1.Pod) *Resource { + result := GetPodResourceWithoutInitContainers(pod) + + // take max_resource(sum_pod, any_init_container) + for _, container := range pod.Spec.InitContainers { + result.SetMaxResource(NewResource(container.Resources.Requests)) + } + + return result +} + +// GetPodResourceWithoutInitContainers returns Pod's resource request, it does not contain +// init containers' resource request. +func GetPodResourceWithoutInitContainers(pod *v1.Pod) *Resource { + result := EmptyResource() + for _, container := range pod.Spec.Containers { + result.Add(NewResource(container.Resources.Requests)) + } + + return result +} diff --git a/pkg/scheduler/api/pod_info_test.go b/pkg/scheduler/api/pod_info_test.go new file mode 100644 index 000000000..873b92f2c --- /dev/null +++ b/pkg/scheduler/api/pod_info_test.go @@ -0,0 +1,162 @@ +/* +Copyright 2019 The Kubernetes Authors. 
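The worked example in the `GetPodResourceRequest` comment can be reproduced in a few lines: regular containers are summed because they run concurrently, while init containers only raise the per-dimension maximum because they run one at a time. The `resource` type below is a simplified stand-in, not the package's `Resource`:

```go
package main

import "fmt"

type resource struct{ milliCPU, memory float64 }

func (r *resource) add(o resource) {
	r.milliCPU += o.milliCPU
	r.memory += o.memory
}

func (r *resource) setMax(o resource) {
	if o.milliCPU > r.milliCPU {
		r.milliCPU = o.milliCPU
	}
	if o.memory > r.memory {
		r.memory = o.memory
	}
}

func main() {
	containers := []resource{{2000, 1e9}, {1000, 1e9}}     // C1, C2: summed
	initContainers := []resource{{2000, 1e9}, {2000, 3e9}} // IC1, IC2: max taken

	req := resource{}
	for _, c := range containers {
		req.add(c)
	}
	for _, ic := range initContainers {
		req.setMax(ic)
	}
	// Matches the example in the comment: CPU 3000m, memory 3G.
	fmt.Printf("effective request: %.0fm CPU, %.0f bytes memory\n", req.milliCPU, req.memory)
}
```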
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" +) + +func TestGetPodResourceRequest(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + expectedResource *Resource + }{ + { + name: "get resource for pod without init containers", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("1000m", "1G"), + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "1G"), + }, + }, + }, + }, + }, + expectedResource: NewResource(buildResourceList("3000m", "2G")), + }, + { + name: "get resource for pod with init containers", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + InitContainers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "5G"), + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "1G"), + }, + }, + }, + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("1000m", "1G"), + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "1G"), + }, + }, + }, + }, + }, + expectedResource: NewResource(buildResourceList("3000m", "5G")), + }, + } + + for i, test := range tests { + req := GetPodResourceRequest(test.pod) + if !reflect.DeepEqual(req, test.expectedResource) { + t.Errorf("case %d(%s) failed: \n expected %v, \n got: %v \n", + i, test.name, test.expectedResource, req) + } + } +} + +func TestGetPodResourceWithoutInitContainers(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + expectedResource *Resource + }{ + { + name: "get resource for pod without init containers", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("1000m", "1G"), + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "1G"), + }, + }, + }, + }, + }, + expectedResource: NewResource(buildResourceList("3000m", "2G")), + }, + { + name: "get resource for pod with init containers", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + InitContainers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "5G"), + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "1G"), + }, + }, + }, + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("1000m", "1G"), + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "1G"), + }, + }, + }, + }, + }, + expectedResource: NewResource(buildResourceList("3000m", "2G")), + }, + } + + for i, test := range tests { + req := GetPodResourceWithoutInitContainers(test.pod) + if !reflect.DeepEqual(req, test.expectedResource) { + t.Errorf("case %d(%s) failed: \n expected %v, \n got: %v \n", + i, test.name, test.expectedResource, req) + } + } +} diff --git 
a/pkg/scheduler/api/resource_info.go b/pkg/scheduler/api/resource_info.go index 3640b9092..294184838 100644 --- a/pkg/scheduler/api/resource_info.go +++ b/pkg/scheduler/api/resource_info.go @@ -109,6 +109,23 @@ func (r *Resource) Sub(rr *Resource) *Resource { r, rr)) } +// SetMaxResource compares with ResourceList and takes max value for each Resource. +func (r *Resource) SetMaxResource(rr *Resource) { + if r == nil || rr == nil { + return + } + + if rr.MilliCPU > r.MilliCPU { + r.MilliCPU = rr.MilliCPU + } + if rr.Memory > r.Memory { + r.Memory = rr.Memory + } + if rr.MilliGPU > r.MilliGPU { + r.MilliGPU = rr.MilliGPU + } +} + //Computes the delta between a resource oject representing available //resources an operand representing resources being requested. Any //field that is less than 0 after the operation represents an diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index df0b533bf..f4941cb34 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -74,6 +74,8 @@ type SchedulerCache struct { kbclient *kbver.Clientset defaultQueue string + // schedulerName is the name for kube-batch scheduler + schedulerName string podInformer infov1.PodInformer nodeInformer infov1.NodeInformer @@ -127,14 +129,9 @@ type defaultEvictor struct { } func (de *defaultEvictor) Evict(p *v1.Pod) error { - // TODO (k82cn): makes grace period configurable. - threeSecs := int64(3) - glog.V(3).Infof("Evicting pod %v/%v", p.Namespace, p.Name) - if err := de.kubeclient.CoreV1().Pods(p.Namespace).Delete(p.Name, &metav1.DeleteOptions{ - GracePeriodSeconds: &threeSecs, - }); err != nil { + if err := de.kubeclient.CoreV1().Pods(p.Namespace).Delete(p.Name, nil); err != nil { glog.Errorf("Failed to evict pod <%v/%v>: %#v", p.Namespace, p.Name, err) return err } @@ -194,6 +191,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s kubeclient: kubernetes.NewForConfigOrDie(config), kbclient: kbver.NewForConfigOrDie(config), defaultQueue: defaultQueue, + schedulerName: schedulerName, } // Prepare event clients. @@ -423,6 +421,8 @@ func (sc *SchedulerCache) Bind(taskInfo *kbapi.TaskInfo, hostname string) error go func() { if err := sc.Binder.Bind(p, hostname); err != nil { sc.resyncTask(task) + } else { + sc.Recorder.Eventf(p, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", p.Namespace, p.Name, hostname) } }() diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go index 10f53aa1f..06486fd03 100644 --- a/pkg/scheduler/cache/cache_test.go +++ b/pkg/scheduler/cache/cache_test.go @@ -257,3 +257,53 @@ func TestAddNode(t *testing.T) { } } } + +func TestGetOrCreateJob(t *testing.T) { + owner1 := buildOwnerReference("j1") + owner2 := buildOwnerReference("j2") + + pod1 := buildPod("c1", "p1", "n1", v1.PodRunning, buildResourceList("1000m", "1G"), + []metav1.OwnerReference{owner1}, make(map[string]string)) + pi1 := api.NewTaskInfo(pod1) + pi1.Job = "j1" // The job name is set by cache. 
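`TestGetOrCreateJob` below exercises the new scheduler-name guard added to the cache (see `event_handlers.go` further down): a pod that names a different scheduler never gets a shadow PodGroup/Job, so `addTask` has to tolerate a nil job. A self-contained sketch of that guard, using simplified types that are not the real cache API:

```go
package main

import "fmt"

// Simplified stand-ins for TaskInfo/JobInfo, just to show the guard's shape.
type taskInfo struct {
	name          string
	schedulerName string
	job           string
}

// getOrCreateJob mirrors the new behaviour: return nil for pods that belong to
// a different scheduler, otherwise create (or reuse) a shadow job.
func getOrCreateJob(t *taskInfo, mySchedulerName string) *string {
	if t.job == "" {
		if t.schedulerName != mySchedulerName {
			return nil // not scheduled by kube-batch: skip PodGroup/Job creation
		}
		t.job = "podgroup-" + t.name
	}
	return &t.job
}

func main() {
	tasks := []*taskInfo{
		{name: "p1", schedulerName: "kube-batch", job: "j1"}, // job already set by the cache
		{name: "p2", schedulerName: "kube-batch"},            // shadow job gets created
		{name: "p3", schedulerName: "default-scheduler"},     // skipped entirely
	}

	for _, task := range tasks {
		if job := getOrCreateJob(task, "kube-batch"); job != nil {
			fmt.Printf("%s -> job %q\n", task.name, *job)
		} else {
			fmt.Printf("%s -> no job (different scheduler), addTask skips it\n", task.name)
		}
	}
}
```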
+ + pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"), + []metav1.OwnerReference{owner2}, make(map[string]string)) + pod2.Spec.SchedulerName = "kube-batch" + pi2 := api.NewTaskInfo(pod2) + + pod3 := buildPod("c3", "p3", "n1", v1.PodRunning, buildResourceList("1000m", "1G"), + []metav1.OwnerReference{owner2}, make(map[string]string)) + pi3 := api.NewTaskInfo(pod3) + + cache := &SchedulerCache{ + Nodes: make(map[string]*api.NodeInfo), + Jobs: make(map[api.JobID]*api.JobInfo), + schedulerName: "kube-batch", + } + + tests := []struct { + task *api.TaskInfo + gotJob bool // whether getOrCreateJob will return job for corresponding task + }{ + { + task: pi1, + gotJob: true, + }, + { + task: pi2, + gotJob: true, + }, + { + task: pi3, + gotJob: false, + }, + } + for i, test := range tests { + result := cache.getOrCreateJob(test.task) != nil + if result != test.gotJob { + t.Errorf("case %d: \n expected %t, \n got %t \n", + i, test.gotJob, result) + } + } +} diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index 6834fd58f..7522be9b3 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -38,8 +38,15 @@ func isTerminated(status kbapi.TaskStatus) bool { return status == kbapi.Succeeded || status == kbapi.Failed } +// getOrCreateJob will return corresponding Job for pi if it exists, or it will create a Job and return it if +// pi.Pod.Spec.SchedulerName is same as kube-batch scheduler's name, otherwise it will return nil. func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo { if len(pi.Job) == 0 { + if pi.Pod.Spec.SchedulerName != sc.schedulerName { + glog.V(4).Infof("Pod %s/%s will not not scheduled by %s, skip creating PodGroup and Job for it", + pi.Pod.Namespace, pi.Pod.Name, sc.schedulerName) + return nil + } pb := createShadowPodGroup(pi.Pod) pi.Job = kbapi.JobID(pb.Name) @@ -62,7 +69,9 @@ func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo { func (sc *SchedulerCache) addTask(pi *kbapi.TaskInfo) error { job := sc.getOrCreateJob(pi) - job.AddTaskInfo(pi) + if job != nil { + job.AddTaskInfo(pi) + } if len(pi.NodeName) != 0 { if _, found := sc.Nodes[pi.NodeName]; !found { diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index c8dbd6e49..399f7c417 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -193,10 +193,12 @@ func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error { if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", task.Namespace, task.Name, api.Pipelined, ssn.UID, err) + return err } } else { glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.", task.Job, ssn.UID) + return fmt.Errorf("failed to find job %s when binding", task.Job) } task.NodeName = hostname @@ -205,12 +207,14 @@ func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error { if err := node.AddTask(task); err != nil { glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v", task.Namespace, task.Name, hostname, ssn.UID, err) + return err } glog.V(3).Infof("After added Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>", task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing) } else { glog.Errorf("Failed to found Node <%s> in Session <%s> index when binding.", hostname, ssn.UID) + return 
fmt.Errorf("failed to find node %s", hostname) } for _, eh := range ssn.eventHandlers { @@ -235,10 +239,12 @@ func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error { if err := job.UpdateTaskStatus(task, api.Allocated); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", task.Namespace, task.Name, api.Allocated, ssn.UID, err) + return err } } else { glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.", task.Job, ssn.UID) + return fmt.Errorf("failed to find job %s", task.Job) } task.NodeName = hostname @@ -247,12 +253,14 @@ func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error { if err := node.AddTask(task); err != nil { glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v", task.Namespace, task.Name, hostname, ssn.UID, err) + return err } glog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>", task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing) } else { glog.Errorf("Failed to found Node <%s> in Session <%s> index when binding.", hostname, ssn.UID) + return fmt.Errorf("failed to find node %s", hostname) } // Callbacks @@ -269,6 +277,7 @@ func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error { if err := ssn.dispatch(task); err != nil { glog.Errorf("Failed to dispatch task <%v/%v>: %v", task.Namespace, task.Name, err) + return err } } } @@ -290,10 +299,12 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error { if err := job.UpdateTaskStatus(task, api.Binding); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", task.Namespace, task.Name, api.Binding, ssn.UID, err) + return err } } else { glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.", task.Job, ssn.UID) + return fmt.Errorf("failed to find job %s", task.Job) } metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time)) @@ -311,10 +322,12 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error { if err := job.UpdateTaskStatus(reclaimee, api.Releasing); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", reclaimee.Namespace, reclaimee.Name, api.Releasing, ssn.UID, err) + return err } } else { glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.", reclaimee.Job, ssn.UID) + return fmt.Errorf("failed to find job %s", reclaimee.Job) } // Update task in node. @@ -322,6 +335,7 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error { if err := node.UpdateTask(reclaimee); err != nil { glog.Errorf("Failed to update task <%v/%v> in Session <%v>: %v", reclaimee.Namespace, reclaimee.Name, ssn.UID, err) + return err } } diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index dfbe655c6..3520a2c2f 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -213,15 +213,14 @@ func (ssn *Session) JobOrderFn(l, r interface{}) bool { } } - // If no job order funcs, order job by UID. + // If no job order funcs, order job by CreationTimestamp first, then by UID. 
lv := l.(*api.JobInfo) rv := r.(*api.JobInfo) - if lv.CreationTimestamp.Equal(&rv.CreationTimestamp) { return lv.UID < rv.UID + } else { + return lv.CreationTimestamp.Before(&rv.CreationTimestamp) } - - return lv.CreationTimestamp.Before(&rv.CreationTimestamp) } func (ssn *Session) QueueOrderFn(l, r interface{}) bool { @@ -241,11 +240,14 @@ func (ssn *Session) QueueOrderFn(l, r interface{}) bool { } } - // If no queue order funcs, order queue by UID. + // If no queue order funcs, order queue by CreationTimestamp first, then by UID. lv := l.(*api.QueueInfo) rv := r.(*api.QueueInfo) - - return lv.UID < rv.UID + if lv.Queue.CreationTimestamp.Equal(&rv.Queue.CreationTimestamp) { + return lv.UID < rv.UID + } else { + return lv.Queue.CreationTimestamp.Before(&rv.Queue.CreationTimestamp) + } } func (ssn *Session) TaskCompareFns(l, r interface{}) int { @@ -272,11 +274,14 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool { return res < 0 } - // If no task order funcs, order task by UID. + // If no task order funcs, order task by CreationTimestamp first, then by UID. lv := l.(*api.TaskInfo) rv := r.(*api.TaskInfo) - - return lv.UID < rv.UID + if lv.Pod.CreationTimestamp.Equal(&rv.Pod.CreationTimestamp) { + return lv.UID < rv.UID + } else { + return lv.Pod.CreationTimestamp.Before(&rv.Pod.CreationTimestamp) + } } func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error { diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index ea69338f9..692808575 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -122,6 +122,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { continue } + oldDeserved := attr.deserved.Clone() attr.deserved.Add(remaining.Clone().Multi(float64(attr.weight) / float64(totalWeight))) if !attr.deserved.LessEqual(attr.request) { attr.deserved = helpers.Min(attr.deserved, attr.request) @@ -132,7 +133,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { glog.V(4).Infof("The attributes of queue <%s> in proportion: deserved <%v>, allocate <%v>, request <%v>, share <%0.2f>", attr.name, attr.deserved, attr.allocated, attr.request, attr.share) - deserved.Add(attr.deserved) + deserved.Add(attr.deserved.Clone().Sub(oldDeserved)) } remaining.Sub(deserved) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index d85c5c7f1..52bbc4f15 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -43,15 +43,14 @@ func NewScheduler( config *rest.Config, schedulerName string, conf string, - period string, + period time.Duration, defaultQueue string, ) (*Scheduler, error) { - sp, _ := time.ParseDuration(period) scheduler := &Scheduler{ config: config, schedulerConf: conf, cache: schedcache.New(config, schedulerName, defaultQueue), - schedulePeriod: sp, + schedulePeriod: period, } return scheduler, nil
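On the proportion-plugin change above: `deserved` accumulates per queue across rounds, so adding the whole value to the round's total double-counted earlier grants; adding only the increment (`deserved - oldDeserved`) keeps `remaining` accurate. A single-dimension numeric sketch of that accounting (names and numbers are illustrative, not the plugin's API):

```go
package main

import "fmt"

func main() {
	remaining := 10.0
	weights := map[string]float64{"q1": 1, "q2": 1}
	request := map[string]float64{"q1": 2, "q2": 8}
	deserved := map[string]float64{}

	for round := 1; round <= 2 && remaining > 1e-9; round++ {
		totalWeight := 0.0
		for q, req := range request {
			if deserved[q] < req {
				totalWeight += weights[q]
			}
		}

		granted := 0.0
		for q, w := range weights {
			if deserved[q] >= request[q] {
				continue // queue already has what it asked for
			}
			old := deserved[q]
			deserved[q] += remaining * w / totalWeight
			if deserved[q] > request[q] {
				deserved[q] = request[q] // cap at the queue's request
			}
			granted += deserved[q] - old // the fix: count only this round's increment
		}
		remaining -= granted
		fmt.Printf("round %d: deserved=%v remaining=%.1f\n", round, deserved, remaining)
	}
}
```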