Skip to content

Commit

Permalink
fix(cdn): get item unit from buffer while deleting it (#5831)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Jun 3, 2021
1 parent b2559e4 commit b8c656e
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 13 deletions.
32 changes: 30 additions & 2 deletions engine/cdn/cdn_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/rockbears/log"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/storage"
"github.com/ovh/cds/sdk"
Expand Down Expand Up @@ -153,22 +154,49 @@ func (s *Service) cleanBuffer(ctx context.Context) error {
if len(itemIDs) == 0 {
continue
}

itemUnitsIDs, err := storage.LoadAllItemUnitsIDsByItemIDsAndUnitID(s.mustDBWithCtx(ctx), bu.ID(), itemIDs)
if err != nil {
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "unable to load item units: %v", err)
continue
}

itemUnitsToMark := make([]string, 0, len(itemUnitsIDs))
for i := range itemUnitsIDs {
uiID := itemUnitsIDs[i]
lockKey := cache.Key(storage.FileBufferKey, bu.ID(), "lock", uiID)
b, err := s.Cache.Lock(lockKey, 30*time.Second, 0, 1)
if err != nil {
log.Error(ctx, "unable to lock unit item: %v: %v", uiID, err)
continue
}
if !b {
log.Info(ctx, "do not delete item unit %s, already locked: %v", uiID)
continue
}
readerPatternKey := cache.Key(storage.FileBufferKey, bu.ID(), "reader", uiID, "*")
keys, err := s.Cache.Keys(cache.Key(readerPatternKey))
if err != nil {
log.Error(ctx, "unable to check if item unit is currently reading by cdn")
_ = s.Cache.Unlock(lockKey)
continue
}
if len(keys) > 0 {
log.Info(ctx, "do not delete item unit, it is currently reading by cdn")
_ = s.Cache.Unlock(lockKey)
continue
}
itemUnitsToMark = append(itemUnitsToMark, uiID)
}

tx, err := s.mustDBWithCtx(ctx).Begin()
if err != nil {
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "unable to start transaction: %v", err)
continue
}

if _, err := storage.MarkItemUnitToDelete(tx, itemUnitsIDs); err != nil {
if _, err := storage.MarkItemUnitToDelete(tx, itemUnitsToMark); err != nil {
_ = tx.Rollback()
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "unable to mark item as delete: %v", err)
Expand Down
134 changes: 130 additions & 4 deletions engine/cdn/cdn_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ import (
"testing"
"time"

"github.com/ovh/symmecrypt/ciphers/aesgcm"
"github.com/ovh/symmecrypt/convergent"
"github.com/rockbears/log"
"github.com/stretchr/testify/require"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/lru"
"github.com/ovh/cds/engine/cdn/storage"
cdntest "github.com/ovh/cds/engine/cdn/test"
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/engine/test"
"github.com/ovh/cds/sdk"
"github.com/ovh/symmecrypt/ciphers/aesgcm"
"github.com/ovh/symmecrypt/convergent"
"github.com/rockbears/log"
"github.com/stretchr/testify/require"
)

func TestCleanSynchronizedItem(t *testing.T) {
Expand Down Expand Up @@ -503,3 +505,127 @@ func TestPurgeItem(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(items))
}

func TestCleanSynchronizedReadingItem(t *testing.T) {
m := gorpmapper.New()
item.InitDBMapping(m)
storage.InitDBMapping(m)

log.Factory = log.NewTestingWrapper(t)
db, factory, store, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN)
t.Cleanup(cancel)

cfg := test.LoadTestingConf(t, sdk.TypeCDN)

cdntest.ClearItem(t, context.TODO(), m, db)
cdntest.ClearUnits(t, context.TODO(), m, db)

// Create cdn service
s := Service{
DBConnectionFactory: factory,
Cache: store,
Mapper: m,
}
s.GoRoutines = sdk.NewGoRoutines(context.TODO())

tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
require.NoError(t, err)

tmpDir2, err := ioutil.TempDir("", t.Name()+"-cdn-2-*")
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
t.Cleanup(cancel)

cdnUnits, err := storage.Init(ctx, m, store, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{
HashLocatorSalt: "thisismysalt",
Buffers: map[string]storage.BufferConfiguration{
"redis_buffer": {
Redis: &storage.RedisBufferConfiguration{
Host: cfg["redisHost"],
Password: cfg["redisPassword"],
},
BufferType: storage.CDNBufferTypeLog,
},
"file_buffer": {
Local: &storage.LocalBufferConfiguration{
Path: tmpDir2,
},
BufferType: storage.CDNBufferTypeFile,
},
},
Storages: map[string]storage.StorageConfiguration{
"fs-backend": {
Local: &storage.LocalStorageConfiguration{
Path: tmpDir,
Encryption: []convergent.ConvergentEncryptionConfig{
{
Cipher: aesgcm.CipherName,
LocatorSalt: "secret_locator_salt",
SecretValue: "secret_value",
},
},
},
},
"cds-backend": {
CDS: &storage.CDSStorageConfiguration{
Host: "lolcat.host",
Token: "mytoken",
},
},
},
})
require.NoError(t, err)
s.Units = cdnUnits

// Add Item in redis / fs/ cds -will be delete from redis
it := sdk.CDNItem{
ID: sdk.UUID(),
Type: sdk.CDNTypeItemRunResult,
Status: sdk.CDNStatusItemCompleted,
APIRefHash: sdk.RandomString(10),
}
require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it))
iuCDS := sdk.CDNItemUnit{UnitID: s.Units.Storages[1].ID(), ItemID: it.ID, Type: it.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuCDS))
iuFileBuf := sdk.CDNItemUnit{UnitID: s.Units.FileBuffer().ID(), ItemID: it.ID, Type: it.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileBuf))
iuFileStorage := sdk.CDNItemUnit{UnitID: s.Units.Storages[0].ID(), ItemID: it.ID, Type: it.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileStorage))

