From 817130b830fadb30986e87895a0844c7e0025f86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 6 Apr 2024 09:22:08 +0200 Subject: [PATCH] feat: curioweb: Sector info page --- .github/workflows/build.yml | 2 +- .github/workflows/check.yml | 2 +- .github/workflows/docker.yml | 4 + .github/workflows/release.yml | 4 + .github/workflows/test.yml | 2 +- chain/stmgr/forks_test.go | 14 - cli/spcli/sectors.go | 22 -- cmd/lotus-miner/sectors.go | 9 - curiosrc/web/hapi/robust_rpc.go | 9 + curiosrc/web/hapi/routes.go | 3 + curiosrc/web/hapi/simpleinfo.go | 253 ++++++++++++++++++ .../web/hapi/simpleinfo_pipeline_porep.go | 190 +++++++------ .../hapi/web/pipeline_porep_sectors.gohtml | 246 ++++++++--------- curiosrc/web/hapi/web/sector_info.gohtml | 57 ++++ curiosrc/web/static/main.css | 50 ++++ curiosrc/web/static/pipeline_porep.html | 51 ---- lib/harmony/harmonytask/task_type_handler.go | 13 - storage/pipeline/commit_batch.go | 8 +- storage/pipeline/precommit_batch.go | 2 +- storage/pipeline/states_replica_update.go | 8 +- storage/pipeline/states_sealing.go | 45 ++-- storage/pipeline/utils.go | 14 +- 22 files changed, 629 insertions(+), 379 deletions(-) create mode 100644 curiosrc/web/hapi/web/sector_info.gohtml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d7dd59e143e..ce46a4a5f73 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -14,7 +14,7 @@ defaults: concurrency: group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: ${{ github.event_name == 'pull_request' }} + cancel-in-progress: true permissions: {} diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 8d19589f728..7a77dc70bf4 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -14,7 +14,7 @@ defaults: concurrency: group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: ${{ github.event_name == 'pull_request' }} + cancel-in-progress: true permissions: {} diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 9e6bf59f1ad..8e0ca57a554 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -15,6 +15,10 @@ defaults: run: shell: bash +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + permissions: {} jobs: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index dd8a9cfbc4f..35e139b7d17 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -13,6 +13,10 @@ defaults: run: shell: bash +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + permissions: {} jobs: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 647806e0faa..ad274dfa5c5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,7 +14,7 @@ defaults: concurrency: group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: ${{ github.event_name == 'pull_request' }} + cancel-in-progress: true permissions: {} diff --git a/chain/stmgr/forks_test.go b/chain/stmgr/forks_test.go index 8c022755371..bf8793488b6 100644 --- a/chain/stmgr/forks_test.go +++ b/chain/stmgr/forks_test.go @@ -375,20 +375,6 @@ func testForkRefuseCall(t *testing.T, nullsBefore, nullsAfter int) { } func TestForkPreMigration(t *testing.T) { - // Backup the original value of the DISABLE_PRE_MIGRATIONS environment variable - originalValue, _ := os.LookupEnv("LOTUS_DISABLE_PRE_MIGRATIONS") - - // Unset the DISABLE_PRE_MIGRATIONS environment variable for the test - if err := os.Unsetenv("LOTUS_DISABLE_PRE_MIGRATIONS"); err != nil { - t.Fatalf("failed to unset LOTUS_DISABLE_PRE_MIGRATIONS: %v", err) - } - - // Restore the original DISABLE_PRE_MIGRATIONS environment variable at the end of the test - defer func() { - if err := os.Setenv("LOTUS_DISABLE_PRE_MIGRATIONS", originalValue); err != nil { - t.Fatalf("failed to restore LOTUS_DISABLE_PRE_MIGRATIONS: %v", err) - } - }() //stm: @CHAIN_GEN_NEXT_TIPSET_001, //stm: @CHAIN_STATE_RESOLVE_TO_KEY_ADDR_001, @CHAIN_STATE_SET_VM_CONSTRUCTOR_001 logging.SetAllLoggers(logging.LevelInfo) diff --git a/cli/spcli/sectors.go b/cli/spcli/sectors.go index 1b230ce04ee..5b8bf41390b 100644 --- a/cli/spcli/sectors.go +++ b/cli/spcli/sectors.go @@ -11,7 +11,6 @@ import ( "sort" "strconv" "strings" - "sync" "time" "github.com/fatih/color" @@ -110,27 +109,6 @@ func SectorsStatusCmd(getActorAddress ActorAddressGetter, getOnDiskInfo OnDiskIn fmt.Printf("OnTime:\t\t%v\n", status.OnTime) fmt.Printf("Early:\t\t%v\n", status.Early) - var pamsHeaderOnce sync.Once - - for pi, piece := range status.Pieces { - if piece.DealInfo == nil { - continue - } - if piece.DealInfo.PieceActivationManifest == nil { - continue - } - pamsHeaderOnce.Do(func() { - fmt.Printf("\nPiece Activation Manifests\n") - }) - - pam := piece.DealInfo.PieceActivationManifest - - fmt.Printf("Piece %d: %s %s verif-alloc:%+v\n", pi, pam.CID, types.SizeStr(types.NewInt(uint64(pam.Size))), pam.VerifiedAllocationKey) - for ni, notification := range pam.Notify { - fmt.Printf("\tNotify %d: %s (%x)\n", ni, notification.Address, notification.Payload) - } - } - } else { onChainInfo = true } diff --git a/cmd/lotus-miner/sectors.go b/cmd/lotus-miner/sectors.go index d0c13d33331..a3ffb833594 100644 --- a/cmd/lotus-miner/sectors.go +++ b/cmd/lotus-miner/sectors.go @@ -310,13 +310,6 @@ var sectorsListCmd = &cli.Command{ } } - var pams int - for _, p := range st.Pieces { - if p.DealInfo != nil && p.DealInfo.PieceActivationManifest != nil { - pams++ - } - } - exp := st.Expiration if st.OnTime > 0 && st.OnTime < exp { exp = st.OnTime // Can be different when the sector was CC upgraded @@ -331,8 +324,6 @@ var sectorsListCmd = &cli.Command{ if deals > 0 { m["Deals"] = color.GreenString("%d", deals) - } else if pams > 0 { - m["Deals"] = color.MagentaString("DDO:%d", pams) } else { m["Deals"] = color.BlueString("CC") if st.ToUpgrade { diff --git a/curiosrc/web/hapi/robust_rpc.go b/curiosrc/web/hapi/robust_rpc.go index e9914bfa67f..c10b43a03f3 100644 --- a/curiosrc/web/hapi/robust_rpc.go +++ b/curiosrc/web/hapi/robust_rpc.go @@ -4,10 +4,18 @@ import ( "context" "time" + lru "github.com/hashicorp/golang-lru/v2" + blocks "github.com/ipfs/go-block-format" + "github.com/filecoin-project/lotus/api/client" + "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/chain/store" cliutil "github.com/filecoin-project/lotus/cli/util" + "github.com/filecoin-project/lotus/lib/must" ) +var ChainBlockCache = must.One(lru.New[blockstore.MhString, blocks.Block](4096)) + func (a *app) watchRpc() { ticker := time.NewTicker(watchInterval) for { @@ -84,6 +92,7 @@ func (a *app) updateRpc(ctx context.Context) error { }() a.workingApi = v1api + a.stor = store.ActorStore(ctx, blockstore.NewReadCachedBlockstore(blockstore.NewAPIBlockstore(a.workingApi), ChainBlockCache)) } } diff --git a/curiosrc/web/hapi/routes.go b/curiosrc/web/hapi/routes.go index 430b02efb94..61724ec0ae5 100644 --- a/curiosrc/web/hapi/routes.go +++ b/curiosrc/web/hapi/routes.go @@ -41,6 +41,9 @@ func Routes(r *mux.Router, deps *deps.Deps) error { // node info page r.HandleFunc("/node/{id}", a.nodeInfo) + + // sector info page + r.HandleFunc("/sector/{sp}/{id}", a.sectorInfo) return nil } diff --git a/curiosrc/web/hapi/simpleinfo.go b/curiosrc/web/hapi/simpleinfo.go index 5c353372e8e..08218476bb1 100644 --- a/curiosrc/web/hapi/simpleinfo.go +++ b/curiosrc/web/hapi/simpleinfo.go @@ -8,6 +8,7 @@ import ( "os" "sort" "strconv" + "strings" "sync" "text/template" "time" @@ -15,8 +16,13 @@ import ( "github.com/gorilla/mux" "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api/v1api" + "github.com/filecoin-project/lotus/chain/actors/adt" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/must" + "github.com/filecoin-project/lotus/storage/sealer/storiface" ) type app struct { @@ -25,6 +31,7 @@ type app struct { rpcInfoLk sync.Mutex workingApi v1api.FullNode + stor adt.Store actorInfoLk sync.Mutex actorInfos []actorInfo @@ -128,6 +135,252 @@ func (a *app) nodeInfo(writer http.ResponseWriter, request *http.Request) { a.executePageTemplate(writer, "node_info", "Node Info", mi) } +func (a *app) sectorInfo(w http.ResponseWriter, r *http.Request) { + params := mux.Vars(r) + + id, ok := params["id"] + if !ok { + http.Error(w, "missing id", http.StatusBadRequest) + return + } + + intid, err := strconv.ParseInt(id, 10, 64) + if err != nil { + http.Error(w, "invalid id", http.StatusBadRequest) + return + } + + sp, ok := params["sp"] + if !ok { + http.Error(w, "missing sp", http.StatusBadRequest) + return + } + + maddr, err := address.NewFromString(sp) + if err != nil { + http.Error(w, "invalid sp", http.StatusBadRequest) + return + } + + spid, err := address.IDFromAddress(maddr) + if err != nil { + http.Error(w, "invalid sp", http.StatusBadRequest) + return + } + + ctx := r.Context() + var tasks []PipelineTask + + err = a.db.Select(ctx, &tasks, `SELECT + sp_id, sector_number, + create_time, + task_id_sdr, after_sdr, + task_id_tree_d, after_tree_d, + task_id_tree_c, after_tree_c, + task_id_tree_r, after_tree_r, + task_id_precommit_msg, after_precommit_msg, + after_precommit_msg_success, seed_epoch, + task_id_porep, porep_proof, after_porep, + task_id_finalize, after_finalize, + task_id_move_storage, after_move_storage, + task_id_commit_msg, after_commit_msg, + after_commit_msg_success, + failed, failed_reason + FROM sectors_sdr_pipeline WHERE sp_id = $1 AND sector_number = $2`, spid, intid) + if err != nil { + http.Error(w, xerrors.Errorf("failed to fetch pipeline task info: %w", err).Error(), http.StatusInternalServerError) + return + } + + if len(tasks) == 0 { + http.Error(w, "sector not found", http.StatusInternalServerError) + return + } + + head, err := a.workingApi.ChainHead(ctx) + if err != nil { + http.Error(w, xerrors.Errorf("failed to fetch chain head: %w", err).Error(), http.StatusInternalServerError) + return + } + epoch := head.Height() + + mbf, err := a.getMinerBitfields(ctx, maddr, a.stor) + if err != nil { + http.Error(w, xerrors.Errorf("failed to load miner bitfields: %w", err).Error(), http.StatusInternalServerError) + return + } + + task := tasks[0] + + afterSeed := task.SeedEpoch != nil && *task.SeedEpoch <= int64(epoch) + + var sectorLocations []struct { + CanSeal, CanStore bool + FileType storiface.SectorFileType `db:"sector_filetype"` + StorageID string `db:"storage_id"` + Urls string `db:"urls"` + } + + err = a.db.Select(ctx, §orLocations, `SELECT p.can_seal, p.can_store, l.sector_filetype, l.storage_id, p.urls FROM sector_location l + JOIN storage_path p ON l.storage_id = p.storage_id + WHERE l.sector_num = $1 and l.miner_id = $2 ORDER BY p.can_seal, p.can_store, l.storage_id`, intid, spid) + + type fileLocations struct { + StorageID string + Urls []string + } + + type locationTable struct { + PathType *string + PathTypeRowSpan int + + FileType *string + FileTypeRowSpan int + + Locations []fileLocations + } + locs := []locationTable{} + + for i, loc := range sectorLocations { + loc := loc + + urlList := strings.Split(loc.Urls, ",") // Assuming URLs are comma-separated + + fLoc := fileLocations{ + StorageID: loc.StorageID, + Urls: urlList, + } + + var pathTypeStr *string + var fileTypeStr *string + pathTypeRowSpan := 1 + fileTypeRowSpan := 1 + + pathType := "None" + if loc.CanSeal && loc.CanStore { + pathType = "Seal/Store" + } else if loc.CanSeal { + pathType = "Seal" + } else if loc.CanStore { + pathType = "Store" + } + pathTypeStr = &pathType + + fileType := loc.FileType.String() + fileTypeStr = &fileType + + if i > 0 { + prevNonNilPathTypeLoc := i - 1 + for prevNonNilPathTypeLoc > 0 && locs[prevNonNilPathTypeLoc].PathType == nil { + prevNonNilPathTypeLoc-- + } + if *locs[prevNonNilPathTypeLoc].PathType == *pathTypeStr { + pathTypeRowSpan = 0 + pathTypeStr = nil + locs[prevNonNilPathTypeLoc].PathTypeRowSpan++ + // only if we have extended path type we may need to extend file type + + prevNonNilFileTypeLoc := i - 1 + for prevNonNilFileTypeLoc > 0 && locs[prevNonNilFileTypeLoc].FileType == nil { + prevNonNilFileTypeLoc-- + } + if *locs[prevNonNilFileTypeLoc].FileType == *fileTypeStr { + fileTypeRowSpan = 0 + fileTypeStr = nil + locs[prevNonNilFileTypeLoc].FileTypeRowSpan++ + } + } + } + + locTable := locationTable{ + PathType: pathTypeStr, + PathTypeRowSpan: pathTypeRowSpan, + FileType: fileTypeStr, + FileTypeRowSpan: fileTypeRowSpan, + Locations: []fileLocations{fLoc}, + } + + locs = append(locs, locTable) + + } + + // TaskIDs + taskIDs := map[int64]struct{}{} + var htasks []taskSummary + { + // get non-nil task IDs + appendNonNil := func(id *int64) { + if id != nil { + taskIDs[*id] = struct{}{} + } + } + appendNonNil(task.TaskSDR) + appendNonNil(task.TaskTreeD) + appendNonNil(task.TaskTreeC) + appendNonNil(task.TaskTreeR) + appendNonNil(task.TaskPrecommitMsg) + appendNonNil(task.TaskPoRep) + appendNonNil(task.TaskFinalize) + appendNonNil(task.TaskMoveStorage) + appendNonNil(task.TaskCommitMsg) + + if len(taskIDs) > 0 { + ids := make([]int64, 0, len(taskIDs)) + for id := range taskIDs { + ids = append(ids, id) + } + + var dbtasks []struct { + OwnerID *string `db:"owner_id"` + HostAndPort *string `db:"host_and_port"` + TaskID int64 `db:"id"` + Name string `db:"name"` + UpdateTime time.Time `db:"update_time"` + } + err = a.db.Select(ctx, &dbtasks, `SELECT t.owner_id, hm.host_and_port, t.id, t.name, t.update_time FROM harmony_task t LEFT JOIN curio.harmony_machines hm ON hm.id = t.owner_id WHERE t.id = ANY($1)`, ids) + if err != nil { + http.Error(w, fmt.Sprintf("failed to fetch task names: %v", err), http.StatusInternalServerError) + return + } + + for _, tn := range dbtasks { + htasks = append(htasks, taskSummary{ + Name: tn.Name, + SincePosted: time.Since(tn.UpdateTime.Local()).Round(time.Second).String(), + Owner: tn.HostAndPort, + OwnerID: tn.OwnerID, + ID: tn.TaskID, + }) + } + } + } + + mi := struct { + SectorNumber int64 + PipelinePoRep sectorListEntry + + Locations []locationTable + Tasks []taskSummary + }{ + SectorNumber: intid, + PipelinePoRep: sectorListEntry{ + PipelineTask: tasks[0], + AfterSeed: afterSeed, + + ChainAlloc: must.One(mbf.alloc.IsSet(uint64(task.SectorNumber))), + ChainSector: must.One(mbf.sectorSet.IsSet(uint64(task.SectorNumber))), + ChainActive: must.One(mbf.active.IsSet(uint64(task.SectorNumber))), + ChainUnproven: must.One(mbf.unproven.IsSet(uint64(task.SectorNumber))), + ChainFaulty: must.One(mbf.faulty.IsSet(uint64(task.SectorNumber))), + }, + + Locations: locs, + Tasks: htasks, + } + + a.executePageTemplate(w, "sector_info", "Sector Info", mi) +} + var templateDev = os.Getenv("LOTUS_WEB_DEV") == "1" func (a *app) executeTemplate(w http.ResponseWriter, name string, data interface{}) { diff --git a/curiosrc/web/hapi/simpleinfo_pipeline_porep.go b/curiosrc/web/hapi/simpleinfo_pipeline_porep.go index 236f87cc253..a37cd14ab28 100644 --- a/curiosrc/web/hapi/simpleinfo_pipeline_porep.go +++ b/curiosrc/web/hapi/simpleinfo_pipeline_porep.go @@ -1,70 +1,80 @@ package hapi import ( + "context" "net/http" "time" - lru "github.com/hashicorp/golang-lru/v2" - blocks "github.com/ipfs/go-block-format" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-bitfield" - "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/chain/actors/adt" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/must" ) -var ChainBlockCache = must.One(lru.New[blockstore.MhString, blocks.Block](4096)) +type PipelineTask struct { + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` -func (a *app) pipelinePorepSectors(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() + CreateTime time.Time `db:"create_time"` - type PipelineTask struct { - SpID int64 `db:"sp_id"` - SectorNumber int64 `db:"sector_number"` + TaskSDR *int64 `db:"task_id_sdr"` + AfterSDR bool `db:"after_sdr"` - CreateTime time.Time `db:"create_time"` + TaskTreeD *int64 `db:"task_id_tree_d"` + AfterTreeD bool `db:"after_tree_d"` - TaskSDR *int64 `db:"task_id_sdr"` - AfterSDR bool `db:"after_sdr"` + TaskTreeC *int64 `db:"task_id_tree_c"` + AfterTreeC bool `db:"after_tree_c"` - TaskTreeD *int64 `db:"task_id_tree_d"` - AfterTreeD bool `db:"after_tree_d"` + TaskTreeR *int64 `db:"task_id_tree_r"` + AfterTreeR bool `db:"after_tree_r"` - TaskTreeC *int64 `db:"task_id_tree_c"` - AfterTreeC bool `db:"after_tree_c"` + TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` + AfterPrecommitMsg bool `db:"after_precommit_msg"` - TaskTreeR *int64 `db:"task_id_tree_r"` - AfterTreeR bool `db:"after_tree_r"` + AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` + SeedEpoch *int64 `db:"seed_epoch"` - TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` - AfterPrecommitMsg bool `db:"after_precommit_msg"` + TaskPoRep *int64 `db:"task_id_porep"` + PoRepProof []byte `db:"porep_proof"` + AfterPoRep bool `db:"after_porep"` - AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` - SeedEpoch *int64 `db:"seed_epoch"` + TaskFinalize *int64 `db:"task_id_finalize"` + AfterFinalize bool `db:"after_finalize"` - TaskPoRep *int64 `db:"task_id_porep"` - PoRepProof []byte `db:"porep_proof"` - AfterPoRep bool `db:"after_porep"` + TaskMoveStorage *int64 `db:"task_id_move_storage"` + AfterMoveStorage bool `db:"after_move_storage"` - TaskFinalize *int64 `db:"task_id_finalize"` - AfterFinalize bool `db:"after_finalize"` + TaskCommitMsg *int64 `db:"task_id_commit_msg"` + AfterCommitMsg bool `db:"after_commit_msg"` - TaskMoveStorage *int64 `db:"task_id_move_storage"` - AfterMoveStorage bool `db:"after_move_storage"` + AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` - TaskCommitMsg *int64 `db:"task_id_commit_msg"` - AfterCommitMsg bool `db:"after_commit_msg"` + Failed bool `db:"failed"` + FailedReason string `db:"failed_reason"` +} - AfterCommitMsgSuccess bool `db:"after_commit_msg_success"` +type sectorListEntry struct { + PipelineTask - Failed bool `db:"failed"` - FailedReason string `db:"failed_reason"` - } + Address address.Address + CreateTime string + AfterSeed bool + + ChainAlloc, ChainSector, ChainActive, ChainUnproven, ChainFaulty bool +} + +type minerBitfields struct { + alloc, sectorSet, active, unproven, faulty bitfield.BitField +} + +func (a *app) pipelinePorepSectors(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() var tasks []PipelineTask @@ -89,16 +99,6 @@ func (a *app) pipelinePorepSectors(w http.ResponseWriter, r *http.Request) { return } - type sectorListEntry struct { - PipelineTask - - Address address.Address - CreateTime string - AfterSeed bool - - ChainAlloc, ChainSector, ChainActive, ChainUnproven, ChainFaulty bool - } - head, err := a.workingApi.ChainHead(ctx) if err != nil { http.Error(w, xerrors.Errorf("failed to fetch chain head: %w", err).Error(), http.StatusInternalServerError) @@ -106,11 +106,6 @@ func (a *app) pipelinePorepSectors(w http.ResponseWriter, r *http.Request) { } epoch := head.Height() - stor := store.ActorStore(ctx, blockstore.NewReadCachedBlockstore(blockstore.NewAPIBlockstore(a.workingApi), ChainBlockCache)) - - type minerBitfields struct { - alloc, sectorSet, active, unproven, faulty bitfield.BitField - } minerBitfieldCache := map[address.Address]minerBitfields{} sectorList := make([]sectorListEntry, 0, len(tasks)) @@ -127,55 +122,11 @@ func (a *app) pipelinePorepSectors(w http.ResponseWriter, r *http.Request) { mbf, ok := minerBitfieldCache[addr] if !ok { - act, err := a.workingApi.StateGetActor(ctx, addr, types.EmptyTSK) - if err != nil { - http.Error(w, xerrors.Errorf("failed to load actor: %w", err).Error(), http.StatusInternalServerError) - return - } - - mas, err := miner.Load(stor, act) - if err != nil { - http.Error(w, xerrors.Errorf("failed to load miner actor: %w", err).Error(), http.StatusInternalServerError) - return - } - - activeSectors, err := miner.AllPartSectors(mas, miner.Partition.ActiveSectors) - if err != nil { - http.Error(w, xerrors.Errorf("failed to load active sectors: %w", err).Error(), http.StatusInternalServerError) - return - } - - allSectors, err := miner.AllPartSectors(mas, miner.Partition.AllSectors) - if err != nil { - http.Error(w, xerrors.Errorf("failed to load all sectors: %w", err).Error(), http.StatusInternalServerError) - return - } - - unproved, err := miner.AllPartSectors(mas, miner.Partition.UnprovenSectors) + mbf, err := a.getMinerBitfields(ctx, addr, a.stor) if err != nil { - http.Error(w, xerrors.Errorf("failed to load unproven sectors: %w", err).Error(), http.StatusInternalServerError) + http.Error(w, xerrors.Errorf("failed to load miner bitfields: %w", err).Error(), http.StatusInternalServerError) return } - - faulty, err := miner.AllPartSectors(mas, miner.Partition.FaultySectors) - if err != nil { - http.Error(w, xerrors.Errorf("failed to load faulty sectors: %w", err).Error(), http.StatusInternalServerError) - return - } - - alloc, err := mas.GetAllocatedSectors() - if err != nil { - http.Error(w, xerrors.Errorf("failed to load allocated sectors: %w", err).Error(), http.StatusInternalServerError) - return - } - - mbf = minerBitfields{ - alloc: *alloc, - sectorSet: allSectors, - active: activeSectors, - unproven: unproved, - faulty: faulty, - } minerBitfieldCache[addr] = mbf } @@ -197,3 +148,48 @@ func (a *app) pipelinePorepSectors(w http.ResponseWriter, r *http.Request) { a.executeTemplate(w, "pipeline_porep_sectors", sectorList) } + +func (a *app) getMinerBitfields(ctx context.Context, addr address.Address, stor adt.Store) (minerBitfields, error) { + act, err := a.workingApi.StateGetActor(ctx, addr, types.EmptyTSK) + if err != nil { + return minerBitfields{}, xerrors.Errorf("failed to load actor: %w", err) + } + + mas, err := miner.Load(stor, act) + if err != nil { + return minerBitfields{}, xerrors.Errorf("failed to load miner actor: %w", err) + } + + activeSectors, err := miner.AllPartSectors(mas, miner.Partition.ActiveSectors) + if err != nil { + return minerBitfields{}, xerrors.Errorf("failed to load active sectors: %w", err) + } + + allSectors, err := miner.AllPartSectors(mas, miner.Partition.AllSectors) + if err != nil { + return minerBitfields{}, xerrors.Errorf("failed to load all sectors: %w", err) + } + + unproved, err := miner.AllPartSectors(mas, miner.Partition.UnprovenSectors) + if err != nil { + return minerBitfields{}, xerrors.Errorf("failed to load unproven sectors: %w", err) + } + + faulty, err := miner.AllPartSectors(mas, miner.Partition.FaultySectors) + if err != nil { + return minerBitfields{}, xerrors.Errorf("failed to load faulty sectors: %w", err) + } + + alloc, err := mas.GetAllocatedSectors() + if err != nil { + return minerBitfields{}, xerrors.Errorf("failed to load allocated sectors: %w", err) + } + + return minerBitfields{ + alloc: *alloc, + sectorSet: allSectors, + active: activeSectors, + unproven: unproved, + faulty: faulty, + }, nil +} diff --git a/curiosrc/web/hapi/web/pipeline_porep_sectors.gohtml b/curiosrc/web/hapi/web/pipeline_porep_sectors.gohtml index f5069fe6d37..3949de8e254 100644 --- a/curiosrc/web/hapi/web/pipeline_porep_sectors.gohtml +++ b/curiosrc/web/hapi/web/pipeline_porep_sectors.gohtml @@ -1,132 +1,136 @@ +{{define "sector_porep_state"}} + + + + + + + + + + + + + + + + + + + + + + + + +
+
SDR
+
+ {{if .AfterSDR}}done{{else}} + {{if ne .TaskSDR nil}}T:{{.TaskSDR}}{{else}}--{{end}} + {{end}} +
+
+
TreeC
+
+ {{if .AfterTreeC}}done{{else}} + {{if ne .TaskTreeC nil}}T:{{.TaskTreeC}}{{else}}--{{end}} + {{end}} +
+
+
PComm Msg
+
+ {{if .AfterPrecommitMsg}}done{{else}} + {{if ne .TaskPrecommitMsg nil}}T:{{.TaskPrecommitMsg}}{{else}}--{{end}} + {{end}} +
+
+
PComm Wait
+
+ {{if .AfterPrecommitMsgSuccess}}done{{else}} + -- + {{end}} +
+
+
Wait Seed
+
+ {{if .AfterSeed}}done{{else}} + {{if ne .SeedEpoch nil}}@{{.SeedEpoch}}{{else}}--{{end}} + {{end}} +
+
+
PoRep
+
+ {{if .AfterPoRep}}done{{else}} + {{if ne .TaskPoRep nil}}T:{{.TaskPoRep}}{{else}}--{{end}} + {{end}} +
+
+
Clear Cache
+
+ {{if .AfterFinalize}}done{{else}} + {{if ne .TaskFinalize nil}}T:{{.TaskFinalize}}{{else}}--{{end}} + {{end}} +
+
+
Move Storage
+
+ {{if .AfterMoveStorage}}done{{else}} + {{if ne .TaskMoveStorage nil}}T:{{.TaskMoveStorage}}{{else}}--{{end}} + {{end}} +
+
+
On Chain
+
{{if .ChainSector}}yes{{else}}{{if .ChainAlloc}}allocated{{else}}no{{end}}{{end}}
+
+
TreeD
+
+ {{if .AfterTreeD}}done{{else}} + {{if ne .TaskTreeD nil}}T:{{.TaskTreeD}}{{else}}--{{end}} + {{end}} +
+
+
TreeR
+
+ {{if .AfterTreeR}}done{{else}} + {{if ne .TaskTreeR nil}}T:{{.TaskTreeR}}{{else}}--{{end}} + {{end}} +
+
+
Commit Msg
+
+ {{if .AfterCommitMsg}}done{{else}} + {{if ne .TaskCommitMsg nil}}T:{{.TaskCommitMsg}}{{else}}--{{end}} + {{end}} +
+
+
Commit Wait
+
+ {{if .AfterCommitMsgSuccess}}done{{else}} + -- + {{end}} +
+
+
Active
+
{{if .ChainActive}}yes{{else}} + {{if .ChainUnproven}}unproven{{else}} + {{if .ChainFaulty}}faulty{{else}}no{{end}} + {{end}} + {{end}} +
+
+{{end}} + {{define "pipeline_porep_sectors"}} {{range .}} {{.Address}} {{.CreateTime}} - - - - - - - - - - - - - - - - - - - - - - - - -
-
SDR
-
- {{if .AfterSDR}}done{{else}} - {{if ne .TaskSDR nil}}T:{{.TaskSDR}}{{else}}--{{end}} - {{end}} -
-
-
TreeC
-
- {{if .AfterTreeC}}done{{else}} - {{if ne .TaskTreeC nil}}T:{{.TaskTreeC}}{{else}}--{{end}} - {{end}} -
-
-
PComm Msg
-
- {{if .AfterPrecommitMsg}}done{{else}} - {{if ne .TaskPrecommitMsg nil}}T:{{.TaskPrecommitMsg}}{{else}}--{{end}} - {{end}} -
-
-
PComm Wait
-
- {{if .AfterPrecommitMsgSuccess}}done{{else}} - -- - {{end}} -
-
-
Wait Seed
-
- {{if .AfterSeed}}done{{else}} - {{if ne .SeedEpoch nil}}@{{.SeedEpoch}}{{else}}--{{end}} - {{end}} -
-
-
PoRep
-
- {{if .AfterPoRep}}done{{else}} - {{if ne .TaskPoRep nil}}T:{{.TaskPoRep}}{{else}}--{{end}} - {{end}} -
-
-
Clear Cache
-
- {{if .AfterFinalize}}done{{else}} - {{if ne .TaskFinalize nil}}T:{{.TaskFinalize}}{{else}}--{{end}} - {{end}} -
-
-
Move Storage
-
- {{if .AfterMoveStorage}}done{{else}} - {{if ne .TaskMoveStorage nil}}T:{{.TaskMoveStorage}}{{else}}--{{end}} - {{end}} -
-
-
On Chain
-
{{if .ChainSector}}yes{{else}}{{if .ChainAlloc}}allocated{{else}}no{{end}}{{end}}
-
-
TreeD
-
- {{if .AfterTreeD}}done{{else}} - {{if ne .TaskTreeD nil}}T:{{.TaskTreeD}}{{else}}--{{end}} - {{end}} -
-
-
TreeR
-
- {{if .AfterTreeR}}done{{else}} - {{if ne .TaskTreeR nil}}T:{{.TaskTreeR}}{{else}}--{{end}} - {{end}} -
-
-
Commit Msg
-
- {{if .AfterCommitMsg}}done{{else}} - {{if ne .TaskCommitMsg nil}}T:{{.TaskCommitMsg}}{{else}}--{{end}} - {{end}} -
-
-
Commit Wait
-
- {{if .AfterCommitMsgSuccess}}done{{else}} - -- - {{end}} -
-
-
Active
-
{{if .ChainActive}}yes{{else}} - {{if .ChainUnproven}}unproven{{else}} - {{if .ChainFaulty}}faulty{{else}}no{{end}} - {{end}} - {{end}} -
-
+ {{template "sector_porep_state" .}} - DETAILS + DETAILS diff --git a/curiosrc/web/hapi/web/sector_info.gohtml b/curiosrc/web/hapi/web/sector_info.gohtml new file mode 100644 index 00000000000..afa96a9234a --- /dev/null +++ b/curiosrc/web/hapi/web/sector_info.gohtml @@ -0,0 +1,57 @@ +{{define "sector_info"}} +

Sector {{.SectorNumber}}

+
+

PoRep Pipeline

+ {{template "sector_porep_state" .PipelinePoRep}} +
+
+

Storage

+ + + + + + + + {{range .Locations}} + + {{if .PathType}} + + {{end}} + {{if .FileType}} + + {{end}} + + + + {{range $i, $loc := .Locations}} + {{if gt $i 0}} + + + + + {{end}} + {{end}} + {{end}} +
Path TypeFile TypePath IDHost
{{.PathType}}{{.FileType}}{{(index .Locations 0).StorageID}}{{range (index .Locations 0).Urls}}

{{.}}

{{end}}
{{$loc.StorageID}}{{range $loc.Urls}}

{{.}}

{{end}}
+
+
+

Tasks

+ + + + + + + + {{range .Tasks}} + + + + + + + {{end}} +
Task TypeTask IDPostedWorker
{{.Name}}{{.ID}}{{.SincePosted}}{{if ne nil .OwnerID}}{{.Owner}}{{end}}
+
+{{end}} diff --git a/curiosrc/web/static/main.css b/curiosrc/web/static/main.css index d9f31442086..76b90d0b544 100644 --- a/curiosrc/web/static/main.css +++ b/curiosrc/web/static/main.css @@ -68,3 +68,53 @@ a:hover { color: deeppink; } } + +/* Path: curiosrc/web/hapi/web/pipeline_porep_sectors.gohtml */ +.porep-pipeline-table, .porep-state { + color: #d0d0d0; +} + +.porep-pipeline-table td, .porep-pipeline-table th { + border-left: none; + border-collapse: collapse; +} + +.porep-pipeline-table tr:nth-child(odd) { + border-top: 6px solid #999999; + +} + +.porep-pipeline-table tr:first-child, .porep-pipeline-table tr:first-child { + border-top: none; +} + +.porep-state { + border-collapse: collapse; +} + +.porep-state td, .porep-state th { + border-left: 1px solid #f0f0f0; + border-right: 1px solid #f0f0f0; + + padding: 1px 5px; + + text-align: center; + font-size: 0.7em; +} + +.porep-state tr { + border-top: 1px solid #f0f0f0; +} +.porep-state tr:first-child { + border-top: none; +} + +.pipeline-active { + background-color: #303060; +} +.pipeline-success { + background-color: #306030; +} +.pipeline-failed { + background-color: #603030; +} diff --git a/curiosrc/web/static/pipeline_porep.html b/curiosrc/web/static/pipeline_porep.html index 8696bdd26ea..d596163b7e3 100644 --- a/curiosrc/web/static/pipeline_porep.html +++ b/curiosrc/web/static/pipeline_porep.html @@ -5,57 +5,6 @@ -
diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 7aecd380f2c..2b252194942 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -85,7 +85,6 @@ top: } // 3. What does the impl say? -canAcceptAgain: tID, err := h.CanAccept(ids, h.TaskEngine) if err != nil { log.Error(err) @@ -101,18 +100,6 @@ canAcceptAgain: if h.TaskTypeDetails.Cost.Storage != nil { if err = h.TaskTypeDetails.Cost.Storage.Claim(int(*tID)); err != nil { log.Infow("did not accept task", "task_id", strconv.Itoa(int(*tID)), "reason", "storage claim failed", "name", h.Name, "error", err) - - if len(ids) > 1 { - var tryAgain = make([]TaskID, 0, len(ids)-1) - for _, id := range ids { - if id != *tID { - tryAgain = append(tryAgain, id) - } - } - ids = tryAgain - goto canAcceptAgain - } - return false } releaseStorage = func() { diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index e7d89076f49..ecb8569d0e1 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -449,9 +449,9 @@ func (b *CommitBatcher) processBatchV2(cfg sealiface.Config, sectors []abi.Secto _, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitSectors3, needFunds, maxFee, enc.Bytes()) if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) { - log.Errorf("simulating CommitBatch %s", err) + log.Errorf("simulating CommitBatch message failed (%x): %s", enc.Bytes(), err) res.Error = err.Error() - return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err) } msgTooLarge := len(enc.Bytes()) > (messagepool.MaxMessageSize - 128) @@ -590,9 +590,9 @@ func (b *CommitBatcher) processBatchV1(cfg sealiface.Config, sectors []abi.Secto _, err = simulateMsgGas(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes()) if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(sectors) < miner.MinAggregatedSectors*2) { - log.Errorf("simulating CommitBatch %s", err) + log.Errorf("simulating CommitBatch message failed (%x): %s", enc.Bytes(), err) res.Error = err.Error() - return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch %w", err) + return []sealiface.CommitBatchRes{res}, xerrors.Errorf("simulating CommitBatch message failed: %w", err) } // If we're out of gas, split the batch in half and evaluate again diff --git a/storage/pipeline/precommit_batch.go b/storage/pipeline/precommit_batch.go index 55bead59037..099988010db 100644 --- a/storage/pipeline/precommit_batch.go +++ b/storage/pipeline/precommit_batch.go @@ -291,7 +291,7 @@ func (b *PreCommitBatcher) processPreCommitBatch(cfg sealiface.Config, bf abi.To if err != nil && (!api.ErrorIsIn(err, []error{&api.ErrOutOfGas{}}) || len(entries) == 1) { res.Error = err.Error() - return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("simulating PreCommitBatch %w", err) + return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("simulating PreCommitBatch message failed: %w", err) } // If we're out of gas, split the batch in half and evaluate again diff --git a/storage/pipeline/states_replica_update.go b/storage/pipeline/states_replica_update.go index a0d92891cd2..380078e75c0 100644 --- a/storage/pipeline/states_replica_update.go +++ b/storage/pipeline/states_replica_update.go @@ -11,7 +11,6 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/exitcode" - "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/lotus/api" @@ -171,12 +170,7 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec // figure out message type - nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key()) - if err != nil { - log.Errorf("failed to get network version: %+v", err) - } - - pams, deals, err := m.processPieces(ctx.Context(), sector, nv >= network.Version22) + pams, deals, err := m.processPieces(ctx.Context(), sector) if err != nil { log.Errorf("failed to process pieces: %+v", err) return ctx.Send(SectorSubmitReplicaUpdateFailed{}) diff --git a/storage/pipeline/states_sealing.go b/storage/pipeline/states_sealing.go index 81ee85853c0..4f40ac7c7d2 100644 --- a/storage/pipeline/states_sealing.go +++ b/storage/pipeline/states_sealing.go @@ -748,33 +748,30 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo // processPieces returns either: // - a list of piece activation manifests // - a list of deal IDs, if all non-filler pieces are deal-id pieces -func (m *Sealing) processPieces(ctx context.Context, sector SectorInfo, forceDDO bool) ([]miner.PieceActivationManifest, []abi.DealID, error) { +func (m *Sealing) processPieces(ctx context.Context, sector SectorInfo) ([]miner.PieceActivationManifest, []abi.DealID, error) { pams := make([]miner.PieceActivationManifest, 0, len(sector.Pieces)) dealIDs := make([]abi.DealID, 0, len(sector.Pieces)) - hasDDO := forceDDO + var hasDDO bool - if !forceDDO { - // if not forcing DDO, check if we have any DDO pieces - for _, piece := range sector.Pieces { - piece := piece + for _, piece := range sector.Pieces { + piece := piece - // first figure out if this is a ddo sector - err := piece.handleDealInfo(handleDealInfoParams{ - FillerHandler: func(info UniversalPieceInfo) error { - // Fillers are implicit (todo review: Are they??) - return nil - }, - BuiltinMarketHandler: func(info UniversalPieceInfo) error { - return nil - }, - DDOHandler: func(info UniversalPieceInfo) error { - hasDDO = true - return nil - }, - }) - if err != nil { - return nil, nil, xerrors.Errorf("handleDealInfo: %w", err) - } + // first figure out if this is a ddo sector + err := piece.handleDealInfo(handleDealInfoParams{ + FillerHandler: func(info UniversalPieceInfo) error { + // Fillers are implicit (todo review: Are they??) + return nil + }, + BuiltinMarketHandler: func(info UniversalPieceInfo) error { + return nil + }, + DDOHandler: func(info UniversalPieceInfo) error { + hasDDO = true + return nil + }, + }) + if err != nil { + return nil, nil, xerrors.Errorf("handleDealInfo: %w", err) } } for _, piece := range sector.Pieces { @@ -850,7 +847,7 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")}) } - pams, dealIDs, err := m.processPieces(ctx.Context(), sector, false) + pams, dealIDs, err := m.processPieces(ctx.Context(), sector) if err != nil { return err } diff --git a/storage/pipeline/utils.go b/storage/pipeline/utils.go index ac519b6acef..4b99a5beadd 100644 --- a/storage/pipeline/utils.go +++ b/storage/pipeline/utils.go @@ -1,9 +1,7 @@ package sealing import ( - "bytes" "context" - "fmt" "math/bits" "github.com/ipfs/go-cid" @@ -107,17 +105,7 @@ func simulateMsgGas(ctx context.Context, sa interface { Params: params, } - var b bytes.Buffer - err := msg.MarshalCBOR(&b) - if err != nil { - return nil, xerrors.Errorf("failed to unmarshal the signed message: %w", err) - } - - gmsg, err := sa.GasEstimateMessageGas(ctx, &msg, nil, types.EmptyTSK) - if err != nil { - err = fmt.Errorf("message %x failed: %w", b.Bytes(), err) - } - return gmsg, err + return sa.GasEstimateMessageGas(ctx, &msg, nil, types.EmptyTSK) } func sendMsg(ctx context.Context, sa interface {