feature: use contextual logging #813

Merged
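For context, this PR follows the Kubernetes contextual-logging migration: call sites stop using the global klog.InfoS / klog.ErrorS helpers and instead derive a logr.Logger from the incoming context via klog.FromContext. Below is a minimal sketch of the pattern, separate from the diff itself (the function name and key/value pairs are made up):

package main

import (
	"context"
	"errors"

	"k8s.io/klog/v2"
)

func handle(ctx context.Context) {
	// Contextual logging: the logger travels with ctx instead of being global.
	logger := klog.FromContext(ctx)
	logger.Info("starting", "step", "init") // key/value pairs carry over unchanged from klog.InfoS

	if err := errors.New("example failure"); err != nil {
		// logger.Error is the contextual counterpart of klog.ErrorS.
		logger.Error(err, "step failed", "step", "init")
	}
}

func main() {
	handle(context.Background())
}

With this shape, whatever logger the caller attached to ctx (names, verbosity, sinks) is reused automatically down the call chain.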
64 changes: 42 additions & 22 deletions pkg/capacityscheduling/capacity_scheduling.go
@@ -125,6 +125,7 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
podLister: handle.SharedInformerFactory().Core().V1().Pods().Lister(),
pdbLister: getPDBLister(handle.SharedInformerFactory()),
}
logger := klog.FromContext(ctx)

client, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme})
if err != nil {
@@ -187,7 +188,7 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
},
},
)
klog.InfoS("CapacityScheduling start")
logger.Info("CapacityScheduling start")
return c, nil
}

@@ -288,17 +289,19 @@ func (c *CapacityScheduling) PreFilterExtensions() framework.PreFilterExtensions

// AddPod from pre-computed data in cycleState.
func (c *CapacityScheduling) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
logger := klog.FromContext(ctx)

elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(cycleState)
if err != nil {
klog.ErrorS(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
logger.Error(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
return framework.NewStatus(framework.Error, err.Error())
}

elasticQuotaInfo := elasticQuotaSnapshotState.elasticQuotaInfos[podToAdd.Pod.Namespace]
if elasticQuotaInfo != nil {
err := elasticQuotaInfo.addPodIfNotPresent(podToAdd.Pod)
if err != nil {
klog.ErrorS(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(podToAdd.Pod))
logger.Error(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(podToAdd.Pod))
}
}

@@ -307,17 +310,19 @@ func (c *CapacityScheduling) AddPod(ctx context.Context, cycleState *framework.C

// RemovePod from pre-computed data in cycleState.
func (c *CapacityScheduling) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
logger := klog.FromContext(ctx)

elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(cycleState)
if err != nil {
klog.ErrorS(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
logger.Error(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
return framework.NewStatus(framework.Error, err.Error())
}

elasticQuotaInfo := elasticQuotaSnapshotState.elasticQuotaInfos[podToRemove.Pod.Namespace]
if elasticQuotaInfo != nil {
err = elasticQuotaInfo.deletePodIfPresent(podToRemove.Pod)
if err != nil {
klog.ErrorS(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(podToRemove.Pod))
logger.Error(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(podToRemove.Pod))
}
}

@@ -348,11 +353,13 @@ func (c *CapacityScheduling) Reserve(ctx context.Context, state *framework.Cycle
c.Lock()
defer c.Unlock()

logger := klog.FromContext(ctx)

elasticQuotaInfo := c.elasticQuotaInfos[pod.Namespace]
if elasticQuotaInfo != nil {
err := elasticQuotaInfo.addPodIfNotPresent(pod)
if err != nil {
klog.ErrorS(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(pod))
logger.Error(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(pod))
return framework.NewStatus(framework.Error, err.Error())
}
}
@@ -363,11 +370,13 @@ func (c *CapacityScheduling) Unreserve(ctx context.Context, state *framework.Cyc
c.Lock()
defer c.Unlock()

logger := klog.FromContext(ctx)

elasticQuotaInfo := c.elasticQuotaInfos[pod.Namespace]
if elasticQuotaInfo != nil {
err := elasticQuotaInfo.deletePodIfPresent(pod)
if err != nil {
klog.ErrorS(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(pod))
logger.Error(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(pod))
}
}
}
@@ -400,14 +409,16 @@ func (p *preemptor) CandidatesToVictimsMap(candidates []preemption.Candidate) ma
// We look at the node that is nominated for this pod and as long as there are
// terminating pods on the node, we don't consider this for preempting more pods.
func (p *preemptor) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) {
logger := klog.FromContext(context.TODO())

if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
klog.V(5).InfoS("Pod is not eligible for preemption because of its preemptionPolicy", "pod", klog.KObj(pod), "preemptionPolicy", v1.PreemptNever)
logger.V(5).Info("Pod is not eligible for preemption because of its preemptionPolicy", "pod", klog.KObj(pod), "preemptionPolicy", v1.PreemptNever)
return false, "not eligible due to preemptionPolicy=Never."
}

preFilterState, err := getPreFilterState(p.state)
if err != nil {
klog.V(5).InfoS("Failed to read preFilterState from cycleState, err: %s", err, "preFilterStateKey", preFilterStateKey)
logger.V(5).Info("Failed to read preFilterState from cycleState, err: %s", err, "preFilterStateKey", preFilterStateKey)
return false, "not eligible due to failed to read from cycleState"
}

@@ -422,7 +433,7 @@ func (p *preemptor) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus

elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(p.state)
if err != nil {
klog.ErrorS(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
logger.Error(err, "Failed to read elasticQuotaSnapshot from cycleState", "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
return true, ""
}

@@ -480,25 +491,27 @@ func (p *preemptor) SelectVictimsOnNode(
pod *v1.Pod,
nodeInfo *framework.NodeInfo,
pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) {

logger := klog.FromContext(ctx)

elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(state)
if err != nil {
msg := "Failed to read elasticQuotaSnapshot from cycleState"
klog.ErrorS(err, msg, "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
logger.Error(err, msg, "elasticQuotaSnapshotKey", ElasticQuotaSnapshotKey)
return nil, 0, framework.NewStatus(framework.Unschedulable, msg)
}

preFilterState, err := getPreFilterState(state)
if err != nil {
msg := "Failed to read preFilterState from cycleState"
klog.ErrorS(err, msg, "preFilterStateKey", preFilterStateKey)
logger.Error(err, msg, "preFilterStateKey", preFilterStateKey)
return nil, 0, framework.NewStatus(framework.Unschedulable, msg)
}

var nominatedPodsReqInEQWithPodReq framework.Resource
var nominatedPodsReqWithPodReq framework.Resource
podReq := preFilterState.podReq

logger := klog.FromContext(ctx)
removePod := func(rpi *framework.PodInfo) error {
if err := nodeInfo.RemovePod(logger, rpi.Pod); err != nil {
return err
@@ -625,22 +638,22 @@ func (p *preemptor) SelectVictimsOnNode(
return false, err
}
victims = append(victims, pi.Pod)
klog.V(5).InfoS("Found a potential preemption victim on node", "pod", klog.KObj(pi.Pod), "node", klog.KObj(nodeInfo.Node()))
logger.V(5).Info("Found a potential preemption victim on node", "pod", klog.KObj(pi.Pod), "node", klog.KObj(nodeInfo.Node()))
}

if preemptorWithElasticQuota && (preemptorElasticQuotaInfo.usedOverMaxWith(&nominatedPodsReqInEQWithPodReq) || elasticQuotaInfos.aggregatedUsedOverMinWith(nominatedPodsReqWithPodReq)) {
if err := removePod(pi); err != nil {
return false, err
}
victims = append(victims, pi.Pod)
klog.V(5).InfoS("Found a potential preemption victim on node", "pod", klog.KObj(pi.Pod), " node", klog.KObj(nodeInfo.Node()))
logger.V(5).Info("Found a potential preemption victim on node", "pod", klog.KObj(pi.Pod), " node", klog.KObj(nodeInfo.Node()))
}

return fits, nil
}
for _, pi := range violatingVictims {
if fits, err := reprievePod(pi); err != nil {
klog.ErrorS(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod))
logger.Error(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod))
return nil, 0, framework.AsStatus(err)
} else if !fits {
numViolatingVictim++
@@ -649,7 +662,7 @@ func (p *preemptor) SelectVictimsOnNode(
// Now we try to reprieve non-violating victims.
for _, pi := range nonViolatingVictims {
if _, err := reprievePod(pi); err != nil {
klog.ErrorS(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod))
logger.Error(err, "Failed to reprieve pod", "pod", klog.KObj(pi.Pod))
return nil, 0, framework.AsStatus(err)
}
}
@@ -694,6 +707,9 @@ func (c *CapacityScheduling) deleteElasticQuota(obj interface{}) {
}

func (c *CapacityScheduling) addPod(obj interface{}) {
ctx := context.TODO()
logger := klog.FromContext(ctx)

pod := obj.(*v1.Pod)

c.Lock()
@@ -703,8 +719,8 @@ func (c *CapacityScheduling) addPod(obj interface{}) {
// If elasticQuotaInfo is nil, try to list ElasticQuotas through elasticQuotaLister
if elasticQuotaInfo == nil {
var eqList v1alpha1.ElasticQuotaList
if err := c.client.List(context.Background(), &eqList, client.InNamespace(pod.Namespace)); err != nil {
klog.ErrorS(err, "Failed to get elasticQuota", "elasticQuota", pod.Namespace)
if err := c.client.List(ctx, &eqList, client.InNamespace(pod.Namespace)); err != nil {
logger.Error(err, "Failed to get elasticQuota", "elasticQuota", pod.Namespace)
return
}

@@ -724,11 +740,13 @@ func (c *CapacityScheduling) addPod(obj interface{}) {

err := elasticQuotaInfo.addPodIfNotPresent(pod)
if err != nil {
klog.ErrorS(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(pod))
logger.Error(err, "Failed to add Pod to its associated elasticQuota", "pod", klog.KObj(pod))
}
}

func (c *CapacityScheduling) updatePod(oldObj, newObj interface{}) {
logger := klog.FromContext(context.TODO())

oldPod := oldObj.(*v1.Pod)
newPod := newObj.(*v1.Pod)

@@ -744,13 +762,15 @@ func (c *CapacityScheduling) updatePod(oldObj, newObj interface{}) {
if elasticQuotaInfo != nil {
err := elasticQuotaInfo.deletePodIfPresent(newPod)
if err != nil {
klog.ErrorS(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(newPod))
logger.Error(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(newPod))
}
}
}
}

func (c *CapacityScheduling) deletePod(obj interface{}) {
logger := klog.FromContext(context.TODO())

pod := obj.(*v1.Pod)
c.Lock()
defer c.Unlock()
@@ -759,7 +779,7 @@ func (c *CapacityScheduling) deletePod(obj interface{}) {
if elasticQuotaInfo != nil {
err := elasticQuotaInfo.deletePodIfPresent(pod)
if err != nil {
klog.ErrorS(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(pod))
logger.Error(err, "Failed to delete Pod from its associated elasticQuota", "pod", klog.KObj(pod))
}
}
}
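Note that the informer event handlers above (addPod, updatePod, deletePod) receive no context from their caller, so the diff falls back to context.TODO() to obtain a logger. A hedged sketch of that fallback, separate from the diff (the handler below is illustrative, not the plugin's code):

package main

import (
	"context"
	"fmt"

	"k8s.io/klog/v2"
)

// onAdd stands in for a cache event handler that gets no ctx from its caller.
func onAdd(obj interface{}) {
	// context.TODO() marks the gap until a real context can be plumbed in;
	// FromContext then falls back to klog's global logger, so behavior matches
	// the old klog.InfoS / klog.ErrorS calls.
	logger := klog.FromContext(context.TODO())
	logger.Info("object added", "objType", fmt.Sprintf("%T", obj))
}

func main() {
	onAdd(struct{}{})
}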
4 changes: 2 additions & 2 deletions pkg/capacityscheduling/capacity_scheduling_test.go
@@ -469,7 +469,7 @@ func TestReserve(t *testing.T) {

state := framework.NewCycleState()
for i, pod := range tt.pods {
got := cs.Reserve(nil, state, pod, "node-a")
got := cs.Reserve(context.TODO(), state, pod, "node-a")
if got.Code() != tt.expectedCodes[i] {
t.Errorf("expected %v, got %v : %v", tt.expected[i], got.Code(), got.Message())
}
@@ -591,7 +591,7 @@ func TestUnreserve(t *testing.T) {

state := framework.NewCycleState()
for i, pod := range tt.pods {
cs.Unreserve(nil, state, pod, "node-a")
cs.Unreserve(context.TODO(), state, pod, "node-a")
if !reflect.DeepEqual(cs.elasticQuotaInfos["ns1"], tt.expected[i]["ns1"]) {
t.Errorf("expected %#v, got %#v", tt.expected[i]["ns1"].Used, cs.elasticQuotaInfos["ns1"].Used)
}
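The test changes replace the nil context previously passed to Reserve and Unreserve with context.TODO(), since both methods now read their logger from that context. A small sketch of the two ways a test can supply one, under the assumption that the real test then passes it into cs.Reserve / cs.Unreserve (the test name below is made up):

package capacityscheduling_test

import (
	"context"
	"testing"

	"k8s.io/klog/v2/ktesting"
)

func TestContextForPluginCalls(t *testing.T) {
	// Simplest fix, as in this diff: a TODO context instead of nil.
	ctx := context.TODO()

	// Alternative: ktesting builds a context whose logger writes through t.
	_, loggedCtx := ktesting.NewTestContext(t)

	_, _ = ctx, loggedCtx // pass one of these into the plugin calls under test
}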
33 changes: 18 additions & 15 deletions pkg/coscheduling/core/core.go
@@ -65,10 +65,10 @@ type Manager interface {
PreFilter(context.Context, *corev1.Pod) error
Permit(context.Context, *framework.CycleState, *corev1.Pod) Status
GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup)
GetCreationTimestamp(*corev1.Pod, time.Time) time.Time
DeletePermittedPodGroup(string)
CalculateAssignedPods(string, string) int
ActivateSiblings(pod *corev1.Pod, state *framework.CycleState)
GetCreationTimestamp(context.Context, *corev1.Pod, time.Time) time.Time
DeletePermittedPodGroup(context.Context, string)
CalculateAssignedPods(context.Context, string, string) int
ActivateSiblings(ctx context.Context, pod *corev1.Pod, state *framework.CycleState)
BackoffPodGroup(string, time.Duration)
}
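The Manager interface change above threads a context.Context through each method so the caller's per-cycle logger can follow the call chain. A trimmed, illustrative sketch of the same idea, separate from the diff (one method only; the names are not the plugin's):

package main

import (
	"context"

	"k8s.io/klog/v2"
)

// Cache is an illustrative interface: adding a leading ctx parameter lets
// implementations pick up the caller's logger via klog.FromContext.
type Cache interface {
	Delete(ctx context.Context, key string)
}

type cache struct{}

func (c *cache) Delete(ctx context.Context, key string) {
	klog.FromContext(ctx).V(5).Info("deleting entry", "key", key)
}

func main() {
	var m Cache = &cache{}
	m.Delete(context.Background(), "ns/pg")
}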

@@ -112,7 +112,8 @@ func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Durati

// ActivateSiblings stashes the pods belonging to the same PodGroup of the given pod
// in the given state, with a reserved key "kubernetes.io/pods-to-activate".
func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) {
func (pgMgr *PodGroupManager) ActivateSiblings(ctx context.Context, pod *corev1.Pod, state *framework.CycleState) {
lh := klog.FromContext(ctx)
pgName := util.GetPodGroupLabel(pod)
if pgName == "" {
return
@@ -129,7 +130,7 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: pgName}),
)
if err != nil {
klog.ErrorS(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgName)
lh.Error(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgName)
return
}

@@ -159,7 +160,8 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
// 2. the total number of pods in the podgroup is less than the minimum number of pods
// that is required to be scheduled.
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
klog.V(5).InfoS("Pre-filter", "pod", klog.KObj(pod))
lh := klog.FromContext(ctx)
lh.V(5).Info("Pre-filter", "pod", klog.KObj(pod))
pgFullName, pg := pgMgr.GetPodGroup(ctx, pod)
if pg == nil {
return nil
@@ -202,7 +204,7 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
minResources[corev1.ResourcePods] = *podQuantity
err = CheckClusterResource(ctx, nodes, minResources, pgFullName)
if err != nil {
klog.ErrorS(err, "Failed to PreFilter", "podGroup", klog.KObj(pg))
lh.Error(err, "Failed to PreFilter", "podGroup", klog.KObj(pg))
return err
}
pgMgr.permittedPG.Add(pgFullName, pgFullName, *pgMgr.scheduleTimeout)
@@ -220,7 +222,7 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.Cycle
return PodGroupNotFound
}

assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace)
assigned := pgMgr.CalculateAssignedPods(ctx, pg.Name, pg.Namespace)
// The number of pods that have been assigned nodes is calculated from the snapshot.
// The current pod in not included in the snapshot during the current scheduling cycle.
if int32(assigned)+1 >= pg.Spec.MinMember {
@@ -243,20 +245,20 @@ func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.Cycle
}

// GetCreationTimestamp returns the creation time of a podGroup or a pod.
func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time {
func (pgMgr *PodGroupManager) GetCreationTimestamp(ctx context.Context, pod *corev1.Pod, ts time.Time) time.Time {
pgName := util.GetPodGroupLabel(pod)
if len(pgName) == 0 {
return ts
}
var pg v1alpha1.PodGroup
if err := pgMgr.client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil {
if err := pgMgr.client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pgName}, &pg); err != nil {
return ts
}
return pg.CreationTimestamp.Time
}

// DeletePermittedPodGroup deletes a podGroup that passes Pre-Filter but reaches PostFilter.
func (pgMgr *PodGroupManager) DeletePermittedPodGroup(pgFullName string) {
func (pgMgr *PodGroupManager) DeletePermittedPodGroup(_ context.Context, pgFullName string) {
pgMgr.permittedPG.Delete(pgFullName)
}

@@ -274,10 +276,11 @@ func (pgMgr *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod)
}

// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound.
func (pgMgr *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int {
func (pgMgr *PodGroupManager) CalculateAssignedPods(ctx context.Context, podGroupName, namespace string) int {
lh := klog.FromContext(ctx)
nodeInfos, err := pgMgr.snapshotSharedLister.NodeInfos().List()
if err != nil {
klog.ErrorS(err, "Cannot get nodeInfos from frameworkHandle")
lh.Error(err, "Cannot get nodeInfos from frameworkHandle")
return 0
}
var count int
@@ -354,6 +357,6 @@ func getNodeResource(ctx context.Context, info *framework.NodeInfo, desiredPodGr
leftResource.ScalarResources[k] = allocatableEx - requestEx
}
}
klog.V(4).InfoS("Node left resource", "node", klog.KObj(info.Node()), "resource", leftResource)
logger.V(4).Info("Node left resource", "node", klog.KObj(info.Node()), "resource", leftResource)
return &leftResource
}
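Verbosity levels survive the migration unchanged: klog.V(4).InfoS(...) becomes logger.V(4).Info(...) on the context-derived logger, as in the last hunk above, and the message still appears only when the component runs with -v=4 or higher. A minimal illustration, separate from the diff (the message and keys are made up):

package main

import (
	"context"

	"k8s.io/klog/v2"
)

func logLeftResource(ctx context.Context, node string) {
	logger := klog.FromContext(ctx)
	// V(4) gates the message exactly as klog.V(4).InfoS did; only the call site changes.
	logger.V(4).Info("Node left resource", "node", node)
}

func main() {
	logLeftResource(context.Background(), "node-a")
}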