Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flesh out data transfer features #4572

Merged
merged 2 commits into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ type FullNode interface {
ClientDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)
// ClientRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
ClientRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
// ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer
ClientCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
// ClientRetrieveTryRestartInsufficientFunds attempts to restart stalled retrievals on a given payment channel
// which are stuck due to insufficient funds
ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error
Expand Down
6 changes: 6 additions & 0 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"context"
"time"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/piecestore"
Expand Down Expand Up @@ -81,6 +83,10 @@ type StorageMiner interface {
MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error)
MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error)
MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)
// MinerRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
// ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer
MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error

DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
DealsList(ctx context.Context) ([]MarketDeal, error)
Expand Down
15 changes: 15 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ type FullNodeStruct struct {
ClientListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
ClientDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`
ClientRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"write"`
ClientCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"write"`
ClientRetrieveTryRestartInsufficientFunds func(ctx context.Context, paymentChannel address.Address) error `perm:"write"`

StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"`
Expand Down Expand Up @@ -284,6 +285,8 @@ type StorageMinerStruct struct {
MarketGetRetrievalAsk func(ctx context.Context) (*retrievalmarket.Ask, error) `perm:"read"`
MarketListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`
MarketRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"`
MarketCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"`

PledgeSector func(context.Context) error `perm:"write"`

Expand Down Expand Up @@ -568,6 +571,10 @@ func (c *FullNodeStruct) ClientRestartDataTransfer(ctx context.Context, transfer
return c.Internal.ClientRestartDataTransfer(ctx, transferID, otherPeer, isInitiator)
}

func (c *FullNodeStruct) ClientCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
return c.Internal.ClientCancelDataTransfer(ctx, transferID, otherPeer, isInitiator)
}

func (c *FullNodeStruct) ClientRetrieveTryRestartInsufficientFunds(ctx context.Context, paymentChannel address.Address) error {
return c.Internal.ClientRetrieveTryRestartInsufficientFunds(ctx, paymentChannel)
}
Expand Down Expand Up @@ -1304,6 +1311,14 @@ func (c *StorageMinerStruct) MarketDataTransferUpdates(ctx context.Context) (<-c
return c.Internal.MarketDataTransferUpdates(ctx)
}

func (c *StorageMinerStruct) MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
return c.Internal.MarketRestartDataTransfer(ctx, transferID, otherPeer, isInitiator)
}

func (c *StorageMinerStruct) MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
return c.Internal.MarketCancelDataTransfer(ctx, transferID, otherPeer, isInitiator)
}

func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error {
return c.Internal.DealsImportData(ctx, dealPropCid, file)
}
Expand Down
76 changes: 72 additions & 4 deletions cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ var clientCmd = &cli.Command{
WithCategory("util", clientInfoCmd),
WithCategory("util", clientListTransfers),
WithCategory("util", clientRestartTransfer),
WithCategory("util", clientCancelTransfer),
},
}

Expand Down Expand Up @@ -1694,6 +1695,66 @@ var clientRestartTransfer = &cli.Command{
},
}

var clientCancelTransfer = &cli.Command{
Name: "cancel-transfer",
Usage: "Force cancel a data transfer",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "peerid",
Usage: "narrow to transfer with specific peer",
},
&cli.BoolFlag{
Name: "initiator",
Usage: "specify only transfers where peer is/is not initiator",
Value: true,
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
}
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64)
if err != nil {
return fmt.Errorf("Error reading transfer ID: %w", err)
}
transferID := datatransfer.TransferID(transferUint)
initiator := cctx.Bool("initiator")
var other peer.ID
if pidstr := cctx.String("peerid"); pidstr != "" {
p, err := peer.Decode(pidstr)
if err != nil {
return err
}
other = p
} else {
channels, err := api.ClientListDataTransfers(ctx)
if err != nil {
return err
}
found := false
for _, channel := range channels {
if channel.IsInitiator == initiator && channel.TransferID == transferID {
other = channel.OtherPeer
found = true
break
}
}
if !found {
return errors.New("unable to find matching data transfer")
}
}

return api.ClientCancelDataTransfer(ctx, transferID, other, initiator)
},
}

