Skip to content

Commit

Permalink
pickfirst: Stop test servers without closing listeners (#7872)
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal authored Dec 3, 2024
1 parent 00272e8 commit 78aa51b
Showing 1 changed file with 56 additions and 43 deletions.
99 changes: 56 additions & 43 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,54 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// testServer is a server than can be stopped and resumed without closing
// the listener. This guarantees the same port number (and address) is used
// after restart. When a server is stopped, it accepts and closes all tcp
// connections from clients.
type testServer struct {
stubserver.StubServer
lis *testutils.RestartableListener
}

func (s *testServer) stop() {
s.lis.Stop()
}

func (s *testServer) resume() {
s.lis.Restart()
}

func newTestServer(t *testing.T) *testServer {
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to create listener: %v", err)
}
rl := testutils.NewRestartableListener(l)
ss := stubserver.StubServer{
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
Listener: rl,
}
return &testServer{
StubServer: ss,
lis: rl,
}
}

// setupPickFirstLeaf performs steps required for pick_first tests. It starts a
// bunch of backends exporting the TestService, and creates a ClientConn to them.
func setupPickFirstLeaf(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, *backendManager) {
t.Helper()
r := manual.NewBuilderWithScheme("whatever")
backends := make([]*stubserver.StubServer, backendCount)
backends := make([]*testServer, backendCount)
addrs := make([]resolver.Address, backendCount)

for i := 0; i < backendCount; i++ {
backend := stubserver.StartTestService(t, nil)
server := newTestServer(t)
backend := stubserver.StartTestService(t, &server.StubServer)
t.Cleanup(func() {
backend.Stop()
})
backends[i] = backend
backends[i] = server
addrs[i] = resolver.Address{Addr: backend.Address}
}

Expand Down Expand Up @@ -264,8 +298,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_DisjointLists(t *testing.T) {
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

bm.backends[0].S.Stop()
bm.backends[0].S = nil
bm.backends[0].stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}})
var bal *stateStoringBalancer
select {
Expand All @@ -287,8 +320,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_DisjointLists(t *testing.T) {
t.Errorf("SubConn states mismatch (-want +got):\n%s", diff)
}

bm.backends[2].S.Stop()
bm.backends[2].S = nil
bm.backends[2].stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[2], addrs[3]}})

if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[3]); err != nil {
Expand Down Expand Up @@ -327,8 +359,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_ActiveBackendInUpdatedList(t *testing
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

bm.backends[0].S.Stop()
bm.backends[0].S = nil
bm.backends[0].stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}})
var bal *stateStoringBalancer
select {
Expand All @@ -350,8 +381,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_ActiveBackendInUpdatedList(t *testing
t.Errorf("SubConn states mismatch (-want +got):\n%s", diff)
}

bm.backends[2].S.Stop()
bm.backends[2].S = nil
bm.backends[2].stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[2], addrs[1]}})

// Verify that the ClientConn stays in READY.
Expand Down Expand Up @@ -391,8 +421,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_InActiveBackendInUpdatedList(t *testi
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

bm.backends[0].S.Stop()
bm.backends[0].S = nil
bm.backends[0].stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}})
var bal *stateStoringBalancer
select {
Expand All @@ -414,11 +443,9 @@ func (s) TestPickFirstLeaf_ResolverUpdates_InActiveBackendInUpdatedList(t *testi
t.Errorf("SubConn states mismatch (-want +got):\n%s", diff)
}

bm.backends[2].S.Stop()
bm.backends[2].S = nil
if err := bm.backends[0].StartServer(); err != nil {
t.Fatalf("Failed to re-start test backend: %v", err)
}
bm.backends[2].stop()
bm.backends[0].resume()

r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[2]}})

if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
Expand Down Expand Up @@ -456,8 +483,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_IdenticalLists(t *testing.T) {
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

bm.backends[0].S.Stop()
bm.backends[0].S = nil
bm.backends[0].stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}})
var bal *stateStoringBalancer
select {
Expand Down Expand Up @@ -554,14 +580,11 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerRestart(t *testing.T)
}

// Shut down the connected server.
bm.backends[0].S.Stop()
bm.backends[0].S = nil
bm.backends[0].stop()
testutils.AwaitState(ctx, t, cc, connectivity.Idle)

// Start the new target server.
if err := bm.backends[0].StartServer(); err != nil {
t.Fatalf("Failed to start server: %v", err)
}
bm.backends[0].resume()

if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -620,14 +643,11 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerRestart(t *testing.T)
}

// Shut down the connected server.
bm.backends[1].S.Stop()
bm.backends[1].S = nil
bm.backends[1].stop()
testutils.AwaitState(ctx, t, cc, connectivity.Idle)

// Start the new target server.
if err := bm.backends[1].StartServer(); err != nil {
t.Fatalf("Failed to start server: %v", err)
}
bm.backends[1].resume()

if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -692,14 +712,11 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerToFirst(t *testing.T)
}

// Shut down the connected server.
bm.backends[1].S.Stop()
bm.backends[1].S = nil
bm.backends[1].stop()
testutils.AwaitState(ctx, t, cc, connectivity.Idle)

// Start the new target server.
if err := bm.backends[0].StartServer(); err != nil {
t.Fatalf("Failed to start server: %v", err)
}
bm.backends[0].resume()

if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -763,14 +780,11 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerToSecond(t *testing.T)
}

// Shut down the connected server.
bm.backends[0].S.Stop()
bm.backends[0].S = nil
bm.backends[0].stop()
testutils.AwaitState(ctx, t, cc, connectivity.Idle)

// Start the new target server.
if err := bm.backends[1].StartServer(); err != nil {
t.Fatalf("Failed to start server: %v", err)
}
bm.backends[1].resume()

if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1308,14 +1322,13 @@ type scState struct {
}

type backendManager struct {
backends []*stubserver.StubServer
backends []*testServer
}

func (b *backendManager) stopAllExcept(index int) {
for idx, b := range b.backends {
if idx != index {
b.S.Stop()
b.S = nil
b.stop()
}
}
}
Expand Down

0 comments on commit 78aa51b

Please sign in to comment.