Skip to content

Commit

Permalink
pickfirst: New pick first policy for dualstack (#7498)
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal authored Oct 10, 2024
1 parent 18a4eac commit 00b9e14
Show file tree
Hide file tree
Showing 17 changed files with 2,048 additions and 11 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ jobs:
- name: Run coverage
run: go test -coverprofile=coverage.out -coverpkg=./... ./...

- name: Run coverage with new pickfirst
run: GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST=true go test -coverprofile=coverage_new_pickfirst.out -coverpkg=./... ./...

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ jobs:
- type: tests
goversion: '1.21'

- type: tests
goversion: '1.22'
testflags: -race
grpcenv: 'GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST=true'

steps:
# Setup the environment.
- name: Setup GOARCH
Expand Down
6 changes: 6 additions & 0 deletions balancer/pickfirst/pickfirst.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,19 @@ import (
"google.golang.org/grpc/balancer/pickfirst/internal"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/envconfig"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"

_ "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" // For automatically registering the new pickfirst if required.
)

func init() {
if envconfig.NewPickFirstEnabled {
return
}
balancer.Register(pickfirstBuilder{})
}

Expand Down
132 changes: 132 additions & 0 deletions balancer/pickfirst/pickfirst_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package pickfirst

import (
"context"
"errors"
"fmt"
"testing"
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
)

const (
// Default timeout for tests in this package.
defaultTestTimeout = 10 * time.Second
// Default short timeout, to be used when waiting for events which are not
// expected to happen.
defaultTestShortTimeout = 100 * time.Millisecond
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestPickFirstLeaf_InitialResolverError sends a resolver error to the balancer
// before a valid resolver update. It verifies that the clientconn state is
// updated to TRANSIENT_FAILURE.
func (s) TestPickFirstLeaf_InitialResolverError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{})
defer bal.Close()
bal.ResolverError(errors.New("resolution failed: test error"))

if err := cc.WaitForConnectivityState(ctx, connectivity.TransientFailure); err != nil {
t.Fatalf("cc.WaitForConnectivityState(%v) returned error: %v", connectivity.TransientFailure, err)
}

// After sending a valid update, the LB policy should report CONNECTING.
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}},
{Addresses: []resolver.Address{{Addr: "2.2.2.2:2"}}},
},
},
}
if err := bal.UpdateClientConnState(ccState); err != nil {
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
}

if err := cc.WaitForConnectivityState(ctx, connectivity.Connecting); err != nil {
t.Fatalf("cc.WaitForConnectivityState(%v) returned error: %v", connectivity.Connecting, err)
}
}

// TestPickFirstLeaf_ResolverErrorinTF sends a resolver error to the balancer
// before when it's attempting to connect to a SubConn TRANSIENT_FAILURE. It
// verifies that the picker is updated and the SubConn is not closed.
func (s) TestPickFirstLeaf_ResolverErrorinTF(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{})
defer bal.Close()

// After sending a valid update, the LB policy should report CONNECTING.
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}},
},
},
}

if err := bal.UpdateClientConnState(ccState); err != nil {
t.Fatalf("UpdateClientConnState(%v) returned error: %v", ccState, err)
}

sc1 := <-cc.NewSubConnCh
if err := cc.WaitForConnectivityState(ctx, connectivity.Connecting); err != nil {
t.Fatalf("cc.WaitForConnectivityState(%v) returned error: %v", connectivity.Connecting, err)
}

scErr := fmt.Errorf("test error: connection refused")
sc1.UpdateState(balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scErr,
})

if err := cc.WaitForPickerWithErr(ctx, scErr); err != nil {
t.Fatalf("cc.WaitForPickerWithErr(%v) returned error: %v", scErr, err)
}

bal.ResolverError(errors.New("resolution failed: test error"))
if err := cc.WaitForErrPicker(ctx); err != nil {
t.Fatalf("cc.WaitForPickerWithErr() returned error: %v", err)
}

select {
case <-time.After(defaultTestShortTimeout):
case sc := <-cc.ShutdownSubConnCh:
t.Fatalf("Unexpected SubConn shutdown: %v", sc)
}
}
Loading

0 comments on commit 00b9e14

Please sign in to comment.