From 7b6dcd879850e61253b1c1745c5af94f01aeab19 Mon Sep 17 00:00:00 2001 From: Guiheux Steven Date: Mon, 8 Mar 2021 10:40:08 +0100 Subject: [PATCH] fix(cdn,ui): fix download run-result + clean buffer (#5746) --- engine/cdn/cdn_gc.go | 18 ++++++++++----- engine/cdn/cdn_item.go | 2 ++ engine/cdn/item_handler.go | 2 ++ engine/cdn/storage/cds/cds.go | 8 ++++++- engine/cdn/storage/local/buffer.go | 8 ++++++- engine/cdn/storage/local/local.go | 8 ++++++- engine/cdn/storage/nfs/nfs.go | 8 ++++++- engine/cdn/storage/redis/redis.go | 8 ++++++- engine/cdn/storage/s3/s3.go | 8 ++++++- engine/cdn/storage/storage_unit_io.go | 2 ++ engine/cdn/storage/swift/swift.go | 8 ++++++- engine/cdn/storage/types.go | 22 +++++++++++++++++++ engine/cdn/storage/webdav/webdav.go | 8 ++++++- .../node/artifact/artifact.list.component.ts | 2 +- 14 files changed, 98 insertions(+), 14 deletions(-) diff --git a/engine/cdn/cdn_gc.go b/engine/cdn/cdn_gc.go index a84b2792b7..7a7b786f5d 100644 --- a/engine/cdn/cdn_gc.go +++ b/engine/cdn/cdn_gc.go @@ -151,27 +151,35 @@ func (s *Service) cleanBuffer(ctx context.Context) error { } log.Debug(ctx, "item to remove from buffer: %d", len(itemIDs)) if len(itemIDs) == 0 { - return nil + continue } itemUnitsIDs, err := storage.LoadAllItemUnitsIDsByItemIDsAndUnitID(s.mustDBWithCtx(ctx), bu.ID(), itemIDs) if err != nil { - return err + ctx := sdk.ContextWithStacktrace(ctx, err) + log.Error(ctx, "unable to load item units: %v", err) + continue } tx, err := s.mustDBWithCtx(ctx).Begin() if err != nil { - return sdk.WrapError(err, "unable to start transaction") + ctx := sdk.ContextWithStacktrace(ctx, err) + log.Error(ctx, "unable to start transaction: %v", err) + continue } if _, err := storage.MarkItemUnitToDelete(tx, itemUnitsIDs); err != nil { _ = tx.Rollback() - return err + ctx := sdk.ContextWithStacktrace(ctx, err) + log.Error(ctx, "unable to mark item as delete: %v", err) + continue } if err := tx.Commit(); err != nil { _ = tx.Rollback() - return sdk.WithStack(err) + ctx := sdk.ContextWithStacktrace(ctx, err) + log.Error(ctx, "unable to commit transaction: %v", err) + continue } } return nil diff --git a/engine/cdn/cdn_item.go b/engine/cdn/cdn_item.go index 169211ac4c..82adf7afe1 100644 --- a/engine/cdn/cdn_item.go +++ b/engine/cdn/cdn_item.go @@ -353,6 +353,8 @@ func (s *Service) getRandomItemUnitIDByItemID(ctx context.Context, itemID string return "", "", sdk.WithStack(fmt.Errorf("unable to find item units for item with id: %s", itemID)) } + itemUnits = s.Units.FilterItemUnitReaderByType(itemUnits) + var unit *sdk.CDNUnit var selectedItemUnit *sdk.CDNItemUnit if defaultUnitName != "" { diff --git a/engine/cdn/item_handler.go b/engine/cdn/item_handler.go index d8fc102e89..3502663250 100644 --- a/engine/cdn/item_handler.go +++ b/engine/cdn/item_handler.go @@ -184,6 +184,8 @@ func (s *Service) getItemCheckSyncHandler() service.Handler { return sdk.WithStack(sdk.ErrNotFound) } + itemsUnits = s.Units.FilterItemUnitReaderByType(itemsUnits) + var contents = map[string]*bytes.Buffer{} for _, iu := range itemsUnits { src, err := s.Units.NewSource(ctx, iu) diff --git a/engine/cdn/storage/cds/cds.go b/engine/cdn/storage/cds/cds.go index a248499cbb..974dee1b04 100644 --- a/engine/cdn/storage/cds/cds.go +++ b/engine/cdn/storage/cds/cds.go @@ -21,8 +21,14 @@ type CDS struct { config storage.CDSStorageConfiguration } +const driverName = "cds" + func init() { - storage.RegisterDriver("cds", new(CDS)) + storage.RegisterDriver(driverName, new(CDS)) +} + +func (c *CDS) GetDriverName() string { + return driverName } func (c *CDS) GetClient() cdsclient.Interface { diff --git a/engine/cdn/storage/local/buffer.go b/engine/cdn/storage/local/buffer.go index d4c13bb132..d7939939b4 100644 --- a/engine/cdn/storage/local/buffer.go +++ b/engine/cdn/storage/local/buffer.go @@ -21,8 +21,14 @@ type Buffer struct { bufferType storage.CDNBufferType } +const driverBufferName = "local-buffer" + func init() { - storage.RegisterDriver("local-buffer", new(Buffer)) + storage.RegisterDriver(driverBufferName, new(Buffer)) +} + +func (b *Buffer) GetDriverName() string { + return driverBufferName } func (b *Buffer) Init(ctx context.Context, cfg interface{}, bufferType storage.CDNBufferType) error { diff --git a/engine/cdn/storage/local/local.go b/engine/cdn/storage/local/local.go index 72e93d1077..91ccddf62c 100644 --- a/engine/cdn/storage/local/local.go +++ b/engine/cdn/storage/local/local.go @@ -30,8 +30,14 @@ type Local struct { encryption.ConvergentEncryption } +const driverName = "local" + func init() { - storage.RegisterDriver("local", new(Local)) + storage.RegisterDriver(driverName, new(Local)) +} + +func (s *Local) GetDriverName() string { + return driverName } func (s *Local) Init(ctx context.Context, cfg interface{}) error { diff --git a/engine/cdn/storage/nfs/nfs.go b/engine/cdn/storage/nfs/nfs.go index 7092728b95..b6c5b5cee1 100644 --- a/engine/cdn/storage/nfs/nfs.go +++ b/engine/cdn/storage/nfs/nfs.go @@ -32,8 +32,14 @@ var ( _ storage.FileBufferUnit = new(Buffer) ) +const driverBufferName = "nfs-buffer" + func init() { - storage.RegisterDriver("nfs-buffer", new(Buffer)) + storage.RegisterDriver(driverBufferName, new(Buffer)) +} + +func (n *Buffer) GetDriverName() string { + return driverBufferName } type Reader struct { diff --git a/engine/cdn/storage/redis/redis.go b/engine/cdn/storage/redis/redis.go index 99a334258d..5e6de881eb 100644 --- a/engine/cdn/storage/redis/redis.go +++ b/engine/cdn/storage/redis/redis.go @@ -27,8 +27,14 @@ type Redis struct { bufferType storage.CDNBufferType } +const driverName = "redis" + func init() { - storage.RegisterDriver("redis", new(Redis)) + storage.RegisterDriver(driverName, new(Redis)) +} + +func (s *Redis) GetDriverName() string { + return driverName } func (s *Redis) Init(_ context.Context, cfg interface{}, bufferType storage.CDNBufferType) error { diff --git a/engine/cdn/storage/s3/s3.go b/engine/cdn/storage/s3/s3.go index c507c2e8cc..20f136b931 100644 --- a/engine/cdn/storage/s3/s3.go +++ b/engine/cdn/storage/s3/s3.go @@ -34,8 +34,14 @@ var ( _ storage.StorageUnit = new(S3) ) +const driverName = "s3" + func init() { - storage.RegisterDriver("s3", new(S3)) + storage.RegisterDriver(driverName, new(S3)) +} + +func (s *S3) GetDriverName() string { + return driverName } func (s *S3) Init(_ context.Context, cfg interface{}) error { diff --git a/engine/cdn/storage/storage_unit_io.go b/engine/cdn/storage/storage_unit_io.go index 1842085104..99a48e89f3 100644 --- a/engine/cdn/storage/storage_unit_io.go +++ b/engine/cdn/storage/storage_unit_io.go @@ -76,6 +76,8 @@ func (r RunningStorageUnits) GetSource(ctx context.Context, i *sdk.CDNItem) (Sou return nil, sdk.WithStack(sdk.ErrNotFound) } + itemUnits = r.FilterItemUnitReaderByType(itemUnits) + // Random pick a unit idx := 0 if len(itemUnits) > 1 { diff --git a/engine/cdn/storage/swift/swift.go b/engine/cdn/storage/swift/swift.go index 335fb3694c..310f361ab3 100644 --- a/engine/cdn/storage/swift/swift.go +++ b/engine/cdn/storage/swift/swift.go @@ -28,8 +28,14 @@ var ( _ storage.StorageUnit = new(Swift) ) +const driverName = "swift" + func init() { - storage.RegisterDriver("swift", new(Swift)) + storage.RegisterDriver(driverName, new(Swift)) +} + +func (s *Swift) GetDriverName() string { + return driverName } func (s *Swift) Init(_ context.Context, cfg interface{}) error { diff --git a/engine/cdn/storage/types.go b/engine/cdn/storage/types.go index 2b5eaedda2..e6574715b6 100644 --- a/engine/cdn/storage/types.go +++ b/engine/cdn/storage/types.go @@ -97,6 +97,7 @@ func (a *AbstractUnit) SyncBandwidth() float64 { type Unit interface { Read(i sdk.CDNItemUnit, r io.Reader, w io.Writer) error NewReader(ctx context.Context, i sdk.CDNItemUnit) (io.ReadCloser, error) + GetDriverName() string } type BufferUnit interface { @@ -284,6 +285,27 @@ func (x RunningStorageUnits) GetBuffer(bufferType sdk.CDNItemType) BufferUnit { } } +func (x *RunningStorageUnits) FilterItemUnitReaderByType(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit { + // Remove cds backend from getting something that is not a log + if ius[0].Type != sdk.CDNTypeItemStepLog && ius[0].Type != sdk.CDNTypeItemServiceLog { + var cdsBackendID string + for _, unit := range x.Storages { + if unit.GetDriverName() == "cds" { + cdsBackendID = unit.ID() + break + } + } + + for i, s := range ius { + if s.UnitID == cdsBackendID { + ius = append(ius[:i], ius[i+1:]...) + break + } + } + } + return ius +} + type LogConfig struct { // Step logs StepMaxSize int64 `toml:"stepMaxSize" default:"15728640" comment:"Max step logs size in bytes (default: 15MB)" json:"stepMaxSize"` diff --git a/engine/cdn/storage/webdav/webdav.go b/engine/cdn/storage/webdav/webdav.go index f2c6005428..e96401feaa 100644 --- a/engine/cdn/storage/webdav/webdav.go +++ b/engine/cdn/storage/webdav/webdav.go @@ -28,8 +28,14 @@ var ( _ storage.StorageUnit = new(Webdav) ) +const driverName = "webdav" + func init() { - storage.RegisterDriver("webdav", new(Webdav)) + storage.RegisterDriver(driverName, new(Webdav)) +} + +func (s *Webdav) GetDriverName() string { + return driverName } func (s *Webdav) Init(_ context.Context, cfg interface{}) error { diff --git a/ui/src/app/views/workflow/run/node/artifact/artifact.list.component.ts b/ui/src/app/views/workflow/run/node/artifact/artifact.list.component.ts index 8e482433bc..45aa737a02 100644 --- a/ui/src/app/views/workflow/run/node/artifact/artifact.list.component.ts +++ b/ui/src/app/views/workflow/run/node/artifact/artifact.list.component.ts @@ -43,7 +43,7 @@ export class WorkflowRunArtifactListComponent implements OnInit, OnDestroy { let size = this.getHumainFileSize(a.size); let link = `./cdsapi/workflow/artifact/${a.download_hash}` if (!a.id) { - link = `./cdscdn/item/artifact/${a.download_hash}/download` + link = `./cdscdn/item/run-result/${a.download_hash}/download` } return { link,