Skip to content

Commit

Permalink
Created telset for remote-storage component (#5731)
Browse files Browse the repository at this point in the history
**Which problem is this PR solving?**

This PR addresses a part of the issue [#5633
](#5633)

**Description of the changes**
This is a Draft PR to achieve Observability Parity between V1 and V2
components by creating an unified telemetery container to pass
observability clients to V1 components.
**How was this change tested?**

The changes were tested by running the following command:

```bash
make test
```

**Checklist**

- [x] I have read
[CONTRIBUTING_GUIDELINES.md](https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md)
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - `for jaeger: make lint test`
  - `for jaeger-ui: yarn lint` and `yarn test`

---------

Signed-off-by: Wise-Wizard <[email protected]>
Signed-off-by: Saransh Shankar <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
Wise-Wizard and yurishkuro authored Jul 13, 2024
1 parent a54e2a5 commit e3e763c
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 26 deletions.
28 changes: 14 additions & 14 deletions cmd/remote-storage/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ import (
"net"
"sync"

"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/reflection"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/telemetery"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
Expand All @@ -36,32 +37,30 @@ import (

// Server runs a gRPC server
type Server struct {
logger *zap.Logger
healthcheck *healthcheck.HealthCheck
opts *Options
opts *Options

grpcConn net.Listener
grpcServer *grpc.Server
wg sync.WaitGroup
telemetery.Setting
}

// NewServer creates and initializes Server.
func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Manager, logger *zap.Logger, healthcheck *healthcheck.HealthCheck) (*Server, error) {
handler, err := createGRPCHandler(storageFactory, logger)
func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Manager, telset telemetery.Setting) (*Server, error) {
handler, err := createGRPCHandler(storageFactory, telset.Logger)
if err != nil {
return nil, err
}

grpcServer, err := createGRPCServer(options, tm, handler, logger)
grpcServer, err := createGRPCServer(options, tm, handler, telset.Logger)
if err != nil {
return nil, err
}

return &Server{
logger: logger,
healthcheck: healthcheck,
opts: options,
grpcServer: grpcServer,
opts: options,
grpcServer: grpcServer,
Setting: telset,
}, nil
}

Expand Down Expand Up @@ -129,15 +128,15 @@ func (s *Server) Start() error {
if err != nil {
return err
}
s.logger.Info("Starting GRPC server", zap.Stringer("addr", listener.Addr()))
s.Logger.Info("Starting GRPC server", zap.Stringer("addr", listener.Addr()))
s.grpcConn = listener
s.wg.Add(1)
go func() {
defer s.wg.Done()
if err := s.grpcServer.Serve(s.grpcConn); err != nil {
s.logger.Error("GRPC server exited", zap.Error(err))
s.Logger.Error("GRPC server exited", zap.Error(err))
s.ReportStatus(component.NewFatalErrorEvent(err))
}
s.healthcheck.Set(healthcheck.Unavailable)
}()

return nil
Expand All @@ -149,5 +148,6 @@ func (s *Server) Close() error {
s.grpcConn.Close()
s.opts.TLSGRPC.Close()
s.wg.Wait()
s.ReportStatus(component.NewStatusEvent(component.StatusStopped))
return nil
}
32 changes: 22 additions & 10 deletions cmd/remote-storage/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/jaegertracing/jaeger/internal/grpctest"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/telemetery"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
Expand All @@ -51,14 +53,16 @@ func TestNewServer_CreateStorageErrors(t *testing.T) {
factory.On("CreateSpanWriter").Return(nil, nil)
factory.On("CreateDependencyReader").Return(nil, errors.New("no deps")).Once()
factory.On("CreateDependencyReader").Return(nil, nil)

telset := telemetery.Setting{
Logger: zap.NewNop(),
ReportStatus: func(*component.StatusEvent) {},
}
f := func() (*Server, error) {
return NewServer(
&Options{GRPCHostPort: ":0"},
factory,
tenancy.NewManager(&tenancy.Options{}),
zap.NewNop(),
healthcheck.New(),
telset,
)
}
_, err := f()
Expand Down Expand Up @@ -123,13 +127,16 @@ func TestNewServer_TLSConfigError(t *testing.T) {
KeyPath: "invalid/path",
ClientCAPath: "invalid/path",
}
telset := telemetery.Setting{
Logger: zap.NewNop(),
ReportStatus: telemetery.HCAdapter(healthcheck.New()),
}
storageMocks := newStorageMocks()
_, err := NewServer(
&Options{GRPCHostPort: ":8081", TLSGRPC: tlsCfg},
storageMocks.factory,
tenancy.NewManager(&tenancy.Options{}),
zap.NewNop(),
healthcheck.New(),
telset,
)
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid TLS config")
Expand Down Expand Up @@ -334,12 +341,15 @@ func TestServerGRPCTLS(t *testing.T) {
storageMocks.reader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

tm := tenancy.NewManager(&tenancy.Options{Enabled: true})
telset := telemetery.Setting{
Logger: flagsSvc.Logger,
ReportStatus: telemetery.HCAdapter(flagsSvc.HC()),
}
server, err := NewServer(
serverOptions,
storageMocks.factory,
tm,
flagsSvc.Logger,
flagsSvc.HC(),
telset,
)
require.NoError(t, err)
require.NoError(t, server.Start())
Expand Down Expand Up @@ -380,13 +390,15 @@ func TestServerHandlesPortZero(t *testing.T) {
zapCore, logs := observer.New(zap.InfoLevel)
flagsSvc.Logger = zap.New(zapCore)
storageMocks := newStorageMocks()

telset := telemetery.Setting{
Logger: flagsSvc.Logger,
ReportStatus: telemetery.HCAdapter(flagsSvc.HC()),
}
server, err := NewServer(
&Options{GRPCHostPort: ":0"},
storageMocks.factory,
tenancy.NewManager(&tenancy.Options{}),
flagsSvc.Logger,
flagsSvc.HC(),
telset,
)
require.NoError(t, err)

Expand Down
7 changes: 6 additions & 1 deletion cmd/remote-storage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/remote-storage/app"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetery"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
Expand Down Expand Up @@ -77,7 +78,11 @@ func main() {
}

tm := tenancy.NewManager(&opts.Tenancy)
server, err := app.NewServer(opts, storageFactory, tm, svc.Logger, svc.HC())
telset := telemetery.Setting{
Logger: svc.Logger,
ReportStatus: telemetery.HCAdapter(svc.HC()),
}
server, err := app.NewServer(opts, storageFactory, tm, telset)
if err != nil {
logger.Fatal("Failed to create server", zap.Error(err))
}
Expand Down
7 changes: 6 additions & 1 deletion plugin/storage/integration/remote_memory_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetery"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
Expand Down Expand Up @@ -47,7 +48,11 @@ func StartNewRemoteMemoryStorage(t *testing.T) *RemoteMemoryStorage {
require.NoError(t, storageFactory.Initialize(metrics.NullFactory, logger))

t.Logf("Starting in-process remote storage server on %s", opts.GRPCHostPort)
server, err := app.NewServer(opts, storageFactory, tm, logger, healthcheck.New())
telset := telemetery.Setting{
Logger: logger,
ReportStatus: telemetery.HCAdapter(healthcheck.New()),
}
server, err := app.NewServer(opts, storageFactory, tm, telset)
require.NoError(t, err)
require.NoError(t, server.Start())

Expand Down

0 comments on commit e3e763c

Please sign in to comment.