Skip to content

Commit

Permalink
Fix LoadRemote to return all possible compressions
Browse files Browse the repository at this point in the history
Signed-off-by: Kohei Tokunaga <[email protected]>
  • Loading branch information
ktock committed Oct 11, 2021
1 parent 579de0b commit cadec20
Show file tree
Hide file tree
Showing 17 changed files with 146 additions and 82 deletions.
15 changes: 9 additions & 6 deletions cache/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,7 +1126,7 @@ func TestConversion(t *testing.T) {
require.NoError(t, eg.Wait())
}

func TestGetRemote(t *testing.T) {
func TestGetRemotes(t *testing.T) {
t.Parallel()
// windows fails when lazy blob is being extracted with "invalid windows mount type: 'bind'"
if runtime.GOOS != "linux" {
Expand Down Expand Up @@ -1252,19 +1252,22 @@ func TestGetRemote(t *testing.T) {

checkNumBlobs(ctx, t, co.cs, 1)

// Call GetRemote on all the refs
// Call GetRemotes on all the refs
eg, egctx := errgroup.WithContext(ctx)
for _, ir := range refs {
ir := ir.(*immutableRef)
for _, compressionType := range []compression.Type{compression.Uncompressed, compression.Gzip, compression.EStargz, compression.Zstd} {
compressionType := compressionType
compressionopt := solver.CompressionOpt{
Type: compressionType,
Force: true,
compressionopt := []solver.CompressionOpt{
{
Type: compressionType,
Force: true,
},
}
eg.Go(func() error {
remote, err := ir.GetRemote(egctx, true, compressionopt, nil)
remotes, err := ir.GetRemotes(egctx, true, compressionopt, false, nil)
require.NoError(t, err)
remote := remotes[0]
refChain := ir.parentRefChain()
for i, desc := range remote.Descriptors {
switch compressionType {
Expand Down
2 changes: 1 addition & 1 deletion cache/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ImmutableRef interface {
Clone() ImmutableRef

Extract(ctx context.Context, s session.Group) error // +progress
GetRemote(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, s session.Group) (*solver.Remote, error)
GetRemotes(ctx context.Context, createIfNeeded bool, compressionopt []solver.CompressionOpt, all bool, s session.Group) ([]*solver.Remote, error)
}

type MutableRef interface {
Expand Down
33 changes: 30 additions & 3 deletions cache/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,36 @@ type Unlazier interface {
Unlazy(ctx context.Context) error
}

// GetRemote gets a *solver.Remote from content store for this ref (potentially pulling lazily).
// Note: Use WorkerRef.GetRemote instead as moby integration requires custom GetRemote implementation.
func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, s session.Group) (*solver.Remote, error) {
// GetRemotes gets []*solver.Remote from content store for this ref (potentially pulling lazily).
// Note: Use WorkerRef.GetRemotes instead as moby integration requires custom GetRemotes implementation.
func (sr *immutableRef) GetRemotes(ctx context.Context, createIfNeeded bool, compressionopt []solver.CompressionOpt, all bool, s session.Group) ([]*solver.Remote, error) {
if len(compressionopt) == 0 {
compressionopt = append(compressionopt, solver.CompressionOpt{
Type: compression.Default,
})
}
var res []*solver.Remote
for _, c := range compressionopt {
// The first compression option is applied to the newly created blobs.
// If force option is enabled in a compression option, its Remote will be returned as well.
if len(res) == 0 || c.Force {
r, err := sr.getRemote(ctx, createIfNeeded, c, s)
if err != nil {
return nil, err
}
res = append(res, r)
if !all {
break // early return if all isn't specified
}
}
}
if len(res) == 0 {
return nil, errors.Errorf("no layer found for the specified compression")
}
return res, nil
}

func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, s session.Group) (*solver.Remote, error) {
compressionType := compressionopt.Type
forceCompression := compressionopt.Force
ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
Expand Down
22 changes: 20 additions & 2 deletions cache/remotecache/v1/cachestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,27 @@ func (cs *cacheResultStorage) Load(ctx context.Context, res solver.CacheResult)
return worker.NewWorkerRefResult(ref, cs.w), nil
}

func (cs *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheResult, _ session.Group, _ solver.CompressionOpt) (*solver.Remote, error) {
func (cs *cacheResultStorage) LoadRemotes(ctx context.Context, res solver.CacheResult, compressionopts []solver.CompressionOpt, _ bool, _ session.Group) ([]*solver.Remote, error) {
if r := cs.byResultID(res.ID); r != nil && r.result != nil {
return r.result, nil
if len(compressionopts) == 0 {
return []*solver.Remote{r.result}, nil
}
// Remote must meet any of the specified compression options.
for _, c := range compressionopts {
match := false
for _, desc := range r.result.Descriptors {
m := c.Type.IsMediaType(desc.MediaType)
match = match || m
if c.Force && !m {
match = false
break
}
}
if match {
return []*solver.Remote{r.result}, nil
}
}
return nil, nil // return nil as it's best effort.
}
return nil, errors.WithStack(solver.ErrNotFound)
}
Expand Down
9 changes: 6 additions & 3 deletions exporter/containerimage/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,21 +302,23 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source,
Force: e.forceCompression,
}
if src.Ref != nil {
remote, err := src.Ref.GetRemote(ctx, false, compressionopt, session.NewGroup(sessionID))
remotes, err := src.Ref.GetRemotes(ctx, false, []solver.CompressionOpt{compressionopt}, false, session.NewGroup(sessionID))
if err != nil {
return nil, err
}
remote := remotes[0]
for _, desc := range remote.Descriptors {
mprovider.Add(desc.Digest, remote.Provider)
addAnnotations(annotations, desc)
}
}
if len(src.Refs) > 0 {
for _, r := range src.Refs {
remote, err := r.GetRemote(ctx, false, compressionopt, session.NewGroup(sessionID))
remotes, err := r.GetRemotes(ctx, false, []solver.CompressionOpt{compressionopt}, false, session.NewGroup(sessionID))
if err != nil {
return nil, err
}
remote := remotes[0]
for _, desc := range remote.Descriptors {
mprovider.Add(desc.Digest, remote.Provider)
addAnnotations(annotations, desc)
Expand Down Expand Up @@ -370,10 +372,11 @@ func (e *imageExporterInstance) unpackImage(ctx context.Context, img images.Imag
Type: e.layerCompression,
Force: e.forceCompression,
}
remote, err := topLayerRef.GetRemote(ctx, true, compressionopt, s)
remotes, err := topLayerRef.GetRemotes(ctx, true, []solver.CompressionOpt{compressionopt}, false, s)
if err != nil {
return err
}
remote := remotes[0]

// ensure the content for each layer exists locally in case any are lazy
if unlazier, ok := remote.Provider.(cache.Unlazier); ok {
Expand Down
3 changes: 2 additions & 1 deletion exporter/containerimage/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,11 @@ func (ic *ImageWriter) exportLayers(ctx context.Context, compressionType compres
return
}
eg.Go(func() error {
remote, err := ref.GetRemote(ctx, true, compressionopt, s)
remotes, err := ref.GetRemotes(ctx, true, []solver.CompressionOpt{compressionopt}, false, s)
if err != nil {
return err
}
remote := remotes[0]
out[i] = *remote
return nil
})
Expand Down
6 changes: 4 additions & 2 deletions exporter/oci/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,11 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source,
Force: e.forceCompression,
}
if src.Ref != nil {
remote, err := src.Ref.GetRemote(ctx, false, compressionopt, session.NewGroup(sessionID))
remotes, err := src.Ref.GetRemotes(ctx, false, []solver.CompressionOpt{compressionopt}, false, session.NewGroup(sessionID))
if err != nil {
return nil, err
}
remote := remotes[0]
// unlazy before tar export as the tar writer does not handle
// layer blobs in parallel (whereas unlazy does)
if unlazier, ok := remote.Provider.(cache.Unlazier); ok {
Expand All @@ -259,10 +260,11 @@ func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source,
}
if len(src.Refs) > 0 {
for _, r := range src.Refs {
remote, err := r.GetRemote(ctx, false, compressionopt, session.NewGroup(sessionID))
remotes, err := r.GetRemotes(ctx, false, []solver.CompressionOpt{compressionopt}, false, session.NewGroup(sessionID))
if err != nil {
return nil, err
}
remote := remotes[0]
if unlazier, ok := remote.Provider.(cache.Unlazier); ok {
if err := unlazier.Unlazy(ctx); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion solver/cachestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ type CacheInfoLink struct {
type CacheResultStorage interface {
Save(Result, time.Time) (CacheResult, error)
Load(ctx context.Context, res CacheResult) (Result, error)
LoadRemote(ctx context.Context, res CacheResult, s session.Group, compressionopt CompressionOpt) (*Remote, error)
LoadRemotes(ctx context.Context, res CacheResult, compressionopt []CompressionOpt, all bool, s session.Group) ([]*Remote, error)
Exists(id string) bool
}
64 changes: 34 additions & 30 deletions solver/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
)

type exporter struct {
Expand Down Expand Up @@ -79,7 +78,8 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
selector digest.Digest
}

rec := t.Add(rootKey(e.k.Digest(), e.k.Output()))
recKey := rootKey(e.k.Digest(), e.k.Output())
rec := t.Add(recKey)
allRec := []CacheExporterRecord{rec}

addRecord := true
Expand All @@ -94,36 +94,49 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach

var remote *Remote
if v := e.record; v != nil && len(e.k.Deps()) > 0 && addRecord {
var variants []CacheExporterRecord

cm := v.cacheManager
key := cm.getID(v.key)
res, err := cm.backend.Load(key, v.ID)
if err != nil {
return nil, err
}

remote, err = cm.results.LoadRemote(ctx, res, opt.Session, opt.RemoteCompressionOpt)
// if opt.CompressionOpt is specified, try to load all possible compressions
remotes, err := cm.results.LoadRemotes(ctx, res, opt.CompressionOpt, len(opt.CompressionOpt) > 0, opt.Session)
if err != nil {
return nil, err
}
if len(opt.TargetLayers) > 0 && remote != nil {
if !isExpectedChain(remote.Descriptors, opt.TargetLayers) {
remote = nil
if len(remotes) > 0 {
remote, remotes = remotes[0], remotes[1:] // pop the first element
}
if len(opt.CompressionOpt) > 0 {
for _, r := range remotes { // record all reamaining remotes as well
rec := t.Add(recKey)
rec.AddResult(v.CreatedAt, r)
variants = append(variants, rec)
}
}

if remote == nil && opt.Mode != CacheExportModeRemoteOnly {
if (remote == nil || len(opt.CompressionOpt) > 0) && opt.Mode != CacheExportModeRemoteOnly {
res, err := cm.results.Load(ctx, res)
if err != nil {
return nil, err
}
remote, err = opt.Convert(ctx, res)
remotes, err := opt.Convert(ctx, res)
if err != nil {
return nil, err
}
res.Release(context.TODO())
if len(opt.TargetLayers) > 0 {
if !isExpectedChain(remote.Descriptors, opt.TargetLayers) {
remote = nil
if remote == nil {
remote, remotes = remotes[0], remotes[1:] // pop the first element
}
if len(opt.CompressionOpt) > 0 {
for _, r := range remotes { // record all reamaining remotes as well
rec := t.Add(recKey)
rec.AddResult(v.CreatedAt, r)
variants = append(variants, rec)
}
}
}
Expand All @@ -133,6 +146,7 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
rec.AddResult(v.CreatedAt, remote)
}
}
allRec = append(allRec, variants...)
}

if remote != nil && opt.Mode == CacheExportModeMin {
Expand Down Expand Up @@ -165,15 +179,17 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
}
}

for i, srcs := range srcs {
for _, src := range srcs {
rec.LinkFrom(src.r, i, src.selector.String())
for _, rec := range allRec {
for i, srcs := range srcs {
for _, src := range srcs {
rec.LinkFrom(src.r, i, src.selector.String())
}
}
}

for cm, id := range e.k.ids {
if _, err := addBacklinks(t, rec, cm, id, bkm); err != nil {
return nil, err
for cm, id := range e.k.ids {
if _, err := addBacklinks(t, rec, cm, id, bkm); err != nil {
return nil, err
}
}
}

Expand All @@ -196,18 +212,6 @@ func (e *exporter) ExportTo(ctx context.Context, t CacheExporterTarget, opt Cach
return e.res, nil
}

func isExpectedChain(chain []ocispecs.Descriptor, wants []digest.Digest) bool {
if len(chain) != len(wants) {
return false
}
for i, desc := range chain {
if desc.Digest != wants[i] {
return false
}
}
return true
}

func getBestResult(records []*CacheRecord) *CacheRecord {
var rec *CacheRecord
for _, r := range records {
Expand Down
6 changes: 3 additions & 3 deletions solver/llbsolver/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ func NewContentHashFunc(selectors []Selector) solver.ResultBasedCacheFunc {
}
}

func workerRefConverter(g session.Group, compressionopt solver.CompressionOpt) func(ctx context.Context, res solver.Result) (*solver.Remote, error) {
return func(ctx context.Context, res solver.Result) (*solver.Remote, error) {
func workerRefConverter(compressionopt []solver.CompressionOpt, all bool, g session.Group) func(ctx context.Context, res solver.Result) ([]*solver.Remote, error) {
return func(ctx context.Context, res solver.Result) ([]*solver.Remote, error) {
ref, ok := res.Sys().(*worker.WorkerRef)
if !ok {
return nil, errors.Errorf("invalid result: %T", res.Sys())
}

return ref.GetRemote(ctx, true, compressionopt, g)
return ref.GetRemotes(ctx, true, compressionopt, all, g)
}
}
22 changes: 12 additions & 10 deletions solver/llbsolver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,11 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro
}
// all keys have same export chain so exporting others is not needed
_, err = r.CacheKeys()[0].Exporter.ExportTo(ctx, e, solver.CacheExportOpt{
Convert: workerRefConverter(g, solver.CompressionOpt{
Type: compression.Default, // TODO: make configurable
}),
Convert: workerRefConverter([]solver.CompressionOpt{
{
Type: compression.Default, // TODO: make configurable
},
}, false, g),
Mode: exp.CacheExportMode,
Session: g,
})
Expand Down Expand Up @@ -297,22 +299,22 @@ func inlineCache(ctx context.Context, e remotecache.Exporter, res solver.CachedR
return nil, errors.Errorf("invalid reference: %T", res.Sys())
}

remote, err := workerRef.GetRemote(ctx, true, compressionopt, g)
if err != nil || remote == nil {
remotes, err := workerRef.GetRemotes(ctx, true, []solver.CompressionOpt{compressionopt}, false, g)
if err != nil || len(remotes) == 0 {
return nil, nil
}
remote := remotes[0]

digests := make([]digest.Digest, 0, len(remote.Descriptors))
for _, desc := range remote.Descriptors {
digests = append(digests, desc.Digest)
}

if _, err := res.CacheKeys()[0].Exporter.ExportTo(ctx, e, solver.CacheExportOpt{
Convert: workerRefConverter(g, compressionopt),
Mode: solver.CacheExportModeMin,
Session: g,
RemoteCompressionOpt: compressionopt,
TargetLayers: digests,
Convert: workerRefConverter([]solver.CompressionOpt{compressionopt}, true, g), // cache all possible compressions
Mode: solver.CacheExportModeMin,
Session: g,
CompressionOpt: []solver.CompressionOpt{compressionopt},
}); err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit cadec20

Please sign in to comment.