Propagate compression options to the inline cache export #2405

Merged 1 commit on Oct 28, 2021
121 changes: 118 additions & 3 deletions cache/manager_test.go
@@ -14,6 +14,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"

@@ -35,6 +36,7 @@ import (
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
containerdsnapshot "github.com/moby/buildkit/snapshot/containerd"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/leaseutil"
@@ -1125,7 +1127,9 @@ func TestConversion(t *testing.T) {
require.NoError(t, eg.Wait())
}

func TestGetRemote(t *testing.T) {
type idxToVariants []map[compression.Type]ocispecs.Descriptor

func TestGetRemotes(t *testing.T) {
t.Parallel()
// windows fails when lazy blob is being extracted with "invalid windows mount type: 'bind'"
if runtime.GOOS != "linux" {
@@ -1251,15 +1255,24 @@

checkNumBlobs(ctx, t, co.cs, 1)

// Call GetRemote on all the refs
variantsMap := make(map[string]idxToVariants)
var variantsMapMu sync.Mutex

// Call GetRemotes on all the refs
eg, egctx := errgroup.WithContext(ctx)
for _, ir := range refs {
ir := ir.(*immutableRef)
for _, compressionType := range []compression.Type{compression.Uncompressed, compression.Gzip, compression.EStargz, compression.Zstd} {
compressionType := compressionType
compressionopt := solver.CompressionOpt{
Type: compressionType,
Force: true,
}
eg.Go(func() error {
remote, err := ir.GetRemote(egctx, true, compressionType, true, nil)
remotes, err := ir.GetRemotes(egctx, true, compressionopt, false, nil)
require.NoError(t, err)
require.Equal(t, 1, len(remotes))
remote := remotes[0]
refChain := ir.parentRefChain()
for i, desc := range remote.Descriptors {
switch compressionType {
@@ -1278,6 +1291,21 @@ func TestGetRemote(t *testing.T) {
require.Contains(t, expectedContent, dgst, "for %v", compressionType)
checkDescriptor(ctx, t, co.cs, desc, compressionType)

variantsMapMu.Lock()
if len(variantsMap[ir.ID()]) == 0 {
variantsMap[ir.ID()] = make(idxToVariants, len(remote.Descriptors))
}
variantsMapMu.Unlock()

require.Equal(t, len(remote.Descriptors), len(variantsMap[ir.ID()]))

variantsMapMu.Lock()
if variantsMap[ir.ID()][i] == nil {
variantsMap[ir.ID()][i] = make(map[compression.Type]ocispecs.Descriptor)
}
variantsMap[ir.ID()][i][compressionType] = desc
variantsMapMu.Unlock()

r := refChain[i]
isLazy, err := r.isLazy(egctx)
require.NoError(t, err)
@@ -1318,6 +1346,93 @@ func TestGetRemote(t *testing.T) {
})
require.NoError(t, err)
require.Equal(t, map[digest.Digest]struct{}{}, expectedContent)

// Check if "all" option returns all available blobs
for _, ir := range refs {
ir := ir.(*immutableRef)
variantsMapMu.Lock()
variants, ok := variantsMap[ir.ID()]
variantsMapMu.Unlock()
require.True(t, ok, ir.ID())
for _, compressionType := range []compression.Type{compression.Uncompressed, compression.Gzip, compression.EStargz, compression.Zstd} {
compressionType := compressionType
compressionopt := solver.CompressionOpt{Type: compressionType}
eg.Go(func() error {
remotes, err := ir.GetRemotes(egctx, false, compressionopt, true, nil)
require.NoError(t, err)
require.True(t, len(remotes) > 0, "for %s : %d", compressionType, len(remotes))
gotMain, gotVariants := remotes[0], remotes[1:]

// Check the main blob is compatible with all == false
mainOnly, err := ir.GetRemotes(egctx, false, compressionopt, false, nil)
require.NoError(t, err)
require.Equal(t, 1, len(mainOnly))
mainRemote := mainOnly[0]
require.Equal(t, len(mainRemote.Descriptors), len(gotMain.Descriptors))
for i := 0; i < len(mainRemote.Descriptors); i++ {
require.Equal(t, mainRemote.Descriptors[i].Digest, gotMain.Descriptors[i].Digest)
}

// Check all variants are covered
checkVariantsCoverage(egctx, t, variants, len(remotes[0].Descriptors)-1, gotVariants, &compressionType)
return nil
})
}
}
require.NoError(t, eg.Wait())
}

func checkVariantsCoverage(ctx context.Context, t *testing.T, variants idxToVariants, idx int, remotes []*solver.Remote, expectCompression *compression.Type) {
if idx < 0 {
for _, r := range remotes {
require.Equal(t, len(r.Descriptors), 0)
}
return
}

// check the contents of the topmost blob of each remote
got := make(map[digest.Digest][]*solver.Remote)
for _, r := range remotes {
require.Equal(t, len(r.Descriptors)-1, idx, "idx = %d", idx)

// record this variant
topmost, lower := r.Descriptors[idx], r.Descriptors[:idx]
got[topmost.Digest] = append(got[topmost.Digest], &solver.Remote{Descriptors: lower, Provider: r.Provider})

// check the contents
r, err := r.Provider.ReaderAt(ctx, topmost)
require.NoError(t, err)
dgstr := digest.Canonical.Digester()
_, err = io.Copy(dgstr.Hash(), io.NewSectionReader(r, 0, topmost.Size))
require.NoError(t, err)
require.NoError(t, r.Close())
require.Equal(t, dgstr.Digest(), topmost.Digest)
}

// check the lowers as well
eg, egctx := errgroup.WithContext(ctx)
for _, lowers := range got {
lowers := lowers
eg.Go(func() error {
checkVariantsCoverage(egctx, t, variants, idx-1, lowers, nil) // expect all compression variants
return nil
})
}
require.NoError(t, eg.Wait())

// check the coverage of the variants
targets := variants[idx]
if expectCompression != nil {
c, ok := variants[idx][*expectCompression]
require.True(t, ok, "idx = %d, compression = %q, variants = %+v, got = %+v", idx, *expectCompression, variants[idx], got)
targets = map[compression.Type]ocispecs.Descriptor{*expectCompression: c}
}
for c, d := range targets {
_, ok := got[d.Digest]
require.True(t, ok, "idx = %d, compression = %q, want = %+v, got = %+v", idx, c, d, got)
delete(got, d.Digest)
}
require.Equal(t, 0, len(got))
}

func checkInfo(ctx context.Context, t *testing.T, cs content.Store, info content.Info) {
26 changes: 23 additions & 3 deletions cache/refs.go
@@ -42,7 +42,7 @@ type ImmutableRef interface {
Clone() ImmutableRef

Extract(ctx context.Context, s session.Group) error // +progress
GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) (*solver.Remote, error)
GetRemotes(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, all bool, s session.Group) ([]*solver.Remote, error)
}

type MutableRef interface {
@@ -376,9 +376,29 @@ func compressionVariantDigestLabel(compressionType compression.Type) string {
return compressionVariantDigestLabelPrefix + compressionType.String()
}

func getCompressionVariants(ctx context.Context, cs content.Store, dgst digest.Digest) (res []compression.Type, _ error) {
info, err := cs.Info(ctx, dgst)
if errors.Is(err, errdefs.ErrNotFound) {
return nil, nil
} else if err != nil {
return nil, err
}
for k := range info.Labels {
if strings.HasPrefix(k, compressionVariantDigestLabelPrefix) {
if t := compression.Parse(strings.TrimPrefix(k, compressionVariantDigestLabelPrefix)); t != compression.UnknownCompression {
res = append(res, t)
}
}
}
return
}

func (sr *immutableRef) getCompressionBlob(ctx context.Context, compressionType compression.Type) (ocispecs.Descriptor, error) {
cs := sr.cm.ContentStore
info, err := cs.Info(ctx, sr.getBlob())
return getCompressionVariantBlob(ctx, sr.cm.ContentStore, sr.getBlob(), compressionType)
}

func getCompressionVariantBlob(ctx context.Context, cs content.Store, dgst digest.Digest, compressionType compression.Type) (ocispecs.Descriptor, error) {
info, err := cs.Info(ctx, dgst)
if err != nil {
return ocispecs.Descriptor{}, err
}
114 changes: 110 additions & 4 deletions cache/remote.go
@@ -25,16 +25,122 @@ type Unlazier interface {
Unlazy(ctx context.Context) error
}

// GetRemote gets a *solver.Remote from content store for this ref (potentially pulling lazily).
// Note: Use WorkerRef.GetRemote instead as moby integration requires custom GetRemote implementation.
func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) (*solver.Remote, error) {
// GetRemotes gets []*solver.Remote from content store for this ref (potentially pulling lazily).
// Compressionopt can be used to specify the compression type of blobs. If Force is true, the compression
// type is applied to all blobs in the chain. If Force is false, it's applied only to the newly created
// layers. If all is true, all available chains whose topmost blob has the specified compression type are
// appended to the result.
// Note: Use WorkerRef.GetRemotes instead as moby integration requires custom GetRemotes implementation.
func (sr *immutableRef) GetRemotes(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, all bool, s session.Group) ([]*solver.Remote, error) {
ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
Review comment from @tonistiigi (Member), Oct 12, 2021:
This lease should be moved to the caller as it is somewhat expensive and we shouldn't call it for every compression. In follow-up we should figure out if there is a way to avoid it at all when no new blobs need to be created.
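Purely as an illustration of that suggestion (hypothetical helper, not part of this PR), the caller could take one temporary lease up front, using the same leaseutil call GetRemotes makes here, and run all per-compression work under it:

package example // hypothetical caller-side code, for illustration only

import (
	"context"

	"github.com/containerd/containerd/leases"
	"github.com/moby/buildkit/util/leaseutil"
)

// withTemporaryLease acquires a single temporary lease and runs fn under it,
// so several GetRemotes calls (one per compression type) could share one lease
// instead of each taking its own.
func withTemporaryLease(ctx context.Context, lm leases.Manager, fn func(ctx context.Context) error) error {
	ctx, done, err := leaseutil.WithLease(ctx, lm, leaseutil.MakeTemporary)
	if err != nil {
		return err
	}
	defer done(ctx) // releases the temporary lease when fn returns
	return fn(ctx)
}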

if err != nil {
return nil, err
}
defer done(ctx)

err = sr.computeBlobChain(ctx, createIfNeeded, compressionType, forceCompression, s)
// fast path if compression variants aren't required
// NOTE: compressionopt is applied only to *newly created layers* if Force != true.
remote, err := sr.getRemote(ctx, createIfNeeded, compressionopt, s)
if err != nil {
return nil, err
}
if !all || compressionopt.Force || len(remote.Descriptors) == 0 {
return []*solver.Remote{remote}, nil // early return if compression variants aren't required
}

// Search all available remotes whose topmost blob has the specified
// compression, with all combinations of compressions for the parent blobs
res := []*solver.Remote{remote}
topmost, parentChain := remote.Descriptors[len(remote.Descriptors)-1], remote.Descriptors[:len(remote.Descriptors)-1]
vDesc, err := getCompressionVariantBlob(ctx, sr.cm.ContentStore, topmost.Digest, compressionopt.Type)
if err != nil {
return res, nil // compression variant doesn't exist. return the main blob only.
}

var variants []*solver.Remote
if len(parentChain) == 0 {
variants = append(variants, &solver.Remote{
Descriptors: []ocispecs.Descriptor{vDesc},
Provider: sr.cm.ContentStore,
})
} else {
// get parents with all combinations of the available compressions.
parents, err := getAvailableBlobs(ctx, sr.cm.ContentStore, &solver.Remote{
Descriptors: parentChain,
Provider: remote.Provider,
})
if err != nil {
return nil, err
}
variants = appendRemote(parents, vDesc, sr.cm.ContentStore)
}

// Return the main remote and all its compression variants.
// NOTE: Because compressionopt is applied only to *newly created layers* in the main remote (i.e. res[0]),
// it's possible that the main remote doesn't contain any blobs of the compressionopt.Type.
// The topmost blob of each variant (res[1:]) is guaranteed to have compressionopt.Type.
res = append(res, variants...)
return res, nil
}
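As a usage sketch (hypothetical caller code, not part of this PR), assuming an ImmutableRef ir and a session group s are already in hand, this is how the new signature could be called to force zstd for the whole chain while requesting only the main remote:

package example // hypothetical caller package, for illustration only

import (
	"context"

	"github.com/moby/buildkit/cache"
	"github.com/moby/buildkit/session"
	"github.com/moby/buildkit/solver"
	"github.com/moby/buildkit/util/compression"
)

// exportWithZstd forces zstd for every blob in the chain (Force: true) and
// requests only the main remote (all: false), so exactly one *solver.Remote
// comes back.
func exportWithZstd(ctx context.Context, ir cache.ImmutableRef, s session.Group) (*solver.Remote, error) {
	opt := solver.CompressionOpt{
		Type:  compression.Zstd,
		Force: true, // convert existing blobs too, not only newly created layers
	}
	remotes, err := ir.GetRemotes(ctx, true /* createIfNeeded */, opt, false /* all */, s)
	if err != nil {
		return nil, err
	}
	return remotes[0], nil // all == false, so the slice has a single element
}

Passing all=true with Force=false instead returns the main remote first, followed by every chain whose topmost blob is already stored in the requested compression.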

func appendRemote(parents []*solver.Remote, desc ocispecs.Descriptor, p content.Provider) (res []*solver.Remote) {
for _, pRemote := range parents {
provider := contentutil.NewMultiProvider(pRemote.Provider)
provider.Add(desc.Digest, p)
res = append(res, &solver.Remote{
Descriptors: append(pRemote.Descriptors, desc),
Provider: provider,
})
}
return
}

func getAvailableBlobs(ctx context.Context, cs content.Store, chain *solver.Remote) ([]*solver.Remote, error) {
if len(chain.Descriptors) == 0 {
return nil, nil
}
target, parentChain := chain.Descriptors[len(chain.Descriptors)-1], chain.Descriptors[:len(chain.Descriptors)-1]
parents, err := getAvailableBlobs(ctx, cs, &solver.Remote{
Descriptors: parentChain,
Provider: chain.Provider,
})
if err != nil {
return nil, err
}
compressions, err := getCompressionVariants(ctx, cs, target.Digest)
if err != nil {
return nil, err
}
var res []*solver.Remote
for _, c := range compressions {
desc, err := getCompressionVariantBlob(ctx, cs, target.Digest, c)
if err != nil {
return nil, err
}
if len(parents) == 0 { // bottommost ref
res = append(res, &solver.Remote{
Descriptors: []ocispecs.Descriptor{desc},
Provider: cs,
})
continue
}
res = append(res, appendRemote(parents, desc, cs)...)
}
if len(res) == 0 {
// no available compression blobs for this blob. return the original blob.
if len(parents) == 0 { // bottommost ref
return []*solver.Remote{chain}, nil
}
return appendRemote(parents, target, chain.Provider), nil
}
return res, nil
}
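A worked example of this recursion (hypothetical layers and variants, purely illustrative):

// Illustration only, with made-up layers and recorded variants:
//
//   chain (bottom to top):  [lower, upper]
//   variants of lower:      {gzip, zstd}
//   variants of upper:      {gzip}
//
// getAvailableBlobs first recurses on the parent chain, yielding one remote per
// variant of "lower", then appends each variant of "upper" on top of every
// parent remote, i.e. the cross product:
//
//   [lower(gzip), upper(gzip)]
//   [lower(zstd), upper(gzip)]
//
// If a blob has no recorded variants at all, the descriptor from the incoming
// chain is reused as-is (the len(res) == 0 fallback above).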

func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, s session.Group) (*solver.Remote, error) {
compressionType := compressionopt.Type
forceCompression := compressionopt.Force

err := sr.computeBlobChain(ctx, createIfNeeded, compressionType, forceCompression, s)
if err != nil {
return nil, err
}
20 changes: 18 additions & 2 deletions cache/remotecache/v1/cachestorage.go
@@ -267,9 +267,25 @@ func (cs *cacheResultStorage) Load(ctx context.Context, res solver.CacheResult)
return worker.NewWorkerRefResult(ref, cs.w), nil
}

func (cs *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheResult, _ session.Group) (*solver.Remote, error) {
func (cs *cacheResultStorage) LoadRemotes(ctx context.Context, res solver.CacheResult, compressionopts *solver.CompressionOpt, _ session.Group) ([]*solver.Remote, error) {
if r := cs.byResultID(res.ID); r != nil && r.result != nil {
return r.result, nil
if compressionopts == nil {
return []*solver.Remote{r.result}, nil
}
// At least one blob in the remote must match the specified compression type; with Force, all blobs must.
match := false
for _, desc := range r.result.Descriptors {
m := compressionopts.Type.IsMediaType(desc.MediaType)
match = match || m
if compressionopts.Force && !m {
match = false
break
}
}
if match {
return []*solver.Remote{r.result}, nil
}
return nil, nil // return nil as it's best effort.
}
return nil, errors.WithStack(solver.ErrNotFound)
}
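Restated as a small standalone predicate for clarity (hypothetical helper, not part of the PR): without Force, one blob already in the requested compression is enough; with Force, every blob must match.

// remoteMatchesCompression is a hypothetical restatement of the check above,
// using the same solver.Remote and solver.CompressionOpt types.
func remoteMatchesCompression(r *solver.Remote, opt *solver.CompressionOpt) bool {
	match := false
	for _, desc := range r.Descriptors {
		m := opt.Type.IsMediaType(desc.MediaType)
		if opt.Force && !m {
			return false // with Force, a single mismatching blob disqualifies the remote
		}
		match = match || m
	}
	return match
}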