Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use gRPC Status codes in the Arrow exporter #211

Merged
merged 3 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"runtime"
"sort"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// bestOfNPrioritizer is a prioritizer that selects a less-loaded stream to write.
Expand Down Expand Up @@ -114,7 +117,7 @@ func (lp *bestOfNPrioritizer) sendAndWait(ctx context.Context, errCh <-chan erro
case <-lp.done:
return ErrStreamRestarting
case <-ctx.Done():
return context.Canceled
return status.Errorf(codes.Canceled, "stream wait: %v", ctx.Err())
case lp.input <- wri:
return waitForWrite(ctx, errCh, lp.done)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
)

// Exporter is 1:1 with exporter, isolates arrow-specific
Expand Down Expand Up @@ -255,6 +257,12 @@ func (e *Exporter) runArrowStream(ctx context.Context, dc doneCancel, state *str
//
// consumer should fall back to standard OTLP, (true, nil)
func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) {
select {
case <-ctx.Done():
return false, status.Errorf(codes.Canceled, "incoming context: %v", ctx.Err())
default:
}

errCh := make(chan error, 1)

// Note that if the OTLP exporter's gRPC Headers field was
Expand Down Expand Up @@ -340,7 +348,7 @@ func waitForWrite(ctx context.Context, errCh <-chan error, down <-chan struct{})
select {
case <-ctx.Done():
// This caller's context timed out.
return ctx.Err()
return status.Errorf(codes.Canceled, "send wait: %v", ctx.Err())
case <-down:
return ErrStreamRestarting
case err := <-errCh:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package arrow
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -31,7 +30,9 @@ import (
"go.uber.org/zap/zaptest"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

var AllPrioritizers = []PrioritizerName{LeastLoadedPrioritizer, LeastLoadedTwoPrioritizer}
Expand Down Expand Up @@ -278,7 +279,10 @@ func TestArrowExporterTimeout(t *testing.T) {
sent, err := tc.exporter.SendAndWait(ctx, twoTraces)
require.True(t, sent)
require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status")
require.Equal(t, codes.Canceled, stat.Code())

require.NoError(t, tc.exporter.Shutdown(ctx))
})
Expand Down Expand Up @@ -406,7 +410,10 @@ func TestArrowExporterConnectTimeout(t *testing.T) {
}()
_, err := tc.exporter.SendAndWait(ctx, twoTraces)
require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status error: %v", err)
require.Equal(t, codes.Canceled, stat.Code())

require.NoError(t, tc.exporter.Shutdown(bg))
})
Expand Down Expand Up @@ -489,7 +496,10 @@ func TestArrowExporterStreamRace(t *testing.T) {
// This blocks until the cancelation.
_, err := tc.exporter.SendAndWait(callctx, twoTraces)
require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status error: %v", err)
require.Equal(t, codes.Canceled, stat.Code())
}()
}

Expand Down
36 changes: 17 additions & 19 deletions collector/exporter/otelarrowexporter/internal/arrow/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"sync"
"time"
Expand All @@ -16,7 +15,6 @@ import (
"github.com/open-telemetry/otel-arrow/collector/netstats"
arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand Down Expand Up @@ -135,9 +133,9 @@ func (s *Stream) setBatchChannel(batchID int64, errCh chan<- error) {
s.workState.waiters[batchID] = errCh
}

// logStreamError decides how to log an error. `which` indicates the
// stream direction, will be "reader" or "writer".
func (s *Stream) logStreamError(which string, err error) {
// logStreamError decides how to log an error. `where` indicates the
// error location, will be "reader" or "writer".
func (s *Stream) logStreamError(where string, err error) {
var code codes.Code
var msg string
// gRPC tends to supply status-wrapped errors, so we always
Expand All @@ -156,9 +154,9 @@ func (s *Stream) logStreamError(which string, err error) {
msg = err.Error()
}
if code == codes.Canceled {
s.telemetry.Logger.Debug("arrow stream shutdown", zap.String("which", which), zap.String("message", msg))
s.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg), zap.String("where", where))
} else {
s.telemetry.Logger.Error("arrow stream error", zap.String("which", which), zap.String("message", msg), zap.Int("code", int(code)))
s.telemetry.Logger.Error("arrow stream error", zap.Int("code", int(code)), zap.String("message", msg), zap.String("where", where))
}
}

