Skip to content

Commit

Permalink
fix(cdn): do not select all database for item unit purge (#5739)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Mar 5, 2021
1 parent 0f356b4 commit 97bc1ef
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 7 deletions.
8 changes: 7 additions & 1 deletion engine/cdn/cdn_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,13 @@ func (s *Service) cleanItemToDelete(ctx context.Context) error {
}

func (s *Service) cleanBuffer(ctx context.Context) error {
storageCount := int64(len(s.Units.Storages) + 1)
storageCount := int64(1)
for _, s := range s.Units.Storages {
if !s.CanSync() {
continue
}
storageCount++
}
for _, bu := range s.Units.Buffers {
itemIDs, err := storage.LoadAllSynchronizedItemIDs(s.mustDBWithCtx(ctx), bu.ID(), storageCount)
if err != nil {
Expand Down
146 changes: 146 additions & 0 deletions engine/cdn/cdn_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,152 @@ func TestCleanSynchronizedItem(t *testing.T) {
require.Equal(t, 3, len(iusCDSAfter))
}

func TestCleanSynchronizedItemWithDisabledStorage(t *testing.T) {
m := gorpmapper.New()
item.InitDBMapping(m)
storage.InitDBMapping(m)

log.Factory = log.NewTestingWrapper(t)
db, factory, cache, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN)
t.Cleanup(cancel)

cfg := test.LoadTestingConf(t, sdk.TypeCDN)

cdntest.ClearItem(t, context.TODO(), m, db)
cdntest.ClearUnits(t, context.TODO(), m, db)

// Create cdn service
s := Service{
DBConnectionFactory: factory,
Cache: cache,
Mapper: m,
}
s.GoRoutines = sdk.NewGoRoutines()

tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
t.Cleanup(cancel)

cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{
HashLocatorSalt: "thisismysalt",
Buffers: map[string]storage.BufferConfiguration{
"redis_buffer": {
Redis: &storage.RedisBufferConfiguration{
Host: cfg["redisHost"],
Password: cfg["redisPassword"],
},
BufferType: storage.CDNBufferTypeLog,
},
},
Storages: map[string]storage.StorageConfiguration{
"fs-backend": {
Local: &storage.LocalStorageConfiguration{
Path: tmpDir,
Encryption: []convergent.ConvergentEncryptionConfig{
{
Cipher: aesgcm.CipherName,
LocatorSalt: "secret_locator_salt",
SecretValue: "secret_value",
},
},
},
},
"cds-backend": {
DisableSync: true,
CDS: &storage.CDSStorageConfiguration{
Host: "lolcat.host",
Token: "mytoken",
},
},
},
})
require.NoError(t, err)
s.Units = cdnUnits

// Add item in redis and CDS -- Stay in redis
item1RedisCDS := sdk.CDNItem{
ID: sdk.UUID(),
Type: sdk.CDNTypeItemStepLog,
Status: sdk.CDNStatusItemCompleted,
APIRefHash: sdk.RandomString(10),
}
require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &item1RedisCDS))
iu1CDS := sdk.CDNItemUnit{UnitID: s.Units.Storages[1].ID(), ItemID: item1RedisCDS.ID, Type: item1RedisCDS.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iu1CDS))
iu1Redis := sdk.CDNItemUnit{UnitID: s.Units.LogsBuffer().ID(), ItemID: item1RedisCDS.ID, Type: item1RedisCDS.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iu1Redis))

// Add Item in Redis and FS -- Must be sync
item2RedisFs := sdk.CDNItem{
ID: sdk.UUID(),
Type: sdk.CDNTypeItemStepLog,
Status: sdk.CDNStatusItemCompleted,
APIRefHash: sdk.RandomString(10),
}
require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &item2RedisFs))
iu2Redis := sdk.CDNItemUnit{UnitID: s.Units.LogsBuffer().ID(), ItemID: item2RedisFs.ID, Type: item2RedisFs.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iu2Redis))
iu2FS := sdk.CDNItemUnit{UnitID: s.Units.Storages[0].ID(), ItemID: item2RedisFs.ID, Type: item2RedisFs.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iu2FS))

