From 59a8b1ca68d0256d10239a588d69ab0ba28d4076 Mon Sep 17 00:00:00 2001 From: googs1025 Date: Tue, 29 Oct 2024 13:28:33 +0800 Subject: [PATCH] feature: use contextual logging --- pkg/capacityscheduling/capacity_scheduling.go | 64 +++++++++------ .../capacity_scheduling_test.go | 4 +- pkg/coscheduling/core/core.go | 33 ++++---- pkg/coscheduling/coscheduling.go | 42 ++++++---- .../networkoverhead/networkoverhead.go | 67 +++++++++------- .../topologicalsort/topologicalsort.go | 24 +++--- pkg/noderesources/allocatable.go | 3 +- pkg/noderesources/resource_allocation.go | 13 +-- .../preemption_toleration.go | 11 ++- pkg/sysched/sysched.go | 79 ++++++++++--------- pkg/sysched/sysched_test.go | 40 ++++++---- pkg/trimaran/collector.go | 22 +++--- pkg/trimaran/collector_test.go | 31 +++++--- pkg/trimaran/handler.go | 6 +- .../loadvariationriskbalancing/analysis.go | 6 +- .../analysis_test.go | 6 +- .../loadvariationriskbalancing.go | 30 +++---- .../lowriskovercommitment.go | 51 ++++++------ .../lowriskovercommitment_test.go | 4 +- pkg/trimaran/resourcestats.go | 10 +-- pkg/trimaran/resourcestats_test.go | 8 +- .../targetloadpacking/targetloadpacking.go | 26 +++--- 22 files changed, 334 insertions(+), 246 deletions(-) diff --git a/pkg/capacityscheduling/capacity_scheduling.go b/pkg/capacityscheduling/capacity_scheduling.go index b9caa8509..f9b8fb347 100644 --- a/pkg/capacityscheduling/capacity_scheduling.go +++ b/pkg/capacityscheduling/capacity_scheduling.go @@ -125,6 +125,7 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram podLister: handle.SharedInformerFactory().Core().V1().Pods().Lister(), pdbLister: getPDBLister(handle.SharedInformerFactory()), } + logger := klog.FromContext(ctx) client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme}) if err != nil { @@ -187,7 +188,7 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram }, }, ) - klog.InfoS("CapacityScheduling start") + logger.Info("CapacityScheduling start") return c, nil } @@ -288,9 +289,11 @@ func (c *CapacityScheduling) PreFilterExtensions() framework.PreFilterExtensions // AddPod from pre-computed data in cycleState. func (c *CapacityScheduling) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { + logger := klog.FromContext(ctx) + elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(cycleState) if err != nil { - klog.ErrorS(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey) + logger.Error(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey) return framework.NewStatus(framework.Error, err.Error()) } @@ -298,7 +301,7 @@ func (c *CapacityScheduling) AddPod(ctx context.Context, cycleState *framework.C if elasticQuotaInfo != nil { err := elasticQuotaInfo.addPodIfNotPresent(podToAdd.Pod) if err != nil { - klog.ErrorS(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(podToAdd.Pod)) + logger.Error(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(podToAdd.Pod)) } } @@ -307,9 +310,11 @@ func (c *CapacityScheduling) AddPod(ctx context.Context, cycleState *framework.C // RemovePod from pre-computed data in cycleState. 
func (c *CapacityScheduling) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { + logger := klog.FromContext(ctx) + elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(cycleState) if err != nil { - klog.ErrorS(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey) + logger.Error(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey) return framework.NewStatus(framework.Error, err.Error()) } @@ -317,7 +322,7 @@ func (c *CapacityScheduling) RemovePod(ctx context.Context, cycleState *framewor if elasticQuotaInfo != nil { err = elasticQuotaInfo.deletePodIfPresent(podToRemove.Pod) if err != nil { - klog.ErrorS(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(podToRemove.Pod)) + logger.Error(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(podToRemove.Pod)) } } @@ -348,11 +353,13 @@ func (c *CapacityScheduling) Reserve(ctx context.Context, state *framework.Cycle c.Lock() defer c.Unlock() + logger := klog.FromContext(ctx) + elasticQuotaInfo := c.elasticQuotaInfos[pod.Namespace] if elasticQuotaInfo != nil { err := elasticQuotaInfo.addPodIfNotPresent(pod) if err != nil { - klog.ErrorS(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(pod)) + logger.Error(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(pod)) return framework.NewStatus(framework.Error, err.Error()) } } @@ -363,11 +370,13 @@ func (c *CapacityScheduling) Unreserve(ctx context.Context, state *framework.Cyc c.Lock() defer c.Unlock() + logger := klog.FromContext(ctx) + elasticQuotaInfo := c.elasticQuotaInfos[pod.Namespace] if elasticQuotaInfo != nil { err := elasticQuotaInfo.deletePodIfPresent(pod) if err != nil { - klog.ErrorS(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(pod)) + logger.Error(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(pod)) } } } @@ -400,14 +409,16 @@ func (p *preemptor) CandidatesToVictimsMap(candidates []preemption.Candidate) ma // We look at the node that is nominated for this pod and as long as there are // terminating pods on the node, we don't consider this for preempting more pods. func (p *preemptor) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) { + logger := klog.FromContext(context.TODO()) + if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever { - klog.V(5).InfoS("Pod is not eligible for preemption because of its preemptionPolicy", "pod", klog.KObj(pod), "preemptionPolicy", v1.PreemptNever) + logger.V(5).Info("Pod is not eligible for preemption because of its preemptionPolicy", "pod", klog.KObj(pod), "preemptionPolicy", v1.PreemptNever) return false, "not eligible due to preemptionPolicy=Never." 
} preFilterState, err := getPreFilterState(p.state) if err != nil { - klog.V(5).InfoS("Failed to read preFilterState from cycleState, err: %s", err, "preFilterStateKey", preFilterStateKey) + logger.V(5).Info("Failed to read preFilterState from cycleState, err: %s", err, "preFilterStateKey", preFilterStateKey) return false, "not eligible due to failed to read from cycleState" } @@ -422,7 +433,7 @@ func (p *preemptor) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(p.state) if err != nil { - klog.ErrorS(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey) + logger.Error(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey) return true, "" } @@ -480,17 +491,20 @@ func (p *preemptor) SelectVictimsOnNode( pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) { + + logger := klog.FromContext(ctx) + elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(state) if err != nil { msg := "Failed to read elasticQuotaSnapshot from cycleState" - klog.ErrorS(err, msg, "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey) + logger.Error(err, msg, "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey) return nil, 0, framework.NewStatus(framework.Unschedulable, msg) } preFilterState, err := getPreFilterState(state) if err != nil { msg := "Failed to read preFilterState from cycleState" - klog.ErrorS(err, msg, "preFilterStateKey", preFilterStateKey) + logger.Error(err, msg, "preFilterStateKey", preFilterStateKey) return nil, 0, framework.NewStatus(framework.Unschedulable, msg) } @@ -498,7 +512,6 @@ func (p *preemptor) SelectVictimsOnNode( var nominatedPodsReqWithPodReq framework.Resource podReq := preFilterState.podReq - logger := klog.FromContext(ctx) removePod := func(rpi *framework.PodInfo) error { if err := nodeInfo.RemovePod(logger, rpi.Pod); err != nil { return err @@ -625,7 +638,7 @@ func (p *preemptor) SelectVictimsOnNode( return false, err } victims = append(victims, pi.Pod) - klog.V(5).InfoS("Found a potential preemption victim on node", "pod", klog.KObj(pi.Pod), "node", klog.KObj(nodeInfo.Node())) + logger.V(5).Info("Found a potential preemption victim on node", "pod", klog.KObj(pi.Pod), "node", klog.KObj(nodeInfo.Node())) } if preemptorWithElasticQuota && (preemptorElasticQuotaInfo.usedOverMaxWith(&nominatedPodsReqInEQWithPodReq) || elasticQuotaInfos.aggregatedUsedOverMinWith(nominatedPodsReqWithPodReq)) { @@ -633,14 +646,14 @@ func (p *preemptor) SelectVictimsOnNode( return false, err } victims = append(victims, pi.Pod) - klog.V(5).InfoS("Found a potential preemption victim on node", "pod", klog.KObj(pi.Pod), " node", klog.KObj(nodeInfo.Node())) + logger.V(5).Info("Found a potential preemption victim on node", "pod", klog.KObj(pi.Pod), " node", klog.KObj(nodeInfo.Node())) } return fits, nil } for _, pi := range violatingVictims { if fits, err := reprievePod(pi); err != nil { - klog.ErrorS(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod)) + logger.Error(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod)) return nil, 0, framework.AsStatus(err) } else if !fits { numViolatingVictim++ @@ -649,7 +662,7 @@ func (p *preemptor) SelectVictimsOnNode( // Now we try to reprieve non-violating victims. 
for _, pi := range nonViolatingVictims { if _, err := reprievePod(pi); err != nil { - klog.ErrorS(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod)) + logger.Error(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod)) return nil, 0, framework.AsStatus(err) } } @@ -694,6 +707,9 @@ func (c *CapacityScheduling) deleteElasticQuota(obj interface{}) { } func (c *CapacityScheduling) addPod(obj interface{}) { + ctx := context.TODO() + logger := klog.FromContext(ctx) + pod := obj.(*v1.Pod) c.Lock() @@ -703,8 +719,8 @@ func (c *CapacityScheduling) addPod(obj interface{}) { // If elasticQuotaInfo is nil, try to list ElasticQuotas through elasticQuotaLister if elasticQuotaInfo == nil { var eqList v1alpha1.ElasticQuotaList - if err := c.client.List(context.Background(), &eqList, client.InNamespace(pod.Namespace)); err != nil { - klog.ErrorS(err, "Failed to get elasticQuota", "elasticQuota", pod.Namespace) + if err := c.client.List(ctx, &eqList, client.InNamespace(pod.Namespace)); err != nil { + logger.Error(err, "Failed to get elasticQuota", "elasticQuota", pod.Namespace) return } @@ -724,11 +740,13 @@ func (c *CapacityScheduling) addPod(obj interface{}) { err := elasticQuotaInfo.addPodIfNotPresent(pod) if err != nil { - klog.ErrorS(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(pod)) + logger.Error(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(pod)) } } func (c *CapacityScheduling) updatePod(oldObj, newObj interface{}) { + logger := klog.FromContext(context.TODO()) + oldPod := oldObj.(*v1.Pod) newPod := newObj.(*v1.Pod) @@ -744,13 +762,15 @@ func (c *CapacityScheduling) updatePod(oldObj, newObj interface{}) { if elasticQuotaInfo != nil { err := elasticQuotaInfo.deletePodIfPresent(newPod) if err != nil { - klog.ErrorS(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(newPod)) + logger.Error(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(newPod)) } } } } func (c *CapacityScheduling) deletePod(obj interface{}) { + logger := klog.FromContext(context.TODO()) + pod := obj.(*v1.Pod) c.Lock() defer c.Unlock() @@ -759,7 +779,7 @@ func (c *CapacityScheduling) deletePod(obj interface{}) { if elasticQuotaInfo != nil { err := elasticQuotaInfo.deletePodIfPresent(pod) if err != nil { - klog.ErrorS(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(pod)) + logger.Error(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(pod)) } } } diff --git a/pkg/capacityscheduling/capacity_scheduling_test.go b/pkg/capacityscheduling/capacity_scheduling_test.go index 43f0e0000..bfd1d87ea 100644 --- a/pkg/capacityscheduling/capacity_scheduling_test.go +++ b/pkg/capacityscheduling/capacity_scheduling_test.go @@ -469,7 +469,7 @@ func TestReserve(t *testing.T) { state := framework.NewCycleState() for i, pod := range tt.pods { - got := cs.Reserve(nil, state, pod, "node-a") + got := cs.Reserve(context.TODO(), state, pod, "node-a") if got.Code() != tt.expectedCodes[i] { t.Errorf("expected %v, got %v : %v", tt.expected[i], got.Code(), got.Message()) } @@ -591,7 +591,7 @@ func TestUnreserve(t *testing.T) { state := framework.NewCycleState() for i, pod := range tt.pods { - cs.Unreserve(nil, state, pod, "node-a") + cs.Unreserve(context.TODO(), state, pod, "node-a") if !reflect.DeepEqual(cs.elasticQuotaInfos["ns1"], tt.expected[i]["ns1"]) { t.Errorf("expected %#v, got %#v", tt.expected[i]["ns1"].Used, cs.elasticQuotaInfos["ns1"].Used) } diff --git 
a/pkg/coscheduling/core/core.go b/pkg/coscheduling/core/core.go index daa139c7e..177467614 100644 --- a/pkg/coscheduling/core/core.go +++ b/pkg/coscheduling/core/core.go @@ -65,10 +65,10 @@ type Manager interface { PreFilter(context.Context, *corev1.Pod) error Permit(context.Context, *framework.CycleState, *corev1.Pod) Status GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup) - GetCreationTimestamp(*corev1.Pod, time.Time) time.Time - DeletePermittedPodGroup(string) - CalculateAssignedPods(string, string) int - ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) + GetCreationTimestamp(context.Context, *corev1.Pod, time.Time) time.Time + DeletePermittedPodGroup(context.Context, string) + CalculateAssignedPods(context.Context, string, string) int + ActivateSiblings(ctx context.Context, pod *corev1.Pod, state *framework.CycleState) BackoffPodGroup(string, time.Duration) } @@ -112,7 +112,8 @@ func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Durati // ActivateSiblings stashes the pods belonging to the same PodGroup of the given pod // in the given state, with a reserved key "kubernetes.io/pods-to-activate". -func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) { +func (pgMgr *PodGroupManager) ActivateSiblings(ctx context.Context, pod *corev1.Pod, state *framework.CycleState) { + lh := klog.FromContext(ctx) pgName := util.GetPodGroupLabel(pod) if pgName == "" { return @@ -129,7 +130,7 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: pgName}), ) if err != nil { - klog.ErrorS(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgName) + lh.Error(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgName) return } @@ -159,7 +160,8 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework // 2. the total number of pods in the podgroup is less than the minimum number of pods // that is required to be scheduled. func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error { - klog.V(5).InfoS("Pre-filter", "pod", klog.KObj(pod)) + lh := klog.FromContext(ctx) + lh.V(5).Info("Pre-filter", "pod", klog.KObj(pod)) pgFullName, pg := pgMgr.GetPodGroup(ctx, pod) if pg == nil { return nil @@ -202,7 +204,7 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er minResources[corev1.ResourcePods] = *podQuantity err = CheckClusterResource(ctx, nodes, minResources, pgFullName) if err != nil { - klog.ErrorS(err, "Failed to PreFilter", "podGroup", klog.KObj(pg)) + lh.Error(err, "Failed to PreFilter", "podGroup", klog.KObj(pg)) return err } pgMgr.permittedPG.Add(pgFullName, pgFullName, *pgMgr.scheduleTimeout) @@ -220,7 +222,7 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.Cycle return PodGroupNotFound } - assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace) + assigned := pgMgr.CalculateAssignedPods(ctx, pg.Name, pg.Namespace) // The number of pods that have been assigned nodes is calculated from the snapshot. // The current pod in not included in the snapshot during the current scheduling cycle. if int32(assigned)+1 >= pg.Spec.MinMember { @@ -243,20 +245,20 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.Cycle } // GetCreationTimestamp returns the creation time of a podGroup or a pod. 
-func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time { +func (pgMgr *PodGroupManager) GetCreationTimestamp(ctx context.Context, pod *corev1.Pod, ts time.Time) time.Time { pgName := util.GetPodGroupLabel(pod) if len(pgName) == 0 { return ts } var pg v1alpha1.PodGroup - if err := pgMgr.client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil { + if err := pgMgr.client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil { return ts } return pg.CreationTimestamp.Time } // DeletePermittedPodGroup deletes a podGroup that passes Pre-Filter but reaches PostFilter. -func (pgMgr *PodGroupManager) DeletePermittedPodGroup(pgFullName string) { +func (pgMgr *PodGroupManager) DeletePermittedPodGroup(_ context.Context, pgFullName string) { pgMgr.permittedPG.Delete(pgFullName) } @@ -274,10 +276,11 @@ func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod) } // CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound. -func (pgMgr *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int { +func (pgMgr *PodGroupManager) CalculateAssignedPods(ctx context.Context, podGroupName, namespace string) int { + lh := klog.FromContext(ctx) nodeInfos, err := pgMgr.snapshotSharedLister.NodeInfos().List() if err != nil { - klog.ErrorS(err, "Cannot get nodeInfos from frameworkHandle") + lh.Error(err, "Cannot get nodeInfos from frameworkHandle") return 0 } var count int @@ -354,6 +357,6 @@ func getNodeResource(ctx context.Context, info *framework.NodeInfo, desiredPodGr leftResource.ScalarResources[k] = allocatableEx - requestEx } } - klog.V(4).InfoS("Node left resource", "node", klog.KObj(info.Node()), "resource", leftResource) + logger.V(4).Info("Node left resource", "node", klog.KObj(info.Node()), "resource", leftResource) return &leftResource } diff --git a/pkg/coscheduling/coscheduling.go b/pkg/coscheduling/coscheduling.go index 55f5f368c..69b36175c 100644 --- a/pkg/coscheduling/coscheduling.go +++ b/pkg/coscheduling/coscheduling.go @@ -60,7 +60,11 @@ const ( ) // New initializes and returns a new Coscheduling plugin. 
-func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { +func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + + lh := klog.FromContext(ctx) + lh.V(5).Info("creating new coscheduling plugin") + args, ok := obj.(*config.CoschedulingArgs) if !ok { return nil, fmt.Errorf("want args to be of type CoschedulingArgs, got %T", obj) @@ -93,7 +97,7 @@ func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framew } if args.PodGroupBackoffSeconds < 0 { err := fmt.Errorf("parse arguments failed") - klog.ErrorS(err, "PodGroupBackoffSeconds cannot be negative") + lh.Error(err, "PodGroupBackoffSeconds cannot be negative") return nil, err } else if args.PodGroupBackoffSeconds > 0 { pgBackoff := time.Duration(args.PodGroupBackoffSeconds) * time.Second @@ -128,8 +132,8 @@ func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { if prio1 != prio2 { return prio1 > prio2 } - creationTime1 := cs.pgMgr.GetCreationTimestamp(podInfo1.Pod, *podInfo1.InitialAttemptTimestamp) - creationTime2 := cs.pgMgr.GetCreationTimestamp(podInfo2.Pod, *podInfo2.InitialAttemptTimestamp) + creationTime1 := cs.pgMgr.GetCreationTimestamp(context.TODO(), podInfo1.Pod, *podInfo1.InitialAttemptTimestamp) + creationTime2 := cs.pgMgr.GetCreationTimestamp(context.TODO(), podInfo2.Pod, *podInfo2.InitialAttemptTimestamp) if creationTime1.Equal(creationTime2) { return core.GetNamespacedName(podInfo1.Pod) < core.GetNamespacedName(podInfo2.Pod) } @@ -140,10 +144,11 @@ func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { // 1. Whether the PodGroup that the Pod belongs to is on the deny list. // 2. Whether the total number of pods in a PodGroup is less than its `minMember`. func (cs *Coscheduling) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { + lh := klog.FromContext(ctx) // If PreFilter fails, return framework.UnschedulableAndUnresolvable to avoid // any preemption attempts. if err := cs.pgMgr.PreFilter(ctx, pod); err != nil { - klog.ErrorS(err, "PreFilter failed", "pod", klog.KObj(pod)) + lh.Error(err, "PreFilter failed", "pod", klog.KObj(pod)) return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) } return nil, framework.NewStatus(framework.Success, "") @@ -152,17 +157,18 @@ func (cs *Coscheduling) PreFilter(ctx context.Context, state *framework.CycleSta // PostFilter is used to reject a group of pods if a pod does not pass PreFilter or Filter. func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { + lh := klog.FromContext(ctx) pgName, pg := cs.pgMgr.GetPodGroup(ctx, pod) if pg == nil { - klog.V(4).InfoS("Pod does not belong to any group", "pod", klog.KObj(pod)) + lh.V(4).Info("Pod does not belong to any group", "pod", klog.KObj(pod)) return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, "can not find pod group") } // This indicates there are already enough Pods satisfying the PodGroup, // so don't bother to reject the whole PodGroup. 
- assigned := cs.pgMgr.CalculateAssignedPods(pg.Name, pod.Namespace) + assigned := cs.pgMgr.CalculateAssignedPods(ctx, pg.Name, pod.Namespace) if assigned >= int(pg.Spec.MinMember) { - klog.V(4).InfoS("Assigned pods", "podGroup", klog.KObj(pg), "assigned", assigned) + lh.V(4).Info("Assigned pods", "podGroup", klog.KObj(pg), "assigned", assigned) return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable) } @@ -170,7 +176,7 @@ func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleSt // to see they can satisfy the PodGroup notAssignedPercentage := float32(int(pg.Spec.MinMember)-assigned) / float32(pg.Spec.MinMember) if notAssignedPercentage <= 0.1 { - klog.V(4).InfoS("A small gap of pods to reach the quorum", "podGroup", klog.KObj(pg), "percentage", notAssignedPercentage) + lh.V(4).Info("A small gap of pods to reach the quorum", "podGroup", klog.KObj(pg), "percentage", notAssignedPercentage) return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable) } @@ -178,7 +184,7 @@ func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleSt // it's inferrable other Pods belonging to the same PodGroup would be very likely to fail. cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { if waitingPod.GetPod().Namespace == pod.Namespace && util.GetPodGroupLabel(waitingPod.GetPod()) == pg.Name { - klog.V(3).InfoS("PostFilter rejects the pod", "podGroup", klog.KObj(pg), "pod", klog.KObj(waitingPod.GetPod())) + lh.V(3).Info("PostFilter rejects the pod", "podGroup", klog.KObj(pg), "pod", klog.KObj(waitingPod.GetPod())) waitingPod.Reject(cs.Name(), "optimistic rejection in PostFilter") } }) @@ -192,7 +198,7 @@ func (cs *Coscheduling) PostFilter(ctx context.Context, state *framework.CycleSt } } - cs.pgMgr.DeletePermittedPodGroup(pgName) + cs.pgMgr.DeletePermittedPodGroup(ctx, pgName) return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("PodGroup %v gets rejected due to Pod %v is unschedulable even after PostFilter", pgName, pod.Name)) } @@ -204,6 +210,7 @@ func (cs *Coscheduling) PreFilterExtensions() framework.PreFilterExtensions { // Permit is the functions invoked by the framework at "Permit" extension point. func (cs *Coscheduling) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) { + lh := klog.FromContext(ctx) waitTime := *cs.scheduleTimeout s := cs.pgMgr.Permit(ctx, state, pod) var retStatus *framework.Status @@ -213,23 +220,23 @@ func (cs *Coscheduling) Permit(ctx context.Context, state *framework.CycleState, case core.PodGroupNotFound: return framework.NewStatus(framework.Unschedulable, "PodGroup not found"), 0 case core.Wait: - klog.InfoS("Pod is waiting to be scheduled to node", "pod", klog.KObj(pod), "nodeName", nodeName) + lh.Info("Pod is waiting to be scheduled to node", "pod", klog.KObj(pod), "nodeName", nodeName) _, pg := cs.pgMgr.GetPodGroup(ctx, pod) if wait := util.GetWaitTimeDuration(pg, cs.scheduleTimeout); wait != 0 { waitTime = wait } retStatus = framework.NewStatus(framework.Wait) // We will also request to move the sibling pods back to activeQ. 
- cs.pgMgr.ActivateSiblings(pod, state) + cs.pgMgr.ActivateSiblings(ctx, pod, state) case core.Success: pgFullName := util.GetPodGroupFullName(pod) cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { if util.GetPodGroupFullName(waitingPod.GetPod()) == pgFullName { - klog.V(3).InfoS("Permit allows", "pod", klog.KObj(waitingPod.GetPod())) + lh.V(3).Info("Permit allows", "pod", klog.KObj(waitingPod.GetPod())) waitingPod.Allow(cs.Name()) } }) - klog.V(3).InfoS("Permit allows", "pod", klog.KObj(pod)) + lh.V(3).Info("Permit allows", "pod", klog.KObj(pod)) retStatus = framework.NewStatus(framework.Success) waitTime = 0 } @@ -244,15 +251,16 @@ func (cs *Coscheduling) Reserve(ctx context.Context, state *framework.CycleState // Unreserve rejects all other Pods in the PodGroup when one of the pods in the group times out. func (cs *Coscheduling) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { + lh := klog.FromContext(ctx) pgName, pg := cs.pgMgr.GetPodGroup(ctx, pod) if pg == nil { return } cs.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { if waitingPod.GetPod().Namespace == pod.Namespace && util.GetPodGroupLabel(waitingPod.GetPod()) == pg.Name { - klog.V(3).InfoS("Unreserve rejects", "pod", klog.KObj(waitingPod.GetPod()), "podGroup", klog.KObj(pg)) + lh.V(3).Info("Unreserve rejects", "pod", klog.KObj(waitingPod.GetPod()), "podGroup", klog.KObj(pg)) waitingPod.Reject(cs.Name(), "rejection in Unreserve") } }) - cs.pgMgr.DeletePermittedPodGroup(pgName) + cs.pgMgr.DeletePermittedPodGroup(ctx, pgName) } diff --git a/pkg/networkaware/networkoverhead/networkoverhead.go b/pkg/networkaware/networkoverhead/networkoverhead.go index bc59602f6..242e0a62c 100644 --- a/pkg/networkaware/networkoverhead/networkoverhead.go +++ b/pkg/networkaware/networkoverhead/networkoverhead.go @@ -139,8 +139,9 @@ func (no *NetworkOverhead) ScoreExtensions() framework.ScoreExtensions { } // New : create an instance of a NetworkOverhead plugin -func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { - klog.V(4).InfoS("Creating new instance of the NetworkOverhead plugin") +func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + logger := klog.FromContext(ctx) + logger.V(4).Info("Creating new instance of the NetworkOverhead plugin") args, err := getArgs(obj) if err != nil { @@ -177,6 +178,7 @@ func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.Cycle preFilterState := &PreFilterState{ scoreEqually: true, } + logger := klog.FromContext(ctx) // Write initial status state.Write(preFilterStateKey, preFilterState) @@ -188,10 +190,10 @@ func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.Cycle } // Get AppGroup CR - appGroup := no.findAppGroupNetworkOverhead(agName) + appGroup := no.findAppGroupNetworkOverhead(ctx, logger, agName) // Get NetworkTopology CR - networkTopology := no.findNetworkTopologyNetworkOverhead() + networkTopology := no.findNetworkTopologyNetworkOverhead(ctx, logger) // Sort Costs if manual weights were selected no.sortNetworkTopologyCosts(networkTopology) @@ -220,7 +222,7 @@ func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.Cycle scheduledList := networkawareutil.GetScheduledList(pods) // Check if scheduledList is empty... 
if len(scheduledList) == 0 { - klog.ErrorS(nil, "Scheduled list is empty, return") + logger.Error(nil, "Scheduled list is empty, return") return nil, framework.NewStatus(framework.Success, "Scheduled list is empty, return") } @@ -244,7 +246,7 @@ func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.Cycle // retrieve region and zone labels region := networkawareutil.GetNodeRegion(nodeInfo.Node()) zone := networkawareutil.GetNodeZone(nodeInfo.Node()) - klog.V(6).InfoS("Node info", + logger.V(6).Info("Node info", "name", nodeInfo.Node().Name, "region", region, "zone", zone) @@ -254,13 +256,13 @@ func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.Cycle // Populate cost map for the given node no.populateCostMap(costMap, networkTopology, region, zone) - klog.V(6).InfoS("Map", "costMap", costMap) + logger.V(6).Info("Map", "costMap", costMap) // Update nodeCostMap nodeCostMap[nodeInfo.Node().Name] = costMap // Get Satisfied and Violated number of dependencies - satisfied, violated, ok := checkMaxNetworkCostRequirements(scheduledList, dependencyList, nodeInfo, region, zone, costMap, no) + satisfied, violated, ok := checkMaxNetworkCostRequirements(logger, scheduledList, dependencyList, nodeInfo, region, zone, costMap, no) if ok != nil { return nil, framework.NewStatus(framework.Error, fmt.Sprintf("pod hostname not found: %v", ok)) } @@ -268,14 +270,14 @@ func (no *NetworkOverhead) PreFilter(ctx context.Context, state *framework.Cycle // Update Satisfied and Violated maps satisfiedMap[nodeInfo.Node().Name] = satisfied violatedMap[nodeInfo.Node().Name] = violated - klog.V(6).InfoS("Number of dependencies", "satisfied", satisfied, "violated", violated) + logger.V(6).Info("Number of dependencies", "satisfied", satisfied, "violated", violated) // Get accumulated cost based on pod dependencies - cost, ok := no.getAccumulatedCost(scheduledList, dependencyList, nodeInfo.Node().Name, region, zone, costMap) + cost, ok := no.getAccumulatedCost(logger, scheduledList, dependencyList, nodeInfo.Node().Name, region, zone, costMap) if ok != nil { return nil, framework.NewStatus(framework.Error, fmt.Sprintf("getting pod hostname from Snapshot: %v", ok)) } - klog.V(6).InfoS("Node final cost", "cost", cost) + logger.V(6).Info("Node final cost", "cost", cost) finalCostMap[nodeInfo.Node().Name] = cost } @@ -330,24 +332,25 @@ func (no *NetworkOverhead) Filter(ctx context.Context, if nodeInfo.Node() == nil { return framework.NewStatus(framework.Error, "node not found") } + logger := klog.FromContext(ctx) // Get PreFilterState preFilterState, err := getPreFilterState(cycleState) if err != nil { - klog.ErrorS(err, "Failed to read preFilterState from cycleState", "preFilterStateKey", preFilterStateKey) + logger.Error(err, "Failed to read preFilterState from cycleState", "preFilterStateKey", preFilterStateKey) return framework.NewStatus(framework.Error, "not eligible due to failed to read from cycleState") } // If scoreEqually, return nil if preFilterState.scoreEqually { - klog.V(6).InfoS("Score all nodes equally, return") + logger.V(6).Info("Score all nodes equally, return") return nil } // Get satisfied and violated number of dependencies satisfied := preFilterState.satisfiedMap[nodeInfo.Node().Name] violated := preFilterState.violatedMap[nodeInfo.Node().Name] - klog.V(6).InfoS("Number of dependencies:", "satisfied", satisfied, "violated", violated) + logger.V(6).Info("Number of dependencies:", "satisfied", satisfied, "violated", violated) // The pod is filtered out if the number 
of violated dependencies is higher than the satisfied ones if violated > satisfied { @@ -364,10 +367,11 @@ func (no *NetworkOverhead) Score(ctx context.Context, nodeName string) (int64, *framework.Status) { score := framework.MinNodeScore + logger := klog.FromContext(ctx) // Get PreFilterState preFilterState, err := getPreFilterState(cycleState) if err != nil { - klog.ErrorS(err, "Failed to read preFilterState from cycleState", "preFilterStateKey", preFilterStateKey) + logger.Error(err, "Failed to read preFilterState from cycleState", "preFilterStateKey", preFilterStateKey) return score, framework.NewStatus(framework.Error, "not eligible due to failed to read from cycleState, return min score") } @@ -378,7 +382,7 @@ func (no *NetworkOverhead) Score(ctx context.Context, // Return Accumulated Cost as score score = preFilterState.finalCostMap[nodeName] - klog.V(4).InfoS("Score:", "pod", pod.GetName(), "node", nodeName, "finalScore", score) + logger.V(4).Info("Score:", "pod", pod.GetName(), "node", nodeName, "finalScore", score) return score, framework.NewStatus(framework.Success, "Accumulated cost added as score, normalization ensures lower costs are favored") } @@ -387,7 +391,8 @@ func (no *NetworkOverhead) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, scores framework.NodeScoreList) *framework.Status { - klog.V(4).InfoS("before normalization: ", "scores", scores) + logger := klog.FromContext(ctx) + logger.V(4).Info("before normalization: ", "scores", scores) // Get Min and Max Scores to normalize between framework.MaxNodeScore and framework.MinNodeScore minCost, maxCost := getMinMaxScores(scores) @@ -409,7 +414,7 @@ func (no *NetworkOverhead) NormalizeScore(ctx context.Context, scores[i].Score = framework.MaxNodeScore - int64(normCost) } } - klog.V(4).InfoS("after normalization: ", "scores", scores) + logger.V(4).Info("after normalization: ", "scores", scores) return nil } @@ -494,6 +499,7 @@ func (no *NetworkOverhead) populateCostMap( // checkMaxNetworkCostRequirements : verifies the number of met and unmet dependencies based on the pod being filtered func checkMaxNetworkCostRequirements( + logger klog.Logger, scheduledList networkawareutil.ScheduledList, dependencyList []agv1alpha1.DependenciesInfo, nodeInfo *framework.NodeInfo, @@ -522,7 +528,7 @@ func checkMaxNetworkCostRequirements( // If Nodes are not the same, get NodeInfo from pod Hostname podNodeInfo, err := no.handle.SnapshotSharedLister().NodeInfos().Get(podAllocated.Hostname) if err != nil { - klog.ErrorS(nil, "getting pod nodeInfo %q from Snapshot: %v", podNodeInfo, err) + logger.Error(nil, "getting pod nodeInfo %q from Snapshot: %v", podNodeInfo, err) return satisfied, violated, err } @@ -569,6 +575,7 @@ func checkMaxNetworkCostRequirements( // getAccumulatedCost : calculate the accumulated cost based on the Pod's dependencies func (no *NetworkOverhead) getAccumulatedCost( + logger klog.Logger, scheduledList networkawareutil.ScheduledList, dependencyList []agv1alpha1.DependenciesInfo, nodeName string, @@ -592,7 +599,7 @@ func (no *NetworkOverhead) getAccumulatedCost( // Get NodeInfo from pod Hostname podNodeInfo, err := no.handle.SnapshotSharedLister().NodeInfos().Get(podAllocated.Hostname) if err != nil { - klog.ErrorS(nil, "getting pod hostname %q from Snapshot: %v", podNodeInfo, err) + logger.Error(nil, "getting pod hostname %q from Snapshot: %v", podNodeInfo, err) return cost, err } // Get zone and region from Pod Hostname @@ -646,18 +653,18 @@ func getPreFilterState(cycleState 
*framework.CycleState) (*PreFilterState, error return state, nil } -func (no *NetworkOverhead) findAppGroupNetworkOverhead(agName string) *agv1alpha1.AppGroup { - klog.V(6).InfoS("namespaces: %s", no.namespaces) +func (no *NetworkOverhead) findAppGroupNetworkOverhead(ctx context.Context, logger klog.Logger, agName string) *agv1alpha1.AppGroup { + logger.V(6).Info("namespaces: %s", no.namespaces) for _, namespace := range no.namespaces { - klog.V(6).InfoS("appGroup CR", "namespace", namespace, "name", agName) + logger.V(6).Info("appGroup CR", "namespace", namespace, "name", agName) // AppGroup could not be placed in several namespaces simultaneously appGroup := &agv1alpha1.AppGroup{} - err := no.Get(context.TODO(), client.ObjectKey{ + err := no.Get(ctx, client.ObjectKey{ Namespace: namespace, Name: agName, }, appGroup) if err != nil { - klog.V(4).ErrorS(err, "Cannot get AppGroup from AppGroupNamespaceLister:") + logger.V(4).Error(err, "Cannot get AppGroup from AppGroupNamespaceLister:") continue } if appGroup != nil && appGroup.GetUID() != "" { @@ -667,18 +674,18 @@ func (no *NetworkOverhead) findAppGroupNetworkOverhead(agName string) *agv1alpha return nil } -func (no *NetworkOverhead) findNetworkTopologyNetworkOverhead() *ntv1alpha1.NetworkTopology { - klog.V(6).InfoS("namespaces: %s", no.namespaces) +func (no *NetworkOverhead) findNetworkTopologyNetworkOverhead(ctx context.Context, logger klog.Logger) *ntv1alpha1.NetworkTopology { + logger.V(6).Info("namespaces: %s", no.namespaces) for _, namespace := range no.namespaces { - klog.V(6).InfoS("networkTopology CR:", "namespace", namespace, "name", no.ntName) + logger.V(6).Info("networkTopology CR:", "namespace", namespace, "name", no.ntName) // NetworkTopology could not be placed in several namespaces simultaneously networkTopology := &ntv1alpha1.NetworkTopology{} - err := no.Get(context.TODO(), client.ObjectKey{ + err := no.Get(ctx, client.ObjectKey{ Namespace: namespace, Name: no.ntName, }, networkTopology) if err != nil { - klog.V(4).ErrorS(err, "Cannot get networkTopology from networkTopologyNamespaceLister:") + logger.V(4).Error(err, "Cannot get networkTopology from networkTopologyNamespaceLister:") continue } if networkTopology != nil && networkTopology.GetUID() != "" { diff --git a/pkg/networkaware/topologicalsort/topologicalsort.go b/pkg/networkaware/topologicalsort/topologicalsort.go index 326a2db95..a360347e8 100644 --- a/pkg/networkaware/topologicalsort/topologicalsort.go +++ b/pkg/networkaware/topologicalsort/topologicalsort.go @@ -72,8 +72,9 @@ func getArgs(obj runtime.Object) (*pluginconfig.TopologicalSortArgs, error) { } // New : create an instance of a TopologicalSort plugin -func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { - klog.V(4).InfoS("Creating new instance of the TopologicalSort plugin") +func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + logger := klog.FromContext(ctx) + logger.V(4).Info("Creating new instance of the TopologicalSort plugin") args, err := getArgs(obj) if err != nil { @@ -101,18 +102,20 @@ func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framew func (ts *TopologicalSort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool { p1AppGroup := networkawareutil.GetPodAppGroupLabel(pInfo1.Pod) p2AppGroup := networkawareutil.GetPodAppGroupLabel(pInfo2.Pod) + ctx := context.TODO() + logger := klog.FromContext(ctx) // If pods do not belong to an AppGroup, or being to different 
AppGroups, follow vanilla QoS Sort if p1AppGroup != p2AppGroup || len(p1AppGroup) == 0 { - klog.V(4).InfoS("Pods do not belong to the same AppGroup CR", "p1AppGroup", p1AppGroup, "p2AppGroup", p2AppGroup) + logger.V(4).Info("Pods do not belong to the same AppGroup CR", "p1AppGroup", p1AppGroup, "p2AppGroup", p2AppGroup) s := &queuesort.PrioritySort{} return s.Less(pInfo1, pInfo2) } // Pods belong to the same appGroup, get the CR - klog.V(6).InfoS("Pods belong to the same AppGroup CR", "p1 name", pInfo1.Pod.Name, "p2 name", pInfo2.Pod.Name, "appGroup", p1AppGroup) + logger.V(6).Info("Pods belong to the same AppGroup CR", "p1 name", pInfo1.Pod.Name, "p2 name", pInfo2.Pod.Name, "appGroup", p1AppGroup) agName := p1AppGroup - appGroup := ts.findAppGroupTopologicalSort(agName) + appGroup := ts.findAppGroupTopologicalSort(ctx, logger, agName) // Get labels from both pods labelsP1 := pInfo1.Pod.GetLabels() @@ -122,24 +125,23 @@ func (ts *TopologicalSort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool { orderP1 := networkawareutil.FindPodOrder(appGroup.Status.TopologyOrder, labelsP1[agv1alpha.AppGroupSelectorLabel]) orderP2 := networkawareutil.FindPodOrder(appGroup.Status.TopologyOrder, labelsP2[agv1alpha.AppGroupSelectorLabel]) - klog.V(6).InfoS("Pod order values", "p1 order", orderP1, "p2 order", orderP2) + logger.V(6).Info("Pod order values", "p1 order", orderP1, "p2 order", orderP2) // Lower is better return orderP1 <= orderP2 } -func (ts *TopologicalSort) findAppGroupTopologicalSort(agName string) *agv1alpha.AppGroup { - klog.V(6).InfoS("namespaces: %s", ts.namespaces) +func (ts *TopologicalSort) findAppGroupTopologicalSort(ctx context.Context, logger klog.Logger, agName string) *agv1alpha.AppGroup { for _, namespace := range ts.namespaces { - klog.V(6).InfoS("appGroup CR", "namespace", namespace, "name", agName) + logger.V(6).Info("appGroup CR", "namespace", namespace, "name", agName) // AppGroup couldn't be placed in several namespaces simultaneously appGroup := &agv1alpha.AppGroup{} - err := ts.Get(context.TODO(), client.ObjectKey{ + err := ts.Get(ctx, client.ObjectKey{ Namespace: namespace, Name: agName, }, appGroup) if err != nil { - klog.V(4).InfoS("Cannot get AppGroup from AppGroupNamespaceLister:", "error", err) + logger.V(4).Info("Cannot get AppGroup from AppGroupNamespaceLister:", "error", err) continue } if appGroup != nil { diff --git a/pkg/noderesources/allocatable.go b/pkg/noderesources/allocatable.go index f394035a8..4bd71cc13 100644 --- a/pkg/noderesources/allocatable.go +++ b/pkg/noderesources/allocatable.go @@ -59,6 +59,7 @@ func validateResources(resources []schedulerconfig.ResourceSpec) error { // Score invoked at the score extension point. func (alloc *Allocatable) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { + logger := klog.FromContext(ctx) nodeInfo, err := alloc.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) if err != nil { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) @@ -68,7 +69,7 @@ func (alloc *Allocatable) Score(ctx context.Context, state *framework.CycleState // It calculates the sum of the node's weighted allocatable resources. // // Note: the returned "score" is negative for least allocatable, and positive for most allocatable. - return alloc.score(pod, nodeInfo) + return alloc.score(logger, pod, nodeInfo) } // ScoreExtensions of the Score plugin. 
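For reference, the conversion pattern applied throughout this patch, shown as a minimal self-contained sketch rather than part of the diff (the examplePlugin type and its helper method are hypothetical): callbacks that receive a ctx derive their logger via klog.FromContext(ctx), helpers take a klog.Logger parameter instead of calling the klog.InfoS/ErrorS globals, and informer event handlers without a context fall back to klog.FromContext(context.TODO()).

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
)

type examplePlugin struct{}

// A framework callback: the logger is taken from the scheduling context.
func (p *examplePlugin) Score(ctx context.Context, pod *v1.Pod) {
	logger := klog.FromContext(ctx)
	logger.V(4).Info("Scoring pod", "pod", klog.KObj(pod))
	p.helper(logger, pod) // helpers receive the logger explicitly
}

// A helper: accepts klog.Logger (an alias for logr.Logger) instead of
// using the klog.InfoS/ErrorS package-level functions.
func (p *examplePlugin) helper(logger klog.Logger, pod *v1.Pod) {
	logger.V(6).Info("helper invoked", "pod", klog.KObj(pod))
}

// An informer event handler: no context is available here, so the patch
// falls back to klog.FromContext(context.TODO()), which yields the global
// logger when the context carries none.
func (p *examplePlugin) podAdded(obj interface{}) {
	logger := klog.FromContext(context.TODO())
	if pod, ok := obj.(*v1.Pod); ok {
		logger.V(10).Info("Pod added", "pod", klog.KObj(pod))
	}
}
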
diff --git a/pkg/noderesources/resource_allocation.go b/pkg/noderesources/resource_allocation.go index 1e29873df..b4b26e5b7 100644 --- a/pkg/noderesources/resource_allocation.go +++ b/pkg/noderesources/resource_allocation.go @@ -47,6 +47,7 @@ type resourceToValueMap map[v1.ResourceName]int64 // score will use `scorer` function to calculate the score. func (r *resourceAllocationScorer) score( + logger klog.Logger, pod *v1.Pod, nodeInfo *framework.NodeInfo) (int64, *framework.Status) { node := nodeInfo.Node() @@ -59,13 +60,13 @@ func (r *resourceAllocationScorer) score( requested := make(resourceToValueMap, len(r.resourceToWeightMap)) allocatable := make(resourceToValueMap, len(r.resourceToWeightMap)) for resource := range r.resourceToWeightMap { - allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource) + allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(logger, nodeInfo, pod, resource) } score := r.scorer(requested, allocatable) - if klog.V(10).Enabled() { - klog.InfoS("Resources and score", + if logger.V(10).Enabled() { + logger.Info("Resources and score", "podName", pod.Name, "nodeName", node.Name, "scorer", r.Name, "allocatableResources", allocatable, "requestedResources", requested, "score", score) @@ -75,7 +76,7 @@ func (r *resourceAllocationScorer) score( } // calculateResourceAllocatableRequest returns resources Allocatable and Requested values -func calculateResourceAllocatableRequest(nodeInfo *framework.NodeInfo, pod *v1.Pod, resource v1.ResourceName) (int64, int64) { +func calculateResourceAllocatableRequest(logger klog.Logger, nodeInfo *framework.NodeInfo, pod *v1.Pod, resource v1.ResourceName) (int64, int64) { podRequest := calculatePodResourceRequest(pod, resource) switch resource { case v1.ResourceCPU: @@ -90,8 +91,8 @@ func calculateResourceAllocatableRequest(nodeInfo *framework.NodeInfo, pod *v1.P return nodeInfo.Allocatable.ScalarResources[resource], (nodeInfo.Requested.ScalarResources[resource] + podRequest) } } - if klog.V(10).Enabled() { - klog.InfoS("Requested resource not considered for node score calculation", + if logger.V(10).Enabled() { + logger.Info("Requested resource not considered for node score calculation", "resource", resource, ) } diff --git a/pkg/preemptiontoleration/preemption_toleration.go b/pkg/preemptiontoleration/preemption_toleration.go index c71f17e82..7ab7767d1 100644 --- a/pkg/preemptiontoleration/preemption_toleration.go +++ b/pkg/preemptiontoleration/preemption_toleration.go @@ -129,6 +129,8 @@ func ExemptedFromPreemption( pcLister schedulinglisters.PriorityClassLister, now time.Time, ) (bool, error) { + logger := klog.FromContext(context.TODO()) + if victimCandidate.Spec.PriorityClassName == "" { return false, nil } @@ -149,7 +151,7 @@ func ExemptedFromPreemption( policy, err := parsePreemptionTolerationPolicy(*victimPriorityClass) if err != nil { // if any error raised, no toleration at all - klog.ErrorS(err, "Failed to parse preemption toleration policy of victim candidate's priorityclass. This victim candidate can't tolerate the preemption", + logger.Error(err, "Failed to parse preemption toleration policy of victim candidate's priorityclass. 
This victim candidate can't tolerate the preemption", "PreemptorPod", klog.KObj(preemptor), "VictimCandidatePod", klog.KObj(victimCandidate), "VictimCandidatePriorityClass", klog.KRef("", victimPriorityClass.Name), @@ -219,7 +221,7 @@ func (pl *PreemptionToleration) SelectVictimsOnNode( // For a pod with lower priority, check if it can be exempted from the preemption. exempted, err := ExemptedFromPreemption(pi.Pod, preemptor, pl.priorityClassLister, pl.curTime) if err != nil { - klog.ErrorS(err, "Encountered error while selecting victims on node", "Node", nodeInfo.Node().Name) + logger.Error(err, "Encountered error while selecting victims on node", "Node", nodeInfo.Node().Name) return nil, 0, framework.AsStatus(err) } @@ -265,7 +267,7 @@ func (pl *PreemptionToleration) SelectVictimsOnNode( } rpi := pi.Pod victims = append(victims, rpi) - klog.V(5).InfoS("Pod is a potential preemption victim on node", "pod", klog.KObj(rpi), "node", klog.KObj(nodeInfo.Node())) + logger.V(5).Info("Pod is a potential preemption victim on node", "pod", klog.KObj(rpi), "node", klog.KObj(nodeInfo.Node())) } return fits, nil } @@ -324,8 +326,9 @@ func (pl *PreemptionToleration) calculateNumCandidates(numNodes int32) int32 { // We look at the node that is nominated for this pod and as long as there are // terminating pods on the node, we don't consider this for preempting more pods. func (pl *PreemptionToleration) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) { + logger := klog.FromContext(context.TODO()) if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever { - klog.V(5).InfoS("Pod is not eligible for preemption because it has a preemptionPolicy of Never", "pod", klog.KObj(pod)) + logger.V(5).Info("Pod is not eligible for preemption because it has a preemptionPolicy of Never", "pod", klog.KObj(pod)) return false, "not eligible due to preemptionPolicy=Never." 
} nodeInfos := pl.fh.SnapshotSharedLister().NodeInfos() diff --git a/pkg/sysched/sysched.go b/pkg/sysched/sysched.go index 3c8289cc1..4f8359f10 100644 --- a/pkg/sysched/sysched.go +++ b/pkg/sysched/sysched.go @@ -17,9 +17,9 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/security-profiles-operator/api/seccompprofile/v1beta1" - "sigs.k8s.io/controller-runtime/pkg/client" pluginconfig "sigs.k8s.io/scheduler-plugins/apis/config" "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" ) @@ -120,7 +120,7 @@ func (sc *SySched) readSPOProfileCR(name string, namespace string) (sets.Set[str // SPO is used to generate and input the seccomp profile to a pod // If a pod does not have a SPO seccomp profile, then an unconfined // system call set is return for the pod -func (sc *SySched) getSyscalls(pod *v1.Pod) sets.Set[string] { +func (sc *SySched) getSyscalls(logger klog.Logger, pod *v1.Pod) sets.Set[string] { r := sets.New[string]() // read the seccomp profile from the security context of a pod @@ -134,7 +134,7 @@ func (sc *SySched) getSyscalls(pod *v1.Pod) sets.Set[string] { if len(ns) > 0 && len(name) > 0 { syscalls, err := sc.readSPOProfileCR(name, ns) if err != nil { - klog.ErrorS(err, "Failed to read syscall CR by parsing pod security context") + logger.Error(err, "Failed to read syscall CR by parsing pod security context") } if len(syscalls) > 0 { @@ -155,7 +155,7 @@ func (sc *SySched) getSyscalls(pod *v1.Pod) sets.Set[string] { if len(ns) > 0 && len(name) > 0 { syscalls, err := sc.readSPOProfileCR(name, ns) if err != nil { - klog.ErrorS(err, "Failed to read syscall CR by parsing container security context") + logger.Error(err, "Failed to read syscall CR by parsing container security context") } if len(syscalls) > 0 { @@ -179,7 +179,7 @@ func (sc *SySched) getSyscalls(pod *v1.Pod) sets.Set[string] { syscalls, err := sc.readSPOProfileCR(name, ns) if err != nil { - klog.ErrorS(err, "Failed to read syscall CR by parsing pod annotation") + logger.Error(err, "Failed to read syscall CR by parsing pod annotation") continue } @@ -196,7 +196,7 @@ func (sc *SySched) getSyscalls(pod *v1.Pod) sets.Set[string] { if len(r) == 0 { syscalls, err := sc.readSPOProfileCR(sc.DefaultProfileName, sc.DefaultProfileNamespace) if err != nil { - klog.ErrorS(err, "Failed to read the CR of all syscalls") + logger.Error(err, "Failed to read the CR of all syscalls") } if syscalls.Len() > 0 { @@ -212,7 +212,7 @@ func (sc *SySched) Name() string { return Name } -func (sc *SySched) calcScore(syscalls sets.Set[string]) int { +func (sc *SySched) calcScore(logger klog.Logger, syscalls sets.Set[string]) int { // Currently, score is not adjusted based on critical/cve syscalls. // NOTE: weight W is hardcoded for now // TODO: add critical/cve syscalls @@ -222,13 +222,14 @@ func (sc *SySched) calcScore(syscalls sets.Set[string]) int { score := syscalls.Len() - totCrit score = score + W*totCrit - klog.V(10).InfoS("Score: ", "score", score, "tot_crit", totCrit) + logger.V(10).Info("Score: ", "score", score, "tot_crit", totCrit) return score } // Score invoked at the score extension point. func (sc *SySched) Score(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { + logger := klog.FromContext(ctx) // Read directly from API server because cached state in SnapSharedLister not always up-to-date // especially during initial scheduler start. 
node, err := sc.handle.ClientSet().CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) @@ -236,7 +237,7 @@ func (sc *SySched) Score(ctx context.Context, cs *framework.CycleState, pod *v1. return 0, nil } - podSyscalls := sc.getSyscalls(pod) + podSyscalls := sc.getSyscalls(logger, pod) // NOTE: this condition is true only when a pod does not // have a syscall profile, or the unconfined syscall is @@ -245,7 +246,7 @@ func (sc *SySched) Score(ctx context.Context, cs *framework.CycleState, pod *v1. return math.MaxInt64, nil } - _, hostSyscalls := sc.getHostSyscalls(node.Name) + _, hostSyscalls := sc.getHostSyscalls(logger, node.Name) // when a host or node does not have any pods // running, the extraneous syscall score is zero @@ -254,30 +255,31 @@ func (sc *SySched) Score(ctx context.Context, cs *framework.CycleState, pod *v1. } diffSyscalls := hostSyscalls.Difference(podSyscalls) - totalDiffs := sc.calcScore(diffSyscalls) + totalDiffs := sc.calcScore(logger, diffSyscalls) // add the difference existing pods will see if new Pod is added into this host newHostSyscalls := hostSyscalls.Clone() newHostSyscalls = newHostSyscalls.Union(podSyscalls) for _, p := range sc.HostToPods[node.Name] { - podSyscalls = sc.getSyscalls(p) + podSyscalls = sc.getSyscalls(logger, p) diffSyscalls = newHostSyscalls.Difference(podSyscalls) - totalDiffs += sc.calcScore(diffSyscalls) + totalDiffs += sc.calcScore(logger, diffSyscalls) } sc.ExSAvg = sc.ExSAvg + (float64(totalDiffs)-sc.ExSAvg)/float64(sc.ExSAvgCount) sc.ExSAvgCount += 1 - klog.V(10).Info("ExSAvg: ", sc.ExSAvg) - klog.V(10).InfoS("Score: ", "totalDiffs", totalDiffs, "pod", pod.Name, "node", nodeName) + logger.V(10).Info("ExSAvg: ", sc.ExSAvg) + logger.V(10).Info("Score: ", "totalDiffs", totalDiffs, "pod", pod.Name, "node", nodeName) return int64(totalDiffs), nil } func (sc *SySched) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { - klog.V(10).InfoS("Original: ", "scores", scores, "pod", pod.Name) + logger := klog.FromContext(ctx) + logger.V(10).Info("Original: ", "scores", scores, "pod", pod.Name) ret := helper.DefaultNormalizeScore(framework.MaxNodeScore, true, scores) - klog.V(10).InfoS("Normalized: ", "scores", scores, "pod", pod.Name) + logger.V(10).Info("Normalized: ", "scores", scores, "pod", pod.Name) return ret } @@ -287,22 +289,22 @@ func (sc *SySched) ScoreExtensions() framework.ScoreExtensions { return sc } -func (sc *SySched) getHostSyscalls(nodeName string) (int, sets.Set[string]) { +func (sc *SySched) getHostSyscalls(logger klog.Logger, nodeName string) (int, sets.Set[string]) { count := 0 h, ok := sc.HostSyscalls[nodeName] if !ok { - klog.V(5).Infof("getHostSyscalls: no nodeName %s", nodeName) + logger.V(5).Info(fmt.Sprintf("getHostSyscalls: no nodeName %s", nodeName)) return count, nil } return h.Len(), h } -func (sc *SySched) updateHostSyscalls(pod *v1.Pod) { - syscall := sc.getSyscalls(pod) +func (sc *SySched) updateHostSyscalls(logger klog.Logger, pod *v1.Pod) { + syscall := sc.getSyscalls(logger, pod) sc.HostSyscalls[pod.Spec.NodeName] = sc.HostSyscalls[pod.Spec.NodeName].Union(syscall) } -func (sc *SySched) addPod(pod *v1.Pod) { +func (sc *SySched) addPod(logger klog.Logger, pod *v1.Pod) { nodeName := pod.Spec.NodeName name := pod.Name @@ -311,7 +313,7 @@ func (sc *SySched) addPod(pod *v1.Pod) { sc.HostToPods[nodeName] = make([]*v1.Pod, 0) sc.HostToPods[nodeName] = append(sc.HostToPods[nodeName], pod) sc.HostSyscalls[nodeName] = sets.New[string]() - 
sc.updateHostSyscalls(pod) + sc.updateHostSyscalls(logger, pod) return } @@ -322,36 +324,36 @@ func (sc *SySched) addPod(pod *v1.Pod) { } sc.HostToPods[nodeName] = append(sc.HostToPods[nodeName], pod) - sc.updateHostSyscalls(pod) + sc.updateHostSyscalls(logger, pod) return } -func (sc *SySched) recomputeHostSyscalls(pods []*v1.Pod) sets.Set[string] { +func (sc *SySched) recomputeHostSyscalls(logger klog.Logger, pods []*v1.Pod) sets.Set[string] { syscalls := sets.New[string]() for _, p := range pods { - syscall := sc.getSyscalls(p) + syscall := sc.getSyscalls(logger, p) syscalls = syscalls.Union(syscall) } return syscalls } -func (sc *SySched) removePod(pod *v1.Pod) { +func (sc *SySched) removePod(logger klog.Logger, pod *v1.Pod) { nodeName := pod.Spec.NodeName _, ok := sc.HostToPods[nodeName] if !ok { - klog.V(5).Infof("removePod: Host %s not yet cached", nodeName) + logger.V(5).Info(fmt.Sprintf("removePod: Host %s not yet cached", nodeName)) return } for i, p := range sc.HostToPods[nodeName] { if p.Name == pod.Name { sc.HostToPods[nodeName] = remove(sc.HostToPods[nodeName], i) - sc.HostSyscalls[nodeName] = sc.recomputeHostSyscalls(sc.HostToPods[nodeName]) - c, _ := sc.getHostSyscalls(nodeName) - klog.V(5).InfoS("remaining ", "syscalls", c, "node", nodeName) + sc.HostSyscalls[nodeName] = sc.recomputeHostSyscalls(logger, sc.HostToPods[nodeName]) + c, _ := sc.getHostSyscalls(logger, nodeName) + logger.V(5).Info("remaining ", "syscalls", c, "node", nodeName) return } } @@ -360,30 +362,33 @@ func (sc *SySched) removePod(pod *v1.Pod) { } func (sc *SySched) podAdded(obj interface{}) { + logger := klog.FromContext(context.TODO()) pod := obj.(*v1.Pod) // Add already running pod to map // This is for when our scheduler comes up after other pods if pod.Status.Phase == v1.PodRunning { - klog.V(10).Infof("POD ADDED: %s/%s phase: %s", pod.Namespace, pod.Name, pod.Status.Phase) - sc.addPod(pod) + logger.V(10).Info(fmt.Sprintf("POD ADDED: %s/%s phase: %s", pod.Namespace, pod.Name, pod.Status.Phase)) + sc.addPod(logger, pod) } } func (sc *SySched) podUpdated(old, new interface{}) { + logger := klog.FromContext(context.TODO()) pod := old.(*v1.Pod) // Pod has been assigned to node, now can add to our map if pod.Status.Phase == v1.PodPending && pod.Status.HostIP != "" { - klog.V(10).Infof("POD UPDATED. %s/%s", pod.Namespace, pod.Name) - sc.addPod(pod) + logger.V(10).Info(fmt.Sprintf("POD UPDATED. %s/%s", pod.Namespace, pod.Name)) + sc.addPod(logger, pod) } } func (sc *SySched) podDeleted(obj interface{}) { + logger := klog.FromContext(context.TODO()) pod := obj.(*v1.Pod) - klog.V(10).Infof("POD DELETED: %s/%s", pod.Namespace, pod.Name) - sc.removePod(pod) + logger.V(10).Info(fmt.Sprintf("POD DELETED: %s/%s", pod.Namespace, pod.Name)) + sc.removePod(logger, pod) } // getArgs : returns the arguments for the SySchedArg plugin. 
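The test updates that follow obtain loggers with klog.FromContext(context.TODO()); klog also ships a ktesting helper that yields a per-test logger plus a context carrying it. A minimal sketch assuming k8s.io/klog/v2/ktesting (not part of the diff; the test name is hypothetical):

package example

import (
	"testing"

	"k8s.io/klog/v2/ktesting"
)

func TestContextualLogging(t *testing.T) {
	// NewTestContext returns a logger that writes through t.Log and a
	// context carrying it for code that calls klog.FromContext(ctx).
	logger, ctx := ktesting.NewTestContext(t)

	logger.V(5).Info("starting test")
	_ = ctx // pass ctx (or logger) into the code under test
}
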
diff --git a/pkg/sysched/sysched_test.go b/pkg/sysched/sysched_test.go index 012c61990..1eea8c39b 100644 --- a/pkg/sysched/sysched_test.go +++ b/pkg/sysched/sysched_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -14,6 +15,7 @@ import ( clientsetfake "k8s.io/client-go/kubernetes/fake" clientscheme "k8s.io/client-go/kubernetes/scheme" restclient "k8s.io/client-go/rest" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" @@ -311,7 +313,8 @@ func TestGetSyscalls(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - syscalls := sys.getSyscalls(tt.pod) + logger := klog.FromContext(context.TODO()) + syscalls := sys.getSyscalls(logger, tt.pod) assert.NotNil(t, syscalls) assert.EqualValues(t, tt.expected, syscalls.Len()) }) @@ -333,7 +336,8 @@ func TestCalcScore(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - score := sys.calcScore(tt.syscalls) + logger := klog.FromContext(context.TODO()) + score := sys.calcScore(logger, tt.syscalls) assert.EqualValues(t, tt.expected, score) }) } @@ -351,6 +355,7 @@ func TestScore(t *testing.T) { // fake out the framework handle ctx, cancel := context.WithCancel(context.Background()) defer cancel() + logger := klog.FromContext(ctx) fr, err := tf.NewFramework(ctx, registeredPlugins, Name, frameworkruntime.WithClientSet(clientsetfake.NewSimpleClientset(node.Obj()))) if err != nil { @@ -381,7 +386,7 @@ func TestScore(t *testing.T) { sys.ExSAvg = 0 sys.ExSAvgCount = 0 - sys.addPod(pod) + sys.addPod(logger, pod) tests := []struct { name string @@ -479,14 +484,15 @@ func TestGetHostSyscalls(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + logger := klog.FromContext(context.TODO()) sys, _ := mockSysched() sys.ExSAvgCount = 0 for i := range tt.pods { - sys.addPod(tt.pods[i]) + sys.addPod(logger, tt.pods[i]) } - cnt, _ := sys.getHostSyscalls(tt.nodeName) + cnt, _ := sys.getHostSyscalls(logger, tt.nodeName) assert.EqualValues(t, tt.expected, cnt) }) } @@ -524,6 +530,7 @@ func TestUpdateHostSyscalls(t *testing.T) { // fake out the framework handle ctx, cancel := context.WithCancel(context.Background()) defer cancel() + logger := klog.FromContext(ctx) nodeItems := []v1.Node{} for _, node := range tt.nodes { nodeItems = append(nodeItems, *node) @@ -559,13 +566,13 @@ func TestUpdateHostSyscalls(t *testing.T) { sys.ExSAvgCount = 0 for i := range tt.basePods { - sys.addPod(tt.basePods[i]) + sys.addPod(logger, tt.basePods[i]) } for i := range tt.newPods { - sys.updateHostSyscalls(tt.newPods[i]) + sys.updateHostSyscalls(logger, tt.newPods[i]) } - sc, _ := sys.getHostSyscalls("test") + sc, _ := sys.getHostSyscalls(logger, "test") assert.EqualValues(t, tt.expected, sc) }) } @@ -591,8 +598,9 @@ func TestAddPod(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + logger := klog.FromContext(context.TODO()) for i := range tt.pods { - sys.addPod(tt.pods[i]) + sys.addPod(logger, tt.pods[i]) } for i := range sys.HostToPods["test"] { assert.EqualValues(t, tt.pods[i], sys.HostToPods["test"][i]) @@ -623,7 +631,8 @@ func TestRecomputeHostSyscalls(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - syscalls := sys.recomputeHostSyscalls(tt.pods) + logger := 
klog.FromContext(context.TODO()) + syscalls := sys.recomputeHostSyscalls(logger, tt.pods) assert.EqualValues(t, tt.expected, len(syscalls)) }) } @@ -655,12 +664,13 @@ func TestRemovePod(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + logger := klog.FromContext(context.TODO()) for i := range tt.pods { - sys.addPod(tt.pods[i]) + sys.addPod(logger, tt.pods[i]) } for i := range tt.removePods { - sys.removePod(tt.removePods[i]) + sys.removePod(logger, tt.removePods[i]) } assert.EqualValues(t, tt.expectedPodNum, len(sys.HostToPods["test"])) @@ -729,8 +739,9 @@ func TestPodUpdated(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + logger := klog.FromContext(context.TODO()) for i := range tt.basePods { - sys.addPod(tt.basePods[i]) + sys.addPod(logger, tt.basePods[i]) } for i := range tt.updatedPods { @@ -769,8 +780,9 @@ func TestPodDeleted(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + logger := klog.FromContext(context.TODO()) for i := range tt.basePods { - sys.addPod(tt.basePods[i]) + sys.addPod(logger, tt.basePods[i]) } for i := range tt.deletedPods { diff --git a/pkg/trimaran/collector.go b/pkg/trimaran/collector.go index 2208cedec..69bb9b65a 100644 --- a/pkg/trimaran/collector.go +++ b/pkg/trimaran/collector.go @@ -49,11 +49,11 @@ type Collector struct { } // NewCollector : create an instance of a data collector -func NewCollector(trimaranSpec *pluginConfig.TrimaranSpec) (*Collector, error) { +func NewCollector(logger klog.Logger, trimaranSpec *pluginConfig.TrimaranSpec) (*Collector, error) { if err := checkSpecs(trimaranSpec); err != nil { return nil, err } - klog.V(4).InfoS("Using TrimaranSpec", "type", trimaranSpec.MetricProvider.Type, + logger.V(4).Info("Using TrimaranSpec", "type", trimaranSpec.MetricProvider.Type, "address", trimaranSpec.MetricProvider.Address, "watcher", trimaranSpec.WatcherAddress) var client loadwatcherapi.Client @@ -74,17 +74,17 @@ func NewCollector(trimaranSpec *pluginConfig.TrimaranSpec) (*Collector, error) { } // populate metrics before returning - err := collector.updateMetrics() + err := collector.updateMetrics(logger) if err != nil { - klog.ErrorS(err, "Unable to populate metrics initially") + logger.Error(err, "Unable to populate metrics initially") } // start periodic updates go func() { metricsUpdaterTicker := time.NewTicker(time.Second * metricsUpdateIntervalSeconds) for range metricsUpdaterTicker.C { - err = collector.updateMetrics() + err = collector.updateMetrics(logger) if err != nil { - klog.ErrorS(err, "Unable to update metrics") + logger.Error(err, "Unable to update metrics") } } }() @@ -100,16 +100,16 @@ func (collector *Collector) getAllMetrics() *watcher.WatcherMetrics { } // GetNodeMetrics : get metrics for a node from watcher -func (collector *Collector) GetNodeMetrics(nodeName string) ([]watcher.Metric, *watcher.WatcherMetrics) { +func (collector *Collector) GetNodeMetrics(logger klog.Logger, nodeName string) ([]watcher.Metric, *watcher.WatcherMetrics) { allMetrics := collector.getAllMetrics() // This happens if metrics were never populated since scheduler started if allMetrics.Data.NodeMetricsMap == nil { - klog.ErrorS(nil, "Metrics not available from watcher") + logger.Error(nil, "Metrics not available from watcher") return nil, nil } // Check if node is new (no metrics yet) or metrics are unavailable due to 404 or 500 if _, ok := allMetrics.Data.NodeMetricsMap[nodeName]; !ok { - klog.ErrorS(nil, "Unable to find metrics for node", "nodeName", 
nodeName) + logger.Error(nil, "Unable to find metrics for node", "nodeName", nodeName) return nil, allMetrics } return allMetrics.Data.NodeMetricsMap[nodeName].Metrics, allMetrics @@ -130,10 +130,10 @@ func checkSpecs(trimaranSpec *pluginConfig.TrimaranSpec) error { } // updateMetrics : request to load watcher to update all metrics -func (collector *Collector) updateMetrics() error { +func (collector *Collector) updateMetrics(logger klog.Logger) error { metrics, err := collector.client.GetLatestWatcherMetrics() if err != nil { - klog.ErrorS(err, "Load watcher client failed") + logger.Error(err, "Load watcher client failed") return err } collector.mu.Lock() diff --git a/pkg/trimaran/collector_test.go b/pkg/trimaran/collector_test.go index 96c0feb9c..f3c7209e1 100644 --- a/pkg/trimaran/collector_test.go +++ b/pkg/trimaran/collector_test.go @@ -17,6 +17,7 @@ limitations under the License. package trimaran import ( + "context" "encoding/json" "net/http" "net/http/httptest" @@ -24,6 +25,7 @@ import ( "github.com/paypal/load-watcher/pkg/watcher" "github.com/stretchr/testify/assert" + "k8s.io/klog/v2" pluginConfig "sigs.k8s.io/scheduler-plugins/apis/config" ) @@ -74,7 +76,8 @@ var ( ) func TestNewCollector(t *testing.T) { - col, err := NewCollector(&args) + logger := klog.FromContext(context.TODO()) + col, err := NewCollector(logger, &args) assert.NotNil(t, col) assert.Nil(t, err) } @@ -94,8 +97,8 @@ func TestNewCollectorSpecs(t *testing.T) { WatcherAddress: "", MetricProvider: metricProvider, } - - col, err := NewCollector(&trimaranSpec) + logger := klog.FromContext(context.TODO()) + col, err := NewCollector(logger, &trimaranSpec) assert.Nil(t, col) expectedErr := "invalid MetricProvider.Type, got " + string(metricProvider.Type) assert.EqualError(t, err, expectedErr) @@ -112,7 +115,8 @@ func TestGetAllMetrics(t *testing.T) { trimaranSpec := pluginConfig.TrimaranSpec{ WatcherAddress: server.URL, } - collector, err := NewCollector(&trimaranSpec) + logger := klog.FromContext(context.TODO()) + collector, err := NewCollector(logger, &trimaranSpec) assert.NotNil(t, collector) assert.Nil(t, err) @@ -133,11 +137,12 @@ func TestUpdateMetrics(t *testing.T) { trimaranSpec := pluginConfig.TrimaranSpec{ WatcherAddress: server.URL, } - collector, err := NewCollector(&trimaranSpec) + logger := klog.FromContext(context.TODO()) + collector, err := NewCollector(logger, &trimaranSpec) assert.NotNil(t, collector) assert.Nil(t, err) - err = collector.updateMetrics() + err = collector.updateMetrics(logger) assert.Nil(t, err) } @@ -152,11 +157,12 @@ func TestGetNodeMetrics(t *testing.T) { trimaranSpec := pluginConfig.TrimaranSpec{ WatcherAddress: server.URL, } - collector, err := NewCollector(&trimaranSpec) + logger := klog.FromContext(context.TODO()) + collector, err := NewCollector(logger, &trimaranSpec) assert.NotNil(t, collector) assert.Nil(t, err) nodeName := "node-1" - metrics, allMetrics := collector.GetNodeMetrics(nodeName) + metrics, allMetrics := collector.GetNodeMetrics(logger, nodeName) expectedMetrics := watcherResponse.Data.NodeMetricsMap[nodeName].Metrics assert.EqualValues(t, expectedMetrics, metrics) expectedAllMetrics := &watcherResponse @@ -174,11 +180,12 @@ func TestGetNodeMetricsNilForNode(t *testing.T) { trimaranSpec := pluginConfig.TrimaranSpec{ WatcherAddress: server.URL, } - collector, err := NewCollector(&trimaranSpec) + logger := klog.FromContext(context.TODO()) + collector, err := NewCollector(logger, &trimaranSpec) assert.NotNil(t, collector) assert.Nil(t, err) nodeName := "node-1" - 
metrics, allMetrics := collector.GetNodeMetrics(nodeName) + metrics, allMetrics := collector.GetNodeMetrics(logger, nodeName) expectedMetrics := noWatcherResponseForNode.Data.NodeMetricsMap[nodeName].Metrics assert.EqualValues(t, expectedMetrics, metrics) assert.NotNil(t, allMetrics) @@ -203,8 +210,8 @@ func TestNewCollectorLoadWatcher(t *testing.T) { WatcherAddress: "", MetricProvider: metricProvider, } - - col, err := NewCollector(&trimaranSpec) + logger := klog.FromContext(context.TODO()) + col, err := NewCollector(logger, &trimaranSpec) assert.NotNil(t, col) assert.Nil(t, err) } diff --git a/pkg/trimaran/handler.go b/pkg/trimaran/handler.go index 9b93cc34b..cd672d41f 100644 --- a/pkg/trimaran/handler.go +++ b/pkg/trimaran/handler.go @@ -21,6 +21,7 @@ Package Trimaran provides common code for plugins developed for real load aware package trimaran import ( + "context" "fmt" "sort" "sync" @@ -56,7 +57,7 @@ type podInfo struct { Pod *v1.Pod } -// Returns a new instance of PodAssignEventHandler, after starting a background go routine for cache cleanup +// New returns a new instance of PodAssignEventHandler, after starting a background go routine for cache cleanup func New() *PodAssignEventHandler { p := PodAssignEventHandler{ScheduledPodsCache: make(map[string][]podInfo)} go func() { @@ -107,6 +108,7 @@ func (p *PodAssignEventHandler) OnUpdate(oldObj, newObj interface{}) { } func (p *PodAssignEventHandler) OnDelete(obj interface{}) { + logger := klog.FromContext(context.TODO()) pod := obj.(*v1.Pod) nodeName := pod.Spec.NodeName p.Lock() @@ -117,7 +119,7 @@ func (p *PodAssignEventHandler) OnDelete(obj interface{}) { for i, v := range p.ScheduledPodsCache[nodeName] { n := len(p.ScheduledPodsCache[nodeName]) if pod.ObjectMeta.UID == v.Pod.ObjectMeta.UID { - klog.V(10).InfoS("Deleting pod", "pod", klog.KObj(v.Pod)) + logger.V(10).Info("Deleting pod", "pod", klog.KObj(v.Pod)) copy(p.ScheduledPodsCache[nodeName][i:], p.ScheduledPodsCache[nodeName][i+1:]) p.ScheduledPodsCache[nodeName][n-1] = podInfo{} p.ScheduledPodsCache[nodeName] = p.ScheduledPodsCache[nodeName][:n-1] diff --git a/pkg/trimaran/loadvariationriskbalancing/analysis.go b/pkg/trimaran/loadvariationriskbalancing/analysis.go index b0db576c0..0b6255427 100644 --- a/pkg/trimaran/loadvariationriskbalancing/analysis.go +++ b/pkg/trimaran/loadvariationriskbalancing/analysis.go @@ -31,9 +31,9 @@ Calculation of risk score for resources given measured data // computeScore : compute score given usage statistics // - risk = [ average + margin * stDev^{1/sensitivity} ] / 2 // - score = ( 1 - risk ) * maxScore -func computeScore(rs *trimaran.ResourceStats, margin float64, sensitivity float64) float64 { +func computeScore(logger klog.Logger, rs *trimaran.ResourceStats, margin float64, sensitivity float64) float64 { if rs.Capacity <= 0 { - klog.ErrorS(nil, "Invalid resource capacity", "capacity", rs.Capacity) + logger.Error(nil, "Invalid resource capacity", "capacity", rs.Capacity) return 0 } @@ -55,6 +55,6 @@ func computeScore(rs *trimaran.ResourceStats, margin float64, sensitivity float6 // evaluate overall risk factor risk := (mu + sigma) / 2 - klog.V(6).InfoS("Evaluating risk factor", "mu", mu, "sigma", sigma, "margin", margin, "sensitivity", sensitivity, "risk", risk) + logger.V(6).Info("Evaluating risk factor", "mu", mu, "sigma", sigma, "margin", margin, "sensitivity", sensitivity, "risk", risk) return (1. 
- risk) * float64(framework.MaxNodeScore) } diff --git a/pkg/trimaran/loadvariationriskbalancing/analysis_test.go b/pkg/trimaran/loadvariationriskbalancing/analysis_test.go index 72643fb56..2b08631cb 100644 --- a/pkg/trimaran/loadvariationriskbalancing/analysis_test.go +++ b/pkg/trimaran/loadvariationriskbalancing/analysis_test.go @@ -17,10 +17,13 @@ limitations under the License. package loadvariationriskbalancing import ( + "context" "math" "testing" "github.com/stretchr/testify/assert" + "k8s.io/klog/v2" + "sigs.k8s.io/scheduler-plugins/pkg/trimaran" ) @@ -156,7 +159,8 @@ func TestComputeScore(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - response := int64(math.Round(computeScore(tt.rs, tt.margin, tt.sensitivity))) + logger := klog.FromContext(context.TODO()) + response := int64(math.Round(computeScore(logger, tt.rs, tt.margin, tt.sensitivity))) assert.Equal(t, tt.expected, response) }) } diff --git a/pkg/trimaran/loadvariationriskbalancing/loadvariationriskbalancing.go b/pkg/trimaran/loadvariationriskbalancing/loadvariationriskbalancing.go index 78bcf714b..330810de8 100644 --- a/pkg/trimaran/loadvariationriskbalancing/loadvariationriskbalancing.go +++ b/pkg/trimaran/loadvariationriskbalancing/loadvariationriskbalancing.go @@ -52,18 +52,19 @@ type LoadVariationRiskBalancing struct { var _ framework.ScorePlugin = &LoadVariationRiskBalancing{} // New : create an instance of a LoadVariationRiskBalancing plugin -func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { - klog.V(4).InfoS("Creating new instance of the LoadVariationRiskBalancing plugin") +func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + logger := klog.FromContext(ctx) + logger.V(4).Info("Creating new instance of the LoadVariationRiskBalancing plugin") // cast object into plugin arguments object args, ok := obj.(*pluginConfig.LoadVariationRiskBalancingArgs) if !ok { return nil, fmt.Errorf("want args to be of type LoadVariationRiskBalancingArgs, got %T", obj) } - collector, err := trimaran.NewCollector(&args.TrimaranSpec) + collector, err := trimaran.NewCollector(logger, &args.TrimaranSpec) if err != nil { return nil, err } - klog.V(4).InfoS("Using LoadVariationRiskBalancingArgs", "margin", args.SafeVarianceMargin, "sensitivity", args.SafeVarianceSensitivity) + logger.V(4).Info("Using LoadVariationRiskBalancingArgs", "margin", args.SafeVarianceMargin, "sensitivity", args.SafeVarianceSensitivity) podAssignEventHandler := trimaran.New() podAssignEventHandler.AddToHandle(handle) @@ -79,16 +80,17 @@ func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framew // Score : evaluate score for a node func (pl *LoadVariationRiskBalancing) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { - klog.V(6).InfoS("Calculating score", "pod", klog.KObj(pod), "nodeName", nodeName) + logger := klog.FromContext(ctx) + logger.V(6).Info("Calculating score", "pod", klog.KObj(pod), "nodeName", nodeName) score := framework.MinNodeScore nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) if err != nil { return score, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) } // get node metrics - metrics, _ := pl.collector.GetNodeMetrics(nodeName) + metrics, _ := pl.collector.GetNodeMetrics(logger, nodeName) if metrics == nil { - klog.InfoS("Failed to get metrics for 
node; using minimum score", "nodeName", nodeName) + logger.Info("Failed to get metrics for node; using minimum score", "nodeName", nodeName) return score, nil } podRequest := trimaran.GetResourceRequested(pod) @@ -96,18 +98,18 @@ func (pl *LoadVariationRiskBalancing) Score(ctx context.Context, cycleState *fra // calculate CPU score var cpuScore float64 = 0 - cpuStats, cpuOK := trimaran.CreateResourceStats(metrics, node, podRequest, v1.ResourceCPU, watcher.CPU) + cpuStats, cpuOK := trimaran.CreateResourceStats(logger, metrics, node, podRequest, v1.ResourceCPU, watcher.CPU) if cpuOK { - cpuScore = computeScore(cpuStats, pl.args.SafeVarianceMargin, pl.args.SafeVarianceSensitivity) + cpuScore = computeScore(logger, cpuStats, pl.args.SafeVarianceMargin, pl.args.SafeVarianceSensitivity) } - klog.V(6).InfoS("Calculating CPUScore", "pod", klog.KObj(pod), "nodeName", nodeName, "cpuScore", cpuScore) + logger.V(6).Info("Calculating CPUScore", "pod", klog.KObj(pod), "nodeName", nodeName, "cpuScore", cpuScore) // calculate Memory score var memoryScore float64 = 0 - memoryStats, memoryOK := trimaran.CreateResourceStats(metrics, node, podRequest, v1.ResourceMemory, watcher.Memory) + memoryStats, memoryOK := trimaran.CreateResourceStats(logger, metrics, node, podRequest, v1.ResourceMemory, watcher.Memory) if memoryOK { - memoryScore = computeScore(memoryStats, pl.args.SafeVarianceMargin, pl.args.SafeVarianceSensitivity) + memoryScore = computeScore(logger, memoryStats, pl.args.SafeVarianceMargin, pl.args.SafeVarianceSensitivity) } - klog.V(6).InfoS("Calculating MemoryScore", "pod", klog.KObj(pod), "nodeName", nodeName, "memoryScore", memoryScore) + logger.V(6).Info("Calculating MemoryScore", "pod", klog.KObj(pod), "nodeName", nodeName, "memoryScore", memoryScore) // calculate total score var totalScore float64 = 0 if memoryOK && cpuOK { @@ -116,7 +118,7 @@ func (pl *LoadVariationRiskBalancing) Score(ctx context.Context, cycleState *fra totalScore = math.Max(memoryScore, cpuScore) } score = int64(math.Round(totalScore)) - klog.V(6).InfoS("Calculating totalScore", "pod", klog.KObj(pod), "nodeName", nodeName, "totalScore", score) + logger.V(6).Info("Calculating totalScore", "pod", klog.KObj(pod), "nodeName", nodeName, "totalScore", score) return score, framework.NewStatus(framework.Success, "") } diff --git a/pkg/trimaran/lowriskovercommitment/lowriskovercommitment.go b/pkg/trimaran/lowriskovercommitment/lowriskovercommitment.go index bf8492b00..ba8651543 100644 --- a/pkg/trimaran/lowriskovercommitment/lowriskovercommitment.go +++ b/pkg/trimaran/lowriskovercommitment/lowriskovercommitment.go @@ -45,7 +45,7 @@ const ( // MaxVarianceAllowance : allowed value from the maximum variance (to avoid zero divisions) MaxVarianceAllowance = 0.99 - // State key used in CycleState + // PodResourcesKey State key used in CycleState PodResourcesKey = Name + ".PodResources" ) @@ -58,14 +58,15 @@ type LowRiskOverCommitment struct { } // New : create an instance of a LowRiskOverCommitment plugin -func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { - klog.V(4).InfoS("Creating new instance of the LowRiskOverCommitment plugin") +func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + logger := klog.FromContext(ctx) + logger.V(4).Info("Creating new instance of the LowRiskOverCommitment plugin") // cast object into plugin arguments object args, ok := obj.(*pluginConfig.LowRiskOverCommitmentArgs) if !ok { return nil, fmt.Errorf("want 
args to be of type LowRiskOverCommitmentArgs, got %T", obj) } - collector, err := trimaran.NewCollector(&args.TrimaranSpec) + collector, err := trimaran.NewCollector(logger, &args.TrimaranSpec) if err != nil { return nil, err } @@ -76,7 +77,7 @@ func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framew for r, w := range args.RiskLimitWeights { m[r] = w } - klog.V(4).InfoS("Using LowRiskOverCommitmentArgs", "smoothingWindowSize", args.SmoothingWindowSize, + logger.V(4).Info("Using LowRiskOverCommitmentArgs", "smoothingWindowSize", args.SmoothingWindowSize, "riskLimitWeights", m) pl := &LowRiskOverCommitment{ @@ -90,7 +91,8 @@ func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framew // PreScore : calculate pod requests and limits and store as plugin state data to be used during scoring func (pl *LowRiskOverCommitment) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*framework.NodeInfo) *framework.Status { - klog.V(6).InfoS("PreScore: Calculating pod resource requests and limits", "pod", klog.KObj(pod)) + logger := klog.FromContext(ctx) + logger.V(6).Info("PreScore: Calculating pod resource requests and limits", "pod", klog.KObj(pod)) podResourcesStateData := CreatePodResourcesStateData(pod) cycleState.Write(PodResourcesKey, podResourcesStateData) return nil @@ -98,18 +100,19 @@ func (pl *LowRiskOverCommitment) PreScore(ctx context.Context, cycleState *frame // Score : evaluate score for a node func (pl *LowRiskOverCommitment) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { - klog.V(6).InfoS("Score: Calculating score", "pod", klog.KObj(pod), "nodeName", nodeName) + logger := klog.FromContext(ctx) + logger.V(6).Info("Score: Calculating score", "pod", klog.KObj(pod), "nodeName", nodeName) score := framework.MinNodeScore defer func() { - klog.V(6).InfoS("Calculating totalScore", "pod", klog.KObj(pod), "nodeName", nodeName, "totalScore", score) + logger.V(6).Info("Calculating totalScore", "pod", klog.KObj(pod), "nodeName", nodeName, "totalScore", score) }() // get pod requests and limits podResources, err := getPreScoreState(cycleState) if err != nil { // calculate pod requests and limits, if missing - klog.V(6).InfoS(err.Error()+"; recalculating", "pod", klog.KObj(pod)) + logger.V(6).Info(err.Error()+"; recalculating", "pod", klog.KObj(pod)) podResources = CreatePodResourcesStateData(pod) } // exclude scoring for best effort pods; this plugin is not concerned about best effort pods @@ -117,7 +120,7 @@ func (pl *LowRiskOverCommitment) Score(ctx context.Context, cycleState *framewor podLimits := &podResources.podLimits if podRequests.MilliCPU == 0 && podRequests.Memory == 0 && podLimits.MilliCPU == 0 && podLimits.Memory == 0 { - klog.V(6).InfoS("Skipping scoring best effort pod; using minimum score", "nodeName", nodeName, "pod", klog.KObj(pod)) + logger.V(6).Info("Skipping scoring best effort pod; using minimum score", "nodeName", nodeName, "pod", klog.KObj(pod)) return score, nil } // get node info @@ -126,13 +129,13 @@ func (pl *LowRiskOverCommitment) Score(ctx context.Context, cycleState *framewor return score, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) } // get node metrics - metrics, _ := pl.collector.GetNodeMetrics(nodeName) + metrics, _ := pl.collector.GetNodeMetrics(logger, nodeName) if metrics == nil { - klog.InfoS("Failed to get metrics for node; using minimum score", 
"nodeName", nodeName) + logger.Info("Failed to get metrics for node; using minimum score", "nodeName", nodeName) return score, nil } // calculate score - totalScore := pl.computeRank(metrics, nodeInfo, pod, podRequests, podLimits) * float64(framework.MaxNodeScore) + totalScore := pl.computeRank(logger, metrics, nodeInfo, pod, podRequests, podLimits) * float64(framework.MaxNodeScore) score = int64(math.Round(totalScore)) return score, framework.NewStatus(framework.Success, "") } @@ -153,27 +156,27 @@ func (pl *LowRiskOverCommitment) NormalizeScore(context.Context, *framework.Cycl } // computeRank : rank function for the LowRiskOverCommitment -func (pl *LowRiskOverCommitment) computeRank(metrics []watcher.Metric, nodeInfo *framework.NodeInfo, pod *v1.Pod, +func (pl *LowRiskOverCommitment) computeRank(logger klog.Logger, metrics []watcher.Metric, nodeInfo *framework.NodeInfo, pod *v1.Pod, podRequests *framework.Resource, podLimits *framework.Resource) float64 { node := nodeInfo.Node() // calculate risk based on requests and limits - nodeRequestsAndLimits := trimaran.GetNodeRequestsAndLimits(nodeInfo.Pods, node, pod, podRequests, podLimits) - riskCPU := pl.computeRisk(metrics, v1.ResourceCPU, watcher.CPU, node, nodeRequestsAndLimits) - riskMemory := pl.computeRisk(metrics, v1.ResourceMemory, watcher.Memory, node, nodeRequestsAndLimits) + nodeRequestsAndLimits := trimaran.GetNodeRequestsAndLimits(logger, nodeInfo.Pods, node, pod, podRequests, podLimits) + riskCPU := pl.computeRisk(logger, metrics, v1.ResourceCPU, watcher.CPU, node, nodeRequestsAndLimits) + riskMemory := pl.computeRisk(logger, metrics, v1.ResourceMemory, watcher.Memory, node, nodeRequestsAndLimits) rank := 1 - math.Max(riskCPU, riskMemory) - klog.V(6).InfoS("Node rank", "nodeName", node.GetName(), "riskCPU", riskCPU, "riskMemory", riskMemory, "rank", rank) + logger.V(6).Info("Node rank", "nodeName", node.GetName(), "riskCPU", riskCPU, "riskMemory", riskMemory, "rank", rank) return rank } // computeRisk : calculate the risk of scheduling on node for a given resource -func (pl *LowRiskOverCommitment) computeRisk(metrics []watcher.Metric, resourceName v1.ResourceName, +func (pl *LowRiskOverCommitment) computeRisk(logger klog.Logger, metrics []watcher.Metric, resourceName v1.ResourceName, resourceType string, node *v1.Node, nodeRequestsAndLimits *trimaran.NodeRequestsAndLimits) float64 { var riskLimit, riskLoad, totalRisk float64 defer func() { - klog.V(6).InfoS("Calculated risk", "node", klog.KObj(node), "resource", resourceName, + logger.V(6).Info("Calculated risk", "node", klog.KObj(node), "resource", resourceName, "riskLimit", riskLimit, "riskLoad", riskLoad, "totalRisk", totalRisk) }() @@ -198,7 +201,7 @@ func (pl *LowRiskOverCommitment) computeRisk(metrics []watcher.Metric, resourceN capacity = nodeCapacity.Memory } else { // invalid resource - klog.V(6).InfoS("Unexpected resource", "resourceName", resourceName) + logger.V(6).Info("Unexpected resource", "resourceName", resourceName) return 0 } @@ -206,11 +209,11 @@ func (pl *LowRiskOverCommitment) computeRisk(metrics []watcher.Metric, resourceN if limit > capacity { riskLimit = float64(limit-capacity) / float64(limit-request) } - klog.V(6).InfoS("RiskLimit", "node", klog.KObj(node), "resource", resourceName, "riskLimit", riskLimit) + logger.V(6).Info("RiskLimit", "node", klog.KObj(node), "resource", resourceName, "riskLimit", riskLimit) // (2) riskLoad : calculate measured overcommitment zeroRequest := &framework.Resource{} - stats, ok := trimaran.CreateResourceStats(metrics, 
node, zeroRequest, resourceName, resourceType) + stats, ok := trimaran.CreateResourceStats(logger, metrics, node, zeroRequest, resourceName, resourceType) if ok { // fit a beta distribution to the measured load stats mu, sigma := trimaran.GetMuSigma(stats) @@ -242,7 +245,7 @@ func (pl *LowRiskOverCommitment) computeRisk(metrics []watcher.Metric, resourceN // calculate risk riskLoad = 1 - allocProb - klog.V(6).InfoS("RiskLoad", "node", klog.KObj(node), "resource", resourceName, + logger.V(6).Info("RiskLoad", "node", klog.KObj(node), "resource", resourceName, "allocThreshold", allocThreshold, "allocProb", allocProb, "riskLoad", riskLoad) } diff --git a/pkg/trimaran/lowriskovercommitment/lowriskovercommitment_test.go b/pkg/trimaran/lowriskovercommitment/lowriskovercommitment_test.go index 93c267ad5..63c6c765d 100644 --- a/pkg/trimaran/lowriskovercommitment/lowriskovercommitment_test.go +++ b/pkg/trimaran/lowriskovercommitment/lowriskovercommitment_test.go @@ -27,6 +27,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" testClientSet "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2" schedConfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" @@ -376,7 +377,8 @@ func TestLowRiskOverCommitment_computeRisk(t *testing.T) { metrics := watcherData_A.NodeMetricsMap[node_A.Name].Metrics for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := pl.computeRisk(metrics, tt.resourceName, tt.resourceType, node_A, tt.nodeRequestsAndLimits); got != tt.want { + logger := klog.FromContext(context.TODO()) + if got := pl.computeRisk(logger, metrics, tt.resourceName, tt.resourceType, node_A, tt.nodeRequestsAndLimits); got != tt.want { t.Errorf("LowRiskOverCommitment.computeRisk() = %v, want %v", got, tt.want) } }) diff --git a/pkg/trimaran/resourcestats.go b/pkg/trimaran/resourcestats.go index 71c280883..f02a49ebb 100644 --- a/pkg/trimaran/resourcestats.go +++ b/pkg/trimaran/resourcestats.go @@ -43,12 +43,12 @@ type ResourceStats struct { } // CreateResourceStats : get resource statistics data from measurements for a node -func CreateResourceStats(metrics []watcher.Metric, node *v1.Node, podRequest *framework.Resource, +func CreateResourceStats(logger klog.Logger, metrics []watcher.Metric, node *v1.Node, podRequest *framework.Resource, resourceName v1.ResourceName, watcherType string) (rs *ResourceStats, isValid bool) { // get resource usage statistics nodeUtil, nodeStd, metricFound := GetResourceData(metrics, watcherType) if !metricFound { - klog.V(6).InfoS("Resource usage statistics for node : no valid data", "node", klog.KObj(node)) + logger.V(6).Info("Resource usage statistics for node : no valid data", "node", klog.KObj(node)) return nil, false } // get resource capacity @@ -69,7 +69,7 @@ func CreateResourceStats(metrics []watcher.Metric, node *v1.Node, podRequest *fr rs.UsedAvg = nodeUtil * rs.Capacity / 100 rs.UsedStdev = nodeStd * rs.Capacity / 100 - klog.V(6).InfoS("Resource usage statistics for node", "node", klog.KObj(node), "resource", resourceName, + logger.V(6).Info("Resource usage statistics for node", "node", klog.KObj(node), "resource", resourceName, "capacity", rs.Capacity, "required", rs.Req, "usedAvg", rs.UsedAvg, "usedStdev", rs.UsedStdev) return rs, true } @@ -121,7 +121,7 @@ func GetResourceLimits(pod *v1.Pod) *framework.Resource { }) } -// GetEffectiveResource: calculate effective resources of a pod (CPU and Memory) +// GetEffectiveResource : 
calculate effective resources of a pod (CPU and Memory) func GetEffectiveResource(pod *v1.Pod, fn func(container *v1.Container) v1.ResourceList) *framework.Resource { result := &framework.Resource{} // add up resources of all containers @@ -161,7 +161,7 @@ type NodeRequestsAndLimits struct { } // GetNodeRequestsAndLimits : total requested and limits of resources on a given node plus a pod -func GetNodeRequestsAndLimits(podInfosOnNode []*framework.PodInfo, node *v1.Node, pod *v1.Pod, +func GetNodeRequestsAndLimits(logger klog.Logger, podInfosOnNode []*framework.PodInfo, node *v1.Node, pod *v1.Pod, podRequests *framework.Resource, podLimits *framework.Resource) *NodeRequestsAndLimits { // initialization nodeRequest := &framework.Resource{} diff --git a/pkg/trimaran/resourcestats_test.go b/pkg/trimaran/resourcestats_test.go index a83762062..da50ef3a5 100644 --- a/pkg/trimaran/resourcestats_test.go +++ b/pkg/trimaran/resourcestats_test.go @@ -17,6 +17,7 @@ limitations under the License. package trimaran import ( + "context" "math" "reflect" "strconv" @@ -26,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" st "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -147,7 +149,8 @@ func TestCreateResourceStats(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotRs, gotIsValid := CreateResourceStats(tt.args.metrics, tt.args.node, tt.args.podRequest, tt.args.resourceName, tt.args.watcherType) + logger := klog.FromContext(context.TODO()) + gotRs, gotIsValid := CreateResourceStats(logger, tt.args.metrics, tt.args.node, tt.args.podRequest, tt.args.resourceName, tt.args.watcherType) if !reflect.DeepEqual(gotRs, tt.wantRs) { t.Errorf("createResourceStats() gotRs = %v, want %v", gotRs, tt.wantRs) } @@ -580,9 +583,10 @@ func TestGetNodeRequestsAndLimits(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + logger := klog.FromContext(context.TODO()) var got *NodeRequestsAndLimits SetMaxLimits(tt.args.podRequests, tt.args.podLimits) - if got = GetNodeRequestsAndLimits(tt.args.podsOnNode, tt.args.node, tt.args.pod, + if got = GetNodeRequestsAndLimits(logger, tt.args.podsOnNode, tt.args.node, tt.args.pod, tt.args.podRequests, tt.args.podLimits); !reflect.DeepEqual(got, tt.want) { t.Errorf("GetNodeRequestsAndLimits(): got = {%+v, %+v, %+v, %+v, %+v}, want = {%+v, %+v, %+v, %+v, %+v}", *got.NodeRequest, *got.NodeLimit, *got.NodeRequestMinusPod, *got.NodeLimitMinusPod, *got.Nodecapacity, diff --git a/pkg/trimaran/targetloadpacking/targetloadpacking.go b/pkg/trimaran/targetloadpacking/targetloadpacking.go index 2f7baf576..79e85f4c8 100644 --- a/pkg/trimaran/targetloadpacking/targetloadpacking.go +++ b/pkg/trimaran/targetloadpacking/targetloadpacking.go @@ -61,14 +61,15 @@ type TargetLoadPacking struct { var _ framework.ScorePlugin = &TargetLoadPacking{} -func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { - klog.V(4).InfoS("Creating new instance of the TargetLoadPacking plugin") +func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { + logger := klog.FromContext(ctx) + logger.V(4).Info("Creating new instance of the TargetLoadPacking plugin") // cast object into plugin arguments object args, ok := obj.(*pluginConfig.TargetLoadPackingArgs) if !ok { return nil, fmt.Errorf("want args to be of type TargetLoadPackingArgs, got %T", obj) } 
- collector, err := trimaran.NewCollector(&args.TrimaranSpec) + collector, err := trimaran.NewCollector(logger, &args.TrimaranSpec) if err != nil { return nil, err } @@ -80,7 +81,7 @@ func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framew return nil, errors.New("unable to parse DefaultRequestsMultiplier: " + err.Error()) } - klog.V(4).InfoS("Using TargetLoadPackingArgs", + logger.V(4).Info("Using TargetLoadPackingArgs", "requestsMilliCores", requestsMilliCores, "requestsMultiplier", requestsMultiplier, "targetUtilization", hostTargetUtilizationPercent) @@ -102,6 +103,7 @@ func (pl *TargetLoadPacking) Name() string { } func (pl *TargetLoadPacking) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { + logger := klog.FromContext(ctx) score := framework.MinNodeScore nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) if err != nil { @@ -109,7 +111,7 @@ func (pl *TargetLoadPacking) Score(ctx context.Context, cycleState *framework.Cy } // get node metrics - metrics, allMetrics := pl.collector.GetNodeMetrics(nodeName) + metrics, allMetrics := pl.collector.GetNodeMetrics(logger, nodeName) if metrics == nil { klog.InfoS("Failed to get metrics for node; using minimum score", "nodeName", nodeName) // Avoid the node by scoring minimum @@ -122,7 +124,7 @@ func (pl *TargetLoadPacking) Score(ctx context.Context, cycleState *framework.Cy for _, container := range pod.Spec.Containers { curPodCPUUsage += PredictUtilisation(&container) } - klog.V(6).InfoS("Predicted utilization for pod", "podName", pod.Name, "cpuUsage", curPodCPUUsage) + logger.V(6).Info("Predicted utilization for pod", "podName", pod.Name, "cpuUsage", curPodCPUUsage) if pod.Spec.Overhead != nil { curPodCPUUsage += pod.Spec.Overhead.Cpu().MilliValue() } @@ -139,13 +141,13 @@ func (pl *TargetLoadPacking) Score(ctx context.Context, cycleState *framework.Cy } if !cpuMetricFound { - klog.ErrorS(nil, "Cpu metric not found in node metrics", "nodeName", nodeName, "nodeMetrics", metrics) + logger.Error(nil, "Cpu metric not found in node metrics", "nodeName", nodeName, "nodeMetrics", metrics) return score, nil } nodeCPUCapMillis := float64(nodeInfo.Node().Status.Capacity.Cpu().MilliValue()) nodeCPUUtilMillis := (nodeCPUUtilPercent / 100) * nodeCPUCapMillis - klog.V(6).InfoS("Calculating CPU utilization and capacity", "nodeName", nodeName, "cpuUtilMillis", nodeCPUUtilMillis, "cpuCapMillis", nodeCPUCapMillis) + logger.V(6).Info("Calculating CPU utilization and capacity", "nodeName", nodeName, "cpuUtilMillis", nodeCPUUtilMillis, "cpuCapMillis", nodeCPUCapMillis) var missingCPUUtilMillis int64 = 0 pl.eventHandler.RLock() @@ -160,11 +162,11 @@ func (pl *TargetLoadPacking) Score(ctx context.Context, cycleState *framework.Cy missingCPUUtilMillis += PredictUtilisation(&container) } missingCPUUtilMillis += info.Pod.Spec.Overhead.Cpu().MilliValue() - klog.V(6).InfoS("Missing utilization for pod", "podName", info.Pod.Name, "missingCPUUtilMillis", missingCPUUtilMillis) + logger.V(6).Info("Missing utilization for pod", "podName", info.Pod.Name, "missingCPUUtilMillis", missingCPUUtilMillis) } } pl.eventHandler.RUnlock() - klog.V(6).InfoS("Missing utilization for node", "nodeName", nodeName, "missingCPUUtilMillis", missingCPUUtilMillis) + logger.V(6).Info("Missing utilization for node", "nodeName", nodeName, "missingCPUUtilMillis", missingCPUUtilMillis) var predictedCPUUsage float64 if nodeCPUCapMillis != 0 { @@ -175,13 +177,13 @@ func (pl 
*TargetLoadPacking) Score(ctx context.Context, cycleState *framework.Cy return score, framework.NewStatus(framework.Success, "") } penalisedScore := int64(math.Round(float64(hostTargetUtilizationPercent) * (100 - predictedCPUUsage) / (100 - float64(hostTargetUtilizationPercent)))) - klog.V(6).InfoS("Penalised score for host", "nodeName", nodeName, "penalisedScore", penalisedScore) + logger.V(6).Info("Penalised score for host", "nodeName", nodeName, "penalisedScore", penalisedScore) return penalisedScore, framework.NewStatus(framework.Success, "") } score = int64(math.Round((100-float64(hostTargetUtilizationPercent))* predictedCPUUsage/float64(hostTargetUtilizationPercent) + float64(hostTargetUtilizationPercent))) - klog.V(6).InfoS("Score for host", "nodeName", nodeName, "score", score) + logger.V(6).Info("Score for host", "nodeName", nodeName, "score", score) return score, framework.NewStatus(framework.Success, "") }
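The test changes above obtain loggers with klog.FromContext(context.TODO()). A hedged alternative, assuming the ktesting helper shipped with k8s.io/klog/v2 is available in the test environment, is to derive both the logger and the context from the testing.T so verbose plugin output ends up in the test log. The test name and package below are hypothetical, not part of the patch.

package sysched_test

import (
	"testing"

	"k8s.io/klog/v2/ktesting"
)

// TestWithTestLogger is a hypothetical skeleton showing the ktesting pattern.
func TestWithTestLogger(t *testing.T) {
	// ktesting routes log output through t.Log and stores the logger in ctx.
	logger, ctx := ktesting.NewTestContext(t)

	// logger can be threaded into helpers such as addPod(logger, pod), and
	// ctx can be passed to plugin entry points that accept a context.
	_ = logger
	_ = ctx
}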