Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdn): add route to be able to resync a backend with database #5829

Merged
merged 2 commits into from
May 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions engine/cdn/cdn_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,6 @@ func (s *Service) initRouter(ctx context.Context) {
r.Handle("/admin/database/encryption/{entity}", nil, r.GET(s.getAdminDatabaseEncryptedTuplesByEntity))
r.Handle("/admin/database/encryption/{entity}/roll/{pk}", nil, r.POST(s.postAdminDatabaseRollEncryptedEntityByPrimaryKey))

r.Handle("/admin/backend/{id}/resync/{type}", nil, r.POST(s.postAdminResyncBackendWithDatabaseHandler))

}
5 changes: 5 additions & 0 deletions engine/cdn/storage/cds/cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ovh/cds/engine/gorpmapper"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdsclient"
"github.com/rockbears/log"
)

type CDS struct {
Expand Down Expand Up @@ -130,3 +131,7 @@ func (c *CDS) Read(_ sdk.CDNItemUnit, r io.Reader, w io.Writer) error {
func (c *CDS) Write(_ sdk.CDNItemUnit, _ io.Reader, _ io.Writer) error {
return nil
}

func (c *CDS) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
log.Error(ctx, "Resynchronization with database not implemented for CDS storage unit")
}
13 changes: 13 additions & 0 deletions engine/cdn/storage/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,19 @@ func HasItemUnitsByUnitAndHashLocator(db gorp.SqlExecutor, unitID string, hashLo
return len(ids) > 0, sdk.WithStack(err)
}

func HashItemUnitByApiRefHash(db gorp.SqlExecutor, apiRefHash string, unitID string) (bool, error) {
query := `
SELECT count(sui.id) FROM storage_unit_item sui
JOIN item on item.id = sui.item_id
WHERE item.api_ref_hash = $1 AND unit_id = $2
`
nb, err := db.SelectInt(query, apiRefHash, unitID)
if err != nil {
return false, sdk.WithStack(err)
}
return nb > 0, nil
}

func LoadItemUnitByID(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, id string, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItemUnit, error) {
query := gorpmapper.NewQuery("SELECT * FROM storage_unit_item WHERE id = $1 AND to_delete = false").Args(id)
return getItemUnit(ctx, m, db, query, opts...)
Expand Down
39 changes: 39 additions & 0 deletions engine/cdn/storage/local/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/go-gorp/gorp"
"github.com/rockbears/log"

"github.com/ovh/cds/engine/cdn/storage"
"github.com/ovh/cds/engine/cdn/storage/encryption"
Expand Down Expand Up @@ -58,3 +62,38 @@ func (b *Buffer) Size(_ sdk.CDNItemUnit) (int64, error) {
func (b *Buffer) BufferType() storage.CDNBufferType {
return b.bufferType
}

func (b *Buffer) ResyncWithDatabase(ctx context.Context, db gorp.SqlExecutor, t sdk.CDNItemType, dryRun bool) {
root := fmt.Sprintf("%s/%s", b.config.Path, string(t))
if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if info == nil {
return nil
}
if info.IsDir() {
log.Warn(ctx, "local-buffer: found directory inside %s: %s", string(t), path)
return nil
}
_, fileName := filepath.Split(path)
has, err := storage.HashItemUnitByApiRefHash(db, fileName, b.ID())
if err != nil {
log.Error(ctx, "local-buffer: unable to check if unit item exist for api ref hash %s: %v", fileName, err)
return nil
}
if has {
return nil
}
if !dryRun {
if err := os.Remove(path); err != nil {
log.Error(ctx, "local-buffer: unable to remove file %s: %v", path, err)
return nil
}
log.Info(ctx, "local-buffer: file %s has been deleted", fileName)
} else {
log.Info(ctx, "local-buffer: file %s should be deleted", fileName)
}
return nil
}); err != nil {
log.Error(ctx, "local-buffer: error during walk operation: %v", err)
}

}
4 changes: 4 additions & 0 deletions engine/cdn/storage/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,7 @@ func (s *AbstractLocal) Remove(ctx context.Context, i sdk.CDNItemUnit) error {
}
return nil
}

func (s *Local) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
log.Error(ctx, "Resynchronization with database not implemented for local storage unit")
}
44 changes: 44 additions & 0 deletions engine/cdn/storage/nfs/nfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,47 @@ func (n *Buffer) ls(v *gonfs.Target, path string) ([]*gonfs.EntryPlus, error) {
}
return dirs, nil
}