///////////////////////////////////////
// 1st test, getItem Lock the item unit
///////////////////////////////////////
lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", iuFileBuf.ID)
hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1)
require.NoError(t, err)
t.Cleanup(func() {
s.Cache.Unlock(lockKey)
})
require.True(t, hasLocked)
require.NoError(t, s.cleanBuffer(context.TODO()))

_, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID)
require.NoError(t, err)

require.NoError(t, s.Cache.Unlock(lockKey))

////////////////////////////////////////////////////////
// 2nd test, getItem is reading the file from the buffer
////////////////////////////////////////////////////////
readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", iuFileBuf.ID, sdk.UUID())
require.NoError(t, s.Cache.SetWithTTL(readerKey, true, 30))
require.NoError(t, s.cleanBuffer(context.TODO()))

_, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID)
require.NoError(t, err)

require.NoError(t, s.Cache.Delete(readerKey))
////////////////////////////////////////////////////////
// 3rd test, mark as delete
////////////////////////////////////////////////////////
require.NoError(t, s.cleanBuffer(context.TODO()))

_, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID)
require.True(t, sdk.ErrorIs(err, sdk.ErrNotFound))
}
50 changes: 43 additions & 7 deletions engine/cdn/cdn_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/rockbears/log"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/storage"
"github.com/ovh/cds/engine/gorpmapper"
Expand Down Expand Up @@ -208,13 +209,47 @@ func (s *Service) getItemFileValue(ctx context.Context, t sdk.CDNItemType, apiRe

// If item is in Buffer, get from it
if itemUnit != nil {
log.Error(ctx, "getItemFileValue> Getting file from buffer")

rc, err := s.Units.FileBuffer().NewReader(ctx, *itemUnit)
log.Debug(ctx, "getItemFileValue> Getting file from buffer")
ignoreBuffer := false
lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", itemUnit.ID)
hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1)
if err != nil {
return nil, nil, nil, err
log.Error(ctx, "unable to get lock for %s", lockKey)
ignoreBuffer = true
}
if hasLocked {
// Reload to be sure that it's not marked as delete
_, err := storage.LoadItemUnitByID(ctx, s.Mapper, s.mustDBWithCtx(ctx), itemUnit.ID)
if err != nil {
if !sdk.ErrorIs(err, sdk.ErrNotFound) {
log.Error(ctx, "unable to load item unit: %v", err)
}
ignoreBuffer = true
}

if !ignoreBuffer {
readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", itemUnit.ID, sdk.UUID())
if err := s.Cache.SetWithTTL(readerKey, true, 30); err != nil {
log.Error(ctx, "unable to set reader on file buffer: %v", err)
ignoreBuffer = true
}
}

if err := s.Cache.Unlock(lockKey); err != nil {
log.Error(ctx, "unable to release lock for %s", lockKey)
}
} else {
ignoreBuffer = true
}
return itemUnit, s.Units.FileBuffer(), rc, nil

if !ignoreBuffer {
rc, err := s.Units.FileBuffer().NewReader(ctx, *itemUnit)
if err != nil {
return nil, nil, nil, err
}
return itemUnit, s.Units.FileBuffer(), rc, nil
}

}

// Get from storage
Expand Down Expand Up @@ -349,12 +384,13 @@ func (s *Service) getRandomItemUnitIDByItemID(ctx context.Context, itemID string
return "", "", err
}

