Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sector termination support #5341

Merged
merged 18 commits into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ jobs:
<<: *test
test-window-post:
<<: *test
test-terminate:
<<: *test
test-conformance:
description: |
Run tests using a corpus of interoperable test vectors for Filecoin
Expand Down Expand Up @@ -476,9 +478,15 @@ workflows:
test-suite-name: cli
packages: "./cli/... ./cmd/... ./api/..."
- test-window-post:
codecov-upload: true
go-test-flags: "-run=TestWindowedPost"
winpost-test: "1"
test-suite-name: window-post
- test-terminate:
codecov-upload: true
go-test-flags: "-run=TestTerminate"
winpost-test: "1"
test-suite-name: terminate
- test-short:
go-test-flags: "--timeout 10m --short"
test-suite-name: short
Expand Down
13 changes: 13 additions & 0 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,17 @@ type StorageMiner interface {
// SectorGetExpectedSealDuration gets the expected time for a sector to seal
SectorGetExpectedSealDuration(context.Context) (time.Duration, error)
SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error
// SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can
// be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties.
SectorRemove(context.Context, abi.SectorNumber) error
// SectorTerminate terminates the sector on-chain (adding it to a termination batch first), then
// automatically removes it from storage
SectorTerminate(context.Context, abi.SectorNumber) error
// SectorTerminateFlush immediately sends a terminate message with sectors batched for termination.
// Returns null if message wasn't sent
SectorTerminateFlush(ctx context.Context) (*cid.Cid, error)
// SectorTerminatePending returns a list of pending sector terminations to be sent in the next batch message
SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error)
SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error

StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error)
Expand Down Expand Up @@ -217,9 +227,12 @@ const (
PreCommitAddr AddrUse = iota
CommitAddr
PoStAddr

TerminateSectorsAddr
)

type AddressConfig struct {
PreCommitControl []address.Address
CommitControl []address.Address
TerminateControl []address.Address
}
15 changes: 15 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ type StorageMinerStruct struct {
SectorGetExpectedSealDuration func(context.Context) (time.Duration, error) `perm:"read"`
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"`
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorTerminate func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorTerminateFlush func(ctx context.Context) (*cid.Cid, error) `perm:"admin"`
SectorTerminatePending func(ctx context.Context) ([]abi.SectorID, error) `perm:"admin"`
SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"`

WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm
Expand Down Expand Up @@ -1310,6 +1313,18 @@ func (c *StorageMinerStruct) SectorRemove(ctx context.Context, number abi.Sector
return c.Internal.SectorRemove(ctx, number)
}

func (c *StorageMinerStruct) SectorTerminate(ctx context.Context, number abi.SectorNumber) error {
return c.Internal.SectorTerminate(ctx, number)
}

func (c *StorageMinerStruct) SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) {
return c.Internal.SectorTerminateFlush(ctx)
}

func (c *StorageMinerStruct) SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) {
return c.Internal.SectorTerminatePending(ctx)
}

func (c *StorageMinerStruct) SectorMarkForUpgrade(ctx context.Context, number abi.SectorNumber) error {
return c.Internal.SectorMarkForUpgrade(ctx, number)
}
Expand Down
174 changes: 174 additions & 0 deletions api/test/window_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/extern/sector-storage/mock"
Expand Down Expand Up @@ -211,6 +212,7 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector
}

}

func testWindowPostUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, nSectors int,
upgradeHeight abi.ChainEpoch) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -428,3 +430,175 @@ func testWindowPostUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration,
sectors = p.MinerPower.RawBytePower.Uint64() / uint64(ssz)
require.Equal(t, nSectors+GenesisPreseals-2+1, int(sectors)) // -2 not recovered sectors + 1 just pledged
}

func TestTerminate(t *testing.T, b APIBuilder, blocktime time.Duration) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

nSectors := uint64(2)

n, sn := b(t, []FullNodeOpts{FullNodeWithActorsV2At(1)}, []StorageMiner{{Full: 0, Preseal: int(nSectors)}})

client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]

addrinfo, err := client.NetAddrsListen(ctx)
if err != nil {
t.Fatal(err)
}

if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
build.Clock.Sleep(time.Second)

done := make(chan struct{})
go func() {
defer close(done)
for ctx.Err() == nil {
build.Clock.Sleep(blocktime)
if err := sn[0].MineOne(ctx, MineNext); err != nil {
if ctx.Err() != nil {
// context was canceled, ignore the error.
return
}
t.Error(err)
}
}
}()
defer func() {
cancel()
<-done
}()

maddr, err := miner.ActorAddress(ctx)
require.NoError(t, err)

ssz, err := miner.ActorSectorSize(ctx, maddr)
require.NoError(t, err)

