Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/dtynn/import from lotus sealer #327

Merged
merged 4 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ import (
stbuiltin "github.com/filecoin-project/go-state-types/builtin"
stminer "github.com/filecoin-project/go-state-types/builtin/v8/miner"

"github.com/ipfs-force-community/venus-cluster/venus-sector-manager/core"
"github.com/ipfs-force-community/venus-cluster/venus-sector-manager/modules/policy"
"github.com/ipfs-force-community/venus-cluster/venus-sector-manager/modules/util"
"github.com/ipfs-force-community/venus-cluster/venus-sector-manager/pkg/messager"

"github.com/filecoin-project/venus/app/submodule/chain"
"github.com/filecoin-project/venus/venus-shared/actors"
"github.com/filecoin-project/venus/venus-shared/actors/adt"
"github.com/filecoin-project/venus/venus-shared/actors/builtin/miner"
specpolicy "github.com/filecoin-project/venus/venus-shared/actors/policy"
"github.com/filecoin-project/venus/venus-shared/types"

"github.com/ipfs-force-community/venus-cluster/venus-sector-manager/core"
"github.com/ipfs-force-community/venus-cluster/venus-sector-manager/modules/policy"
"github.com/ipfs-force-community/venus-cluster/venus-sector-manager/modules/util"
"github.com/ipfs-force-community/venus-cluster/venus-sector-manager/pkg/lotusminer"
"github.com/ipfs-force-community/venus-cluster/venus-sector-manager/pkg/messager"
)

var flagListOffline = &cli.BoolFlag{
Expand Down Expand Up @@ -67,6 +68,7 @@ var utilSealerSectorsCmd = &cli.Command{
utilSealerSectorsFindDealCmd,
utilSealerSectorsResendPreCommitCmd,
utilSealerSectorsResendProveCommitCmd,
utilSealerSectorsImportCommitCmd,
},
}

Expand Down Expand Up @@ -1640,3 +1642,226 @@ var utilSealerSectorsResendProveCommitCmd = &cli.Command{
return nil
},
}

var utilSealerSectorsImportCommitCmd = &cli.Command{
Name: "import",
Usage: "import sector infos from the given lotus-miner / venus-sealer instance",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "api",
Usage: "api address of the instance",
},
&cli.StringFlag{
Name: "token",
Usage: "api token of the instance",
},
&cli.BoolFlag{
Name: "override",
Usage: "override the previous sector state",
Value: false,
},
&cli.Uint64Flag{
Name: "number",
Usage: "import the specified sector number only if this flag is set",
},
},
Action: func(cctx *cli.Context) error {
override := cctx.Bool("override")

cli, gctx, stop, err := extractAPI(cctx)
if err != nil {
return err
}

defer stop()

mcli, closer, err := lotusminer.New(gctx, cctx.String("api"), cctx.String("token"))
if err != nil {
return fmt.Errorf("construct lotus-miner client: %w", err)
}

defer closer()

maddr, err := mcli.ActorAddress(gctx)
if err != nil {
return RPCCallError("ActorAddress", err)
}

mid, err := address.IDFromAddress(maddr)
if err != nil {
return fmt.Errorf("extract actor id from %s: %w", maddr, err)
}

minerID := abi.ActorID(mid)

var numbers []abi.SectorNumber
if cctx.IsSet("number") {
numbers = []abi.SectorNumber{abi.SectorNumber(cctx.Uint64("number"))}
} else {
numbers, err = mcli.SectorsList(gctx)
if err != nil {
return RPCCallError("SectorsList", err)
}
}

for _, num := range numbers {
sinfo, err := mcli.SectorsStatus(gctx, num, true)
if err != nil {
return RPCCallError("SectorsStatus", fmt.Errorf("get info for %d: %w", num, err))
}

sid := abi.SectorID{
Miner: minerID,
Number: sinfo.SectorID,
}

slog := Log.With("sector", util.FormatSectorID(sid))
state, err := sectorInfo2SectorState(sid, &sinfo)
if err != nil {
slog.Warnf("check sector info: %s", err)
continue
}

imported, err := cli.Sealer.ImportSector(gctx, core.WorkerOffline, state, override)
if err != nil {
slog.Errorf("import failed: %s", err)
continue
}

if !imported {
slog.Warn("not imported")
continue
}

slog.Info("imported")
}

return nil
},
}

