From 57dd7016876a8856423341e600b73f3fa1225280 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 7 Aug 2020 09:55:58 -0700 Subject: [PATCH] Breaking Change: Remove usage of ocagent package for oc exporter The config for OpenCensus exporter removed . Signed-off-by: Bogdan Drutu --- CHANGELOG.md | 4 + exporter/opencensusexporter/README.md | 3 +- exporter/opencensusexporter/config.go | 5 - exporter/opencensusexporter/config_test.go | 3 +- exporter/opencensusexporter/factory.go | 82 +---- exporter/opencensusexporter/factory_test.go | 94 +++--- exporter/opencensusexporter/opencensus.go | 309 +++++++++++------- .../opencensusexporter/opencensus_test.go | 208 ++++++++++++ .../opencensusexporter/testdata/config.yaml | 1 - go.mod | 1 - go.sum | 6 - .../ocmetrics/opencensus_test.go | 46 ++- .../octrace/opencensus_test.go | 155 ++++----- service/builder/exporters_builder_test.go | 1 + 14 files changed, 552 insertions(+), 366 deletions(-) create mode 100644 exporter/opencensusexporter/opencensus_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index c95a9bdf2611..f5a10887a944 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +## 🛑 Breaking changes 🛑 + +- Remove `reconnection_delay` from OpenCensus exporter #1516. + ## v0.8.0 Beta ## 🚀 New components 🚀 diff --git a/exporter/opencensusexporter/README.md b/exporter/opencensusexporter/README.md index bfff7aa3cfee..fe98b1ca04ce 100644 --- a/exporter/opencensusexporter/README.md +++ b/exporter/opencensusexporter/README.md @@ -27,10 +27,9 @@ The following settings can be optionally configured: [grpc.WithKeepaliveParams()](https://godoc.org/google.golang.org/grpc#WithKeepaliveParams). - `num_workers` (default = 2): number of workers that send the gRPC requests. Optional. -- `reconnection_delay` (default = unset): time period between each reconnection - performed by the exporter. - `balancer_name`(default = pick_first): Sets the balancer in grpclb_policy to discover the servers. See [grpc loadbalancing example](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md). +- `wait_for_ready`: Option not supported. Example: diff --git a/exporter/opencensusexporter/config.go b/exporter/opencensusexporter/config.go index e3e79dbca234..64bcbfce5608 100644 --- a/exporter/opencensusexporter/config.go +++ b/exporter/opencensusexporter/config.go @@ -15,8 +15,6 @@ package opencensusexporter import ( - "time" - "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" ) @@ -29,7 +27,4 @@ type Config struct { // The number of workers that send the gRPC requests. NumWorkers int `mapstructure:"num_workers"` - - // The time period between each reconnection performed by the exporter. - ReconnectionDelay time.Duration `mapstructure:"reconnection_delay,omitempty"` } diff --git a/exporter/opencensusexporter/config_test.go b/exporter/opencensusexporter/config_test.go index 459551f3c809..3dd2ed8269ef 100644 --- a/exporter/opencensusexporter/config_test.go +++ b/exporter/opencensusexporter/config_test.go @@ -71,7 +71,6 @@ func TestLoadConfig(t *testing.T) { WriteBufferSize: 512 * 1024, BalancerName: "round_robin", }, - NumWorkers: 123, - ReconnectionDelay: 15, + NumWorkers: 123, }) } diff --git a/exporter/opencensusexporter/factory.go b/exporter/opencensusexporter/factory.go index 4cb0919cc427..7d7d6ba15443 100644 --- a/exporter/opencensusexporter/factory.go +++ b/exporter/opencensusexporter/factory.go @@ -15,13 +15,9 @@ package opencensusexporter import ( - "fmt" + "context" - "contrib.go.opencensus.io/exporter/ocagent" "go.uber.org/zap" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/keepalive" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" @@ -54,82 +50,18 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter { // We almost read 0 bytes, so no need to tune ReadBufferSize. WriteBufferSize: 512 * 1024, }, + NumWorkers: 2, } } // CreateTraceExporter creates a trace exporter based on this config. -func (f *Factory) CreateTraceExporter(logger *zap.Logger, config configmodels.Exporter) (component.TraceExporterOld, error) { - ocac := config.(*Config) - opts, err := f.OCAgentOptions(logger, ocac) - if err != nil { - return nil, err - } - return NewTraceExporter(logger, config, opts...) -} - -// OCAgentOptions takes the oc exporter Config and generates ocagent Options -func (f *Factory) OCAgentOptions(logger *zap.Logger, ocac *Config) ([]ocagent.ExporterOption, error) { - if ocac.Endpoint == "" { - return nil, &ocExporterError{ - code: errEndpointRequired, - msg: "OpenCensus exporter config requires an Endpoint", - } - } - // TODO(ccaraman): Clean up this usage of gRPC settings apart of PR to address issue #933. - opts := []ocagent.ExporterOption{ocagent.WithAddress(ocac.Endpoint)} - if ocac.Compression != "" { - if compressionKey := configgrpc.GetGRPCCompressionKey(ocac.Compression); compressionKey != configgrpc.CompressionUnsupported { - opts = append(opts, ocagent.UseCompressor(compressionKey)) - } else { - return nil, &ocExporterError{ - code: errUnsupportedCompressionType, - msg: fmt.Sprintf("OpenCensus exporter unsupported compression type %q", ocac.Compression), - } - } - } - switch { - case ocac.TLSSetting.CAFile != "": - creds, err := credentials.NewClientTLSFromFile(ocac.TLSSetting.CAFile, "") - if err != nil { - return nil, &ocExporterError{ - code: errUnableToGetTLSCreds, - msg: fmt.Sprintf("OpenCensus exporter unable to read TLS credentials from pem file %q: %v", ocac.TLSSetting.CAFile, err), - } - } - opts = append(opts, ocagent.WithTLSCredentials(creds)) - case !ocac.TLSSetting.Insecure: - tlsConf, err := ocac.TLSSetting.LoadTLSConfig() - if err != nil { - return nil, fmt.Errorf("OpenCensus exporter failed to load TLS config: %w", err) - } - creds := credentials.NewTLS(tlsConf) - opts = append(opts, ocagent.WithTLSCredentials(creds)) - default: - opts = append(opts, ocagent.WithInsecure()) - } - - if len(ocac.Headers) > 0 { - opts = append(opts, ocagent.WithHeaders(ocac.Headers)) - } - if ocac.ReconnectionDelay > 0 { - opts = append(opts, ocagent.WithReconnectionPeriod(ocac.ReconnectionDelay)) - } - if ocac.Keepalive != nil { - opts = append(opts, ocagent.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: ocac.Keepalive.Time, - Timeout: ocac.Keepalive.Timeout, - PermitWithoutStream: ocac.Keepalive.PermitWithoutStream, - }))) - } - return opts, nil +func (f *Factory) CreateTraceExporter(_ *zap.Logger, config configmodels.Exporter) (component.TraceExporterOld, error) { + oCfg := config.(*Config) + return newTraceExporter(context.Background(), oCfg) } // CreateMetricsExporter creates a metrics exporter based on this config. -func (f *Factory) CreateMetricsExporter(logger *zap.Logger, config configmodels.Exporter) (component.MetricsExporterOld, error) { +func (f *Factory) CreateMetricsExporter(_ *zap.Logger, config configmodels.Exporter) (component.MetricsExporterOld, error) { oCfg := config.(*Config) - opts, err := f.OCAgentOptions(logger, oCfg) - if err != nil { - return nil, err - } - return NewMetricsExporter(logger, config, opts...) + return newMetricsExporter(context.Background(), oCfg) } diff --git a/exporter/opencensusexporter/factory_test.go b/exporter/opencensusexporter/factory_test.go index b7eee6bbb9f3..e534267f057d 100644 --- a/exporter/opencensusexporter/factory_test.go +++ b/exporter/opencensusexporter/factory_test.go @@ -23,12 +23,10 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" - "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configtls" - "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/receiver/opencensusreceiver" "go.opentelemetry.io/collector/testutil" ) @@ -50,25 +48,7 @@ func TestCreateMetricsExporter(t *testing.T) { } func TestCreateTraceExporter(t *testing.T) { - // This test is about creating the exporter and stopping it. However, the - // exporter keeps trying to update its connection state in the background - // so unless there is a receiver enabled the stop call can return different - // results. Standing up a receiver to ensure that stop don't report errors. - rcvFactory := &opencensusreceiver.Factory{} - require.NotNil(t, rcvFactory) - rcvCfg := rcvFactory.CreateDefaultConfig().(*opencensusreceiver.Config) - rcvCfg.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t) - - rcv, err := rcvFactory.CreateTraceReceiver( - context.Background(), - zap.NewNop(), - rcvCfg, - new(exportertest.SinkTraceExporterOld)) - require.NotNil(t, rcv) - require.Nil(t, err) - require.Nil(t, rcv.Start(context.Background(), componenttest.NewNopHost())) - defer rcv.Shutdown(context.Background()) - + endpoint := testutil.GetAvailableLocalAddress(t) tests := []struct { name string config Config @@ -80,69 +60,69 @@ func TestCreateTraceExporter(t *testing.T) { GRPCClientSettings: configgrpc.GRPCClientSettings{ Endpoint: "", }, + NumWorkers: 3, }, mustFail: true, }, { - name: "UseSecure", + name: "ZeroNumWorkers", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: rcvCfg.NetAddr.Endpoint, + Endpoint: endpoint, TLSSetting: configtls.TLSClientSetting{ Insecure: false, }, }, + NumWorkers: 0, }, + mustFail: true, }, { - name: "ReconnectionDelay", + name: "UseSecure", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: rcvCfg.NetAddr.Endpoint, + Endpoint: endpoint, + TLSSetting: configtls.TLSClientSetting{ + Insecure: false, + }, }, - ReconnectionDelay: 5 * time.Second, + NumWorkers: 3, }, }, { name: "Keepalive", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: rcvCfg.NetAddr.Endpoint, + Endpoint: endpoint, Keepalive: &configgrpc.KeepaliveClientConfig{ Time: 30 * time.Second, Timeout: 25 * time.Second, PermitWithoutStream: true, }, }, + NumWorkers: 3, }, }, { name: "Compression", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: rcvCfg.NetAddr.Endpoint, + Endpoint: endpoint, Compression: configgrpc.CompressionGzip, }, + NumWorkers: 3, }, }, { name: "Headers", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: rcvCfg.NetAddr.Endpoint, + Endpoint: endpoint, Headers: map[string]string{ "hdr1": "val1", "hdr2": "val2", }, }, - }, - }, - { - name: "NumConsumers", - config: Config{ - GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: rcvCfg.NetAddr.Endpoint, - }, NumWorkers: 3, }, }, @@ -150,9 +130,10 @@ func TestCreateTraceExporter(t *testing.T) { name: "CompressionError", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: rcvCfg.NetAddr.Endpoint, + Endpoint: endpoint, Compression: "unknown compression", }, + NumWorkers: 3, }, mustFail: true, }, @@ -160,26 +141,28 @@ func TestCreateTraceExporter(t *testing.T) { name: "CaCert", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: rcvCfg.NetAddr.Endpoint, + Endpoint: endpoint, TLSSetting: configtls.TLSClientSetting{ TLSSetting: configtls.TLSSetting{ CAFile: "testdata/test_cert.pem", }, }, }, + NumWorkers: 3, }, }, { name: "CertPemFileError", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Endpoint: rcvCfg.NetAddr.Endpoint, + Endpoint: endpoint, TLSSetting: configtls.TLSClientSetting{ TLSSetting: configtls.TLSSetting{ CAFile: "nosuchfile", }, }, }, + NumWorkers: 3, }, mustFail: true, }, @@ -188,21 +171,22 @@ func TestCreateTraceExporter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { factory := &Factory{} - consumer, err := factory.CreateTraceExporter(zap.NewNop(), &tt.config) - - if tt.mustFail { - assert.NotNil(t, err) - } else { - assert.NoError(t, err) - assert.NotNil(t, consumer) - err = consumer.Shutdown(context.Background()) - if err != nil { - // Since the endpoint of opencensus exporter doesn't actually exist, - // exporter may already stop because it cannot connect. - assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing") - } - } + tReceiver, tErr := factory.CreateTraceExporter(zap.NewNop(), &tt.config) + checkErrorsAndShutdown(t, tReceiver, tErr, tt.mustFail) + mReceiver, mErr := factory.CreateMetricsExporter(zap.NewNop(), &tt.config) + checkErrorsAndShutdown(t, mReceiver, mErr, tt.mustFail) }) } } + +func checkErrorsAndShutdown(t *testing.T, receiver component.Receiver, err error, mustFail bool) { + if mustFail { + assert.NotNil(t, err) + } else { + assert.NoError(t, err) + assert.NotNil(t, receiver) + + require.NoError(t, receiver.Shutdown(context.Background())) + } +} diff --git a/exporter/opencensusexporter/opencensus.go b/exporter/opencensusexporter/opencensus.go index b1ee31a2a6a4..925498017c4c 100644 --- a/exporter/opencensusexporter/opencensus.go +++ b/exporter/opencensusexporter/opencensus.go @@ -16,183 +16,252 @@ package opencensusexporter import ( "context" + "errors" "fmt" - "sync" - "contrib.go.opencensus.io/exporter/ocagent" + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" - "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" - "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" "go.opentelemetry.io/collector/exporter/exporterhelper" ) -type ocAgentExporter struct { - exporters chan *ocagent.Exporter +// See https://godoc.org/google.golang.org/grpc#ClientConn.NewStream +// why we need to keep the cancel func to cancel the stream +type tracesClientWithCancel struct { + cancel context.CancelFunc + tsec agenttracepb.TraceService_ExportClient } -type ocExporterErrorCode int -type ocExporterError struct { - code ocExporterErrorCode - msg string +// See https://godoc.org/google.golang.org/grpc#ClientConn.NewStream +// why we need to keep the cancel func to cancel the stream +type metricsClientWithCancel struct { + cancel context.CancelFunc + msec agentmetricspb.MetricsService_ExportClient } -var _ error = (*ocExporterError)(nil) - -func (e *ocExporterError) Error() string { - return e.msg +type ocExporter struct { + cfg *Config + // gRPC clients and connection. + traceSvcClient agenttracepb.TraceServiceClient + metricsSvcClient agentmetricspb.MetricsServiceClient + // In any of the channels we keep always NumWorkers object (sometimes nil), + // to make sure we don't open more than NumWorkers RPCs at any moment. + tracesClients chan *tracesClientWithCancel + metricsClients chan *metricsClientWithCancel + grpcClientConn *grpc.ClientConn + metadata metadata.MD } -const ( - defaultNumWorkers int = 2 - - _ ocExporterErrorCode = iota // skip 0 - // errEndpointRequired indicates that this exporter was not provided with an endpoint in its config. - errEndpointRequired - // errUnsupportedCompressionType indicates that this exporter was provided with a compression protocol it does not support. - errUnsupportedCompressionType - // errUnableToGetTLSCreds indicates that this exporter could not read the provided TLS credentials. - errUnableToGetTLSCreds - // errAlreadyStopped indicates that the exporter was already stopped. - errAlreadyStopped -) +func newOcExporter(ctx context.Context, cfg *Config) (*ocExporter, error) { + if cfg.Endpoint == "" { + return nil, errors.New("OpenCensus exporter cfg requires an Endpoint") + } + + if cfg.NumWorkers <= 0 { + return nil, errors.New("OpenCensus exporter cfg requires at least one worker") + } -// NewTraceExporter creates an Open Census trace exporter. -func NewTraceExporter(logger *zap.Logger, config configmodels.Exporter, opts ...ocagent.ExporterOption) (component.TraceExporterOld, error) { - oce, err := createOCAgentExporter(logger, config, opts...) + dialOpts, err := cfg.GRPCClientSettings.ToDialOptions() if err != nil { return nil, err } - oexp, err := exporterhelper.NewTraceExporterOld( - config, - oce.PushTraceData, - exporterhelper.WithShutdown(oce.Shutdown)) - if err != nil { + + var clientConn *grpc.ClientConn + if clientConn, err = grpc.DialContext(ctx, cfg.GRPCClientSettings.Endpoint, dialOpts...); err != nil { return nil, err } - return oexp, nil + oce := &ocExporter{ + cfg: cfg, + grpcClientConn: clientConn, + metadata: metadata.New(cfg.GRPCClientSettings.Headers), + } + return oce, nil } -// createOCAgentExporter takes ocagent exporter options and create an OC exporter -func createOCAgentExporter(logger *zap.Logger, config configmodels.Exporter, opts ...ocagent.ExporterOption) (*ocAgentExporter, error) { - oCfg := config.(*Config) - numWorkers := defaultNumWorkers - if oCfg.NumWorkers > 0 { - numWorkers = oCfg.NumWorkers - } - - exportersChan := make(chan *ocagent.Exporter, numWorkers) - for exporterIndex := 0; exporterIndex < numWorkers; exporterIndex++ { - // TODO: ocagent.NewExporter blocks for connection. Now that we have ability - // to report errors asynchronously using Host.ReportFatalError we can move this - // code to Start() and do it in background to avoid blocking Collector startup - // as we do now. - exporter, serr := ocagent.NewExporter(opts...) - if serr != nil { - return nil, fmt.Errorf("cannot configure OpenCensus exporter: %v", serr) +func (oce *ocExporter) shutdown(context.Context) error { + if oce.tracesClients != nil { + // First remove all the clients from the channel. + for i := 0; i < oce.cfg.NumWorkers; i++ { + <-oce.tracesClients } - exportersChan <- exporter + // Now close the channel + close(oce.tracesClients) } - oce := &ocAgentExporter{exporters: exportersChan} - return oce, nil + if oce.metricsClients != nil { + // First remove all the clients from the channel. + for i := 0; i < oce.cfg.NumWorkers; i++ { + <-oce.metricsClients + } + // Now close the channel + close(oce.metricsClients) + } + return oce.grpcClientConn.Close() } -// NewMetricsExporter creates an Open Census metrics exporter. -func NewMetricsExporter(logger *zap.Logger, config configmodels.Exporter, opts ...ocagent.ExporterOption) (component.MetricsExporterOld, error) { - oce, err := createOCAgentExporter(logger, config, opts...) +func newTraceExporter(ctx context.Context, cfg *Config) (component.TraceExporterOld, error) { + oce, err := newOcExporter(ctx, cfg) if err != nil { return nil, err } - oexp, err := exporterhelper.NewMetricsExporterOld( - config, - oce.PushMetricsData, - exporterhelper.WithShutdown(oce.Shutdown)) - if err != nil { - return nil, err + oce.traceSvcClient = agenttracepb.NewTraceServiceClient(oce.grpcClientConn) + oce.tracesClients = make(chan *tracesClientWithCancel, cfg.NumWorkers) + // Try to create rpc clients now. + for i := 0; i < cfg.NumWorkers; i++ { + tc, errTC := oce.createTraceServiceRPC() + if errTC != nil { + oce.tracesClients <- nil + } else { + oce.tracesClients <- tc + } } - return oexp, nil + return exporterhelper.NewTraceExporterOld( + cfg, + oce.pushTraceData, + exporterhelper.WithShutdown(oce.shutdown)) } -func (oce *ocAgentExporter) Shutdown(context.Context) error { - wg := &sync.WaitGroup{} - var errors []error - var errorsMu sync.Mutex - visitedCnt := 0 - for currExporter := range oce.exporters { - wg.Add(1) - go func(exporter *ocagent.Exporter) { - defer wg.Done() - err := exporter.Stop() - if err != nil { - errorsMu.Lock() - errors = append(errors, err) - errorsMu.Unlock() - } - }(currExporter) - visitedCnt++ - if visitedCnt == cap(oce.exporters) { - // Visited and started Stop on all exporters, just wait for the stop to finish. - break +func newMetricsExporter(ctx context.Context, cfg *Config) (component.MetricsExporterOld, error) { + oce, err := newOcExporter(ctx, cfg) + if err != nil { + return nil, err + } + oce.metricsSvcClient = agentmetricspb.NewMetricsServiceClient(oce.grpcClientConn) + oce.metricsClients = make(chan *metricsClientWithCancel, cfg.NumWorkers) + // Try to create rpc clients now. + for i := 0; i < cfg.NumWorkers; i++ { + mc, errMC := oce.createMetricsServiceRPC() + if errMC != nil { + oce.metricsClients <- nil + } else { + oce.metricsClients <- mc } } - wg.Wait() - close(oce.exporters) - - return componenterror.CombineErrors(errors) + return exporterhelper.NewMetricsExporterOld( + cfg, + oce.pushMetricsData, + exporterhelper.WithShutdown(oce.shutdown)) } -func (oce *ocAgentExporter) PushTraceData(ctx context.Context, td consumerdata.TraceData) (int, error) { - // Get first available exporter. - exporter, ok := <-oce.exporters +func (oce *ocExporter) pushTraceData(_ context.Context, td consumerdata.TraceData) (int, error) { + // Get first available trace Client. + tClient, ok := <-oce.tracesClients if !ok { - err := &ocExporterError{ - code: errAlreadyStopped, - msg: "OpenCensus exporter was already stopped.", + err := errors.New("failed to push traces, OpenCensus exporter was already stopped") + return len(td.Spans), err + } + + // In any of the metricsClients channel we keep always NumWorkers object (sometimes nil), + // to make sure we don't open more than NumWorkers RPCs at any moment. + // Here check if the client is nil and create a new one if that is the case. A nil + // object means that an error happened: could not connect, service went down, etc. + if tClient == nil { + var err error + tClient, err = oce.createTraceServiceRPC() + if err != nil { + // Cannot create an RPC, put back nil to keep the number of workers constant. + oce.tracesClients <- nil + return len(td.Spans), err } - return len(td.Spans), fmt.Errorf("failed to push trace data via OpenCensus exporter: %w", err) } - err := exporter.ExportTraceServiceRequest( - &agenttracepb.ExportTraceServiceRequest{ - Spans: td.Spans, - Resource: td.Resource, - Node: td.Node, - }, - ) - oce.exporters <- exporter - if err != nil { - return len(td.Spans), fmt.Errorf("failed to push trace data via OpenCensus exporter: %w", err) + // This is a hack because OC protocol expects a Node for the initial message. + node := td.Node + if node == nil { + node = &commonpb.Node{} + } + req := &agenttracepb.ExportTraceServiceRequest{ + Spans: td.Spans, + Resource: td.Resource, + Node: node, + } + if err := tClient.tsec.Send(req); err != nil { + // Error received, cancel the context used to create the RPC to free all resources, + // put back nil to keep the number of workers constant. + tClient.cancel() + oce.tracesClients <- nil + return len(td.Spans), err } + oce.tracesClients <- tClient return 0, nil } -func (oce *ocAgentExporter) PushMetricsData(ctx context.Context, md consumerdata.MetricsData) (int, error) { - // Get first available exporter. - exporter, ok := <-oce.exporters +func (oce *ocExporter) pushMetricsData(_ context.Context, md consumerdata.MetricsData) (int, error) { + // Get first available mClient. + mClient, ok := <-oce.metricsClients if !ok { - err := &ocExporterError{ - code: errAlreadyStopped, - msg: "OpenCensus exporter was already stopped.", + err := errors.New("failed to push metrics, OpenCensus exporter was already stopped") + return exporterhelper.NumTimeSeries(md), err + } + + // In any of the metricsClients channel we keep always NumWorkers object (sometimes nil), + // to make sure we don't open more than NumWorkers RPCs at any moment. + // Here check if the client is nil and create a new one if that is the case. A nil + // object means that an error happened: could not connect, service went down, etc. + if mClient == nil { + var err error + mClient, err = oce.createMetricsServiceRPC() + if err != nil { + // Cannot create an RPC, put back nil to keep the number of workers constant. + oce.metricsClients <- nil + return exporterhelper.NumTimeSeries(md), err } - return exporterhelper.NumTimeSeries(md), fmt.Errorf("failed to push metrics data via OpenCensus exporter: %w", err) } + // This is a hack because OC protocol expects a Node for the initial message. + node := md.Node + if node == nil { + node = &commonpb.Node{} + } req := &agentmetricspb.ExportMetricsServiceRequest{ Metrics: md.Metrics, Resource: md.Resource, - Node: md.Node, + Node: node, } - err := exporter.ExportMetricsServiceRequest(req) - oce.exporters <- exporter - if err != nil { - return exporterhelper.NumTimeSeries(md), fmt.Errorf("failed to push metrics data via OpenCensus exporter: %w", err) + if err := mClient.msec.Send(req); err != nil { + // Error received, cancel the context used to create the RPC to free all resources, + // put back nil to keep the number of workers constant. + mClient.cancel() + oce.metricsClients <- nil + return exporterhelper.NumTimeSeries(md), err } + oce.metricsClients <- mClient return 0, nil } + +func (oce *ocExporter) createTraceServiceRPC() (*tracesClientWithCancel, error) { + // Initiate the trace service by sending over node identifier info. + ctx, cancel := context.WithCancel(context.Background()) + if len(oce.cfg.Headers) > 0 { + ctx = metadata.NewOutgoingContext(ctx, metadata.New(oce.cfg.Headers)) + } + // Cannot use grpc.WaitForReady(cfg.WaitForReady) because will block forever. + traceClient, err := oce.traceSvcClient.Export(ctx) + if err != nil { + cancel() + return nil, fmt.Errorf("TraceServiceClient: %w", err) + } + return &tracesClientWithCancel{cancel: cancel, tsec: traceClient}, nil +} + +func (oce *ocExporter) createMetricsServiceRPC() (*metricsClientWithCancel, error) { + // Initiate the trace service by sending over node identifier info. + ctx, cancel := context.WithCancel(context.Background()) + if len(oce.cfg.Headers) > 0 { + ctx = metadata.NewOutgoingContext(ctx, metadata.New(oce.cfg.Headers)) + } + // Cannot use grpc.WaitForReady(cfg.WaitForReady) because will block forever. + metricsClient, err := oce.metricsSvcClient.Export(ctx) + if err != nil { + cancel() + return nil, fmt.Errorf("MetricsServiceClient: %w", err) + } + return &metricsClientWithCancel{cancel: cancel, msec: metricsClient}, nil +} diff --git a/exporter/opencensusexporter/opencensus_test.go b/exporter/opencensusexporter/opencensus_test.go new file mode 100644 index 000000000000..a67cc5652564 --- /dev/null +++ b/exporter/opencensusexporter/opencensus_test.go @@ -0,0 +1,208 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencensusexporter + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/internal/data/testdata" + "go.opentelemetry.io/collector/receiver/opencensusreceiver" + "go.opentelemetry.io/collector/testutil" + "go.opentelemetry.io/collector/translator/internaldata" +) + +func TestSendTraces(t *testing.T) { + sink := &exportertest.SinkTraceExporterOld{} + rFactory := opencensusreceiver.Factory{} + rCfg := rFactory.CreateDefaultConfig().(*opencensusreceiver.Config) + endpoint := testutil.GetAvailableLocalAddress(t) + rCfg.GRPCServerSettings.NetAddr.Endpoint = endpoint + recv, err := rFactory.CreateTraceReceiver(context.Background(), zap.NewNop(), rCfg, sink) + assert.NoError(t, err) + assert.NoError(t, recv.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, recv.Shutdown(context.Background())) + }) + + factory := &Factory{} + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: endpoint, + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + } + cfg.NumWorkers = 1 + exp, err := factory.CreateTraceExporter(zap.NewNop(), cfg) + require.NoError(t, err) + require.NotNil(t, exp) + host := componenttest.NewNopHost() + require.NoError(t, exp.Start(context.Background(), host)) + t.Cleanup(func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }) + + td := internaldata.TraceDataToOC(testdata.GenerateTraceDataOneSpan()) + assert.NoError(t, exp.ConsumeTraceData(context.Background(), td[0])) + testutil.WaitFor(t, func() bool { + return len(sink.AllTraces()) == 1 + }) + require.Len(t, sink.AllTraces(), 1) + assert.True(t, proto.Equal(td[0].Node, sink.AllTraces()[0].Node)) + assert.True(t, proto.Equal(td[0].Resource, sink.AllTraces()[0].Resource)) + require.Len(t, sink.AllTraces()[0].Spans, 1) + assert.True(t, proto.Equal(td[0].Spans[0], sink.AllTraces()[0].Spans[0])) +} + +func TestSendTraces_NoBackend(t *testing.T) { + factory := &Factory{} + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: "localhost:56569", + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + } + exp, err := factory.CreateTraceExporter(zap.NewNop(), cfg) + require.NoError(t, err) + require.NotNil(t, exp) + host := componenttest.NewNopHost() + require.NoError(t, exp.Start(context.Background(), host)) + t.Cleanup(func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }) + + td := internaldata.TraceDataToOC(testdata.GenerateTraceDataOneSpan()) + for i := 0; i < 10000; i++ { + assert.Error(t, exp.ConsumeTraceData(context.Background(), td[0])) + } +} + +func TestSendTraces_AfterStop(t *testing.T) { + factory := &Factory{} + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: "localhost:56569", + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + } + exp, err := factory.CreateTraceExporter(zap.NewNop(), cfg) + require.NoError(t, err) + require.NotNil(t, exp) + host := componenttest.NewNopHost() + require.NoError(t, exp.Start(context.Background(), host)) + assert.NoError(t, exp.Shutdown(context.Background())) + + td := internaldata.TraceDataToOC(testdata.GenerateTraceDataOneSpan()) + assert.Error(t, exp.ConsumeTraceData(context.Background(), td[0])) +} + +func TestSendMetrics(t *testing.T) { + sink := &exportertest.SinkMetricsExporterOld{} + rFactory := opencensusreceiver.Factory{} + rCfg := rFactory.CreateDefaultConfig().(*opencensusreceiver.Config) + endpoint := testutil.GetAvailableLocalAddress(t) + rCfg.GRPCServerSettings.NetAddr.Endpoint = endpoint + recv, err := rFactory.CreateMetricsReceiver(context.Background(), zap.NewNop(), rCfg, sink) + assert.NoError(t, err) + assert.NoError(t, recv.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, recv.Shutdown(context.Background())) + }) + + factory := &Factory{} + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: endpoint, + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + } + cfg.NumWorkers = 1 + exp, err := factory.CreateMetricsExporter(zap.NewNop(), cfg) + require.NoError(t, err) + require.NotNil(t, exp) + host := componenttest.NewNopHost() + require.NoError(t, exp.Start(context.Background(), host)) + t.Cleanup(func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }) + + md := internaldata.MetricDataToOC(testdata.GenerateMetricDataOneMetric()) + assert.NoError(t, exp.ConsumeMetricsData(context.Background(), md[0])) + testutil.WaitFor(t, func() bool { + return len(sink.AllMetrics()) == 1 + }) + require.Len(t, sink.AllMetrics(), 1) + assert.True(t, proto.Equal(md[0].Node, sink.AllMetrics()[0].Node)) + assert.True(t, proto.Equal(md[0].Resource, sink.AllMetrics()[0].Resource)) + require.Len(t, sink.AllMetrics()[0].Metrics, 1) + assert.True(t, proto.Equal(md[0].Metrics[0], sink.AllMetrics()[0].Metrics[0])) +} + +func TestSendMetrics_NoBackend(t *testing.T) { + factory := &Factory{} + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: "localhost:56569", + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + } + exp, err := factory.CreateMetricsExporter(zap.NewNop(), cfg) + require.NoError(t, err) + require.NotNil(t, exp) + host := componenttest.NewNopHost() + require.NoError(t, exp.Start(context.Background(), host)) + t.Cleanup(func() { + assert.NoError(t, exp.Shutdown(context.Background())) + }) + + md := internaldata.MetricDataToOC(testdata.GenerateMetricDataOneMetric()) + for i := 0; i < 10000; i++ { + assert.Error(t, exp.ConsumeMetricsData(context.Background(), md[0])) + } +} + +func TestSendMetrics_AfterStop(t *testing.T) { + factory := &Factory{} + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ + Endpoint: "localhost:56569", + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, + } + exp, err := factory.CreateMetricsExporter(zap.NewNop(), cfg) + require.NoError(t, err) + require.NotNil(t, exp) + host := componenttest.NewNopHost() + require.NoError(t, exp.Start(context.Background(), host)) + assert.NoError(t, exp.Shutdown(context.Background())) + + md := internaldata.MetricDataToOC(testdata.GenerateMetricDataOneMetric()) + assert.Error(t, exp.ConsumeMetricsData(context.Background(), md[0])) +} diff --git a/exporter/opencensusexporter/testdata/config.yaml b/exporter/opencensusexporter/testdata/config.yaml index 470537973d4a..20b1284dd590 100644 --- a/exporter/opencensusexporter/testdata/config.yaml +++ b/exporter/opencensusexporter/testdata/config.yaml @@ -15,7 +15,6 @@ exporters: "can you have a . here?": "F0000000-0000-0000-0000-000000000000" header1: 234 another: "somevalue" - reconnection_delay: 15 balancer_name: "round_robin" keepalive: time: 20 diff --git a/go.mod b/go.mod index d0189d690a21..63964cf4fdfb 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.14 require ( contrib.go.opencensus.io/exporter/jaeger v0.1.1-0.20190430175949-e8b55949d948 - contrib.go.opencensus.io/exporter/ocagent v0.7.0 contrib.go.opencensus.io/exporter/prometheus v0.2.0 github.com/OneOfOne/xxhash v1.2.5 // indirect github.com/Shopify/sarama v1.27.0 diff --git a/go.sum b/go.sum index 05648265092e..d897436cabdd 100644 --- a/go.sum +++ b/go.sum @@ -26,8 +26,6 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE= contrib.go.opencensus.io/exporter/jaeger v0.1.1-0.20190430175949-e8b55949d948/go.mod h1:ukdzwIYYHgZ7QYtwVFQUjiT28BJHiMhTERo32s6qVgM= -contrib.go.opencensus.io/exporter/ocagent v0.7.0 h1:BEfdCTXfMV30tLZD8c9n64V/tIZX5+9sXiuFLnrr1k8= -contrib.go.opencensus.io/exporter/ocagent v0.7.0/go.mod h1:IshRmMJBhDfFj5Y67nVhMYTTIze91RUeT73ipWKs/GY= contrib.go.opencensus.io/exporter/prometheus v0.2.0/go.mod h1:TYmVAyE8Tn1lyPcltF5IYYfWp2KHu7lQGIZnj8iZMys= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= @@ -1014,7 +1012,6 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200528225125-3c3fba18258b/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -1089,7 +1086,6 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1203,7 +1199,6 @@ google.golang.org/api v0.15.0/go.mod h1:iLdEw5Ide6rF15KTC1Kkl0iskquN2gFfn9o9XIsb google.golang.org/api v0.17.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE= google.golang.org/api v0.18.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE= google.golang.org/api v0.20.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE= -google.golang.org/api v0.25.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE= google.golang.org/api v0.26.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE= google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -1240,7 +1235,6 @@ google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20200527145253-8367513e4ece/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA= google.golang.org/genproto v0.0.0-20200603110839-e855014d5736/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA= google.golang.org/genproto v0.0.0-20200624020401-64a14ca9d1ad h1:uAwc13+y0Y8QZLTYhLCu6lHhnG99ecQU5FYTj8zxAng= google.golang.org/genproto v0.0.0-20200624020401-64a14ca9d1ad/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= diff --git a/receiver/opencensusreceiver/ocmetrics/opencensus_test.go b/receiver/opencensusreceiver/ocmetrics/opencensus_test.go index 65b214432685..5aad4b30bdaf 100644 --- a/receiver/opencensusreceiver/ocmetrics/opencensus_test.go +++ b/receiver/opencensusreceiver/ocmetrics/opencensus_test.go @@ -29,18 +29,62 @@ import ( commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/protobuf/proto" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerdata" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/opencensusexporter" "go.opentelemetry.io/collector/internal" "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/testutil" ) -// TODO: add E2E tests once ocagent implements metric service client. +func TestReceiver_endToEnd(t *testing.T) { + metricSink := new(exportertest.SinkMetricsExporterOld) + + port, doneFn := ocReceiverOnGRPCServer(t, metricSink) + defer doneFn() + + address := fmt.Sprintf("localhost:%d", port) + expFactory := &opencensusexporter.Factory{} + expCfg := expFactory.CreateDefaultConfig().(*opencensusexporter.Config) + expCfg.GRPCClientSettings.TLSSetting.Insecure = true + expCfg.Endpoint = address + expCfg.WaitForReady = true + oce, err := expFactory.CreateMetricsExporter(zap.NewNop(), expCfg) + require.NoError(t, err) + err = oce.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + + defer func() { + require.NoError(t, oce.Shutdown(context.Background())) + }() + + wantMetrics := []*metricspb.Metric{ + makeMetric(1), + makeMetric(2), + } + wantNode := &commonpb.Node{ServiceInfo: &commonpb.ServiceInfo{Name: "test"}} + td := consumerdata.MetricsData{Node: wantNode, Metrics: wantMetrics} + assert.NoError(t, oce.ConsumeMetricsData(context.Background(), td)) + + testutil.WaitFor(t, func() bool { + return len(metricSink.AllMetrics()) != 0 + }) + gotMetrics := metricSink.AllMetrics() + assert.Len(t, gotMetrics, 1) + assert.True(t, proto.Equal(wantNode, gotMetrics[0].Node)) + require.Equal(t, len(wantMetrics), len(gotMetrics[0].Metrics)) + for i := range wantMetrics { + assert.True(t, proto.Equal(wantMetrics[i], gotMetrics[0].Metrics[i])) + } +} // Issue #43. Export should support node multiplexing. // The goal is to ensure that Receiver can always support diff --git a/receiver/opencensusreceiver/octrace/opencensus_test.go b/receiver/opencensusreceiver/octrace/opencensus_test.go index e5fdb7fc91f0..332c30846263 100644 --- a/receiver/opencensusreceiver/octrace/opencensus_test.go +++ b/receiver/opencensusreceiver/octrace/opencensus_test.go @@ -21,142 +21,101 @@ import ( "fmt" "io" "net" - "reflect" "strings" "testing" "time" - "contrib.go.opencensus.io/exporter/ocagent" commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opencensus.io/trace" - "go.opencensus.io/trace/tracestate" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/protobuf/proto" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerdata" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/opencensusexporter" "go.opentelemetry.io/collector/internal" "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/testutil" ) func TestReceiver_endToEnd(t *testing.T) { - t.Skip("This test is flaky due to timing slowdown due to -race. Will reenable in the future") - spanSink := new(exportertest.SinkTraceExporterOld) port, doneFn := ocReceiverOnGRPCServer(t, spanSink) defer doneFn() - // Now the opencensus-agent exporter. address := fmt.Sprintf("localhost:%d", port) - oce, err := ocagent.NewExporter(ocagent.WithAddress(address), ocagent.WithInsecure()) - require.NoError(t, err, "Failed to create the ocagent-exporter: %v", err) - - trace.RegisterExporter(oce) + expFactory := &opencensusexporter.Factory{} + expCfg := expFactory.CreateDefaultConfig().(*opencensusexporter.Config) + expCfg.GRPCClientSettings.TLSSetting.Insecure = true + expCfg.Endpoint = address + expCfg.WaitForReady = true + oce, err := expFactory.CreateTraceExporter(zap.NewNop(), expCfg) + require.NoError(t, err) + err = oce.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) defer func() { - oce.Stop() - trace.UnregisterExporter(oce) + require.NoError(t, oce.Shutdown(context.Background())) }() now := time.Now().UTC() - clientSpanData := &trace.SpanData{ - StartTime: now.Add(-10 * time.Second), - EndTime: now.Add(20 * time.Second), - SpanContext: trace.SpanContext{ - TraceID: trace.TraceID{0x4F, 0x4E, 0x4D, 0x4C, 0x4B, 0x4A, 0x49, 0x48, 0x47, 0x46, 0x45, 0x44, 0x43, 0x42, 0x41}, - SpanID: trace.SpanID{0x7F, 0x7E, 0x7D, 0x7C, 0x7B, 0x7A, 0x79, 0x78}, - TraceOptions: trace.TraceOptions(0x01), - }, - ParentSpanID: trace.SpanID{0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37}, - Name: "ClientSpan", - Status: trace.Status{Code: trace.StatusCodeInternal, Message: "Blocked by firewall"}, - SpanKind: trace.SpanKindClient, - } - - serverSpanData := &trace.SpanData{ - StartTime: now.Add(-5 * time.Second), - EndTime: now.Add(10 * time.Second), - SpanContext: trace.SpanContext{ - TraceID: trace.TraceID{0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A, 0x2B, 0x2C, 0x2D, 0x2E}, - SpanID: trace.SpanID{0xF0, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7}, - TraceOptions: trace.TraceOptions(0x01), - Tracestate: &tracestate.Tracestate{}, - }, - ParentSpanID: trace.SpanID{0x38, 0x39, 0x3A, 0x3B, 0x3C, 0x3D, 0x3E, 0x3F}, - Name: "ServerSpan", - Status: trace.Status{Code: trace.StatusCodeOK, Message: "OK"}, - SpanKind: trace.SpanKindServer, - Links: []trace.Link{ - { - TraceID: trace.TraceID{0x4F, 0x4E, 0x4D, 0x4C, 0x4B, 0x4A, 0x49, 0x48, 0x47, 0x46, 0x45, 0x44, 0x43, 0x42, 0x41, 0x40}, - SpanID: trace.SpanID{0x7F, 0x7E, 0x7D, 0x7C, 0x7B, 0x7A, 0x79, 0x78}, - Type: trace.LinkTypeParent, - }, - }, - } - - oce.ExportSpan(serverSpanData) - oce.ExportSpan(clientSpanData) - // Give them some time to be exported. - <-time.After(100 * time.Millisecond) - - oce.Flush() - - // Give them some time to be exported. - <-time.After(150 * time.Millisecond) - - // Now span inspection and verification time! - var gotSpans []*tracepb.Span - for _, td := range spanSink.AllTraces() { - gotSpans = append(gotSpans, td.Spans...) + clientSpan := &tracepb.Span{ + TraceId: []byte{0x4F, 0x4E, 0x4D, 0x4C, 0x4B, 0x4A, 0x49, 0x48, 0x47, 0x46, 0x45, 0x44, 0x43, 0x42, 0x41}, + SpanId: []byte{0x7F, 0x7E, 0x7D, 0x7C, 0x7B, 0x7A, 0x79, 0x78}, + ParentSpanId: []byte{0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37}, + Name: &tracepb.TruncatableString{Value: "ClientSpan"}, + Kind: tracepb.Span_CLIENT, + StartTime: internal.TimeToTimestamp(now.Add(-10 * time.Second)), + EndTime: internal.TimeToTimestamp(now.Add(20 * time.Second)), + Status: &tracepb.Status{Code: trace.StatusCodeInternal, Message: "Blocked by firewall"}, } - wantSpans := []*tracepb.Span{ - { - TraceId: serverSpanData.TraceID[:], - SpanId: serverSpanData.SpanID[:], - ParentSpanId: serverSpanData.ParentSpanID[:], - Name: &tracepb.TruncatableString{Value: "ServerSpan"}, - Kind: tracepb.Span_SERVER, - StartTime: internal.TimeToTimestamp(serverSpanData.StartTime), - EndTime: internal.TimeToTimestamp(serverSpanData.EndTime), - Status: &tracepb.Status{Code: serverSpanData.Status.Code, Message: serverSpanData.Status.Message}, - Tracestate: &tracepb.Span_Tracestate{}, - Links: &tracepb.Span_Links{ - Link: []*tracepb.Span_Link{ - { - TraceId: []byte{0x4F, 0x4E, 0x4D, 0x4C, 0x4B, 0x4A, 0x49, 0x48, 0x47, 0x46, 0x45, 0x44, 0x43, 0x42, 0x41, 0x40}, - SpanId: []byte{0x7F, 0x7E, 0x7D, 0x7C, 0x7B, 0x7A, 0x79, 0x78}, - Type: tracepb.Span_Link_PARENT_LINKED_SPAN, - }, + serverSpan := &tracepb.Span{ + TraceId: []byte{0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2A, 0x2B, 0x2C, 0x2D, 0x2E}, + SpanId: []byte{0xF0, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7}, + ParentSpanId: []byte{0x38, 0x39, 0x3A, 0x3B, 0x3C, 0x3D, 0x3E, 0x3F}, + Name: &tracepb.TruncatableString{Value: "ServerSpan"}, + Kind: tracepb.Span_SERVER, + StartTime: internal.TimeToTimestamp(now.Add(-5 * time.Second)), + EndTime: internal.TimeToTimestamp(now.Add(10 * time.Second)), + Status: &tracepb.Status{Code: trace.StatusCodeOK, Message: ""}, + Tracestate: &tracepb.Span_Tracestate{}, + Links: &tracepb.Span_Links{ + Link: []*tracepb.Span_Link{ + { + TraceId: []byte{0x4F, 0x4E, 0x4D, 0x4C, 0x4B, 0x4A, 0x49, 0x48, 0x47, 0x46, 0x45, 0x44, 0x43, 0x42, 0x41, 0x40}, + SpanId: []byte{0x7F, 0x7E, 0x7D, 0x7C, 0x7B, 0x7A, 0x79, 0x78}, + Type: tracepb.Span_Link_PARENT_LINKED_SPAN, }, }, }, - { - TraceId: clientSpanData.TraceID[:], - SpanId: clientSpanData.SpanID[:], - ParentSpanId: clientSpanData.ParentSpanID[:], - Name: &tracepb.TruncatableString{Value: "ClientSpan"}, - Kind: tracepb.Span_CLIENT, - StartTime: internal.TimeToTimestamp(clientSpanData.StartTime), - EndTime: internal.TimeToTimestamp(clientSpanData.EndTime), - Status: &tracepb.Status{Code: clientSpanData.Status.Code, Message: clientSpanData.Status.Message}, - }, } - if g, w := len(gotSpans), len(wantSpans); g != w { - t.Errorf("SpanCount: got %d want %d", g, w) + wantSpans := []*tracepb.Span{ + serverSpan, + clientSpan, } - - if !reflect.DeepEqual(gotSpans, wantSpans) { - gotBlob, _ := json.MarshalIndent(gotSpans, "", " ") - wantBlob, _ := json.MarshalIndent(wantSpans, "", " ") - t.Errorf("GotSpans:\n%s\nWantSpans:\n%s", gotBlob, wantBlob) + wantNode := &commonpb.Node{ServiceInfo: &commonpb.ServiceInfo{Name: "test"}} + td := consumerdata.TraceData{Node: wantNode, Spans: wantSpans} + assert.NoError(t, oce.ConsumeTraceData(context.Background(), td)) + + testutil.WaitFor(t, func() bool { + return len(spanSink.AllTraces()) != 0 + }) + gotTraces := spanSink.AllTraces() + assert.Len(t, gotTraces, 1) + assert.True(t, proto.Equal(wantNode, gotTraces[0].Node)) + require.Equal(t, len(wantSpans), len(gotTraces[0].Spans)) + for i := range gotTraces[0].Spans { + assert.True(t, proto.Equal(wantSpans[i], gotTraces[0].Spans[i])) } } @@ -442,7 +401,7 @@ func ocReceiverOnGRPCServer(t *testing.T, sr consumer.TraceConsumerOld) (int, fu ln, err := net.Listen("tcp", "localhost:0") require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) - doneFnList := []func(){func() { ln.Close() }} + doneFnList := []func(){func() { require.NoError(t, ln.Close()) }} done := func() { for _, doneFn := range doneFnList { doneFn() diff --git a/service/builder/exporters_builder_test.go b/service/builder/exporters_builder_test.go index bc3554c272c4..cc166df037e2 100644 --- a/service/builder/exporters_builder_test.go +++ b/service/builder/exporters_builder_test.go @@ -45,6 +45,7 @@ func TestExportersBuilder_Build(t *testing.T) { GRPCClientSettings: configgrpc.GRPCClientSettings{ Endpoint: "0.0.0.0:12345", }, + NumWorkers: 2, }, },