fix(cdn): add deduplication by item type #5804

Merged (3 commits) on Apr 30, 2021
Changes from 2 commits

6 changes: 3 additions & 3 deletions engine/cdn/storage/dao.go
@@ -167,10 +167,10 @@ func LoadItemUnitsByUnit(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlE
     return getAllItemUnits(ctx, m, db, query, opts...)
 }
 
-func HasItemUnitsByUnitAndHashLocator(db gorp.SqlExecutor, unitID string, hashLocator string) (bool, error) {
-    query := "SELECT id FROM storage_unit_item WHERE unit_id = $1 AND hash_locator = $2 AND to_delete = false LIMIT 1"
+func HasItemUnitsByUnitAndHashLocator(db gorp.SqlExecutor, unitID string, hashLocator string, itemType sdk.CDNItemType) (bool, error) {
+    query := "SELECT id FROM storage_unit_item WHERE unit_id = $1 AND hash_locator = $2 AND type = $3 AND to_delete = false LIMIT 1"
     var ids []string
-    _, err := db.Select(&ids, query, unitID, hashLocator)
+    _, err := db.Select(&ids, query, unitID, hashLocator, itemType)
     return len(ids) > 0, sdk.WithStack(err)
 }

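The deduplication key for a storage unit is now (unit_id, hash_locator, type) instead of (unit_id, hash_locator). A minimal sketch of a caller using the extended lookup (the wrapper function and the gorp import path are assumptions for illustration; only the HasItemUnitsByUnitAndHashLocator signature comes from this diff):

```go
package example

import (
	"github.com/go-gorp/gorp"

	"github.com/ovh/cds/engine/cdn/storage"
	"github.com/ovh/cds/sdk"
)

// isDuplicateStepLog reports whether a step-log item unit with this hash
// locator is already stored in the given storage unit. With the added type
// filter, a run-result artifact sharing the same hash locator no longer
// counts as a duplicate of a step log.
func isDuplicateStepLog(db gorp.SqlExecutor, unitID, hashLocator string) (bool, error) {
	return storage.HasItemUnitsByUnitAndHashLocator(db, unitID, hashLocator, sdk.CDNTypeItemStepLog)
}
```
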
4 changes: 2 additions & 2 deletions engine/cdn/storage/storage_unit_io.go
@@ -123,8 +123,8 @@ func (r RunningStorageUnits) NewSource(ctx context.Context, refItemUnit sdk.CDNI
     return &iuSource{iu: refItemUnit, source: unit}, nil
 }
 
-func (r RunningStorageUnits) GetItemUnitByLocatorByUnit(locator string, unitID string) (bool, error) {
+func (r RunningStorageUnits) GetItemUnitByLocatorByUnit(locator string, unitID string, itemType sdk.CDNItemType) (bool, error) {
     // Load all the itemUnit for the unit and the same hashLocator
     hashLocator := r.HashLocator(locator)
-    return HasItemUnitsByUnitAndHashLocator(r.db, unitID, hashLocator)
+    return HasItemUnitsByUnitAndHashLocator(r.db, unitID, hashLocator, itemType)
 }
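
GetItemUnitByLocatorByUnit now forwards the item type down to the query, so a locator lookup only matches item units of that exact type. A sketch of the resulting cross-type behaviour, mirroring the assertion in the new test below (the helper function is hypothetical):

```go
package example

import (
	"github.com/ovh/cds/engine/cdn/storage"
	"github.com/ovh/cds/sdk"
)

// crossTypeLookup asks a unit whether it already stores the content behind a
// step-log locator, but for the run-result type. Because the type is now part
// of the lookup, this returns false even when a step log with the exact same
// hash locator already exists in that unit.
func crossTypeLookup(r *storage.RunningStorageUnits, stepLogLocator, unitID string) (bool, error) {
	return r.GetItemUnitByLocatorByUnit(stepLogLocator, unitID, sdk.CDNTypeItemRunResult)
}
```
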
2 changes: 1 addition & 1 deletion engine/cdn/storage/storageunit_purge.go
@@ -43,7 +43,7 @@ func (x *RunningStorageUnits) Purge(ctx context.Context, s Interface) error {
         var hasItemUnit bool
         if _, hasLocator := s.(StorageUnitWithLocator); hasLocator {
             var err error
-            hasItemUnit, err = x.GetItemUnitByLocatorByUnit(ui.Locator, s.ID())
+            hasItemUnit, err = x.GetItemUnitByLocatorByUnit(ui.Locator, s.ID(), ui.Type)
             if err != nil {
                 return err
             }
2 changes: 1 addition & 1 deletion engine/cdn/storage/storageunit_run.go
@@ -120,7 +120,7 @@ func (x *RunningStorageUnits) runItem(ctx context.Context, tx gorpmapper.SqlExec
     }
 
     // Check if the content (based on the locator) is already known from the destination unit
-    has, err := x.GetItemUnitByLocatorByUnit(iu.Locator, dest.ID())
+    has, err := x.GetItemUnitByLocatorByUnit(iu.Locator, dest.ID(), iu.Type)
     if err != nil {
         return err
     }
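
During a sync run this check gates the copy into the destination unit: the copy is skipped only when the destination already stores the same hash locator for the same item type. A simplified sketch of that decision (the helper and its surrounding control flow are illustrative, not the CDS implementation):

```go
package example

import (
	"github.com/ovh/cds/engine/cdn/storage"
	"github.com/ovh/cds/sdk"
)

// needsCopy decides whether an item unit still has to be copied into a
// destination storage unit during a sync run. The copy is skipped only when
// the destination already knows the same hash locator for the same item type.
func needsCopy(x *storage.RunningStorageUnits, iu sdk.CDNItemUnit, dest storage.Interface) (bool, error) {
	has, err := x.GetItemUnitByLocatorByUnit(iu.Locator, dest.ID(), iu.Type)
	if err != nil {
		return false, err
	}
	return !has, nil
}
```
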
201 changes: 201 additions & 0 deletions engine/cdn/storage/storageunit_test.go
@@ -1,9 +1,15 @@
package storage_test

import (
    "bufio"
    "bytes"
    "context"
    "crypto/md5"
    "crypto/sha512"
    "encoding/hex"
    "io"
    "io/ioutil"
    "os"
    "testing"
    "time"

@@ -15,12 +21,207 @@ import (
    "github.com/ovh/cds/engine/cdn/storage"
    _ "github.com/ovh/cds/engine/cdn/storage/local"
    _ "github.com/ovh/cds/engine/cdn/storage/redis"
    _ "github.com/ovh/cds/engine/cdn/storage/swift"
    cdntest "github.com/ovh/cds/engine/cdn/test"
    "github.com/ovh/cds/engine/gorpmapper"
    commontest "github.com/ovh/cds/engine/test"
    "github.com/ovh/cds/sdk"
)

func TestDeduplicationCrossType(t *testing.T) {
    m := gorpmapper.New()
    item.InitDBMapping(m)
    storage.InitDBMapping(m)

    db, cache := commontest.SetupPGWithMapper(t, m, sdk.TypeCDN)
    cfg := commontest.LoadTestingConf(t, sdk.TypeCDN)

    cdntest.ClearItem(t, context.TODO(), m, db)
    cdntest.ClearSyncRedisSet(t, cache, "local_storage")

    ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
    t.Cleanup(cancel)

    bufferDir, err := ioutil.TempDir("", t.Name()+"-cdnbuffer-1-*")
    require.NoError(t, err)
    tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
    require.NoError(t, err)

    cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{
        SyncSeconds:     10,
        SyncNbElements:  100,
        HashLocatorSalt: "thisismysalt",
        Buffers: map[string]storage.BufferConfiguration{
            "redis_buffer": {
                Redis: &storage.RedisBufferConfiguration{
                    Host:     cfg["redisHost"],
                    Password: cfg["redisPassword"],
                },
                BufferType: storage.CDNBufferTypeLog,
            },
            "fs_buffer": {
                Local: &storage.LocalBufferConfiguration{
                    Path: bufferDir,
                },
                BufferType: storage.CDNBufferTypeFile,
            },
        },
        Storages: map[string]storage.StorageConfiguration{
            "local_storage": {
                Local: &storage.LocalStorageConfiguration{
                    Path: tmpDir,
                    Encryption: []convergent.ConvergentEncryptionConfig{
                        {
                            Cipher:      aesgcm.CipherName,
                            LocatorSalt: "secret_locator_salt",
                            SecretValue: "secret_value",
                        },
                    },
                },
            },
        },
    })
    require.NoError(t, err)
    require.NotNil(t, cdnUnits)
    cdnUnits.Start(ctx, sdk.NewGoRoutines())

    units, err := storage.LoadAllUnits(ctx, m, db.DbMap)
    require.NoError(t, err)
    require.NotNil(t, units)
    require.NotEmpty(t, units)

    // Create an empty step-log item
    apiRef := &sdk.CDNLogAPIRef{
        ProjectKey: sdk.RandomString(5),
    }
    apiRefHash, err := apiRef.ToHash()
    require.NoError(t, err)

    i := &sdk.CDNItem{
        APIRef:     apiRef,
        APIRefHash: apiRefHash,
        Created:    time.Now(),
        Type:       sdk.CDNTypeItemStepLog,
        Status:     sdk.CDNStatusItemIncoming,
    }
    require.NoError(t, item.Insert(ctx, m, db, i))
    defer func() {
        _ = item.DeleteByID(db, i.ID)
    }()

    itemUnit, err := cdnUnits.NewItemUnit(ctx, cdnUnits.LogsBuffer(), i)
    require.NoError(t, err)
    require.NoError(t, storage.InsertItemUnit(ctx, m, db, itemUnit))
    itemUnit, err = storage.LoadItemUnitByID(ctx, m, db, itemUnit.ID, gorpmapper.GetOptions.WithDecryption)
    require.NoError(t, err)
    reader, err := cdnUnits.LogsBuffer().NewReader(context.TODO(), *itemUnit)
    require.NoError(t, err)
    h, err := convergent.NewHash(reader)
    require.NoError(t, err)
    i.Hash = h
    i.Status = sdk.CDNStatusItemCompleted
    require.NoError(t, item.Update(ctx, m, db, i))
    require.NoError(t, err)
    i, err = item.LoadByID(ctx, m, db, i.ID, gorpmapper.GetOptions.WithDecryption)
    require.NoError(t, err)

    localUnit, err := storage.LoadUnitByName(ctx, m, db, "local_storage")
    require.NoError(t, err)

    localUnitDriver := cdnUnits.Storage(localUnit.Name)
    require.NotNil(t, localUnitDriver)

    // Sync item in backend
    require.NoError(t, cdnUnits.FillWithUnknownItems(ctx, cdnUnits.Storages[0], 100))
    require.NoError(t, cdnUnits.FillSyncItemChannel(ctx, cdnUnits.Storages[0], 100))
    time.Sleep(1 * time.Second)

    <-ctx.Done()

    // Check that the log item has been synced to the storage unit
    exists, err := localUnitDriver.ItemExists(context.TODO(), m, db, *i)
    require.NoError(t, err)
    require.True(t, exists)

    logItemUnit, err := storage.LoadItemUnitByUnit(ctx, m, db, localUnitDriver.ID(), i.ID, gorpmapper.GetOptions.WithDecryption)
    require.NoError(t, err)

    // Create an empty run-result artifact item
    apiRefArtifact := &sdk.CDNRunResultAPIRef{
        ProjectKey:   sdk.RandomString(5),
        ArtifactName: "myfile.txt",
        Perm:         0777,
    }
    apiRefHashArtifact, err := apiRefArtifact.ToHash()
    require.NoError(t, err)

    itemArtifact := &sdk.CDNItem{
        APIRef:     apiRefArtifact,
        APIRefHash: apiRefHashArtifact,
        Created:    time.Now(),
        Type:       sdk.CDNTypeItemRunResult,
        Status:     sdk.CDNStatusItemCompleted,
    }
    iuArtifact, err := cdnUnits.NewItemUnit(ctx, cdnUnits.Buffers[1], itemArtifact)
    require.NoError(t, err)

    // Create Destination Writer
    writer, err := cdnUnits.FileBuffer().NewWriter(ctx, *iuArtifact)
    require.NoError(t, err)

    // Compute md5 and sha512
    artifactContent := make([]byte, 0)
    artifactReader := bytes.NewReader(artifactContent)
    md5Hash := md5.New()
    sha512Hash := sha512.New()
    pagesize := os.Getpagesize()
    mreader := bufio.NewReaderSize(artifactReader, pagesize)
    multiWriter := io.MultiWriter(md5Hash, sha512Hash)
    teeReader := io.TeeReader(mreader, multiWriter)

    require.NoError(t, cdnUnits.FileBuffer().Write(*iuArtifact, teeReader, writer))
    sha512S := hex.EncodeToString(sha512Hash.Sum(nil))
    md5S := hex.EncodeToString(md5Hash.Sum(nil))

    itemArtifact.Hash = sha512S
    itemArtifact.MD5 = md5S
    itemArtifact.Size = 0
    itemArtifact.Status = sdk.CDNStatusItemCompleted
    iuArtifact, err = cdnUnits.NewItemUnit(ctx, cdnUnits.Buffers[1], itemArtifact)
    require.NoError(t, err)
    require.NoError(t, item.Insert(ctx, m, db, itemArtifact))
    defer func() {
        _ = item.DeleteByID(db, itemArtifact.ID)
    }()
    // Insert Item Unit
    iuArtifact.ItemID = iuArtifact.Item.ID
    require.NoError(t, storage.InsertItemUnit(ctx, m, db, iuArtifact))

    // Check if the content (based on the locator) is already known from the destination unit
    has, err := cdnUnits.GetItemUnitByLocatorByUnit(logItemUnit.Locator, cdnUnits.Storages[0].ID(), iuArtifact.Type)
    require.NoError(t, err)
    require.False(t, has)

    require.NoError(t, cdnUnits.FillWithUnknownItems(ctx, cdnUnits.Storages[0], 100))
    require.NoError(t, cdnUnits.FillSyncItemChannel(ctx, cdnUnits.Storages[0], 100))
    time.Sleep(1 * time.Second)

    <-ctx.Done()

    // Check that the artifact item has been synced to the storage unit
    exists, err = localUnitDriver.ItemExists(context.TODO(), m, db, *itemArtifact)
    require.NoError(t, err)
    require.True(t, exists)

    artItemUnit, err := storage.LoadItemUnitByUnit(ctx, m, db, localUnitDriver.ID(), itemArtifact.ID, gorpmapper.GetOptions.WithDecryption)
    require.NoError(t, err)

    require.Equal(t, logItemUnit.HashLocator, artItemUnit.HashLocator)
    require.NotEqual(t, logItemUnit.Type, artItemUnit.Type)

}

func TestRun(t *testing.T) {
    m := gorpmapper.New()
    item.InitDBMapping(m)
@@ -104,6 +104,9 @@ export class WorkflowRunArtifactListComponent implements OnInit, OnDestroy {
     }
 
     getHumainFileSize(size: number): string {
+        if (size === 0) {
+            return '0B';
+        }
         let i = Math.floor(Math.log(size) / Math.log(1024));
         let hSize = (size / Math.pow(1024, i)).toFixed(2);
         return hSize + ' ' + ['B', 'kB', 'MB', 'GB', 'TB'][i];