Skip to content

Commit

Permalink
fix: flaky TestPrepareCandidate
Browse files Browse the repository at this point in the history
  • Loading branch information
sanposhiho committed Nov 9, 2024
1 parent 210f129 commit 00f7b95
Showing 1 changed file with 81 additions and 20 deletions.
101 changes: 81 additions & 20 deletions pkg/scheduler/framework/preemption/preemption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -441,12 +442,16 @@ func TestPrepareCandidate(t *testing.T) {
)

tests := []struct {
name string
nodeNames []string
candidate *fakeCandidate
preemptor *v1.Pod
testPods []*v1.Pod
expectedDeletedPods []string
name string
nodeNames []string
candidate *fakeCandidate
preemptor *v1.Pod
testPods []*v1.Pod
// expectedDeletedPod is the pod name that is expected to be deleted.
//
// You can set multiple pod name if there're multiple possibilities.
// Both empty and "" means no pod is expected to be deleted.
expectedDeletedPod []string
expectedDeletionError bool
expectedPatchError bool
// Only compared when async preemption is disabled.
Expand All @@ -457,7 +462,6 @@ func TestPrepareCandidate(t *testing.T) {
}{
{
name: "no victims",

candidate: &fakeCandidate{
victims: &extenderv1.Victims{},
},
Expand Down Expand Up @@ -485,7 +489,7 @@ func TestPrepareCandidate(t *testing.T) {
victim1,
},
nodeNames: []string{node1Name},
expectedDeletedPods: []string{"victim1"},
expectedDeletedPod: []string{"victim1"},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
Expand All @@ -505,7 +509,7 @@ func TestPrepareCandidate(t *testing.T) {
victim1WithMatchingCondition,
},
nodeNames: []string{node1Name},
expectedDeletedPods: []string{"victim1"},
expectedDeletedPod: []string{"victim1"},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
Expand All @@ -523,7 +527,7 @@ func TestPrepareCandidate(t *testing.T) {
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedDeletedPods: []string{"victim1"},
expectedDeletedPod: []string{"victim1"},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
Expand Down Expand Up @@ -560,7 +564,7 @@ func TestPrepareCandidate(t *testing.T) {
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedDeletedPods: []string{"victim1"},
expectedDeletedPod: []string{"victim1"},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
Expand Down Expand Up @@ -599,9 +603,14 @@ func TestPrepareCandidate(t *testing.T) {
testPods: []*v1.Pod{
victim1,
},
nodeNames: []string{node1Name},
expectedPatchError: true,
expectedDeletedPods: []string{"victim2"},
nodeNames: []string{node1Name},
expectedPatchError: true,
expectedDeletedPod: []string{
"victim2",
// The first victim could fail before the deletion of the second victim happens,
// which results in the second victim not being deleted.
"",
},
expectedStatus: framework.AsStatus(errors.New("patch pod status failed")),
expectedPreemptingMap: sets.New(types.UID("preemptor")),
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
Expand Down Expand Up @@ -629,15 +638,13 @@ func TestPrepareCandidate(t *testing.T) {
objs = append(objs, pod)
}

requestStopper := make(chan struct{})
mu := &sync.RWMutex{}
deletedPods := sets.New[string]()
deletionFailure := false // whether any request to delete pod failed
patchFailure := false // whether any request to patch pod status failed

cs := clientsetfake.NewClientset(objs...)
cs.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
<-requestStopper
mu.Lock()
defer mu.Unlock()
name := action.(clienttesting.DeleteAction).GetName()
Expand All @@ -651,7 +658,6 @@ func TestPrepareCandidate(t *testing.T) {
})

cs.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
<-requestStopper
mu.Lock()
defer mu.Unlock()
if action.(clienttesting.PatchAction).GetName() == "fail-victim" {
Expand All @@ -664,6 +670,15 @@ func TestPrepareCandidate(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(cs, 0)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()})
fakeActivator := &fakePodActivator{activatedPods: make(map[string]*v1.Pod), mu: mu}

// Note: NominatedPodsForNode is called at the beginning of the goroutine in any case.
// fakePodNominator can delay the response of NominatedPodsForNode until the channel is closed,
// which allows us to test the preempting map before the goroutine does nothing yet.
requestStopper := make(chan struct{})
nominator := &fakePodNominator{
SchedulingQueue: internalqueue.NewSchedulingQueue(nil, informerFactory),
requestStopper: requestStopper,
}
fwk, err := tf.NewFramework(
ctx,
registeredPlugins, "",
Expand All @@ -672,7 +687,7 @@ func TestPrepareCandidate(t *testing.T) {
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()),
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)),
frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)),
frameworkruntime.WithPodNominator(nominator),
frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")),
frameworkruntime.WithPodActivator(fakeActivator),
)
Expand Down Expand Up @@ -720,10 +735,15 @@ func TestPrepareCandidate(t *testing.T) {
if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
mu.RLock()
defer mu.RUnlock()
if !deletedPods.Equal(sets.New(tt.expectedDeletedPods...)) {
lastErrMsg = fmt.Sprintf("expected deleted pods %v, got %v", tt.expectedDeletedPods, deletedPods.UnsortedList())

pe.mu.Lock()
defer pe.mu.Unlock()
if len(pe.preempting) != 0 {
// The preempting map should be empty after the goroutine in all test cases.
lastErrMsg = fmt.Sprintf("expected no preempting pods, got %v", pe.preempting)
return false, nil
}

if tt.expectedDeletionError != deletionFailure {
lastErrMsg = fmt.Sprintf("expected deletion error %v, got %v", tt.expectedDeletionError, deletionFailure)
return false, nil
Expand All @@ -744,6 +764,34 @@ func TestPrepareCandidate(t *testing.T) {
}
}

if deletedPods.Len() > 1 {
// For now, we only expect at most one pod to be deleted in all test cases.
// If we need to test multiple pods deletion, we need to update the test table definition.
return false, fmt.Errorf("expected at most one pod to be deleted, got %v", deletedPods.UnsortedList())
}

if len(tt.expectedDeletedPod) == 0 {
if deletedPods.Len() != 0 {
// When tt.expectedDeletedPod is empty, we expect no pod to be deleted.
return false, fmt.Errorf("expected no pod to be deleted, got %v", deletedPods.UnsortedList())
}
// nothing further to check.
return true, nil
}

found := false
for _, podName := range tt.expectedDeletedPod {
if deletedPods.Has(podName) ||
// If podName is empty, we expect no pod to be deleted.
(deletedPods.Len() == 0 && podName == "") {
found = true
}
}
if !found {
lastErrMsg = fmt.Sprintf("expected pod %v to be deleted, but %v is deleted", strings.Join(tt.expectedDeletedPod, " or "), deletedPods.UnsortedList())
return false, nil
}

return true, nil
}); err != nil {
t.Fatal(lastErrMsg)
Expand All @@ -753,6 +801,19 @@ func TestPrepareCandidate(t *testing.T) {
}
}

type fakePodNominator struct {
// embed it so that we can only override NominatedPodsForNode
internalqueue.SchedulingQueue

// fakePodNominator doesn't respond to NominatedPodsForNode() until the channel is closed.
requestStopper chan struct{}
}

func (f *fakePodNominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
<-f.requestStopper
return nil
}

type fakeExtender struct {
ignorable bool
errProcessPreemption bool
Expand Down

0 comments on commit 00f7b95

Please sign in to comment.