Skip to content

Commit

Permalink
fix(cdn): do not stack purge goroutine (#5862)
Browse files Browse the repository at this point in the history
* fix(cdn): do not stack purge goroutine
* fix: loop over item to delete
* fix: add order by
  • Loading branch information
sguiheux authored Jul 1, 2021
1 parent 966f98a commit 2f6a6fc
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 58 deletions.
62 changes: 35 additions & 27 deletions engine/cdn/cdn_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
)

func (s *Service) itemPurge(ctx context.Context) {
tickPurge := time.NewTicker(1 * time.Minute)
tickPurge := time.NewTicker(15 * time.Minute)
defer tickPurge.Stop()
for {
select {
Expand Down Expand Up @@ -86,39 +86,46 @@ func (s *Service) markUnitItemToDeleteByItemID(ctx context.Context, itemID strin
}

func (s *Service) cleanItemToDelete(ctx context.Context) error {
ids, err := item.LoadIDsToDelete(s.mustDBWithCtx(ctx), 1000)
if err != nil {
return err
}
offset := 0
limit := 1000

r := rand.New(rand.NewSource(time.Now().UnixNano()))
r.Shuffle(len(ids), func(i, j int) { ids[i], ids[j] = ids[j], ids[i] })

if len(ids) > 0 {
log.Info(ctx, "cdn:purge:item: %d items to delete", len(ids))
}

for _, id := range ids {
nbUnitItemToDelete, err := s.markUnitItemToDeleteByItemID(ctx, id)
for {
ids, err := item.LoadIDsToDelete(s.mustDBWithCtx(ctx), offset, limit)
if err != nil {
log.Error(ctx, "cdn:purge:item: unable to mark unit item %q to delete: %v", id, err)
continue
return err
}

log.Debug(ctx, "cdn:purge:item: %d unit items to delete for item %q", nbUnitItemToDelete, id)
if len(ids) == 0 {
return nil
}

// If and only If there is not more unit item to mark as delete,
// let's delete the item in database
if nbUnitItemToDelete == 0 {
nbItemUnits, err := storage.CountItemUnitsToDeleteByItemID(s.mustDBWithCtx(ctx), id)
log.Info(ctx, "cdn:purge:item: %d items to delete", len(ids))
r := rand.New(rand.NewSource(time.Now().UnixNano()))
r.Shuffle(len(ids), func(i, j int) { ids[i], ids[j] = ids[j], ids[i] })

for _, id := range ids {
nbUnitItemToDelete, err := s.markUnitItemToDeleteByItemID(ctx, id)
if err != nil {
log.Error(ctx, "cdn:purge:item: unable to count unit item %q to delete: %v", id, err)
log.Error(ctx, "cdn:purge:item: unable to mark unit item %q to delete: %v", id, err)
continue
}

if nbItemUnits > 0 {
log.Debug(ctx, "cdn:purge:item: %d unit items to delete for item %q", nbItemUnits, id)
} else {
log.Debug(ctx, "cdn:purge:item: %d unit items to delete for item %q", nbUnitItemToDelete, id)

// If and only If there is not more unit item to mark as delete,
// let's delete the item in database
if nbUnitItemToDelete == 0 {
nbItemUnits, err := storage.CountItemUnitsToDeleteByItemID(s.mustDBWithCtx(ctx), id)
if err != nil {
log.Error(ctx, "cdn:purge:item: unable to count unit item %q to delete: %v", id, err)
continue
}

if nbItemUnits > 0 {
log.Debug(ctx, "cdn:purge:item: %d unit items to delete for item %q", nbItemUnits, id)
continue
}

if err := s.LogCache.Remove([]string{id}); err != nil {
return sdk.WrapError(err, "cdn:purge:item: unable to remove from logCache for item %q", id)
}
Expand All @@ -128,12 +135,13 @@ func (s *Service) cleanItemToDelete(ctx context.Context) error {
for _, sto := range s.Units.Storages {
s.Units.RemoveFromRedisSyncQueue(ctx, sto, id)
}

log.Debug(ctx, "cdn:purge:item: %s item deleted", id)
}
}
if len(ids) < limit {
return nil
}
}
return nil
}

func (s *Service) cleanBuffer(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion engine/cdn/cdn_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func TestPurgeItem(t *testing.T) {
require.Equal(t, 1, len(items))

// Check there are 2 item to delete
ids, err := item.LoadIDsToDelete(db, 10)
ids, err := item.LoadIDsToDelete(db, 0, 10)
require.NoError(t, err)
require.GreaterOrEqual(t, len(ids), 2)

Expand Down
8 changes: 5 additions & 3 deletions engine/cdn/item/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,17 @@ func getItem(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, q g
return &item, sdk.WithStack(err)
}

func LoadIDsToDelete(db gorp.SqlExecutor, size int) ([]string, error) {
func LoadIDsToDelete(db gorp.SqlExecutor, offset int, limit int) ([]string, error) {
query := `
SELECT id
FROM item
WHERE to_delete = true
LIMIT $1
ORDER BY created ASC
OFFSET $1
LIMIT $2
`
var ids []string
if _, err := db.Select(&ids, query, size); err != nil {
if _, err := db.Select(&ids, query, offset, limit); err != nil {
return nil, sdk.WithStack(err)
}
return ids, nil
Expand Down
71 changes: 44 additions & 27 deletions engine/cdn/storage/storageunit.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,54 @@ func (r *RunningStorageUnits) Start(ctx context.Context, gorts *sdk.GoRoutines)
}
}

// Purge Buffer unit
for i := range r.Buffers {
b := r.Buffers[i]
gorts.RunWithRestart(ctx, "RunningStorageUnits.purge."+b.Name(),
func(ctx context.Context) {
tickrPurge := time.NewTicker(time.Duration(r.config.PurgeSeconds) * time.Second)
defer tickrPurge.Stop()
for {
select {
case <-ctx.Done():
return
case <-tickrPurge.C:
if err := r.Purge(ctx, b); err != nil {
ctx = sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "RunningStorageUnits.purge> error: %v", err)
}
}
}
},
)
}

// Purge Storage Unit
for i := range r.Storages {
s := r.Storages[i]
gorts.RunWithRestart(ctx, "RunningStorageUnits.purge."+s.Name(),
func(ctx context.Context) {
tickrPurge := time.NewTicker(time.Duration(r.config.PurgeSeconds) * time.Second)
defer tickrPurge.Stop()
for {
select {
case <-ctx.Done():
return
case <-tickrPurge.C:
if err := r.Purge(ctx, s); err != nil {
ctx = sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "RunningStorageUnits.purge> error: %v", err)
}
}
}
},
)
}

// Feed the sync processes with a ticker
gorts.Run(ctx, "RunningStorageUnits.Start", func(ctx context.Context) {
tickr := time.NewTicker(time.Duration(r.config.SyncSeconds) * time.Second)
tickrPurge := time.NewTicker(time.Duration(r.config.PurgeSeconds) * time.Second)

defer tickr.Stop()
defer tickrPurge.Stop()
for {
select {
case <-ctx.Done():
Expand All @@ -406,30 +447,6 @@ func (r *RunningStorageUnits) Start(ctx context.Context, gorts *sdk.GoRoutines)
)
}
wg.Wait()
case <-tickrPurge.C:
for i := range r.Buffers {
b := r.Buffers[i]
gorts.Exec(ctx, "RunningStorageUnits.purge."+b.Name(),
func(ctx context.Context) {
if err := r.Purge(ctx, b); err != nil {
ctx = sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "RunningStorageUnits.purge> error: %v", err)
}
},
)
}

for i := range r.Storages {
s := r.Storages[i]
gorts.Exec(ctx, "RunningStorageUnits.purge."+s.Name(),
func(ctx context.Context) {
if err := r.Purge(ctx, s); err != nil {
ctx = sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "RunningStorageUnits.purge> error: %v", err)
}
},
)
}
}
}

Expand Down

0 comments on commit 2f6a6fc

Please sign in to comment.