Skip to content

Commit

Permalink
Merge pull request #3527 from filecoin-project/feat/watch-option-mine…
Browse files Browse the repository at this point in the history
…r-storage-deals-list

Add watch option to storage-deals list
  • Loading branch information
magik6k authored Sep 4, 2020
2 parents 0a62051 + 1608cd2 commit d81feb0
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 47 deletions.
2 changes: 1 addition & 1 deletion api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type StorageMiner interface {
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error
MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error)
MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error)
MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error)
MarketGetDealUpdates(ctx context.Context) (<-chan storagemarket.MinerDeal, error)
MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error)
MarketSetAsk(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error
MarketGetAsk(ctx context.Context) (*storagemarket.SignedStorageAsk, error)
Expand Down
6 changes: 3 additions & 3 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ type StorageMinerStruct struct {
MarketImportDealData func(context.Context, cid.Cid, string) error `perm:"write"`
MarketListDeals func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
MarketListRetrievalDeals func(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) `perm:"read"`
MarketGetDealUpdates func(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) `perm:"read"`
MarketGetDealUpdates func(ctx context.Context) (<-chan storagemarket.MinerDeal, error) `perm:"read"`
MarketListIncompleteDeals func(ctx context.Context) ([]storagemarket.MinerDeal, error) `perm:"read"`
MarketSetAsk func(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error `perm:"admin"`
MarketGetAsk func(ctx context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
Expand Down Expand Up @@ -1097,8 +1097,8 @@ func (c *StorageMinerStruct) MarketListRetrievalDeals(ctx context.Context) ([]re
return c.Internal.MarketListRetrievalDeals(ctx)
}

func (c *StorageMinerStruct) MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) {
return c.Internal.MarketGetDealUpdates(ctx, d)
func (c *StorageMinerStruct) MarketGetDealUpdates(ctx context.Context) (<-chan storagemarket.MinerDeal, error) {
return c.Internal.MarketGetDealUpdates(ctx)
}

func (c *StorageMinerStruct) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) {
Expand Down
26 changes: 14 additions & 12 deletions api/test/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ loop:
func waitDealPublished(t *testing.T, ctx context.Context, miner TestStorageNode, deal *cid.Cid) {
subCtx, cancel := context.WithCancel(ctx)
defer cancel()
updates, err := miner.MarketGetDealUpdates(subCtx, *deal)
updates, err := miner.MarketGetDealUpdates(subCtx)
if err != nil {
t.Fatal(err)
}
Expand All @@ -343,18 +343,20 @@ func waitDealPublished(t *testing.T, ctx context.Context, miner TestStorageNode,
case <-ctx.Done():
t.Fatal("context timeout")
case di := <-updates:
switch di.State {
case storagemarket.StorageDealProposalRejected:
t.Fatal("deal rejected")
case storagemarket.StorageDealFailing:
t.Fatal("deal failed")
case storagemarket.StorageDealError:
t.Fatal("deal errored", di.Message)
case storagemarket.StorageDealFinalizing, storagemarket.StorageDealSealing, storagemarket.StorageDealActive:
fmt.Println("COMPLETE", di)
return
if deal.Equals(di.ProposalCid) {
switch di.State {
case storagemarket.StorageDealProposalRejected:
t.Fatal("deal rejected")
case storagemarket.StorageDealFailing:
t.Fatal("deal failed")
case storagemarket.StorageDealError:
t.Fatal("deal errored", di.Message)
case storagemarket.StorageDealFinalizing, storagemarket.StorageDealSealing, storagemarket.StorageDealActive:
fmt.Println("COMPLETE", di)
return
}
fmt.Println("Deal state: ", storagemarket.DealStates[di.State])
}
fmt.Println("Deal state: ", storagemarket.DealStates[di.State])
}
}
}
Expand Down
96 changes: 71 additions & 25 deletions cmd/lotus-storage-miner/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bufio"
"fmt"
"io"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -345,6 +346,10 @@ var dealsListCmd = &cli.Command{
Name: "verbose",
Aliases: []string{"v"},
},
&cli.BoolFlag{
Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetStorageMinerAPI(cctx)
Expand All @@ -360,42 +365,83 @@ var dealsListCmd = &cli.Command{
return err
}

sort.Slice(deals, func(i, j int) bool {
return deals[i].CreationTime.Time().Before(deals[j].CreationTime.Time())
})
verbose := cctx.Bool("verbose")
watch := cctx.Bool("watch")

w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
if watch {
updates, err := api.MarketGetDealUpdates(ctx)
if err != nil {
return err
}

verbose := cctx.Bool("verbose")
for {
tm.Clear()
tm.MoveCursor(1, 1)

if verbose {
_, _ = fmt.Fprintf(w, "Creation\tProposalCid\tDealId\tState\tClient\tSize\tPrice\tDuration\tMessage\n")
} else {
_, _ = fmt.Fprintf(w, "ProposalCid\tDealId\tState\tClient\tSize\tPrice\tDuration\n")
}
err = outputStorageDeals(tm.Output, deals, verbose)
if err != nil {
return err
}

tm.Flush()

for _, deal := range deals {
propcid := deal.ProposalCid.String()
if !verbose {
propcid = "..." + propcid[len(propcid)-8:]
select {
case <-ctx.Done():
return nil
case updated := <-updates:
var found bool
for i, existing := range deals {
if existing.ProposalCid.Equals(updated.ProposalCid) {
deals[i] = updated
found = true
break
}
}
if !found {
deals = append(deals, updated)
}
}
}
}

return outputStorageDeals(os.Stdout, deals, verbose)
},
}

fil := types.FIL(types.BigMul(deal.Proposal.StoragePricePerEpoch, types.NewInt(uint64(deal.Proposal.Duration()))))
func outputStorageDeals(out io.Writer, deals []storagemarket.MinerDeal, verbose bool) error {
sort.Slice(deals, func(i, j int) bool {
return deals[i].CreationTime.Time().Before(deals[j].CreationTime.Time())
})

if verbose {
_, _ = fmt.Fprintf(w, "%s\t", deal.CreationTime.Time().Format(time.Stamp))
}
w := tabwriter.NewWriter(out, 2, 4, 2, ' ', 0)

_, _ = fmt.Fprintf(w, "%s\t%d\t%s\t%s\t%s\t%s\t%s", propcid, deal.DealID, storagemarket.DealStates[deal.State], deal.Proposal.Client, units.BytesSize(float64(deal.Proposal.PieceSize)), fil, deal.Proposal.Duration())
if verbose {
_, _ = fmt.Fprintf(w, "\t%s", deal.Message)
}
if verbose {
_, _ = fmt.Fprintf(w, "Creation\tProposalCid\tDealId\tState\tClient\tSize\tPrice\tDuration\tMessage\n")
} else {
_, _ = fmt.Fprintf(w, "ProposalCid\tDealId\tState\tClient\tSize\tPrice\tDuration\n")
}

_, _ = fmt.Fprintln(w)
for _, deal := range deals {
propcid := deal.ProposalCid.String()
if !verbose {
propcid = "..." + propcid[len(propcid)-8:]
}

return w.Flush()
},
fil := types.FIL(types.BigMul(deal.Proposal.StoragePricePerEpoch, types.NewInt(uint64(deal.Proposal.Duration()))))

if verbose {
_, _ = fmt.Fprintf(w, "%s\t", deal.CreationTime.Time().Format(time.Stamp))
}

_, _ = fmt.Fprintf(w, "%s\t%d\t%s\t%s\t%s\t%s\t%s", propcid, deal.DealID, storagemarket.DealStates[deal.State], deal.Proposal.Client, units.BytesSize(float64(deal.Proposal.PieceSize)), fil, deal.Proposal.Duration())
if verbose {
_, _ = fmt.Fprintf(w, "\t%s", deal.Message)
}

_, _ = fmt.Fprintln(w)
}

return w.Flush()
}

var getBlocklistCmd = &cli.Command{
Expand Down
10 changes: 4 additions & 6 deletions node/impl/storminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,12 @@ func (sm *StorageMinerAPI) MarketListRetrievalDeals(ctx context.Context) ([]retr
return out, nil
}

func (sm *StorageMinerAPI) MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) {
func (sm *StorageMinerAPI) MarketGetDealUpdates(ctx context.Context) (<-chan storagemarket.MinerDeal, error) {
results := make(chan storagemarket.MinerDeal)
unsub := sm.StorageProvider.SubscribeToEvents(func(evt storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
if deal.ProposalCid.Equals(d) {
select {
case results <- deal:
case <-ctx.Done():
}
select {
case results <- deal:
case <-ctx.Done():
}
})
go func() {
Expand Down

0 comments on commit d81feb0

Please sign in to comment.