From 3dc3293d836c16116756e5f0d53ac6ddac8e2a50 Mon Sep 17 00:00:00 2001 From: Guiheux Steven Date: Tue, 2 Mar 2021 09:19:57 +0100 Subject: [PATCH] fix(cdn): clean Incoming item without item unit (#5722) --- engine/cdn/cdn_gc.go | 40 ++++++++++++---- engine/cdn/cdn_gc_test.go | 45 ++++++++++++++++++ engine/cdn/item/dao.go | 25 +++++----- engine/cdn/storage/dao.go | 13 ------ engine/cdn/storage/dao_test.go | 84 ---------------------------------- 5 files changed, 89 insertions(+), 118 deletions(-) diff --git a/engine/cdn/cdn_gc.go b/engine/cdn/cdn_gc.go index 36bc37b0db..027372b044 100644 --- a/engine/cdn/cdn_gc.go +++ b/engine/cdn/cdn_gc.go @@ -172,27 +172,51 @@ func (s *Service) cleanBuffer(ctx context.Context) error { } func (s *Service) cleanWaitingItem(ctx context.Context, duration int) error { - itemUnits, err := storage.LoadOldItemUnitByItemStatusAndDuration(ctx, s.Mapper, s.mustDBWithCtx(ctx), sdk.CDNStatusItemIncoming, duration) + items, err := item.LoadOldItemByStatusAndDuration(ctx, s.Mapper, s.mustDBWithCtx(ctx), sdk.CDNStatusItemIncoming, duration) if err != nil { return err } - for _, itemUnit := range itemUnits { - ctx = context.WithValue(ctx, storage.FieldAPIRef, itemUnit.Item.APIRefHash) - log.Info(ctx, "cleanWaitingItem> cleaning item %s", itemUnit.ItemID) + for _, it := range items { + ctx = context.WithValue(ctx, storage.FieldAPIRef, it.APIRefHash) + log.Info(ctx, "cleanWaitingItem> cleaning item %s", it.ID) + + // Load Item Unit + itemUnits, err := storage.LoadAllItemUnitsByItemIDs(ctx, s.Mapper, s.mustDBWithCtx(ctx), it.ID) + if err != nil { + log.Error(ctx, "cleanWaitingItem> unable to load storage unit: %v", err) + continue + } tx, err := s.mustDBWithCtx(ctx).Begin() if err != nil { return sdk.WrapError(err, "unable to start transaction") } - if err := s.completeItem(ctx, tx, itemUnit); err != nil { - _ = tx.Rollback() - return err + + // If there is no item unit, mark item as delete + if len(itemUnits) == 0 { + it.Status = sdk.CDNStatusItemCompleted + it.ToDelete = true + if err := item.Update(ctx, s.Mapper, tx, &it); err != nil { + _ = tx.Rollback() + return err + } + } else { + // Else complete item + if err := s.completeItem(ctx, tx, itemUnits[0]); err != nil { + _ = tx.Rollback() + return err + } } + if err := tx.Commit(); err != nil { _ = tx.Rollback() return err } - s.Units.PushInSyncQueue(ctx, itemUnit.ItemID, itemUnit.Item.Created) + + // Push item ID to run backend sync + if len(itemUnits) > 0 { + s.Units.PushInSyncQueue(ctx, it.ID, it.Created) + } telemetry.Record(ctx, s.Metrics.itemCompletedByGCCount, 1) } return nil diff --git a/engine/cdn/cdn_gc_test.go b/engine/cdn/cdn_gc_test.go index eec26a531a..df52cc04e5 100644 --- a/engine/cdn/cdn_gc_test.go +++ b/engine/cdn/cdn_gc_test.go @@ -236,6 +236,51 @@ func TestCleanWaitingItem(t *testing.T) { require.NoError(t, err) require.Equal(t, sdk.CDNStatusItemCompleted, itemDB.Status) + require.False(t, itemDB.ToDelete) +} + +func TestCleanWaitingItemWithoutItemUnit(t *testing.T) { + m := gorpmapper.New() + item.InitDBMapping(m) + storage.InitDBMapping(m) + + log.Factory = log.NewTestingWrapper(t) + db, factory, cache, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN) + t.Cleanup(cancel) + + cdntest.ClearItem(t, context.TODO(), m, db) + + // Create cdn service + s := Service{ + DBConnectionFactory: factory, + Cache: cache, + Mapper: m, + } + s.GoRoutines = sdk.NewGoRoutines() + + ctx, cancel := context.WithCancel(context.TODO()) + t.Cleanup(cancel) + s.Units = newRunningStorageUnits(t, m, s.DBConnectionFactory.GetDBMap(m)(), ctx, cache) + + it := sdk.CDNItem{ + ID: sdk.UUID(), + Size: 12, + Type: sdk.CDNTypeItemStepLog, + Status: sdk.CDNStatusItemIncoming, + + APIRefHash: sdk.RandomString(10), + } + require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it)) + + time.Sleep(2 * time.Second) + + require.NoError(t, s.cleanWaitingItem(context.TODO(), 1)) + + itemDB, err := item.LoadByID(context.TODO(), s.Mapper, db, it.ID) + require.NoError(t, err) + + require.Equal(t, sdk.CDNStatusItemCompleted, itemDB.Status) + require.True(t, itemDB.ToDelete) } func TestPurgeItem(t *testing.T) { diff --git a/engine/cdn/item/dao.go b/engine/cdn/item/dao.go index 836b54f7de..e0c4ef81b7 100644 --- a/engine/cdn/item/dao.go +++ b/engine/cdn/item/dao.go @@ -163,19 +163,6 @@ func LoadFileByProjectAndCacheTag(ctx context.Context, m *gorpmapper.Mapper, db return getItem(ctx, m, db, query) } -func LoadFileByRunAndArtifactName(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, itemType sdk.CDNItemType, runID int64, artifactName string) (*sdk.CDNItem, error) { - query := gorpmapper.NewQuery(` - SELECT * - FROM item - WHERE type = $1 - AND (api_ref->>'run_id')::int = $2 - AND (api_ref->>'artifact_name')::text = $3 - AND to_delete = false - - `).Args(itemType, runID, artifactName) - return getItem(ctx, m, db, query) -} - // LoadByAPIRefHashAndType load an item by his job id, step order and type func LoadByAPIRefHashAndType(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, hash string, itemType sdk.CDNItemType, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItem, error) { query := gorpmapper.NewQuery(` @@ -320,6 +307,18 @@ func CountItemSizePercentil(db gorp.SqlExecutor) ([]StatItemPercentil, error) { return res, sdk.WithStack(err) } +func LoadOldItemByStatusAndDuration(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, status string, duration int, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItem, error) { + query := gorpmapper.NewQuery(` + SELECT item.* + FROM item + WHERE + item.status = $1 AND + item.last_modified < NOW() - $2 * INTERVAL '1 second' + ORDER BY item.last_modified ASC + `).Args(status, duration) + return getItems(ctx, m, db, query, opts...) +} + func LoadByRunID(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, itemType sdk.CDNItemType, runID string) ([]sdk.CDNItem, error) { query := gorpmapper.NewQuery("SELECT * FROM item WHERE api_ref->>'run_id'::text = $1 AND type = $2 AND to_delete = false").Args(runID, itemType) return getItems(ctx, m, db, query) diff --git a/engine/cdn/storage/dao.go b/engine/cdn/storage/dao.go index 437450fc75..b94db19209 100644 --- a/engine/cdn/storage/dao.go +++ b/engine/cdn/storage/dao.go @@ -157,19 +157,6 @@ func LoadAllSynchronizedItemIDs(db gorp.SqlExecutor, bufferUnitID string, maxSto return itemIDs, nil } -func LoadOldItemUnitByItemStatusAndDuration(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, status string, duration int, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItemUnit, error) { - query := gorpmapper.NewQuery(` - SELECT storage_unit_item.* - FROM storage_unit_item - LEFT JOIN item ON item.id = storage_unit_item.item_id - WHERE - item.status = $1 AND - item.last_modified < NOW() - $2 * INTERVAL '1 second' - ORDER BY item.last_modified ASC - `).Args(status, duration) - return getAllItemUnits(ctx, m, db, query, opts...) -} - func LoadItemUnitByUnit(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, unitID string, itemID string, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItemUnit, error) { query := gorpmapper.NewQuery("SELECT * FROM storage_unit_item WHERE unit_id = $1 and item_id = $2 AND to_delete = false LIMIT 1").Args(unitID, itemID) return getItemUnit(ctx, m, db, query, opts...) diff --git a/engine/cdn/storage/dao_test.go b/engine/cdn/storage/dao_test.go index d5b1a7299f..95b4ee95b9 100644 --- a/engine/cdn/storage/dao_test.go +++ b/engine/cdn/storage/dao_test.go @@ -20,90 +20,6 @@ import ( "github.com/ovh/cds/sdk" ) -func TestLoadOldItemUnitByItemStatusAndDuration(t *testing.T) { - m := gorpmapper.New() - item.InitDBMapping(m) - storage.InitDBMapping(m) - db, store := test.SetupPGWithMapper(t, m, sdk.TypeCDN) - cfg := test.LoadTestingConf(t, sdk.TypeCDN) - - cdntest.ClearItem(t, context.TODO(), m, db) - tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*") - - ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) - t.Cleanup(cancel) - - cdnUnits, err := storage.Init(ctx, m, store, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{ - HashLocatorSalt: "thisismysalt", - Buffers: map[string]storage.BufferConfiguration{ - "redis_buffer": { - Redis: &storage.RedisBufferConfiguration{ - Host: cfg["redisHost"], - Password: cfg["redisPassword"], - }, - BufferType: storage.CDNBufferTypeLog, - }, - }, - Storages: map[string]storage.StorageConfiguration{ - "local_storage": { - Local: &storage.LocalStorageConfiguration{ - Path: tmpDir, - }, - }, - }, - }) - require.NoError(t, err) - - // Clean old test - time.Sleep(1 * time.Second) - itemUnits, err := storage.LoadOldItemUnitByItemStatusAndDuration(context.TODO(), m, db, sdk.CDNStatusItemIncoming, 1) - require.NoError(t, err) - for _, itemUnit := range itemUnits { - i, err := item.LoadByID(context.TODO(), m, db, itemUnit.ItemID) - require.NoError(t, err) - require.NoError(t, item.DeleteByID(db, i.ID)) - } - - i1 := &sdk.CDNItem{ - ID: sdk.UUID(), - APIRefHash: sdk.UUID(), - Type: sdk.CDNTypeItemStepLog, - Status: sdk.CDNStatusItemCompleted, - } - err = item.Insert(context.TODO(), m, db, i1) - require.NoError(t, err) - defer func() { - _ = item.DeleteByID(db, i1.ID) - }() - - itemUnit1, err := cdnUnits.NewItemUnit(context.TODO(), cdnUnits.LogsBuffer(), i1) - require.NoError(t, err) - require.NoError(t, storage.InsertItemUnit(context.TODO(), m, db, itemUnit1)) - - i2 := &sdk.CDNItem{ - ID: sdk.UUID(), - APIRefHash: sdk.UUID(), - Type: sdk.CDNTypeItemStepLog, - Status: sdk.CDNStatusItemIncoming, - } - err = item.Insert(context.TODO(), m, db, i2) - require.NoError(t, err) - defer func() { - _ = item.DeleteByID(db, i2.ID) - }() - itemUnit2, err := cdnUnits.NewItemUnit(context.TODO(), cdnUnits.LogsBuffer(), i2) - require.NoError(t, err) - - require.NoError(t, storage.InsertItemUnit(context.TODO(), m, db, itemUnit2)) - - time.Sleep(2 * time.Second) - - itemUnits, err = storage.LoadOldItemUnitByItemStatusAndDuration(context.TODO(), m, db, sdk.CDNStatusItemIncoming, 1) - require.NoError(t, err) - require.Len(t, itemUnits, 1) - require.Equal(t, i2.ID, itemUnits[0].ItemID) -} - func TestLoadAllItemIDUnknownByUnit(t *testing.T) { m := gorpmapper.New() item.InitDBMapping(m)