From 27fd6a63bb03acd38c4bdcb0b50f00c2c9a24dfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Samin?= Date: Wed, 24 Jun 2020 11:42:16 +0200 Subject: [PATCH] fix(cdn): processLogs rollback (#5276) --- engine/api/observability/init.go | 1 + engine/api/status.go | 8 +++++++- engine/cdn/cdn_log.go | 3 ++- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/engine/api/observability/init.go b/engine/api/observability/init.go index a79de4bb95..cd08649c59 100644 --- a/engine/api/observability/init.go +++ b/engine/api/observability/init.go @@ -38,6 +38,7 @@ func StatsHTTPExporter() *HTTPExporter { // Init the opencensus exporter func Init(ctx context.Context, cfg Configuration, s service) (context.Context, error) { + log.Info(ctx, "observability> initializing observability for %s/%s", s.Type(), s.Name()) ctx = ContextWithTag(ctx, TagServiceType, s.Type(), TagServiceName, s.Name(), diff --git a/engine/api/status.go b/engine/api/status.go index 2ff58791c1..62e341a437 100644 --- a/engine/api/status.go +++ b/engine/api/status.go @@ -220,7 +220,7 @@ func (api *API) initMetrics(ctx context.Context) error { "number of failed workflow runs", stats.UnitDimensionless) api.Metrics.DatabaseConns = stats.Int64( - fmt.Sprintf("cds/cds-api/%s/database_connĀ°", api.Name()), + fmt.Sprintf("cds/cds-api/%s/database_conn", api.Name()), "number database connections", stats.UnitDimensionless) @@ -258,6 +258,12 @@ func (api *API) initMetrics(ctx context.Context) error { } func (api *API) computeMetrics(ctx context.Context) { + tags := observability.ContextGetTags(ctx, observability.TagServiceType, observability.TagServiceName) + ctx, err := tag.New(ctx, tags...) + if err != nil { + log.Error(ctx, "api.computeMetrics> unable to tag observability context: %v", err) + } + sdk.GoRoutine(ctx, "api.computeMetrics", func(ctx context.Context) { tick := time.NewTicker(9 * time.Second).C for { diff --git a/engine/cdn/cdn_log.go b/engine/cdn/cdn_log.go index 455fc6fee6..d079f627df 100644 --- a/engine/cdn/cdn_log.go +++ b/engine/cdn/cdn_log.go @@ -180,16 +180,17 @@ func (s *Service) processLogs(ctx context.Context, chanMessages <-chan handledMe log.Error(ctx, "unable to start tx: %v", err) continue } - defer tx.Rollback() // nolint currentLog := buildMessage(msg.signature, msg.m) if err := s.processLog(ctx, tx, msg.signature, currentLog); err != nil { log.Error(ctx, "unable to process log: %+v", err) + tx.Rollback() // nolint continue } if err := tx.Commit(); err != nil { log.Error(ctx, "unable to commit tx: %+v", err) + tx.Rollback() // nolint continue }