Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into feat/async-restarta…
Browse files Browse the repository at this point in the history
…ble-workers
  • Loading branch information
magik6k committed Oct 27, 2020
2 parents e1da874 + 3d02dba commit 413643a
Show file tree
Hide file tree
Showing 13 changed files with 302 additions and 22 deletions.
8 changes: 8 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ type FullNode interface {
ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error)
// ClientQueryAsk returns a signed StorageAsk from the specified miner.
ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error)
// ClientCalcCommP calculates the CommP and data size of the specified CID
ClientDealPieceCID(ctx context.Context, root cid.Cid) (DataCIDSize, error)
// ClientCalcCommP calculates the CommP for a specified file
ClientCalcCommP(ctx context.Context, inpath string) (*CommPRet, error)
// ClientGenCar generates a CAR file for the specified file.
Expand Down Expand Up @@ -891,6 +893,12 @@ type DataSize struct {
PieceSize abi.PaddedPieceSize
}

type DataCIDSize struct {
PayloadSize int64
PieceSize abi.PaddedPieceSize
PieceCID cid.Cid
}

type CommPRet struct {
Root cid.Cid
Size abi.UnpaddedPieceSize
Expand Down
6 changes: 6 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ type FullNodeStruct struct {
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"`
ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) `perm:"read"`
ClientDealPieceCID func(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) `perm:"read"`
ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"`
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`
ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"`
Expand Down Expand Up @@ -576,6 +577,11 @@ func (c *FullNodeStruct) ClientRetrieveWithEvents(ctx context.Context, order api
func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) {
return c.Internal.ClientQueryAsk(ctx, p, miner)
}

func (c *FullNodeStruct) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) {
return c.Internal.ClientDealPieceCID(ctx, root)
}

