Skip to content

Commit

Permalink
fix(cdn): implement a locator derivation key to improve deduplication (
Browse files Browse the repository at this point in the history
…#5528)


Signed-off-by: francois  samin <[email protected]>
  • Loading branch information
fsamin authored Nov 2, 2020
1 parent b53d93e commit b5f7303
Show file tree
Hide file tree
Showing 34 changed files with 199 additions and 126 deletions.
48 changes: 43 additions & 5 deletions engine/cdn/cdn_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,14 @@ func (s *Service) itemsGC(ctx context.Context) {

func (s *Service) markUnitItemToDeleteByItemID(ctx context.Context, itemID string) (int, error) {
db := s.mustDBWithCtx(ctx)
uis, err := storage.LoadAllItemUnitsByItemID(ctx, s.Mapper, db, itemID)
mapItemUnits, err := storage.LoadAllItemUnitsByItemIDs(ctx, s.Mapper, db, []string{itemID})
if err != nil {
return 0, err
}
uis, has := mapItemUnits[itemID]
if !has {
return 0, nil
}

ids := make([]string, len(uis))
for i := range uis {
Expand Down Expand Up @@ -139,21 +143,55 @@ func (s *Service) cleanBuffer(ctx context.Context) error {
cdsBackendID = sto.ID()
break
}
if cdsBackendID == "" {
return nil

itemIDs, err := storage.LoadAllSynchronizedItemIDs(s.mustDBWithCtx(ctx))
if err != nil {
return err
}
itemIDs, err := storage.LoadAllItemsIDInBufferAndAllUnitsExceptCDS(s.mustDBWithCtx(ctx), cdsBackendID)

var itemUnitIDsToRemove []string
mapItemunits, err := storage.LoadAllItemUnitsByItemIDs(ctx, s.Mapper, s.mustDBWithCtx(ctx), itemIDs)
if err != nil {
return err
}

if len(mapItemunits) == 0 {
return nil
}

for _, itemunits := range mapItemunits {
var countWithoutCDSBackend = len(itemunits)
var bufferItemUnit string
for _, iu := range itemunits {
switch iu.UnitID {
case cdsBackendID:
countWithoutCDSBackend--
case s.Units.Buffer.ID():
bufferItemUnit = iu.ID
}

if countWithoutCDSBackend > 1 {
itemUnitIDsToRemove = append(itemUnitIDsToRemove, bufferItemUnit)
}
}
}

if len(itemUnitIDsToRemove) == 0 {
return nil
}

log.Debug("removing %d from buffer unit", len(itemUnitIDsToRemove))

tx, err := s.mustDBWithCtx(ctx).Begin()
if err != nil {
return sdk.WrapError(err, "unable to start transaction")
}
defer tx.Rollback() //nolint
if err := storage.DeleteItemsUnit(tx, s.Units.Buffer.ID(), itemIDs); err != nil {

if _, err := storage.MarkItemUnitToDelete(ctx, s.Mapper, tx, itemUnitIDsToRemove); err != nil {
return err
}

return sdk.WithStack(tx.Commit())
}

Expand Down
1 change: 1 addition & 0 deletions engine/cdn/cdn_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestCleanSynchronizedItem(t *testing.T) {
require.NoError(t, err)

cdnUnits, err := storage.Init(context.TODO(), m, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{
HashLocatorSalt: "thisismysalt",
Buffer: storage.BufferConfiguration{
Name: "redis_buffer",
Redis: storage.RedisBufferConfiguration{
Expand Down
7 changes: 6 additions & 1 deletion engine/cdn/cdn_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,15 @@ func (s *Service) getItemLogValue(ctx context.Context, t sdk.CDNItemType, apiRef
func (s *Service) pushItemLogIntoCache(ctx context.Context, it sdk.CDNItem) error {
t0 := time.Now()
// Search item in a storage unit
itemUnits, err := storage.LoadAllItemUnitsByItemID(ctx, s.Mapper, s.mustDBWithCtx(ctx), it.ID)
mapItemUnits, err := storage.LoadAllItemUnitsByItemIDs(ctx, s.Mapper, s.mustDBWithCtx(ctx), []string{it.ID})
if err != nil {
return err
}
itemUnits, has := mapItemUnits[it.ID]
if !has {
return sdk.WithStack(fmt.Errorf("unable to find item units"))
}

// Random pick a unit
idx := 0
if len(itemUnits) > 1 {
Expand Down
36 changes: 18 additions & 18 deletions engine/cdn/cdn_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

gocache "github.com/patrickmn/go-cache"
"github.com/spf13/cast"

"github.com/ovh/cds/engine/cache"
"github.com/ovh/cds/sdk"
Expand Down Expand Up @@ -49,13 +50,15 @@ func (s *Service) runTCPLogServer(ctx context.Context) {
})

for i := int64(0); i < s.Cfg.NbJobLogsGoroutines; i++ {
log.Info(ctx, "CDN> Starting dequeueJobLogs - cdn-worker-job-%d", i)
s.GoRoutines.Run(ctx, fmt.Sprintf("cdn-worker-job-%d", i), func(ctx context.Context) {
if err := s.dequeueJobLogs(ctx); err != nil {
log.Error(ctx, "dequeueJobLogs: unable to dequeue redis incoming job logs: %v", err)
}
})
}
for i := int64(0); i < s.Cfg.NbServiceLogsGoroutines; i++ {
log.Info(ctx, "CDN> Starting dequeueServiceLogs - cdn-worker-service-%d", i)
s.GoRoutines.Run(ctx, fmt.Sprintf("cdn-worker-service-%d", i), func(ctx context.Context) {
if err := s.dequeueServiceLogs(ctx); err != nil {
log.Error(ctx, "dequeueJobLogs: unable to dequeue redis incoming service logs: %v", err)
Expand Down Expand Up @@ -94,7 +97,7 @@ func (s *Service) handleConnection(ctx context.Context, conn net.Conn) {
for {
bytes, err := bufReader.ReadBytes(byte(0))
if err != nil {
log.Debug("client left: %v", err)
log.Debug("client left: (%v) %v", conn.RemoteAddr(), err)
return
}
// remove byte(0)
Expand Down Expand Up @@ -166,17 +169,14 @@ func (s *Service) handleWorkerLog(ctx context.Context, workerName string, worker
line = int64(lineI.(float64))
}

var status string
statusI := m.Extra["_"+log.ExtraFieldJobStatus]
if statusI != nil {
status = statusI.(string)
}
terminatedI := m.Extra["_"+log.ExtraFieldTerminated]
terminated := cast.ToBool(terminatedI)

hm := handledMessage{
Signature: signature,
Msg: m,
Line: line,
Status: status,
Signature: signature,
Msg: m,
Line: line,
IsTerminated: terminated,
}

if s.cdnEnabled(ctx, signature.ProjectKey) {
Expand Down Expand Up @@ -268,17 +268,17 @@ func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatche
line = int64(lineI.(float64))
}

var status string
statusI := m.Extra["_"+log.ExtraFieldJobStatus]
if statusI != nil {
status = statusI.(string)
var terminated bool
terminatedI := m.Extra["_"+log.ExtraFieldTerminated]
if terminatedI != nil {
terminated = terminatedI.(bool)
}

hm := handledMessage{
Signature: signature,
Msg: m,
Line: line,
Status: status,
Signature: signature,
Msg: m,
Line: line,
IsTerminated: terminated,
}
if s.cdnEnabled(ctx, signature.ProjectKey) {
if err := s.Cache.Enqueue(keyServiceLogIncomingQueue, hm); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions engine/cdn/cdn_log_dequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *Service) storeLogsWithRetry(ctx context.Context, itemType sdk.CDNItemTy
currentLog := buildMessage(hm)
cpt := 0
for {
if err := s.storeLogs(ctx, itemType, hm.Signature, hm.Status, currentLog, hm.Line); err != nil {
if err := s.storeLogs(ctx, itemType, hm.Signature, hm.IsTerminated, currentLog, hm.Line); err != nil {
if sdk.ErrorIs(err, sdk.ErrLocked) && cpt < 10 {
cpt++
time.Sleep(250 * time.Millisecond)
Expand All @@ -86,7 +86,7 @@ func (s *Service) storeLogsWithRetry(ctx context.Context, itemType sdk.CDNItemTy
}
}

func (s *Service) storeLogs(ctx context.Context, itemType sdk.CDNItemType, signature log.Signature, status string, content string, line int64) error {
func (s *Service) storeLogs(ctx context.Context, itemType sdk.CDNItemType, signature log.Signature, terminated bool, content string, line int64) error {
it, err := s.loadOrCreateItem(ctx, itemType, signature)
if err != nil {
return err
Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *Service) storeLogs(ctx context.Context, itemType sdk.CDNItemType, signa

maxLineKey := cache.Key("cdn", "log", "size", it.ID)
maxItemLine := -1
if sdk.StatusIsTerminated(status) {
if terminated {
maxItemLine = int(line)
// store the score of last line
if err := s.Cache.SetWithTTL(maxLineKey, maxItemLine, ItemLogGC); err != nil {
Expand Down
28 changes: 13 additions & 15 deletions engine/cdn/cdn_log_dequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ func TestStoreNewStepLog(t *testing.T) {
Msg: hook.Message{
Full: "this is a message",
},
Status: "Building",
Line: 0,
Line: 0,
Signature: log.Signature{
ProjectKey: sdk.RandomString(10),
WorkflowID: 1,
Expand All @@ -70,7 +69,7 @@ func TestStoreNewStepLog(t *testing.T) {
}

content := buildMessage(hm)
require.NoError(t, s.storeLogs(context.TODO(), sdk.CDNTypeItemStepLog, hm.Signature, hm.Status, content, hm.Line))
require.NoError(t, s.storeLogs(context.TODO(), sdk.CDNTypeItemStepLog, hm.Signature, false, content, hm.Line))

apiRef := sdk.CDNLogAPIRef{
ProjectKey: hm.Signature.ProjectKey,
Expand Down Expand Up @@ -133,9 +132,9 @@ func TestStoreLastStepLog(t *testing.T) {
s.Units = cdnUnits

hm := handledMessage{
Msg: hook.Message{},
Status: sdk.StatusSuccess,
Line: 0,
Msg: hook.Message{},
IsTerminated: sdk.StatusTerminated,
Line: 0,
Signature: log.Signature{
ProjectKey: sdk.RandomString(10),
WorkflowID: 1,
Expand Down Expand Up @@ -178,7 +177,7 @@ func TestStoreLastStepLog(t *testing.T) {

}()
content := buildMessage(hm)
err = s.storeLogs(context.TODO(), sdk.CDNTypeItemStepLog, hm.Signature, hm.Status, content, hm.Line)
err = s.storeLogs(context.TODO(), sdk.CDNTypeItemStepLog, hm.Signature, hm.IsTerminated, content, hm.Line)
require.NoError(t, err)

itemDB, err := item.LoadByID(context.TODO(), s.Mapper, db, it.ID)
Expand Down Expand Up @@ -227,8 +226,8 @@ func TestStoreLogWrongOrder(t *testing.T) {
Msg: hook.Message{
Full: "this is a message",
},
Status: sdk.StatusSuccess,
Line: 1,
IsTerminated: sdk.StatusTerminated,
Line: 1,
Signature: log.Signature{
ProjectKey: sdk.RandomString(10),
WorkflowID: 1,
Expand Down Expand Up @@ -271,7 +270,7 @@ func TestStoreLogWrongOrder(t *testing.T) {
}()

content := buildMessage(hm)
err = s.storeLogs(context.TODO(), sdk.CDNTypeItemStepLog, hm.Signature, hm.Status, content, hm.Line)
err = s.storeLogs(context.TODO(), sdk.CDNTypeItemStepLog, hm.Signature, hm.IsTerminated, content, hm.Line)
require.NoError(t, err)

itemDB, err := item.LoadByID(context.TODO(), s.Mapper, db, it.ID)
Expand All @@ -291,10 +290,10 @@ func TestStoreLogWrongOrder(t *testing.T) {

// Received Missing log
hm.Line = 0
hm.Status = ""
hm.IsTerminated = false
content = buildMessage(hm)

err = s.storeLogs(context.TODO(), sdk.CDNTypeItemStepLog, hm.Signature, hm.Status, content, hm.Line)
err = s.storeLogs(context.TODO(), sdk.CDNTypeItemStepLog, hm.Signature, hm.IsTerminated, content, hm.Line)
require.NoError(t, err)

itemDB2, err := item.LoadByID(context.TODO(), s.Mapper, db, it.ID)
Expand Down Expand Up @@ -347,8 +346,7 @@ func TestStoreNewServiceLog(t *testing.T) {
Msg: hook.Message{
Full: "this is a message",
},
Status: "Building",
Line: 0,
Line: 0,
Signature: log.Signature{
ProjectKey: sdk.RandomString(10),
WorkflowID: 1,
Expand All @@ -362,7 +360,7 @@ func TestStoreNewServiceLog(t *testing.T) {
}

content := buildMessage(hm)
require.NoError(t, s.storeLogs(context.TODO(), sdk.CDNTypeItemServiceLog, hm.Signature, hm.Status, content, 0))
require.NoError(t, s.storeLogs(context.TODO(), sdk.CDNTypeItemServiceLog, hm.Signature, hm.IsTerminated, content, 0))

apiRef := sdk.CDNLogAPIRef{
ProjectKey: hm.Signature.ProjectKey,
Expand Down
1 change: 1 addition & 0 deletions engine/cdn/cdn_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestSyncLog(t *testing.T) {
}

cdnUnits, err := storage.Init(context.TODO(), m, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{
HashLocatorSalt: "thisismysalt",
Buffer: storage.BufferConfiguration{
Name: "redis_buffer",
Redis: storage.RedisBufferConfiguration{
Expand Down
1 change: 1 addition & 0 deletions engine/cdn/cdn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func newRunningStorageUnits(t *testing.T, m *gorpmapper.Mapper, dbMap *gorp.DbMa
tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
require.NoError(t, err)
cdnUnits, err := storage.Init(ctx, m, dbMap, sdk.NewGoRoutines(), storage.Configuration{
HashLocatorSalt: "thisismysalt",
Buffer: storage.BufferConfiguration{
Name: "redis_buffer",
Redis: storage.RedisBufferConfiguration{
Expand Down
2 changes: 1 addition & 1 deletion engine/cdn/item/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func LoadByAPIRefHashAndType(ctx context.Context, m *gorpmapper.Mapper, db gorp.
// ComputeSizeByIDs returns the size used by givenn item IDs
func ComputeSizeByIDs(db gorp.SqlExecutor, itemIDs []string) (int64, error) {
query := `
SELECT SUM(size) FROM item
SELECT COALESCE(SUM(size), 0) FROM item
WHERE id = ANY($1)
`
size, err := db.SelectInt(query, pq.StringArray(itemIDs))
Expand Down
11 changes: 8 additions & 3 deletions engine/cdn/item_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,18 @@ func (s *Service) getItemHandler() service.Handler {

var res sdk.CDNItemResume
res.CDNItem = *it
res.Location = make(map[string]sdk.CDNItemUnit)

iu, err := storage.LoadItemUnitByUnit(ctx, s.Mapper, s.mustDBWithCtx(ctx), s.Units.Buffer.ID(), it.ID, opts...)
if err != nil {
return err
if !sdk.ErrorIs(err, sdk.ErrNotFound) {
return err
}
}

if iu != nil {
res.Location[s.Units.Buffer.Name()] = *iu
}
res.Location = make(map[string]sdk.CDNItemUnit)
res.Location[s.Units.Buffer.Name()] = *iu

for _, strg := range s.Units.Storages {
iu, err := storage.LoadItemUnitByUnit(ctx, s.Mapper, s.mustDBWithCtx(ctx), strg.ID(), it.ID, opts...)
Expand Down
6 changes: 3 additions & 3 deletions engine/cdn/item_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func TestGetItemLogsDownloadHandler(t *testing.T) {
Msg: hook.Message{
Full: "this is a message",
},
Status: sdk.StatusSuccess,
Line: 2,
IsTerminated: sdk.StatusTerminated,
Line: 2,
Signature: log.Signature{
ProjectKey: projectKey,
WorkflowID: 1,
Expand All @@ -135,7 +135,7 @@ func TestGetItemLogsDownloadHandler(t *testing.T) {
}

content := buildMessage(hm)
err := s.storeLogs(context.TODO(), sdk.CDNTypeItemStepLog, hm.Signature, hm.Status, content, hm.Line)
err := s.storeLogs(context.TODO(), sdk.CDNTypeItemStepLog, hm.Signature, hm.IsTerminated, content, hm.Line)
require.NoError(t, err)

signer, err := authentication.NewSigner("cdn-test", test.SigningKey)
Expand Down
Loading

0 comments on commit b5f7303

Please sign in to comment.