From 6b120b80cbdf8cf8956f587f6f1b870b7a1bdfbc Mon Sep 17 00:00:00 2001
From: Guiheux Steven
Date: Mon, 31 May 2021 09:40:20 +0200
Subject: [PATCH] feat(cdn): add route to be able to resync a backend with database (#5829)

---
 engine/cdn/cdn_router.go            |   2 +
 engine/cdn/storage/cds/cds.go       |   6 +
 engine/cdn/storage/dao.go           |  15 +++
 engine/cdn/storage/local/buffer.go  |  39 ++++++++
 engine/cdn/storage/local/local.go   |   4 +
 engine/cdn/storage/nfs/nfs.go       |  44 +++++++++
 engine/cdn/storage/redis/redis.go   |   5 +
 engine/cdn/storage/s3/s3.go         |   4 +
 engine/cdn/storage/swift/swift.go   |   4 +
 engine/cdn/storage/types.go         |   1 +
 engine/cdn/storage/webdav/webdav.go |   4 +
 engine/cdn/unit_handler.go          |  32 +++++++
 engine/cdn/unit_handler_test.go     | 144 ++++++++++++++++++++++++++++
 13 files changed, 304 insertions(+)

diff --git a/engine/cdn/cdn_router.go b/engine/cdn/cdn_router.go
index 2dc743ac64..09b0468dc3 100644
--- a/engine/cdn/cdn_router.go
+++ b/engine/cdn/cdn_router.go
@@ -52,4 +52,6 @@ func (s *Service) initRouter(ctx context.Context) {
 	r.Handle("/admin/database/encryption/{entity}", nil, r.GET(s.getAdminDatabaseEncryptedTuplesByEntity))
 	r.Handle("/admin/database/encryption/{entity}/roll/{pk}", nil, r.POST(s.postAdminDatabaseRollEncryptedEntityByPrimaryKey))
 
+	r.Handle("/admin/backend/{id}/resync/{type}", nil, r.POST(s.postAdminResyncBackendWithDatabaseHandler))
+
 }
diff --git a/engine/cdn/storage/cds/cds.go b/engine/cdn/storage/cds/cds.go
index 974dee1b04..722e4eb4b1 100644
--- a/engine/cdn/storage/cds/cds.go
+++ b/engine/cdn/storage/cds/cds.go
@@ -13,6 +13,7 @@ import (
 	"github.com/ovh/cds/engine/gorpmapper"
 	"github.com/ovh/cds/sdk"
 	"github.com/ovh/cds/sdk/cdsclient"
+	"github.com/rockbears/log"
 )
 
 type CDS struct {
@@ -130,3 +131,8 @@ func (c *CDS) Read(_ sdk.CDNItemUnit, r io.Reader, w io.Writer) error {
 func (c *CDS) Write(_ sdk.CDNItemUnit, _ io.Reader, _ io.Writer) error {
 	return nil
 }
+
+// ResyncWithDatabase is not implemented for the CDS storage unit: it only logs an error.
+func (c *CDS) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
+	log.Error(ctx, "Resynchronization with database not implemented for CDS storage unit")
+}
diff --git a/engine/cdn/storage/dao.go b/engine/cdn/storage/dao.go
index 5291a4de32..32f4aef656 100644
--- a/engine/cdn/storage/dao.go
+++ b/engine/cdn/storage/dao.go
@@ -174,6 +174,21 @@ func HasItemUnitsByUnitAndHashLocator(db gorp.SqlExecutor, unitID string, hashLo
 	return len(ids) > 0, sdk.WithStack(err)
 }
 
+// HashItemUnitByApiRefHash reports whether the given unit has at least one
+// storage_unit_item row attached to an item with the given api ref hash.
+func HashItemUnitByApiRefHash(db gorp.SqlExecutor, apiRefHash string, unitID string) (bool, error) {
+	query := `
+		SELECT count(sui.id) FROM storage_unit_item sui
+		JOIN item on item.id = sui.item_id
+		WHERE item.api_ref_hash = $1 AND sui.unit_id = $2
+	`
+	nb, err := db.SelectInt(query, apiRefHash, unitID)
+	if err != nil {
+		return false, sdk.WithStack(err)
+	}
+	return nb > 0, nil
+}
+
 func LoadItemUnitByID(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, id string, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItemUnit, error) {
 	query := gorpmapper.NewQuery("SELECT * FROM storage_unit_item WHERE id = $1 AND to_delete = false").Args(id)
 	return getItemUnit(ctx, m, db, query, opts...)
diff --git a/engine/cdn/storage/local/buffer.go b/engine/cdn/storage/local/buffer.go
index d7939939b4..5bdf4a4275 100644
--- a/engine/cdn/storage/local/buffer.go
+++ b/engine/cdn/storage/local/buffer.go
@@ -4,6 +4,10 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"path/filepath"
+
+	"github.com/go-gorp/gorp"
+	"github.com/rockbears/log"
 
 	"github.com/ovh/cds/engine/cdn/storage"
 	"github.com/ovh/cds/engine/cdn/storage/encryption"
@@ -58,3 +62,49 @@ func (b *Buffer) Size(_ sdk.CDNItemUnit) (int64, error) {
 func (b *Buffer) BufferType() storage.CDNBufferType {
 	return b.bufferType
 }
+
+// ResyncWithDatabase walks the <path>/<type> directory of the buffer and removes
+// every file whose name is not referenced as an api ref hash by an item unit of
+// this buffer in database. When dryRun is true, nothing is removed: the files
+// that should be deleted are only logged.
+func (b *Buffer) ResyncWithDatabase(ctx context.Context, db gorp.SqlExecutor, t sdk.CDNItemType, dryRun bool) {
+	root := filepath.Join(b.config.Path, string(t))
+	if err := filepath.Walk(root, func(path string, info os.FileInfo, walkErr error) error {
+		if walkErr != nil {
+			// Report unreadable entries (including a missing root) instead of ignoring them.
+			log.Error(ctx, "local-buffer: unable to access %s: %v", path, walkErr)
+			return nil
+		}
+		if info == nil {
+			return nil
+		}
+		if info.IsDir() {
+			// The root of the walk is always a directory: only nested ones are unexpected.
+			if path != root {
+				log.Warn(ctx, "local-buffer: found directory inside %s: %s", string(t), path)
+			}
+			return nil
+		}
+		fileName := filepath.Base(path)
+		has, err := storage.HashItemUnitByApiRefHash(db, fileName, b.ID())
+		if err != nil {
+			log.Error(ctx, "local-buffer: unable to check if unit item exist for api ref hash %s: %v", fileName, err)
+			return nil
+		}
+		if has {
+			return nil
+		}
+		if dryRun {
+			log.Info(ctx, "local-buffer: file %s should be deleted", fileName)
+			return nil
+		}
+		if err := os.Remove(path); err != nil {
+			log.Error(ctx, "local-buffer: unable to remove file %s: %v", path, err)
+			return nil
+		}
+		log.Info(ctx, "local-buffer: file %s has been deleted", fileName)
+		return nil
+	}); err != nil {
+		log.Error(ctx, "local-buffer: error during walk operation: %v", err)
+	}
+}
diff --git a/engine/cdn/storage/local/local.go b/engine/cdn/storage/local/local.go
index 91ccddf62c..51d1374499 100644
--- a/engine/cdn/storage/local/local.go
+++ b/engine/cdn/storage/local/local.go
@@ -193,3 +193,7 @@ func (s *AbstractLocal) Remove(ctx context.Context, i sdk.CDNItemUnit) error {
 	}
 	return nil
 }
+
+func (s *Local) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
+	log.Error(ctx, "Resynchronization with database not implemented for local storage unit")
+}
diff --git a/engine/cdn/storage/nfs/nfs.go b/engine/cdn/storage/nfs/nfs.go
index 3127e4f8e7..ffc1bf03cb 100644
--- a/engine/cdn/storage/nfs/nfs.go
+++ b/engine/cdn/storage/nfs/nfs.go
@@ -332,3 +332,51 @@ func (n *Buffer) ls(v *gonfs.Target, path string) ([]*gonfs.EntryPlus, error) {
 	}
 	return dirs, nil
 }
+
+// ResyncWithDatabase lists the <type> directory on the NFS target and removes
+// every file whose name is not referenced as an api ref hash by an item unit of
+// this buffer in database. When dryRun is true, nothing is removed: the files
+// that should be deleted are only logged.
+func (n *Buffer) ResyncWithDatabase(ctx context.Context, db gorp.SqlExecutor, t sdk.CDNItemType, dryRun bool) {
+	dial, target, err := n.Connect()
+	if err != nil {
+		log.Error(ctx, "nfs-buffer: unable to connect to NFS: %v", err)
+		return
+	}
+	defer dial.Close()   // nolint
+	defer target.Close() // nolint
+
+	entries, err := n.ls(target, string(t))
+	if err != nil {
+		log.Error(ctx, "nfs-buffer: unable to list directory %s: %v", string(t), err)
+		return
+	}
+	for _, e := range entries {
+		if e.IsDir() {
+			log.Warn(ctx, "nfs-buffer: found directory inside %s: %s", string(t), e.FileName)
+			continue
+		}
+		if e.FileName == "" {
+			log.Warn(ctx, "nfs-buffer: missing file name")
+			continue
+		}
+		has, err := storage.HashItemUnitByApiRefHash(db, e.FileName, n.ID())
+		if err != nil {
+			log.Error(ctx, "nfs-buffer: unable to check if unit item exist for api ref hash %s: %v", e.FileName, err)
+			continue
+		}
+		if has {
+			continue
+		}
+		if dryRun {
+			log.Info(ctx, "nfs-buffer: file %s should be deleted", e.FileName)
+			continue
+		}
+		filePath := string(t) + "/" + e.FileName
+		if err := target.Remove(filePath); err != nil {
+			log.Error(ctx, "nfs-buffer: unable to remove file %s: %v", filePath, err)
+			continue
+		}
+		log.Info(ctx, "nfs-buffer: file %s has been deleted", e.FileName)
+	}
+}
diff --git a/engine/cdn/storage/redis/redis.go b/engine/cdn/storage/redis/redis.go
index 5e6de881eb..92b1e97f02 100644
--- a/engine/cdn/storage/redis/redis.go
+++ b/engine/cdn/storage/redis/redis.go
@@ -7,6 +7,7 @@ import (
"strconv" "github.com/go-gorp/gorp" + "github.com/rockbears/log" "github.com/ovh/cds/engine/cache" "github.com/ovh/cds/engine/cdn/redis" @@ -142,3 +143,7 @@ func (s *Redis) Status(_ context.Context) []sdk.MonitoringStatusLine { func (s *Redis) Remove(_ context.Context, i sdk.CDNItemUnit) error { return sdk.WithStack(s.store.Delete(cache.Key(keyBuffer, i.ItemID))) } + +func (s *Redis) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) { + log.Error(ctx, "Resynchronization with database not implemented for redis buffer unit") +} diff --git a/engine/cdn/storage/s3/s3.go b/engine/cdn/storage/s3/s3.go index 20f136b931..f9ccdf6159 100644 --- a/engine/cdn/storage/s3/s3.go +++ b/engine/cdn/storage/s3/s3.go @@ -203,3 +203,7 @@ func (s *S3) Remove(ctx context.Context, i sdk.CDNItemUnit) error { }) return sdk.WithStack(err) } + +func (s *S3) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) { + log.Error(ctx, "Resynchronization with database not implemented for s3 storage unit") +} diff --git a/engine/cdn/storage/swift/swift.go b/engine/cdn/storage/swift/swift.go index 310f361ab3..47d8b6f341 100644 --- a/engine/cdn/storage/swift/swift.go +++ b/engine/cdn/storage/swift/swift.go @@ -139,3 +139,7 @@ func (s *Swift) Remove(ctx context.Context, i sdk.CDNItemUnit) error { } return nil } + +func (s *Swift) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) { + log.Error(ctx, "Resynchronization with database not implemented for swift storage unit") +} diff --git a/engine/cdn/storage/types.go b/engine/cdn/storage/types.go index e6574715b6..53f4563980 100644 --- a/engine/cdn/storage/types.go +++ b/engine/cdn/storage/types.go @@ -98,6 +98,7 @@ type Unit interface { Read(i sdk.CDNItemUnit, r io.Reader, w io.Writer) error NewReader(ctx context.Context, i sdk.CDNItemUnit) (io.ReadCloser, error) GetDriverName() string + ResyncWithDatabase(ctx context.Context, db gorp.SqlExecutor, t 
sdk.CDNItemType, dryRun bool) } type BufferUnit interface { diff --git a/engine/cdn/storage/webdav/webdav.go b/engine/cdn/storage/webdav/webdav.go index b5be62aeb0..0fc85c78aa 100644 --- a/engine/cdn/storage/webdav/webdav.go +++ b/engine/cdn/storage/webdav/webdav.go @@ -120,3 +120,7 @@ func (s *Webdav) Remove(ctx context.Context, i sdk.CDNItemUnit) error { } return sdk.WithStack(s.client.Remove(f)) } + +func (s *Webdav) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) { + log.Error(ctx, "Resynchronization with database not implemented for webdav storage unit") +} diff --git a/engine/cdn/unit_handler.go b/engine/cdn/unit_handler.go index 452ba640d9..4baf1e1828 100644 --- a/engine/cdn/unit_handler.go +++ b/engine/cdn/unit_handler.go @@ -108,3 +108,35 @@ func (s *Service) markItemUnitAsDeleteHandler() service.Handler { return nil } } + +func (s *Service) postAdminResyncBackendWithDatabaseHandler() service.Handler { + return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error { + vars := mux.Vars(r) + unitID := vars["id"] + itemType := vars["type"] + + it := sdk.CDNItemType(itemType) + if err := it.Validate(); err != nil { + return err + } + + dryRunString := r.FormValue("dryRun") + dryRun := dryRunString != "false" + + for _, u := range s.Units.Buffers { + if u.ID() == unitID { + s.GoRoutines.Exec(context.Background(), "ResyncWithDB-"+unitID, func(ctx context.Context) { + u.ResyncWithDatabase(ctx, s.mustDBWithCtx(ctx), it, dryRun) + }) + } + } + for _, u := range s.Units.Storages { + if u.ID() == unitID { + s.GoRoutines.Exec(context.Background(), "ResyncWithDB-"+unitID, func(ctx context.Context) { + u.ResyncWithDatabase(ctx, s.mustDBWithCtx(ctx), it, dryRun) + }) + } + } + return nil + } +} diff --git a/engine/cdn/unit_handler_test.go b/engine/cdn/unit_handler_test.go index 4b49cfef2b..b24d8a15aa 100644 --- a/engine/cdn/unit_handler_test.go +++ b/engine/cdn/unit_handler_test.go @@ -2,13 +2,21 @@ package cdn 
import ( "context" + "fmt" "github.com/ovh/cds/engine/cdn/item" "github.com/ovh/cds/engine/cdn/storage" cdntest "github.com/ovh/cds/engine/cdn/test" + "github.com/ovh/cds/engine/test" "github.com/ovh/cds/sdk" + "github.com/ovh/cds/sdk/cdn" + "github.com/ovh/symmecrypt/ciphers/aesgcm" + "github.com/ovh/symmecrypt/convergent" + "github.com/ovh/symmecrypt/keyloader" "github.com/rockbears/log" "github.com/stretchr/testify/require" + "io/ioutil" "net/http/httptest" + "os" "testing" "time" ) @@ -94,3 +102,139 @@ func TestMarkItemUnitAsDeleteHandler(t *testing.T) { require.Equal(t, 204, recDel.Code) } + +func TestPostAdminResyncBackendWithDatabaseHandler(t *testing.T) { + s, db := newTestService(t) + + cfg := test.LoadTestingConf(t, sdk.TypeCDN) + + cdntest.ClearItem(t, context.TODO(), s.Mapper, db) + cdntest.ClearItem(t, context.TODO(), s.Mapper, db) + cdntest.ClearSyncRedisSet(t, s.Cache, "local_storage") + + // Start CDN + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*") + require.NoError(t, err) + + tmpDir2, err := ioutil.TempDir("", t.Name()+"-cdn-2-*") + require.NoError(t, err) + + cdnUnits, err := storage.Init(ctx, s.Mapper, s.Cache, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{ + SyncSeconds: 1, + SyncNbElements: 1000, + PurgeNbElements: 1000, + PurgeSeconds: 30, + HashLocatorSalt: "thisismysalt", + Buffers: map[string]storage.BufferConfiguration{ + "refis_buffer": { + Redis: &storage.RedisBufferConfiguration{ + Host: cfg["redisHost"], + Password: cfg["redisPassword"], + }, + BufferType: storage.CDNBufferTypeLog, + }, + "local_buffer": { + Local: &storage.LocalBufferConfiguration{ + Path: tmpDir, + Encryption: []*keyloader.KeyConfig{ + { + Key: "iamakey.iamakey.iamakey.iamakey.", + Cipher: aesgcm.CipherName, + Identifier: "local-bukker-id", + }, + }, + }, + BufferType: storage.CDNBufferTypeFile, + }, + }, + Storages: map[string]storage.StorageConfiguration{ + 
"local_storage": { + SyncParallel: 10, + SyncBandwidth: int64(1024 * 1024), + Local: &storage.LocalStorageConfiguration{ + Path: tmpDir2, + Encryption: []convergent.ConvergentEncryptionConfig{ + { + Cipher: aesgcm.CipherName, + LocatorSalt: "secret_locator_salt", + SecretValue: "secret_value", + }, + }, + }, + }, + }, + }) + require.NoError(t, err) + s.Units = cdnUnits + cdnUnits.Start(ctx, sdk.NewGoRoutines(ctx)) + + // Create an Item + it, err := s.loadOrCreateItem(context.TODO(), sdk.CDNTypeItemRunResult, cdn.Signature{ + RunID: 1, + JobID: 1, + WorkflowID: 1, + NodeRunID: 1, + Worker: &cdn.SignatureWorker{ + WorkerID: "1", + FileName: sdk.RandomString(10), + RunResultType: string(sdk.WorkflowRunResultTypeArtifact), + FilePerm: 0777, + StepOrder: 0, + WorkerName: sdk.RandomString(10), + }, + }) + require.NoError(t, err) + _, err = s.loadOrCreateItemUnitBuffer(context.TODO(), it.ID, sdk.CDNTypeItemRunResult) + require.NoError(t, err) + + require.NoError(t, os.Mkdir(fmt.Sprintf("%s/%s", tmpDir, string(sdk.CDNTypeItemRunResult)), 0755)) + + file1Path := fmt.Sprintf("%s/%s/%s", tmpDir, string(sdk.CDNTypeItemRunResult), it.APIRefHash) + t.Logf("Creating file %s", file1Path) + content1 := []byte("I'm the real one") + f1, err := os.Create(file1Path) + require.NoError(t, err) + defer f1.Close() + _, err = f1.Write(content1) + require.NoError(t, err) + + file2Path := fmt.Sprintf("%s/%s/%s", tmpDir, string(sdk.CDNTypeItemRunResult), "wronghash") + t.Logf("Creating file %s", file2Path) + content2 := []byte("I'm not the real one") + f2, err := os.Create(file2Path) + require.NoError(t, err) + defer f2.Close() + _, err = f2.Write(content2) + require.NoError(t, err) + + vars := make(map[string]string) + vars["id"] = s.Units.GetBuffer(sdk.CDNTypeItemRunResult).ID() + vars["type"] = string(sdk.CDNTypeItemRunResult) + uri := s.Router.GetRoute("POST", s.postAdminResyncBackendWithDatabaseHandler, vars) + "?dryRun=false" + require.NotEmpty(t, uri) + req := newRequest(t, "POST", uri, 
nil) + rec := httptest.NewRecorder() + s.Router.Mux.ServeHTTP(rec, req) + require.Equal(t, 204, rec.Code) + + cpt := 0 + for { + _, err := os.Stat(file2Path) + if os.IsNotExist(err) { + break + } + if cpt >= 20 { + t.FailNow() + } + cpt++ + time.Sleep(250 * time.Millisecond) + } + _, err = os.Stat(file2Path) + require.True(t, os.IsNotExist(err)) + + _, err = os.Stat(file1Path) + require.NoError(t, err) + +}