Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fetch size according to RegisteredSealProof #5167

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 82 additions & 5 deletions extern/sector-storage/stores/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package stores
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math/bits"
"mime"
Expand Down Expand Up @@ -59,6 +61,30 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int
}
}

var SealProofIosizes = map[abi.RegisteredSealProof]uint64{
abi.RegisteredSealProof_StackedDrg2KiBV1: 32 * 1024,
abi.RegisteredSealProof_StackedDrg8MiBV1: 32 * 1024,
abi.RegisteredSealProof_StackedDrg512MiBV1: 1024 * 1024,
abi.RegisteredSealProof_StackedDrg32GiBV1: 1024 * 1024,
abi.RegisteredSealProof_StackedDrg64GiBV1: 1024 * 1024,

abi.RegisteredSealProof_StackedDrg2KiBV1_1: 32 * 1024,
abi.RegisteredSealProof_StackedDrg8MiBV1_1: 32 * 1024,
abi.RegisteredSealProof_StackedDrg512MiBV1_1: 1024 * 1024,
abi.RegisteredSealProof_StackedDrg32GiBV1_1: 1024 * 1024,
abi.RegisteredSealProof_StackedDrg64GiBV1_1: 1024 * 1024,
}

const MaxIoSize = 64 * 1024 * 1024

func GetIoSizeByProofType(p abi.RegisteredSealProof) (uint64, error) {
size, ok := SealProofIosizes[p]
if !ok {
return 0, xerrors.Errorf("unsupported proof type: %v", p)
}
return size, nil
}

func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
if existing|allocate != existing^allocate {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
Expand Down Expand Up @@ -123,6 +149,12 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
}
defer releaseStorage()

var iosize uint64 = 0
ssize, err := GetIoSizeByProofType(s.ProofType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd do iosize = 32 << 10; if s.ProofType.SectorSize() > 8 << 20 {iosize=1<<20}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(this way we can drop GetIoSizeByProofType from above)

if err == nil {
iosize = ssize
}

for _, fileType := range storiface.PathTypes {
if fileType&existing == 0 {
continue
Expand All @@ -135,7 +167,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
dest := storiface.PathByType(apaths, fileType)
storageID := storiface.PathByType(ids, fileType)

url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest)
url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest, iosize)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, err
}
Expand Down Expand Up @@ -170,7 +202,7 @@ func tempFetchDest(spath string, create bool) (string, error) {
return filepath.Join(tempdir, b), nil
}

func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType storiface.SectorFileType, dest string) (string, error) {
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType storiface.SectorFileType, dest string, iosize uint64) (string, error) {
si, err := r.index.StorageFindSector(ctx, s, fileType, 0, false)
if err != nil {
return "", err
Expand Down Expand Up @@ -198,7 +230,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
return "", xerrors.Errorf("removing dest: %w", err)
}

err = r.fetch(ctx, url, tempDest)
err = r.fetch(ctx, url, tempDest, iosize)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, tempDest, err))
continue
Expand All @@ -218,7 +250,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr)
}

func (r *Remote) fetch(ctx context.Context, url, outname string) error {
func (r *Remote) fetch(ctx context.Context, url, outname string, iosize uint64) error {
log.Infof("Fetch %s -> %s", url, outname)

if len(r.limit) >= cap(r.limit) {
Expand Down Expand Up @@ -276,12 +308,57 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
case "application/x-tar":
return tarutil.ExtractTar(resp.Body, outname)
case "application/octet-stream":
return files.WriteTo(files.NewReaderFile(resp.Body), outname)
return WriteWithSize(files.NewReaderFile(resp.Body), outname, iosize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This used the files api to make things simpler. Instead of duplicating the code below, we can just do

Suggested change
return WriteWithSize(files.NewReaderFile(resp.Body), outname, iosize)
f, err := os.Create(outname)
defer f.Close()
if err != nil {
return err
}
_, err = io.CopyBuffer(f, resp.Body, make([]byte, iosize))
if err != nil {
return err
}
return nil

default:
return xerrors.Errorf("unknown content type: '%s'", mediatype)
}
}

func WriteWithSize(nd files.Node, fpath string, size uint64) error {
switch nd := nd.(type) {
case *files.Symlink:
return os.Symlink(nd.Target, fpath)
case files.File:
f, err := os.Create(fpath)
defer func() {
err = f.Close()
if err != nil {
log.Warnf("failed to close file:%v, err:%v", fpath, err)
}
}()
if err != nil {
return err
}
if size <= 0 || size > MaxIoSize {
_, err = io.Copy(f, nd)
} else {
buf := make([]byte, size)
_, err = io.CopyBuffer(f, nd, buf)
}

if err != nil {
return err
}
return nil
case files.Directory:
err := os.Mkdir(fpath, 0777)
if err != nil {
return err
}

entries := nd.Entries()
for entries.Next() {
child := filepath.Join(fpath, entries.Name())
if err := WriteWithSize(entries.Node(), child, size); err != nil {
return err
}
}
return entries.Err()
default:
return fmt.Errorf("file type %T at %q is not supported", nd, fpath)
}
}

func (r *Remote) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error {
// Make sure we have the data local
_, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
Expand Down