Expand Down Expand Up @@ -274,7 +272,7 @@ func (s *Stream) write(ctx context.Context) (retErr error) {
return nil
case wri = <-s.workState.toWrite:
case <-ctx.Done():
return ctx.Err()
return status.Errorf(codes.Canceled, "stream input: %v", ctx.Err())
}

err := s.encodeAndSend(wri, &hdrsBuf, hdrsEnc)
Expand Down Expand Up @@ -319,8 +317,8 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp
if err != nil {
// This is some kind of internal error. We will restart the
// stream and mark this record as a permanent one.
err = fmt.Errorf("encode: %w", err)
wri.errCh <- consumererror.NewPermanent(err)
err = status.Errorf(codes.Internal, "encode: %v", err)
wri.errCh <- err
return err
}

Expand All @@ -336,8 +334,8 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp
// This case is like the encode-failure case
// above, we will restart the stream but consider
// this a permanent error.
err = fmt.Errorf("hpack: %w", err)
wri.errCh <- consumererror.NewPermanent(err)
err = status.Errorf(codes.Internal, "hpack: %v", err)
wri.errCh <- err
return err
}
}
Expand Down Expand Up @@ -382,24 +380,24 @@ func (s *Stream) read(_ context.Context) error {
}

if err = s.processBatchStatus(resp); err != nil {
return fmt.Errorf("process: %w", err)
return err
}
}
}

// getSenderChannel takes the stream lock and removes the corresponding
// sender channel.
func (sws *streamWorkState) getSenderChannel(status *arrowpb.BatchStatus) (chan<- error, error) {
func (sws *streamWorkState) getSenderChannel(bstat *arrowpb.BatchStatus) (chan<- error, error) {
sws.lock.Lock()
defer sws.lock.Unlock()

ch, ok := sws.waiters[status.BatchId]
ch, ok := sws.waiters[bstat.BatchId]
if !ok {
// Will break the stream.
return nil, fmt.Errorf("unrecognized batch ID: %d", status.BatchId)
return nil, status.Errorf(codes.Internal, "unrecognized batch ID: %d", bstat.BatchId)
}

delete(sws.waiters, status.BatchId)
delete(sws.waiters, bstat.BatchId)
return ch, nil
}

Expand Down Expand Up @@ -460,7 +458,7 @@ func (s *Stream) encode(records any) (_ *arrowpb.BatchArrowRecords, retErr error
zap.Reflect("recovered", err),
zap.Stack("stacktrace"),
)
retErr = fmt.Errorf("panic in otel-arrow-adapter: %v", err)
retErr = status.Errorf(codes.Internal, "panic in otel-arrow-adapter: %v", err)
}
}()
var batch *arrowpb.BatchArrowRecords
Expand All @@ -473,7 +471,7 @@ func (s *Stream) encode(records any) (_ *arrowpb.BatchArrowRecords, retErr error
case pmetric.Metrics:
batch, err = s.producer.BatchArrowRecordsFromMetrics(data)
default:
return nil, fmt.Errorf("unsupported OTLP type: %T", records)
return nil, status.Errorf(codes.Unimplemented, "unsupported OTel-Arrow signal type: %T", records)
}
return batch, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import (
"github.com/open-telemetry/otel-arrow/collector/netstats"
arrowRecordMock "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.uber.org/mock/gomock"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var oneBatch = &arrowpb.BatchArrowRecords{
Expand Down Expand Up @@ -182,8 +183,12 @@ func TestStreamEncodeError(t *testing.T) {
// sender should get an Internal gRPC status error whose message contains testErr
err := tc.mustSendAndWait()
require.Error(t, err)
require.True(t, errors.Is(err, testErr))
require.True(t, consumererror.IsPermanent(err))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status error: %v", err)
require.Equal(t, codes.Internal, stat.Code())

require.Contains(t, stat.Message(), testErr.Error())
})
}
}
Expand Down
Loading