Skip to content

Commit

Permalink
fix(cdn): clean Incoming item without item unit (#5722)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Mar 2, 2021
1 parent 25a13f2 commit 3dc3293
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 118 deletions.
40 changes: 32 additions & 8 deletions engine/cdn/cdn_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,27 +172,51 @@ func (s *Service) cleanBuffer(ctx context.Context) error {
}

func (s *Service) cleanWaitingItem(ctx context.Context, duration int) error {
itemUnits, err := storage.LoadOldItemUnitByItemStatusAndDuration(ctx, s.Mapper, s.mustDBWithCtx(ctx), sdk.CDNStatusItemIncoming, duration)
items, err := item.LoadOldItemByStatusAndDuration(ctx, s.Mapper, s.mustDBWithCtx(ctx), sdk.CDNStatusItemIncoming, duration)
if err != nil {
return err
}
for _, itemUnit := range itemUnits {
ctx = context.WithValue(ctx, storage.FieldAPIRef, itemUnit.Item.APIRefHash)
log.Info(ctx, "cleanWaitingItem> cleaning item %s", itemUnit.ItemID)
for _, it := range items {
ctx = context.WithValue(ctx, storage.FieldAPIRef, it.APIRefHash)
log.Info(ctx, "cleanWaitingItem> cleaning item %s", it.ID)

// Load Item Unit
itemUnits, err := storage.LoadAllItemUnitsByItemIDs(ctx, s.Mapper, s.mustDBWithCtx(ctx), it.ID)
if err != nil {
log.Error(ctx, "cleanWaitingItem> unable to load storage unit: %v", err)
continue
}

tx, err := s.mustDBWithCtx(ctx).Begin()
if err != nil {
return sdk.WrapError(err, "unable to start transaction")
}
if err := s.completeItem(ctx, tx, itemUnit); err != nil {
_ = tx.Rollback()
return err

// If there is no item unit, mark item as delete
if len(itemUnits) == 0 {
it.Status = sdk.CDNStatusItemCompleted
it.ToDelete = true
if err := item.Update(ctx, s.Mapper, tx, &it); err != nil {
_ = tx.Rollback()
return err
}
} else {
// Else complete item
if err := s.completeItem(ctx, tx, itemUnits[0]); err != nil {
_ = tx.Rollback()
return err
}
}

if err := tx.Commit(); err != nil {
_ = tx.Rollback()
return err
}
s.Units.PushInSyncQueue(ctx, itemUnit.ItemID, itemUnit.Item.Created)

// Push item ID to run backend sync
if len(itemUnits) > 0 {
s.Units.PushInSyncQueue(ctx, it.ID, it.Created)
}
telemetry.Record(ctx, s.Metrics.itemCompletedByGCCount, 1)
}
return nil
Expand Down
45 changes: 45 additions & 0 deletions engine/cdn/cdn_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,51 @@ func TestCleanWaitingItem(t *testing.T) {
require.NoError(t, err)

require.Equal(t, sdk.CDNStatusItemCompleted, itemDB.Status)
require.False(t, itemDB.ToDelete)
}

func TestCleanWaitingItemWithoutItemUnit(t *testing.T) {
m := gorpmapper.New()
item.InitDBMapping(m)
storage.InitDBMapping(m)

log.Factory = log.NewTestingWrapper(t)
db, factory, cache, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN)
t.Cleanup(cancel)

cdntest.ClearItem(t, context.TODO(), m, db)

// Create cdn service
s := Service{
DBConnectionFactory: factory,
Cache: cache,
Mapper: m,
}
s.GoRoutines = sdk.NewGoRoutines()

ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)
s.Units = newRunningStorageUnits(t, m, s.DBConnectionFactory.GetDBMap(m)(), ctx, cache)

it := sdk.CDNItem{
ID: sdk.UUID(),
Size: 12,
Type: sdk.CDNTypeItemStepLog,
Status: sdk.CDNStatusItemIncoming,

APIRefHash: sdk.RandomString(10),
}
require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it))

time.Sleep(2 * time.Second)

require.NoError(t, s.cleanWaitingItem(context.TODO(), 1))

itemDB, err := item.LoadByID(context.TODO(), s.Mapper, db, it.ID)
require.NoError(t, err)

require.Equal(t, sdk.CDNStatusItemCompleted, itemDB.Status)
require.True(t, itemDB.ToDelete)
}

