Skip to content

Commit

Permalink
feat(cdn): add route to be able to resync a backend with database (#5829
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sguiheux authored May 31, 2021
1 parent c39725e commit 6b120b8
Show file tree
Hide file tree
Showing 13 changed files with 301 additions and 0 deletions.
2 changes: 2 additions & 0 deletions engine/cdn/cdn_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,6 @@ func (s *Service) initRouter(ctx context.Context) {
r.Handle("/admin/database/encryption/{entity}", nil, r.GET(s.getAdminDatabaseEncryptedTuplesByEntity))
r.Handle("/admin/database/encryption/{entity}/roll/{pk}", nil, r.POST(s.postAdminDatabaseRollEncryptedEntityByPrimaryKey))

r.Handle("/admin/backend/{id}/resync/{type}", nil, r.POST(s.postAdminResyncBackendWithDatabaseHandler))

}
5 changes: 5 additions & 0 deletions engine/cdn/storage/cds/cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdsclient"
"github.com/rockbears/log"
)

type CDS struct {
Expand Down Expand Up @@ -130,3 +131,7 @@ func (c *CDS) Read(_ sdk.CDNItemUnit, r io.Reader, w io.Writer) error {
func (c *CDS) Write(_ sdk.CDNItemUnit, _ io.Reader, _ io.Writer) error {
return nil
}

func (c *CDS) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
log.Error(ctx, "Resynchronization with database not implemented for CDS storage unit")
}
13 changes: 13 additions & 0 deletions engine/cdn/storage/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,19 @@ func HasItemUnitsByUnitAndHashLocator(db gorp.SqlExecutor, unitID string, hashLo
return len(ids) > 0, sdk.WithStack(err)
}

func HashItemUnitByApiRefHash(db gorp.SqlExecutor, apiRefHash string, unitID string) (bool, error) {
query := `
SELECT count(sui.id) FROM storage_unit_item sui
JOIN item on item.id = sui.item_id
WHERE item.api_ref_hash = $1 AND unit_id = $2
`
nb, err := db.SelectInt(query, apiRefHash, unitID)
if err != nil {
return false, sdk.WithStack(err)
}
return nb > 0, nil
}

func LoadItemUnitByID(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, id string, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItemUnit, error) {
query := gorpmapper.NewQuery("SELECT * FROM storage_unit_item WHERE id = $1 AND to_delete = false").Args(id)
return getItemUnit(ctx, m, db, query, opts...)
Expand Down
39 changes: 39 additions & 0 deletions engine/cdn/storage/local/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/go-gorp/gorp"
"github.com/rockbears/log"

"github.com/ovh/cds/engine/cdn/storage"
"github.com/ovh/cds/engine/cdn/storage/encryption"
Expand Down Expand Up @@ -58,3 +62,38 @@ func (b *Buffer) Size(_ sdk.CDNItemUnit) (int64, error) {
func (b *Buffer) BufferType() storage.CDNBufferType {
return b.bufferType
}

func (b *Buffer) ResyncWithDatabase(ctx context.Context, db gorp.SqlExecutor, t sdk.CDNItemType, dryRun bool) {
root := fmt.Sprintf("%s/%s", b.config.Path, string(t))
if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if info == nil {
return nil
}
if info.IsDir() {
log.Warn(ctx, "local-buffer: found directory inside %s: %s", string(t), path)
return nil
}
_, fileName := filepath.Split(path)
has, err := storage.HashItemUnitByApiRefHash(db, fileName, b.ID())
if err != nil {
log.Error(ctx, "local-buffer: unable to check if unit item exist for api ref hash %s: %v", fileName, err)
return nil
}
if has {
return nil
}
if !dryRun {
if err := os.Remove(path); err != nil {
log.Error(ctx, "local-buffer: unable to remove file %s: %v", path, err)
return nil
}
log.Info(ctx, "local-buffer: file %s has been deleted", fileName)
} else {
log.Info(ctx, "local-buffer: file %s should be deleted", fileName)
}
return nil
}); err != nil {
log.Error(ctx, "local-buffer: error during walk operation: %v", err)
}

}
4 changes: 4 additions & 0 deletions engine/cdn/storage/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,7 @@ func (s *AbstractLocal) Remove(ctx context.Context, i sdk.CDNItemUnit) error {
}
return nil
}

func (s *Local) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
log.Error(ctx, "Resynchronization with database not implemented for local storage unit")
}
44 changes: 44 additions & 0 deletions engine/cdn/storage/nfs/nfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,47 @@ func (n *Buffer) ls(v *gonfs.Target, path string) ([]*gonfs.EntryPlus, error) {
}
return dirs, nil
}

