Skip to content

Commit

Permalink
Remove mixed_signals support, minor cleanups (#135)
Browse files Browse the repository at this point in the history
With minor cleanups:
- remove cfg-schema.yaml, it will be auto-generated in the contrib repo
(and was stale)
- rename otlp* to otelarrow*
- replace OTLP-Arrow with OTel-Arrow

Part of #43 
Fixes #33
  • Loading branch information
jmacd authored Jan 16, 2024
1 parent e88d3f9 commit 9ee8aaa
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 266 deletions.
15 changes: 0 additions & 15 deletions collector/exporter/otelarrowexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,3 @@ exporters:
zstd:
level: 1 # 1 is the "fastest" compression level
```

### Experimental Configuration

The exporter uses the signal-specific Arrow stream methods (i.e.,
`ArrowTraces`, `ArrowLogs`, and `ArrowMetrics`) by default. There is
an option to use the generic `ArrowStream` method instead.

- `enable_mixed_signals` (default: false): Use `ArrowStream` instead of per-signal stream methods.

This option has the potential to enable the future exporter to cross
signals, meaning to allow traces, metrics and logs to refer to the
same shared-data items across a single stream. Presently, there is no
cross-signal compression benefit, this option simply causes one method
name instead of three method names to be used by the exporter
instances of different signal types.
162 changes: 0 additions & 162 deletions collector/exporter/otelarrowexporter/cfg-schema.yaml

This file was deleted.

17 changes: 7 additions & 10 deletions collector/exporter/otelarrowexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,9 @@ type Config struct {
// ArrowSettings includes whether Arrow is enabled and the number of
// concurrent Arrow streams.
type ArrowSettings struct {
// Disabled prevents registering the OTel Arrow service.
Disabled bool `mapstructure:"disabled"`

// NumStreams determines the number of OTel Arrow streams.
NumStreams int `mapstructure:"num_streams"`

// DisableDowngrade prevents this exporter from fallback back to
// standard OTLP.
DisableDowngrade bool `mapstructure:"disable_downgrade"`

// EnableMixedSignals allows the use of multi-signal streams.
EnableMixedSignals bool `mapstructure:"enable_mixed_signals"`

// MaxStreamLifetime should be set to less than the value of
// grpc: keepalive: max_connection_age_grace plus the timeout.
MaxStreamLifetime time.Duration `mapstructure:"max_stream_lifetime"`
Expand All @@ -69,6 +59,13 @@ type ArrowSettings struct {
// set to "zstd" to turn on Arrow-Zstd compression.
// Note that `Zstd` applies to gRPC, not Arrow compression.
PayloadCompression configcompression.CompressionType `mapstructure:"payload_compression"`

// Disabled prevents registering the OTel Arrow service.
Disabled bool `mapstructure:"disabled"`

// DisableDowngrade prevents this exporter from fallback back to
// standard OTLP.
DisableDowngrade bool `mapstructure:"disable_downgrade"`
}

var _ component.Config = (*Config)(nil)
Expand Down
1 change: 0 additions & 1 deletion collector/exporter/otelarrowexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func TestUnmarshalConfig(t *testing.T) {
},
Arrow: ArrowSettings{
NumStreams: 2,
EnableMixedSignals: true,
MaxStreamLifetime: 2 * time.Hour,
PayloadCompression: configcompression.Zstd,
Zstd: zstd.DefaultEncoderConfig(),
Expand Down
9 changes: 0 additions & 9 deletions collector/exporter/otelarrowexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,6 @@ var (
)

func createArrowTracesStream(cfg *Config, conn *grpc.ClientConn) arrow.StreamClientFunc {
if cfg.Arrow.EnableMixedSignals {
return arrow.MakeAnyStreamClient(arrowStreamMethod, arrowpb.NewArrowStreamServiceClient(conn).ArrowStream)
}
return arrow.MakeAnyStreamClient(arrowTracesMethod, arrowpb.NewArrowTracesServiceClient(conn).ArrowTraces)
}

Expand All @@ -114,9 +111,6 @@ func createTracesExporter(
}

func createArrowMetricsStream(cfg *Config, conn *grpc.ClientConn) arrow.StreamClientFunc {
if cfg.Arrow.EnableMixedSignals {
return arrow.MakeAnyStreamClient(arrowStreamMethod, arrowpb.NewArrowStreamServiceClient(conn).ArrowStream)
}
return arrow.MakeAnyStreamClient(arrowMetricsMethod, arrowpb.NewArrowMetricsServiceClient(conn).ArrowMetrics)
}

Expand All @@ -136,9 +130,6 @@ func createMetricsExporter(
}

func createArrowLogsStream(cfg *Config, conn *grpc.ClientConn) arrow.StreamClientFunc {
if cfg.Arrow.EnableMixedSignals {
return arrow.MakeAnyStreamClient(arrowStreamMethod, arrowpb.NewArrowStreamServiceClient(conn).ArrowStream)
}
return arrow.MakeAnyStreamClient(arrowLogsMethod, arrowpb.NewArrowLogsServiceClient(conn).ArrowLogs)
}

Expand Down
13 changes: 6 additions & 7 deletions collector/exporter/otelarrowexporter/internal/arrow/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ type Exporter struct {
netReporter netstats.Interface
}

// AnyStreamClient is the interface supported by all Arrow streams,
// mixed signals or not.
// AnyStreamClient is the interface supported by all Arrow streams.
type AnyStreamClient interface {
Send(*arrowpb.BatchArrowRecords) error
Recv() (*arrowpb.BatchStatus, error)
Expand All @@ -89,10 +88,10 @@ type AnyStreamClient interface {
// handler isn't able to see the correct uncompressed size.
type StreamClientFunc func(context.Context, ...grpc.CallOption) (AnyStreamClient, string, error)

// MakeAnyStreamClient accepts any Arrow-like stream (mixed signal or
// not) and turns it into an AnyStreamClient. The method name is
// carried through because once constructed, gRPC clients will not
// reveal their service and method names.
// MakeAnyStreamClient accepts any Arrow-like stream and turns it into
// an AnyStreamClient. The method name is carried through because
// once constructed, gRPC clients will not reveal their service and
// method names.
func MakeAnyStreamClient[T AnyStreamClient](method string, clientFunc func(ctx context.Context, opts ...grpc.CallOption) (T, error)) StreamClientFunc {
return func(ctx context.Context, opts ...grpc.CallOption) (AnyStreamClient, string, error) {
client, err := clientFunc(ctx, opts...)
Expand Down Expand Up @@ -142,7 +141,7 @@ func (e *Exporter) Start(ctx context.Context) error {

// runStreamController starts the initial set of streams, then waits for streams to
// terminate one at a time and restarts them. If streams come back with a nil
// client (meaning that OTLP+Arrow was not supported by the endpoint), it will
// client (meaning that OTel-Arrow was not supported by the endpoint), it will
// not be restarted.
func (e *Exporter) runStreamController(bgctx context.Context) {
defer e.cancel()
Expand Down
4 changes: 2 additions & 2 deletions collector/exporter/otelarrowexporter/internal/arrow/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Stream struct {

// client uses the exporter's grpc.ClientConn. this is
// initially nil only set when ArrowStream() calls meaning the
// endpoint recognizes OTLP+Arrow.
// endpoint recognizes OTel-Arrow.
client arrowpb.ArrowStreamService_ArrowStreamClient

// method the gRPC method name, used for additional instrumentation.
Expand Down Expand Up @@ -432,7 +432,7 @@ func (s *Stream) processBatchStatus(ss *arrowpb.BatchStatus) error {
ch <- nil
return nil
}
// See ../../otlp.go's `shouldRetry()` method, the retry
// See ../../otelarrow.go's `shouldRetry()` method, the retry
// behavior described here is achieved there by setting these
// recognized codes.
var err error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ type baseExporter struct {
// Default user-agent header.
userAgent string

// OTLP+Arrow optional state
// OTel-Arrow optional state
arrow *arrow.Exporter
// streamClientFunc is the stream constructor, depends on EnableMixedTelemetry.
// streamClientFunc is the stream constructor
streamClientFactory streamClientFactory
}

Expand Down
Loading

0 comments on commit 9ee8aaa

Please sign in to comment.