Skip to content

Commit

Permalink
fix(cdn): processLogs rollback (#5276)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored Jun 24, 2020
1 parent 1e8bc3a commit 27fd6a6
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
1 change: 1 addition & 0 deletions engine/api/observability/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func StatsHTTPExporter() *HTTPExporter {

// Init the opencensus exporter
func Init(ctx context.Context, cfg Configuration, s service) (context.Context, error) {
log.Info(ctx, "observability> initializing observability for %s/%s", s.Type(), s.Name())
ctx = ContextWithTag(ctx,
TagServiceType, s.Type(),
TagServiceName, s.Name(),
Expand Down
8 changes: 7 additions & 1 deletion engine/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (api *API) initMetrics(ctx context.Context) error {
"number of failed workflow runs",
stats.UnitDimensionless)
api.Metrics.DatabaseConns = stats.Int64(
fmt.Sprintf("cds/cds-api/%s/database_conn°", api.Name()),
fmt.Sprintf("cds/cds-api/%s/database_conn", api.Name()),
"number database connections",
stats.UnitDimensionless)

Expand Down Expand Up @@ -258,6 +258,12 @@ func (api *API) initMetrics(ctx context.Context) error {
}

func (api *API) computeMetrics(ctx context.Context) {
tags := observability.ContextGetTags(ctx, observability.TagServiceType, observability.TagServiceName)
ctx, err := tag.New(ctx, tags...)
if err != nil {
log.Error(ctx, "api.computeMetrics> unable to tag observability context: %v", err)
}

sdk.GoRoutine(ctx, "api.computeMetrics", func(ctx context.Context) {
tick := time.NewTicker(9 * time.Second).C
for {
Expand Down
3 changes: 2 additions & 1 deletion engine/cdn/cdn_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,17 @@ func (s *Service) processLogs(ctx context.Context, chanMessages <-chan handledMe
log.Error(ctx, "unable to start tx: %v", err)
continue
}
defer tx.Rollback() // nolint

currentLog := buildMessage(msg.signature, msg.m)
if err := s.processLog(ctx, tx, msg.signature, currentLog); err != nil {
log.Error(ctx, "unable to process log: %+v", err)
tx.Rollback() // nolint
continue
}

if err := tx.Commit(); err != nil {
log.Error(ctx, "unable to commit tx: %+v", err)
tx.Rollback() // nolint
continue
}

Expand Down

0 comments on commit 27fd6a6

Please sign in to comment.