Skip to content

Commit

Permalink
fix(cdn): remove lock + increase delay of buffler clean (#5837)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Jun 8, 2021
1 parent 861df2d commit 1f88624
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 252 deletions.
34 changes: 2 additions & 32 deletions engine/cdn/cdn_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/rockbears/log"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/storage"
"github.com/ovh/cds/sdk"
Expand Down Expand Up @@ -39,7 +38,7 @@ func (s *Service) itemPurge(ctx context.Context) {

// ItemsGC clean long incoming item + delete item from buffer when synchronized everywhere
func (s *Service) itemsGC(ctx context.Context) {
tickGC := time.NewTicker(1 * time.Minute)
tickGC := time.NewTicker(30 * time.Minute)
defer tickGC.Stop()
for {
select {
Expand Down Expand Up @@ -160,43 +159,14 @@ func (s *Service) cleanBuffer(ctx context.Context) error {
log.Error(ctx, "unable to load item units: %v", err)
continue
}

itemUnitsToMark := make([]string, 0, len(itemUnitsIDs))
for i := range itemUnitsIDs {
uiID := itemUnitsIDs[i]
lockKey := cache.Key(storage.FileBufferKey, bu.ID(), "lock", uiID)
b, err := s.Cache.Lock(lockKey, 30*time.Second, 0, 1)
if err != nil {
log.Error(ctx, "unable to lock unit item: %v: %v", uiID, err)
continue
}
if !b {
log.Info(ctx, "do not delete item unit %s, already locked: %v", uiID)
continue
}
readerPatternKey := cache.Key(storage.FileBufferKey, bu.ID(), "reader", uiID, "*")
keys, err := s.Cache.Keys(cache.Key(readerPatternKey))
if err != nil {
log.Error(ctx, "unable to check if item unit is currently reading by cdn")
_ = s.Cache.Unlock(lockKey)
continue
}
if len(keys) > 0 {
log.Info(ctx, "do not delete item unit, it is currently reading by cdn")
_ = s.Cache.Unlock(lockKey)
continue
}
itemUnitsToMark = append(itemUnitsToMark, uiID)
}

tx, err := s.mustDBWithCtx(ctx).Begin()
if err != nil {
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "unable to start transaction: %v", err)
continue
}

if _, err := storage.MarkItemUnitToDelete(tx, itemUnitsToMark); err != nil {
if _, err := storage.MarkItemUnitToDelete(tx, itemUnitsIDs); err != nil {
_ = tx.Rollback()
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "unable to mark item as delete: %v", err)
Expand Down
125 changes: 0 additions & 125 deletions engine/cdn/cdn_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/rockbears/log"
"github.com/stretchr/testify/require"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/lru"
"github.com/ovh/cds/engine/cdn/storage"
Expand Down Expand Up @@ -505,127 +504,3 @@ func TestPurgeItem(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(items))
}

func TestCleanSynchronizedReadingItem(t *testing.T) {
m := gorpmapper.New()
item.InitDBMapping(m)
storage.InitDBMapping(m)

log.Factory = log.NewTestingWrapper(t)
db, factory, store, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN)
t.Cleanup(cancel)

cfg := test.LoadTestingConf(t, sdk.TypeCDN)

cdntest.ClearItem(t, context.TODO(), m, db)
cdntest.ClearUnits(t, context.TODO(), m, db)

// Create cdn service
s := Service{
DBConnectionFactory: factory,
Cache: store,
Mapper: m,
}
s.GoRoutines = sdk.NewGoRoutines(context.TODO())

tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
require.NoError(t, err)

tmpDir2, err := ioutil.TempDir("", t.Name()+"-cdn-2-*")
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
t.Cleanup(cancel)

cdnUnits, err := storage.Init(ctx, m, store, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{
HashLocatorSalt: "thisismysalt",
Buffers: map[string]storage.BufferConfiguration{
"redis_buffer": {
Redis: &storage.RedisBufferConfiguration{
Host: cfg["redisHost"],
Password: cfg["redisPassword"],
},
BufferType: storage.CDNBufferTypeLog,
},
"file_buffer": {
Local: &storage.LocalBufferConfiguration{
Path: tmpDir2,
},
BufferType: storage.CDNBufferTypeFile,
},
},
Storages: map[string]storage.StorageConfiguration{
"fs-backend": {
Local: &storage.LocalStorageConfiguration{
Path: tmpDir,
Encryption: []convergent.ConvergentEncryptionConfig{
{
Cipher: aesgcm.CipherName,
LocatorSalt: "secret_locator_salt",
SecretValue: "secret_value",
},
},
},
},
"cds-backend": {
CDS: &storage.CDSStorageConfiguration{
Host: "lolcat.host",
Token: "mytoken",
},
},
},
})
require.NoError(t, err)
s.Units = cdnUnits

// Add Item in redis / fs/ cds -will be delete from redis
it := sdk.CDNItem{
ID: sdk.UUID(),
Type: sdk.CDNTypeItemRunResult,
Status: sdk.CDNStatusItemCompleted,
APIRefHash: sdk.RandomString(10),
}
require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it))
iuCDS := sdk.CDNItemUnit{UnitID: s.Units.Storages[1].ID(), ItemID: it.ID, Type: it.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuCDS))
iuFileBuf := sdk.CDNItemUnit{UnitID: s.Units.FileBuffer().ID(), ItemID: it.ID, Type: it.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileBuf))
iuFileStorage := sdk.CDNItemUnit{UnitID: s.Units.Storages[0].ID(), ItemID: it.ID, Type: it.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileStorage))

