Skip to content

Commit

Permalink
fix(cdn): remove slow queries (#5574)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Dec 2, 2020
1 parent 05a32fa commit cc5bc23
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 44 deletions.
18 changes: 6 additions & 12 deletions engine/cdn/cdn_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,22 @@ func (s *Service) itemsGC(ctx context.Context) {

func (s *Service) markUnitItemToDeleteByItemID(ctx context.Context, itemID string) (int, error) {
db := s.mustDBWithCtx(ctx)
mapItemUnits, err := storage.LoadAllItemUnitsByItemIDs(ctx, s.Mapper, db, []string{itemID})
itemUnitIDs, err := storage.LoadAllItemUnitsIDsByItemID(db, itemID)
if err != nil {
return 0, err
}
uis, has := mapItemUnits[itemID]
if !has {
if len(itemUnitIDs) == 0 {
return 0, nil
}

ids := make([]string, len(uis))
for i := range uis {
ids[i] = uis[i].ID
}

tx, err := db.Begin()
if err != nil {
return 0, sdk.WithStack(err)
}

defer tx.Rollback() // nolint

n, err := storage.MarkItemUnitToDelete(tx, ids)
n, err := storage.MarkItemUnitToDelete(tx, itemUnitIDs)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -107,14 +101,14 @@ func (s *Service) cleanItemToDelete(ctx context.Context) error {
// If and only If there is not more unit item to mark as delete,
// let's delete the item in database
if nbUnitItemToDelete == 0 {
itemUnits, err := storage.LoadAllItemUnitsToDeleteByID(ctx, s.Mapper, s.mustDBWithCtx(ctx), id)
nbItemUnits, err := storage.CountItemUnitsToDeleteByItemID(s.mustDBWithCtx(ctx), id)
if err != nil {
log.Error(ctx, "unable to count unit item %q to delete: %v", id, err)
continue
}

if len(itemUnits) > 0 {
log.Debug("cdn:purge:item: %d unit items to delete for item %s", len(itemUnits), id)
if nbItemUnits > 0 {
log.Debug("cdn:purge:item: %d unit items to delete for item %s", nbItemUnits, id)
} else {
if err := s.LogCache.Remove([]string{id}); err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions engine/cdn/cdn_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,12 @@ func (s *Service) getItemLogValue(ctx context.Context, t sdk.CDNItemType, apiRef
func (s *Service) pushItemLogIntoCache(ctx context.Context, it sdk.CDNItem, unitName string) error {
t0 := time.Now()
// Search item in a storage unit
mapItemUnits, err := storage.LoadAllItemUnitsByItemIDs(ctx, s.Mapper, s.mustDBWithCtx(ctx), []string{it.ID})
itemUnits, err := storage.LoadAllItemUnitsByItemIDs(ctx, s.Mapper, s.mustDBWithCtx(ctx), it.ID)
if err != nil {
return err
}
itemUnits, has := mapItemUnits[it.ID]
if !has {

if len(itemUnits) == 0 {
return sdk.WithStack(fmt.Errorf("unable to find item units for item with id: %s", it.ID))
}

Expand Down
6 changes: 3 additions & 3 deletions engine/cdn/item_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,17 @@ func (s *Service) getItemCheckSyncHandler() service.Handler {
return err
}

itemsUnits, err := storage.LoadAllItemUnitsByItemIDs(ctx, s.Mapper, s.mustDBWithCtx(ctx), []string{it.ID}, gorpmapper.GetOptions.WithDecryption)
itemsUnits, err := storage.LoadAllItemUnitsByItemIDs(ctx, s.Mapper, s.mustDBWithCtx(ctx), it.ID, gorpmapper.GetOptions.WithDecryption)
if err != nil {
return err
}

if len(itemsUnits) != 1 {
if len(itemsUnits) == 0 {
return sdk.WithStack(sdk.ErrNotFound)
}

var contents = map[string]*bytes.Buffer{}
for _, iu := range itemsUnits[it.ID] {
for _, iu := range itemsUnits {
src, err := s.Units.NewSource(ctx, iu)
if err != nil {
return err
Expand Down
37 changes: 20 additions & 17 deletions engine/cdn/storage/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,10 @@ func LoadItemUnitsByUnit(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlE
return getAllItemUnits(ctx, m, db, query, opts...)
}

func LoadItemUnitsByUnitAndHashLocator(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, unitID string, hashLocator string, size *int, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItemUnit, error) {
query := gorpmapper.NewQuery("SELECT * FROM storage_unit_item WHERE unit_id = $1 AND hash_locator = $2 AND to_delete = false ORDER BY last_modified ASC LIMIT $3").Args(unitID, hashLocator, size)
return getAllItemUnits(ctx, m, db, query, opts...)
func CountItemUnitsByUnitAndHashLocator(db gorp.SqlExecutor, unitID string, hashLocator string, size *int) (int64, error) {
query := "SELECT COUNT(*) FROM storage_unit_item WHERE unit_id = $1 AND hash_locator = $2 AND to_delete = false LIMIT $3"
nb, err := db.SelectInt(query, unitID, hashLocator, size)
return nb, sdk.WithStack(err)
}

func LoadItemUnitByID(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, id string, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItemUnit, error) {
Expand Down Expand Up @@ -206,9 +207,10 @@ func getItemUnit(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor,
return &i.CDNItemUnit, nil
}

func LoadAllItemUnitsToDeleteByID(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, itemID string, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItemUnit, error) {
query := gorpmapper.NewQuery("SELECT * FROM storage_unit_item WHERE item_id = $1 AND to_delete = true ORDER BY last_modified ASC").Args(itemID)
return getAllItemUnits(ctx, m, db, query, opts...)
func CountItemUnitsToDeleteByItemID(db gorp.SqlExecutor, itemID string) (int64, error) {
query := "SELECT COUNT(id) FROM storage_unit_item WHERE item_id = $1 AND to_delete = true"
nb, err := db.SelectInt(query, itemID)
return nb, sdk.WithStack(err)
}

func LoadAllItemUnitsToDeleteByUnit(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, unitID string, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItemUnit, error) {
Expand All @@ -225,18 +227,19 @@ func LoadAllItemUnitsIDsByItemIDsAndUnitID(db gorp.SqlExecutor, unitID string, i
return IDs, nil
}

func LoadAllItemUnitsByItemIDs(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, itemID []string, opts ...gorpmapper.GetOptionFunc) (map[string][]sdk.CDNItemUnit, error) {
query := gorpmapper.NewQuery("SELECT * FROM storage_unit_item WHERE item_id = ANY($1) AND to_delete = false").Args(pq.StringArray(itemID))
allItemUnits, err := getAllItemUnits(ctx, m, db, query, opts...)
if err != nil {
return nil, err
}
var res = make(map[string][]sdk.CDNItemUnit, len(itemID))
for i := range allItemUnits {
var itemUnit = allItemUnits[i]
res[itemUnit.ItemID] = append(res[itemUnit.ItemID], itemUnit)
func LoadAllItemUnitsIDsByItemID(db gorp.SqlExecutor, itemID string) ([]string, error) {
var IDs []string
query := "SELECT storage_unit_item.id FROM storage_unit_item WHERE item_id = $1 AND to_delete = false"
if _, err := db.Select(&IDs, query, itemID); err != nil {
return nil, sdk.WithStack(err)
}
return res, nil
return IDs, nil
}

func LoadAllItemUnitsByItemIDs(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, itemID string, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItemUnit, error) {
query := gorpmapper.NewQuery("SELECT * FROM storage_unit_item WHERE item_id = $1 AND to_delete = false").Args(itemID)
allItemUnits, err := getAllItemUnits(ctx, m, db, query, opts...)
return allItemUnits, sdk.WithStack(err)
}

func getAllItemUnits(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, query gorpmapper.Query, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItemUnit, error) {
Expand Down
7 changes: 3 additions & 4 deletions engine/cdn/storage/storage_unit_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,11 @@ func (r RunningStorageUnits) GetSource(ctx context.Context, i *sdk.CDNItem) (Sou
}

// Find a storage unit where the item is complete
mapItemUnits, err := LoadAllItemUnitsByItemIDs(ctx, r.m, r.db, []string{i.ID}, gorpmapper.GetOptions.WithDecryption)
itemUnits, err := LoadAllItemUnitsByItemIDs(ctx, r.m, r.db, i.ID, gorpmapper.GetOptions.WithDecryption)
if err != nil {
return nil, err
}

var itemUnits = mapItemUnits[i.ID]
if len(itemUnits) == 0 {
log.Warning(ctx, "item %s can't be found. No unit knows it...", i.ID)
return nil, sdk.WithStack(sdk.ErrNotFound)
Expand Down Expand Up @@ -116,8 +115,8 @@ func (r RunningStorageUnits) NewSource(ctx context.Context, refItemUnit sdk.CDNI
return &iuSource{iu: refItemUnit, source: unit}, nil
}

func (r RunningStorageUnits) GetItemUnitByLocatorByUnit(ctx context.Context, locator string, unitID string, opts ...gorpmapper.GetOptionFunc) ([]sdk.CDNItemUnit, error) {
func (r RunningStorageUnits) GetItemUnitByLocatorByUnit(ctx context.Context, locator string, unitID string) (int64, error) {
// Load all the itemUnit for the unit and the same hashLocator
hashLocator := r.HashLocator(locator)
return LoadItemUnitsByUnitAndHashLocator(ctx, r.m, r.db, unitID, hashLocator, nil, opts...)
return CountItemUnitsByUnitAndHashLocator(r.db, unitID, hashLocator, nil)
}
6 changes: 3 additions & 3 deletions engine/cdn/storage/storageunit_purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ func (x *RunningStorageUnits) Purge(ctx context.Context, s Interface) error {
return err
}
if exists {
otherItemUnits, err := x.GetItemUnitByLocatorByUnit(ctx, ui.Locator, s.ID())
nbItemUnits, err := x.GetItemUnitByLocatorByUnit(ctx, ui.Locator, s.ID())
if err != nil {
return err
}

if len(otherItemUnits) > 0 {
log.Debug("cdn:purge:%s: item unit %s content will not be deleted because there is %d other item units with the same content ", s.Name(), ui.ID, len(otherItemUnits))
if nbItemUnits > 0 {
log.Debug("cdn:purge:%s: item unit %s content will not be deleted because there is %d other item units with the same content ", s.Name(), ui.ID, nbItemUnits)
} else {
if err := s.Remove(ctx, ui); err != nil {
log.ErrorWithFields(ctx, log.Fields{
Expand Down
4 changes: 2 additions & 2 deletions engine/cdn/storage/storageunit_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ func (x *RunningStorageUnits) runItem(ctx context.Context, tx gorpmapper.SqlExec
}

// Check if the content (based on the locator) is already known from the destination unit
otherItemUnits, err := x.GetItemUnitByLocatorByUnit(ctx, iu.Locator, dest.ID())
nbItemUnits, err := x.GetItemUnitByLocatorByUnit(ctx, iu.Locator, dest.ID())
if err != nil {
return err
}

if len(otherItemUnits) > 0 {
if nbItemUnits > 0 {
log.InfoWithFields(ctx, log.Fields{
"item_apiref": item.APIRefHash,
"item_size_num": item.Size,
Expand Down

0 comments on commit cc5bc23

Please sign in to comment.