// Add Item in redis only - have to stay in redis
item4Redis := sdk.CDNItem{
ID: sdk.UUID(),
Type: sdk.CDNTypeItemStepLog,
Status: sdk.CDNStatusItemCompleted,
APIRefHash: sdk.RandomString(10),
}
require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &item4Redis))
iu4Redis := sdk.CDNItemUnit{UnitID: s.Units.LogsBuffer().ID(), ItemID: item4Redis.ID, Type: item4Redis.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iu4Redis))

// Add Item in redis / fs/ cds -- Must be sync
item6RedisFSCDS := sdk.CDNItem{
ID: sdk.UUID(),
Type: sdk.CDNTypeItemStepLog,
Status: sdk.CDNStatusItemCompleted,
APIRefHash: sdk.RandomString(10),
}
require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &item6RedisFSCDS))
iu6CDS := sdk.CDNItemUnit{UnitID: s.Units.Storages[1].ID(), ItemID: item6RedisFSCDS.ID, Type: item6RedisFSCDS.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iu6CDS))
iu6Redis := sdk.CDNItemUnit{UnitID: s.Units.LogsBuffer().ID(), ItemID: item6RedisFSCDS.ID, Type: item6RedisFSCDS.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iu6Redis))
iu6FS := sdk.CDNItemUnit{UnitID: s.Units.Storages[0].ID(), ItemID: item6RedisFSCDS.ID, Type: item6RedisFSCDS.Type}
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iu6FS))

oneHundred := 100

iusFS, err := storage.LoadItemUnitsByUnit(context.TODO(), s.Mapper, db, s.Units.Storages[0].ID(), &oneHundred)
require.NoError(t, err)
require.Equal(t, 2, len(iusFS))

iusCDS, err := storage.LoadItemUnitsByUnit(context.TODO(), s.Mapper, db, s.Units.Storages[1].ID(), &oneHundred)
require.NoError(t, err)
require.Equal(t, 2, len(iusCDS))

// RUN TEST
iusRedisBefore, err := storage.LoadItemUnitsByUnit(context.TODO(), s.Mapper, db, s.Units.LogsBuffer().ID(), &oneHundred)
require.NoError(t, err)
require.Equal(t, 4, len(iusRedisBefore))

require.NoError(t, s.cleanBuffer(context.TODO()))

iusRedisAfter, err := storage.LoadItemUnitsByUnit(context.TODO(), s.Mapper, db, s.Units.LogsBuffer().ID(), &oneHundred)
require.NoError(t, err)
require.Equal(t, 2, len(iusRedisAfter))

iusFS2After, err := storage.LoadItemUnitsByUnit(context.TODO(), s.Mapper, db, s.Units.Storages[0].ID(), &oneHundred)
require.NoError(t, err)
require.Equal(t, 2, len(iusFS2After))

iusCDSAfter, err := storage.LoadItemUnitsByUnit(context.TODO(), s.Mapper, db, s.Units.Storages[1].ID(), &oneHundred)
require.NoError(t, err)
require.Equal(t, 2, len(iusCDSAfter))
}