func sectorInfo2SectorState(sid abi.SectorID, sinfo *lotusminer.SectorInfo) (*core.SectorState, error) {
var upgraded core.SectorUpgraded
switch lotusminer.SectorState(sinfo.State) {
case lotusminer.FinalizeSector:

case lotusminer.FinalizeReplicaUpdate,
lotusminer.UpdateActivating,
lotusminer.ReleaseSectorKey:
upgraded = true

default:
return nil, fmt.Errorf("unexpected sector state %s", sinfo.State)
}

if sinfo.CommD == nil {
return nil, fmt.Errorf("no comm_d")
}

if sinfo.CommR == nil {
return nil, fmt.Errorf("no comm_r")
}

commR, err := util.CID2ReplicaCommitment(*sinfo.CommR)
if err != nil {
return nil, fmt.Errorf("convert comm_r cid to commitment: %w", err)
}

if len(sinfo.Proof) == 0 {
return nil, fmt.Errorf("no proof")
}

if sinfo.Ticket.Epoch == 0 || len(sinfo.Ticket.Value) == 0 {
return nil, fmt.Errorf("no ticket")
}

if sinfo.Seed.Epoch == 0 || len(sinfo.Seed.Value) == 0 {
return nil, fmt.Errorf("no seed")
}

ticket := core.Ticket{
Epoch: sinfo.Ticket.Epoch,
Ticket: abi.Randomness(sinfo.Ticket.Value),
}

// pieces
pieces := make(core.Deals, 0, len(sinfo.Pieces))
dealIDs := make([]abi.DealID, 0, len(sinfo.Pieces))
for pi := range sinfo.Pieces {
ipiece := sinfo.Pieces[pi]
spiece := core.DealInfo{}

if ipiece.DealInfo != nil {
if ipiece.DealInfo.DealID != 0 {
dealIDs = append(dealIDs, ipiece.DealInfo.DealID)
spiece.ID = ipiece.DealInfo.DealID
spiece.IsCompatible = true
}

spiece.Piece = core.PieceInfo{
Size: ipiece.Piece.Size,
Cid: ipiece.Piece.PieceCID,
}
spiece.Proposal = ipiece.DealInfo.DealProposal
}

pieces = append(pieces, spiece)
}

if len(dealIDs) == 0 {
pieces = pieces[:0]
}

state := &core.SectorState{
ID: sid,
SectorType: sinfo.SealProof,

Ticket: &ticket,
Seed: &core.Seed{
Epoch: sinfo.Seed.Epoch,
Seed: abi.Randomness(sinfo.Seed.Value),
},
Pieces: pieces,
Pre: &core.PreCommitInfo{
CommR: *sinfo.CommR,
CommD: *sinfo.CommD,
Ticket: ticket,
Deals: dealIDs,
},
Proof: &core.ProofInfo{
Proof: sinfo.Proof,
},

MessageInfo: core.MessageInfo{
PreCommitCid: sinfo.PreCommitMsg,
CommitCid: sinfo.CommitMsg,
},

Finalized: true,

Upgraded: upgraded,

Imported: true,
}

if upgraded {
state.UpgradePublic = &core.SectorUpgradePublic{
CommR: commR,
SealedCID: *sinfo.CommR,
Activation: sinfo.Activation,
Expiration: sinfo.Expiration,
}

state.UpgradedInfo = &core.SectorUpgradedInfo{}

if sinfo.ReplicaUpdateMessage != nil {
msgID := core.SectorUpgradeMessageID(sinfo.ReplicaUpdateMessage.String())
state.UpgradeMessageID = &msgID
}

var landedEpoch core.SectorUpgradeLandedEpoch
state.UpgradeLandedEpoch = &landedEpoch
}

return state, nil
}
2 changes: 2 additions & 0 deletions venus-sector-manager/core/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type SealerCliAPI interface {

FindSectorsWithDeal(ctx context.Context, state SectorWorkerState, dealID abi.DealID) ([]*SectorState, error)

ImportSector(ctx context.Context, ws SectorWorkerState, state *SectorState, override bool) (bool, error)

RestoreSector(ctx context.Context, sid abi.SectorID, forced bool) (Meta, error)

CheckProvable(ctx context.Context, mid abi.ActorID, sectors []builtin.ExtendedSectorInfo, strict bool) (map[abi.SectorNumber]string, error)
Expand Down
6 changes: 6 additions & 0 deletions venus-sector-manager/core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ var UnavailableSealerCliClient = SealerCliClient{
panic("sealer client unavailable")
},

ImportSector: func(ctx context.Context, ws SectorWorkerState, state *SectorState, override bool) (bool, error) {
panic("sealer client unavailable")
},

RestoreSector: func(ctx context.Context, sid abi.SectorID, forced bool) (Meta, error) {
panic("sealer client unavailable")
},
Expand Down Expand Up @@ -104,6 +108,8 @@ type SealerCliClient struct {

FindSectorsWithDeal func(ctx context.Context, state SectorWorkerState, dealID abi.DealID) ([]*SectorState, error)

ImportSector func(ctx context.Context, ws SectorWorkerState, state *SectorState, override bool) (bool, error)

RestoreSector func(ctx context.Context, sid abi.SectorID, forced bool) (Meta, error)

ReportFinalized func(context.Context, abi.SectorID) (Meta, error)
Expand Down
1 change: 1 addition & 0 deletions venus-sector-manager/core/ifaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type SectorNumberAllocator interface {
}

type SectorStateManager interface {
Import(ctx context.Context, ws SectorWorkerState, state *SectorState, override bool) (bool, error)
Init(ctx context.Context, sid abi.SectorID, proofType abi.RegisteredSealProof, ws SectorWorkerState) error
InitWith(ctx context.Context, sid abi.SectorID, proofType abi.RegisteredSealProof, ws SectorWorkerState, fields ...interface{}) error
Load(ctx context.Context, sid abi.SectorID, ws SectorWorkerState) (*SectorState, error)
Expand Down
4 changes: 4 additions & 0 deletions venus-sector-manager/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type DealInfo struct {
PayloadSize uint64
Piece PieceInfo
Proposal *DealProposal

// this is the flag for pieces from original implmentations
// if true, workers should use the piece data directly, instead of padding themselves
IsCompatible bool
}

type Deals []DealInfo
Expand Down
4 changes: 4 additions & 0 deletions venus-sector-manager/core/types_sector_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func SectorWinningPoSt(proofType abi.RegisteredSealProof) (abi.RegisteredPoStPro
type SectorFinalized bool
type SectorUpgraded bool
type SectorRemoved bool
type SectorImported bool
type SectorUpgradeLandedEpoch abi.ChainEpoch
type SectorUpgradeMessageID string
type SectorUpgradePublic SectorPublicInfo
Expand Down Expand Up @@ -63,6 +64,9 @@ type SectorState struct {
UpgradedInfo *SectorUpgradedInfo
UpgradeMessageID *SectorUpgradeMessageID
UpgradeLandedEpoch *SectorUpgradeLandedEpoch

// Imported
Imported SectorImported
}

func (s SectorState) DealIDs() []abi.DealID {
Expand Down
Loading