Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cdn): clean old worker cache items #5856

Merged
merged 1 commit into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions engine/cdn/cdn_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"net/http"
"os"
"sort"

"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/storage"
Expand Down Expand Up @@ -126,11 +127,6 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re
}
defer tx.Rollback() //nolint

// Check and Clean file with same ref
if err := s.cleanPreviousFileItem(ctx, tx, sig, itemType, apiRef.ToFilename()); err != nil {
return err
}

// Insert Item
if err := item.Insert(ctx, s.Mapper, tx, it); err != nil {
return err
Expand Down Expand Up @@ -186,22 +182,42 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re
}

s.Units.PushInSyncQueue(ctx, it.ID, it.Created)

// For worker cache item clean others with same ref to purge old cached data
if itemType == sdk.CDNTypeItemWorkerCache {
tx, err := s.mustDBWithCtx(ctx).Begin()
if err != nil {
return sdk.WithStack(err)
}
defer tx.Rollback() //nolint

if err := s.cleanPreviousCachedData(ctx, tx, sig, apiRef.ToFilename()); err != nil {
return err
}

if err := tx.Commit(); err != nil {
return sdk.WithStack(err)
}
}

return nil
}

func (s *Service) cleanPreviousFileItem(ctx context.Context, tx gorpmapper.SqlExecutorWithTx, sig cdn.Signature, itemType sdk.CDNItemType, name string) error {
switch itemType {
case sdk.CDNTypeItemWorkerCache:
// Check if item already exist
existingItem, err := item.LoadFileByProjectAndCacheTag(ctx, s.Mapper, tx, itemType, sig.ProjectKey, name)
if err != nil {
if !sdk.ErrorIs(err, sdk.ErrNotFound) {
return err
}
return nil
// Mark to delete all items for given cache tag except the most recent one.
func (s *Service) cleanPreviousCachedData(ctx context.Context, tx gorpmapper.SqlExecutorWithTx, sig cdn.Signature, cacheTag string) error {
items, err := item.LoadWorkerCacheItemsByProjectAndCacheTag(ctx, s.Mapper, tx, sig.ProjectKey, cacheTag)
if err != nil {
return err
}

sort.Slice(items, func(i, j int) bool { return items[i].Created.Before(items[j].Created) })

for i := 0; i < len(items)-1; i++ {
items[i].ToDelete = true
if err := item.Update(ctx, s.Mapper, tx, &items[i]); err != nil {
return err
}
existingItem.ToDelete = true
return item.Update(ctx, s.Mapper, tx, existingItem)
}

return nil
}
36 changes: 24 additions & 12 deletions engine/cdn/item/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,31 +144,44 @@ func Update(ctx context.Context, m *gorpmapper.Mapper, db gorpmapper.SqlExecutor

func MarkToDeleteByRunIDs(db gorpmapper.SqlExecutorWithTx, runID int64) error {
query := `
UPDATE item SET to_delete = true WHERE (api_ref->>'run_id')::int = $1
UPDATE item SET to_delete = true WHERE (api_ref->>'run_id')::int = $1
`
_, err := db.Exec(query, runID)
return sdk.WrapError(err, "unable to mark item to delete for run %d", runID)
}

func LoadFileByProjectAndCacheTag(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, itemType sdk.CDNItemType, projKey string, cacheTag string) (*sdk.CDNItem, error) {
func LoadWorkerCacheItemByProjectAndCacheTag(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, projKey string, cacheTag string) (*sdk.CDNItem, error) {
query := gorpmapper.NewQuery(`
SELECT *
FROM item
WHERE type = $1
AND (api_ref->>'project_key')::text = $2
AND (api_ref->>'cache_tag')::text = $3
AND to_delete = false

`).Args(itemType, projKey, cacheTag)
ORDER BY created DESC
LIMIT 1
`).Args(sdk.CDNTypeItemWorkerCache, projKey, cacheTag)
return getItem(ctx, m, db, query)
}

func LoadWorkerCacheItemsByProjectAndCacheTag(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, projKey string, cacheTag string) ([]sdk.CDNItem, error) {
query := gorpmapper.NewQuery(`
SELECT *
FROM item
WHERE type = $1
AND (api_ref->>'project_key')::text = $2
AND (api_ref->>'cache_tag')::text = $3
AND to_delete = false
`).Args(sdk.CDNTypeItemWorkerCache, projKey, cacheTag)
return getItems(ctx, m, db, query)
}

// LoadByAPIRefHashAndType load an item by his job id, step order and type
func LoadByAPIRefHashAndType(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, hash string, itemType sdk.CDNItemType, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItem, error) {
query := gorpmapper.NewQuery(`
SELECT *
FROM item
WHERE api_ref_hash = $1
WHERE api_ref_hash = $1
AND type = $2
AND to_delete = false
`).Args(hash, itemType)
Expand All @@ -179,7 +192,7 @@ func LoadByAPIRefHashAndType(ctx context.Context, m *gorpmapper.Mapper, db gorp.
func ComputeSizeByIDs(db gorp.SqlExecutor, itemIDs []string) (int64, error) {
query := `
SELECT COALESCE(SUM(size), 0) FROM item
WHERE id = ANY($1)
WHERE id = ANY($1)
`
size, err := db.SelectInt(query, pq.StringArray(itemIDs))
if err != nil {
Expand All @@ -191,9 +204,9 @@ func ComputeSizeByIDs(db gorp.SqlExecutor, itemIDs []string) (int64, error) {
func ListNodeRunByProject(db gorp.SqlExecutor, projectKey string) ([]int64, error) {
var IDs []int64
query := `
SELECT
SELECT
DISTINCT((api_ref->>'node_run_id')::int)
FROM item
FROM item
WHERE api_ref->>'project_key' = $1
`
_, err := db.Select(&IDs, query, projectKey)
Expand All @@ -206,7 +219,7 @@ func ListNodeRunByProject(db gorp.SqlExecutor, projectKey string) ([]int64, erro
// ComputeSizeByProjectKey returns the size used by a project
func ComputeSizeByProjectKey(db gorp.SqlExecutor, projectKey string) (int64, error) {
query := `
SELECT SUM(size) FROM item WHERE api_ref->>'project_key' = $1
SELECT SUM(size) FROM item WHERE api_ref->>'project_key' = $1
`
size, err := db.SelectInt(query, projectKey)
if err != nil {
Expand All @@ -223,15 +236,15 @@ type Stat struct {

func CountItems(db gorp.SqlExecutor) (res []Stat, err error) {
_, err = db.Select(&res, `
SELECT status, type, count(status) as "number"
SELECT status, type, count(status) as "number"
FROM item
GROUP BY status, type`)
return res, sdk.WithStack(err)
}

func CountItemsToDelete(db gorp.SqlExecutor) (int64, error) {
query := `SELECT count(1) as "number"
FROM item
FROM item
WHERE to_delete = true`
nb, err := db.SelectInt(query)
return nb, sdk.WithStack(err)
Expand Down Expand Up @@ -334,5 +347,4 @@ func LoadRunResultByRunID(ctx context.Context, m *gorpmapper.Mapper, db gorp.Sql
SELECT * FROM item WHERE id IN (SELECT id FROM deduplication)
`).Args(runID, sdk.CDNTypeItemRunResult)
return getItems(ctx, m, db, query)

}
2 changes: 1 addition & 1 deletion engine/cdn/item_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (s *Service) getWorkerCache(ctx context.Context, r *http.Request, w http.Re
if projectKey == "" || cachetag == "" {
return sdk.WrapError(sdk.ErrWrongRequest, "invalid data to get worker cache")
}
item, err := item.LoadFileByProjectAndCacheTag(ctx, s.Mapper, s.mustDBWithCtx(ctx), sdk.CDNTypeItemWorkerCache, projectKey, cachetag)
item, err := item.LoadWorkerCacheItemByProjectAndCacheTag(ctx, s.Mapper, s.mustDBWithCtx(ctx), projectKey, cachetag)
if err != nil {
return err
}
Expand Down