p, err := client.StateMinerPower(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, p.MinerPower, p.TotalPower)
require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*nSectors))

fmt.Printf("Seal a sector\n")

pledgeSectors(t, ctx, miner, 1, 0, nil)

fmt.Printf("wait for power\n")

{
// Wait until proven.
di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)

waitUntil := di.PeriodStart + di.WPoStProvingPeriod + 2
fmt.Printf("End for head.Height > %d\n", waitUntil)

for {
head, err := client.ChainHead(ctx)
require.NoError(t, err)

if head.Height() > waitUntil {
fmt.Printf("Now head.Height = %d\n", head.Height())
break
}
}
}

nSectors++

p, err = client.StateMinerPower(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, p.MinerPower, p.TotalPower)
require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*nSectors))

fmt.Println("Terminate a sector")

toTerminate := abi.SectorNumber(3)

err = miner.SectorTerminate(ctx, toTerminate)
require.NoError(t, err)

msgTriggerred := false
loop:
for {
si, err := miner.SectorsStatus(ctx, toTerminate, false)
require.NoError(t, err)

fmt.Println("state: ", si.State, msgTriggerred)

switch sealing.SectorState(si.State) {
case sealing.Terminating:
if !msgTriggerred {
{
p, err := miner.SectorTerminatePending(ctx)
require.NoError(t, err)
require.Len(t, p, 1)
require.Equal(t, abi.SectorNumber(3), p[0].Number)
}

c, err := miner.SectorTerminateFlush(ctx)
require.NoError(t, err)
if c != nil {
msgTriggerred = true
fmt.Println("terminate message:", c)

{
p, err := miner.SectorTerminatePending(ctx)
require.NoError(t, err)
require.Len(t, p, 0)
}
}
}
case sealing.TerminateWait, sealing.TerminateFinality, sealing.Removed:
break loop
}

time.Sleep(100 * time.Millisecond)
}

// check power decreased
p, err = client.StateMinerPower(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
require.Equal(t, p.MinerPower, p.TotalPower)
require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*(nSectors-1)))

// check in terminated set
{
parts, err := client.StateMinerPartitions(ctx, maddr, 1, types.EmptyTSK)
require.NoError(t, err)
require.Greater(t, len(parts), 0)

bflen := func(b bitfield.BitField) uint64 {
l, err := b.Count()
require.NoError(t, err)
return l
}

require.Equal(t, uint64(1), bflen(parts[0].AllSectors))
require.Equal(t, uint64(0), bflen(parts[0].LiveSectors))
}

di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
for {
head, err := client.ChainHead(ctx)
require.NoError(t, err)

if head.Height() > di.PeriodStart+di.WPoStProvingPeriod+2 {
fmt.Printf("Now head.Height = %d\n", head.Height())
break
}
build.Clock.Sleep(blocktime)
}
require.NoError(t, err)
fmt.Printf("End for head.Height > %d\n", di.PeriodStart+di.WPoStProvingPeriod+2)

p, err = client.StateMinerPower(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)

require.Equal(t, p.MinerPower, p.TotalPower)
require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*(nSectors-1)))
}
2 changes: 1 addition & 1 deletion build/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func VersionForType(nodeType NodeType) (Version, error) {
// semver versions of the rpc api exposed
var (
FullAPIVersion = newVer(1, 0, 0)
MinerAPIVersion = newVer(1, 0, 0)
MinerAPIVersion = newVer(1, 0, 1)
WorkerAPIVersion = newVer(1, 0, 0)
)

Expand Down
5 changes: 5 additions & 0 deletions chain/actors/builtin/miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
builtin0 "github.com/filecoin-project/specs-actors/actors/builtin"
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
)

func init() {
Expand All @@ -42,6 +43,10 @@ var FaultDeclarationCutoff = miner0.FaultDeclarationCutoff

const MinSectorExpiration = miner0.MinSectorExpiration

// Not used / checked in v0
var DeclarationsMax = miner2.DeclarationsMax
var AddressedSectorsMax = miner2.AddressedSectorsMax

func Load(store adt.Store, act *types.Actor) (st State, err error) {
switch act.Code {
case builtin0.StorageMinerActorCodeID:
Expand Down
4 changes: 4 additions & 0 deletions cmd/lotus-storage-miner/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ var stateList = []stateMeta{
{col: color.FgYellow, state: sealing.CommitWait},
{col: color.FgYellow, state: sealing.FinalizeSector},

{col: color.FgCyan, state: sealing.Terminating},
{col: color.FgCyan, state: sealing.TerminateWait},
{col: color.FgCyan, state: sealing.TerminateFinality},
{col: color.FgCyan, state: sealing.TerminateFailed},
{col: color.FgCyan, state: sealing.Removing},
{col: color.FgCyan, state: sealing.Removed},

Expand Down
Loading