func (n *Buffer) ResyncWithDatabase(ctx context.Context, db gorp.SqlExecutor, t sdk.CDNItemType, dryRun bool) {
dial, target, err := n.Connect()
if err != nil {
log.Error(ctx, "nfs-buffer: unable to connect to NFS: %v", err)
return
}
defer dial.Close() // nolint
defer target.Close() //

entries, err := n.ls(target, string(t))
if err != nil {
log.Error(ctx, "nfs-buffer: unable to list directory %s", string(t))
return
}
for _, e := range entries {
if e.IsDir() {
log.Warn(ctx, "nfs-buffer: found directory inside %s: %s", string(t), e)
continue
}
if e.FileName == "" {
log.Warn(ctx, "nfs-buffer: missing file name")
continue
}
has, err := storage.HashItemUnitByApiRefHash(db, e.FileName, n.ID())
if err != nil {
log.Error(ctx, "nfs-buffer: unable to check if unit item exist for api ref hash %s: %v", e.FileName, err)
continue
}
if has {
continue
}
if !dryRun {
if err := target.Remove(string(t) + "/" + e.FileName); err != nil {
log.Error(ctx, "nfs-buffer: unable to remove file %s: %v", string(t)+"/"+e.FileName, err)
continue
}
log.Info(ctx, "nfs-buffer: file %s has been deleted", e.FileName)
} else {
log.Info(ctx, "nfs-buffer: file %s should be deleted", e.FileName)
}
}
return
}
5 changes: 5 additions & 0 deletions engine/cdn/storage/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"

"github.com/go-gorp/gorp"
"github.com/rockbears/log"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/redis"
Expand Down Expand Up @@ -142,3 +143,7 @@ func (s *Redis) Status(_ context.Context) []sdk.MonitoringStatusLine {
func (s *Redis) Remove(_ context.Context, i sdk.CDNItemUnit) error {
return sdk.WithStack(s.store.Delete(cache.Key(keyBuffer, i.ItemID)))
}

func (s *Redis) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
log.Error(ctx, "Resynchronization with database not implemented for redis buffer unit")
}
4 changes: 4 additions & 0 deletions engine/cdn/storage/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,7 @@ func (s *S3) Remove(ctx context.Context, i sdk.CDNItemUnit) error {
})
return sdk.WithStack(err)
}

func (s *S3) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
log.Error(ctx, "Resynchronization with database not implemented for s3 storage unit")
}
4 changes: 4 additions & 0 deletions engine/cdn/storage/swift/swift.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,7 @@ func (s *Swift) Remove(ctx context.Context, i sdk.CDNItemUnit) error {
}
return nil
}

func (s *Swift) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
log.Error(ctx, "Resynchronization with database not implemented for swift storage unit")
}
1 change: 1 addition & 0 deletions engine/cdn/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type Unit interface {
Read(i sdk.CDNItemUnit, r io.Reader, w io.Writer) error
NewReader(ctx context.Context, i sdk.CDNItemUnit) (io.ReadCloser, error)
GetDriverName() string
ResyncWithDatabase(ctx context.Context, db gorp.SqlExecutor, t sdk.CDNItemType, dryRun bool)
}

type BufferUnit interface {
Expand Down
4 changes: 4 additions & 0 deletions engine/cdn/storage/webdav/webdav.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,7 @@ func (s *Webdav) Remove(ctx context.Context, i sdk.CDNItemUnit) error {
}
return sdk.WithStack(s.client.Remove(f))
}

func (s *Webdav) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
log.Error(ctx, "Resynchronization with database not implemented for webdav storage unit")
}
32 changes: 32 additions & 0 deletions engine/cdn/unit_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,35 @@ func (s *Service) markItemUnitAsDeleteHandler() service.Handler {
return nil
}
}

func (s *Service) postAdminResyncBackendWithDatabaseHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
unitID := vars["id"]
itemType := vars["type"]

it := sdk.CDNItemType(itemType)
if err := it.Validate(); err != nil {
return err
}

dryRunString := r.FormValue("dryRun")
dryRun := dryRunString != "false"

for _, u := range s.Units.Buffers {
if u.ID() == unitID {
s.GoRoutines.Exec(context.Background(), "ResyncWithDB-"+unitID, func(ctx context.Context) {
u.ResyncWithDatabase(ctx, s.mustDBWithCtx(ctx), it, dryRun)
})
}
}
for _, u := range s.Units.Storages {
if u.ID() == unitID {
s.GoRoutines.Exec(context.Background(), "ResyncWithDB-"+unitID, func(ctx context.Context) {
u.ResyncWithDatabase(ctx, s.mustDBWithCtx(ctx), it, dryRun)
})
}
}
return nil
}
}
144 changes: 144 additions & 0 deletions engine/cdn/unit_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@ package cdn

import (
"context"
"fmt"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/storage"
cdntest "github.com/ovh/cds/engine/cdn/test"
"github.com/ovh/cds/engine/test"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdn"
"github.com/ovh/symmecrypt/ciphers/aesgcm"
"github.com/ovh/symmecrypt/convergent"
"github.com/ovh/symmecrypt/keyloader"
"github.com/rockbears/log"
"github.com/stretchr/testify/require"
"io/ioutil"
"net/http/httptest"
"os"
"testing"
"time"
)
Expand Down Expand Up @@ -94,3 +102,139 @@ func TestMarkItemUnitAsDeleteHandler(t *testing.T) {
require.Equal(t, 204, recDel.Code)

}