itemUnits = s.Units.FilterItemUnitReaderByType(itemUnits)
itemUnits = s.Units.FilterItemUnitFromBuffer(itemUnits)

if len(itemUnits) == 0 {
return "", "", sdk.WithStack(fmt.Errorf("unable to find item units for item with id: %s", itemID))
}

itemUnits = s.Units.FilterItemUnitReaderByType(itemUnits)

var unit *sdk.CDNUnit
var selectedItemUnit *sdk.CDNItemUnit
if defaultUnitName != "" {
Expand Down
52 changes: 52 additions & 0 deletions engine/cdn/cdn_item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/rockbears/log"
"github.com/stretchr/testify/require"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/lru"
"github.com/ovh/cds/engine/cdn/redis"
Expand Down Expand Up @@ -509,3 +510,54 @@ func TestGetItemValue_ThousandLinesReverse(t *testing.T) {
require.Equal(t, int64(0), lines[226].Number)
require.Equal(t, "Line 0\n", lines[226].Value)
}

func TestGetFileItemFromBuffer(t *testing.T) {
m := gorpmapper.New()
item.InitDBMapping(m)
storage.InitDBMapping(m)

log.Factory = log.NewTestingWrapper(t)
db, factory, store, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN)
t.Cleanup(cancel)

cdntest.ClearItem(t, context.TODO(), m, db)
cdntest.ClearSyncRedisSet(t, store, "local_storage")

// Create cdn service
s := Service{
DBConnectionFactory: factory,
Cache: store,
Mapper: m,
}
s.GoRoutines = sdk.NewGoRoutines(context.TODO())

ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)

_ = test.LoadTestingConf(t, sdk.TypeCDN)
cdnUnits := newRunningStorageUnits(t, m, s.DBConnectionFactory.GetDBMap(m)(), ctx, store)
s.Units = cdnUnits

// Add Item in redis / fs/ cds -will be delete from redis
it := sdk.CDNItem{
ID: sdk.UUID(),
Type: sdk.CDNTypeItemRunResult,
Status: sdk.CDNStatusItemCompleted,
APIRefHash: sdk.RandomString(10),
}
require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it))
iuFileBuf := sdk.CDNItemUnit{UnitID: s.Units.FileBuffer().ID(), ItemID: it.ID, Type: it.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileBuf))

// IU locked by gc
lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", iuFileBuf.ID)
hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1)
require.NoError(t, err)
require.True(t, hasLocked)

ui, unit, reader, err := s.getItemFileValue(ctx, sdk.CDNTypeItemRunResult, it.APIRefHash, getItemFileOptions{})
require.Nil(t, reader)
require.Nil(t, ui)
require.Nil(t, unit)
require.Contains(t, err.Error(), "unable to find item units for item with id: "+it.ID)
}
4 changes: 4 additions & 0 deletions engine/cdn/storage/storageunit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
cdslog "github.com/ovh/cds/sdk/log"
)

var (
FileBufferKey = cache.Key("cdn", "unit")
)

func (r RunningStorageUnits) Storage(name string) StorageUnit {
for _, x := range r.Storages {
if x.Name() == name {
Expand Down
20 changes: 20 additions & 0 deletions engine/cdn/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,17 @@ func (x RunningStorageUnits) GetBuffer(bufferType sdk.CDNItemType) BufferUnit {
}
}

func (x *RunningStorageUnits) FilterItemUnitFromBuffer(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit {
itemsUnits := make([]sdk.CDNItemUnit, 0, len(ius))
for _, u := range ius {
if x.IsBuffer(u.UnitID) {
continue
}
itemsUnits = append(itemsUnits, u)
}
return itemsUnits
}

func (x *RunningStorageUnits) FilterItemUnitReaderByType(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit {
// Remove cds backend from getting something that is not a log
if ius[0].Type != sdk.CDNTypeItemStepLog && ius[0].Type != sdk.CDNTypeItemServiceLog {
Expand All @@ -307,6 +318,15 @@ func (x *RunningStorageUnits) FilterItemUnitReaderByType(ius []sdk.CDNItemUnit)
return ius
}

func (x *RunningStorageUnits) IsBuffer(id string) bool {
for _, buf := range x.Buffers {
if buf.ID() == id {
return true
}
}
return false
}

type LogConfig struct {
// Step logs
StepMaxSize int64 `toml:"stepMaxSize" default:"15728640" comment:"Max step logs size in bytes (default: 15MB)" json:"stepMaxSize"`
Expand Down

0 comments on commit b8c656e

Please sign in to comment.