Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Miner Data Transfer Restart Cmd #4387

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"context"
"time"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/piecestore"
Expand Down Expand Up @@ -102,6 +104,9 @@ type StorageMiner interface {
PiecesGetPieceInfo(ctx context.Context, pieceCid cid.Cid) (*piecestore.PieceInfo, error)
PiecesGetCIDInfo(ctx context.Context, payloadCid cid.Cid) (*piecestore.CIDInfo, error)

// MinerRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
MinerRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably be prefixed with Markets and be next to MarketDataTransferUpdates

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree


// CreateBackup creates node backup onder the specified file name. The
// method requires that the lotus-miner is running with the
// LOTUS_BACKUP_BASE_PATH environment variable set to some path, and that
Expand Down
6 changes: 6 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ type StorageMinerStruct struct {
PiecesGetPieceInfo func(ctx context.Context, pieceCid cid.Cid) (*piecestore.PieceInfo, error) `perm:"read"`
PiecesGetCIDInfo func(ctx context.Context, payloadCid cid.Cid) (*piecestore.CIDInfo, error) `perm:"read"`

MinerRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"`

CreateBackup func(ctx context.Context, fpath string) error `perm:"admin"`
}
}
Expand Down Expand Up @@ -1354,6 +1356,10 @@ func (c *StorageMinerStruct) PiecesGetCIDInfo(ctx context.Context, payloadCid ci
return c.Internal.PiecesGetCIDInfo(ctx, payloadCid)
}

func (c *StorageMinerStruct) MinerRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
return c.Internal.MinerRestartDataTransfer(ctx, transferID, otherPeer, isInitiator)
}

func (c *StorageMinerStruct) CreateBackup(ctx context.Context, fpath string) error {
return c.Internal.CreateBackup(ctx, fpath)
}
Expand Down
64 changes: 64 additions & 0 deletions cmd/lotus-storage-miner/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bufio"
"errors"
"fmt"
"io"
"os"
Expand All @@ -13,8 +14,10 @@ import (

tm "github.com/buger/goterm"
"github.com/docker/go-units"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multibase"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -569,6 +572,67 @@ var dataTransfersCmd = &cli.Command{
Usage: "Manage data transfers",
Subcommands: []*cli.Command{
transfersListCmd,
minerRestartTransfer,
},
}

var minerRestartTransfer = &cli.Command{
Name: "restart-transfer",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can just be "restart" rather than "restart-transfer" cause the miner command will already be under the "data-transfers" namespace

Usage: "Force restart a stalled data transfer",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "peerid",
Usage: "narrow to transfer with specific peer",
},
&cli.BoolFlag{
Name: "initiator",
Usage: "specify only transfers where peer is/is not initiator",
Value: true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initiator on this side should default to false 99% of the time the miner is NOT the initiator of the request (even when sending data)

},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return cli.ShowCommandHelp(cctx, cctx.Command.Name)
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)

transferUint, err := strconv.ParseUint(cctx.Args().First(), 10, 64)
if err != nil {
return fmt.Errorf("Error reading transfer ID: %w", err)
}
transferID := datatransfer.TransferID(transferUint)
initiator := cctx.Bool("initiator")
var other peer.ID
if pidstr := cctx.String("peerid"); pidstr != "" {
p, err := peer.Decode(pidstr)
if err != nil {
return err
}
other = p
} else {
channels, err := nodeApi.MarketListDataTransfers(ctx)
if err != nil {
return err
}
found := false
for _, channel := range channels {
if channel.IsInitiator == initiator && channel.TransferID == transferID {
other = channel.OtherPeer
found = true
break
}
}
if !found {
return errors.New("unable to find matching data transfer")
}
}

return nodeApi.MinerRestartDataTransfer(ctx, transferID, other, initiator)
},
}

Expand Down
9 changes: 9 additions & 0 deletions node/impl/storminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -400,6 +401,14 @@ func (sm *StorageMinerAPI) MarketListDataTransfers(ctx context.Context) ([]api.D
return apiChannels, nil
}

func (sm *StorageMinerAPI) MinerRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error {
selfPeer := sm.Host.ID()
if isInitiator {
return sm.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: selfPeer, Responder: otherPeer, ID: transferID})
}
return sm.DataTransfer.RestartDataTransferChannel(ctx, datatransfer.ChannelID{Initiator: otherPeer, Responder: selfPeer, ID: transferID})
}

func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-chan api.DataTransferChannel, error) {
channels := make(chan api.DataTransferChannel)

Expand Down