func (n *Buffer) ResyncWithDatabase(ctx context.Context, db gorp.SqlExecutor, t sdk.CDNItemType, dryRun bool) {
dial, target, err := n.Connect()
if err != nil {
log.Error(ctx, "nfs-buffer: unable to connect to NFS: %v", err)
return
}
defer dial.Close() // nolint
defer target.Close() //

entries, err := n.ls(target, string(t))
if err != nil {
log.Error(ctx, "nfs-buffer: unable to list directory %s", string(t))
return
}
for _, e := range entries {
if e.IsDir() {
log.Warn(ctx, "nfs-buffer: found directory inside %s: %s", string(t), e)
continue
}
if e.FileName == "" {
log.Warn(ctx, "nfs-buffer: missing file name")
continue
}
has, err := storage.HashItemUnitByApiRefHash(db, e.FileName, n.ID())
if err != nil {
log.Error(ctx, "nfs-buffer: unable to check if unit item exist for api ref hash %s: %v", e.FileName, err)
continue
}
if has {
continue
}
if !dryRun {
if err := target.Remove(string(t) + "/" + e.FileName); err != nil {
log.Error(ctx, "nfs-buffer: unable to remove file %s: %v", string(t)+"/"+e.FileName, err)
continue
}
log.Info(ctx, "nfs-buffer: file %s has been deleted", e.FileName)
} else {
log.Info(ctx, "nfs-buffer: file %s should be deleted", e.FileName)
}
}
return
}
5 changes: 5 additions & 0 deletions engine/cdn/storage/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"

"github.com/go-gorp/gorp"
"github.com/rockbears/log"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/engine/cdn/redis"
Expand Down Expand Up @@ -142,3 +143,7 @@ func (s *Redis) Status(_ context.Context) []sdk.MonitoringStatusLine {
func (s *Redis) Remove(_ context.Context, i sdk.CDNItemUnit) error {
return sdk.WithStack(s.store.Delete(cache.Key(keyBuffer, i.ItemID)))
}

func (s *Redis) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
log.Error(ctx, "Resynchronization with database not implemented for redis buffer unit")
}
4 changes: 4 additions & 0 deletions engine/cdn/storage/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,7 @@ func (s *S3) Remove(ctx context.Context, i sdk.CDNItemUnit) error {
})
return sdk.WithStack(err)
}

func (s *S3) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
log.Error(ctx, "Resynchronization with database not implemented for s3 storage unit")
}
4 changes: 4 additions & 0 deletions engine/cdn/storage/swift/swift.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,7 @@ func (s *Swift) Remove(ctx context.Context, i sdk.CDNItemUnit) error {
}
return nil
}

func (s *Swift) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
log.Error(ctx, "Resynchronization with database not implemented for swift storage unit")
}
1 change: 1 addition & 0 deletions engine/cdn/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type Unit interface {
Read(i sdk.CDNItemUnit, r io.Reader, w io.Writer) error
NewReader(ctx context.Context, i sdk.CDNItemUnit) (io.ReadCloser, error)
GetDriverName() string
ResyncWithDatabase(ctx context.Context, db gorp.SqlExecutor, t sdk.CDNItemType, dryRun bool)
}

type BufferUnit interface {
Expand Down
4 changes: 4 additions & 0 deletions engine/cdn/storage/webdav/webdav.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,7 @@ func (s *Webdav) Remove(ctx context.Context, i sdk.CDNItemUnit) error {
}
return sdk.WithStack(s.client.Remove(f))
}

func (s *Webdav) ResyncWithDatabase(ctx context.Context, _ gorp.SqlExecutor, _ sdk.CDNItemType, _ bool) {
log.Error(ctx, "Resynchronization with database not implemented for webdav storage unit")
}
32 changes: 32 additions & 0 deletions engine/cdn/unit_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,35 @@ func (s *Service) markItemUnitAsDeleteHandler() service.Handler {
return nil
}
}

func (s *Service) postAdminResyncBackendWithDatabaseHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
unitID := vars["id"]
itemType := vars["type"]

it := sdk.CDNItemType(itemType)
if err := it.Validate(); err != nil {
return err
}

dryRunString := r.FormValue("dryRun")
dryRun := dryRunString != "false"

for _, u := range s.Units.Buffers {
if u.ID() == unitID {
s.GoRoutines.Exec(context.Background(), "ResyncWithDB-"+unitID, func(ctx context.Context) {
u.ResyncWithDatabase(ctx, s.mustDBWithCtx(ctx), it, dryRun)
})
}
}
for _, u := range s.Units.Storages {
if u.ID() == unitID {
s.GoRoutines.Exec(context.Background(), "ResyncWithDB-"+unitID, func(ctx context.Context) {
u.ResyncWithDatabase(ctx, s.mustDBWithCtx(ctx), it, dryRun)
})
}
}
return nil
}
}
144 changes: 144 additions & 0 deletions engine/cdn/unit_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@ package cdn

import (
"context"
"fmt"
"github.com/ovh/cds/engine/cdn/item"
"github.com/ovh/cds/engine/cdn/storage"
cdntest "github.com/ovh/cds/engine/cdn/test"
"github.com/ovh/cds/engine/test"
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/cdn"
"github.com/ovh/symmecrypt/ciphers/aesgcm"
"github.com/ovh/symmecrypt/convergent"
"github.com/ovh/symmecrypt/keyloader"
"github.com/rockbears/log"
"github.com/stretchr/testify/require"
"io/ioutil"
"net/http/httptest"
"os"
"testing"
"time"
)
Expand Down Expand Up @@ -94,3 +102,139 @@ func TestMarkItemUnitAsDeleteHandler(t *testing.T) {
require.Equal(t, 204, recDel.Code)

}

