diff --git a/cmd/curio/rpc/rpc.go b/cmd/curio/rpc/rpc.go index f45e276c3..1b2bb25e6 100644 --- a/cmd/curio/rpc/rpc.go +++ b/cmd/curio/rpc/rpc.go @@ -9,9 +9,11 @@ import ( "net/http" "net/url" "os" + "path/filepath" "time" "github.com/gbrlsnchs/jwt/v3" + "github.com/google/uuid" "github.com/gorilla/mux" logging "github.com/ipfs/go-log/v2" "github.com/mitchellh/go-homedir" @@ -39,6 +41,8 @@ import ( "github.com/filecoin-project/lotus/storage/sealer/storiface" ) +const metaFile = "sectorstore.json" + var log = logging.Logger("curio/rpc") var permissioned = os.Getenv("LOTUS_DISABLE_AUTH_PERMISSIONED") != "1" @@ -162,6 +166,40 @@ func (p *CurioAPI) Shutdown(context.Context) error { return nil } +func (p *CurioAPI) StorageInit(ctx context.Context, path string, opts storiface.LocalStorageMeta) error { + path, err := homedir.Expand(path) + if err != nil { + return xerrors.Errorf("expanding local path: %w", err) + } + + if err := os.MkdirAll(path, 0755); err != nil { + if !os.IsExist(err) { + return err + } + } + _, err = os.Stat(filepath.Join(path, metaFile)) + if !os.IsNotExist(err) { + if err == nil { + return xerrors.Errorf("path is already initialized") + } + return err + } + if opts.ID == "" { + opts.ID = storiface.ID(uuid.New().String()) + } + if !(opts.CanStore || opts.CanSeal) { + return xerrors.Errorf("must specify at least one of --store or --seal") + } + b, err := json.MarshalIndent(opts, "", " ") + if err != nil { + return xerrors.Errorf("marshaling storage config: %w", err) + } + if err := os.WriteFile(filepath.Join(path, metaFile), b, 0644); err != nil { + return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(path, metaFile), err) + } + return nil +} + func (p *CurioAPI) StorageAddLocal(ctx context.Context, path string) error { path, err := homedir.Expand(path) if err != nil { diff --git a/cmd/curio/storage.go b/cmd/curio/storage.go index 9a073e037..2fa6d2d52 100644 --- a/cmd/curio/storage.go +++ b/cmd/curio/storage.go @@ -1,11 +1,8 @@ package main import ( - "encoding/json" "fmt" "math/bits" - "os" - "path/filepath" "sort" "strconv" "strings" @@ -28,8 +25,6 @@ import ( "github.com/filecoin-project/lotus/storage/sealer/storiface" ) -const metaFile = "sectorstore.json" - var storageCmd = &cli.Command{ Name: "storage", Usage: "manage sector storage", @@ -122,20 +117,6 @@ over time } if cctx.Bool("init") { - if err := os.MkdirAll(p, 0755); err != nil { - if !os.IsExist(err) { - return err - } - } - - _, err := os.Stat(filepath.Join(p, metaFile)) - if !os.IsNotExist(err) { - if err == nil { - return xerrors.Errorf("path is already initialized") - } - return err - } - var maxStor int64 if cctx.IsSet("max-storage") { maxStor, err = units.RAMInBytes(cctx.String("max-storage")) @@ -144,7 +125,7 @@ over time } } - cfg := &storiface.LocalStorageMeta{ + cfg := storiface.LocalStorageMeta{ ID: storiface.ID(uuid.New().String()), Weight: cctx.Uint64("weight"), CanSeal: cctx.Bool("seal"), @@ -158,13 +139,8 @@ over time return xerrors.Errorf("must specify at least one of --store or --seal") } - b, err := json.MarshalIndent(cfg, "", " ") - if err != nil { - return xerrors.Errorf("marshaling storage config: %w", err) - } - - if err := os.WriteFile(filepath.Join(p, metaFile), b, 0644); err != nil { - return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(p, metaFile), err) + if err := minerApi.StorageInit(ctx, p, cfg); err != nil { + return xerrors.Errorf("init storage: %w", err) } } diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index 71923018d..fde13c336 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -114,9 +114,10 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task activeTasks = append(activeTasks, sdrTask) } if cfg.Subsystems.EnableSealSDRTrees { - treesTask := seal.NewTreesTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks) + treeDTask := seal.NewTreeDTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks) + treeRCTask := seal.NewTreeRCTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks) finalizeTask := seal.NewFinalizeTask(cfg.Subsystems.FinalizeMaxTasks, sp, slr, db) - activeTasks = append(activeTasks, treesTask, finalizeTask) + activeTasks = append(activeTasks, treeDTask, treeRCTask, finalizeTask) } if cfg.Subsystems.EnableSendPrecommitMsg { precommitTask := seal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg.Fees.MaxPreCommitGasFee) diff --git a/curiosrc/ffi/sdr_funcs.go b/curiosrc/ffi/sdr_funcs.go index e9ce62831..eff49578d 100644 --- a/curiosrc/ffi/sdr_funcs.go +++ b/curiosrc/ffi/sdr_funcs.go @@ -61,7 +61,7 @@ type storageProvider struct { } func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask.TaskID, sector storiface.SectorRef, existing, allocate storiface.SectorFileType, sealing storiface.PathType) (fspaths, ids storiface.SectorPaths, release func(), err error) { - var paths, storageIDs storiface.SectorPaths + var sectorPaths, storageIDs storiface.SectorPaths var releaseStorage func() var ok bool @@ -77,7 +77,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask log.Debugw("using existing storage reservation", "task", taskID, "sector", sector, "existing", existing, "allocate", allocate) - paths = resv.Paths + sectorPaths = resv.Paths storageIDs = resv.PathIDs releaseStorage = resv.Release @@ -87,7 +87,7 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask // present locally. Note that we do not care about 'allocate' reqeuests, those files don't exist, and are just // proposed paths with a reservation of space. - _, checkPathIDs, err := l.storage.AcquireSector(ctx, sector, existing, storiface.FTNone, sealing, storiface.AcquireMove, storiface.AcquireInto(storiface.PathsWithIDs{Paths: paths, IDs: storageIDs})) + _, checkPathIDs, err := l.storage.AcquireSector(ctx, sector, existing, storiface.FTNone, sealing, storiface.AcquireMove, storiface.AcquireInto(storiface.PathsWithIDs{Paths: sectorPaths, IDs: storageIDs})) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquire reserved existing files: %w", err) } @@ -101,20 +101,20 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask // No related reservation, acquire storage as usual var err error - paths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove) + sectorPaths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, err } - releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal) + releaseStorage, err = l.localStore.Reserve(ctx, sector, allocate, storageIDs, storiface.FSOverheadSeal, paths.MinFreeStoragePercentage) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err) } } - log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths) + log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, sectorPaths) - return paths, storageIDs, func() { + return sectorPaths, storageIDs, func() { releaseStorage() for _, fileType := range storiface.PathTypes { @@ -194,13 +194,13 @@ func (sb *SealCalls) ensureOneCopy(ctx context.Context, sid abi.SectorID, pathID return nil } -func (sb *SealCalls) TreeDRC(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool) (scid cid.Cid, ucid cid.Cid, err error) { +func (sb *SealCalls) TreeRC(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, unsealed cid.Cid) (scid cid.Cid, ucid cid.Cid, err error) { p1o, err := sb.makePhase1Out(unsealed, sector.ProofType) if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err) } - paths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing) + fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing) if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err) } @@ -208,66 +208,55 @@ func (sb *SealCalls) TreeDRC(ctx context.Context, task *harmonytask.TaskID, sect defer func() { if err != nil { - clerr := removeDRCTrees(paths.Cache) + clerr := removeDRCTrees(fspaths.Cache, false) if clerr != nil { - log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", paths.Cache) + log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", fspaths.Cache) } } }() - treeDUnsealed, err := proof.BuildTreeD(data, unpaddedData, filepath.Join(paths.Cache, proofpaths.TreeDName), size) + // create sector-sized file at paths.Sealed; PC2 transforms it into a sealed sector in-place + ssize, err := sector.ProofType.SectorSize() if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("building tree-d: %w", err) - } - - if treeDUnsealed != unsealed { - return cid.Undef, cid.Undef, xerrors.Errorf("tree-d cid mismatch with supplied unsealed cid") + return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err) } { - // create sector-sized file at paths.Sealed; PC2 transforms it into a sealed sector in-place - ssize, err := sector.ProofType.SectorSize() - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err) - } - - { - // copy TreeD prefix to sealed sector, SealPreCommitPhase2 will mutate it in place into the sealed sector + // copy TreeD prefix to sealed sector, SealPreCommitPhase2 will mutate it in place into the sealed sector - // first try reflink + truncate, that should be way faster - err := reflink.Always(filepath.Join(paths.Cache, proofpaths.TreeDName), paths.Sealed) - if err == nil { - err = os.Truncate(paths.Sealed, int64(ssize)) - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("truncating reflinked sealed file: %w", err) - } - } else { - log.Errorw("reflink treed -> sealed failed, falling back to slow copy, use single scratch btrfs or xfs filesystem", "error", err, "sector", sector, "cache", paths.Cache, "sealed", paths.Sealed) + // first try reflink + truncate, that should be way faster + err := reflink.Always(filepath.Join(fspaths.Cache, proofpaths.TreeDName), fspaths.Sealed) + if err == nil { + err = os.Truncate(fspaths.Sealed, int64(ssize)) + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("truncating reflinked sealed file: %w", err) + } + } else { + log.Errorw("reflink treed -> sealed failed, falling back to slow copy, use single scratch btrfs or xfs filesystem", "error", err, "sector", sector, "cache", fspaths.Cache, "sealed", fspaths.Sealed) - // fallback to slow copy, copy ssize bytes from treed to sealed - dst, err := os.OpenFile(paths.Sealed, os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("opening sealed sector file: %w", err) - } - src, err := os.Open(filepath.Join(paths.Cache, proofpaths.TreeDName)) - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("opening treed sector file: %w", err) - } + // fallback to slow copy, copy ssize bytes from treed to sealed + dst, err := os.OpenFile(fspaths.Sealed, os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("opening sealed sector file: %w", err) + } + src, err := os.Open(filepath.Join(fspaths.Cache, proofpaths.TreeDName)) + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("opening treed sector file: %w", err) + } - _, err = io.CopyN(dst, src, int64(ssize)) - derr := dst.Close() - _ = src.Close() - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("copying treed -> sealed: %w", err) - } - if derr != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("closing sealed file: %w", derr) - } + _, err = io.CopyN(dst, src, int64(ssize)) + derr := dst.Close() + _ = src.Close() + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("copying treed -> sealed: %w", err) + } + if derr != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("closing sealed file: %w", derr) } } } - sl, uns, err := ffi.SealPreCommitPhase2(p1o, paths.Cache, paths.Sealed) + sl, uns, err := ffi.SealPreCommitPhase2(p1o, fspaths.Cache, fspaths.Sealed) if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("computing seal proof: %w", err) } @@ -283,22 +272,28 @@ func (sb *SealCalls) TreeDRC(ctx context.Context, task *harmonytask.TaskID, sect return sl, uns, nil } -func removeDRCTrees(cache string) error { - // list files in cache +func removeDRCTrees(cache string, isDTree bool) error { files, err := os.ReadDir(cache) if err != nil { return xerrors.Errorf("listing cache: %w", err) } + var testFunc func(string) bool + + if isDTree { + testFunc = proofpaths.IsTreeDFile + } else { + testFunc = proofpaths.IsTreeRCFile + } + for _, file := range files { - if proofpaths.IsTreeFile(file.Name()) { + if testFunc(file.Name()) { err := os.Remove(filepath.Join(cache, file.Name())) if err != nil { return xerrors.Errorf("removing tree file: %w", err) } } } - return nil } @@ -625,3 +620,40 @@ func (sb *SealCalls) sectorStorageType(ctx context.Context, sector storiface.Sec return true, storiface.PathStorage, nil } + +// PreFetch fetches the sector file to local storage before SDR and TreeRC Tasks +func (sb *SealCalls) PreFetch(ctx context.Context, sector storiface.SectorRef, task *harmonytask.TaskID) (fsPath, pathID storiface.SectorPaths, releaseSector func(), err error) { + fsPath, pathID, releaseSector, err = sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing) + if err != nil { + return storiface.SectorPaths{}, storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector paths: %w", err) + } + // Don't release the storage locks. They will be released in TreeD func() + return +} + +func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool, fspaths, pathIDs storiface.SectorPaths) error { + var err error + defer func() { + if err != nil { + clerr := removeDRCTrees(fspaths.Cache, true) + if clerr != nil { + log.Errorw("removing tree files after TreeDRC error", "error", clerr, "exec-error", err, "sector", sector, "cache", fspaths.Cache) + } + } + }() + + treeDUnsealed, err := proof.BuildTreeD(data, unpaddedData, filepath.Join(fspaths.Cache, proofpaths.TreeDName), size) + if err != nil { + return xerrors.Errorf("building tree-d: %w", err) + } + + if treeDUnsealed != unsealed { + return xerrors.Errorf("tree-d cid mismatch with supplied unsealed cid") + } + + if err := sb.ensureOneCopy(ctx, sector.ID, pathIDs, storiface.FTCache); err != nil { + return xerrors.Errorf("ensure one copy: %w", err) + } + + return nil +} diff --git a/curiosrc/ffi/task_storage.go b/curiosrc/ffi/task_storage.go index f01a472fa..4bbb8e343 100644 --- a/curiosrc/ffi/task_storage.go +++ b/curiosrc/ffi/task_storage.go @@ -43,6 +43,9 @@ type TaskStorage struct { pathType storiface.PathType taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error) + + // Minimum free storage percentage cutoff for reservation rejection + MinFreeStoragePercentage float64 } type ReleaseStorageFunc func() // free storage reservation @@ -56,14 +59,15 @@ type StorageReservation struct { Alloc, Existing storiface.SectorFileType } -func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) *TaskStorage { +func (sb *SealCalls) Storage(taskToSectorRef func(taskID harmonytask.TaskID) (SectorRef, error), alloc, existing storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType, MinFreeStoragePercentage float64) *TaskStorage { return &TaskStorage{ - sc: sb, - alloc: alloc, - existing: existing, - ssize: ssize, - pathType: pathType, - taskToSectorRef: taskToSectorRef, + sc: sb, + alloc: alloc, + existing: existing, + ssize: ssize, + pathType: pathType, + taskToSectorRef: taskToSectorRef, + MinFreeStoragePercentage: MinFreeStoragePercentage, } } @@ -166,7 +170,7 @@ func (t *TaskStorage) Claim(taskID int) error { } // reserve the space - release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal) + release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal, t.MinFreeStoragePercentage) if err != nil { return err } diff --git a/curiosrc/gc/storage_endpoint_gc.go b/curiosrc/gc/storage_endpoint_gc.go index 45783f353..d49c51a1b 100644 --- a/curiosrc/gc/storage_endpoint_gc.go +++ b/curiosrc/gc/storage_endpoint_gc.go @@ -209,6 +209,7 @@ func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool // Remove dead URLs from storage_path entries and handle path cleanup for _, du := range deadURLs { + du := du // Fetch the current URLs for the storage path var URLs string err = tx.QueryRow("SELECT urls FROM storage_path WHERE storage_id = $1", du.StorageID).Scan(&URLs) diff --git a/curiosrc/piece/task_park_piece.go b/curiosrc/piece/task_park_piece.go index 68a94a295..18ebcdef8 100644 --- a/curiosrc/piece/task_park_piece.go +++ b/curiosrc/piece/task_park_piece.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" "github.com/filecoin-project/lotus/lib/promise" + "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -185,7 +186,7 @@ func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails { Cpu: 1, Gpu: 0, Ram: 64 << 20, - Storage: p.sc.Storage(p.taskToRef, storiface.FTPiece, storiface.FTNone, maxSizePiece, storiface.PathSealing), + Storage: p.sc.Storage(p.taskToRef, storiface.FTPiece, storiface.FTNone, maxSizePiece, storiface.PathSealing, paths.MinFreeStoragePercentage), }, MaxFailures: 10, } diff --git a/curiosrc/seal/poller.go b/curiosrc/seal/poller.go index 568280bdb..073091f83 100644 --- a/curiosrc/seal/poller.go +++ b/curiosrc/seal/poller.go @@ -21,7 +21,8 @@ var log = logging.Logger("lpseal") const ( pollerSDR = iota - pollerTrees + pollerTreeD + pollerTreeRC pollerPrecommitMsg pollerPoRep pollerCommitMsg @@ -154,7 +155,8 @@ func (s *SealPoller) poll(ctx context.Context) error { } s.pollStartSDR(ctx, task) - s.pollStartSDRTrees(ctx, task) + s.pollStartSDRTreeD(ctx, task) + s.pollStartSDRTreeRC(ctx, task) s.pollStartPrecommitMsg(ctx, task) s.mustPoll(s.pollPrecommitMsgLanded(ctx, task)) s.pollStartPoRep(ctx, task, ts) @@ -187,14 +189,31 @@ func (t pollTask) afterSDR() bool { return t.AfterSDR } -func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) { - if !task.AfterTreeD && !task.AfterTreeC && !task.AfterTreeR && - task.TaskTreeD == nil && task.TaskTreeC == nil && task.TaskTreeR == nil && - s.pollers[pollerTrees].IsSet() && task.AfterSDR { +func (s *SealPoller) pollStartSDRTreeD(ctx context.Context, task pollTask) { + if !task.AfterTreeD && task.TaskTreeD == nil && s.pollers[pollerTreeD].IsSet() && task.afterSDR() { + s.pollers[pollerTreeD].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1 WHERE sp_id = $2 AND sector_number = $3 AND after_sdr = TRUE AND task_id_tree_d IS NULL`, id, task.SpID, task.SectorNumber) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } + + return true, nil + }) + } +} + +func (t pollTask) afterTreeD() bool { + return t.AfterTreeD && t.afterSDR() +} - s.pollers[pollerTrees].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_d = $1, task_id_tree_c = $1, task_id_tree_r = $1 - WHERE sp_id = $2 AND sector_number = $3 AND after_sdr = TRUE AND task_id_tree_d IS NULL AND task_id_tree_c IS NULL AND task_id_tree_r IS NULL`, id, task.SpID, task.SectorNumber) +func (s *SealPoller) pollStartSDRTreeRC(ctx context.Context, task pollTask) { + if !task.AfterTreeC && !task.AfterTreeR && task.TaskTreeC == nil && task.TaskTreeR == nil && s.pollers[pollerTreeRC].IsSet() && task.afterTreeD() { + s.pollers[pollerTreeRC].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_tree_c = $1, task_id_tree_r = $1 + WHERE sp_id = $2 AND sector_number = $3 AND after_tree_d = TRUE AND task_id_tree_c IS NULL AND task_id_tree_r IS NULL`, id, task.SpID, task.SectorNumber) if err != nil { return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) } @@ -207,12 +226,12 @@ func (s *SealPoller) pollStartSDRTrees(ctx context.Context, task pollTask) { } } -func (t pollTask) afterTrees() bool { - return t.AfterTreeD && t.AfterTreeC && t.AfterTreeR && t.afterSDR() +func (t pollTask) afterTreeRC() bool { + return t.AfterTreeC && t.AfterTreeR && t.afterTreeD() } func (t pollTask) afterPrecommitMsg() bool { - return t.AfterPrecommitMsg && t.afterTrees() + return t.AfterPrecommitMsg && t.afterTreeRC() } func (t pollTask) afterPrecommitMsgSuccess() bool { diff --git a/curiosrc/seal/poller_precommit_msg.go b/curiosrc/seal/poller_precommit_msg.go index 4372cbb92..42986499f 100644 --- a/curiosrc/seal/poller_precommit_msg.go +++ b/curiosrc/seal/poller_precommit_msg.go @@ -16,7 +16,7 @@ import ( ) func (s *SealPoller) pollStartPrecommitMsg(ctx context.Context, task pollTask) { - if task.TaskPrecommitMsg == nil && !task.AfterPrecommitMsg && task.afterTrees() && s.pollers[pollerPrecommitMsg].IsSet() { + if task.TaskPrecommitMsg == nil && !task.AfterPrecommitMsg && task.afterTreeRC() && s.pollers[pollerPrecommitMsg].IsSet() { s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_precommit_msg IS NULL AND after_tree_r = TRUE AND after_tree_d = TRUE`, id, task.SpID, task.SectorNumber) if err != nil { diff --git a/curiosrc/seal/task_movestorage.go b/curiosrc/seal/task_movestorage.go index 6037a390d..dab899582 100644 --- a/curiosrc/seal/task_movestorage.go +++ b/curiosrc/seal/task_movestorage.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -148,7 +149,7 @@ func (m *MoveStorageTask) TypeDetails() harmonytask.TaskTypeDetails { Cpu: 1, Gpu: 0, Ram: 128 << 20, - Storage: m.sc.Storage(m.taskToSector, storiface.FTNone, storiface.FTCache|storiface.FTSealed|storiface.FTUnsealed, ssize, storiface.PathStorage), + Storage: m.sc.Storage(m.taskToSector, storiface.FTNone, storiface.FTCache|storiface.FTSealed|storiface.FTUnsealed, ssize, storiface.PathStorage, paths.MinFreeStoragePercentage), }, MaxFailures: 10, } diff --git a/curiosrc/seal/task_porep.go b/curiosrc/seal/task_porep.go index fb03ce59b..58e307bc0 100644 --- a/curiosrc/seal/task_porep.go +++ b/curiosrc/seal/task_porep.go @@ -3,6 +3,7 @@ package seal import ( "bytes" "context" + "strings" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -111,6 +112,15 @@ func (p *PoRepTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done proof, err := p.sc.PoRepSnark(ctx, sr, sealed, unsealed, sectorParams.TicketValue, abi.InteractiveSealRandomness(rand)) if err != nil { + end, err := p.recoverErrors(ctx, sectorParams.SpID, sectorParams.SectorNumber, err) + if err != nil { + return false, xerrors.Errorf("recover errors: %w", err) + } + if end { + // done, but the error handling has stored a different than success state + return true, nil + } + return false, xerrors.Errorf("failed to compute seal proof: %w", err) } @@ -161,4 +171,46 @@ func (p *PoRepTask) Adder(taskFunc harmonytask.AddTaskFunc) { p.sp.pollers[pollerPoRep].Set(taskFunc) } +func (p *PoRepTask) recoverErrors(ctx context.Context, spid, snum int64, cerr error) (end bool, err error) { + const ( + // rust-fil-proofs error strings + // https://github.com/filecoin-project/rust-fil-proofs/blob/3f018b51b6327b135830899d237a7ba181942d7e/storage-proofs-porep/src/stacked/vanilla/proof.rs#L454C1-L463 + errstrInvalidCommD = "Invalid comm_d detected at challenge_index" + errstrInvalidCommR = "Invalid comm_r detected at challenge_index" + errstrInvalidEncoding = "Invalid encoding proof generated at layer" + ) + + if cerr == nil { + return false, xerrors.Errorf("nil error") + } + + switch { + case strings.Contains(cerr.Error(), errstrInvalidCommD): + fallthrough + case strings.Contains(cerr.Error(), errstrInvalidCommR): + // todo: it might be more optimal to just retry the Trees compute first. + // Invalid CommD/R likely indicates a problem with the data computed in that step + // For now for simplicity just retry the whole thing + fallthrough + case strings.Contains(cerr.Error(), errstrInvalidEncoding): + n, err := p.db.Exec(ctx, `UPDATE sectors_sdr_pipeline + SET after_porep = FALSE, after_sdr = FALSE, after_tree_d = FALSE, + after_tree_r = FALSE, after_tree_c = FALSE + WHERE sp_id = $1 AND sector_number = $2`, + spid, snum) + if err != nil { + return false, xerrors.Errorf("store sdr success: updating pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("store sdr success: updated %d rows", n) + } + + return true, nil + + default: + // if end is false the original error will be returned by the caller + return false, nil + } +} + var _ harmonytask.TaskInterface = &PoRepTask{} diff --git a/curiosrc/seal/task_sdr.go b/curiosrc/seal/task_sdr.go index 4c1164e05..0a3aebcd4 100644 --- a/curiosrc/seal/task_sdr.go +++ b/curiosrc/seal/task_sdr.go @@ -20,6 +20,7 @@ import ( "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -204,7 +205,7 @@ func (s *SDRTask) TypeDetails() harmonytask.TaskTypeDetails { Cpu: 4, // todo multicore sdr Gpu: 0, Ram: 54 << 30, - Storage: s.sc.Storage(s.taskToSector, storiface.FTCache, storiface.FTNone, ssize, storiface.PathSealing), + Storage: s.sc.Storage(s.taskToSector, storiface.FTCache, storiface.FTNone, ssize, storiface.PathSealing, paths.MinFreeStoragePercentage), }, MaxFailures: 2, Follows: nil, diff --git a/curiosrc/seal/task_trees.go b/curiosrc/seal/task_treed.go similarity index 82% rename from curiosrc/seal/task_trees.go rename to curiosrc/seal/task_treed.go index 7994c354a..7b31e12fb 100644 --- a/curiosrc/seal/task_trees.go +++ b/curiosrc/seal/task_treed.go @@ -23,7 +23,7 @@ import ( "github.com/filecoin-project/lotus/storage/sealer/storiface" ) -type TreesTask struct { +type TreeDTask struct { sp *SealPoller db *harmonydb.DB sc *ffi.SealCalls @@ -31,8 +31,54 @@ type TreesTask struct { max int } -func NewTreesTask(sp *SealPoller, db *harmonydb.DB, sc *ffi.SealCalls, maxTrees int) *TreesTask { - return &TreesTask{ +func (t *TreeDTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + if engine.Resources().Gpu > 0 { + return &ids[0], nil + } + return nil, nil +} + +func (t *TreeDTask) TypeDetails() harmonytask.TaskTypeDetails { + ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size + if isDevnet { + ssize = abi.SectorSize(2 << 20) + } + + return harmonytask.TaskTypeDetails{ + Max: t.max, + Name: "TreeD", + Cost: resources.Resources{ + Cpu: 1, + Ram: 1 << 30, + Gpu: 0, + Storage: t.sc.Storage(t.taskToSector, storiface.FTNone, storiface.FTCache, ssize, storiface.PathSealing, 1.0), + }, + MaxFailures: 3, + Follows: nil, + } +} + +func (t *TreeDTask) taskToSector(id harmonytask.TaskID) (ffi.SectorRef, error) { + var refs []ffi.SectorRef + + err := t.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_tree_d = $1`, id) + if err != nil { + return ffi.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err) + } + + if len(refs) != 1 { + return ffi.SectorRef{}, xerrors.Errorf("expected 1 sector ref, got %d", len(refs)) + } + + return refs[0], nil +} + +func (t *TreeDTask) Adder(taskFunc harmonytask.AddTaskFunc) { + t.sp.pollers[pollerTreeD].Set(taskFunc) +} + +func NewTreeDTask(sp *SealPoller, db *harmonydb.DB, sc *ffi.SealCalls, maxTrees int) *TreeDTask { + return &TreeDTask{ sp: sp, db: db, sc: sc, @@ -41,7 +87,7 @@ func NewTreesTask(sp *SealPoller, db *harmonydb.DB, sc *ffi.SealCalls, maxTrees } } -func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { +func (t *TreeDTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { ctx := context.Background() var sectorParamsArr []struct { @@ -53,7 +99,7 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done err = t.db.Select(ctx, §orParamsArr, ` SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline - WHERE task_id_tree_r = $1 AND task_id_tree_c = $1 AND task_id_tree_d = $1`, taskID) + WHERE task_id_tree_d = $1`, taskID) if err != nil { return false, xerrors.Errorf("getting sector params: %w", err) } @@ -63,6 +109,21 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } sectorParams := sectorParamsArr[0] + sref := storiface.SectorRef{ + ID: abi.SectorID{ + Miner: abi.ActorID(sectorParams.SpID), + Number: abi.SectorNumber(sectorParams.SectorNumber), + }, + ProofType: sectorParams.RegSealProof, + } + + // Fetch the Sector to local storage + fsPaths, pathIds, release, err := t.sc.PreFetch(ctx, sref, &taskID) + if err != nil { + return false, xerrors.Errorf("failed to prefetch sectors: %w", err) + } + defer release() + var pieces []struct { PieceIndex int64 `db:"piece_index"` PieceCID string `db:"piece_cid"` @@ -178,82 +239,25 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done unpaddedData = false // nullreader includes fr32 zero bits } - sref := storiface.SectorRef{ - ID: abi.SectorID{ - Miner: abi.ActorID(sectorParams.SpID), - Number: abi.SectorNumber(sectorParams.SectorNumber), - }, - ProofType: sectorParams.RegSealProof, - } - - // D / R / C - sealed, unsealed, err := t.sc.TreeDRC(ctx, &taskID, sref, commd, abi.PaddedPieceSize(ssize), dataReader, unpaddedData) + // Generate Tree D + err = t.sc.TreeD(ctx, sref, commd, abi.PaddedPieceSize(ssize), dataReader, unpaddedData, fsPaths, pathIds) if err != nil { - return false, xerrors.Errorf("computing tree d, r and c: %w", err) + return false, xerrors.Errorf("failed to generate TreeD: %w", err) } - // todo synth porep - - // todo porep challenge check - n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET after_tree_r = true, after_tree_c = true, after_tree_d = true, tree_r_cid = $3, tree_d_cid = $4 - WHERE sp_id = $1 AND sector_number = $2`, - sectorParams.SpID, sectorParams.SectorNumber, sealed, unsealed) + SET after_tree_d = true, tree_d_cid = $3 WHERE sp_id = $1 AND sector_number = $2`, + sectorParams.SpID, sectorParams.SectorNumber, commd) if err != nil { - return false, xerrors.Errorf("store sdr-trees success: updating pipeline: %w", err) + return false, xerrors.Errorf("store TreeD success: updating pipeline: %w", err) } if n != 1 { - return false, xerrors.Errorf("store sdr-trees success: updated %d rows", n) + return false, xerrors.Errorf("store TreeD success: updated %d rows", n) } return true, nil } -func (t *TreesTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { - id := ids[0] - return &id, nil -} - -func (t *TreesTask) TypeDetails() harmonytask.TaskTypeDetails { - ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size - if isDevnet { - ssize = abi.SectorSize(2 << 20) - } - - return harmonytask.TaskTypeDetails{ - Max: t.max, - Name: "SDRTrees", - Cost: resources.Resources{ - Cpu: 1, - Gpu: 1, - Ram: 8000 << 20, // todo - Storage: t.sc.Storage(t.taskToSector, storiface.FTSealed, storiface.FTCache, ssize, storiface.PathSealing), - }, - MaxFailures: 3, - Follows: nil, - } -} - -func (t *TreesTask) Adder(taskFunc harmonytask.AddTaskFunc) { - t.sp.pollers[pollerTrees].Set(taskFunc) -} - -func (t *TreesTask) taskToSector(id harmonytask.TaskID) (ffi.SectorRef, error) { - var refs []ffi.SectorRef - - err := t.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_tree_r = $1`, id) - if err != nil { - return ffi.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err) - } - - if len(refs) != 1 { - return ffi.SectorRef{}, xerrors.Errorf("expected 1 sector ref, got %d", len(refs)) - } - - return refs[0], nil -} - type UrlPieceReader struct { Url string RawSize int64 // the exact number of bytes read, if we read more or less that's an error @@ -323,4 +327,4 @@ func (u *UrlPieceReader) Close() error { return nil } -var _ harmonytask.TaskInterface = &TreesTask{} +var _ harmonytask.TaskInterface = &TreeDTask{} diff --git a/curiosrc/seal/task_trees_test.go b/curiosrc/seal/task_treed_test.go similarity index 100% rename from curiosrc/seal/task_trees_test.go rename to curiosrc/seal/task_treed_test.go diff --git a/curiosrc/seal/task_treerc.go b/curiosrc/seal/task_treerc.go new file mode 100644 index 000000000..02cf0350e --- /dev/null +++ b/curiosrc/seal/task_treerc.go @@ -0,0 +1,190 @@ +package seal + +import ( + "context" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/curiosrc/ffi" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +type TreeRCTask struct { + sp *SealPoller + db *harmonydb.DB + sc *ffi.SealCalls + + max int +} + +func NewTreeRCTask(sp *SealPoller, db *harmonydb.DB, sc *ffi.SealCalls, maxTrees int) *TreeRCTask { + return &TreeRCTask{ + sp: sp, + db: db, + sc: sc, + + max: maxTrees, + } +} + +func (t *TreeRCTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + ctx := context.Background() + + var sectorParamsArr []struct { + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + RegSealProof abi.RegisteredSealProof `db:"reg_seal_proof"` + CommD string `db:"tree_d_cid"` + } + + err = t.db.Select(ctx, §orParamsArr, ` + SELECT sp_id, sector_number, reg_seal_proof, tree_d_cid + FROM sectors_sdr_pipeline + WHERE task_id_tree_c = $1 AND task_id_tree_r = $1`, taskID) + if err != nil { + return false, xerrors.Errorf("getting sector params: %w", err) + } + + if len(sectorParamsArr) != 1 { + return false, xerrors.Errorf("expected 1 sector params, got %d", len(sectorParamsArr)) + } + sectorParams := sectorParamsArr[0] + + commd, err := cid.Parse(sectorParams.CommD) + if err != nil { + return false, xerrors.Errorf("parsing unsealed CID: %w", err) + } + + sref := storiface.SectorRef{ + ID: abi.SectorID{ + Miner: abi.ActorID(sectorParams.SpID), + Number: abi.SectorNumber(sectorParams.SectorNumber), + }, + ProofType: sectorParams.RegSealProof, + } + + // R / C + sealed, _, err := t.sc.TreeRC(ctx, &taskID, sref, commd) + if err != nil { + return false, xerrors.Errorf("computing tree r and c: %w", err) + } + + // todo synth porep + + // todo porep challenge check + + n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline + SET after_tree_r = true, after_tree_c = true, tree_r_cid = $3 + WHERE sp_id = $1 AND sector_number = $2`, + sectorParams.SpID, sectorParams.SectorNumber, sealed) + if err != nil { + return false, xerrors.Errorf("store sdr-trees success: updating pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("store sdr-trees success: updated %d rows", n) + } + + return true, nil +} + +func (t *TreeRCTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + var tasks []struct { + TaskID harmonytask.TaskID `db:"task_id_tree_c"` + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + StorageID string `db:"storage_id"` + } + + if storiface.FTCache != 4 { + panic("storiface.FTCache != 4") + } + + ctx := context.Background() + + indIDs := make([]int64, len(ids)) + for i, id := range ids { + indIDs[i] = int64(id) + } + + err := t.db.Select(ctx, &tasks, ` + SELECT p.task_id_tree_c, p.sp_id, p.sector_number, l.storage_id FROM sectors_sdr_pipeline p + INNER JOIN sector_location l ON p.sp_id = l.miner_id AND p.sector_number = l.sector_num + WHERE task_id_tree_r = ANY ($1) AND l.sector_filetype = 4 +`, indIDs) + if err != nil { + return nil, xerrors.Errorf("getting tasks: %w", err) + } + + ls, err := t.sc.LocalStorage(ctx) + if err != nil { + return nil, xerrors.Errorf("getting local storage: %w", err) + } + + acceptables := map[harmonytask.TaskID]bool{} + + for _, t := range ids { + acceptables[t] = true + } + + for _, t := range tasks { + if _, ok := acceptables[t.TaskID]; !ok { + continue + } + + for _, l := range ls { + if string(l.ID) == t.StorageID { + return &t.TaskID, nil + } + } + } + + return nil, nil +} + +func (t *TreeRCTask) TypeDetails() harmonytask.TaskTypeDetails { + ssize := abi.SectorSize(32 << 30) // todo task details needs taskID to get correct sector size + if isDevnet { + ssize = abi.SectorSize(2 << 20) + } + + return harmonytask.TaskTypeDetails{ + Max: t.max, + Name: "TreeRC", + Cost: resources.Resources{ + Cpu: 1, + Gpu: 1, + Ram: 8 << 30, + Storage: t.sc.Storage(t.taskToSector, storiface.FTSealed, storiface.FTCache, ssize, storiface.PathSealing, paths.MinFreeStoragePercentage), + }, + MaxFailures: 3, + Follows: nil, + } +} + +func (t *TreeRCTask) Adder(taskFunc harmonytask.AddTaskFunc) { + t.sp.pollers[pollerTreeRC].Set(taskFunc) +} + +func (t *TreeRCTask) taskToSector(id harmonytask.TaskID) (ffi.SectorRef, error) { + var refs []ffi.SectorRef + + err := t.db.Select(context.Background(), &refs, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_tree_r = $1`, id) + if err != nil { + return ffi.SectorRef{}, xerrors.Errorf("getting sector ref: %w", err) + } + + if len(refs) != 1 { + return ffi.SectorRef{}, xerrors.Errorf("expected 1 sector ref, got %d", len(refs)) + } + + return refs[0], nil +} + +var _ harmonytask.TaskInterface = &TreeRCTask{} diff --git a/curiosrc/web/hapi/simpleinfo.go b/curiosrc/web/hapi/simpleinfo.go index 287e11233..11877db15 100644 --- a/curiosrc/web/hapi/simpleinfo.go +++ b/curiosrc/web/hapi/simpleinfo.go @@ -21,6 +21,7 @@ import ( "github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/chain/actors/adt" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/must" "github.com/filecoin-project/lotus/storage/paths" @@ -310,6 +311,96 @@ func (a *app) sectorInfo(w http.ResponseWriter, r *http.Request) { } + // Pieces + type sectorPieceMeta struct { + PieceIndex int64 `db:"piece_index"` + PieceCid string `db:"piece_cid"` + PieceSize int64 `db:"piece_size"` + + DataUrl string `db:"data_url"` + DataRawSize int64 `db:"data_raw_size"` + DeleteOnFinalize bool `db:"data_delete_on_finalize"` + + F05PublishCid *string `db:"f05_publish_cid"` + F05DealID *int64 `db:"f05_deal_id"` + + DDOPam *string `db:"direct_piece_activation_manifest"` + + // display + StrPieceSize string `db:"-"` + StrDataRawSize string `db:"-"` + + // piece park + IsParkedPiece bool `db:"-"` + IsParkedPieceFound bool `db:"-"` + PieceParkID int64 `db:"-"` + PieceParkDataUrl string `db:"-"` + PieceParkCreatedAt time.Time `db:"-"` + PieceParkComplete bool `db:"-"` + PieceParkTaskID *int64 `db:"-"` + PieceParkCleanupTaskID *int64 `db:"-"` + } + var pieces []sectorPieceMeta + + err = a.db.Select(ctx, &pieces, `SELECT piece_index, piece_cid, piece_size, + data_url, data_raw_size, data_delete_on_finalize, + f05_publish_cid, f05_deal_id, direct_piece_activation_manifest FROM sectors_sdr_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, spid, intid) + if err != nil { + http.Error(w, xerrors.Errorf("failed to fetch sector pieces: %w", err).Error(), http.StatusInternalServerError) + return + } + + for i := range pieces { + pieces[i].StrPieceSize = types.SizeStr(types.NewInt(uint64(pieces[i].PieceSize))) + pieces[i].StrDataRawSize = types.SizeStr(types.NewInt(uint64(pieces[i].DataRawSize))) + + id, isPiecePark := strings.CutPrefix(pieces[i].DataUrl, "pieceref:") + if !isPiecePark { + continue + } + + intID, err := strconv.ParseInt(id, 10, 64) + if err != nil { + log.Errorw("failed to parse piece park id", "id", id, "error", err) + continue + } + + var parkedPiece []struct { + // parked_piece_refs + PieceID int64 `db:"piece_id"` + DataUrl string `db:"data_url"` + + // parked_pieces + CreatedAt time.Time `db:"created_at"` + Complete bool `db:"complete"` + ParkTaskID *int64 `db:"task_id"` + CleanupTaskID *int64 `db:"cleanup_task_id"` + } + + err = a.db.Select(ctx, &parkedPiece, `SELECT ppr.piece_id, ppr.data_url, pp.created_at, pp.complete, pp.task_id, pp.cleanup_task_id FROM parked_piece_refs ppr + INNER JOIN parked_pieces pp ON pp.id = ppr.piece_id + WHERE ppr.ref_id = $1`, intID) + if err != nil { + http.Error(w, xerrors.Errorf("failed to fetch parked piece: %w", err).Error(), http.StatusInternalServerError) + return + } + + if len(parkedPiece) == 0 { + pieces[i].IsParkedPieceFound = false + continue + } + + pieces[i].IsParkedPieceFound = true + pieces[i].IsParkedPiece = true + + pieces[i].PieceParkID = parkedPiece[0].PieceID + pieces[i].PieceParkDataUrl = parkedPiece[0].DataUrl + pieces[i].PieceParkCreatedAt = parkedPiece[0].CreatedAt.Local() + pieces[i].PieceParkComplete = parkedPiece[0].Complete + pieces[i].PieceParkTaskID = parkedPiece[0].ParkTaskID + pieces[i].PieceParkCleanupTaskID = parkedPiece[0].CleanupTaskID + } + // TaskIDs taskIDs := map[int64]struct{}{} var htasks []taskSummary @@ -362,6 +453,7 @@ func (a *app) sectorInfo(w http.ResponseWriter, r *http.Request) { SectorNumber int64 PipelinePoRep sectorListEntry + Pieces []sectorPieceMeta Locations []locationTable Tasks []taskSummary }{ @@ -377,6 +469,7 @@ func (a *app) sectorInfo(w http.ResponseWriter, r *http.Request) { ChainFaulty: must.One(mbf.faulty.IsSet(uint64(task.SectorNumber))), }, + Pieces: pieces, Locations: locs, Tasks: htasks, } diff --git a/curiosrc/web/hapi/web/sector_info.gohtml b/curiosrc/web/hapi/web/sector_info.gohtml index afa96a923..49c6fabb3 100644 --- a/curiosrc/web/hapi/web/sector_info.gohtml +++ b/curiosrc/web/hapi/web/sector_info.gohtml @@ -4,6 +4,54 @@
Piece Index | +Piece CID | +Piece Size | +Data URL | +Data Raw Size | +Delete On Finalize | +F05 Publish CID | +F05 Deal ID | +Direct Piece Activation Manifest | + +PiecePark ID | +PP URL | +PP Created At | +PP Complete | +PP Cleanup Task | +|||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
{{.PieceIndex}} | +{{.PieceCid}} | +{{.PieceSize}} | +{{.DataUrl}} | +{{.DataRawSize}} | +{{.DeleteOnFinalize}} | +{{.F05PublishCid}} | +{{.F05DealID}} | +{{.DDOPam}} | + {{if .IsParkedPiece}} +{{.PieceParkID}} | +{{.PieceParkDataUrl}} | +{{.PieceParkCreatedAt}} | +{{.PieceParkComplete}} | +{{.PieceParkCleanupTaskID}} | + {{else}} +{{if not .IsParkedPieceFound}}ERR:RefNotFound{{end}} | ++ | + | + | + {{end}} + |