Skip to content

Commit

Permalink
Wait jaeger receiver server goroutines exit on shutdown (#2985)
Browse files Browse the repository at this point in the history
  • Loading branch information
pjanotti authored Apr 22, 2021
1 parent d10b842 commit d084963
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions receiver/jaegerreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type jReceiver struct {
agentProcessors []processors.Processor
agentServer *http.Server

goroutines sync.WaitGroup

logger *zap.Logger
}

Expand Down Expand Up @@ -213,7 +215,6 @@ func (jr *jReceiver) Shutdown(context.Context) error {
if aerr := jr.agentServer.Close(); aerr != nil {
errs = append(errs, aerr)
}
jr.agentServer = nil
}
for _, processor := range jr.agentProcessors {
processor.Stop()
Expand All @@ -223,12 +224,12 @@ func (jr *jReceiver) Shutdown(context.Context) error {
if cerr := jr.collectorServer.Close(); cerr != nil {
errs = append(errs, cerr)
}
jr.collectorServer = nil
}
if jr.grpc != nil {
jr.grpc.Stop()
jr.grpc = nil
}

jr.goroutines.Wait()
err = consumererror.Combine(errs)
})

Expand Down Expand Up @@ -303,7 +304,7 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
return &api_v2.PostSpansResponse{}, nil
}

func (jr *jReceiver) startAgent(_ component.Host) error {
func (jr *jReceiver) startAgent(host component.Host) error {
if !jr.agentBinaryThriftEnabled() && !jr.agentCompactThriftEnabled() && !jr.agentHTTPEnabled() {
return nil
}
Expand Down Expand Up @@ -334,8 +335,12 @@ func (jr *jReceiver) startAgent(_ component.Host) error {
jr.agentProcessors = append(jr.agentProcessors, processor)
}

jr.goroutines.Add(len(jr.agentProcessors))
for _, processor := range jr.agentProcessors {
go processor.Serve()
go func(p processors.Processor) {
defer jr.goroutines.Done()
p.Serve()
}(processor)
}

// Start upstream grpc client before serving sampling endpoints over HTTP
Expand All @@ -357,9 +362,11 @@ func (jr *jReceiver) startAgent(_ component.Host) error {
if jr.agentHTTPEnabled() {
jr.agentServer = httpserver.NewHTTPServer(jr.agentHTTPAddr(), jr, metrics.NullFactory)

jr.goroutines.Add(1)
go func() {
defer jr.goroutines.Done()
if err := jr.agentServer.ListenAndServe(); err != http.ErrServerClosed {
jr.logger.Error("http server failure", zap.Error(err))
host.ReportFatalError(fmt.Errorf("jaeger agent server error: %w", err))
}
}()
}
Expand Down Expand Up @@ -465,8 +472,12 @@ func (jr *jReceiver) startCollector(host component.Host) error {
nr := mux.NewRouter()
nr.HandleFunc("/api/traces", jr.HandleThriftHTTPBatch).Methods(http.MethodPost)
jr.collectorServer = &http.Server{Handler: nr}
jr.goroutines.Add(1)
go func() {
_ = jr.collectorServer.Serve(cln)
defer jr.goroutines.Done()
if err := jr.collectorServer.Serve(cln); err != http.ErrServerClosed {
host.ReportFatalError(err)
}
}()
}

Expand All @@ -489,8 +500,10 @@ func (jr *jReceiver) startCollector(host component.Host) error {
}
api_v2.RegisterSamplingManagerServer(jr.grpc, collectorSampling.NewGRPCHandler(ss))

jr.goroutines.Add(1)
go func() {
if err := jr.grpc.Serve(gln); err != nil {
defer jr.goroutines.Done()
if err := jr.grpc.Serve(gln); err != nil && err != grpc.ErrServerStopped {
host.ReportFatalError(err)
}
}()
Expand Down

0 comments on commit d084963

Please sign in to comment.