Skip to content

Commit

Permalink
Merge #33676 #33687
Browse files Browse the repository at this point in the history
33676: rpc: adopt logging in circuitbreaker r=ajwerner a=ajwerner

Adopts changes to the circuitbreaker package to enable logging. 
Fixes #33657

Release note: None

33687: opt: workaround for flaky lookup_join test r=RaduBerinde a=RaduBerinde

As of #33571 we've started including information about the spans in
distsql plans.

This testcase has an unexpected set of spans, with one TableReader
having 3 spans and another having 1 (they should all have 2). The test sometimes
fails because the run produces the correct plan instead.

I noticed that by adding a `SHOW EXPERIMENTAL_RANGES` check, we now
reliably get the correct plan. I am making this change to the test to
prevent others from hitting the test failure.

This does not fix the underlying issue, which still needs to be investigated.

Informs #33686.

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
3 people committed Jan 14, 2019
3 parents a9cbc4c + 467f89c + db87db2 commit 196fc47
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 18 deletions.
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/gossip/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func gossipSucceedsSoon(
// If the client wasn't able to connect, restart it.
g := gossip[client]
g.mu.Lock()
client.startLocked(g, disconnected, rpcContext, stopper, rpcContext.NewBreaker())
client.startLocked(g, disconnected, rpcContext, stopper, rpcContext.NewBreaker(""))
g.mu.Unlock()
default:
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestClientNodeID(t *testing.T) {
case <-disconnected:
// The client hasn't been started or failed to start, loop and try again.
local.mu.Lock()
c.startLocked(local, disconnected, rpcContext, stopper, rpcContext.NewBreaker())
c.startLocked(local, disconnected, rpcContext, stopper, rpcContext.NewBreaker(""))
local.mu.Unlock()
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -1572,7 +1572,8 @@ func (g *Gossip) startClientLocked(addr net.Addr) {
defer g.clientsMu.Unlock()
breaker, ok := g.clientsMu.breakers[addr.String()]
if !ok {
breaker = g.rpcContext.NewBreaker()
name := fmt.Sprintf("gossip %v->%v", g.rpcContext.Addr, addr)
breaker = g.rpcContext.NewBreaker(name)
g.clientsMu.breakers[addr.String()] = breaker
}
ctx := g.AnnotateCtx(context.TODO())
Expand Down
2 changes: 1 addition & 1 deletion pkg/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ func TestGossipNoForwardSelf(t *testing.T) {
localAddr := local.GetNodeAddr()
c := newClient(log.AmbientContext{Tracer: tracing.NewTracer()}, localAddr, makeMetrics())
peer.mu.Lock()
c.startLocked(peer, disconnectedCh, peer.rpcContext, stopper, peer.rpcContext.NewBreaker())
c.startLocked(peer, disconnectedCh, peer.rpcContext, stopper, peer.rpcContext.NewBreaker(""))
peer.mu.Unlock()

disconnectedClient := <-disconnectedCh
Expand Down
23 changes: 22 additions & 1 deletion pkg/rpc/breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package rpc

import (
"context"
"time"

"github.com/cenkalti/backoff"
circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/facebookgo/clock"
)

Expand Down Expand Up @@ -89,10 +91,29 @@ func newBackOff(clock backoff.Clock) backoff.BackOff {
return b
}

func newBreaker(clock clock.Clock) *circuit.Breaker {
// newBreaker constructs a circuit breaker identified by name. The breaker
// takes its backoff policy from newBackOff(clock), uses clock as its time
// source, and sends its log output through a breakerLogger bound to ctx.
func newBreaker(ctx context.Context, name string, clock clock.Clock) *circuit.Breaker {
	opts := circuit.Options{
		Name:    name,
		BackOff: newBackOff(clock),
		Clock:   clock,
		// NOTE(review): a threshold of 1 presumably trips the breaker on the
		// first failure; confirm against the circuitbreaker package docs.
		ShouldTrip: circuit.ThresholdTripFunc(1),
		Logger:     breakerLogger{ctx},
	}
	return circuit.NewBreakerWithOptions(&opts)
}

// breakerLogger implements circuit.Logger to expose logging from the
// circuitbreaker package. Debugf messages are only emitted at a vmodule level
// of 2; to see the circuitbreaker debug messages, set --vmodule=breaker=2.
type breakerLogger struct {
// ctx is the context passed to every log call made through this logger.
ctx context.Context
}

// Debugf implements circuit.Logger. The message is dropped unless log.V(2)
// reports true; otherwise it is forwarded to log.InfofDepth together with the
// logger's context.
func (r breakerLogger) Debugf(format string, v ...interface{}) {
	if !log.V(2) {
		// Verbosity is below 2: stay silent.
		return
	}
	// NOTE(review): the depth argument of 1 presumably attributes the entry to
	// Debugf's caller rather than to this wrapper; confirm against the log
	// package.
	log.InfofDepth(r.ctx, 1, format, v...)
}

// Infof implements circuit.Logger. Unlike Debugf it is not gated on a
// verbosity check: every message is forwarded to log.InfofDepth together with
// the logger's context.
func (r breakerLogger) Infof(format string, v ...interface{}) {
// NOTE(review): the depth argument of 1 presumably attributes the entry to
// Infof's caller rather than to this wrapper; confirm against the log package.
log.InfofDepth(r.ctx, 1, format, v...)
}
7 changes: 4 additions & 3 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,12 +681,13 @@ func (ctx *Context) GRPCDial(target string) *Connection {
}

// NewBreaker creates a new circuit breaker properly configured for RPC
// connections.
func (ctx *Context) NewBreaker() *circuit.Breaker {
// connections. name is used internally for logging state changes of the
// returned breaker.
func (ctx *Context) NewBreaker(name string) *circuit.Breaker {
if ctx.BreakerFactory != nil {
return ctx.BreakerFactory()
}
return newBreaker(&ctx.breakerClock)
return newBreaker(ctx.masterCtx, name, &ctx.breakerClock)
}

// ErrNotHeartbeated is returned by ConnHealth when we have not yet performed
Expand Down
16 changes: 11 additions & 5 deletions pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package nodedialer

import (
"context"
"fmt"
"net"
"time"
"unsafe"
Expand Down Expand Up @@ -93,12 +94,14 @@ func (n *Dialer) Dial(ctx context.Context, nodeID roachpb.NodeID) (_ *grpc.Clien

addr, err := n.resolver(nodeID)
if err != nil {
breaker.Fail()
err = errors.Wrapf(err, "failed to resolve n%d", nodeID)
breaker.Fail(err)
return nil, err
}
conn, err := n.rpcContext.GRPCDial(addr.String()).Connect(ctx)
if err != nil {
breaker.Fail()
err = errors.Wrapf(err, "failed to grpc dial n%d at %v", nodeID, addr)
breaker.Fail(err)
return nil, err
}
breaker.Success()
Expand Down Expand Up @@ -136,14 +139,16 @@ func (n *Dialer) DialInternalClient(
log.VEventf(ctx, 2, "sending request to %s", addr)
conn, err := n.rpcContext.GRPCDial(addr.String()).Connect(ctx)
if err != nil {
breaker.Fail()
err = errors.Wrapf(err, "failed to connect to n%d at %v", nodeID, addr)
breaker.Fail(err)
return nil, nil, err
}
// Check to see if the connection is in the transient failure state. This can
// happen if the connection already existed, but a recent heartbeat has
// failed and we haven't yet torn down the connection.
if err := grpcutil.ConnectionReady(conn); err != nil {
breaker.Fail()
err = errors.Wrapf(err, "failed to check for connection ready to n%d at %v", nodeID, addr)
breaker.Fail(err)
return nil, nil, err
}
// TODO(bdarnell): Reconcile the different health checks and circuit breaker
Expand Down Expand Up @@ -190,7 +195,8 @@ func (n *Dialer) GetCircuitBreaker(nodeID roachpb.NodeID) *circuit.Breaker {
func (n *Dialer) getBreaker(nodeID roachpb.NodeID) *wrappedBreaker {
value, ok := n.breakers.Load(int64(nodeID))
if !ok {
breaker := &wrappedBreaker{Breaker: n.rpcContext.NewBreaker(), EveryN: log.Every(logPerNodeFailInterval)}
name := fmt.Sprintf("rpc %v->%v", n.rpcContext.Config.Addr, nodeID)
breaker := &wrappedBreaker{Breaker: n.rpcContext.NewBreaker(name), EveryN: log.Every(logPerNodeFailInterval)}
value, _ = n.breakers.LoadOrStore(int64(nodeID), unsafe.Pointer(breaker))
}
return (*wrappedBreaker)(value)
Expand Down
18 changes: 17 additions & 1 deletion pkg/sql/opt/exec/execbuilder/testdata/lookup_join
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,22 @@ statement ok
ALTER TABLE data EXPERIMENTAL_RELOCATE
SELECT ARRAY[i%5+1], i FROM generate_series(0, 9) AS g(i)

# Verify data placement.
query TTTI colnames
SELECT start_key, end_key, replicas, lease_holder from [SHOW EXPERIMENTAL_RANGES FROM TABLE data]
----
start_key end_key replicas lease_holder
NULL /1 {1} 1
/1 /2 {2} 2
/2 /3 {3} 3
/3 /4 {4} 4
/4 /5 {5} 5
/5 /6 {1} 1
/6 /7 {2} 2
/7 /8 {3} 3
/8 /9 {4} 4
/9 NULL {5} 5

query TTTTT
EXPLAIN (VERBOSE) SELECT * FROM (SELECT * FROM data WHERE c = 1) AS l NATURAL JOIN data AS r
----
Expand All @@ -129,7 +145,7 @@ render · · (a, b, c, d) ·
query T
SELECT url FROM [EXPLAIN (DISTSQL) SELECT * FROM (SELECT * FROM data WHERE c = 1) AS l NATURAL JOIN data AS r]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJzElEFr2zAUx-_7FOKdZWxJTpoKBj4NMkY6ut6GD1r06LylkpFk2Cj57sP2oE5IZYFLerSsP_8fvyfeMxircaee0IP8DgwocKAggEIJFFZQU2id3aP31vVXxsBW_wFZUGhM24X-uKawtw5BPkNowgFBwoP6ccB7VBpdXgAFjUE1h6Gmdc2Tcn8rrYICCt9aZbwkWc6IMppwYsNPdB4ofGoOAZ0klSAfCZNSbncPG6iPFGwXXqp9UI8Ikh1pOt5n25j_dKso3Rdrf3ct-WUbQ6yRpGK04rQStOoN3XXh9Og1OP4q3AtTZ6zT6FCfANXHC_g7m9k2Z8XZzcvd4qSbpc-NpcwtZ1nOh8mxcXILBzfDNxnc-vqD4-nyeJI8nuXiDeXN8E3k3VxfnkiXJ5LkiSwv31DeDN9E3ub68sp0eWWSvDIbNt8SYTNME2G377tjL8Ddo2-t8Zi0QYt-B6N-xHFhe9u5PX51dj_UjJ93Q2440OjD-JeNH1sz_uoBp2EWDfOTMDsP83jzTLWIpst4uFzCvYqG1_Hm9ZLmm2h4E2_eLGm-jc-qmHkm8Ud23l0fP_wLAAD__w5KJqY=
https://cockroachdb.github.io/distsqlplan/decode.html#eJzElEFr2zAUx-_7FOKdZWxJTpoKBj4NMkY6ut6GD1r06LylkpFk2Cj57sPxoE5IZEFMerSsP_8fvyfeKxircaNe0IP8DgwocKAggEIJFBZQU2id3aL31vVXhsBa_wFZUGhM24X-uKawtQ5BvkJowg5BwpP6scNHVBpdXgAFjUE1u0NN65oX5f5WWgUFFL61ynhJspwRZTRhxIaf6IDCp2YX0ElSCfKRMCnlevO0gnpPwXbhrdkH9Ywg2Z6m0322jfkPt4jCfbH2d9eSX7YxxBpJKkYrTitBq17QQxeOjy7B8Ytwb0ydsU6jQ30EVO_P4G9sZtucFSc3z3eLo26WPjaWMracZTmfcXATfKPBLW8_OJ4ujyfJ41kuZpQ3wTeSd3d7eSJdnkiSJ7K8nFHeBN9I3ur28sp0eWWSvDLLFzPKm-Abybt_3317Bu4RfWuNx6RtWvT7GPUzDsvb285t8auz20PN8PlwyB0ONPow_GXDx9oMv3rAcZhFw_wozE7DPN48US2i6TIeLq_hXkTDy3jz8prmu2h4FW9eXdN8H59VMfFM4o_stLvef_gXAAD__0eIKhg=

statement ok
CREATE TABLE books (title STRING, edition INT, shelf INT, PRIMARY KEY (title, edition))
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3894,7 +3894,7 @@ func TestFailedPreemptiveSnapshot(t *testing.T) {
if err != nil {
t.Fatal(err)
}
const expErr = "snapshot failed: unknown peer 3"
const expErr = "snapshot failed: failed to resolve n3: unknown peer 3"
if err := rep.ChangeReplicas(
context.Background(),
roachpb.ADD_REPLICA,
Expand Down

0 comments on commit 196fc47

Please sign in to comment.