var clientListTransfers = &cli.Command{
Name: "list-transfers",
Usage: "List ongoing data transfers for deals",
Expand All @@ -1711,6 +1772,10 @@ var clientListTransfers = &cli.Command{
Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list",
},
&cli.BoolFlag{
Name: "show-failed",
Usage: "show failed/cancelled transfers",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
Expand All @@ -1728,7 +1793,7 @@ var clientListTransfers = &cli.Command{
completed := cctx.Bool("completed")
color := cctx.Bool("color")
watch := cctx.Bool("watch")

showFailed := cctx.Bool("show-failed")
if watch {
channelUpdates, err := api.ClientDataTransferUpdates(ctx)
if err != nil {
Expand All @@ -1740,7 +1805,7 @@ var clientListTransfers = &cli.Command{

tm.MoveCursor(1, 1)

OutputDataTransferChannels(tm.Screen, channels, completed, color)
OutputDataTransferChannels(tm.Screen, channels, completed, color, showFailed)

tm.Flush()

Expand All @@ -1765,13 +1830,13 @@ var clientListTransfers = &cli.Command{
}
}
}
OutputDataTransferChannels(os.Stdout, channels, completed, color)
OutputDataTransferChannels(os.Stdout, channels, completed, color, showFailed)
return nil
},
}

// OutputDataTransferChannels generates table output for a list of channels
func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChannel, completed bool, color bool) {
func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChannel, completed bool, color bool, showFailed bool) {
sort.Slice(channels, func(i, j int) bool {
return channels[i].TransferID < channels[j].TransferID
})
Expand All @@ -1781,6 +1846,9 @@ func OutputDataTransferChannels(out io.Writer, channels []lapi.DataTransferChann
if !completed && channel.Status == datatransfer.Completed {
continue
}
if !showFailed && (channel.Status == datatransfer.Failed || channel.Status == datatransfer.Cancelled) {
continue
}
if channel.IsSender {
sendingChannels = append(sendingChannels, channel)
} else {
Expand Down
135 changes: 132 additions & 3 deletions cmd/lotus-storage-miner/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bufio"
"errors"
"fmt"
"io"
"os"
Expand All @@ -13,8 +14,10 @@ import (

tm "github.com/buger/goterm"
"github.com/docker/go-units"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multibase"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -569,6 +572,128 @@ var dataTransfersCmd = &cli.Command{
Usage: "Manage data transfers",
Subcommands: []*cli.Command{
transfersListCmd,
marketRestartTransfer,
marketCancelTransfer,
},
}

var marketRestartTransfer = &cli.Command{
Name: "restart",
Usage: "Force restart a stalled data transfer",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "peerid",
Usage: "narrow to transfer with specific peer",
},
&cli.BoolFlag{
Name: "initiator",
Usage: "specify only transfers where peer is/is not initiator",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)

transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64)
if err != nil {
return fmt.Errorf("Error reading transfer ID: %w", err)
}
transferID := datatransfer.TransferID(transferUint)
initiator := cctx.Bool("initiator")
var other peer.ID
if pidstr := cctx.String("peerid"); pidstr != "" {
p, err := peer.Decode(pidstr)
if err != nil {
return err
}
other = p
} else {
channels, err := nodeApi.MarketListDataTransfers(ctx)
if err != nil {
return err
}
found := false
for _, channel := range channels {
if channel.IsInitiator == initiator && channel.TransferID == transferID {
other = channel.OtherPeer
found = true
break
}
}
if !found {
return errors.New("unable to find matching data transfer")
}
}

return nodeApi.MarketRestartDataTransfer(ctx, transferID, other, initiator)
},
}

var marketCancelTransfer = &cli.Command{
Name: "cancel",
Usage: "Force cancel a data transfer",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "peerid",
Usage: "narrow to transfer with specific peer",
},
&cli.BoolFlag{
Name: "initiator",
Usage: "specify only transfers where peer is/is not initiator",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)

transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64)
if err != nil {
return fmt.Errorf("Error reading transfer ID: %w", err)
}
transferID := datatransfer.TransferID(transferUint)
initiator := cctx.Bool("initiator")
var other peer.ID
if pidstr := cctx.String("peerid"); pidstr != "" {
p, err := peer.Decode(pidstr)
if err != nil {
return err
}
other = p
} else {
channels, err := nodeApi.MarketListDataTransfers(ctx)
if err != nil {
return err
}
found := false
for _, channel := range channels {
if channel.IsInitiator == initiator && channel.TransferID == transferID {
other = channel.OtherPeer
found = true
break
}
}
if !found {
return errors.New("unable to find matching data transfer")
}
}

return nodeApi.MarketCancelDataTransfer(ctx, transferID, other, initiator)
},
}

Expand All @@ -589,6 +714,10 @@ var transfersListCmd = &cli.Command{
Name: "watch",
Usage: "watch deal updates in real-time, rather than a one time list",
},
&cli.BoolFlag{
Name: "show-failed",
Usage: "show failed/cancelled transfers",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetStorageMinerAPI(cctx)
Expand All @@ -606,7 +735,7 @@ var transfersListCmd = &cli.Command{
completed := cctx.Bool("completed")
color := cctx.Bool("color")
watch := cctx.Bool("watch")

showFailed := cctx.Bool("show-failed")
if watch {
channelUpdates, err := api.MarketDataTransferUpdates(ctx)
if err != nil {
Expand All @@ -618,7 +747,7 @@ var transfersListCmd = &cli.Command{

tm.MoveCursor(1, 1)

lcli.OutputDataTransferChannels(tm.Screen, channels, completed, color)
lcli.OutputDataTransferChannels(tm.Screen, channels, completed, color, showFailed)

tm.Flush()

Expand All @@ -643,7 +772,7 @@ var transfersListCmd = &cli.Command{
}
}
}
lcli.OutputDataTransferChannels(os.Stdout, channels, completed, color)
lcli.OutputDataTransferChannels(os.Stdout, channels, completed, color, showFailed)
return nil
},
}
Loading