func TestPostAdminResyncBackendWithDatabaseHandler(t *testing.T) {
s, db := newTestService(t)

cfg := test.LoadTestingConf(t, sdk.TypeCDN)

cdntest.ClearItem(t, context.TODO(), s.Mapper, db)
cdntest.ClearItem(t, context.TODO(), s.Mapper, db)
cdntest.ClearSyncRedisSet(t, s.Cache, "local_storage")

// Start CDN
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
require.NoError(t, err)

tmpDir2, err := ioutil.TempDir("", t.Name()+"-cdn-2-*")
require.NoError(t, err)

cdnUnits, err := storage.Init(ctx, s.Mapper, s.Cache, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{
SyncSeconds: 1,
SyncNbElements: 1000,
PurgeNbElements: 1000,
PurgeSeconds: 30,
HashLocatorSalt: "thisismysalt",
Buffers: map[string]storage.BufferConfiguration{
"refis_buffer": {
Redis: &storage.RedisBufferConfiguration{
Host: cfg["redisHost"],
Password: cfg["redisPassword"],
},
BufferType: storage.CDNBufferTypeLog,
},
"local_buffer": {
Local: &storage.LocalBufferConfiguration{
Path: tmpDir,
Encryption: []*keyloader.KeyConfig{
{
Key: "iamakey.iamakey.iamakey.iamakey.",
Cipher: aesgcm.CipherName,
Identifier: "local-bukker-id",
},
},
},
BufferType: storage.CDNBufferTypeFile,
},
},
Storages: map[string]storage.StorageConfiguration{
"local_storage": {
SyncParallel: 10,
SyncBandwidth: int64(1024 * 1024),
Local: &storage.LocalStorageConfiguration{
Path: tmpDir2,
Encryption: []convergent.ConvergentEncryptionConfig{
{
Cipher: aesgcm.CipherName,
LocatorSalt: "secret_locator_salt",
SecretValue: "secret_value",
},
},
},
},
},
})
require.NoError(t, err)
s.Units = cdnUnits
cdnUnits.Start(ctx, sdk.NewGoRoutines(ctx))

// Create an Item
it, err := s.loadOrCreateItem(context.TODO(), sdk.CDNTypeItemRunResult, cdn.Signature{
RunID: 1,
JobID: 1,
WorkflowID: 1,
NodeRunID: 1,
Worker: &cdn.SignatureWorker{
WorkerID: "1",
FileName: sdk.RandomString(10),
RunResultType: string(sdk.WorkflowRunResultTypeArtifact),
FilePerm: 0777,
StepOrder: 0,
WorkerName: sdk.RandomString(10),
},
})
require.NoError(t, err)
_, err = s.loadOrCreateItemUnitBuffer(context.TODO(), it.ID, sdk.CDNTypeItemRunResult)
require.NoError(t, err)

require.NoError(t, os.Mkdir(fmt.Sprintf("%s/%s", tmpDir, string(sdk.CDNTypeItemRunResult)), 0755))

file1Path := fmt.Sprintf("%s/%s/%s", tmpDir, string(sdk.CDNTypeItemRunResult), it.APIRefHash)
t.Logf("Creating file %s", file1Path)
content1 := []byte("I'm the real one")
f1, err := os.Create(file1Path)
require.NoError(t, err)
defer f1.Close()
_, err = f1.Write(content1)
require.NoError(t, err)

file2Path := fmt.Sprintf("%s/%s/%s", tmpDir, string(sdk.CDNTypeItemRunResult), "wronghash")
t.Logf("Creating file %s", file2Path)
content2 := []byte("I'm not the real one")
f2, err := os.Create(file2Path)
require.NoError(t, err)
defer f2.Close()
_, err = f2.Write(content2)
require.NoError(t, err)

vars := make(map[string]string)
vars["id"] = s.Units.GetBuffer(sdk.CDNTypeItemRunResult).ID()
vars["type"] = string(sdk.CDNTypeItemRunResult)
uri := s.Router.GetRoute("POST", s.postAdminResyncBackendWithDatabaseHandler, vars) + "?dryRun=false"
require.NotEmpty(t, uri)
req := newRequest(t, "POST", uri, nil)
rec := httptest.NewRecorder()
s.Router.Mux.ServeHTTP(rec, req)
require.Equal(t, 204, rec.Code)

cpt := 0
for {
_, err := os.Stat(file2Path)
if os.IsNotExist(err) {
break
}
if cpt >= 20 {
t.FailNow()
}
cpt++
time.Sleep(250 * time.Millisecond)
}
_, err = os.Stat(file2Path)
require.True(t, os.IsNotExist(err))

_, err = os.Stat(file1Path)
require.NoError(t, err)

}

0 comments on commit 6b120b8

Please sign in to comment.