func TestCleanWaitingItem(t *testing.T) {
m := gorpmapper.New()
item.InitDBMapping(m)
Expand Down
2 changes: 2 additions & 0 deletions engine/cdn/cdn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func newRunningStorageUnits(t *testing.T, m *gorpmapper.Mapper, dbMap *gorp.DbMa
cdnUnits, err := storage.Init(ctx, m, store, dbMap, sdk.NewGoRoutines(), storage.Configuration{
SyncSeconds: 2,
SyncNbElements: 100,
PurgeSeconds: 30,
PurgeNbElements: 100,
HashLocatorSalt: "thisismysalt",
Buffers: map[string]storage.BufferConfiguration{
"redis_buffer": {
Expand Down
2 changes: 2 additions & 0 deletions engine/cdn/item_upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func TestPostUploadHandler(t *testing.T) {
cdnUnits, err := storage.Init(ctx, s.Mapper, s.Cache, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{
SyncSeconds: 1,
SyncNbElements: 1000,
PurgeNbElements: 1000,
PurgeSeconds: 30,
HashLocatorSalt: "thisismysalt",
Buffers: map[string]storage.BufferConfiguration{
"refis_buffer": {
Expand Down
4 changes: 2 additions & 2 deletions engine/cdn/storage/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ func CountItemUnitsToDeleteByItemID(db gorp.SqlExecutor, itemID string) (int64,
return nb, sdk.WithStack(err)
}

func LoadAllItemUnitsToDeleteByUnit(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, unitID string, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItemUnit, error) {
query := gorpmapper.NewQuery("SELECT * FROM storage_unit_item WHERE unit_id = $1 AND to_delete = true ORDER BY last_modified ASC").Args(unitID)
func LoadAllItemUnitsToDeleteByUnit(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, unitID string, limit int, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItemUnit, error) {
query := gorpmapper.NewQuery("SELECT * FROM storage_unit_item WHERE unit_id = $1 AND to_delete = true ORDER BY last_modified ASC LIMIT $2").Args(unitID, limit)
return getAllItemUnits(ctx, m, db, query, opts...)
}

Expand Down
18 changes: 16 additions & 2 deletions engine/cdn/storage/storageunit.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
return nil, sdk.WithStack(fmt.Errorf("too much CDN Buffer for file items"))
}

if len(config.Storages) == 0 {
activeStorage := 0
for _, s := range config.Storages {
if !s.DisableSync {
activeStorage++
}
}
if activeStorage == 0 {
return nil, sdk.WithStack(fmt.Errorf("invalid CDN configuration. Missing storage unit"))
}

Expand All @@ -77,6 +83,14 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
config.SyncSeconds = 30
}

if config.PurgeSeconds <= 0 {
config.PurgeSeconds = 30
}

if config.PurgeNbElements <= 0 {
config.PurgeNbElements = 1000
}

for name, bu := range config.Buffers {
var bufferUnit BufferUnit
switch {
Expand Down Expand Up @@ -378,7 +392,7 @@ func (r *RunningStorageUnits) Start(ctx context.Context, gorts *sdk.GoRoutines)
// Feed the sync processes with a ticker
gorts.Run(ctx, "RunningStorageUnits.Start", func(ctx context.Context) {
tickr := time.NewTicker(time.Duration(r.config.SyncSeconds) * time.Second)
tickrPurge := time.NewTicker(30 * time.Second)
tickrPurge := time.NewTicker(time.Duration(r.config.PurgeSeconds) * time.Second)

defer tickr.Stop()
defer tickrPurge.Stop()
Expand Down
2 changes: 1 addition & 1 deletion engine/cdn/storage/storageunit_purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func init() {
}

func (x *RunningStorageUnits) Purge(ctx context.Context, s Interface) error {
unitItems, err := LoadAllItemUnitsToDeleteByUnit(ctx, x.m, x.db, s.ID(), gorpmapper.GetOptions.WithDecryption)
unitItems, err := LoadAllItemUnitsToDeleteByUnit(ctx, x.m, x.db, s.ID(), x.config.PurgeNbElements, gorpmapper.GetOptions.WithDecryption)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions engine/cdn/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ type Configuration struct {
Storages map[string]StorageConfiguration `toml:"storages" json:"storages" mapstructure:"storages"`
SyncSeconds int `toml:"syncSeconds" default:"30" json:"syncSeconds" comment:"each n seconds, all storage backends will have to start a synchronization with the buffer"`
SyncNbElements int64 `toml:"syncNbElements" default:"100" json:"syncNbElements" comment:"nb items to synchronize from the buffer"`
PurgeSeconds int `toml:"purgeSeconds" default:"5" json:"purgeSeconds" comment:"each n seconds, all storage backends will have to start to delete storage unit item with deleted flag"`
PurgeNbElements int `toml:"purgeNbElements" default:"1000" json:"purgeNbElements" comment:"nb items to delete in each purge loop"`
}

type BufferConfiguration struct {
Expand Down
2 changes: 1 addition & 1 deletion engine/cdn/unit_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestMarkItemUnitAsDeleteHandler(t *testing.T) {
if cpt >= 10 {
t.FailNow()
}
uis, err := storage.LoadAllItemUnitsToDeleteByUnit(ctx, s.Mapper, db, unit.ID)
uis, err := storage.LoadAllItemUnitsToDeleteByUnit(ctx, s.Mapper, db, unit.ID, 100)
require.NoError(t, err)
if len(uis) != 10 {
time.Sleep(250 * time.Millisecond)
Expand Down

0 comments on commit 97bc1ef

Please sign in to comment.