Skip to content

Commit

Permalink
fix(cdn): reduce transaction duration (#5878)
Browse files Browse the repository at this point in the history
* fix(cdn): reduce transaction duration

* fix(cdn): reduce transaction duration
  • Loading branch information
sguiheux authored Jul 7, 2021
1 parent f9144aa commit ae86426
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 38 deletions.
21 changes: 1 addition & 20 deletions engine/cdn/storage/storageunit.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,7 @@ func (r *RunningStorageUnits) Start(ctx context.Context, gorts *sdk.GoRoutines)
}

t0 := time.Now()
tx, err := r.db.Begin()
if err != nil {
err = sdk.WrapError(err, "unable to begin tx")
ctx = sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "%v", err)
continue
}

if err := r.processItem(ctx, tx, s, id); err != nil {
if err := r.processItem(ctx, r.db, s, id); err != nil {
if !sdk.ErrorIs(err, sdk.ErrNotFound) {
t1 := time.Now()
ctx = sdk.ContextWithStacktrace(ctx, err)
Expand All @@ -357,19 +349,8 @@ func (r *RunningStorageUnits) Start(ctx context.Context, gorts *sdk.GoRoutines)
} else {
log.Info(ctx, "item id=%q is locked", id)
}
_ = tx.Rollback()
continue
}

if err := tx.Commit(); err != nil {
err = sdk.WrapError(err, "unable to commit tx")
ctx = sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "%v", err)
_ = tx.Rollback()
continue
}

r.RemoveFromRedisSyncQueue(ctx, s, id)
}
},
)
Expand Down
48 changes: 30 additions & 18 deletions engine/cdn/storage/storageunit_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/fujiwara/shapeio"
"github.com/go-gorp/gorp"
"github.com/rockbears/log"

"github.com/ovh/cds/engine/cache"
Expand Down Expand Up @@ -77,15 +78,27 @@ func (x *RunningStorageUnits) FillWithUnknownItems(ctx context.Context, s Storag
return nil
}

func (x *RunningStorageUnits) processItem(ctx context.Context, tx gorpmapper.SqlExecutorWithTx, s StorageUnit, id string) error {
it, err := item.LoadAndLockByID(ctx, x.m, tx, id, gorpmapper.GetOptions.WithDecryption)
func (x *RunningStorageUnits) processItem(ctx context.Context, db *gorp.DbMap, s StorageUnit, id string) error {
lockKey := cache.Key("cdn", "sync", "item", id)
hasLock, err := x.cache.Lock(lockKey, 20*time.Minute, 0, 1)
if err != nil {
log.Error(ctx, "unable to get lock %s: %v", lockKey, err)
}
if !hasLock {
return nil
}
defer func() {
_ = x.cache.Unlock(lockKey)
}()

it, err := item.LoadByID(ctx, x.m, db, id, gorpmapper.GetOptions.WithDecryption)
if err != nil {
return err
}
ctx = context.WithValue(ctx, FieldAPIRef, it.APIRefHash)
ctx = context.WithValue(ctx, FieldSize, it.Size)
log.Info(ctx, "processing item %s on %s", it.ID, s.Name())
if _, err = LoadItemUnitByUnit(ctx, x.m, tx, s.ID(), id); err == nil {
if _, err = LoadItemUnitByUnit(ctx, x.m, db, s.ID(), id); err == nil {
log.Info(ctx, "Item %s already sync on %s", id, s.Name())
return nil

Expand All @@ -94,31 +107,20 @@ func (x *RunningStorageUnits) processItem(ctx context.Context, tx gorpmapper.Sql
return err
}

if err := x.runItem(ctx, tx, s, it); err != nil {
if err := x.runItem(ctx, db, s, it); err != nil {
return err
}

x.RemoveFromRedisSyncQueue(ctx, s, id)
return nil
}

func (x *RunningStorageUnits) runItem(ctx context.Context, tx gorpmapper.SqlExecutorWithTx, dest StorageUnit, item *sdk.CDNItem) error {
func (x *RunningStorageUnits) runItem(ctx context.Context, db *gorp.DbMap, dest StorageUnit, item *sdk.CDNItem) error {
iu, err := x.NewItemUnit(ctx, dest, item)
if err != nil {
return err
}
iu.Item = item

// Save in database that the item is complete for the storage unit
if err := InsertItemUnit(ctx, x.m, tx, iu); err != nil {
return err
}

// Reload the item unit
iu, err = LoadItemUnitByID(ctx, x.m, tx, iu.ID, gorpmapper.GetOptions.WithDecryption)
if err != nil {
return err
}

// Check if the content (based on the locator) is already known from the destination unit
has, err := x.GetItemUnitByLocatorByUnit(iu.Locator, dest.ID(), iu.Type)
if err != nil {
Expand Down Expand Up @@ -199,7 +201,17 @@ func (x *RunningStorageUnits) runItem(ctx context.Context, tx gorpmapper.SqlExec
}

log.Info(ctx, "item %s has been pushed to %s (%.3f s)", item.ID, dest.Name(), t2.Sub(t1).Seconds())
return nil

tx, err := db.Begin()
if err != nil {
return sdk.WrapError(err, "unable to start transaction")
}
defer tx.Rollback() //nolint
// Save in database that the item is complete for the storage unit
if err := InsertItemUnit(ctx, x.m, tx, iu); err != nil {
return err
}
return sdk.WrapError(tx.Commit(), "unable to commit tx")
}

func (x *RunningStorageUnits) NewItemUnit(_ context.Context, su Interface, i *sdk.CDNItem) (*sdk.CDNItemUnit, error) {
Expand Down

0 comments on commit ae86426

Please sign in to comment.