From 595783e52edff04db6759f68109846609e009a0b Mon Sep 17 00:00:00 2001 From: Avi Deitcher Date: Wed, 20 Dec 2023 11:01:35 +0200 Subject: [PATCH] add support for optional progress updates channel Signed-off-by: Avi Deitcher --- copy.go | 27 +++++++++++++++++++++++++-- progress_reader.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 progress_reader.go diff --git a/copy.go b/copy.go index 9caed9809..32786b621 100644 --- a/copy.go +++ b/copy.go @@ -112,6 +112,23 @@ type CopyGraphOptions struct { // source storage to fetch large blobs. // If FindSuccessors is nil, content.Successors will be used. FindSuccessors func(ctx context.Context, fetcher content.Fetcher, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) + + // UpdateChannel is an optional channel to receive progress updates. + // Each update will include the number of bytes copied for a particular blob + // or manifest, the expected total size, and the descriptor of the blob or + // manifest. It is up to the consumer of the channel to differentiate + // between updates among different blobs and manifests; no mechanism is + // provided for distinguishing between them, other than the descriptor + // passed with each update. The total size of downloads of all blobs and + // manifests is not provided, as it is not known. You can calculate the + // percentage downloaded for a particular blob in an individual update + // based on the total size of that blob, which is provided in the + // descriptor, and the number of bytes copied, which is provided in the + // update. + // Updates are sent each time a block is copied. The number of bytes copied + // depends upon io.Copy, which, by default, is 32KB. As of now, this cannot + // be changed. We may provided that capability in a future update. + UpdateChannel chan<- CopyUpdate } // Copy copies a rooted directed acyclic graph (DAG) with the tagged root node @@ -266,11 +283,17 @@ func copyGraph(ctx context.Context, src content.ReadOnlyStorage, dst content.Sto } // doCopyNode copies a single content from the source CAS to the destination CAS. -func doCopyNode(ctx context.Context, src content.ReadOnlyStorage, dst content.Storage, desc ocispec.Descriptor) error { +func doCopyNode(ctx context.Context, src content.ReadOnlyStorage, dst content.Storage, desc ocispec.Descriptor, ch chan<- CopyUpdate) error { rc, err := src.Fetch(ctx, desc) if err != nil { return err } + if ch != nil { + rc = &progressReader{ + c: ch, + r: rc, + } + } defer rc.Close() err = dst.Push(ctx, desc, rc) if err != nil && !errors.Is(err, errdef.ErrAlreadyExists) { @@ -291,7 +314,7 @@ func copyNode(ctx context.Context, src content.ReadOnlyStorage, dst content.Stor } } - if err := doCopyNode(ctx, src, dst, desc); err != nil { + if err := doCopyNode(ctx, src, dst, desc, opts.UpdateChannel); err != nil { return err } diff --git a/progress_reader.go b/progress_reader.go new file mode 100644 index 000000000..a5a887f53 --- /dev/null +++ b/progress_reader.go @@ -0,0 +1,46 @@ +/* +Copyright The ORAS Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package oras + +import ( + "io" + + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +type CopyUpdate struct { + Copied int64 + Descriptor ocispec.Descriptor +} + +type progressReader struct { + desc ocispec.Descriptor + r io.ReadCloser + c chan<- CopyUpdate +} + +func (p *progressReader) Close() error { + close(p.c) + return p.r.Close() +} + +func (p *progressReader) Read(buf []byte) (int, error) { + n, err := p.r.Read(buf) + if n > 0 { + p.c <- CopyUpdate{Copied: int64(n), Descriptor: p.desc} + } + return n, err +}