From 0d119201cd3efe0f5e58793f58853981064c6b70 Mon Sep 17 00:00:00 2001 From: Jun Gong Date: Thu, 14 Mar 2019 10:34:25 +0800 Subject: [PATCH 01/16] Return err in Allocate if any error occurs --- pkg/scheduler/actions/allocate/allocate.go | 4 ++-- pkg/scheduler/framework/session.go | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 753ad8bbb..d63eeeeb3 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -147,8 +147,8 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { glog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) if err := ssn.Allocate(task, node.Name); err != nil { - glog.Errorf("Failed to bind Task %v on %v in Session %v", - task.UID, node.Name, ssn.UID) + glog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v", + task.UID, node.Name, ssn.UID, err) continue } assigned = true diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index c8dbd6e49..5ee293d9f 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -235,10 +235,12 @@ func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error { if err := job.UpdateTaskStatus(task, api.Allocated); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", task.Namespace, task.Name, api.Allocated, ssn.UID, err) + return err } } else { glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.", task.Job, ssn.UID) + return fmt.Errorf("failed to find job %s", task.Job) } task.NodeName = hostname @@ -247,12 +249,14 @@ func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error { if err := node.AddTask(task); err != nil { glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v", task.Namespace, task.Name, hostname, ssn.UID, err) + return err } glog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>", task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing) } else { glog.Errorf("Failed to found Node <%s> in Session <%s> index when binding.", hostname, ssn.UID) + return fmt.Errorf("failed to find node %s", hostname) } // Callbacks @@ -269,6 +273,7 @@ func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error { if err := ssn.dispatch(task); err != nil { glog.Errorf("Failed to dispatch task <%v/%v>: %v", task.Namespace, task.Name, err) + return err } } } From 8f3bedd42b7aed40efd2bac6a3a5518e0d6ecfe1 Mon Sep 17 00:00:00 2001 From: Rajadeepan Date: Wed, 13 Mar 2019 12:55:54 +0000 Subject: [PATCH 02/16] Add event when task is scheduled --- pkg/scheduler/cache/cache.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index df0b533bf..9758caf3b 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -423,6 +423,8 @@ func (sc *SchedulerCache) Bind(taskInfo *kbapi.TaskInfo, hostname string) error go func() { if err := sc.Binder.Bind(p, hostname); err != nil { sc.resyncTask(task) + } else { + sc.Recorder.Eventf(p, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", p.Namespace, p.Name, hostname) } }() From fa6da50935924ed1889696839be4970c8355614c Mon Sep 17 00:00:00 2001 From: Jun Gong Date: Wed, 13 Mar 2019 11:21:55 +0800 Subject: [PATCH 03/16] Take init containers into account when getting pod 
resource request --- pkg/scheduler/actions/allocate/allocate.go | 6 +- pkg/scheduler/actions/backfill/backfill.go | 2 +- pkg/scheduler/actions/preempt/preempt.go | 6 +- pkg/scheduler/actions/reclaim/reclaim.go | 7 +- pkg/scheduler/api/job_info.go | 31 ++-- pkg/scheduler/api/pod_info.go | 71 +++++++++ pkg/scheduler/api/pod_info_test.go | 162 +++++++++++++++++++++ pkg/scheduler/api/resource_info.go | 17 +++ 8 files changed, 276 insertions(+), 26 deletions(-) create mode 100644 pkg/scheduler/api/pod_info.go create mode 100644 pkg/scheduler/api/pod_info_test.go diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index d63eeeeb3..30e3d858f 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -143,7 +143,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { selectedNodes := util.SelectBestNode(nodeScores) for _, node := range selectedNodes { // Allocate idle resource to the task. - if task.Resreq.LessEqual(node.Idle) { + if task.InitResreq.LessEqual(node.Idle) { glog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name) if err := ssn.Allocate(task, node.Name); err != nil { @@ -162,9 +162,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { } // Allocate releasing resource to the task if any. - if task.Resreq.LessEqual(node.Releasing) { + if task.InitResreq.LessEqual(node.Releasing) { glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>", - task.Namespace, task.Name, node.Name, task.Resreq, node.Releasing) + task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing) if err := ssn.Pipeline(task, node.Name); err != nil { glog.Errorf("Failed to pipeline Task %v on %v in Session %v", task.UID, node.Name, ssn.UID) diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 85aa7ce31..920270351 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -44,7 +44,7 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) { // TODO (k82cn): When backfill, it's also need to balance between Queues. for _, job := range ssn.Jobs { for _, task := range job.TaskStatusIndex[api.Pending] { - if task.Resreq.IsEmpty() { + if task.InitResreq.IsEmpty() { // As task did not request resources, so it only need to meet predicates. // TODO (k82cn): need to prioritize nodes to avoid pod hole. 
for _, node := range ssn.Nodes { diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 9b2132549..c6bb3f29a 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -203,7 +203,7 @@ func preempt( var preemptees []*api.TaskInfo preempted := api.EmptyResource() - resreq := preemptor.Resreq.Clone() + resreq := preemptor.InitResreq.Clone() for _, task := range node.Tasks { if filter == nil { @@ -239,9 +239,9 @@ func preempt( metrics.RegisterPreemptionAttempts() glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.", - preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq) + preempted, preemptor.Namespace, preemptor.Name, preemptor.InitResreq) - if preemptor.Resreq.LessEqual(preempted) { + if preemptor.InitResreq.LessEqual(preempted) { if err := stmt.Pipeline(preemptor, node.Name); err != nil { glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>", preemptor.Namespace, preemptor.Name, node.Name) diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index e113a65a9..8a317c599 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -110,14 +110,13 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) { } assigned := false - for _, n := range ssn.Nodes { // If predicates failed, next node. if err := ssn.PredicateFn(task, n); err != nil { continue } - resreq := task.Resreq.Clone() + resreq := task.InitResreq.Clone() reclaimed := api.EmptyResource() glog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", @@ -172,9 +171,9 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) { } glog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.", - reclaimed, task.Namespace, task.Name, task.Resreq) + reclaimed, task.Namespace, task.Name, task.InitResreq) - if task.Resreq.LessEqual(reclaimed) { + if task.InitResreq.LessEqual(reclaimed) { if err := ssn.Pipeline(task, n.Name); err != nil { glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>", task.Namespace, task.Name, n.Name) diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go index 297bf9df4..a2bffd11e 100644 --- a/pkg/scheduler/api/job_info.go +++ b/pkg/scheduler/api/job_info.go @@ -38,7 +38,10 @@ type TaskInfo struct { Name string Namespace string + // Resreq is the resource that used when task running. Resreq *Resource + // InitResreq is the resource that used to launch a task. + InitResreq *Resource NodeName string Status TaskStatus @@ -61,25 +64,22 @@ func getJobID(pod *v1.Pod) JobID { } func NewTaskInfo(pod *v1.Pod) *TaskInfo { - req := EmptyResource() - - // TODO(k82cn): also includes initContainers' resource. 
- for _, c := range pod.Spec.Containers { - req.Add(NewResource(c.Resources.Requests)) - } + req := GetPodResourceWithoutInitContainers(pod) + initResreq := GetPodResourceRequest(pod) jobID := getJobID(pod) ti := &TaskInfo{ - UID: TaskID(pod.UID), - Job: jobID, - Name: pod.Name, - Namespace: pod.Namespace, - NodeName: pod.Spec.NodeName, - Status: getTaskStatus(pod), - Priority: 1, - Pod: pod, - Resreq: req, + UID: TaskID(pod.UID), + Job: jobID, + Name: pod.Name, + Namespace: pod.Namespace, + NodeName: pod.Spec.NodeName, + Status: getTaskStatus(pod), + Priority: 1, + Pod: pod, + Resreq: req, + InitResreq: initResreq, } if pod.Spec.Priority != nil { @@ -100,6 +100,7 @@ func (ti *TaskInfo) Clone() *TaskInfo { Priority: ti.Priority, Pod: ti.Pod, Resreq: ti.Resreq.Clone(), + InitResreq: ti.InitResreq.Clone(), VolumeReady: ti.VolumeReady, } } diff --git a/pkg/scheduler/api/pod_info.go b/pkg/scheduler/api/pod_info.go new file mode 100644 index 000000000..bae26a751 --- /dev/null +++ b/pkg/scheduler/api/pod_info.go @@ -0,0 +1,71 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "k8s.io/api/core/v1" +) + +// Refer k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go#GetResourceRequest. +// +// GetResourceRequest returns a *Resource that covers the largest width in each resource dimension. +// Because init-containers run sequentially, we collect the max in each dimension iteratively. +// In contrast, we sum the resource vectors for regular containers since they run simultaneously. +// +// To be consistent with kubernetes default scheduler, it is only used for predicates of actions(e.g. +// allocate, backfill, preempt, reclaim), please use GetPodResourceWithoutInitContainers for other cases. +// +// Example: +// +// Pod: +// InitContainers +// IC1: +// CPU: 2 +// Memory: 1G +// IC2: +// CPU: 2 +// Memory: 3G +// Containers +// C1: +// CPU: 2 +// Memory: 1G +// C2: +// CPU: 1 +// Memory: 1G +// +// Result: CPU: 3, Memory: 3G +func GetPodResourceRequest(pod *v1.Pod) *Resource { + result := GetPodResourceWithoutInitContainers(pod) + + // take max_resource(sum_pod, any_init_container) + for _, container := range pod.Spec.InitContainers { + result.SetMaxResource(NewResource(container.Resources.Requests)) + } + + return result +} + +// GetPodResourceWithoutInitContainers returns Pod's resource request, it does not contain +// init containers' resource request. +func GetPodResourceWithoutInitContainers(pod *v1.Pod) *Resource { + result := EmptyResource() + for _, container := range pod.Spec.Containers { + result.Add(NewResource(container.Resources.Requests)) + } + + return result +} diff --git a/pkg/scheduler/api/pod_info_test.go b/pkg/scheduler/api/pod_info_test.go new file mode 100644 index 000000000..873b92f2c --- /dev/null +++ b/pkg/scheduler/api/pod_info_test.go @@ -0,0 +1,162 @@ +/* +Copyright 2019 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" +) + +func TestGetPodResourceRequest(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + expectedResource *Resource + }{ + { + name: "get resource for pod without init containers", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("1000m", "1G"), + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "1G"), + }, + }, + }, + }, + }, + expectedResource: NewResource(buildResourceList("3000m", "2G")), + }, + { + name: "get resource for pod with init containers", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + InitContainers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "5G"), + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "1G"), + }, + }, + }, + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("1000m", "1G"), + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "1G"), + }, + }, + }, + }, + }, + expectedResource: NewResource(buildResourceList("3000m", "5G")), + }, + } + + for i, test := range tests { + req := GetPodResourceRequest(test.pod) + if !reflect.DeepEqual(req, test.expectedResource) { + t.Errorf("case %d(%s) failed: \n expected %v, \n got: %v \n", + i, test.name, test.expectedResource, req) + } + } +} + +func TestGetPodResourceWithoutInitContainers(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + expectedResource *Resource + }{ + { + name: "get resource for pod without init containers", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("1000m", "1G"), + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "1G"), + }, + }, + }, + }, + }, + expectedResource: NewResource(buildResourceList("3000m", "2G")), + }, + { + name: "get resource for pod with init containers", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + InitContainers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "5G"), + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "1G"), + }, + }, + }, + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("1000m", "1G"), + }, + }, + { + Resources: v1.ResourceRequirements{ + Requests: buildResourceList("2000m", "1G"), + }, + }, + }, + }, + }, + expectedResource: NewResource(buildResourceList("3000m", "2G")), + }, + } + + for i, test := range tests { + req := GetPodResourceWithoutInitContainers(test.pod) + if !reflect.DeepEqual(req, test.expectedResource) { + t.Errorf("case %d(%s) failed: \n expected %v, \n got: %v \n", + i, test.name, test.expectedResource, req) + } + } +} diff --git 
a/pkg/scheduler/api/resource_info.go b/pkg/scheduler/api/resource_info.go index 3640b9092..294184838 100644 --- a/pkg/scheduler/api/resource_info.go +++ b/pkg/scheduler/api/resource_info.go @@ -109,6 +109,23 @@ func (r *Resource) Sub(rr *Resource) *Resource { r, rr)) } +// SetMaxResource compares with ResourceList and takes max value for each Resource. +func (r *Resource) SetMaxResource(rr *Resource) { + if r == nil || rr == nil { + return + } + + if rr.MilliCPU > r.MilliCPU { + r.MilliCPU = rr.MilliCPU + } + if rr.Memory > r.Memory { + r.Memory = rr.Memory + } + if rr.MilliGPU > r.MilliGPU { + r.MilliGPU = rr.MilliGPU + } +} + //Computes the delta between a resource oject representing available //resources an operand representing resources being requested. Any //field that is less than 0 after the operation represents an From 0b1c05e5d4d6dbbf84808525edca3ee87786cb09 Mon Sep 17 00:00:00 2001 From: Jun Gong Date: Mon, 18 Mar 2019 09:17:30 +0800 Subject: [PATCH 04/16] Order task by CreationTimestamp first, then by UID --- pkg/scheduler/framework/session_plugins.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index dfbe655c6..3f4d68bd5 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -272,11 +272,14 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool { return res < 0 } - // If no task order funcs, order task by UID. + // If no task order funcs, order task by CreationTimestamp first, then by UID. lv := l.(*api.TaskInfo) rv := r.(*api.TaskInfo) - - return lv.UID < rv.UID + if lv.Pod.CreationTimestamp.Equal(&rv.Pod.CreationTimestamp) { + return lv.UID < rv.UID + } else { + return lv.Pod.CreationTimestamp.Before(&rv.Pod.CreationTimestamp) + } } func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error { From f2acdf276e1f21efdf030c737e14d7ae53e8b609 Mon Sep 17 00:00:00 2001 From: Jun Gong Date: Mon, 18 Mar 2019 09:19:46 +0800 Subject: [PATCH 05/16] Order queue by CreationTimestamp first, then by UID --- pkg/scheduler/framework/session_plugins.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index 3f4d68bd5..edcedadf6 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -241,11 +241,14 @@ func (ssn *Session) QueueOrderFn(l, r interface{}) bool { } } - // If no queue order funcs, order queue by UID. + // If no queue order funcs, order queue by CreationTimestamp first, then by UID. 
lv := l.(*api.QueueInfo) rv := r.(*api.QueueInfo) - - return lv.UID < rv.UID + if lv.Queue.CreationTimestamp.Equal(&rv.Queue.CreationTimestamp) { + return lv.UID < rv.UID + } else { + return lv.Queue.CreationTimestamp.Before(&rv.Queue.CreationTimestamp) + } } func (ssn *Session) TaskCompareFns(l, r interface{}) int { From c129b54679125f26222c7333d2a354fcb0e3750a Mon Sep 17 00:00:00 2001 From: Jun Gong Date: Mon, 18 Mar 2019 09:20:50 +0800 Subject: [PATCH 06/16] Order job by CreationTimestamp first, then by UID --- pkg/scheduler/framework/session_plugins.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index edcedadf6..3520a2c2f 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -213,15 +213,14 @@ func (ssn *Session) JobOrderFn(l, r interface{}) bool { } } - // If no job order funcs, order job by UID. + // If no job order funcs, order job by CreationTimestamp first, then by UID. lv := l.(*api.JobInfo) rv := r.(*api.JobInfo) - if lv.CreationTimestamp.Equal(&rv.CreationTimestamp) { return lv.UID < rv.UID + } else { + return lv.CreationTimestamp.Before(&rv.CreationTimestamp) } - - return lv.CreationTimestamp.Before(&rv.CreationTimestamp) } func (ssn *Session) QueueOrderFn(l, r interface{}) bool { From 393a8bd6950dc3416214e114c7643dcda9e77d3d Mon Sep 17 00:00:00 2001 From: Jun Gong Date: Mon, 18 Mar 2019 11:29:36 +0800 Subject: [PATCH 07/16] In allocate, skip adding Job if its queue is not found --- pkg/scheduler/actions/allocate/allocate.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 30e3d858f..912d20bb7 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -46,12 +46,16 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { jobsMap := map[api.QueueID]*util.PriorityQueue{} for _, job := range ssn.Jobs { - if _, found := jobsMap[job.Queue]; !found { - jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn) - } - if queue, found := ssn.Queues[job.Queue]; found { queues.Push(queue) + } else { + glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found", + job.Namespace, job.Name, job.Queue) + continue + } + + if _, found := jobsMap[job.Queue]; !found { + jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn) } glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue) From d6700b0efaa45fcd256af70ecaf6e93f1a76ca8e Mon Sep 17 00:00:00 2001 From: Jun Gong Date: Tue, 19 Mar 2019 11:48:05 +0800 Subject: [PATCH 08/16] Return err in functions of session.go if any error occurs --- pkg/scheduler/framework/session.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 5ee293d9f..399f7c417 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -193,10 +193,12 @@ func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error { if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", task.Namespace, task.Name, api.Pipelined, ssn.UID, err) + return err } } else { glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.", task.Job, ssn.UID) + return fmt.Errorf("failed to 
find job %s when binding", task.Job) } task.NodeName = hostname @@ -205,12 +207,14 @@ func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error { if err := node.AddTask(task); err != nil { glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v", task.Namespace, task.Name, hostname, ssn.UID, err) + return err } glog.V(3).Infof("After added Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>", task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing) } else { glog.Errorf("Failed to found Node <%s> in Session <%s> index when binding.", hostname, ssn.UID) + return fmt.Errorf("failed to find node %s", hostname) } for _, eh := range ssn.eventHandlers { @@ -295,10 +299,12 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error { if err := job.UpdateTaskStatus(task, api.Binding); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", task.Namespace, task.Name, api.Binding, ssn.UID, err) + return err } } else { glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.", task.Job, ssn.UID) + return fmt.Errorf("failed to find job %s", task.Job) } metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time)) @@ -316,10 +322,12 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error { if err := job.UpdateTaskStatus(reclaimee, api.Releasing); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", reclaimee.Namespace, reclaimee.Name, api.Releasing, ssn.UID, err) + return err } } else { glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.", reclaimee.Job, ssn.UID) + return fmt.Errorf("failed to find job %s", reclaimee.Job) } // Update task in node. @@ -327,6 +335,7 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error { if err := node.UpdateTask(reclaimee); err != nil { glog.Errorf("Failed to update task <%v/%v> in Session <%v>: %v", reclaimee.Namespace, reclaimee.Name, ssn.UID, err) + return err } } From 89f7351c6fd15964162e873ae2eb7db1fbf9645f Mon Sep 17 00:00:00 2001 From: Jun Gong Date: Wed, 20 Mar 2019 09:17:44 +0800 Subject: [PATCH 09/16] Change run option SchedulePeriod's type to make it clear --- cmd/kube-batch/app/options/options.go | 20 +++++---- cmd/kube-batch/app/options/options_test.go | 48 ++++++++++++++++++++++ pkg/scheduler/scheduler.go | 5 +-- 3 files changed, 62 insertions(+), 11 deletions(-) create mode 100644 cmd/kube-batch/app/options/options_test.go diff --git a/cmd/kube-batch/app/options/options.go b/cmd/kube-batch/app/options/options.go index 6fc4d636c..dd7c24fff 100644 --- a/cmd/kube-batch/app/options/options.go +++ b/cmd/kube-batch/app/options/options.go @@ -23,13 +23,20 @@ import ( "github.com/spf13/pflag" ) +const ( + defaultSchedulerName = "kube-batch" + defaultSchedulerPeriod = time.Second + defaultQueue = "default" + defaultListenAddress = ":8080" +) + // ServerOption is the main context object for the controller manager. 
type ServerOption struct { Master string Kubeconfig string SchedulerName string SchedulerConf string - SchedulePeriod string + SchedulePeriod time.Duration EnableLeaderElection bool LockObjectNamespace string DefaultQueue string @@ -48,25 +55,22 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)") fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information") // kube-batch will ignore pods with scheduler names other than specified with the option - fs.StringVar(&s.SchedulerName, "scheduler-name", "kube-batch", "kube-batch will handle pods with the scheduler-name") + fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "kube-batch will handle pods with the scheduler-name") fs.StringVar(&s.SchedulerConf, "scheduler-conf", "", "The absolute path of scheduler configuration file") - fs.StringVar(&s.SchedulePeriod, "schedule-period", "1s", "The period between each scheduling cycle") - fs.StringVar(&s.DefaultQueue, "default-queue", "default", "The default queue name of the job") + fs.DurationVar(&s.SchedulePeriod, "schedule-period", defaultSchedulerPeriod, "The period between each scheduling cycle") + fs.StringVar(&s.DefaultQueue, "default-queue", defaultQueue, "The default queue name of the job") fs.BoolVar(&s.EnableLeaderElection, "leader-elect", s.EnableLeaderElection, "Start a leader election client and gain leadership before "+ "executing the main loop. Enable this when running replicated kube-batch for high availability") fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit") fs.StringVar(&s.LockObjectNamespace, "lock-object-namespace", s.LockObjectNamespace, "Define the namespace of the lock object") - fs.StringVar(&s.ListenAddress, "listen-address", ":8080", "The address to listen on for HTTP requests.") + fs.StringVar(&s.ListenAddress, "listen-address", defaultListenAddress, "The address to listen on for HTTP requests.") } func (s *ServerOption) CheckOptionOrDie() error { if s.EnableLeaderElection && s.LockObjectNamespace == "" { return fmt.Errorf("lock-object-namespace must not be nil when LeaderElection is enabled") } - if _, err := time.ParseDuration(s.SchedulePeriod); err != nil { - return fmt.Errorf("failed to parse --schedule-period: %v", err) - } return nil } diff --git a/cmd/kube-batch/app/options/options_test.go b/cmd/kube-batch/app/options/options_test.go new file mode 100644 index 000000000..d634e6288 --- /dev/null +++ b/cmd/kube-batch/app/options/options_test.go @@ -0,0 +1,48 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package options + +import ( + "reflect" + "testing" + "time" + + "github.com/spf13/pflag" +) + +func TestAddFlags(t *testing.T) { + fs := pflag.NewFlagSet("addflagstest", pflag.ContinueOnError) + s := NewServerOption() + s.AddFlags(fs) + + args := []string{ + "--schedule-period=5m", + } + fs.Parse(args) + + // This is a snapshot of expected options parsed by args. + expected := &ServerOption{ + SchedulerName: defaultSchedulerName, + SchedulePeriod: 5 * time.Minute, + DefaultQueue: defaultQueue, + ListenAddress: defaultListenAddress, + } + + if !reflect.DeepEqual(expected, s) { + t.Errorf("Got different run options than expected.\nGot: %+v\nExpected: %+v\n", s, expected) + } +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index d85c5c7f1..52bbc4f15 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -43,15 +43,14 @@ func NewScheduler( config *rest.Config, schedulerName string, conf string, - period string, + period time.Duration, defaultQueue string, ) (*Scheduler, error) { - sp, _ := time.ParseDuration(period) scheduler := &Scheduler{ config: config, schedulerConf: conf, cache: schedcache.New(config, schedulerName, defaultQueue), - schedulePeriod: sp, + schedulePeriod: period, } return scheduler, nil From 2135a4d27aeaca2704f91094c879b2c64988f591 Mon Sep 17 00:00:00 2001 From: Jun Gong Date: Thu, 21 Mar 2019 09:47:58 +0800 Subject: [PATCH 10/16] Do graceful eviction using default policy --- pkg/scheduler/cache/cache.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 9758caf3b..2e6f22f3a 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -127,14 +127,9 @@ type defaultEvictor struct { } func (de *defaultEvictor) Evict(p *v1.Pod) error { - // TODO (k82cn): makes grace period configurable. - threeSecs := int64(3) - glog.V(3).Infof("Evicting pod %v/%v", p.Namespace, p.Name) - if err := de.kubeclient.CoreV1().Pods(p.Namespace).Delete(p.Name, &metav1.DeleteOptions{ - GracePeriodSeconds: &threeSecs, - }); err != nil { + if err := de.kubeclient.CoreV1().Pods(p.Namespace).Delete(p.Name, nil); err != nil { glog.Errorf("Failed to evict pod <%v/%v>: %#v", p.Namespace, p.Name, err) return err } From eb0245588f705fd4ed0a98fa5003988afc894ad3 Mon Sep 17 00:00:00 2001 From: Jun Gong Date: Sun, 24 Mar 2019 12:34:35 +0800 Subject: [PATCH 11/16] Address helm install error in tutorial.md --- doc/usage/tutorial.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/doc/usage/tutorial.md b/doc/usage/tutorial.md index 84314dc20..9d6e30421 100644 --- a/doc/usage/tutorial.md +++ b/doc/usage/tutorial.md @@ -35,6 +35,8 @@ Run the `kube-batch` as kubernetes scheduler # helm install $GOPATH/src/github.com/kubernetes-sigs/kube-batch/deployment/kube-batch --namespace kube-system ``` +Note: If there is an error `Error: apiVersion "scheduling.incubator.k8s.io/v1alpha1" in kube-batch/templates/default.yaml is not available`, please update your helm to latest version and try it again. 
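+For example, on a typical Helm v2 installation you can upgrade the client (shown here with Homebrew; use whichever method Helm was installed with, or download the latest release from https://github.com/helm/helm/releases) and then upgrade the in-cluster Tiller to match. The commands below are illustrative and may vary with your environment:
+
+```bash
+# brew upgrade kubernetes-helm
+# helm init --upgrade
+# helm version
+```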
+ Verify the release ```bash From f49c65742cacee08df0e1aaf9982e3bc0a5efd73 Mon Sep 17 00:00:00 2001 From: Jun Gong Date: Tue, 19 Mar 2019 09:25:51 +0800 Subject: [PATCH 12/16] Preempt lowest priority task first --- pkg/scheduler/actions/preempt/preempt.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index c6bb3f29a..78ddb5eda 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -220,8 +220,15 @@ func preempt( continue } - // Preempt victims for tasks. - for _, preemptee := range victims { + victimsQueue := util.NewPriorityQueue(func(l, r interface{}) bool { + return !ssn.TaskOrderFn(l, r) + }) + for _, victim := range victims { + victimsQueue.Push(victim) + } + // Preempt victims for tasks, pick lowest priority task first. + for !victimsQueue.Empty() { + preemptee := victimsQueue.Pop().(*api.TaskInfo) glog.Errorf("Try to preempt Task <%s/%s> for Tasks <%s/%s>", preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name) if err := stmt.Evict(preemptee, "preempt"); err != nil { From e1cc9689723f2248c63706dcf54fc280caab8ec1 Mon Sep 17 00:00:00 2001 From: TommyLike Date: Mon, 25 Mar 2019 18:53:49 +0800 Subject: [PATCH 13/16] Fix sub exception in reclaim and preempt --- pkg/scheduler/actions/preempt/preempt.go | 3 +-- pkg/scheduler/actions/reclaim/reclaim.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 78ddb5eda..57e0dd417 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -238,10 +238,9 @@ func preempt( } preempted.Add(preemptee.Resreq) // If reclaimed enough resources, break loop to avoid Sub panic. - if resreq.LessEqual(preemptee.Resreq) { + if resreq.LessEqual(preempted) { break } - resreq.Sub(preemptee.Resreq) } metrics.RegisterPreemptionAttempts() diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 8a317c599..2ea2290ba 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -164,10 +164,9 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) { } reclaimed.Add(reclaimee.Resreq) // If reclaimed enough resources, break loop to avoid Sub panic. 
-				if resreq.LessEqual(reclaimee.Resreq) {
+				if resreq.LessEqual(reclaimed) {
 					break
 				}
-				resreq.Sub(reclaimee.Resreq)
 			}
 
 			glog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.",

From 55384b9ad4ecd159b85abe97a77096ebeb9e649d Mon Sep 17 00:00:00 2001
From: Ziyang Wu
Date: Wed, 27 Mar 2019 16:49:26 +0800
Subject: [PATCH 14/16] Fix wrong calculation for deserved in proportion plugin

---
 pkg/scheduler/plugins/proportion/proportion.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go
index 582adf3f4..b162fea71 100644
--- a/pkg/scheduler/plugins/proportion/proportion.go
+++ b/pkg/scheduler/plugins/proportion/proportion.go
@@ -119,6 +119,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
 				continue
 			}
 
+			oldDeserved := attr.deserved.Clone()
 			attr.deserved.Add(remaining.Clone().Multi(float64(attr.weight) / float64(totalWeight)))
 			if !attr.deserved.LessEqual(attr.request) {
 				attr.deserved = helpers.Min(attr.deserved, attr.request)
@@ -129,7 +130,7 @@
 			glog.V(4).Infof("The attributes of queue <%s> in proportion: deserved <%v>, allocate <%v>, request <%v>, share <%0.2f>",
 				attr.name, attr.deserved, attr.allocated, attr.request, attr.share)
 
-			deserved.Add(attr.deserved)
+			deserved.Add(attr.deserved.Clone().Sub(oldDeserved))
 		}
 
 		remaining.Sub(deserved)

From 747c021aae02bd6e7c278ae9e33847d4f977db22 Mon Sep 17 00:00:00 2001
From: Jun Gong
Date: Thu, 28 Mar 2019 11:21:05 +0800
Subject: [PATCH 15/16] Change base image to alpine to reduce image size

---
 Makefile | 2 +-
 deployment/images/Dockerfile | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/Makefile b/Makefile
index d84f2b21c..9517a3725 100644
--- a/Makefile
+++ b/Makefile
@@ -26,7 +26,7 @@ generate-code:
 
 rel_bins:
 	go get github.com/mitchellh/gox
-	gox -osarch=${REL_OSARCH} -ldflags ${LD_FLAGS} \
+	CGO_ENABLED=0 gox -osarch=${REL_OSARCH} -ldflags ${LD_FLAGS} \
 	-output=${BIN_DIR}/{{.OS}}/{{.Arch}}/kube-batch ./cmd/kube-batch
 
 images: rel_bins
diff --git a/deployment/images/Dockerfile b/deployment/images/Dockerfile
index cbdd9165c..d804c4f3e 100644
--- a/deployment/images/Dockerfile
+++ b/deployment/images/Dockerfile
@@ -1,4 +1,4 @@
-From ubuntu:18.04
+From alpine:3.9
 
 ADD kube-batch /usr/local/bin
 

From 3af910028365f22168ff959c9680ff797046871a Mon Sep 17 00:00:00 2001
From: Jun Gong
Date: Thu, 28 Mar 2019 14:20:06 +0800
Subject: [PATCH 16/16] Do not create PodGroup and Job for task whose scheduler is not kube-batch

---
 pkg/scheduler/cache/cache.go | 3 ++
 pkg/scheduler/cache/cache_test.go | 50 +++++++++++++++++++++++++++
 pkg/scheduler/cache/event_handlers.go | 11 +++++-
 3 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go
index 2e6f22f3a..f4941cb34 100644
--- a/pkg/scheduler/cache/cache.go
+++ b/pkg/scheduler/cache/cache.go
@@ -74,6 +74,8 @@ type SchedulerCache struct {
 	kbclient *kbver.Clientset
 
 	defaultQueue string
+	// schedulerName is the name for kube-batch scheduler
+	schedulerName string
 
 	podInformer infov1.PodInformer
 	nodeInformer infov1.NodeInformer
@@ -189,6 +191,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
 		kubeclient: kubernetes.NewForConfigOrDie(config),
 		kbclient: kbver.NewForConfigOrDie(config),
 		defaultQueue: defaultQueue,
+		schedulerName: schedulerName,
 	}
 
 	// Prepare event clients.
diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go
index 10f53aa1f..06486fd03 100644
--- a/pkg/scheduler/cache/cache_test.go
+++ b/pkg/scheduler/cache/cache_test.go
@@ -257,3 +257,53 @@ func TestAddNode(t *testing.T) {
 		}
 	}
 }
+
+func TestGetOrCreateJob(t *testing.T) {
+	owner1 := buildOwnerReference("j1")
+	owner2 := buildOwnerReference("j2")
+
+	pod1 := buildPod("c1", "p1", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
+		[]metav1.OwnerReference{owner1}, make(map[string]string))
+	pi1 := api.NewTaskInfo(pod1)
+	pi1.Job = "j1" // The job name is set by cache.
+
+	pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
+		[]metav1.OwnerReference{owner2}, make(map[string]string))
+	pod2.Spec.SchedulerName = "kube-batch"
+	pi2 := api.NewTaskInfo(pod2)
+
+	pod3 := buildPod("c3", "p3", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
+		[]metav1.OwnerReference{owner2}, make(map[string]string))
+	pi3 := api.NewTaskInfo(pod3)
+
+	cache := &SchedulerCache{
+		Nodes: make(map[string]*api.NodeInfo),
+		Jobs: make(map[api.JobID]*api.JobInfo),
+		schedulerName: "kube-batch",
+	}
+
+	tests := []struct {
+		task *api.TaskInfo
+		gotJob bool // whether getOrCreateJob returns a job for the corresponding task
+	}{
+		{
+			task: pi1,
+			gotJob: true,
+		},
+		{
+			task: pi2,
+			gotJob: true,
+		},
+		{
+			task: pi3,
+			gotJob: false,
+		},
+	}
+	for i, test := range tests {
+		result := cache.getOrCreateJob(test.task) != nil
+		if result != test.gotJob {
+			t.Errorf("case %d: \n expected %t, \n got %t \n",
+				i, test.gotJob, result)
+		}
+	}
+}
diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go
index 72055f3b2..d88de2f8d 100644
--- a/pkg/scheduler/cache/event_handlers.go
+++ b/pkg/scheduler/cache/event_handlers.go
@@ -38,8 +38,15 @@ func isTerminated(status kbapi.TaskStatus) bool {
 	return status == kbapi.Succeeded || status == kbapi.Failed
 }
 
+// getOrCreateJob returns the corresponding Job for pi if it already exists. Otherwise it creates and returns a Job
+// only when pi.Pod.Spec.SchedulerName matches the kube-batch scheduler's name; for other scheduler names it returns nil.
 func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo {
 	if len(pi.Job) == 0 {
+		if pi.Pod.Spec.SchedulerName != sc.schedulerName {
+			glog.V(4).Infof("Pod %s/%s will not be scheduled by %s, skip creating PodGroup and Job for it",
+				pi.Pod.Namespace, pi.Pod.Name, sc.schedulerName)
+			return nil
+		}
 		pb := createShadowPodGroup(pi.Pod)
 		pi.Job = kbapi.JobID(pb.Name)
 
@@ -62,7 +69,9 @@ func (sc *SchedulerCache) getOrCreateJob(pi *kbapi.TaskInfo) *kbapi.JobInfo {
 
 func (sc *SchedulerCache) addTask(pi *kbapi.TaskInfo) error {
 	job := sc.getOrCreateJob(pi)
-	job.AddTaskInfo(pi)
+	if job != nil {
+		job.AddTaskInfo(pi)
+	}
 
 	if len(pi.NodeName) != 0 {
 		if _, found := sc.Nodes[pi.NodeName]; !found {