From d4582fb32e297d56dbef99964ae04abf1667c924 Mon Sep 17 00:00:00 2001 From: caoao Date: Thu, 10 Dec 2020 11:07:17 +0800 Subject: [PATCH] write size according to RegisteredSealProof --- extern/sector-storage/stores/remote.go | 87 ++++++++++++++++++++++++-- 1 file changed, 82 insertions(+), 5 deletions(-) diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index bf66c1bb5cd..dd5add9c83a 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -3,6 +3,8 @@ package stores import ( "context" "encoding/json" + "fmt" + "io" "io/ioutil" "math/bits" "mime" @@ -59,6 +61,30 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int } } +var SealProofIosizes = map[abi.RegisteredSealProof]uint64{ + abi.RegisteredSealProof_StackedDrg2KiBV1: 32 * 1024, + abi.RegisteredSealProof_StackedDrg8MiBV1: 32 * 1024, + abi.RegisteredSealProof_StackedDrg512MiBV1: 1024 * 1024, + abi.RegisteredSealProof_StackedDrg32GiBV1: 1024 * 1024, + abi.RegisteredSealProof_StackedDrg64GiBV1: 1024 * 1024, + + abi.RegisteredSealProof_StackedDrg2KiBV1_1: 32 * 1024, + abi.RegisteredSealProof_StackedDrg8MiBV1_1: 32 * 1024, + abi.RegisteredSealProof_StackedDrg512MiBV1_1: 1024 * 1024, + abi.RegisteredSealProof_StackedDrg32GiBV1_1: 1024 * 1024, + abi.RegisteredSealProof_StackedDrg64GiBV1_1: 1024 * 1024, +} + +const MaxIoSize = 64 * 1024 * 1024 + +func GetIoSizeByProofType(p abi.RegisteredSealProof) (uint64, error) { + size, ok := SealProofIosizes[p] + if !ok { + return 0, xerrors.Errorf("unsupported proof type: %v", p) + } + return size, nil +} + func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) { if existing|allocate != existing^allocate { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector") @@ -123,6 +149,12 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin } defer releaseStorage() + var iosize uint64 = 0 + ssize, err := GetIoSizeByProofType(s.ProofType) + if err == nil { + iosize = ssize + } + for _, fileType := range storiface.PathTypes { if fileType&existing == 0 { continue @@ -135,7 +167,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin dest := storiface.PathByType(apaths, fileType) storageID := storiface.PathByType(ids, fileType) - url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest) + url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest, iosize) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, err } @@ -170,7 +202,7 @@ func tempFetchDest(spath string, create bool) (string, error) { return filepath.Join(tempdir, b), nil } -func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType storiface.SectorFileType, dest string) (string, error) { +func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType storiface.SectorFileType, dest string, iosize uint64) (string, error) { si, err := r.index.StorageFindSector(ctx, s, fileType, 0, false) if err != nil { return "", err @@ -198,7 +230,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType return "", xerrors.Errorf("removing dest: %w", err) } - err = r.fetch(ctx, url, tempDest) + err = r.fetch(ctx, url, tempDest, iosize) if err != nil { merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, tempDest, err)) continue @@ -218,7 +250,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType return "", xerrors.Errorf("failed to acquire sector %v from remote (tried %v): %w", s, si, merr) } -func (r *Remote) fetch(ctx context.Context, url, outname string) error { +func (r *Remote) fetch(ctx context.Context, url, outname string, iosize uint64) error { log.Infof("Fetch %s -> %s", url, outname) if len(r.limit) >= cap(r.limit) { @@ -276,12 +308,57 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error { case "application/x-tar": return tarutil.ExtractTar(resp.Body, outname) case "application/octet-stream": - return files.WriteTo(files.NewReaderFile(resp.Body), outname) + return WriteWithSize(files.NewReaderFile(resp.Body), outname, iosize) default: return xerrors.Errorf("unknown content type: '%s'", mediatype) } } +func WriteWithSize(nd files.Node, fpath string, size uint64) error { + switch nd := nd.(type) { + case *files.Symlink: + return os.Symlink(nd.Target, fpath) + case files.File: + f, err := os.Create(fpath) + defer func() { + err = f.Close() + if err != nil { + log.Warnf("failed to close file:%v, err:%v", fpath, err) + } + }() + if err != nil { + return err + } + if size <= 0 || size > MaxIoSize { + _, err = io.Copy(f, nd) + } else { + buf := make([]byte, size) + _, err = io.CopyBuffer(f, nd, buf) + } + + if err != nil { + return err + } + return nil + case files.Directory: + err := os.Mkdir(fpath, 0777) + if err != nil { + return err + } + + entries := nd.Entries() + for entries.Next() { + child := filepath.Join(fpath, entries.Name()) + if err := WriteWithSize(entries.Node(), child, size); err != nil { + return err + } + } + return entries.Err() + default: + return fmt.Errorf("file type %T at %q is not supported", nd, fpath) + } +} + func (r *Remote) MoveStorage(ctx context.Context, s storage.SectorRef, types storiface.SectorFileType) error { // Make sure we have the data local _, _, err := r.AcquireSector(ctx, s, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)