func TestPostAdminResyncBackendWithDatabaseHandler(t *testing.T) {
s, db := newTestService(t)

cfg := test.LoadTestingConf(t, sdk.TypeCDN)

cdntest.ClearItem(t, context.TODO(), s.Mapper, db)
cdntest.ClearItem(t, context.TODO(), s.Mapper, db)
cdntest.ClearSyncRedisSet(t, s.Cache, "local_storage")

// Start CDN
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
require.NoError(t, err)

tmpDir2, err := ioutil.TempDir("", t.Name()+"-cdn-2-*")
require.NoError(t, err)

cdnUnits, err := storage.Init(ctx, s.Mapper, s.Cache, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{
SyncSeconds: 1,
SyncNbElements: 1000,
PurgeNbElements: 1000,
PurgeSeconds: 30,
HashLocatorSalt: "thisismysalt",
Buffers: map[string]storage.BufferConfiguration{
"refis_buffer": {
Redis: &storage.RedisBufferConfiguration{
Host: cfg["redisHost"],
Password: cfg["redisPassword"],
},
BufferType: storage.CDNBufferTypeLog,
},
"local_buffer": {
Local: &storage.LocalBufferConfiguration{
Path: tmpDir,
Encryption: []*keyloader.KeyConfig{
{
Key: "iamakey.iamakey.iamakey.iamakey.",
Cipher: aesgcm.CipherName,
Identifier: "local-bukker-id",
},
},
},
BufferType: storage.CDNBufferTypeFile,
},
},
Storages: map[string]storage.StorageConfiguration{
"local_storage": {
SyncParallel: 10,
SyncBandwidth: int64(1024 * 1024),
Local: &storage.LocalStorageConfiguration{
Path: tmpDir2,
Encryption: []convergent.ConvergentEncryptionConfig{
{
Cipher: aesgcm.CipherName,
LocatorSalt: "secret_locator_salt",
SecretValue: "secret_value",
},
},
},
},
},
})
require.NoError(t, err)
s.Units = cdnUnits
cdnUnits.Start(ctx, sdk.NewGoRoutines(ctx))

// Create an Item
it, err := s.loadOrCreateItem(context.TODO(), sdk.CDNTypeItemRunResult, cdn.Signature{
RunID: 1,
JobID: 1,
WorkflowID: 1,
NodeRunID: 1,
Worker: &cdn.SignatureWorker{
WorkerID: "1",
FileName: sdk.RandomString(10),
RunResultType: string(sdk.WorkflowRunResultTypeArtifact),
FilePerm: 0777,
StepOrder: 0,
WorkerName: sdk.RandomString(10),
},
})
require.NoError(t, err)
_, err = s.loadOrCreateItemUnitBuffer(context.TODO(), it.ID, sdk.CDNTypeItemRunResult)
require.NoError(t, err)

require.NoError(t, os.Mkdir(fmt.Sprintf("%s/%s", tmpDir, string(sdk.CDNTypeItemRunResult)), 0755))

file1Path := fmt.Sprintf("%s/%s/%s", tmpDir, string(sdk.CDNTypeItemRunResult), it.APIRefHash)
t.Logf("Creating file %s", file1Path)
content1 := []byte("I'm the real one")
f1, err := os.Create(file1Path)
require.NoError(t, err)
defer f1.Close()
_, err = f1.Write(content1)
require.NoError(t, err)

file2Path := fmt.Sprintf("%s/%s/%s", tmpDir, string(sdk.CDNTypeItemRunResult), "wronghash")
t.Logf("Creating file %s", file2Path)
content2 := []byte("I'm not the real one")
f2, err := os.Create(file2Path)
require.NoError(t, err)
defer f2.Close()
_, err = f2.Write(content2)
require.NoError(t, err)

vars := make(map[string]string)
vars["id"] = s.Units.GetBuffer(sdk.CDNTypeItemRunResult).ID()
vars["type"] = string(sdk.CDNTypeItemRunResult)
uri := s.Router.GetRoute("POST", s.postAdminResyncBackendWithDatabaseHandler, vars) + "?dryRun=false"
require.NotEmpty(t, uri)
req := newRequest(t, "POST", uri, nil)
rec := httptest.NewRecorder()
s.Router.Mux.ServeHTTP(rec, req)
require.Equal(t, 204, rec.Code)

cpt := 0
for {
_, err := os.Stat(file2Path)
if os.IsNotExist(err) {
break
}
if cpt >= 20 {
t.FailNow()
}
cpt++
time.Sleep(250 * time.Millisecond)
}
_, err = os.Stat(file2Path)
require.True(t, os.IsNotExist(err))

_, err = os.Stat(file1Path)
require.NoError(t, err)

}