func (c *FullNodeStruct) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet, error) {
return c.Internal.ClientCalcCommP(ctx, inpath)
}
Expand Down
13 changes: 5 additions & 8 deletions api/test/mining.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,11 @@ func (ts *testSuite) testMiningReal(t *testing.T) {

newHeads, err := api.ChainNotify(ctx)
require.NoError(t, err)
initHead := (<-newHeads)[0]
if initHead.Val.Height() != 2 {
<-newHeads
}
at := (<-newHeads)[0].Val.Height()

h1, err := api.ChainHead(ctx)
require.NoError(t, err)
require.Equal(t, abi.ChainEpoch(2), h1.Height())
require.Equal(t, int64(at), int64(h1.Height()))

MineUntilBlock(ctx, t, apis[0], sn[0], nil)
require.NoError(t, err)
Expand All @@ -74,16 +71,16 @@ func (ts *testSuite) testMiningReal(t *testing.T) {

h2, err := api.ChainHead(ctx)
require.NoError(t, err)
require.Equal(t, abi.ChainEpoch(3), h2.Height())
require.Greater(t, int64(h2.Height()), int64(h1.Height()))

MineUntilBlock(ctx, t, apis[0], sn[0], nil)
require.NoError(t, err)

<-newHeads

h2, err = api.ChainHead(ctx)
h3, err := api.ChainHead(ctx)
require.NoError(t, err)
require.Equal(t, abi.ChainEpoch(4), h2.Height())
require.Greater(t, int64(h3.Height()), int64(h2.Height()))
}

func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExport bool) {
Expand Down
7 changes: 5 additions & 2 deletions cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func interactiveDeal(cctx *cli.Context) error {
var dur time.Duration
var epochs abi.ChainEpoch
var verified bool
var ds lapi.DataSize
var ds lapi.DataCIDSize

// find
var candidateAsks []*storagemarket.StorageAsk
Expand Down Expand Up @@ -557,7 +557,7 @@ uiLoop:
}

color.Blue(".. calculating data size\n")
ds, err = api.ClientDealSize(ctx, data)
ds, err = api.ClientDealPieceCID(ctx, data)
if err != nil {
return err
}
Expand Down Expand Up @@ -847,6 +847,9 @@ uiLoop:
Data: &storagemarket.DataRef{
TransferType: storagemarket.TTGraphsync,
Root: data,

PieceCid: &ds.PieceCID,
PieceSize: ds.PieceSize.Unpadded(),
},
Wallet: a,
Miner: maddr,
Expand Down
8 changes: 4 additions & 4 deletions cli/paych_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestPaymentChannels(t *testing.T) {
receiverAddr := addrs[1]

// Create mock CLI
mockCLI := clitest.NewMockCLI(t, Commands)
mockCLI := clitest.NewMockCLI(ctx, t, Commands)
creatorCLI := mockCLI.Client(paymentCreator.ListenAddr)
receiverCLI := mockCLI.Client(paymentReceiver.ListenAddr)

Expand Down Expand Up @@ -99,7 +99,7 @@ func TestPaymentChannelStatus(t *testing.T) {
receiverAddr := addrs[1]

// Create mock CLI
mockCLI := clitest.NewMockCLI(t, Commands)
mockCLI := clitest.NewMockCLI(ctx, t, Commands)
creatorCLI := mockCLI.Client(paymentCreator.ListenAddr)

// creator: paych status-by-from-to <creator> <receiver>
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestPaymentChannelVouchers(t *testing.T) {
receiverAddr := addrs[1]

// Create mock CLI
mockCLI := clitest.NewMockCLI(t, Commands)
mockCLI := clitest.NewMockCLI(ctx, t, Commands)
creatorCLI := mockCLI.Client(paymentCreator.ListenAddr)
receiverCLI := mockCLI.Client(paymentReceiver.ListenAddr)

Expand Down Expand Up @@ -310,7 +310,7 @@ func TestPaymentChannelVoucherCreateShortfall(t *testing.T) {
receiverAddr := addrs[1]

// Create mock CLI
mockCLI := clitest.NewMockCLI(t, Commands)
mockCLI := clitest.NewMockCLI(ctx, t, Commands)
creatorCLI := mockCLI.Client(paymentCreator.ListenAddr)

// creator: paych add-funds <creator> <receiver> <amount>
Expand Down
20 changes: 18 additions & 2 deletions cli/test/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"testing"
"time"

"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/api/test"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
Expand All @@ -25,7 +27,7 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode)
defer cancel()

// Create mock CLI
mockCLI := NewMockCLI(t, cmds)
mockCLI := NewMockCLI(ctx, t, cmds)
clientCLI := mockCLI.Client(clientNode.ListenAddr)

// Get the miner address
Expand Down Expand Up @@ -74,7 +76,7 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode)

// Wait for provider to start sealing deal
dealStatus := ""
for dealStatus != "StorageDealSealing" {
for {
// client list-deals
out = clientCLI.RunCmd("client", "list-deals")
fmt.Println("list-deals:\n", out)
Expand All @@ -88,6 +90,9 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode)
}
dealStatus = parts[3]
fmt.Println(" Deal status:", dealStatus)
if dealComplete(t, dealStatus) {
break
}

time.Sleep(time.Second)
}
Expand All @@ -101,3 +106,14 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode)
fmt.Println("retrieve:\n", out)
require.Regexp(t, regexp.MustCompile("Success"), out)
}

func dealComplete(t *testing.T, dealStatus string) bool {
switch dealStatus {
case "StorageDealFailing", "StorageDealError":
t.Fatal(xerrors.Errorf("Storage deal failed with status: " + dealStatus))
case "StorageDealStaged", "StorageDealSealing", "StorageDealActive", "StorageDealExpired", "StorageDealSlashed":
return true
}

return false
}
4 changes: 3 additions & 1 deletion cli/test/mockcli.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package test