///////////////////////////////////////
// 1st test, getItem Lock the item unit
///////////////////////////////////////
lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", iuFileBuf.ID)
hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1)
require.NoError(t, err)
t.Cleanup(func() {
s.Cache.Unlock(lockKey)
})
require.True(t, hasLocked)
require.NoError(t, s.cleanBuffer(context.TODO()))

_, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID)
require.NoError(t, err)

require.NoError(t, s.Cache.Unlock(lockKey))

////////////////////////////////////////////////////////
// 2nd test, getItem is reading the file from the buffer
////////////////////////////////////////////////////////
readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", iuFileBuf.ID, sdk.UUID())
require.NoError(t, s.Cache.SetWithTTL(readerKey, true, 30))
require.NoError(t, s.cleanBuffer(context.TODO()))

_, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID)
require.NoError(t, err)

require.NoError(t, s.Cache.Delete(readerKey))
////////////////////////////////////////////////////////
// 3rd test, mark as delete
////////////////////////////////////////////////////////
require.NoError(t, s.cleanBuffer(context.TODO()))

_, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID)
require.True(t, sdk.ErrorIs(err, sdk.ErrNotFound))
}
41 changes: 3 additions & 38 deletions engine/cdn/cdn_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/rockbears/log"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/storage"
"github.com/ovh/cds/engine/gorpmapper"
Expand Down Expand Up @@ -210,45 +209,11 @@ func (s *Service) getItemFileValue(ctx context.Context, t sdk.CDNItemType, apiRe
// If item is in Buffer, get from it
if itemUnit != nil {
log.Debug(ctx, "getItemFileValue> Getting file from buffer")
ignoreBuffer := false
lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", itemUnit.ID)
hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1)
rc, err := s.Units.FileBuffer().NewReader(ctx, *itemUnit)
if err != nil {
log.Error(ctx, "unable to get lock for %s", lockKey)
ignoreBuffer = true
}
if hasLocked {
// Reload to be sure that it's not marked as delete
_, err := storage.LoadItemUnitByID(ctx, s.Mapper, s.mustDBWithCtx(ctx), itemUnit.ID)
if err != nil {
if !sdk.ErrorIs(err, sdk.ErrNotFound) {
log.Error(ctx, "unable to load item unit: %v", err)
}
ignoreBuffer = true
}

if !ignoreBuffer {
readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", itemUnit.ID, sdk.UUID())
if err := s.Cache.SetWithTTL(readerKey, true, 300); err != nil {
log.Error(ctx, "unable to set reader on file buffer: %v", err)
ignoreBuffer = true
}
}

if err := s.Cache.Unlock(lockKey); err != nil {
log.Error(ctx, "unable to release lock for %s", lockKey)
}
} else {
ignoreBuffer = true
}

if !ignoreBuffer {
rc, err := s.Units.FileBuffer().NewReader(ctx, *itemUnit)
if err != nil {
return nil, nil, nil, err
}
return itemUnit, s.Units.FileBuffer(), rc, nil
return nil, nil, nil, err
}
return itemUnit, s.Units.FileBuffer(), rc, nil

}

