From 5fc2fc6320de1fd6ce0c13151d4bac365eaa8fd8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?=
Date: Wed, 11 Mar 2020 13:34:03 +0100
Subject: [PATCH] Converted frontend worker to a service. (#2246)

* Converted frontend worker to a service.

The frontend worker is now started only after all Querier module dependencies
have started. Previously, the worker was started much earlier, long before the
Querier was ready to accept queries.
---
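
Note for reviewers (not part of the diff): the sketch below shows the service
lifecycle contract the worker now follows, using only the services APIs that
appear in this patch (NewBasicService, StartAndAwaitRunning,
StopAndAwaitTerminated). The running and stopping functions here are
illustrative stand-ins for watchDNSLoop and stopping.

package main

import (
	"context"
	"fmt"

	"github.com/cortexproject/cortex/pkg/util/services"
)

func main() {
	// The running function is called once the service reaches Running.
	// It must block until the service context is cancelled, which happens
	// when the service is asked to stop (as watchDNSLoop now does).
	running := func(ctx context.Context) error {
		<-ctx.Done()
		return nil
	}

	// The stopping function is called after the running function returns;
	// it is the place for cleanup, like waiting for per-address workers.
	stopping := func(_ error) error {
		fmt.Println("cleaned up")
		return nil
	}

	// A nil starting function is allowed, exactly as NewWorker passes below.
	svc := services.NewBasicService(nil, running, stopping)

	if err := services.StartAndAwaitRunning(context.Background(), svc); err != nil {
		panic(err)
	}
	if err := services.StopAndAwaitTerminated(context.Background(), svc); err != nil {
		panic(err)
	}
}
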
 CHANGELOG.md                          |  1 +
 pkg/cortex/cortex.go                  |  1 -
 pkg/cortex/modules.go                 | 17 ++-----
 pkg/querier/frontend/frontend_test.go |  5 +-
 pkg/querier/frontend/worker.go        | 70 ++++++++++++---------
 5 files changed, 39 insertions(+), 55 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f01c91517f..c23d1b9519 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 ## master / unreleased
 
 * [CHANGE] Utilize separate protos for rule state and storage. Experimental ruler API will not be functional until the rollout is complete. #2226
+* [CHANGE] Frontend worker in querier now starts only after all Querier module dependencies are started. This fixes an issue where the frontend worker started sending queries to the querier before it was ready to serve them (mostly visible when using the experimental blocks storage). #2246
 * [FEATURE] Flusher target to flush the WAL.
   * `-flusher.wal-dir` for the WAL directory to recover from.
   * `-flusher.concurrent-flushes` for number of concurrent flushes.
diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go
index f43c3457e2..362dfe59ec 100644
--- a/pkg/cortex/cortex.go
+++ b/pkg/cortex/cortex.go
@@ -182,7 +182,6 @@ type Cortex struct {
 	ingester     *ingester.Ingester
 	flusher      *flusher.Flusher
 	store        chunk.Store
-	worker       frontend.Worker
 	frontend     *frontend.Frontend
 	tableManager *chunk.TableManager
 	cache        cache.Cache
diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go
index 1a44fe5512..872e500962 100644
--- a/pkg/cortex/modules.go
+++ b/pkg/cortex/modules.go
@@ -202,23 +202,14 @@ func (t *Cortex) initQuerier(cfg *Config) (serv services.Service, err error) {
 	subrouter.Path("/chunks").Handler(t.httpAuthMiddleware.Wrap(querier.ChunksHandler(queryable)))
 	subrouter.Path("/user_stats").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(t.distributor.UserStatsHandler)))
 
-	// Start the query frontend worker once the query engine and the store
-	// have been successfully initialized.
-	t.worker, err = frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger)
+	// The query frontend worker is only started after all its dependencies are started, not here.
+	// Worker may also be nil if not configured, which is OK.
+	worker, err := frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger)
 	if err != nil {
 		return
 	}
 
-	// TODO: If queryable returned from querier.New was a service, it could actually wait for storeQueryable
-	// (if it also implemented Service) to finish starting... and return error if it's not in Running state.
-	// This requires extra work, which is out of scope for this proof-of-concept...
-	// BUT this extra functionality is ONE OF THE REASONS to introduce entire "Services" concept into Cortex.
-	// For now, only return service that stops the worker, and Querier will be used even before storeQueryable has finished starting.
-
-	return services.NewIdleService(nil, func(_ error) error {
-		t.worker.Stop()
-		return nil
-	}), nil
+	return worker, nil
 }
 
 // Latest Prometheus requires r.RemoteAddr to be set to addr:port, otherwise it rejects the request.
diff --git a/pkg/querier/frontend/frontend_test.go b/pkg/querier/frontend/frontend_test.go
index dd824f0990..954ec828ed 100644
--- a/pkg/querier/frontend/frontend_test.go
+++ b/pkg/querier/frontend/frontend_test.go
@@ -27,6 +27,7 @@ import (
 	"google.golang.org/grpc"
 
 	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/cortexproject/cortex/pkg/util/services"
 )
 
 const (
@@ -197,7 +198,9 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) {
 
 	worker, err := NewWorker(workerConfig, httpgrpc_server.NewServer(handler), logger)
 	require.NoError(t, err)
-	defer worker.Stop()
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))
 
 	test(httpListen.Addr().String())
+
+	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), worker))
 }
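
Note for reviewers (not part of the diff): worker.go below relies on a common
pattern for blocking APIs that accept no context: a goroutine closes the
resource once the service context is done, which unblocks the pending call
(here watcher.Next). A generic sketch of the pattern follows; closeOnDone and
the package name are illustrative, not from this patch.

package pattern

import (
	"context"
	"io"
)

// closeOnDone closes c as soon as ctx is cancelled, unblocking any call that
// is waiting on c. Compare the goroutine at the top of watchDNSLoop, which
// closes the DNS watcher once the service context is done.
func closeOnDone(ctx context.Context, c io.Closer) {
	go func() {
		<-ctx.Done()
		_ = c.Close()
	}()
}
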
diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go
index f1a3b4a080..a20bb3eeea 100644
--- a/pkg/querier/frontend/worker.go
+++ b/pkg/querier/frontend/worker.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/httpgrpc/server"
 	"github.com/weaveworks/common/middleware"
@@ -18,6 +19,7 @@ import (
 
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/grpcclient"
+	"github.com/cortexproject/cortex/pkg/util/services"
 )
 
 var (
@@ -46,31 +48,21 @@ func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) {
 }
 
 // Worker is the counter-part to the frontend, actually processing requests.
-type Worker interface {
-	Stop()
-}
-
 type worker struct {
 	cfg    WorkerConfig
 	log    log.Logger
 	server *server.Server
 
-	ctx     context.Context
-	cancel  context.CancelFunc
 	watcher naming.Watcher //nolint:staticcheck //Skipping for now. If you still see this more than likely issue https://github.com/cortexproject/cortex/issues/2015 has not yet been addressed.
 	wg      sync.WaitGroup
 }
 
-type noopWorker struct {
-}
-
-func (noopWorker) Stop() {}
-
-// NewWorker creates a new Worker.
-func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (Worker, error) {
+// NewWorker creates a new worker and returns a service wrapping it.
+// If no address is specified, it returns a nil service (and no error).
+func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (services.Service, error) {
 	if cfg.Address == "" {
 		level.Info(log).Log("msg", "no address specified, not starting worker")
-		return noopWorker{}, nil
+		return nil, nil
 	}
 
 	resolver, err := naming.NewDNSResolverWithFreq(cfg.DNSLookupDuration)
@@ -83,52 +75,50 @@ func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (Worker,
 		return nil, err
 	}
 
-	ctx, cancel := context.WithCancel(context.Background())
-
 	w := &worker{
-		cfg:    cfg,
-		log:    log,
-		server: server,
-
-		ctx:    ctx,
-		cancel: cancel,
+		cfg:     cfg,
+		log:     log,
+		server:  server,
 		watcher: watcher,
 	}
 
-	w.wg.Add(1)
-	go w.watchDNSLoop()
-	return w, nil
+	return services.NewBasicService(nil, w.watchDNSLoop, w.stopping), nil
 }
 
-// Stop the worker.
-func (w *worker) Stop() {
-	w.watcher.Close()
-	w.cancel()
+func (w *worker) stopping(_ error) error {
+	// Wait until all per-address workers are done. This is only called after watchDNSLoop exits.
 	w.wg.Wait()
+	return nil
 }
 
 // watchDNSLoop watches for changes in DNS and starts or stops workers.
-func (w *worker) watchDNSLoop() {
-	defer w.wg.Done()
+func (w *worker) watchDNSLoop(servCtx context.Context) error {
+	go func() {
+		// Close the watcher when this service is asked to stop.
+		// Closing the watcher makes watchDNSLoop exit, since it only iterates on watcher updates and has no
+		// other way to stop. We cannot close the watcher in the `stopping` method, because it is only called
+		// *after* watchDNSLoop exits.
+		<-servCtx.Done()
+		w.watcher.Close()
+	}()
 
 	cancels := map[string]context.CancelFunc{}
-	defer func() {
-		for _, cancel := range cancels {
-			cancel()
-		}
-	}()
 
 	for {
 		updates, err := w.watcher.Next()
 		if err != nil {
-			level.Error(w.log).Log("msg", "error from DNS watcher", "err", err)
-			return
+			// watcher.Next returns an error when Close is called, but we call Close when our context is done.
+			// We don't want to report an error in that case.
+			if servCtx.Err() != nil {
+				return nil
+			}
+			return errors.Wrapf(err, "error from DNS watcher")
 		}
 
 		for _, update := range updates {
 			switch update.Op {
 			case naming.Add:
 				level.Debug(w.log).Log("msg", "adding connection", "addr", update.Addr)
-				ctx, cancel := context.WithCancel(w.ctx)
+				ctx, cancel := context.WithCancel(servCtx)
 				cancels[update.Addr] = cancel
 				w.runMany(ctx, update.Addr)
 
@@ -139,7 +129,7 @@ func (w *worker) watchDNSLoop() {
 			}
 
 			default:
-				panic("unknown op")
+				return fmt.Errorf("unknown op: %v", update.Op)
 			}
 		}
 	}
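
Note for reviewers (not part of the diff): NewWorker now returns a nil service
when no frontend address is configured, and the modules.go comment above calls
out that this is OK for the module system. A caller wiring the worker by hand
would need the nil check. A sketch, where startWorker, handler and logger are
illustrative names, not from this patch:

package pattern

import (
	"context"
	"net/http"

	"github.com/go-kit/kit/log"
	httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"

	"github.com/cortexproject/cortex/pkg/querier/frontend"
	"github.com/cortexproject/cortex/pkg/util/services"
)

// startWorker starts the frontend worker if one is configured and returns a
// stop function. For a nil worker (no address configured) there is nothing
// to start or stop.
func startWorker(ctx context.Context, cfg frontend.WorkerConfig, handler http.Handler, logger log.Logger) (func(), error) {
	worker, err := frontend.NewWorker(cfg, httpgrpc_server.NewServer(handler), logger)
	if err != nil {
		return nil, err
	}
	if worker == nil {
		return func() {}, nil
	}
	if err := services.StartAndAwaitRunning(ctx, worker); err != nil {
		return nil, err
	}
	return func() {
		_ = services.StopAndAwaitTerminated(context.Background(), worker)
	}, nil
}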