diff --git a/Gopkg.lock b/Gopkg.lock index e75541422aa5..d1cc061670b9 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -302,11 +302,11 @@ [[projects]] branch = "master" - digest = "1:220d60c10e7c67f52168c496bf778fb00ec88e12956a933b7ce53d286fb438fe" + digest = "1:5502cef94063585c627f3bc3ffc80ad7f0f1e0f1b5dccf8aeff9a822526806da" name = "github.com/cockroachdb/circuitbreaker" packages = ["."] pruneopts = "UT" - revision = "3e861b2e1d31642be9e447dc6f8aa599813e98b4" + revision = "a614b14ccf63dd2311d4ff646c30c61b8ed34aa8" [[projects]] branch = "master" diff --git a/pkg/gossip/client_test.go b/pkg/gossip/client_test.go index 4ca0056f2f0c..a5dd1e612a17 100644 --- a/pkg/gossip/client_test.go +++ b/pkg/gossip/client_test.go @@ -168,7 +168,7 @@ func gossipSucceedsSoon( // If the client wasn't able to connect, restart it. g := gossip[client] g.mu.Lock() - client.startLocked(g, disconnected, rpcContext, stopper, rpcContext.NewBreaker()) + client.startLocked(g, disconnected, rpcContext, stopper, rpcContext.NewBreaker("")) g.mu.Unlock() default: } @@ -306,7 +306,7 @@ func TestClientNodeID(t *testing.T) { case <-disconnected: // The client hasn't been started or failed to start, loop and try again. local.mu.Lock() - c.startLocked(local, disconnected, rpcContext, stopper, rpcContext.NewBreaker()) + c.startLocked(local, disconnected, rpcContext, stopper, rpcContext.NewBreaker("")) local.mu.Unlock() } } diff --git a/pkg/gossip/gossip.go b/pkg/gossip/gossip.go index cb9c27ffe332..a9a95865d524 100644 --- a/pkg/gossip/gossip.go +++ b/pkg/gossip/gossip.go @@ -1572,7 +1572,8 @@ func (g *Gossip) startClientLocked(addr net.Addr) { defer g.clientsMu.Unlock() breaker, ok := g.clientsMu.breakers[addr.String()] if !ok { - breaker = g.rpcContext.NewBreaker() + name := fmt.Sprintf("gossip %v->%v", g.rpcContext.Addr, addr) + breaker = g.rpcContext.NewBreaker(name) g.clientsMu.breakers[addr.String()] = breaker } ctx := g.AnnotateCtx(context.TODO()) diff --git a/pkg/gossip/gossip_test.go b/pkg/gossip/gossip_test.go index 21af2321b466..e2bf87e5b575 100644 --- a/pkg/gossip/gossip_test.go +++ b/pkg/gossip/gossip_test.go @@ -552,7 +552,7 @@ func TestGossipNoForwardSelf(t *testing.T) { localAddr := local.GetNodeAddr() c := newClient(log.AmbientContext{Tracer: tracing.NewTracer()}, localAddr, makeMetrics()) peer.mu.Lock() - c.startLocked(peer, disconnectedCh, peer.rpcContext, stopper, peer.rpcContext.NewBreaker()) + c.startLocked(peer, disconnectedCh, peer.rpcContext, stopper, peer.rpcContext.NewBreaker("")) peer.mu.Unlock() disconnectedClient := <-disconnectedCh diff --git a/pkg/rpc/breaker.go b/pkg/rpc/breaker.go index bee6be32d64a..9391f3a48135 100644 --- a/pkg/rpc/breaker.go +++ b/pkg/rpc/breaker.go @@ -15,11 +15,13 @@ package rpc import ( + "context" "time" "github.com/cenkalti/backoff" circuit "github.com/cockroachdb/circuitbreaker" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/facebookgo/clock" ) @@ -89,10 +91,29 @@ func newBackOff(clock backoff.Clock) backoff.BackOff { return b } -func newBreaker(clock clock.Clock) *circuit.Breaker { +func newBreaker(ctx context.Context, name string, clock clock.Clock) *circuit.Breaker { return circuit.NewBreakerWithOptions(&circuit.Options{ + Name: name, BackOff: newBackOff(clock), Clock: clock, ShouldTrip: circuit.ThresholdTripFunc(1), + Logger: breakerLogger{ctx}, }) } + +// breakerLogger implements circuit.Logger to expose logging from the +// circuitbreaker package. Debugf is logged with a vmodule level of 2 so to see +// the circuitbreaker debug messages set --vmodule=breaker=2 +type breakerLogger struct { + ctx context.Context +} + +func (r breakerLogger) Debugf(format string, v ...interface{}) { + if log.V(2) { + log.InfofDepth(r.ctx, 1, format, v...) + } +} + +func (r breakerLogger) Infof(format string, v ...interface{}) { + log.InfofDepth(r.ctx, 1, format, v...) +} diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index a88584278b09..650b1bb2dca6 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -681,12 +681,13 @@ func (ctx *Context) GRPCDial(target string) *Connection { } // NewBreaker creates a new circuit breaker properly configured for RPC -// connections. -func (ctx *Context) NewBreaker() *circuit.Breaker { +// connections. name is used internally for logging state changes of the +// returned breaker. +func (ctx *Context) NewBreaker(name string) *circuit.Breaker { if ctx.BreakerFactory != nil { return ctx.BreakerFactory() } - return newBreaker(&ctx.breakerClock) + return newBreaker(ctx.masterCtx, name, &ctx.breakerClock) } // ErrNotHeartbeated is returned by ConnHealth when we have not yet performed diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go index 02dbca5c1bb3..862add03d6c0 100644 --- a/pkg/rpc/nodedialer/nodedialer.go +++ b/pkg/rpc/nodedialer/nodedialer.go @@ -16,6 +16,7 @@ package nodedialer import ( "context" + "fmt" "net" "time" "unsafe" @@ -93,12 +94,14 @@ func (n *Dialer) Dial(ctx context.Context, nodeID roachpb.NodeID) (_ *grpc.Clien addr, err := n.resolver(nodeID) if err != nil { - breaker.Fail() + err = errors.Wrapf(err, "failed to resolve n%d", nodeID) + breaker.Fail(err) return nil, err } conn, err := n.rpcContext.GRPCDial(addr.String()).Connect(ctx) if err != nil { - breaker.Fail() + err = errors.Wrapf(err, "failed to grpc dial n%d at %v", nodeID, addr) + breaker.Fail(err) return nil, err } breaker.Success() @@ -136,14 +139,16 @@ func (n *Dialer) DialInternalClient( log.VEventf(ctx, 2, "sending request to %s", addr) conn, err := n.rpcContext.GRPCDial(addr.String()).Connect(ctx) if err != nil { - breaker.Fail() + err = errors.Wrapf(err, "failed to connect to n%d at %v", nodeID, addr) + breaker.Fail(err) return nil, nil, err } // Check to see if the connection is in the transient failure state. This can // happen if the connection already existed, but a recent heartbeat has // failed and we haven't yet torn down the connection. if err := grpcutil.ConnectionReady(conn); err != nil { - breaker.Fail() + err = errors.Wrapf(err, "failed to check for connection ready to n%d at %v", nodeID, addr) + breaker.Fail(err) return nil, nil, err } // TODO(bdarnell): Reconcile the different health checks and circuit breaker @@ -190,7 +195,8 @@ func (n *Dialer) GetCircuitBreaker(nodeID roachpb.NodeID) *circuit.Breaker { func (n *Dialer) getBreaker(nodeID roachpb.NodeID) *wrappedBreaker { value, ok := n.breakers.Load(int64(nodeID)) if !ok { - breaker := &wrappedBreaker{Breaker: n.rpcContext.NewBreaker(), EveryN: log.Every(logPerNodeFailInterval)} + name := fmt.Sprintf("rpc %v->%v", n.rpcContext.Config.Addr, nodeID) + breaker := &wrappedBreaker{Breaker: n.rpcContext.NewBreaker(name), EveryN: log.Every(logPerNodeFailInterval)} value, _ = n.breakers.LoadOrStore(int64(nodeID), unsafe.Pointer(breaker)) } return (*wrappedBreaker)(value) diff --git a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join index 565367766e95..58a98575b8b8 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join +++ b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join @@ -109,6 +109,22 @@ statement ok ALTER TABLE data EXPERIMENTAL_RELOCATE SELECT ARRAY[i%5+1], i FROM generate_series(0, 9) AS g(i) +# Verify data placement. +query TTTI colnames +SELECT start_key, end_key, replicas, lease_holder from [SHOW EXPERIMENTAL_RANGES FROM TABLE data] +---- +start_key end_key replicas lease_holder +NULL /1 {1} 1 +/1 /2 {2} 2 +/2 /3 {3} 3 +/3 /4 {4} 4 +/4 /5 {5} 5 +/5 /6 {1} 1 +/6 /7 {2} 2 +/7 /8 {3} 3 +/8 /9 {4} 4 +/9 NULL {5} 5 + query TTTTT EXPLAIN (VERBOSE) SELECT * FROM (SELECT * FROM data WHERE c = 1) AS l NATURAL JOIN data AS r ---- @@ -129,7 +145,7 @@ render · · (a, b, c, d) · query T SELECT url FROM [EXPLAIN (DISTSQL) SELECT * FROM (SELECT * FROM data WHERE c = 1) AS l NATURAL JOIN data AS r] ---- -https://cockroachdb.github.io/distsqlplan/decode.html#eJzElEFr2zAUx-_7FOKdZWxJTpoKBj4NMkY6ut6GD1r06LylkpFk2Cj57sP2oE5IZYFLerSsP_8fvyfeMxircaee0IP8DgwocKAggEIJFFZQU2id3aP31vVXxsBW_wFZUGhM24X-uKawtw5BPkNowgFBwoP6ccB7VBpdXgAFjUE1h6Gmdc2Tcn8rrYICCt9aZbwkWc6IMppwYsNPdB4ofGoOAZ0klSAfCZNSbncPG6iPFGwXXqp9UI8Ikh1pOt5n25j_dKso3Rdrf3ct-WUbQ6yRpGK04rQStOoN3XXh9Og1OP4q3AtTZ6zT6FCfANXHC_g7m9k2Z8XZzcvd4qSbpc-NpcwtZ1nOh8mxcXILBzfDNxnc-vqD4-nyeJI8nuXiDeXN8E3k3VxfnkiXJ5LkiSwv31DeDN9E3ub68sp0eWWSvDIbNt8SYTNME2G377tjL8Ddo2-t8Zi0QYt-B6N-xHFhe9u5PX51dj_UjJ93Q2440OjD-JeNH1sz_uoBp2EWDfOTMDsP83jzTLWIpst4uFzCvYqG1_Hm9ZLmm2h4E2_eLGm-jc-qmHkm8Ud23l0fP_wLAAD__w5KJqY= +https://cockroachdb.github.io/distsqlplan/decode.html#eJzElEFr2zAUx-_7FOKdZWxJTpoKBj4NMkY6ut6GD1r06LylkpFk2Cj57sPxoE5IZEFMerSsP_8fvyfeKxircaNe0IP8DgwocKAggEIJFBZQU2id3aL31vVXhsBa_wFZUGhM24X-uKawtQ5BvkJowg5BwpP6scNHVBpdXgAFjUE1u0NN65oX5f5WWgUFFL61ynhJspwRZTRhxIaf6IDCp2YX0ElSCfKRMCnlevO0gnpPwXbhrdkH9Ywg2Z6m0322jfkPt4jCfbH2d9eSX7YxxBpJKkYrTitBq17QQxeOjy7B8Ytwb0ydsU6jQ30EVO_P4G9sZtucFSc3z3eLo26WPjaWMracZTmfcXATfKPBLW8_OJ4ujyfJ41kuZpQ3wTeSd3d7eSJdnkiSJ7K8nFHeBN9I3ur28sp0eWWSvDLLFzPKm-Abybt_3317Bu4RfWuNx6RtWvT7GPUzDsvb285t8auz20PN8PlwyB0ONPow_GXDx9oMv3rAcZhFw_wozE7DPN48US2i6TIeLq_hXkTDy3jz8prmu2h4FW9eXdN8H59VMfFM4o_stLvef_gXAAD__0eIKhg= statement ok CREATE TABLE books (title STRING, edition INT, shelf INT, PRIMARY KEY (title, edition)) diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index d3ff034f48bf..e24439400f2e 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -3894,7 +3894,7 @@ func TestFailedPreemptiveSnapshot(t *testing.T) { if err != nil { t.Fatal(err) } - const expErr = "snapshot failed: unknown peer 3" + const expErr = "snapshot failed: failed to resolve n3: unknown peer 3" if err := rep.ChangeReplicas( context.Background(), roachpb.ADD_REPLICA, diff --git a/vendor b/vendor index a7e87efade33..cf3b625c2ecc 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit a7e87efade33ddd79f2d32d77b8ecc78a3fae167 +Subproject commit cf3b625c2ecce426a1b1066e40bf538b058e92b5