Expand Down
52 changes: 0 additions & 52 deletions engine/cdn/cdn_item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/rockbears/log"
"github.com/stretchr/testify/require"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/lru"
"github.com/ovh/cds/engine/cdn/redis"
Expand Down Expand Up @@ -510,54 +509,3 @@ func TestGetItemValue_ThousandLinesReverse(t *testing.T) {
require.Equal(t, int64(0), lines[226].Number)
require.Equal(t, "Line 0\n", lines[226].Value)
}

func TestGetFileItemFromBuffer(t *testing.T) {
m := gorpmapper.New()
item.InitDBMapping(m)
storage.InitDBMapping(m)

log.Factory = log.NewTestingWrapper(t)
db, factory, store, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN)
t.Cleanup(cancel)

cdntest.ClearItem(t, context.TODO(), m, db)
cdntest.ClearSyncRedisSet(t, store, "local_storage")

// Create cdn service
s := Service{
DBConnectionFactory: factory,
Cache: store,
Mapper: m,
}
s.GoRoutines = sdk.NewGoRoutines(context.TODO())

ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)

_ = test.LoadTestingConf(t, sdk.TypeCDN)
cdnUnits := newRunningStorageUnits(t, m, s.DBConnectionFactory.GetDBMap(m)(), ctx, store)
s.Units = cdnUnits

// Add Item in redis / fs/ cds -will be delete from redis
it := sdk.CDNItem{
ID: sdk.UUID(),
Type: sdk.CDNTypeItemRunResult,
Status: sdk.CDNStatusItemCompleted,
APIRefHash: sdk.RandomString(10),
}
require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it))
iuFileBuf := sdk.CDNItemUnit{UnitID: s.Units.FileBuffer().ID(), ItemID: it.ID, Type: it.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileBuf))

// IU locked by gc
lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", iuFileBuf.ID)
hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1)
require.NoError(t, err)
require.True(t, hasLocked)

ui, unit, reader, err := s.getItemFileValue(ctx, sdk.CDNTypeItemRunResult, it.APIRefHash, getItemFileOptions{})
require.Nil(t, reader)
require.Nil(t, ui)
require.Nil(t, unit)
require.Contains(t, err.Error(), "unable to find item units for item with id: "+it.ID)
}
2 changes: 1 addition & 1 deletion engine/cdn/storage/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func LoadAllSynchronizedItemIDs(db gorp.SqlExecutor, bufferUnitID string, maxSto
WITH inBuffer as (
SELECT item_id
FROM storage_unit_item
WHERE unit_id = $2
WHERE unit_id = $2 AND last_modified < NOW() - INTERVAL '15 minutes'
)
SELECT item_id
FROM storage_unit_item
Expand Down
4 changes: 0 additions & 4 deletions engine/cdn/storage/storageunit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ import (
cdslog "github.com/ovh/cds/sdk/log"
)

var (
FileBufferKey = cache.Key("cdn", "unit")
)

func (r RunningStorageUnits) Storage(name string) StorageUnit {
for _, x := range r.Storages {
if x.Name() == name {
Expand Down

0 comments on commit 1f88624

Please sign in to comment.