func TestPurgeItem(t *testing.T) {
Expand Down
25 changes: 12 additions & 13 deletions engine/cdn/item/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,6 @@ func LoadFileByProjectAndCacheTag(ctx context.Context, m *gorpmapper.Mapper, db
return getItem(ctx, m, db, query)
}

func LoadFileByRunAndArtifactName(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, itemType sdk.CDNItemType, runID int64, artifactName string) (*sdk.CDNItem, error) {
query := gorpmapper.NewQuery(`
SELECT *
FROM item
WHERE type = $1
AND (api_ref->>'run_id')::int = $2
AND (api_ref->>'artifact_name')::text = $3
AND to_delete = false
`).Args(itemType, runID, artifactName)
return getItem(ctx, m, db, query)
}

// LoadByAPIRefHashAndType load an item by his job id, step order and type
func LoadByAPIRefHashAndType(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, hash string, itemType sdk.CDNItemType, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItem, error) {
query := gorpmapper.NewQuery(`
Expand Down Expand Up @@ -320,6 +307,18 @@ func CountItemSizePercentil(db gorp.SqlExecutor) ([]StatItemPercentil, error) {
return res, sdk.WithStack(err)
}

func LoadOldItemByStatusAndDuration(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, status string, duration int, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItem, error) {
query := gorpmapper.NewQuery(`
SELECT item.*
FROM item
WHERE
item.status = $1 AND
item.last_modified < NOW() - $2 * INTERVAL '1 second'
ORDER BY item.last_modified ASC
`).Args(status, duration)
return getItems(ctx, m, db, query, opts...)
}

func LoadByRunID(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, itemType sdk.CDNItemType, runID string) ([]sdk.CDNItem, error) {
query := gorpmapper.NewQuery("SELECT * FROM item WHERE api_ref->>'run_id'::text = $1 AND type = $2 AND to_delete = false").Args(runID, itemType)
return getItems(ctx, m, db, query)
Expand Down
13 changes: 0 additions & 13 deletions engine/cdn/storage/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,6 @@ func LoadAllSynchronizedItemIDs(db gorp.SqlExecutor, bufferUnitID string, maxSto
return itemIDs, nil
}

func LoadOldItemUnitByItemStatusAndDuration(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, status string, duration int, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItemUnit, error) {
query := gorpmapper.NewQuery(`
SELECT storage_unit_item.*
FROM storage_unit_item
LEFT JOIN item ON item.id = storage_unit_item.item_id
WHERE
item.status = $1 AND
item.last_modified < NOW() - $2 * INTERVAL '1 second'
ORDER BY item.last_modified ASC
`).Args(status, duration)
return getAllItemUnits(ctx, m, db, query, opts...)
}

func LoadItemUnitByUnit(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, unitID string, itemID string, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItemUnit, error) {
query := gorpmapper.NewQuery("SELECT * FROM storage_unit_item WHERE unit_id = $1 and item_id = $2 AND to_delete = false LIMIT 1").Args(unitID, itemID)
return getItemUnit(ctx, m, db, query, opts...)
Expand Down
84 changes: 0 additions & 84 deletions engine/cdn/storage/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,90 +20,6 @@ import (
"github.com/ovh/cds/sdk"
)

func TestLoadOldItemUnitByItemStatusAndDuration(t *testing.T) {
m := gorpmapper.New()
item.InitDBMapping(m)
storage.InitDBMapping(m)
db, store := test.SetupPGWithMapper(t, m, sdk.TypeCDN)
cfg := test.LoadTestingConf(t, sdk.TypeCDN)

cdntest.ClearItem(t, context.TODO(), m, db)
tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
t.Cleanup(cancel)

cdnUnits, err := storage.Init(ctx, m, store, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{
HashLocatorSalt: "thisismysalt",
Buffers: map[string]storage.BufferConfiguration{
"redis_buffer": {
Redis: &storage.RedisBufferConfiguration{
Host: cfg["redisHost"],
Password: cfg["redisPassword"],
},
BufferType: storage.CDNBufferTypeLog,
},
},
Storages: map[string]storage.StorageConfiguration{
"local_storage": {
Local: &storage.LocalStorageConfiguration{
Path: tmpDir,
},
},
},
})
require.NoError(t, err)

// Clean old test
time.Sleep(1 * time.Second)
itemUnits, err := storage.LoadOldItemUnitByItemStatusAndDuration(context.TODO(), m, db, sdk.CDNStatusItemIncoming, 1)
require.NoError(t, err)
for _, itemUnit := range itemUnits {
i, err := item.LoadByID(context.TODO(), m, db, itemUnit.ItemID)
require.NoError(t, err)
require.NoError(t, item.DeleteByID(db, i.ID))
}

i1 := &sdk.CDNItem{
ID: sdk.UUID(),
APIRefHash: sdk.UUID(),
Type: sdk.CDNTypeItemStepLog,
Status: sdk.CDNStatusItemCompleted,
}
err = item.Insert(context.TODO(), m, db, i1)
require.NoError(t, err)
defer func() {
_ = item.DeleteByID(db, i1.ID)
}()

itemUnit1, err := cdnUnits.NewItemUnit(context.TODO(), cdnUnits.LogsBuffer(), i1)
require.NoError(t, err)
require.NoError(t, storage.InsertItemUnit(context.TODO(), m, db, itemUnit1))

i2 := &sdk.CDNItem{
ID: sdk.UUID(),
APIRefHash: sdk.UUID(),
Type: sdk.CDNTypeItemStepLog,
Status: sdk.CDNStatusItemIncoming,
}
err = item.Insert(context.TODO(), m, db, i2)
require.NoError(t, err)
defer func() {
_ = item.DeleteByID(db, i2.ID)
}()
itemUnit2, err := cdnUnits.NewItemUnit(context.TODO(), cdnUnits.LogsBuffer(), i2)
require.NoError(t, err)

require.NoError(t, storage.InsertItemUnit(context.TODO(), m, db, itemUnit2))

time.Sleep(2 * time.Second)

itemUnits, err = storage.LoadOldItemUnitByItemStatusAndDuration(context.TODO(), m, db, sdk.CDNStatusItemIncoming, 1)
require.NoError(t, err)
require.Len(t, itemUnits, 1)
require.Equal(t, i2.ID, itemUnits[0].ItemID)
}

func TestLoadAllItemIDUnknownByUnit(t *testing.T) {
m := gorpmapper.New()
item.InitDBMapping(m)
Expand Down

0 comments on commit 3dc3293

Please sign in to comment.