Skip to content

Commit

Permalink
feature(backend): adding event logs for otlp endpoints (#2427)
Browse files Browse the repository at this point in the history
* feature(backend): adding event logs for otlp endpoints

* adding server name to event
  • Loading branch information
xoscar authored Apr 24, 2023
1 parent d1f0b31 commit 12464b7
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 11 deletions.
9 changes: 6 additions & 3 deletions server/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/kubeshop/tracetest/server/config"
"github.com/kubeshop/tracetest/server/config/configresource"
"github.com/kubeshop/tracetest/server/config/demoresource"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/executor/pollingprofile"
"github.com/kubeshop/tracetest/server/executor/trigger"
httpServer "github.com/kubeshop/tracetest/server/http"
Expand Down Expand Up @@ -187,6 +188,9 @@ func (app *App) Start(opts ...appOption) error {

pollingProfileRepo := pollingprofile.NewRepository(db)

eventEmitter := executor.NewEventEmitter(testDB, subscriptionManager)
registerOtlpServer(app, testDB, eventEmitter)

rf := newRunnerFacades(
pollingProfileRepo,
testDB,
Expand Down Expand Up @@ -225,7 +229,6 @@ func (app *App) Start(opts ...appOption) error {
return err
}

registerOtlpServer(app, testDB)
provisioner := provisioning.New()

router, mappers := controller(app.cfg, testDB, tracer, rf, triggerRegistry)
Expand Down Expand Up @@ -283,8 +286,8 @@ func registerSPAHandler(router *mux.Router, cfg httpServerConfig, analyticsEnabl
)
}

func registerOtlpServer(app *App, testDB model.Repository) {
ingester := otlp.NewIngester(testDB)
func registerOtlpServer(app *App, testDB model.Repository, eventEmitter executor.EventEmitter) {
ingester := otlp.NewIngester(testDB, eventEmitter)
grpcOtlpServer := otlp.NewGrpcServer(":4317", ingester)
httpOtlpServer := otlp.NewHttpServer(":4318", ingester)
go grpcOtlpServer.Start()
Expand Down
15 changes: 15 additions & 0 deletions server/model/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,3 +396,18 @@ func TestSpecsAssertionWarning(testID id.ID, runID int, err error, spanID string
Outputs: []model.OutputInfo{},
}
}

func TraceOtlpServerReceivedSpans(testID id.ID, runID, spanCount int, requestType string) model.TestRunEvent {
return model.TestRunEvent{
TestID: testID,
RunID: runID,
Stage: model.StageTrace,
Type: "OTLP_SERVER_RECEIVED_SPANS",
Title: fmt.Sprintf("%s OTLP server endpoint received spans", requestType),
Description: fmt.Sprintf("The Tracetest %s OTLP endpoint server received %d spans", requestType, spanCount),
CreatedAt: time.Now(),
DataStoreConnection: model.ConnectionResult{},
Polling: model.PollingInfo{},
Outputs: []model.OutputInfo{},
}
}
2 changes: 1 addition & 1 deletion server/otlp/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ func (s *grpcServer) Stop() {
}

func (s grpcServer) Export(ctx context.Context, request *pb.ExportTraceServiceRequest) (*pb.ExportTraceServiceResponse, error) {
return s.ingester.Ingest(ctx, request)
return s.ingester.Ingest(ctx, request, "gRPC")
}
2 changes: 1 addition & 1 deletion server/otlp/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s httpServer) Export(w http.ResponseWriter, r *http.Request) {
return
}

result, err := s.ingester.Ingest(r.Context(), request)
result, err := s.ingester.Ingest(r.Context(), request, "HTTP")
if err != nil {
response.sendError(http.StatusInternalServerError, status.Errorf(codes.InvalidArgument, "Error when ingesting spans %s", err.Error()))
return
Expand Down
18 changes: 12 additions & 6 deletions server/otlp/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,29 @@ import (
"fmt"
"strings"

"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/model"
"github.com/kubeshop/tracetest/server/model/events"
"github.com/kubeshop/tracetest/server/traces"
"go.opentelemetry.io/otel/trace"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
)

type ingester struct {
db model.Repository
db model.Repository
eventEmitter executor.EventEmitter
}

func NewIngester(db model.Repository) ingester {
func NewIngester(db model.Repository, eventEmitter executor.EventEmitter) ingester {
return ingester{
db: db,
db: db,
eventEmitter: eventEmitter,
}
}

func (i ingester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequest) (*pb.ExportTraceServiceResponse, error) {
func (i ingester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequest, requestType string) (*pb.ExportTraceServiceResponse, error) {
fmt.Println(">>>> INGESTING REQUEST")
ds, err := i.db.DefaultDataStore(ctx)

if err != nil || !ds.IsOTLPBasedProvider() {
Expand All @@ -37,7 +42,7 @@ func (i ingester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequ
spansByTrace := i.getSpansByTrace(request)

for traceID, spans := range spansByTrace {
i.saveSpansIntoTest(ctx, traceID, spans)
i.saveSpansIntoTest(ctx, traceID, spans, requestType)
}

return &pb.ExportTraceServiceResponse{
Expand Down Expand Up @@ -73,7 +78,7 @@ func (i ingester) getSpansByTrace(request *pb.ExportTraceServiceRequest) map[tra
return spansByTrace
}

func (e ingester) saveSpansIntoTest(ctx context.Context, traceID trace.TraceID, spans []model.Span) error {
func (e ingester) saveSpansIntoTest(ctx context.Context, traceID trace.TraceID, spans []model.Span, requestType string) error {
run, err := e.db.GetRunByTraceID(ctx, traceID)
if err != nil && strings.Contains(err.Error(), "record not found") {
// span is not part of any known test run. So it will be ignored
Expand Down Expand Up @@ -104,6 +109,7 @@ func (e ingester) saveSpansIntoTest(ctx context.Context, traceID trace.TraceID,
newSpans := append(existingSpans, spans...)
newTrace := model.NewTrace(traceID.String(), newSpans)

e.eventEmitter.Emit(ctx, events.TraceOtlpServerReceivedSpans(run.TestID, run.ID, len(newSpans), requestType))
run.Trace = &newTrace

err = e.db.UpdateRun(ctx, run)
Expand Down

0 comments on commit 12464b7

Please sign in to comment.