Skip to content

Commit

Permalink
fix(blooms): Do not restart builders when planner disconnects (backpo…
Browse files Browse the repository at this point in the history
…rt k227) (#14922)

Co-authored-by: Salva Corts <[email protected]>
  • Loading branch information
loki-gh-app[bot] and salvacorts authored Nov 14, 2024
1 parent 31b2a63 commit 213e8ee
Showing 1 changed file with 62 additions and 17 deletions.
79 changes: 62 additions & 17 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package builder
import (
"context"
"fmt"
"io"
"os"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -136,16 +138,39 @@ func (b *Builder) running(ctx context.Context) error {
retries := backoff.New(ctx, b.cfg.BackoffConfig)
for retries.Ongoing() {
err := b.connectAndBuild(ctx)
if err == nil || errors.Is(err, context.Canceled) {
break
if err != nil {
err := standardizeRPCError(err)

// When the builder is shutting down, we will get a context canceled error
if errors.Is(err, context.Canceled) && b.State() != services.Running {
level.Debug(b.logger).Log("msg", "builder is shutting down")
break
}

// If the planner disconnects while we are sending/receive a message we get an EOF error.
// In this case we should reset the backoff and retry
if errors.Is(err, io.EOF) {
level.Error(b.logger).Log("msg", "planner disconnected. Resetting backoff and retrying", "err", err)
retries.Reset()
continue
}

// Otherwise (e.g. failed to connect to the builder), we should retry
code := status.Code(err)
level.Error(b.logger).Log("msg", "failed to connect and build. Retrying", "retry", retries.NumRetries(), "maxRetries", b.cfg.BackoffConfig.MaxRetries, "code", code.String(), "err", err)
retries.Wait()
continue
}

level.Error(b.logger).Log("msg", "failed to connect and build. Retrying", "err", err)
retries.Wait()
// We shouldn't get here. If we do, we better restart the builder.
// Adding a log line for visibility
level.Error(b.logger).Log("msg", "unexpected end of connectAndBuild. Restarting builder")
break
}

if err := retries.Err(); err != nil {
if errors.Is(err, context.Canceled) {
// Edge case when the builder is shutting down while we check for retries
return nil
}
return fmt.Errorf("failed to connect and build: %w", err)
Expand All @@ -154,6 +179,31 @@ func (b *Builder) running(ctx context.Context) error {
return nil
}

// standardizeRPCError converts some gRPC errors we want to handle differently to standard errors.
// 1. codes.Canceled -> context.Canceled
// 2. codes.Unavailable with EOF -> io.EOF
// 3. All other errors are returned as is.
func standardizeRPCError(err error) error {
if err == nil {
return nil
}

if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.Canceled:
// Happens when the builder is shutting down, and we are sending/receiving a message
return context.Canceled
case codes.Unavailable:
// We want to handle this case as a retryable error that resets the backoff
if i := strings.LastIndex(st.Message(), "EOF"); i != -1 {
return io.EOF
}
}
}

return err
}

func (b *Builder) plannerAddress() string {
if b.ringWatcher == nil {
return b.cfg.PlannerAddress
Expand All @@ -173,8 +223,7 @@ func (b *Builder) connectAndBuild(ctx context.Context) error {
return fmt.Errorf("failed to create grpc dial options: %w", err)
}

// nolint:staticcheck // grpc.DialContext() has been deprecated; we'll address it before upgrading to gRPC 2.
conn, err := grpc.DialContext(ctx, b.plannerAddress(), opts...)
conn, err := grpc.NewClient(b.plannerAddress(), opts...)
if err != nil {
return fmt.Errorf("failed to dial bloom planner: %w", err)
}
Expand Down Expand Up @@ -202,21 +251,17 @@ func (b *Builder) connectAndBuild(ctx context.Context) error {
}

func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) error {
ctx := c.Context()

// Send ready message to planner
if err := c.Send(&protos.BuilderToPlanner{BuilderID: b.ID}); err != nil {
return fmt.Errorf("failed to send ready message to planner: %w", err)
}

for b.State() == services.Running {
// When the planner connection closes, an EOF or "planner shutting down" error is returned.
// When the builder is shutting down, a gRPC context canceled error is returned.
// Will break when planner<->builder connection is closed or when the builder is shutting down.
for ctx.Err() == nil {
protoTask, err := c.Recv()
if err != nil {
if status.Code(err) == codes.Canceled {
level.Debug(b.logger).Log("msg", "builder loop context canceled")
return nil
}

return fmt.Errorf("failed to receive task from planner: %w", err)
}

Expand All @@ -235,7 +280,7 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro
continue
}

newMetas, err := b.processTask(c.Context(), task)
newMetas, err := b.processTask(ctx, task)
if err != nil {
err = fmt.Errorf("failed to process task: %w", err)
}
Expand All @@ -249,8 +294,8 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro
b.metrics.processingTask.Set(0)
}

level.Debug(b.logger).Log("msg", "builder loop stopped")
return nil
level.Debug(b.logger).Log("msg", "builder loop stopped", "ctx_err", ctx.Err())
return ctx.Err()
}

func (b *Builder) logTaskCompleted(
Expand Down

0 comments on commit 213e8ee

Please sign in to comment.