import (
"bytes"
"context"
"flag"
"strings"
"testing"
Expand All @@ -18,7 +19,7 @@ type MockCLI struct {
out *bytes.Buffer
}

func NewMockCLI(t *testing.T, cmds []*lcli.Command) *MockCLI {
func NewMockCLI(ctx context.Context, t *testing.T, cmds []*lcli.Command) *MockCLI {
// Create a CLI App with an --api-url flag so that we can specify which node
// the command should be executed against
app := &lcli.App{
Expand All @@ -36,6 +37,7 @@ func NewMockCLI(t *testing.T, cmds []*lcli.Command) *MockCLI {
app.Setup()

cctx := lcli.NewContext(app, &flag.FlagSet{}, nil)
cctx.Context = ctx
return &MockCLI{t: t, cmds: cmds, cctx: cctx, out: &out}
}

Expand Down
2 changes: 1 addition & 1 deletion cli/test/multisig.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func RunMultisigTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNod
ctx := context.Background()

// Create mock CLI
mockCLI := NewMockCLI(t, cmds)
mockCLI := NewMockCLI(ctx, t, cmds)
clientCLI := mockCLI.Client(clientNode.ListenAddr)

// Create some wallets on the node to use for testing multisig
Expand Down
27 changes: 27 additions & 0 deletions documentation/en/api-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* [ClientCalcCommP](#ClientCalcCommP)
* [ClientCancelDataTransfer](#ClientCancelDataTransfer)
* [ClientDataTransferUpdates](#ClientDataTransferUpdates)
* [ClientDealPieceCID](#ClientDealPieceCID)
* [ClientDealSize](#ClientDealSize)
* [ClientFindData](#ClientFindData)
* [ClientGenCar](#ClientGenCar)
Expand Down Expand Up @@ -901,6 +902,32 @@ Response:
}
```

### ClientDealPieceCID
ClientCalcCommP calculates the CommP and data size of the specified CID


Perms: read

Inputs:
```json
[
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
]
```

Response:
```json
{
"PayloadSize": 9,
"PieceSize": 1032,
"PieceCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
}
```

### ClientDealSize
ClientDealSize calculates real deal data size

Expand Down
113 changes: 113 additions & 0 deletions lib/commp/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package commp

import (
"bytes"
"math/bits"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/zerocomm"
)

const commPBufPad = abi.PaddedPieceSize(8 << 20)
const CommPBuf = abi.UnpaddedPieceSize(commPBufPad - (commPBufPad / 128)) // can't use .Unpadded() for const

type Writer struct {
len int64
buf [CommPBuf]byte
leaves []cid.Cid
}

func (w *Writer) Write(p []byte) (int, error) {
n := len(p)
for len(p) > 0 {
buffered := int(w.len % int64(len(w.buf)))
toBuffer := len(w.buf) - buffered
if toBuffer > len(p) {
toBuffer = len(p)
}

copied := copy(w.buf[buffered:], p[:toBuffer])
p = p[copied:]
w.len += int64(copied)

if copied > 0 && w.len%int64(len(w.buf)) == 0 {
leaf, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, bytes.NewReader(w.buf[:]), CommPBuf)
if err != nil {
return 0, err
}
w.leaves = append(w.leaves, leaf)
}
}
return n, nil
}

func (w *Writer) Sum() (api.DataCIDSize, error) {
// process last non-zero leaf if exists
lastLen := w.len % int64(len(w.buf))
rawLen := w.len

// process remaining bit of data
if lastLen != 0 {
if len(w.leaves) != 0 {
copy(w.buf[lastLen:], make([]byte, int(int64(CommPBuf)-lastLen)))
lastLen = int64(CommPBuf)
}

r, sz := padreader.New(bytes.NewReader(w.buf[:lastLen]), uint64(lastLen))
p, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, r, sz)
if err != nil {
return api.DataCIDSize{}, err
}

if sz < CommPBuf { // special case for pieces smaller than 16MiB
return api.DataCIDSize{
PayloadSize: w.len,
PieceSize: sz.Padded(),
PieceCID: p,
}, nil
}

w.leaves = append(w.leaves, p)
}

// pad with zero pieces to power-of-two size
fillerLeaves := (1 << (bits.Len(uint(len(w.leaves) - 1)))) - len(w.leaves)
for i := 0; i < fillerLeaves; i++ {
w.leaves = append(w.leaves, zerocomm.ZeroPieceCommitment(CommPBuf))
}

if len(w.leaves) == 1 {
return api.DataCIDSize{
PayloadSize: rawLen,
PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad,
PieceCID: w.leaves[0],
}, nil
}

pieces := make([]abi.PieceInfo, len(w.leaves))
for i, leaf := range w.leaves {
pieces[i] = abi.PieceInfo{
Size: commPBufPad,
PieceCID: leaf,
}
}

p, err := ffi.GenerateUnsealedCID(abi.RegisteredSealProof_StackedDrg32GiBV1, pieces)
if err != nil {
return api.DataCIDSize{}, xerrors.Errorf("generating unsealed CID: %w", err)
}

return api.DataCIDSize{
PayloadSize: rawLen,
PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad,
PieceCID: p,
}, nil
}
Loading

0 comments on commit 413643a

Please sign in to comment.