diff --git a/CHANGELOG.md b/CHANGELOG.md index 503ee5105..eb8db2b39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,30 @@ # go-fil-markets changelog +# go-fil-markets v1.3.0 + +- github.com/filecoin-project/go-fil-markets: + - fix restarts during data transfer for a retrieval deal (#540) ([filecoin-project/go-fil-markets#540](https://github.com/filecoin-project/go-fil-markets/pull/540)) + - Test Retrieval for offline deals (#541) ([filecoin-project/go-fil-markets#541](https://github.com/filecoin-project/go-fil-markets/pull/541)) + - Allow anonymous submodule checkout (#535) ([filecoin-project/go-fil-markets#535](https://github.com/filecoin-project/go-fil-markets/pull/535)) +- github.com/filecoin-project/go-data-transfer (v1.4.3 -> v1.5.0): + - Add isRestart param to validators (#197) ([filecoin-project/go-data-transfer#197](https://github.com/filecoin-project/go-data-transfer/pull/197)) + - fix: flaky TestChannelMonitorAutoRestart (#198) ([filecoin-project/go-data-transfer#198](https://github.com/filecoin-project/go-data-transfer/pull/198)) + - Channel monitor watches for errors instead of measuring data rate (#190) ([filecoin-project/go-data-transfer#190](https://github.com/filecoin-project/go-data-transfer/pull/190)) + - fix: prevent concurrent restarts for same channel (#195) ([filecoin-project/go-data-transfer#195](https://github.com/filecoin-project/go-data-transfer/pull/195)) + - fix: channel state machine event handling (#194) ([filecoin-project/go-data-transfer#194](https://github.com/filecoin-project/go-data-transfer/pull/194)) + - Dont double count data sent (#185) ([filecoin-project/go-data-transfer#185](https://github.com/filecoin-project/go-data-transfer/pull/185)) +- github.com/ipfs/go-graphsync (v0.6.0 -> v0.6.1): + - feat: fire network error when network disconnects during request (#164) ([ipfs/go-graphsync#164](https://github.com/ipfs/go-graphsync/pull/164)) + +Contributors + +| Contributor | Commits | Lines ± | Files Changed | +|-------------|---------|---------|---------------| +| dirkmc | 9 | +2595/-1346 | 70 | +| Aarsh Shah | 1 | +226/-71 | 1 | +| Dirk McCormick | 2 | +32/-0 | 2 | +| Peter Rabbitson | 1 | +1/-1 | 1 | + # go-fil-markets v1.2.5 - github.com/filecoin-project/go-fil-markets: diff --git a/docs/retrievalclient.mmd b/docs/retrievalclient.mmd index f396819a9..2e0e6cfeb 100644 --- a/docs/retrievalclient.mmd +++ b/docs/retrievalclient.mmd @@ -1,49 +1,49 @@ stateDiagram-v2 - state "DealStatusNew" as 0 - state "DealStatusWaitForAcceptance" as 3 - state "DealStatusPaymentChannelCreating" as 4 - state "DealStatusPaymentChannelAddingFunds" as 5 - state "DealStatusAccepted" as 6 - state "DealStatusFailing" as 8 - state "DealStatusRejected" as 9 - state "DealStatusFundsNeeded" as 10 - state "DealStatusSendFunds" as 11 - state "DealStatusSendFundsLastPayment" as 12 - state "DealStatusOngoing" as 13 - state "DealStatusFundsNeededLastPayment" as 14 - state "DealStatusCompleted" as 15 - state "DealStatusDealNotFound" as 16 - state "DealStatusErrored" as 17 - state "DealStatusBlocksComplete" as 18 - state "DealStatusFinalizing" as 19 - state "DealStatusCheckComplete" as 21 - state "DealStatusCheckFunds" as 22 - state "DealStatusInsufficientFunds" as 23 - state "DealStatusPaymentChannelAllocatingLane" as 24 - state "DealStatusCancelling" as 25 - state "DealStatusCancelled" as 26 - state "DealStatusRetryLegacy" as 27 - state "DealStatusWaitForAcceptanceLegacy" as 28 - state "DealStatusWaitingForLastBlocks" as 29 - state "DealStatusPaymentChannelAddingInitialFunds" as 30 - 0 : On entry runs ProposeDeal - 4 : On entry runs WaitPaymentChannelReady - 5 : On entry runs WaitPaymentChannelReady - 6 : On entry runs SetupPaymentChannelStart - 8 : On entry runs CancelDeal - 10 : On entry runs ProcessPaymentRequested - 11 : On entry runs SendFunds - 12 : On entry runs SendFunds - 13 : On entry runs Ongoing - 14 : On entry runs ProcessPaymentRequested - 21 : On entry runs CheckComplete - 22 : On entry runs CheckFunds - 24 : On entry runs AllocateLane - 25 : On entry runs CancelDeal - 27 : On entry runs ProposeDeal - 30 : On entry runs WaitPaymentChannelReady - [*] --> 0 - note right of 0 + state "DealStatusNew" as DealStatusNew + state "DealStatusWaitForAcceptance" as DealStatusWaitForAcceptance + state "DealStatusPaymentChannelCreating" as DealStatusPaymentChannelCreating + state "DealStatusPaymentChannelAddingFunds" as DealStatusPaymentChannelAddingFunds + state "DealStatusAccepted" as DealStatusAccepted + state "DealStatusFailing" as DealStatusFailing + state "DealStatusRejected" as DealStatusRejected + state "DealStatusFundsNeeded" as DealStatusFundsNeeded + state "DealStatusSendFunds" as DealStatusSendFunds + state "DealStatusSendFundsLastPayment" as DealStatusSendFundsLastPayment + state "DealStatusOngoing" as DealStatusOngoing + state "DealStatusFundsNeededLastPayment" as DealStatusFundsNeededLastPayment + state "DealStatusCompleted" as DealStatusCompleted + state "DealStatusDealNotFound" as DealStatusDealNotFound + state "DealStatusErrored" as DealStatusErrored + state "DealStatusBlocksComplete" as DealStatusBlocksComplete + state "DealStatusFinalizing" as DealStatusFinalizing + state "DealStatusCheckComplete" as DealStatusCheckComplete + state "DealStatusCheckFunds" as DealStatusCheckFunds + state "DealStatusInsufficientFunds" as DealStatusInsufficientFunds + state "DealStatusPaymentChannelAllocatingLane" as DealStatusPaymentChannelAllocatingLane + state "DealStatusCancelling" as DealStatusCancelling + state "DealStatusCancelled" as DealStatusCancelled + state "DealStatusRetryLegacy" as DealStatusRetryLegacy + state "DealStatusWaitForAcceptanceLegacy" as DealStatusWaitForAcceptanceLegacy + state "DealStatusWaitingForLastBlocks" as DealStatusWaitingForLastBlocks + state "DealStatusPaymentChannelAddingInitialFunds" as DealStatusPaymentChannelAddingInitialFunds + DealStatusNew : On entry runs ProposeDeal + DealStatusPaymentChannelCreating : On entry runs WaitPaymentChannelReady + DealStatusPaymentChannelAddingFunds : On entry runs WaitPaymentChannelReady + DealStatusAccepted : On entry runs SetupPaymentChannelStart + DealStatusFailing : On entry runs CancelDeal + DealStatusFundsNeeded : On entry runs ProcessPaymentRequested + DealStatusSendFunds : On entry runs SendFunds + DealStatusSendFundsLastPayment : On entry runs SendFunds + DealStatusOngoing : On entry runs Ongoing + DealStatusFundsNeededLastPayment : On entry runs ProcessPaymentRequested + DealStatusCheckComplete : On entry runs CheckComplete + DealStatusCheckFunds : On entry runs CheckFunds + DealStatusPaymentChannelAllocatingLane : On entry runs AllocateLane + DealStatusCancelling : On entry runs CancelDeal + DealStatusRetryLegacy : On entry runs ProposeDeal + DealStatusPaymentChannelAddingInitialFunds : On entry runs WaitPaymentChannelReady + [*] --> DealStatusNew + note right of DealStatusNew The following events are not shown cause they can trigger from any state. ClientEventWriteDealProposalErrored - transitions state to DealStatusErrored @@ -53,93 +53,116 @@ stateDiagram-v2 ClientEventProviderCancelled - transitions state to DealStatusCancelling ClientEventCancel - transitions state to DealStatusCancelling end note - 0 --> 0 : ClientEventOpen - 0 --> 3 : ClientEventDealProposed - 27 --> 28 : ClientEventDealProposed - 3 --> 27 : ClientEventDealRejected - 28 --> 9 : ClientEventDealRejected - 3 --> 16 : ClientEventDealNotFound - 28 --> 16 : ClientEventDealNotFound - 3 --> 6 : ClientEventDealAccepted - 28 --> 6 : ClientEventDealAccepted - 4 --> 8 : ClientEventPaymentChannelErrored - 5 --> 8 : ClientEventPaymentChannelErrored - 6 --> 8 : ClientEventPaymentChannelErrored - 6 --> 13 : ClientEventPaymentChannelSkip - 6 --> 4 : ClientEventPaymentChannelCreateInitiated - 6 --> 30 : ClientEventPaymentChannelAddingFunds - 22 --> 5 : ClientEventPaymentChannelAddingFunds - 4 --> 24 : ClientEventPaymentChannelReady - 5 --> 13 : ClientEventPaymentChannelReady - 22 --> 13 : ClientEventPaymentChannelReady - 30 --> 24 : ClientEventPaymentChannelReady - 24 --> 8 : ClientEventAllocateLaneErrored - 24 --> 13 : ClientEventLaneAllocated - 10 --> 14 : ClientEventLastPaymentRequested - 13 --> 14 : ClientEventLastPaymentRequested - 14 --> 14 : ClientEventLastPaymentRequested - 18 --> 12 : ClientEventLastPaymentRequested - 10 --> 10 : ClientEventPaymentRequested - 13 --> 10 : ClientEventPaymentRequested - 18 --> 10 : ClientEventPaymentRequested - 3 --> 6 : ClientEventUnsealPaymentRequested - 28 --> 6 : ClientEventUnsealPaymentRequested - 13 --> 18 : ClientEventAllBlocksReceived - 14 --> 12 : ClientEventAllBlocksReceived - 18 --> 18 : ClientEventAllBlocksReceived - 29 --> 15 : ClientEventAllBlocksReceived - 10 --> 10 : ClientEventBlocksReceived - 13 --> 13 : ClientEventBlocksReceived - 14 --> 14 : ClientEventBlocksReceived - 21 --> 21 : ClientEventBlocksReceived - 29 --> 29 : ClientEventBlocksReceived - 10 --> 11 : ClientEventSendFunds - 14 --> 12 : ClientEventSendFunds - 22 --> 23 : ClientEventFundsExpended - 11 --> 8 : ClientEventBadPaymentRequested - 12 --> 8 : ClientEventBadPaymentRequested - 11 --> 8 : ClientEventCreateVoucherFailed - 12 --> 8 : ClientEventCreateVoucherFailed - 11 --> 22 : ClientEventVoucherShortfall - 12 --> 22 : ClientEventVoucherShortfall - 11 --> 13 : ClientEventPaymentSent - 12 --> 19 : ClientEventPaymentSent - 13 --> 21 : ClientEventComplete - 18 --> 21 : ClientEventComplete - 19 --> 15 : ClientEventComplete - 21 --> 15 : ClientEventCompleteVerified - 21 --> 17 : ClientEventEarlyTermination - 21 --> 29 : ClientEventWaitForLastBlocks - 8 --> 17 : ClientEventCancelComplete - 25 --> 26 : ClientEventCancelComplete - 23 --> 22 : ClientEventRecheckFunds - - note left of 3 : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived - - - note left of 4 : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived - - - note left of 6 : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived - - - note left of 8 : The following events only record in this state.

ClientEventProviderCancelled - - - note left of 10 : The following events only record in this state.

ClientEventAllBlocksReceived - - - note left of 11 : The following events only record in this state.

ClientEventAllBlocksReceived - - - note left of 24 : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived - - - note left of 25 : The following events only record in this state.

ClientEventDealProposed
ClientEventProviderCancelled - - - note left of 28 : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived - - - note left of 30 : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived + DealStatusNew --> DealStatusNew : ClientEventOpen + DealStatusNew --> DealStatusWaitForAcceptance : ClientEventDealProposed + DealStatusRetryLegacy --> DealStatusWaitForAcceptanceLegacy : ClientEventDealProposed + DealStatusWaitForAcceptance --> DealStatusRetryLegacy : ClientEventDealRejected + DealStatusWaitForAcceptanceLegacy --> DealStatusRejected : ClientEventDealRejected + DealStatusWaitForAcceptance --> DealStatusDealNotFound : ClientEventDealNotFound + DealStatusWaitForAcceptanceLegacy --> DealStatusDealNotFound : ClientEventDealNotFound + DealStatusWaitForAcceptance --> DealStatusAccepted : ClientEventDealAccepted + DealStatusWaitForAcceptanceLegacy --> DealStatusAccepted : ClientEventDealAccepted + DealStatusPaymentChannelCreating --> DealStatusFailing : ClientEventPaymentChannelErrored + DealStatusPaymentChannelAddingFunds --> DealStatusFailing : ClientEventPaymentChannelErrored + DealStatusAccepted --> DealStatusFailing : ClientEventPaymentChannelErrored + DealStatusAccepted --> DealStatusOngoing : ClientEventPaymentChannelSkip + DealStatusAccepted --> DealStatusPaymentChannelCreating : ClientEventPaymentChannelCreateInitiated + DealStatusAccepted --> DealStatusPaymentChannelAddingInitialFunds : ClientEventPaymentChannelAddingFunds + DealStatusCheckFunds --> DealStatusPaymentChannelAddingFunds : ClientEventPaymentChannelAddingFunds + DealStatusPaymentChannelCreating --> DealStatusPaymentChannelAllocatingLane : ClientEventPaymentChannelReady + DealStatusPaymentChannelAddingFunds --> DealStatusOngoing : ClientEventPaymentChannelReady + DealStatusCheckFunds --> DealStatusOngoing : ClientEventPaymentChannelReady + DealStatusPaymentChannelAddingInitialFunds --> DealStatusPaymentChannelAllocatingLane : ClientEventPaymentChannelReady + DealStatusPaymentChannelAllocatingLane --> DealStatusFailing : ClientEventAllocateLaneErrored + DealStatusPaymentChannelAllocatingLane --> DealStatusOngoing : ClientEventLaneAllocated + DealStatusFundsNeeded --> DealStatusFundsNeededLastPayment : ClientEventLastPaymentRequested + DealStatusSendFunds --> DealStatusOngoing : ClientEventLastPaymentRequested + DealStatusOngoing --> DealStatusFundsNeededLastPayment : ClientEventLastPaymentRequested + DealStatusFundsNeededLastPayment --> DealStatusFundsNeededLastPayment : ClientEventLastPaymentRequested + DealStatusBlocksComplete --> DealStatusSendFundsLastPayment : ClientEventLastPaymentRequested + DealStatusCheckComplete --> DealStatusCheckComplete : ClientEventLastPaymentRequested + DealStatusFundsNeeded --> DealStatusFundsNeeded : ClientEventPaymentRequested + DealStatusSendFunds --> DealStatusOngoing : ClientEventPaymentRequested + DealStatusOngoing --> DealStatusFundsNeeded : ClientEventPaymentRequested + DealStatusFundsNeededLastPayment --> DealStatusFundsNeeded : ClientEventPaymentRequested + DealStatusBlocksComplete --> DealStatusFundsNeeded : ClientEventPaymentRequested + DealStatusCheckComplete --> DealStatusCheckComplete : ClientEventPaymentRequested + DealStatusWaitForAcceptance --> DealStatusAccepted : ClientEventUnsealPaymentRequested + DealStatusWaitForAcceptanceLegacy --> DealStatusAccepted : ClientEventUnsealPaymentRequested + DealStatusFundsNeeded --> DealStatusFundsNeeded : ClientEventAllBlocksReceived + DealStatusSendFunds --> DealStatusOngoing : ClientEventAllBlocksReceived + DealStatusSendFundsLastPayment --> DealStatusOngoing : ClientEventAllBlocksReceived + DealStatusOngoing --> DealStatusBlocksComplete : ClientEventAllBlocksReceived + DealStatusFundsNeededLastPayment --> DealStatusSendFundsLastPayment : ClientEventAllBlocksReceived + DealStatusBlocksComplete --> DealStatusBlocksComplete : ClientEventAllBlocksReceived + DealStatusCheckComplete --> DealStatusCompleted : ClientEventAllBlocksReceived + DealStatusWaitingForLastBlocks --> DealStatusCompleted : ClientEventAllBlocksReceived + DealStatusFundsNeeded --> DealStatusFundsNeeded : ClientEventBlocksReceived + DealStatusSendFunds --> DealStatusOngoing : ClientEventBlocksReceived + DealStatusSendFundsLastPayment --> DealStatusOngoing : ClientEventBlocksReceived + DealStatusOngoing --> DealStatusOngoing : ClientEventBlocksReceived + DealStatusFundsNeededLastPayment --> DealStatusFundsNeededLastPayment : ClientEventBlocksReceived + DealStatusCheckComplete --> DealStatusCheckComplete : ClientEventBlocksReceived + DealStatusWaitingForLastBlocks --> DealStatusWaitingForLastBlocks : ClientEventBlocksReceived + DealStatusFundsNeeded --> DealStatusSendFunds : ClientEventSendFunds + DealStatusSendFunds --> DealStatusOngoing : ClientEventSendFunds + DealStatusSendFundsLastPayment --> DealStatusOngoing : ClientEventSendFunds + DealStatusFundsNeededLastPayment --> DealStatusSendFundsLastPayment : ClientEventSendFunds + DealStatusCheckFunds --> DealStatusInsufficientFunds : ClientEventFundsExpended + DealStatusSendFunds --> DealStatusFailing : ClientEventBadPaymentRequested + DealStatusSendFundsLastPayment --> DealStatusFailing : ClientEventBadPaymentRequested + DealStatusSendFunds --> DealStatusFailing : ClientEventCreateVoucherFailed + DealStatusSendFundsLastPayment --> DealStatusFailing : ClientEventCreateVoucherFailed + DealStatusSendFunds --> DealStatusCheckFunds : ClientEventVoucherShortfall + DealStatusSendFundsLastPayment --> DealStatusCheckFunds : ClientEventVoucherShortfall + DealStatusSendFunds --> DealStatusOngoing : ClientEventPaymentNotSent + DealStatusSendFundsLastPayment --> DealStatusFinalizing : ClientEventPaymentNotSent + DealStatusFundsNeeded --> DealStatusOngoing : ClientEventPaymentSent + DealStatusSendFunds --> DealStatusOngoing : ClientEventPaymentSent + DealStatusSendFundsLastPayment --> DealStatusFinalizing : ClientEventPaymentSent + DealStatusFundsNeededLastPayment --> DealStatusOngoing : ClientEventPaymentSent + DealStatusBlocksComplete --> DealStatusCheckComplete : ClientEventPaymentSent + DealStatusCheckComplete --> DealStatusCheckComplete : ClientEventPaymentSent + DealStatusFundsNeeded --> DealStatusCheckComplete : ClientEventComplete + DealStatusSendFunds --> DealStatusCheckComplete : ClientEventComplete + DealStatusSendFundsLastPayment --> DealStatusCheckComplete : ClientEventComplete + DealStatusOngoing --> DealStatusCheckComplete : ClientEventComplete + DealStatusFundsNeededLastPayment --> DealStatusCheckComplete : ClientEventComplete + DealStatusBlocksComplete --> DealStatusCheckComplete : ClientEventComplete + DealStatusFinalizing --> DealStatusCompleted : ClientEventComplete + DealStatusCheckComplete --> DealStatusCompleted : ClientEventCompleteVerified + DealStatusCheckComplete --> DealStatusErrored : ClientEventEarlyTermination + DealStatusCheckComplete --> DealStatusWaitingForLastBlocks : ClientEventWaitForLastBlocks + DealStatusFailing --> DealStatusErrored : ClientEventCancelComplete + DealStatusCancelling --> DealStatusCancelled : ClientEventCancelComplete + DealStatusInsufficientFunds --> DealStatusCheckFunds : ClientEventRecheckFunds + + note left of DealStatusWaitForAcceptance : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived + + + note left of DealStatusPaymentChannelCreating : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived + + + note left of DealStatusAccepted : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived + + + note left of DealStatusFailing : The following events only record in this state.

ClientEventProviderCancelled + + + note left of DealStatusOngoing : The following events only record in this state.

ClientEventPaymentNotSent
ClientEventPaymentSent + + + note left of DealStatusCompleted : The following events only record in this state.

ClientEventWaitForLastBlocks + + + note left of DealStatusPaymentChannelAllocatingLane : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived + + + note left of DealStatusCancelling : The following events only record in this state.

ClientEventDealProposed
ClientEventProviderCancelled + + + note left of DealStatusWaitForAcceptanceLegacy : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived + + + note left of DealStatusPaymentChannelAddingInitialFunds : The following events only record in this state.

ClientEventLastPaymentRequested
ClientEventPaymentRequested
ClientEventAllBlocksReceived
ClientEventBlocksReceived diff --git a/docs/retrievalclient.mmd.png b/docs/retrievalclient.mmd.png index c9794cb99..0707dd58d 100644 Binary files a/docs/retrievalclient.mmd.png and b/docs/retrievalclient.mmd.png differ diff --git a/docs/retrievalclient.mmd.svg b/docs/retrievalclient.mmd.svg index d41b2b71e..9990cb903 100644 --- a/docs/retrievalclient.mmd.svg +++ b/docs/retrievalclient.mmd.svg @@ -1,6 +1,6 @@ -ClientEventOpenClientEventDealProposedClientEventDealProposedClientEventDealRejectedClientEventDealRejectedClientEventDealNotFoundClientEventDealNotFoundClientEventDealAcceptedClientEventDealAcceptedClientEventPaymentChannelErroredClientEventPaymentChannelErroredClientEventPaymentChannelErroredClientEventPaymentChannelSkipClientEventPaymentChannelCreateInitiatedClientEventPaymentChannelAddingFundsClientEventPaymentChannelAddingFundsClientEventPaymentChannelReadyClientEventPaymentChannelReadyClientEventPaymentChannelReadyClientEventPaymentChannelReadyClientEventAllocateLaneErroredClientEventLaneAllocatedClientEventLastPaymentRequestedClientEventLastPaymentRequestedClientEventLastPaymentRequestedClientEventLastPaymentRequestedClientEventLastPaymentRequestedClientEventLastPaymentRequestedClientEventPaymentRequestedClientEventPaymentRequestedClientEventPaymentRequestedClientEventPaymentRequestedClientEventPaymentRequestedClientEventPaymentRequestedClientEventUnsealPaymentRequestedClientEventUnsealPaymentRequestedClientEventAllBlocksReceivedClientEventAllBlocksReceivedClientEventAllBlocksReceivedClientEventAllBlocksReceivedClientEventAllBlocksReceivedClientEventAllBlocksReceivedClientEventAllBlocksReceivedClientEventAllBlocksReceivedClientEventBlocksReceivedClientEventBlocksReceivedClientEventBlocksReceivedClientEventBlocksReceivedClientEventBlocksReceivedClientEventBlocksReceivedClientEventBlocksReceivedClientEventSendFundsClientEventSendFundsClientEventSendFundsClientEventSendFundsClientEventFundsExpendedClientEventBadPaymentRequestedClientEventBadPaymentRequestedClientEventCreateVoucherFailedClientEventCreateVoucherFailedClientEventVoucherShortfallClientEventVoucherShortfallClientEventPaymentNotSentClientEventPaymentNotSentClientEventPaymentSentClientEventPaymentSentClientEventPaymentSentClientEventPaymentSentClientEventPaymentSentClientEventPaymentSentClientEventCompleteClientEventCompleteClientEventCompleteClientEventCompleteClientEventCompleteClientEventCompleteClientEventCompleteClientEventCompleteVerifiedClientEventEarlyTerminationClientEventWaitForLastBlocksClientEventCancelCompleteClientEventCancelCompleteClientEventRecheckFundsDealStatusNewOn entry runs ProposeDealDealStatusWaitForAcceptanceDealStatusPaymentChannelCreatingOn entry runs WaitPaymentChannelReadyDealStatusPaymentChannelAddingFundsOn entry runs WaitPaymentChannelReadyDealStatusAcceptedOn entry runs SetupPaymentChannelStartDealStatusFailingOn entry runs CancelDealDealStatusRejectedDealStatusFundsNeededOn entry runs ProcessPaymentRequestedDealStatusSendFundsOn entry runs SendFundsDealStatusSendFundsLastPaymentOn entry runs SendFundsDealStatusOngoingOn entry runs OngoingDealStatusFundsNeededLastPaymentOn entry runs ProcessPaymentRequestedDealStatusCompletedDealStatusDealNotFoundDealStatusErroredDealStatusBlocksCompleteDealStatusFinalizingDealStatusCheckCompleteOn entry runs CheckCompleteDealStatusCheckFundsOn entry runs CheckFundsDealStatusInsufficientFundsDealStatusPaymentChannelAllocatingLaneOn entry runs AllocateLaneDealStatusCancellingOn entry runs CancelDealDealStatusCancelledDealStatusRetryLegacyOn entry runs ProposeDealDealStatusWaitForAcceptanceLegacyDealStatusWaitingForLastBlocksDealStatusPaymentChannelAddingInitialFundsOn entry runs WaitPaymentChannelReadyThe following events are not shown cause they can trigger from any state.ClientEventWriteDealProposalErrored - transitions state to DealStatusErroredClientEventUnknownResponseReceived - transitions state to DealStatusFailingClientEventDataTransferError - transitions state to DealStatusErroredClientEventWriteDealPaymentErrored - transitions state to DealStatusErroredClientEventProviderCancelled - transitions state to DealStatusCancellingClientEventCancel - transitions state to DealStatusCancellingThe following events only record in this state.ClientEventLastPaymentRequestedClientEventPaymentRequestedClientEventAllBlocksReceivedClientEventBlocksReceivedThe following events only record in this state.ClientEventLastPaymentRequestedClientEventPaymentRequestedClientEventAllBlocksReceivedClientEventBlocksReceivedThe following events only record in this state.ClientEventLastPaymentRequestedClientEventPaymentRequestedClientEventAllBlocksReceivedClientEventBlocksReceivedThe following events only record in this state.ClientEventProviderCancelledThe following events only record in this state.ClientEventPaymentNotSentClientEventPaymentSentThe following events only record in this state.ClientEventWaitForLastBlocksThe following events only record in this state.ClientEventLastPaymentRequestedClientEventPaymentRequestedClientEventAllBlocksReceivedClientEventBlocksReceivedThe following events only record in this state.ClientEventDealProposedClientEventProviderCancelledThe following events only record in this state.ClientEventLastPaymentRequestedClientEventPaymentRequestedClientEventAllBlocksReceivedClientEventBlocksReceivedThe following events only record in this state.ClientEventLastPaymentRequestedClientEventPaymentRequestedClientEventAllBlocksReceivedClientEventBlocksReceived \ No newline at end of file diff --git a/docs/retrievalprovider.mmd b/docs/retrievalprovider.mmd index 7c68fd4d9..6caac383a 100644 --- a/docs/retrievalprovider.mmd +++ b/docs/retrievalprovider.mmd @@ -1,60 +1,72 @@ stateDiagram-v2 - state "DealStatusNew" as 0 - state "DealStatusUnsealing" as 1 - state "DealStatusUnsealed" as 2 - state "DealStatusFundsNeededUnseal" as 7 - state "DealStatusFailing" as 8 - state "DealStatusFundsNeeded" as 10 - state "DealStatusOngoing" as 13 - state "DealStatusFundsNeededLastPayment" as 14 - state "DealStatusCompleted" as 15 - state "DealStatusErrored" as 17 - state "DealStatusBlocksComplete" as 18 - state "DealStatusFinalizing" as 19 - state "DealStatusCompleting" as 20 - state "DealStatusCancelling" as 25 - state "DealStatusCancelled" as 26 - 1 : On entry runs UnsealData - 2 : On entry runs UnpauseDeal - 7 : On entry runs TrackTransfer - 8 : On entry runs CancelDeal - 20 : On entry runs CleanupDeal - 25 : On entry runs CancelDeal - [*] --> 0 - note right of 0 + state "DealStatusNew" as DealStatusNew + state "DealStatusUnsealing" as DealStatusUnsealing + state "DealStatusUnsealed" as DealStatusUnsealed + state "DealStatusFundsNeededUnseal" as DealStatusFundsNeededUnseal + state "DealStatusFailing" as DealStatusFailing + state "DealStatusFundsNeeded" as DealStatusFundsNeeded + state "DealStatusOngoing" as DealStatusOngoing + state "DealStatusFundsNeededLastPayment" as DealStatusFundsNeededLastPayment + state "DealStatusCompleted" as DealStatusCompleted + state "DealStatusErrored" as DealStatusErrored + state "DealStatusBlocksComplete" as DealStatusBlocksComplete + state "DealStatusFinalizing" as DealStatusFinalizing + state "DealStatusCompleting" as DealStatusCompleting + state "DealStatusCancelling" as DealStatusCancelling + state "DealStatusCancelled" as DealStatusCancelled + DealStatusUnsealing : On entry runs UnsealData + DealStatusUnsealed : On entry runs UnpauseDeal + DealStatusFundsNeededUnseal : On entry runs TrackTransfer + DealStatusFailing : On entry runs CancelDeal + DealStatusCompleting : On entry runs CleanupDeal + DealStatusCancelling : On entry runs CancelDeal + [*] --> DealStatusNew + note right of DealStatusNew The following events are not shown cause they can trigger from any state. ProviderEventDataTransferError - transitions state to DealStatusErrored ProviderEventMultiStoreError - transitions state to DealStatusErrored ProviderEventClientCancelled - transitions state to DealStatusCancelling end note - 0 --> 0 : ProviderEventOpen - 0 --> 1 : ProviderEventDealAccepted - 7 --> 7 : ProviderEventDealAccepted - 1 --> 8 : ProviderEventUnsealError - 1 --> 2 : ProviderEventUnsealComplete - 2 --> 13 : ProviderEventBlockSent - 13 --> 13 : ProviderEventBlockSent - 13 --> 18 : ProviderEventBlocksCompleted - 0 --> 7 : ProviderEventPaymentRequested - 2 --> 10 : ProviderEventPaymentRequested - 13 --> 10 : ProviderEventPaymentRequested - 18 --> 14 : ProviderEventPaymentRequested - 10 --> 8 : ProviderEventSaveVoucherFailed - 14 --> 8 : ProviderEventSaveVoucherFailed - 10 --> 10 : ProviderEventPartialPaymentReceived - 14 --> 14 : ProviderEventPartialPaymentReceived - 7 --> 1 : ProviderEventPaymentReceived - 10 --> 13 : ProviderEventPaymentReceived - 14 --> 19 : ProviderEventPaymentReceived - 18 --> 20 : ProviderEventComplete - 19 --> 20 : ProviderEventComplete - 20 --> 15 : ProviderEventCleanupComplete - 8 --> 17 : ProviderEventCancelComplete - 25 --> 26 : ProviderEventCancelComplete - - note left of 8 : The following events only record in this state.

ProviderEventClientCancelled - - - note left of 25 : The following events only record in this state.

ProviderEventClientCancelled + DealStatusNew --> DealStatusNew : ProviderEventOpen + DealStatusNew --> DealStatusUnsealing : ProviderEventDealAccepted + DealStatusFundsNeededUnseal --> DealStatusFundsNeededUnseal : ProviderEventDealAccepted + DealStatusUnsealing --> DealStatusFailing : ProviderEventUnsealError + DealStatusUnsealing --> DealStatusUnsealed : ProviderEventUnsealComplete + DealStatusUnsealed --> DealStatusOngoing : ProviderEventBlockSent + DealStatusOngoing --> DealStatusOngoing : ProviderEventBlockSent + DealStatusOngoing --> DealStatusBlocksComplete : ProviderEventBlocksCompleted + DealStatusNew --> DealStatusFundsNeededUnseal : ProviderEventPaymentRequested + DealStatusUnsealed --> DealStatusFundsNeeded : ProviderEventPaymentRequested + DealStatusOngoing --> DealStatusFundsNeeded : ProviderEventPaymentRequested + DealStatusBlocksComplete --> DealStatusFundsNeededLastPayment : ProviderEventPaymentRequested + DealStatusFundsNeeded --> DealStatusFailing : ProviderEventSaveVoucherFailed + DealStatusFundsNeededLastPayment --> DealStatusFailing : ProviderEventSaveVoucherFailed + DealStatusFundsNeeded --> DealStatusFundsNeeded : ProviderEventPartialPaymentReceived + DealStatusFundsNeededLastPayment --> DealStatusFundsNeededLastPayment : ProviderEventPartialPaymentReceived + DealStatusFundsNeededUnseal --> DealStatusUnsealing : ProviderEventPaymentReceived + DealStatusFundsNeeded --> DealStatusOngoing : ProviderEventPaymentReceived + DealStatusFundsNeededLastPayment --> DealStatusFinalizing : ProviderEventPaymentReceived + DealStatusBlocksComplete --> DealStatusCompleting : ProviderEventComplete + DealStatusFinalizing --> DealStatusCompleting : ProviderEventComplete + DealStatusCompleting --> DealStatusCompleted : ProviderEventCleanupComplete + DealStatusFailing --> DealStatusErrored : ProviderEventCancelComplete + DealStatusCancelling --> DealStatusCancelled : ProviderEventCancelComplete + + note left of DealStatusFailing : The following events only record in this state.

ProviderEventClientCancelled + + + note left of DealStatusFundsNeeded : The following events only record in this state.

ProviderEventPaymentRequested + + + note left of DealStatusOngoing : The following events only record in this state.

ProviderEventPaymentReceived + + + note left of DealStatusBlocksComplete : The following events only record in this state.

ProviderEventPaymentReceived + + + note left of DealStatusFinalizing : The following events only record in this state.

ProviderEventPaymentReceived + + + note left of DealStatusCancelling : The following events only record in this state.

ProviderEventClientCancelled diff --git a/docs/retrievalprovider.mmd.png b/docs/retrievalprovider.mmd.png index d64233a2e..0d97b8e1b 100644 Binary files a/docs/retrievalprovider.mmd.png and b/docs/retrievalprovider.mmd.png differ diff --git a/docs/retrievalprovider.mmd.svg b/docs/retrievalprovider.mmd.svg index 38fe43972..da711eae6 100644 --- a/docs/retrievalprovider.mmd.svg +++ b/docs/retrievalprovider.mmd.svg @@ -1,6 +1,6 @@ -ProviderEventOpenProviderEventDealAcceptedProviderEventDealAcceptedProviderEventUnsealErrorProviderEventUnsealCompleteProviderEventBlockSentProviderEventBlockSentProviderEventBlocksCompletedProviderEventPaymentRequestedProviderEventPaymentRequestedProviderEventPaymentRequestedProviderEventPaymentRequestedProviderEventSaveVoucherFailedProviderEventSaveVoucherFailedProviderEventPartialPaymentReceivedProviderEventPartialPaymentReceivedProviderEventPaymentReceivedProviderEventPaymentReceivedProviderEventPaymentReceivedProviderEventCompleteProviderEventCompleteProviderEventCleanupCompleteProviderEventCancelCompleteProviderEventCancelCompleteDealStatusNewDealStatusUnsealingOn entry runs UnsealDataDealStatusUnsealedOn entry runs UnpauseDealDealStatusFundsNeededUnsealOn entry runs TrackTransferDealStatusFailingOn entry runs CancelDealDealStatusFundsNeededDealStatusOngoingDealStatusFundsNeededLastPaymentDealStatusCompletedDealStatusErroredDealStatusBlocksCompleteDealStatusFinalizingDealStatusCompletingOn entry runs CleanupDealDealStatusCancellingOn entry runs CancelDealDealStatusCancelledThe following events are not shown cause they can trigger from any state.ProviderEventDataTransferError - transitions state to DealStatusErroredProviderEventMultiStoreError - transitions state to DealStatusErroredProviderEventClientCancelled - transitions state to DealStatusCancellingThe following events only record in this state.ProviderEventClientCancelledThe following events only record in this state.ProviderEventPaymentRequestedThe following events only record in this state.ProviderEventPaymentReceivedThe following events only record in this state.ProviderEventPaymentReceivedThe following events only record in this state.ProviderEventPaymentReceivedThe following events only record in this state.ProviderEventClientCancelled \ No newline at end of file diff --git a/go.mod b/go.mod index d6cc17785..e6d642560 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/filecoin-project/go-address v0.0.3 github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434 - github.com/filecoin-project/go-data-transfer v1.4.3 + github.com/filecoin-project/go-data-transfer v1.5.0 github.com/filecoin-project/go-ds-versioning v0.1.0 github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 @@ -21,7 +21,7 @@ require ( github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834 github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-datastore v0.4.5 - github.com/ipfs/go-graphsync v0.6.0 + github.com/ipfs/go-graphsync v0.6.1 github.com/ipfs/go-ipfs-blockstore v1.0.3 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 diff --git a/go.sum b/go.sum index eb9385394..725190551 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5 github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/bep/debounce v1.2.0 h1:wXds8Kq8qRfwAOpAxHrJDbCXgC5aHSzgQb/0gKsHQqo= +github.com/bep/debounce v1.2.0/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= @@ -107,8 +109,8 @@ github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434/go github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-data-transfer v1.0.1/go.mod h1:UxvfUAY9v3ub0a21BSK9u3pB2aq30Y0KMsG+w9/ysyo= -github.com/filecoin-project/go-data-transfer v1.4.3 h1:ECEw69NOfmEZ7XN1NSBvj3KTbbH2mIczQs+Z2w4bD7c= -github.com/filecoin-project/go-data-transfer v1.4.3/go.mod h1:n8kbDQXWrY1c4UgfMa9KERxNCWbOTDwdNhf2MpN9dpo= +github.com/filecoin-project/go-data-transfer v1.5.0 h1:eXmcq7boRl/S3plV0/h4qdxkM6EgFIXF9y3UdOL0VXE= +github.com/filecoin-project/go-data-transfer v1.5.0/go.mod h1:E3WW4mCEYwU2y65swPEajSZoFWFmfXt7uwGduoACZQc= github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl8btmWxyFMEeeWGUxIQ= github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s= github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= @@ -283,8 +285,8 @@ github.com/ipfs/go-filestore v1.0.0/go.mod h1:/XOCuNtIe2f1YPbiXdYvD0BKLA0JR1MgPi github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0= github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY= -github.com/ipfs/go-graphsync v0.6.0 h1:x6UvDUGA7wjaKNqx5Vbo7FGT8aJ5ryYA0dMQ5jN3dF0= -github.com/ipfs/go-graphsync v0.6.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= +github.com/ipfs/go-graphsync v0.6.1 h1:i9wN7YkBXWwIsUjVQeuaDxFB59yWZrG1xL564Nz7aGE= +github.com/ipfs/go-graphsync v0.6.1/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= diff --git a/retrievalmarket/dealstatus.go b/retrievalmarket/dealstatus.go index c73e76a05..fb279443d 100644 --- a/retrievalmarket/dealstatus.go +++ b/retrievalmarket/dealstatus.go @@ -1,5 +1,7 @@ package retrievalmarket +import "fmt" + // DealStatus is the status of a retrieval deal returned by a provider // in a DealResponse type DealStatus uint64 @@ -154,3 +156,11 @@ var DealStatuses = map[DealStatus]string{ DealStatusClientWaitingForLastBlocks: "DealStatusWaitingForLastBlocks", DealStatusPaymentChannelAddingInitialFunds: "DealStatusPaymentChannelAddingInitialFunds", } + +func (s DealStatus) String() string { + str, ok := DealStatuses[s] + if ok { + return str + } + return fmt.Sprintf("DealStatusUnknown - %d", s) +} diff --git a/retrievalmarket/events.go b/retrievalmarket/events.go index a77e47b59..0410c6c9b 100644 --- a/retrievalmarket/events.go +++ b/retrievalmarket/events.go @@ -1,5 +1,7 @@ package retrievalmarket +import "fmt" + // ClientEvent is an event that occurs in a deal lifecycle on the client type ClientEvent uint64 @@ -86,7 +88,8 @@ const ( // ClientEventPaymentSent indicates a payment was sent to the provider ClientEventPaymentSent - // ClientEventComplete indicates a deal has completed + // ClientEventComplete is fired when the provider sends a message + // indicating that a deal has completed ClientEventComplete // ClientEventDataTransferError emits when something go wrong at the data transfer level @@ -123,6 +126,10 @@ const ( // ClientEventPaymentChannelSkip is fired when the total deal price is zero // so there's no need to set up a payment channel ClientEventPaymentChannelSkip + + // ClientEventPaymentNotSent indicates that payment was requested, but no + // payment was actually due, so a voucher was not sent to the provider + ClientEventPaymentNotSent ) // ClientEvents is a human readable map of client event name -> event description @@ -163,6 +170,15 @@ var ClientEvents = map[ClientEvent]string{ ClientEventCancel: "ClientEventCancel", ClientEventWaitForLastBlocks: "ClientEventWaitForLastBlocks", ClientEventPaymentChannelSkip: "ClientEventPaymentChannelSkip", + ClientEventPaymentNotSent: "ClientEventPaymentNotSent", +} + +func (e ClientEvent) String() string { + s, ok := ClientEvents[e] + if ok { + return s + } + return fmt.Sprintf("ClientEventUnknown: %d", e) } // ProviderEvent is an event that occurs in a deal lifecycle on the provider diff --git a/retrievalmarket/impl/clientstates/client_fsm.go b/retrievalmarket/impl/clientstates/client_fsm.go index ebf4923d6..ae2698d9a 100644 --- a/retrievalmarket/impl/clientstates/client_fsm.go +++ b/retrievalmarket/impl/clientstates/client_fsm.go @@ -167,6 +167,8 @@ var ClientEvents = fsm.Events{ rm.DealStatusOngoing, rm.DealStatusFundsNeededLastPayment, rm.DealStatusFundsNeeded).To(rm.DealStatusFundsNeededLastPayment). + From(rm.DealStatusSendFunds).To(rm.DealStatusOngoing). + From(rm.DealStatusCheckComplete).ToNoChange(). From(rm.DealStatusBlocksComplete).To(rm.DealStatusSendFundsLastPayment). FromMany( paymentChannelCreationStates...).ToJustRecord(). @@ -179,7 +181,10 @@ var ClientEvents = fsm.Events{ FromMany( rm.DealStatusOngoing, rm.DealStatusBlocksComplete, - rm.DealStatusFundsNeeded).To(rm.DealStatusFundsNeeded). + rm.DealStatusFundsNeeded, + rm.DealStatusFundsNeededLastPayment).To(rm.DealStatusFundsNeeded). + From(rm.DealStatusSendFunds).To(rm.DealStatusOngoing). + From(rm.DealStatusCheckComplete).ToNoChange(). FromMany( paymentChannelCreationStates...).ToJustRecord(). Action(func(deal *rm.ClientDealState, paymentOwed abi.TokenAmount) error { @@ -201,9 +206,11 @@ var ClientEvents = fsm.Events{ rm.DealStatusBlocksComplete, ).To(rm.DealStatusBlocksComplete). FromMany(paymentChannelCreationStates...).ToJustRecord(). - FromMany(rm.DealStatusSendFunds, rm.DealStatusFundsNeeded).ToJustRecord(). + FromMany(rm.DealStatusSendFunds, rm.DealStatusSendFundsLastPayment).To(rm.DealStatusOngoing). + From(rm.DealStatusFundsNeeded).ToNoChange(). From(rm.DealStatusFundsNeededLastPayment).To(rm.DealStatusSendFundsLastPayment). From(rm.DealStatusClientWaitingForLastBlocks).To(rm.DealStatusCompleted). + From(rm.DealStatusCheckComplete).To(rm.DealStatusCompleted). Action(func(deal *rm.ClientDealState) error { deal.AllBlocksReceived = true return nil @@ -214,10 +221,12 @@ var ClientEvents = fsm.Events{ rm.DealStatusFundsNeededLastPayment, rm.DealStatusCheckComplete, rm.DealStatusClientWaitingForLastBlocks).ToNoChange(). + FromMany(rm.DealStatusSendFunds, rm.DealStatusSendFundsLastPayment).To(rm.DealStatusOngoing). FromMany(paymentChannelCreationStates...).ToJustRecord(). Action(recordReceived), fsm.Event(rm.ClientEventSendFunds). + FromMany(rm.DealStatusSendFunds, rm.DealStatusSendFundsLastPayment).To(rm.DealStatusOngoing). From(rm.DealStatusFundsNeeded).To(rm.DealStatusSendFunds). From(rm.DealStatusFundsNeededLastPayment).To(rm.DealStatusSendFundsLastPayment), @@ -252,35 +261,68 @@ var ClientEvents = fsm.Events{ deal.Message = xerrors.Errorf("writing deal payment: %w", err).Error() return nil }), - fsm.Event(rm.ClientEventPaymentSent). + + // Payment was requested, but there was not actually any payment due, so + // no payment voucher was actually sent + fsm.Event(rm.ClientEventPaymentNotSent). + From(rm.DealStatusOngoing).ToJustRecord(). From(rm.DealStatusSendFunds).To(rm.DealStatusOngoing). + From(rm.DealStatusSendFundsLastPayment).To(rm.DealStatusFinalizing), + + fsm.Event(rm.ClientEventPaymentSent). + From(rm.DealStatusOngoing).ToJustRecord(). + From(rm.DealStatusBlocksComplete).To(rm.DealStatusCheckComplete). + From(rm.DealStatusCheckComplete).ToNoChange(). + FromMany( + rm.DealStatusFundsNeeded, + rm.DealStatusFundsNeededLastPayment, + rm.DealStatusSendFunds).To(rm.DealStatusOngoing). From(rm.DealStatusSendFundsLastPayment).To(rm.DealStatusFinalizing). - Action(func(deal *rm.ClientDealState) error { - // paymentRequested = 0 - // fundsSpent = fundsSpent + paymentRequested - // if paymentRequested / pricePerByte >= currentInterval - // currentInterval = currentInterval + proposal.intervalIncrease - // bytesPaidFor = bytesPaidFor + (paymentRequested / pricePerByte) - deal.FundsSpent = big.Add(deal.FundsSpent, deal.PaymentRequested) - - paymentForUnsealing := big.Min(deal.PaymentRequested, big.Sub(deal.UnsealPrice, deal.UnsealFundsPaid)) - deal.UnsealFundsPaid = big.Add(deal.UnsealFundsPaid, paymentForUnsealing) - - // If the price per bytes is zero, we ONLY need to account for the Unsealing payments here. - if !deal.PricePerByte.IsZero() { - bytesPaidFor := big.Div(big.Sub(deal.PaymentRequested, paymentForUnsealing), deal.PricePerByte).Uint64() - if bytesPaidFor >= deal.CurrentInterval { - deal.CurrentInterval += deal.DealProposal.PaymentIntervalIncrease - } - deal.BytesPaidFor += bytesPaidFor + Action(func(deal *rm.ClientDealState, voucherAmt abi.TokenAmount) error { + // Reduce the payment requested by the amount of funds sent. + // Note that it may not be reduced to zero, if a new payment + // request came in while this one was being processed. + sentAmt := big.Sub(voucherAmt, deal.FundsSpent) + deal.PaymentRequested = big.Sub(deal.PaymentRequested, sentAmt) + + // Update the total funds sent to the provider + deal.FundsSpent = voucherAmt + + // If the unseal price hasn't yet been met, set the unseal funds + // paid to the amount sent to the provider + if deal.UnsealPrice.GreaterThanEqual(deal.FundsSpent) { + deal.UnsealFundsPaid = deal.FundsSpent + return nil + } + // The unseal funds have been fully paid + deal.UnsealFundsPaid = deal.UnsealPrice + + // If the price per byte is zero, no further accounting needed + if deal.PricePerByte.IsZero() { + return nil + } + + // Calculate the amount spent on transferring data, and update the + // bytes paid for accordingly + paidSoFarForTransfer := big.Sub(deal.FundsSpent, deal.UnsealFundsPaid) + deal.BytesPaidFor = big.Div(paidSoFarForTransfer, deal.PricePerByte).Uint64() + + // If the number of bytes paid for is above the current interval, + // increase the interval + if deal.BytesPaidFor >= deal.CurrentInterval { + deal.CurrentInterval = deal.NextInterval() } - deal.PaymentRequested = abi.NewTokenAmount(0) return nil }), // completing deals fsm.Event(rm.ClientEventComplete). + FromMany( + rm.DealStatusSendFunds, + rm.DealStatusSendFundsLastPayment, + rm.DealStatusFundsNeeded, + rm.DealStatusFundsNeededLastPayment).To(rm.DealStatusCheckComplete). From(rm.DealStatusOngoing).To(rm.DealStatusCheckComplete). From(rm.DealStatusBlocksComplete).To(rm.DealStatusCheckComplete). From(rm.DealStatusFinalizing).To(rm.DealStatusCompleted), @@ -297,7 +339,8 @@ var ClientEvents = fsm.Events{ // should wait for the last blocks to arrive (only needed when price // per byte is zero) fsm.Event(rm.ClientEventWaitForLastBlocks). - From(rm.DealStatusCheckComplete).To(rm.DealStatusClientWaitingForLastBlocks), + From(rm.DealStatusCheckComplete).To(rm.DealStatusClientWaitingForLastBlocks). + From(rm.DealStatusCompleted).ToJustRecord(), // after cancelling a deal is complete fsm.Event(rm.ClientEventCancelComplete). diff --git a/retrievalmarket/impl/clientstates/client_states.go b/retrievalmarket/impl/clientstates/client_states.go index 3826f59cf..fe7918d5f 100644 --- a/retrievalmarket/impl/clientstates/client_states.go +++ b/retrievalmarket/impl/clientstates/client_states.go @@ -3,6 +3,7 @@ package clientstates import ( "context" + logging "github.com/ipfs/go-log/v2" peer "github.com/libp2p/go-libp2p-core/peer" "github.com/filecoin-project/go-address" @@ -14,6 +15,8 @@ import ( rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" ) +var log = logging.Logger("markets-rtvl") + // ClientDealEnvironment is a bridge to the environment a client deal is executing in. // It provides access to relevant functionality on the retrieval client type ClientDealEnvironment interface { @@ -89,22 +92,59 @@ func Ongoing(ctx fsm.Context, environment ClientDealEnvironment, deal rm.ClientD // ProcessPaymentRequested processes a request for payment from the provider func ProcessPaymentRequested(ctx fsm.Context, environment ClientDealEnvironment, deal rm.ClientDealState) error { - // see if we need to send payment - if deal.TotalReceived-deal.BytesPaidFor >= deal.CurrentInterval || - deal.AllBlocksReceived || - deal.UnsealPrice.GreaterThan(deal.UnsealFundsPaid) { + // If the unseal payment hasn't been made, we need to send funds + if deal.UnsealPrice.GreaterThan(deal.UnsealFundsPaid) { + log.Debugf("client: payment needed: unseal price %d > unseal paid %d", + deal.UnsealPrice, deal.UnsealFundsPaid) + return ctx.Trigger(rm.ClientEventSendFunds) + } + + // If all bytes received have been paid for, we don't need to send funds + if deal.BytesPaidFor >= deal.TotalReceived { + log.Debugf("client: no payment needed: bytes paid for %d >= bytes received %d", + deal.BytesPaidFor, deal.TotalReceived) + return nil + } + + // Not all bytes received have been paid for + + // If all blocks have been received we need to send a final payment + if deal.AllBlocksReceived { + log.Debugf("client: payment needed: all blocks received, bytes paid for %d < bytes received %d", + deal.BytesPaidFor, deal.TotalReceived) return ctx.Trigger(rm.ClientEventSendFunds) } + + // Payments are made in intervals, as bytes are received from the provider. + // If the number of bytes received is at or above the size of the current + // interval, we need to send a payment. + if deal.TotalReceived >= deal.CurrentInterval { + log.Debugf("client: payment needed: bytes received %d >= interval %d, bytes paid for %d < bytes received %d", + deal.TotalReceived, deal.CurrentInterval, deal.BytesPaidFor, deal.TotalReceived) + return ctx.Trigger(rm.ClientEventSendFunds) + } + + log.Debugf("client: no payment needed: received %d < interval %d (paid for %d)", + deal.TotalReceived, deal.CurrentInterval, deal.BytesPaidFor) return nil } // SendFunds sends the next amount requested by the provider func SendFunds(ctx fsm.Context, environment ClientDealEnvironment, deal rm.ClientDealState) error { - // check that paymentRequest <= (totalReceived - bytesPaidFor) * pricePerByte + (unsealPrice - unsealFundsPaid), or fail - retrievalPrice := big.Mul(abi.NewTokenAmount(int64(deal.TotalReceived-deal.BytesPaidFor)), deal.PricePerByte) - unsealPrice := big.Sub(deal.UnsealPrice, deal.UnsealFundsPaid) - if deal.PaymentRequested.GreaterThan(big.Add(retrievalPrice, unsealPrice)) { - return ctx.Trigger(rm.ClientEventBadPaymentRequested, "too much money requested for bytes sent") + totalBytesToPayFor := deal.TotalReceived + + // If unsealing has been paid for, and not all blocks have been received + if deal.UnsealFundsPaid.GreaterThanEqual(deal.UnsealPrice) && !deal.AllBlocksReceived { + // If the number of bytes received is less than the number required + // for the current payment interval, no need to send a payment + if totalBytesToPayFor < deal.CurrentInterval { + log.Debugf("client: ignoring payment request for %d: total bytes to pay for %d < interval %d", + deal.PaymentRequested, totalBytesToPayFor, deal.CurrentInterval) + return ctx.Trigger(rm.ClientEventPaymentNotSent) + } + + // Otherwise round the number of bytes to pay for down to the current interval + totalBytesToPayFor = deal.CurrentInterval } tok, _, err := environment.Node().GetChainHead(ctx.Context()) @@ -112,19 +152,37 @@ func SendFunds(ctx fsm.Context, environment ClientDealEnvironment, deal rm.Clien return ctx.Trigger(rm.ClientEventCreateVoucherFailed, err) } - // create payment voucher with node (or fail) for (fundsSpent + paymentRequested) - // use correct payCh + lane - // (node will do subtraction back to paymentRequested... slightly odd behavior but... well anyway) - voucher, err := environment.Node().CreatePaymentVoucher(ctx.Context(), deal.PaymentInfo.PayCh, big.Add(deal.FundsSpent, deal.PaymentRequested), deal.PaymentInfo.Lane, tok) + // Calculate the payment amount due for data received + transferPrice := big.Mul(abi.NewTokenAmount(int64(totalBytesToPayFor)), deal.PricePerByte) + // Calculate the total amount including the unsealing cost + totalPrice := big.Add(transferPrice, deal.UnsealPrice) + + // If we've already sent at or above the amount due, no need to send funds + if totalPrice.LessThanEqual(deal.FundsSpent) { + log.Debugf("client: not sending voucher: funds spent %d >= total price %d: transfer price %d + unseal price %d (payment requested %d)", + deal.FundsSpent, totalPrice, transferPrice, deal.UnsealPrice, deal.PaymentRequested) + return ctx.Trigger(rm.ClientEventPaymentNotSent) + } + + log.Debugf("client: sending voucher for %d = transfer price %d + unseal price %d (payment requested %d)", + totalPrice, transferPrice, deal.UnsealPrice, deal.PaymentRequested) + + // Create a payment voucher + voucher, err := environment.Node().CreatePaymentVoucher(ctx.Context(), deal.PaymentInfo.PayCh, totalPrice, deal.PaymentInfo.Lane, tok) if err != nil { shortfallErr, ok := err.(rm.ShortfallError) if ok { + // There were not enough funds in the payment channel to create a + // voucher of this amount, so the client needs to add more funds to + // the payment channel + log.Debugf("client: voucher shortfall of %d when creating voucher for %d", + shortfallErr.Shortfall(), totalPrice) return ctx.Trigger(rm.ClientEventVoucherShortfall, shortfallErr.Shortfall()) } return ctx.Trigger(rm.ClientEventCreateVoucherFailed, err) } - // send payment voucher (or fail) + // Send the payment voucher err = environment.SendDataTransferVoucher(ctx.Context(), *deal.ChannelID, &rm.DealPayment{ ID: deal.DealProposal.ID, PaymentChannel: deal.PaymentInfo.PayCh, @@ -134,7 +192,7 @@ func SendFunds(ctx fsm.Context, environment ClientDealEnvironment, deal rm.Clien return ctx.Trigger(rm.ClientEventWriteDealPaymentErrored, err) } - return ctx.Trigger(rm.ClientEventPaymentSent) + return ctx.Trigger(rm.ClientEventPaymentSent, totalPrice) } // CheckFunds examines current available funds in a payment channel after a voucher shortfall to determine diff --git a/retrievalmarket/impl/clientstates/client_states_test.go b/retrievalmarket/impl/clientstates/client_states_test.go index e2a762183..666be775b 100644 --- a/retrievalmarket/impl/clientstates/client_states_test.go +++ b/retrievalmarket/impl/clientstates/client_states_test.go @@ -298,23 +298,55 @@ func TestProcessPaymentRequested(t *testing.T) { fsmCtx.ReplayEvents(t, dealState) } - t.Run("it works - to send funds", func(t *testing.T) { + t.Run("send funds last payment", func(t *testing.T) { + dealState := makeDealState(retrievalmarket.DealStatusFundsNeededLastPayment) + dealState.TotalReceived = defaultBytesPaidFor + 500 + dealState.AllBlocksReceived = true + runProcessPaymentRequested(t, dealState) + require.Equal(t, dealState.Status, retrievalmarket.DealStatusSendFundsLastPayment) + }) + + t.Run("send funds if unseal funds needed", func(t *testing.T) { dealState := makeDealState(retrievalmarket.DealStatusFundsNeeded) + dealState.UnsealPrice = abi.NewTokenAmount(1000) + dealState.UnsealFundsPaid = abi.NewTokenAmount(0) runProcessPaymentRequested(t, dealState) require.Equal(t, dealState.Status, retrievalmarket.DealStatusSendFunds) }) - t.Run("it works - to send funds", func(t *testing.T) { - dealState := makeDealState(retrievalmarket.DealStatusFundsNeededLastPayment) - dealState.TotalReceived = defaultBytesPaidFor + 500 + t.Run("dont send funds if paid for all bytes received", func(t *testing.T) { + dealState := makeDealState(retrievalmarket.DealStatusFundsNeeded) + dealState.BytesPaidFor = 1000 + dealState.TotalReceived = 1000 + dealState.CurrentInterval = 1000 + runProcessPaymentRequested(t, dealState) + require.Equal(t, dealState.Status, retrievalmarket.DealStatusFundsNeeded) + }) + + t.Run("send funds if not all bytes paid for and all blocks received", func(t *testing.T) { + dealState := makeDealState(retrievalmarket.DealStatusFundsNeeded) + dealState.BytesPaidFor = 900 + dealState.TotalReceived = 1000 dealState.AllBlocksReceived = true + dealState.CurrentInterval = 1000 runProcessPaymentRequested(t, dealState) - require.Equal(t, dealState.Status, retrievalmarket.DealStatusSendFundsLastPayment) + require.Equal(t, dealState.Status, retrievalmarket.DealStatusSendFunds) }) - t.Run("no change", func(t *testing.T) { + t.Run("send funds if total received > current deal interval", func(t *testing.T) { dealState := makeDealState(retrievalmarket.DealStatusFundsNeeded) - dealState.BytesPaidFor = defaultBytesPaidFor + 500 + dealState.BytesPaidFor = 900 + dealState.TotalReceived = 1000 + dealState.CurrentInterval = 900 + runProcessPaymentRequested(t, dealState) + require.Equal(t, dealState.Status, retrievalmarket.DealStatusSendFunds) + }) + + t.Run("dont send funds if total received < current deal interval", func(t *testing.T) { + dealState := makeDealState(retrievalmarket.DealStatusFundsNeeded) + dealState.BytesPaidFor = 900 + dealState.TotalReceived = 999 + dealState.CurrentInterval = 1000 runProcessPaymentRequested(t, dealState) require.Equal(t, dealState.Status, retrievalmarket.DealStatusFundsNeeded) }) @@ -343,112 +375,203 @@ func TestSendFunds(t *testing.T) { testVoucher := &paych.SignedVoucher{} - t.Run("it works", func(t *testing.T) { + t.Run("send funds", func(t *testing.T) { dealState := makeDealState(retrievalmarket.DealStatusSendFunds) var sendVoucherError error = nil nodeParams := testnodes.TestRetrievalClientNodeParams{ Voucher: testVoucher, } + dealState.PricePerByte = abi.NewTokenAmount(1) + dealState.UnsealPrice = abi.NewTokenAmount(200) + dealState.UnsealFundsPaid = abi.NewTokenAmount(200) + dealState.BytesPaidFor = 800 + dealState.FundsSpent = abi.NewTokenAmount(1000) + dealState.PaymentRequested = abi.NewTokenAmount(500) + dealState.CurrentInterval = 1000 + dealState.PaymentInterval = 1000 + dealState.PaymentIntervalIncrease = 100 + dealState.TotalReceived = 1000 + + // Should send voucher for 1200 = transfer price (1000 * 1) + unseal price 200 runSendFunds(t, sendVoucherError, nodeParams, dealState) require.Empty(t, dealState.Message) - require.Equal(t, dealState.PaymentRequested, abi.NewTokenAmount(0)) - require.Equal(t, dealState.FundsSpent, big.Add(defaultFundsSpent, defaultPaymentRequested)) - require.Equal(t, dealState.BytesPaidFor, defaultTotalReceived) - require.Equal(t, dealState.CurrentInterval, defaultCurrentInterval+defaultIntervalIncrease) + require.Equal(t, dealState.PaymentRequested, abi.NewTokenAmount(500-(1000-800))) + require.Equal(t, dealState.FundsSpent, abi.NewTokenAmount(1000+200)) + require.EqualValues(t, dealState.BytesPaidFor, 1000) + require.EqualValues(t, dealState.CurrentInterval, 1000+(1000+100)) require.Equal(t, dealState.Status, retrievalmarket.DealStatusOngoing) }) - t.Run("last payment", func(t *testing.T) { + t.Run("send funds last payment", func(t *testing.T) { dealState := makeDealState(retrievalmarket.DealStatusSendFundsLastPayment) var sendVoucherError error = nil nodeParams := testnodes.TestRetrievalClientNodeParams{ Voucher: testVoucher, } + dealState.PricePerByte = abi.NewTokenAmount(1) + dealState.UnsealPrice = abi.NewTokenAmount(200) + dealState.UnsealFundsPaid = abi.NewTokenAmount(200) + dealState.BytesPaidFor = 800 + dealState.FundsSpent = abi.NewTokenAmount(1000) + dealState.PaymentRequested = abi.NewTokenAmount(500) + dealState.CurrentInterval = 1000 + dealState.PaymentInterval = 1000 + dealState.PaymentIntervalIncrease = 100 + dealState.TotalReceived = 1000 + + // Should send voucher for 1200 = transfer price (1000 * 1) + unseal price 200 runSendFunds(t, sendVoucherError, nodeParams, dealState) require.Empty(t, dealState.Message) - require.Equal(t, dealState.PaymentRequested, abi.NewTokenAmount(0)) - require.Equal(t, dealState.FundsSpent, big.Add(defaultFundsSpent, defaultPaymentRequested)) - require.Equal(t, dealState.BytesPaidFor, defaultTotalReceived) - require.Equal(t, dealState.CurrentInterval, defaultCurrentInterval+defaultIntervalIncrease) + require.Equal(t, dealState.PaymentRequested, abi.NewTokenAmount(500-(1000-800))) + require.Equal(t, dealState.FundsSpent, abi.NewTokenAmount(1000+200)) + require.EqualValues(t, dealState.BytesPaidFor, 1000) + require.EqualValues(t, dealState.CurrentInterval, 1000+(1000+100)) require.Equal(t, dealState.Status, retrievalmarket.DealStatusFinalizing) }) - t.Run("only unsealing payment is accounted for when price per bytes is zero", func(t *testing.T) { - dealState := makeDealState(retrievalmarket.DealStatusSendFundsLastPayment) - unsealPrice := abi.NewTokenAmount(100) - - dealState.UnsealPrice = unsealPrice - dealState.PaymentRequested = unsealPrice - dealState.PricePerByte = abi.NewTokenAmount(0) - + t.Run("dont send funds if total received less then interval", func(t *testing.T) { + dealState := makeDealState(retrievalmarket.DealStatusSendFunds) var sendVoucherError error = nil nodeParams := testnodes.TestRetrievalClientNodeParams{ Voucher: testVoucher, } + dealState.PricePerByte = abi.NewTokenAmount(1) + dealState.UnsealPrice = abi.NewTokenAmount(200) + dealState.UnsealFundsPaid = abi.NewTokenAmount(200) + dealState.BytesPaidFor = 800 + dealState.FundsSpent = abi.NewTokenAmount(1000) + dealState.PaymentRequested = abi.NewTokenAmount(500) + dealState.CurrentInterval = 2000 + dealState.TotalReceived = 1000 + + // Should not send voucher runSendFunds(t, sendVoucherError, nodeParams, dealState) require.Empty(t, dealState.Message) - require.Equal(t, dealState.PaymentRequested, abi.NewTokenAmount(0)) - require.Equal(t, dealState.FundsSpent, big.Add(defaultFundsSpent, unsealPrice)) - require.Equal(t, dealState.BytesPaidFor, defaultBytesPaidFor) - require.Equal(t, dealState.CurrentInterval, defaultCurrentInterval) - require.Equal(t, dealState.Status, retrievalmarket.DealStatusFinalizing) + require.Equal(t, dealState.PaymentRequested, abi.NewTokenAmount(500)) + require.Equal(t, dealState.FundsSpent, abi.NewTokenAmount(1000)) + require.EqualValues(t, dealState.BytesPaidFor, 800) + require.EqualValues(t, dealState.CurrentInterval, 2000) + require.Equal(t, dealState.Status, retrievalmarket.DealStatusOngoing) }) - t.Run("more bytes since last payment than interval works, can charge more", func(t *testing.T) { + t.Run("dont send funds if total price <= funds spent", func(t *testing.T) { dealState := makeDealState(retrievalmarket.DealStatusSendFunds) - dealState.BytesPaidFor = defaultBytesPaidFor - 500 - largerPaymentRequested := abi.NewTokenAmount(750000) - dealState.PaymentRequested = largerPaymentRequested var sendVoucherError error = nil nodeParams := testnodes.TestRetrievalClientNodeParams{ Voucher: testVoucher, } + dealState.PricePerByte = abi.NewTokenAmount(1) + dealState.UnsealPrice = abi.NewTokenAmount(200) + dealState.UnsealFundsPaid = abi.NewTokenAmount(200) + dealState.BytesPaidFor = 800 + dealState.FundsSpent = abi.NewTokenAmount(1200) + dealState.PaymentRequested = abi.NewTokenAmount(500) + dealState.CurrentInterval = 1000 + dealState.PaymentInterval = 1000 + dealState.PaymentIntervalIncrease = 100 + dealState.TotalReceived = 1000 + + // Total price 1200 = transfer price (1000 * 1) + unseal price 200 + // Funds spent = 1200 + // So don't send voucher runSendFunds(t, sendVoucherError, nodeParams, dealState) require.Empty(t, dealState.Message) - require.Equal(t, dealState.PaymentRequested, abi.NewTokenAmount(0)) - require.Equal(t, dealState.FundsSpent, big.Add(defaultFundsSpent, largerPaymentRequested)) - require.Equal(t, dealState.BytesPaidFor, defaultTotalReceived) - require.Equal(t, dealState.CurrentInterval, defaultCurrentInterval+defaultIntervalIncrease) + require.Equal(t, dealState.PaymentRequested, abi.NewTokenAmount(500)) + require.Equal(t, dealState.FundsSpent, abi.NewTokenAmount(1200)) + require.EqualValues(t, dealState.BytesPaidFor, 800) + require.EqualValues(t, dealState.CurrentInterval, 1000) require.Equal(t, dealState.Status, retrievalmarket.DealStatusOngoing) }) - t.Run("too much payment requested", func(t *testing.T) { + t.Run("dont send funds if interval not met", func(t *testing.T) { dealState := makeDealState(retrievalmarket.DealStatusSendFunds) - dealState.PaymentRequested = abi.NewTokenAmount(750000) var sendVoucherError error = nil nodeParams := testnodes.TestRetrievalClientNodeParams{ Voucher: testVoucher, } + dealState.PricePerByte = abi.NewTokenAmount(1) + dealState.UnsealPrice = abi.NewTokenAmount(0) + dealState.UnsealFundsPaid = abi.NewTokenAmount(0) + dealState.FundsSpent = abi.NewTokenAmount(1000) + dealState.PaymentRequested = abi.NewTokenAmount(200) + dealState.BytesPaidFor = 1000 + dealState.TotalReceived = 1200 + dealState.CurrentInterval = 2000 + + // Should not send voucher: bytes received 1200 < interval 2000 runSendFunds(t, sendVoucherError, nodeParams, dealState) - require.NotEmpty(t, dealState.Message) - require.Equal(t, dealState.Status, retrievalmarket.DealStatusFailing) + require.Empty(t, dealState.Message) + require.Equal(t, dealState.PaymentRequested, abi.NewTokenAmount(200)) + require.Equal(t, dealState.FundsSpent, abi.NewTokenAmount(1000)) + require.EqualValues(t, dealState.BytesPaidFor, 1000) + require.EqualValues(t, dealState.CurrentInterval, 2000) + require.Equal(t, dealState.Status, retrievalmarket.DealStatusOngoing) }) - t.Run("too little payment requested works but records correctly", func(t *testing.T) { + t.Run("send funds if all blocks received, even if interval not met", func(t *testing.T) { dealState := makeDealState(retrievalmarket.DealStatusSendFunds) - smallerPaymentRequested := abi.NewTokenAmount(250000) - dealState.PaymentRequested = smallerPaymentRequested var sendVoucherError error = nil nodeParams := testnodes.TestRetrievalClientNodeParams{ Voucher: testVoucher, } + dealState.PricePerByte = abi.NewTokenAmount(1) + dealState.UnsealPrice = abi.NewTokenAmount(0) + dealState.UnsealFundsPaid = abi.NewTokenAmount(0) + dealState.FundsSpent = abi.NewTokenAmount(1000) + dealState.PaymentRequested = abi.NewTokenAmount(200) + dealState.BytesPaidFor = 1000 + dealState.TotalReceived = 1200 + dealState.CurrentInterval = 2000 + dealState.AllBlocksReceived = true + + // Should send voucher for 1200 = transfer price (1200 * 1) runSendFunds(t, sendVoucherError, nodeParams, dealState) require.Empty(t, dealState.Message) - require.Equal(t, dealState.PaymentRequested, abi.NewTokenAmount(0)) - require.Equal(t, dealState.FundsSpent, big.Add(defaultFundsSpent, smallerPaymentRequested)) - // only records change for those bytes paid for - require.Equal(t, dealState.BytesPaidFor, defaultBytesPaidFor+500) - // no interval increase - require.Equal(t, dealState.CurrentInterval, defaultCurrentInterval) + require.True(t, dealState.PaymentRequested.IsZero()) + require.Equal(t, dealState.FundsSpent, abi.NewTokenAmount(1200)) + require.EqualValues(t, dealState.BytesPaidFor, 1200) require.Equal(t, dealState.Status, retrievalmarket.DealStatusOngoing) }) + t.Run("only unsealing payment is accounted for when price per bytes is zero", func(t *testing.T) { + dealState := makeDealState(retrievalmarket.DealStatusSendFundsLastPayment) + + dealState.PricePerByte = abi.NewTokenAmount(0) + dealState.UnsealPrice = abi.NewTokenAmount(200) + dealState.UnsealFundsPaid = abi.NewTokenAmount(0) + dealState.BytesPaidFor = 0 + dealState.FundsSpent = abi.NewTokenAmount(0) + dealState.PaymentRequested = abi.NewTokenAmount(200) + dealState.CurrentInterval = 1000 + dealState.TotalReceived = 1000 + + var sendVoucherError error = nil + nodeParams := testnodes.TestRetrievalClientNodeParams{ + Voucher: testVoucher, + } + runSendFunds(t, sendVoucherError, nodeParams, dealState) + require.Empty(t, dealState.Message) + require.True(t, dealState.PaymentRequested.IsZero()) + require.Equal(t, dealState.FundsSpent, abi.NewTokenAmount(200)) + require.EqualValues(t, dealState.BytesPaidFor, 0) + require.EqualValues(t, dealState.CurrentInterval, 1000) + require.Equal(t, dealState.Status, retrievalmarket.DealStatusFinalizing) + }) + t.Run("voucher create fails", func(t *testing.T) { dealState := makeDealState(retrievalmarket.DealStatusSendFunds) var sendVoucherError error = nil nodeParams := testnodes.TestRetrievalClientNodeParams{ VoucherError: errors.New("Something Went Wrong"), } + dealState.PricePerByte = abi.NewTokenAmount(1) + dealState.UnsealPrice = abi.NewTokenAmount(0) + dealState.UnsealFundsPaid = abi.NewTokenAmount(0) + dealState.BytesPaidFor = 0 + dealState.FundsSpent = abi.NewTokenAmount(0) + dealState.PaymentRequested = abi.NewTokenAmount(1000) + dealState.CurrentInterval = 1000 + dealState.TotalReceived = 1000 runSendFunds(t, sendVoucherError, nodeParams, dealState) require.NotEmpty(t, dealState.Message) require.Equal(t, dealState.Status, retrievalmarket.DealStatusFailing) @@ -461,6 +584,14 @@ func TestSendFunds(t *testing.T) { nodeParams := testnodes.TestRetrievalClientNodeParams{ VoucherError: retrievalmarket.NewShortfallError(shortFall), } + dealState.PricePerByte = abi.NewTokenAmount(1) + dealState.UnsealPrice = abi.NewTokenAmount(0) + dealState.UnsealFundsPaid = abi.NewTokenAmount(0) + dealState.BytesPaidFor = 0 + dealState.FundsSpent = abi.NewTokenAmount(0) + dealState.PaymentRequested = abi.NewTokenAmount(1000) + dealState.CurrentInterval = 1000 + dealState.TotalReceived = 1000 runSendFunds(t, sendVoucherError, nodeParams, dealState) require.Empty(t, dealState.Message) require.Equal(t, dealState.Status, retrievalmarket.DealStatusCheckFunds) @@ -472,6 +603,14 @@ func TestSendFunds(t *testing.T) { nodeParams := testnodes.TestRetrievalClientNodeParams{ Voucher: testVoucher, } + dealState.PricePerByte = abi.NewTokenAmount(1) + dealState.UnsealPrice = abi.NewTokenAmount(0) + dealState.UnsealFundsPaid = abi.NewTokenAmount(0) + dealState.BytesPaidFor = 0 + dealState.FundsSpent = abi.NewTokenAmount(0) + dealState.PaymentRequested = abi.NewTokenAmount(1000) + dealState.CurrentInterval = 1000 + dealState.TotalReceived = 1000 runSendFunds(t, sendVoucherError, nodeParams, dealState) require.NotEmpty(t, dealState.Message) require.Equal(t, dealState.Status, retrievalmarket.DealStatusErrored) diff --git a/retrievalmarket/impl/dtutils/dtutils.go b/retrievalmarket/impl/dtutils/dtutils.go index da7a021e1..ce98ef6fa 100644 --- a/retrievalmarket/impl/dtutils/dtutils.go +++ b/retrievalmarket/impl/dtutils/dtutils.go @@ -89,10 +89,10 @@ func clientEventForResponse(response *rm.DealResponse) (rm.ClientEvent, []interf return rm.ClientEventLastPaymentRequested, []interface{}{response.PaymentOwed} case rm.DealStatusCompleted: return rm.ClientEventComplete, nil - case rm.DealStatusFundsNeeded: + case rm.DealStatusFundsNeeded, rm.DealStatusOngoing: return rm.ClientEventPaymentRequested, []interface{}{response.PaymentOwed} default: - return rm.ClientEventUnknownResponseReceived, nil + return rm.ClientEventUnknownResponseReceived, []interface{}{response.Status} } } @@ -100,7 +100,7 @@ const noEvent = rm.ClientEvent(math.MaxUint64) func clientEvent(event datatransfer.Event, channelState datatransfer.ChannelState) (rm.ClientEvent, []interface{}) { switch event.Code { - case datatransfer.DataReceived: + case datatransfer.DataReceivedProgress: return rm.ClientEventBlocksReceived, []interface{}{channelState.Received()} case datatransfer.FinishTransfer: return rm.ClientEventAllBlocksReceived, nil diff --git a/retrievalmarket/impl/dtutils/dtutils_test.go b/retrievalmarket/impl/dtutils/dtutils_test.go index 5fd5f4a9e..52b3f47e8 100644 --- a/retrievalmarket/impl/dtutils/dtutils_test.go +++ b/retrievalmarket/impl/dtutils/dtutils_test.go @@ -174,7 +174,7 @@ func TestClientDataTransferSubscriber(t *testing.T) { ignored: true, }, "progress": { - code: datatransfer.DataReceived, + code: datatransfer.DataReceivedProgress, state: shared_testutil.TestChannelParams{ Vouchers: []datatransfer.Voucher{&dealProposal}, Status: datatransfer.Ongoing, @@ -302,6 +302,7 @@ func TestClientDataTransferSubscriber(t *testing.T) { Status: datatransfer.Ongoing}, expectedID: dealProposal.ID, expectedEvent: rm.ClientEventUnknownResponseReceived, + expectedArgs: []interface{}{retrievalmarket.DealStatusPaymentChannelAddingFunds}, }, "error": { code: datatransfer.Error, diff --git a/retrievalmarket/impl/integration_test.go b/retrievalmarket/impl/integration_test.go index 2aa2e7702..d3c237081 100644 --- a/retrievalmarket/impl/integration_test.go +++ b/retrievalmarket/impl/integration_test.go @@ -221,7 +221,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { filename: "lorem_under_1_block.txt", filesize: 410, unsealPrice: abi.NewTokenAmount(100), - voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(100), abi.NewTokenAmount(410000)}, + voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(100), abi.NewTokenAmount(410100)}, selector: shared.AllSelector(), paramsV1: true, }, @@ -272,7 +272,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { {name: "multi-block file retrieval succeeds", filename: "lorem.txt", filesize: 19000, - voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10136000), abi.NewTokenAmount(9784000)}, + voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10000000), abi.NewTokenAmount(19920000)}, }, {name: "multi-block file retrieval with zero price per byte succeeds", filename: "lorem.txt", @@ -282,7 +282,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { {name: "multi-block file retrieval succeeds with V1 params and AllSelector", filename: "lorem.txt", filesize: 19000, - voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10136000), abi.NewTokenAmount(9784000)}, + voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10000000), abi.NewTokenAmount(19920000)}, paramsV1: true, selector: shared.AllSelector()}, {name: "partial file retrieval succeeds with V1 params and selector recursion depth 1", @@ -303,7 +303,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { {name: "succeeds for regular blockstore", filename: "lorem.txt", filesize: 19000, - voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10136000), abi.NewTokenAmount(9784000)}, + voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10000000), abi.NewTokenAmount(19920000)}, skipStores: true, }, { @@ -313,18 +313,27 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { voucherAmts: []abi.TokenAmount{}, failsUnseal: true, }, - {name: "multi-block file retrieval succeeds, final block lands on payment interval", + {name: "multi-block file retrieval succeeds, final block exceeds payment interval", filename: "lorem.txt", filesize: 19000, - voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(9112000), abi.NewTokenAmount(10808000)}, + voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(9000000), abi.NewTokenAmount(19250000), abi.NewTokenAmount(19920000)}, paymentInterval: 9000, paymentIntervalIncrease: 1250, }, + {name: "multi-block file retrieval succeeds, final block lands on payment interval", + filename: "lorem.txt", + filesize: 19000, + voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(9000000), abi.NewTokenAmount(19920000)}, + // Total bytes: 19,920 + // intervals: 9,000 | 9,000 + (9,000 + 1920) + paymentInterval: 9000, + paymentIntervalIncrease: 1920, + }, {name: "multi-block file retrieval succeeds, with provider only accepting legacy deals", filename: "lorem.txt", filesize: 19000, disableNewDeals: true, - voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10136000), abi.NewTokenAmount(9784000)}, + voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10000000), abi.NewTokenAmount(19920000)}, }, } diff --git a/retrievalmarket/impl/provider_environments.go b/retrievalmarket/impl/provider_environments.go index f714606db..ec1067329 100644 --- a/retrievalmarket/impl/provider_environments.go +++ b/retrievalmarket/impl/provider_environments.go @@ -312,7 +312,8 @@ type providerStoreGetter struct { func (psg *providerStoreGetter) Get(otherPeer peer.ID, dealID retrievalmarket.DealID) (*multistore.Store, error) { var deal retrievalmarket.ProviderDealState - err := psg.p.stateMachines.GetSync(context.TODO(), retrievalmarket.ProviderDealIdentifier{Receiver: otherPeer, DealID: dealID}, &deal) + provDealID := retrievalmarket.ProviderDealIdentifier{Receiver: otherPeer, DealID: dealID} + err := psg.p.stateMachines.Get(provDealID).Get(&deal) if err != nil { return nil, err } diff --git a/retrievalmarket/impl/providerstates/provider_fsm.go b/retrievalmarket/impl/providerstates/provider_fsm.go index 8941699a2..2f92566ff 100644 --- a/retrievalmarket/impl/providerstates/provider_fsm.go +++ b/retrievalmarket/impl/providerstates/provider_fsm.go @@ -58,6 +58,7 @@ var ProviderEvents = fsm.Events{ // request payment fsm.Event(rm.ProviderEventPaymentRequested). FromMany(rm.DealStatusOngoing, rm.DealStatusUnsealed).To(rm.DealStatusFundsNeeded). + From(rm.DealStatusFundsNeeded).ToJustRecord(). From(rm.DealStatusBlocksComplete).To(rm.DealStatusFundsNeededLastPayment). From(rm.DealStatusNew).To(rm.DealStatusFundsNeededUnseal). Action(func(deal *rm.ProviderDealState, totalSent uint64) error { @@ -79,12 +80,13 @@ var ProviderEvents = fsm.Events{ From(rm.DealStatusFundsNeeded).To(rm.DealStatusOngoing). From(rm.DealStatusFundsNeededLastPayment).To(rm.DealStatusFinalizing). From(rm.DealStatusFundsNeededUnseal).To(rm.DealStatusUnsealing). + FromMany(rm.DealStatusBlocksComplete, rm.DealStatusOngoing, rm.DealStatusFinalizing).ToJustRecord(). Action(func(deal *rm.ProviderDealState, fundsReceived abi.TokenAmount) error { deal.FundsReceived = big.Add(deal.FundsReceived, fundsReceived) // only update interval if the payment is for bytes and not for unsealing. if deal.Status != rm.DealStatusFundsNeededUnseal { - deal.CurrentInterval += deal.PaymentIntervalIncrease + deal.CurrentInterval = deal.NextInterval() } return nil }), diff --git a/retrievalmarket/impl/requestvalidation/requestvalidation.go b/retrievalmarket/impl/requestvalidation/requestvalidation.go index 1de805ad0..fdd9543f3 100644 --- a/retrievalmarket/impl/requestvalidation/requestvalidation.go +++ b/retrievalmarket/impl/requestvalidation/requestvalidation.go @@ -55,12 +55,12 @@ func NewProviderRequestValidator(env ValidationEnvironment) *ProviderRequestVali } // ValidatePush validates a push request received from the peer that will send data -func (rv *ProviderRequestValidator) ValidatePush(sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { +func (rv *ProviderRequestValidator) ValidatePush(isRestart bool, sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { return nil, errors.New("No pushes accepted") } // ValidatePull validates a pull request received from the peer that will receive data -func (rv *ProviderRequestValidator) ValidatePull(receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { +func (rv *ProviderRequestValidator) ValidatePull(isRestart bool, receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { proposal, ok := voucher.(*retrievalmarket.DealProposal) var legacyProtocol bool if !ok { @@ -72,7 +72,7 @@ func (rv *ProviderRequestValidator) ValidatePull(receiver peer.ID, voucher datat proposal = &newProposal legacyProtocol = true } - response, err := rv.validatePull(receiver, proposal, legacyProtocol, baseCid, selector) + response, err := rv.validatePull(isRestart, receiver, proposal, legacyProtocol, baseCid, selector) if response == nil { return nil, err } @@ -88,12 +88,19 @@ func (rv *ProviderRequestValidator) ValidatePull(receiver peer.ID, voucher datat return response, err } -func (rv *ProviderRequestValidator) validatePull(receiver peer.ID, proposal *retrievalmarket.DealProposal, legacyProtocol bool, baseCid cid.Cid, selector ipld.Node) (*retrievalmarket.DealResponse, error) { - +// validatePull is called by the data provider when a new graphsync pull +// request is created. This can be the initial pull request or a new request +// created when the data transfer is restarted (eg after a connection failure). +// By default the graphsync request starts immediately sending data, unless +// validatePull returns ErrPause or the data-transfer has not yet started +// (because the provider is still unsealing the data). +func (rv *ProviderRequestValidator) validatePull(isRestart bool, receiver peer.ID, proposal *retrievalmarket.DealProposal, legacyProtocol bool, baseCid cid.Cid, selector ipld.Node) (*retrievalmarket.DealResponse, error) { + // Check the proposal CID matches if proposal.PayloadCID != baseCid { return nil, errors.New("incorrect CID for this proposal") } + // Check the proposal selector matches buf := new(bytes.Buffer) err := dagcbor.Encoder(selector, buf) if err != nil { @@ -107,6 +114,13 @@ func (rv *ProviderRequestValidator) validatePull(receiver peer.ID, proposal *ret return nil, errors.New("incorrect selector for this proposal") } + // If the validation is for a restart request, return nil, which means + // the data-transfer should not be explicitly paused or resumed + if isRestart { + return nil, nil + } + + // This is a new graphsync request (not a restart) pds := retrievalmarket.ProviderDealState{ DealProposal: *proposal, Receiver: receiver, @@ -114,6 +128,7 @@ func (rv *ProviderRequestValidator) validatePull(receiver peer.ID, proposal *ret CurrentInterval: proposal.PaymentInterval, } + // Decide whether to accept the deal status, err := rv.acceptDeal(&pds) response := retrievalmarket.DealResponse{ @@ -135,6 +150,8 @@ func (rv *ProviderRequestValidator) validatePull(receiver peer.ID, proposal *ret return nil, err } + // Pause the data transfer while unsealing the data. + // The state machine will unpause the transfer when unsealing completes. return &response, datatransfer.ErrPause } diff --git a/retrievalmarket/impl/requestvalidation/requestvalidation_test.go b/retrievalmarket/impl/requestvalidation/requestvalidation_test.go index 955b55dc3..d20a7e4c9 100644 --- a/retrievalmarket/impl/requestvalidation/requestvalidation_test.go +++ b/retrievalmarket/impl/requestvalidation/requestvalidation_test.go @@ -29,7 +29,7 @@ func TestValidatePush(t *testing.T) { sender := shared_testutil.GeneratePeers(1)[0] voucher := shared_testutil.MakeTestDealProposal() requestValidator := requestvalidation.NewProviderRequestValidator(fve) - voucherResult, err := requestValidator.ValidatePush(sender, &voucher, voucher.PayloadCID, shared.AllSelector()) + voucherResult, err := requestValidator.ValidatePush(false, sender, &voucher, voucher.PayloadCID, shared.AllSelector()) require.Equal(t, nil, voucherResult) require.Error(t, err) } @@ -49,6 +49,7 @@ func TestValidatePull(t *testing.T) { }, } testCases := map[string]struct { + isRestart bool fve fakeValidationEnvironment sender peer.ID voucher datatransfer.Voucher @@ -194,11 +195,22 @@ func TestValidatePull(t *testing.T) { ID: proposal.ID, }, }, + "restart": { + isRestart: true, + fve: fakeValidationEnvironment{ + RunDealDecisioningLogicAccepted: true, + }, + baseCid: proposal.PayloadCID, + selector: shared.AllSelector(), + voucher: &proposal, + expectedError: nil, + expectedVoucherResult: nil, + }, } for testCase, data := range testCases { t.Run(testCase, func(t *testing.T) { requestValidator := requestvalidation.NewProviderRequestValidator(&data.fve) - voucherResult, err := requestValidator.ValidatePull(data.sender, data.voucher, data.baseCid, data.selector) + voucherResult, err := requestValidator.ValidatePull(data.isRestart, data.sender, data.voucher, data.baseCid, data.selector) require.Equal(t, data.expectedVoucherResult, voucherResult) if data.expectedError == nil { require.NoError(t, err) diff --git a/retrievalmarket/impl/requestvalidation/revalidator.go b/retrievalmarket/impl/requestvalidation/revalidator.go index d894000ee..3c5b56ecb 100644 --- a/retrievalmarket/impl/requestvalidation/revalidator.go +++ b/retrievalmarket/impl/requestvalidation/revalidator.go @@ -16,7 +16,7 @@ import ( "github.com/filecoin-project/go-fil-markets/retrievalmarket/migrations" ) -var log = logging.Logger("retrieval-revalidator") +var log = logging.Logger("markets-rtvl-reval") // RevalidatorEnvironment are the dependencies needed to // build the logic of revalidation -- essentially, access to the node at statemachines @@ -56,9 +56,7 @@ func NewProviderRevalidator(env RevalidatorEnvironment) *ProviderRevalidator { // a given channel ID with a retrieval deal, so that checks run for data sent // on the channel func (pr *ProviderRevalidator) TrackChannel(deal rm.ProviderDealState) { - // Sanity check if deal.ChannelID == nil { - log.Errorf("cannot track deal %s: channel ID is nil", deal.ID) return } @@ -150,29 +148,28 @@ func (pr *ProviderRevalidator) processPayment(dealID rm.ProviderDealIdentifier, return errorDealResponse(dealID, err), err } - // attempt to redeem voucher - // (totalSent * pricePerByte + unsealPrice) - fundsReceived - paymentOwed := big.Sub(big.Add(big.Mul(abi.NewTokenAmount(int64(deal.TotalSent)), deal.PricePerByte), deal.UnsealPrice), deal.FundsReceived) - received, err := pr.env.Node().SavePaymentVoucher(context.TODO(), payment.PaymentChannel, payment.PaymentVoucher, nil, paymentOwed, tok) + // Save voucher + received, err := pr.env.Node().SavePaymentVoucher(context.TODO(), payment.PaymentChannel, payment.PaymentVoucher, nil, big.Zero(), tok) if err != nil { _ = pr.env.SendEvent(dealID, rm.ProviderEventSaveVoucherFailed, err) return errorDealResponse(dealID, err), err } - // received = 0 / err = nil indicates that the voucher was already saved, but this may be ok - // if we are making a deal with ourself - in this case, we'll instead calculate received - // but subtracting from fund sent - if big.Cmp(received, big.Zero()) == 0 { - received = big.Sub(payment.PaymentVoucher.Amount, deal.FundsReceived) - } + totalPaid := big.Add(deal.FundsReceived, received) // check if all payments are received to continue the deal, or send updated required payment - if received.LessThan(paymentOwed) { + owed := paymentOwed(deal, totalPaid) + + log.Debugf("provider: owed %d: received voucher for %d, total received %d = received so far %d + newly received %d, total sent %d, unseal price %d, price per byte %d", + owed, payment.PaymentVoucher.Amount, totalPaid, deal.FundsReceived, received, deal.TotalSent, deal.UnsealPrice, deal.PricePerByte) + + if owed.GreaterThan(big.Zero()) { + log.Debugf("provider: owed %d: sending partial payment request", owed) _ = pr.env.SendEvent(dealID, rm.ProviderEventPartialPaymentReceived, received) return &rm.DealResponse{ ID: deal.ID, Status: deal.Status, - PaymentOwed: big.Sub(paymentOwed, received), + PaymentOwed: owed, }, datatransfer.ErrPause } @@ -180,6 +177,7 @@ func (pr *ProviderRevalidator) processPayment(dealID rm.ProviderDealIdentifier, _ = pr.env.SendEvent(dealID, rm.ProviderEventPaymentReceived, received) if deal.Status == rm.DealStatusFundsNeededLastPayment { + log.Debugf("provider: funds needed: last payment") return &rm.DealResponse{ ID: deal.ID, Status: rm.DealStatusCompleted, @@ -195,6 +193,36 @@ func (pr *ProviderRevalidator) processPayment(dealID rm.ProviderDealIdentifier, return nil, datatransfer.ErrResume } +func paymentOwed(deal rm.ProviderDealState, totalPaid big.Int) big.Int { + // Check if the payment covers unsealing + if totalPaid.LessThan(deal.UnsealPrice) { + log.Debugf("provider: total paid %d < unseal price %d", totalPaid, deal.UnsealPrice) + return big.Sub(deal.UnsealPrice, totalPaid) + } + + // Calculate how much payment has been made for transferred data + transferPayment := big.Sub(totalPaid, deal.UnsealPrice) + + // The provider sends data and the client sends payment for the data. + // The provider will send a limited amount of extra data before receiving + // payment. Given the current limit, check if the client has paid enough + // to unlock the next interval. + currentLimitLower := deal.IntervalLowerBound() + + log.Debugf("provider: total sent %d bytes, but require payment for interval lower bound %d bytes", + deal.TotalSent, currentLimitLower) + + // Calculate the minimum required payment + totalPaymentRequired := big.Mul(big.NewInt(int64(currentLimitLower)), deal.PricePerByte) + + // Calculate payment owed + owed := big.Sub(totalPaymentRequired, transferPayment) + log.Debugf("provider: payment owed %d = payment required %d - transfer paid %d", + owed, totalPaymentRequired, transferPayment) + + return owed +} + func errorDealResponse(dealID rm.ProviderDealIdentifier, err error) *rm.DealResponse { return &rm.DealResponse{ ID: dealID.DealID, @@ -220,12 +248,21 @@ func (pr *ProviderRevalidator) OnPullDataSent(chid datatransfer.ChannelID, addit return true, nil, err } + // Calculate how much data has been sent in total channel.totalSent += additionalBytesSent - if channel.pricePerByte.IsZero() || channel.totalSent-channel.totalPaidFor < channel.interval { + if channel.pricePerByte.IsZero() || channel.totalSent < channel.interval { + if !channel.pricePerByte.IsZero() { + log.Debugf("provider: total sent %d < interval %d, sending block", channel.totalSent, channel.interval) + } return true, nil, pr.env.SendEvent(channel.dealID, rm.ProviderEventBlockSent, channel.totalSent) } + // Calculate the payment owed paymentOwed := big.Mul(abi.NewTokenAmount(int64(channel.totalSent-channel.totalPaidFor)), channel.pricePerByte) + log.Debugf("provider: owed %d = (total sent %d - paid for %d) * price per byte %d: sending payment request", + paymentOwed, channel.totalSent, channel.totalPaidFor, channel.pricePerByte) + + // Request payment err = pr.env.SendEvent(channel.dealID, rm.ProviderEventPaymentRequested, channel.totalSent) if err != nil { return true, nil, err @@ -267,6 +304,7 @@ func (pr *ProviderRevalidator) OnComplete(chid datatransfer.ChannelID) (bool, da return true, nil, err } + // Calculate how much payment is owed paymentOwed := big.Mul(abi.NewTokenAmount(int64(channel.totalSent-channel.totalPaidFor)), channel.pricePerByte) if paymentOwed.Equals(big.Zero()) { return true, finalResponse(&rm.DealResponse{ @@ -274,6 +312,10 @@ func (pr *ProviderRevalidator) OnComplete(chid datatransfer.ChannelID) (bool, da Status: rm.DealStatusCompleted, }, channel.legacyProtocol), nil } + + // Send a request for payment + log.Debugf("provider: last payment owed %d = (total sent %d - paid for %d) * price per byte %d", + paymentOwed, channel.totalSent, channel.totalPaidFor, channel.pricePerByte) err = pr.env.SendEvent(channel.dealID, rm.ProviderEventPaymentRequested, channel.totalSent) if err != nil { return true, nil, err diff --git a/retrievalmarket/impl/requestvalidation/revalidator_test.go b/retrievalmarket/impl/requestvalidation/revalidator_test.go index b8baea550..ffe3fccdf 100644 --- a/retrievalmarket/impl/requestvalidation/revalidator_test.go +++ b/retrievalmarket/impl/requestvalidation/revalidator_test.go @@ -82,7 +82,7 @@ func TestOnPullDataSent(t *testing.T) { expectedResult: &rm.DealResponse{ ID: deal.ID, Status: rm.DealStatusFundsNeeded, - PaymentOwed: defaultPaymentPerInterval, + PaymentOwed: big.Mul(abi.NewTokenAmount(int64(defaultCurrentInterval)), defaultPricePerByte), }, expectedHandled: true, }, @@ -97,7 +97,7 @@ func TestOnPullDataSent(t *testing.T) { expectedResult: &migrations.DealResponse0{ ID: legacyDeal.ID, Status: rm.DealStatusFundsNeeded, - PaymentOwed: defaultPaymentPerInterval, + PaymentOwed: big.Mul(abi.NewTokenAmount(int64(defaultCurrentInterval)), defaultPricePerByte), }, expectedHandled: true, }, @@ -291,23 +291,37 @@ func TestOnComplete(t *testing.T) { func TestRevalidate(t *testing.T) { payCh := address.TestAddress + voucher := shared_testutil.MakeTestSignedVoucher() voucher.Amount = big.Add(defaultFundsReceived, defaultPaymentPerInterval) + smallerPaymentAmt := abi.NewTokenAmount(int64(defaultPaymentInterval / 2)) + smallerVoucher := shared_testutil.MakeTestSignedVoucher() + smallerVoucher.Amount = big.Add(defaultFundsReceived, smallerPaymentAmt) + deal := *makeDealState(rm.DealStatusFundsNeeded) - deal.TotalSent = defaultTotalSent + defaultCurrentInterval + deal.TotalSent = defaultTotalSent + defaultPaymentInterval + defaultPaymentInterval/2 channelID := *deal.ChannelID - smallerPayment := abi.NewTokenAmount(400000) payment := &retrievalmarket.DealPayment{ ID: deal.ID, PaymentChannel: payCh, PaymentVoucher: voucher, } + smallerPayment := &retrievalmarket.DealPayment{ + ID: deal.ID, + PaymentChannel: payCh, + PaymentVoucher: smallerVoucher, + } legacyPayment := &migrations.DealPayment0{ ID: deal.ID, PaymentChannel: payCh, PaymentVoucher: voucher, } + legacySmallerPayment := &migrations.DealPayment0{ + ID: deal.ID, + PaymentChannel: payCh, + PaymentVoucher: smallerVoucher, + } lastPaymentDeal := deal lastPaymentDeal.Status = rm.DealStatusFundsNeededLastPayment testCases := map[string]struct { @@ -370,7 +384,7 @@ func TestRevalidate(t *testing.T) { }, "payment voucher error": { configureTestNode: func(tn *testnodes.TestRetrievalProviderNode) { - _ = tn.ExpectVoucher(payCh, voucher, nil, defaultPaymentPerInterval, abi.NewTokenAmount(0), errors.New("your money's no good here")) + _ = tn.ExpectVoucher(payCh, voucher, nil, voucher.Amount, abi.NewTokenAmount(0), errors.New("your money's no good here")) }, deal: deal, channelID: channelID, @@ -387,7 +401,7 @@ func TestRevalidate(t *testing.T) { }, "payment voucher error, legacy payment": { configureTestNode: func(tn *testnodes.TestRetrievalProviderNode) { - _ = tn.ExpectVoucher(payCh, voucher, nil, defaultPaymentPerInterval, abi.NewTokenAmount(0), errors.New("your money's no good here")) + _ = tn.ExpectVoucher(payCh, voucher, nil, voucher.Amount, abi.NewTokenAmount(0), errors.New("your money's no good here")) }, deal: deal, channelID: channelID, @@ -403,43 +417,34 @@ func TestRevalidate(t *testing.T) { }, }, "not enough funds send": { - configureTestNode: func(tn *testnodes.TestRetrievalProviderNode) { - _ = tn.ExpectVoucher(payCh, voucher, nil, defaultPaymentPerInterval, smallerPayment, nil) - }, deal: deal, channelID: channelID, - voucher: payment, + voucher: smallerPayment, expectedError: datatransfer.ErrPause, expectedID: deal.Identifier(), expectedEvent: rm.ProviderEventPartialPaymentReceived, - expectedArgs: []interface{}{smallerPayment}, + expectedArgs: []interface{}{smallerPaymentAmt}, expectedResult: &rm.DealResponse{ ID: deal.ID, Status: deal.Status, - PaymentOwed: big.Sub(defaultPaymentPerInterval, smallerPayment), + PaymentOwed: big.Sub(defaultPaymentPerInterval, smallerPaymentAmt), }, }, "not enough funds send, legacyPayment": { - configureTestNode: func(tn *testnodes.TestRetrievalProviderNode) { - _ = tn.ExpectVoucher(payCh, voucher, nil, defaultPaymentPerInterval, smallerPayment, nil) - }, deal: deal, channelID: channelID, - voucher: legacyPayment, + voucher: legacySmallerPayment, expectedError: datatransfer.ErrPause, expectedID: deal.Identifier(), expectedEvent: rm.ProviderEventPartialPaymentReceived, - expectedArgs: []interface{}{smallerPayment}, + expectedArgs: []interface{}{smallerPaymentAmt}, expectedResult: &migrations.DealResponse0{ ID: deal.ID, Status: deal.Status, - PaymentOwed: big.Sub(defaultPaymentPerInterval, smallerPayment), + PaymentOwed: big.Sub(defaultPaymentPerInterval, smallerPaymentAmt), }, }, "it works": { - configureTestNode: func(tn *testnodes.TestRetrievalProviderNode) { - _ = tn.ExpectVoucher(payCh, voucher, nil, defaultPaymentPerInterval, defaultPaymentPerInterval, nil) - }, deal: deal, channelID: channelID, voucher: payment, @@ -450,9 +455,6 @@ func TestRevalidate(t *testing.T) { }, "it completes": { - configureTestNode: func(tn *testnodes.TestRetrievalProviderNode) { - _ = tn.ExpectVoucher(payCh, voucher, nil, defaultPaymentPerInterval, defaultPaymentPerInterval, nil) - }, deal: lastPaymentDeal, channelID: channelID, voucher: payment, @@ -466,9 +468,6 @@ func TestRevalidate(t *testing.T) { }, }, "it completes, legacy payment": { - configureTestNode: func(tn *testnodes.TestRetrievalProviderNode) { - _ = tn.ExpectVoucher(payCh, voucher, nil, defaultPaymentPerInterval, defaultPaymentPerInterval, nil) - }, deal: lastPaymentDeal, channelID: channelID, voucher: legacyPayment, @@ -482,9 +481,6 @@ func TestRevalidate(t *testing.T) { }, }, "voucher already saved": { - configureTestNode: func(tn *testnodes.TestRetrievalProviderNode) { - _ = tn.ExpectVoucher(payCh, voucher, nil, defaultPaymentPerInterval, big.Zero(), nil) - }, deal: deal, channelID: channelID, voucher: payment, @@ -497,6 +493,7 @@ func TestRevalidate(t *testing.T) { for testCase, data := range testCases { t.Run(testCase, func(t *testing.T) { tn := testnodes.NewTestRetrievalProviderNode() + tn.AddReceivedVoucher(deal.FundsReceived) if data.configureTestNode != nil { data.configureTestNode(tn) } @@ -524,7 +521,6 @@ func TestRevalidate(t *testing.T) { } else { require.Len(t, fre.sentEvents, 0) } - tn.VerifyExpectations(t) }) } } @@ -556,12 +552,13 @@ func (fre *fakeRevalidatorEnvironment) Get(dealID rm.ProviderDealIdentifier) (rm } var dealID = retrievalmarket.DealID(10) -var defaultCurrentInterval = uint64(1000) -var defaultIntervalIncrease = uint64(500) -var defaultPricePerByte = abi.NewTokenAmount(500) -var defaultPaymentPerInterval = big.Mul(defaultPricePerByte, abi.NewTokenAmount(int64(defaultCurrentInterval))) -var defaultTotalSent = uint64(5000) -var defaultFundsReceived = abi.NewTokenAmount(2500000) +var defaultCurrentInterval = uint64(3000) +var defaultPaymentInterval = uint64(1000) +var defaultIntervalIncrease = uint64(0) +var defaultPricePerByte = abi.NewTokenAmount(1000) +var defaultPaymentPerInterval = big.Mul(defaultPricePerByte, abi.NewTokenAmount(int64(defaultPaymentInterval))) +var defaultTotalSent = defaultPaymentInterval +var defaultFundsReceived = big.Mul(defaultPricePerByte, abi.NewTokenAmount(int64(defaultTotalSent))) func makeDealState(status retrievalmarket.DealStatus) *retrievalmarket.ProviderDealState { channelID := shared_testutil.MakeTestChannelID() @@ -574,7 +571,7 @@ func makeDealState(status retrievalmarket.DealStatus) *retrievalmarket.ProviderD Receiver: channelID.Initiator, DealProposal: retrievalmarket.DealProposal{ ID: dealID, - Params: retrievalmarket.NewParamsV0(defaultPricePerByte, defaultCurrentInterval, defaultIntervalIncrease), + Params: retrievalmarket.NewParamsV0(defaultPricePerByte, defaultPaymentInterval, defaultIntervalIncrease), }, } } diff --git a/retrievalmarket/impl/testnodes/test_retrieval_client_node.go b/retrievalmarket/impl/testnodes/test_retrieval_client_node.go index 1472a5282..1574b5de2 100644 --- a/retrievalmarket/impl/testnodes/test_retrieval_client_node.go +++ b/retrievalmarket/impl/testnodes/test_retrieval_client_node.go @@ -120,7 +120,12 @@ func (trcn *TestRetrievalClientNode) CreatePaymentVoucher(ctx context.Context, p if trcn.intergrationTest && amount.GreaterThan(trcn.channelAvailableFunds.ConfirmedAmt) { return nil, retrievalmarket.NewShortfallError(big.Sub(amount, trcn.channelAvailableFunds.ConfirmedAmt)) } - return trcn.voucher, trcn.voucherError + if trcn.voucherError != nil { + return nil, trcn.voucherError + } + voucher := *trcn.voucher + voucher.Amount = amount + return &voucher, nil } // GetChainHead returns a mock value for the chain head diff --git a/retrievalmarket/impl/testnodes/test_retrieval_provider_node.go b/retrievalmarket/impl/testnodes/test_retrieval_provider_node.go index 2b41d0fe6..da4373766 100644 --- a/retrievalmarket/impl/testnodes/test_retrieval_provider_node.go +++ b/retrievalmarket/impl/testnodes/test_retrieval_provider_node.go @@ -7,19 +7,26 @@ import ( "errors" "io" "io/ioutil" + "sort" + "sync" "testing" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" "github.com/stretchr/testify/require" + "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/specs-actors/actors/builtin/paych" "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/shared" ) +var log = logging.Logger("retrieval_provnode_test") + type expectedVoucherKey struct { paymentChannel string voucher string @@ -45,8 +52,8 @@ type TestRetrievalProviderNode struct { sectorStubs map[sectorKey][]byte expectations map[sectorKey]struct{} received map[sectorKey]struct{} + lk sync.Mutex expectedVouchers map[expectedVoucherKey]voucherResult - receivedVouchers map[expectedVoucherKey]struct{} expectedPricingParamDeals []abi.DealID receivedPricingParamDeals []abi.DealID @@ -56,6 +63,9 @@ type TestRetrievalProviderNode struct { unsealed map[sectorKey]struct{} isVerified bool + + receivedVouchers []abi.TokenAmount + unsealPaused chan struct{} } var _ retrievalmarket.RetrievalProviderNode = &TestRetrievalProviderNode{} @@ -67,7 +77,6 @@ func NewTestRetrievalProviderNode() *TestRetrievalProviderNode { expectations: make(map[sectorKey]struct{}), received: make(map[sectorKey]struct{}), expectedVouchers: make(map[expectedVoucherKey]voucherResult), - receivedVouchers: make(map[expectedVoucherKey]struct{}), unsealed: make(map[sectorKey]struct{}), } @@ -118,9 +127,31 @@ func (trpn *TestRetrievalProviderNode) ExpectUnseal(sectorID abi.SectorNumber, o trpn.StubUnseal(sectorID, offset, length, data) } +func (trpn *TestRetrievalProviderNode) PauseUnseal() { + trpn.lk.Lock() + defer trpn.lk.Unlock() + + trpn.unsealPaused = make(chan struct{}) +} + +func (trpn *TestRetrievalProviderNode) FinishUnseal() { + close(trpn.unsealPaused) +} + // UnsealSector simulates unsealing a sector by returning a stubbed response // or erroring func (trpn *TestRetrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { + trpn.lk.Lock() + defer trpn.lk.Unlock() + + if trpn.unsealPaused != nil { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-trpn.unsealPaused: + } + } + trpn.received[sectorKey{sectorID, offset, length}] = struct{}{} data, ok := trpn.sectorStubs[sectorKey{sectorID, offset, length}] if !ok { @@ -147,16 +178,41 @@ func (trpn *TestRetrievalProviderNode) SavePaymentVoucher( proof []byte, expectedAmount abi.TokenAmount, tok shared.TipSetToken) (abi.TokenAmount, error) { - key, err := trpn.toExpectedVoucherKey(paymentChannel, voucher, proof, expectedAmount) + + trpn.lk.Lock() + defer trpn.lk.Unlock() + + key, err := trpn.toExpectedVoucherKey(paymentChannel, voucher, proof, voucher.Amount) if err != nil { return abi.TokenAmount{}, err } + + max := big.Zero() + for _, amt := range trpn.receivedVouchers { + max = big.Max(max, amt) + } + trpn.receivedVouchers = append(trpn.receivedVouchers, voucher.Amount) + rcvd := big.Sub(voucher.Amount, max) + if rcvd.LessThan(big.Zero()) { + rcvd = big.Zero() + } + if len(trpn.expectedVouchers) == 0 { + return rcvd, nil + } + result, ok := trpn.expectedVouchers[key] if ok { - trpn.receivedVouchers[key] = struct{}{} - return result.amount, result.err + return rcvd, result.err + } + var amts []abi.TokenAmount + for _, vchr := range trpn.expectedVouchers { + amts = append(amts, vchr.amount) } - return abi.TokenAmount{}, errors.New("SavePaymentVoucher failed") + sort.Slice(amts, func(i, j int) bool { + return amts[i].LessThan(amts[j]) + }) + err = xerrors.Errorf("SavePaymentVoucher failed - voucher %d didnt match expected voucher %d in %s", voucher.Amount, expectedAmount, amts) + return abi.TokenAmount{}, err } // GetMinerWorkerAddress translates an address @@ -198,10 +254,24 @@ func (trpn *TestRetrievalProviderNode) ExpectVoucher( expectedAmount abi.TokenAmount, actualAmount abi.TokenAmount, // the actual amount it should have (same unless you want to trigger an error) expectedErr error) error { - key, err := trpn.toExpectedVoucherKey(paymentChannel, voucher, proof, expectedAmount) + vch := *voucher + vch.Amount = expectedAmount + key, err := trpn.toExpectedVoucherKey(paymentChannel, &vch, proof, expectedAmount) if err != nil { return err } trpn.expectedVouchers[key] = voucherResult{actualAmount, expectedErr} return nil } + +func (trpn *TestRetrievalProviderNode) AddReceivedVoucher(amt abi.TokenAmount) { + trpn.receivedVouchers = append(trpn.receivedVouchers, amt) +} + +func (trpn *TestRetrievalProviderNode) MaxReceivedVoucher() abi.TokenAmount { + max := abi.NewTokenAmount(0) + for _, amt := range trpn.receivedVouchers { + max = big.Max(max, amt) + } + return max +} diff --git a/retrievalmarket/retrieval_restart_integration_test.go b/retrievalmarket/retrieval_restart_integration_test.go new file mode 100644 index 000000000..df26f34af --- /dev/null +++ b/retrievalmarket/retrieval_restart_integration_test.go @@ -0,0 +1,303 @@ +package retrievalmarket_test + +import ( + "context" + "testing" + "time" + + "github.com/ipfs/go-datastore" + logger "github.com/ipfs/go-log/v2" + "github.com/stretchr/testify/require" + + datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-data-transfer/channelmonitor" + dtimpl "github.com/filecoin-project/go-data-transfer/impl" + dtnet "github.com/filecoin-project/go-data-transfer/network" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/go-fil-markets/shared_testutil" + "github.com/filecoin-project/go-fil-markets/storagemarket/testharness" + "github.com/filecoin-project/go-fil-markets/storagemarket/testharness/dependencies" + "github.com/filecoin-project/go-fil-markets/storagemarket/testnodes" +) + +var log = logger.Logger("restart_test") + +var noOpDelay = testnodes.DelayFakeCommonNode{} + +// TODO +// TEST CONNECTION BOUNCE FOR ALL MEANINGFUL STATES OF THE CLIENT AND PROVIDER DEAL LIFECYCLE. +// CURRENTLY, WE ONLY TEST THIS FOR THE DEALSTATUS ONGOING STATE. + +// TestBounceConnectionDealTransferOngoing tests that when the the connection is +// broken and then restarted during deal data transfer for an ongoing deal, the data transfer will resume and the deal will +// complete successfully. +func TestBounceConnectionDealTransferOngoing(t *testing.T) { + bgCtx := context.Background() + logger.SetLogLevel("restart_test", "debug") + //logger.SetLogLevel("dt-impl", "debug") + //logger.SetLogLevel("dt-chanmon", "debug") + //logger.SetLogLevel("dt_graphsync", "debug") + //logger.SetLogLevel("markets-rtvl", "debug") + //logger.SetLogLevel("markets-rtvl-reval", "debug") + + tcs := map[string]struct { + unSealPrice abi.TokenAmount + pricePerByte abi.TokenAmount + paymentInterval uint64 + paymentIntervalIncrease uint64 + voucherAmts []abi.TokenAmount + maxVoucherAmt abi.TokenAmount + }{ + "non-zero unseal, non zero price per byte": { + unSealPrice: abi.NewTokenAmount(1000), + pricePerByte: abi.NewTokenAmount(1000), + paymentInterval: uint64(10000), + paymentIntervalIncrease: uint64(1000), + maxVoucherAmt: abi.NewTokenAmount(19921000), + }, + + "zero unseal, non-zero price per byte": { + unSealPrice: big.Zero(), + pricePerByte: abi.NewTokenAmount(1000), + paymentInterval: uint64(10000), + paymentIntervalIncrease: uint64(1000), + maxVoucherAmt: abi.NewTokenAmount(19920000), + }, + + "zero unseal, zero price per byte": { + unSealPrice: big.Zero(), + pricePerByte: big.Zero(), + paymentInterval: uint64(0), + paymentIntervalIncrease: uint64(0), + maxVoucherAmt: abi.NewTokenAmount(0), + }, + + "non-zero unseal, zero price per byte": { + unSealPrice: abi.NewTokenAmount(1000), + pricePerByte: big.Zero(), + maxVoucherAmt: abi.NewTokenAmount(1000), + }, + } + + for name, tc := range tcs { + t.Run(name, func(t *testing.T) { + dtClientNetRetry := dtnet.RetryParameters(time.Second, time.Second, 5, 1) + restartConf := dtimpl.ChannelRestartConfig(channelmonitor.Config{ + AcceptTimeout: 100 * time.Millisecond, + RestartBackoff: 100 * time.Millisecond, + RestartAckTimeout: 2 * time.Second, + RestartDebounce: 100 * time.Millisecond, + MaxConsecutiveRestarts: 5, + CompleteTimeout: 100 * time.Millisecond, + }) + td := shared_testutil.NewLibp2pTestData(bgCtx, t) + td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry) + depGen := dependencies.NewDepGenerator() + depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport) (datatransfer.Manager, error) { + return dtimpl.NewDataTransfer(ds, dir, transferNetwork, transport, restartConf) + } + deps := depGen.New(t, bgCtx, td, testnodes.NewStorageMarketState(), "", noOpDelay, noOpDelay) + + sh := testharness.NewHarnessWithTestData(t, td, deps, true, false) + + // do a storage deal + storageClientSeenDeal := doStorage(t, bgCtx, sh) + ctxTimeout, canc := context.WithTimeout(bgCtx, 5*time.Second) + defer canc() + + // create a retrieval test harness + rh := newRetrievalHarness(ctxTimeout, t, sh, storageClientSeenDeal, retrievalmarket.Params{ + UnsealPrice: tc.unSealPrice, + PricePerByte: tc.pricePerByte, + PaymentInterval: tc.paymentInterval, + PaymentIntervalIncrease: tc.paymentIntervalIncrease, + }) + clientHost := rh.TestDataNet.Host1.ID() + providerHost := rh.TestDataNet.Host2.ID() + + // Bounce connection after this many bytes have been queued for sending + bounceConnectionAt := map[uint64]bool{ + 1000: false, + 3000: false, + 5000: false, + 7000: false, + 9000: false, + } + + sh.DTProvider.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { + if event.Code == datatransfer.DataQueuedProgress { + log.Debugf("DataQueuedProgress %d", channelState.Queued()) + // Check if enough bytes have been queued that the connection + // should be bounced + for at, already := range bounceConnectionAt { + if channelState.Queued() > at && !already { + bounceConnectionAt[at] = true + + // Break the connection + queued := channelState.Queued() + sent := channelState.Sent() + t.Logf("breaking connection at queue %d sent %d bytes", queued, sent) + rh.TestDataNet.MockNet.DisconnectPeers(clientHost, providerHost) + rh.TestDataNet.MockNet.UnlinkPeers(clientHost, providerHost) + + go func() { + time.Sleep(100 * time.Millisecond) + t.Logf("restoring connection at queue %d sent %d bytes", queued, sent) + rh.TestDataNet.MockNet.LinkPeers(clientHost, providerHost) + }() + } + } + } + if event.Code == datatransfer.DataSent { + log.Debugf("DataSent %d", channelState.Sent()) + } + if event.Code == datatransfer.DataSentProgress { + log.Debugf("DataSentProgress %d", channelState.Sent()) + } + }) + sh.DTClient.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { + if event.Code == datatransfer.DataReceived { + log.Debugf("DataReceived %d", channelState.Received()) + } + if event.Code == datatransfer.DataReceivedProgress { + log.Debugf("DataReceivedProgress %d", channelState.Received()) + } + }) + + checkRetrieve(t, bgCtx, rh, sh, tc.voucherAmts) + require.Equal(t, tc.maxVoucherAmt, rh.ProviderNode.MaxReceivedVoucher()) + }) + } +} + +// TestBounceConnectionDealTransferUnsealing tests that when the the connection +// is broken and then restarted during unsealing, the data transfer will resume +// and the deal will complete successfully. +func TestBounceConnectionDealTransferUnsealing(t *testing.T) { + bgCtx := context.Background() + //logger.SetLogLevel("dt-chanmon", "debug") + //logger.SetLogLevel("retrieval", "debug") + //logger.SetLogLevel("retrievalmarket_impl", "debug") + logger.SetLogLevel("restart_test", "debug") + //logger.SetLogLevel("markets-rtvl-reval", "debug") + //logger.SetLogLevel("graphsync", "debug") + //logger.SetLogLevel("gs-traversal", "debug") + //logger.SetLogLevel("gs-executor", "debug") + + beforeRestoringConnection := true + afterRestoringConnection := !beforeRestoringConnection + tcs := []struct { + name string + finishUnseal bool + }{{ + name: "finish unseal before restoring connection", + finishUnseal: beforeRestoringConnection, + }, { + name: "finish unseal after restoring connection", + finishUnseal: afterRestoringConnection, + }} + + for _, tc := range tcs { + tc := tc + t.Run(tc.name, func(t *testing.T) { + restartComplete := make(chan struct{}) + onRestartComplete := func(_ datatransfer.ChannelID) { + close(restartComplete) + } + + dtClientNetRetry := dtnet.RetryParameters(time.Second, time.Second, 5, 1) + restartConf := dtimpl.ChannelRestartConfig(channelmonitor.Config{ + AcceptTimeout: 100 * time.Millisecond, + RestartBackoff: 100 * time.Millisecond, + RestartAckTimeout: 2 * time.Second, + RestartDebounce: 100 * time.Millisecond, + MaxConsecutiveRestarts: 5, + CompleteTimeout: 100 * time.Millisecond, + OnRestartComplete: onRestartComplete, + }) + td := shared_testutil.NewLibp2pTestData(bgCtx, t) + td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry) + depGen := dependencies.NewDepGenerator() + depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport) (datatransfer.Manager, error) { + return dtimpl.NewDataTransfer(ds, dir, transferNetwork, transport, restartConf) + } + deps := depGen.New(t, bgCtx, td, testnodes.NewStorageMarketState(), "", noOpDelay, noOpDelay) + + sh := testharness.NewHarnessWithTestData(t, td, deps, true, false) + + // do a storage deal + storageClientSeenDeal := doStorage(t, bgCtx, sh) + ctxTimeout, canc := context.WithTimeout(bgCtx, 5*time.Second) + defer canc() + + // create a retrieval test harness + maxVoucherAmt := abi.NewTokenAmount(19921000) + rh := newRetrievalHarness(ctxTimeout, t, sh, storageClientSeenDeal, retrievalmarket.Params{ + UnsealPrice: abi.NewTokenAmount(1000), + PricePerByte: abi.NewTokenAmount(1000), + PaymentInterval: uint64(10000), + PaymentIntervalIncrease: uint64(1000), + }) + clientHost := rh.TestDataNet.Host1.ID() + providerHost := rh.TestDataNet.Host2.ID() + + // Pause unsealing + rh.ProviderNode.PauseUnseal() + + firstPayRcvd := false + rh.Provider.SubscribeToEvents(func(event retrievalmarket.ProviderEvent, state retrievalmarket.ProviderDealState) { + // When the provider receives the first payment from the + // client (the payment for unsealing), the provider moves + // to the unsealing state + if event != retrievalmarket.ProviderEventPaymentReceived || firstPayRcvd { + return + } + + firstPayRcvd = true + + log.Debugf("breaking connection at %s", retrievalmarket.ProviderEvents[event]) + rh.TestDataNet.MockNet.DisconnectPeers(clientHost, providerHost) + rh.TestDataNet.MockNet.UnlinkPeers(clientHost, providerHost) + + go func() { + // Simulate unsealing delay + time.Sleep(50 * time.Millisecond) + + // If unsealing should finish before restoring the connection + if tc.finishUnseal == beforeRestoringConnection { + // Finish unsealing + log.Debugf("finish unseal") + rh.ProviderNode.FinishUnseal() + time.Sleep(20 * time.Millisecond) + } + + // Restore the connection + log.Debugf("restoring connection") + rh.TestDataNet.MockNet.LinkPeers(clientHost, providerHost) + + // If unsealing should finish after restoring the connection + if tc.finishUnseal == afterRestoringConnection { + // Wait for the Restart message to be sent and + // acknowledged + select { + case <-ctxTimeout.Done(): + return + case <-restartComplete: + } + + // Finish unsealing + time.Sleep(50 * time.Millisecond) + log.Debugf("finish unseal") + rh.ProviderNode.FinishUnseal() + } + }() + }) + + checkRetrieve(t, bgCtx, rh, sh, nil) + require.Equal(t, maxVoucherAmt, rh.ProviderNode.MaxReceivedVoucher()) + }) + } +} diff --git a/retrievalmarket/storage_retrieval_integration_test.go b/retrievalmarket/storage_retrieval_integration_test.go index b83602a12..a28a5a1ce 100644 --- a/retrievalmarket/storage_retrieval_integration_test.go +++ b/retrievalmarket/storage_retrieval_integration_test.go @@ -3,6 +3,7 @@ package retrievalmarket_test import ( "bytes" "context" + "fmt" "sync" "testing" "time" @@ -20,6 +21,7 @@ import ( "github.com/filecoin-project/go-commp-utils/pieceio" "github.com/filecoin-project/go-commp-utils/pieceio/cario" datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/filecoin-project/go-multistore" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/specs-actors/actors/builtin/paych" @@ -59,7 +61,7 @@ func TestStorageRetrieval(t *testing.T) { pricePerByte: abi.NewTokenAmount(1000), paymentInterval: uint64(10000), paymentIntervalIncrease: uint64(1000), - voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10136000), abi.NewTokenAmount(9784000)}, + voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10000000), abi.NewTokenAmount(19920000)}, }, "zero unseal, zero price per byte": { @@ -75,61 +77,20 @@ func TestStorageRetrieval(t *testing.T) { pricePerByte: abi.NewTokenAmount(1000), paymentInterval: uint64(10000), paymentIntervalIncrease: uint64(1000), - voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(1000), abi.NewTokenAmount(10136000), abi.NewTokenAmount(9784000)}, + voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(1000), abi.NewTokenAmount(10001000), abi.NewTokenAmount(19921000)}, }, - // TODO : Repeated Partial Payments } for name, tc := range tcs { t.Run(name, func(t *testing.T) { sh := testharness.NewHarness(t, bgCtx, true, testnodes.DelayFakeCommonNode{}, testnodes.DelayFakeCommonNode{}, false) - shared_testutil.StartAndWaitForReady(bgCtx, t, sh.Client) - shared_testutil.StartAndWaitForReady(bgCtx, t, sh.Provider) - - // set up a subscriber - providerDealChan := make(chan storagemarket.MinerDeal) - subscriber := func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { - providerDealChan <- deal - } - _ = sh.Provider.SubscribeToEvents(subscriber) - - clientDealChan := make(chan storagemarket.ClientDeal) - clientSubscriber := func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) { - clientDealChan <- deal - } - _ = sh.Client.SubscribeToEvents(clientSubscriber) - - // set ask price where we'll accept any price - err := sh.Provider.SetAsk(big.NewInt(0), big.NewInt(0), 50000) - assert.NoError(t, err) - - result := sh.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: sh.PayloadCid}, false, false) - require.False(t, result.ProposalCid.Equals(cid.Undef)) - - time.Sleep(time.Millisecond * 200) + storageProviderSeenDeal := doStorage(t, bgCtx, sh) ctxTimeout, canc := context.WithTimeout(bgCtx, 25*time.Second) defer canc() - var storageProviderSeenDeal storagemarket.MinerDeal - var storageClientSeenDeal storagemarket.ClientDeal - for storageProviderSeenDeal.State != storagemarket.StorageDealExpired || - storageClientSeenDeal.State != storagemarket.StorageDealExpired { - select { - case storageProviderSeenDeal = <-providerDealChan: - case storageClientSeenDeal = <-clientDealChan: - case <-ctxTimeout.Done(): - t.Fatalf("never saw completed deal, client deal state: %s (%d), provider deal state: %s (%d)", - storagemarket.DealStates[storageClientSeenDeal.State], - storageClientSeenDeal.State, - storagemarket.DealStates[storageProviderSeenDeal.State], - storageProviderSeenDeal.State, - ) - } - } - - rh := newRetrievalHarness(ctxTimeout, t, sh, storageClientSeenDeal, retrievalmarket.Params{ + rh := newRetrievalHarness(ctxTimeout, t, sh, storageProviderSeenDeal, retrievalmarket.Params{ UnsealPrice: tc.unSealPrice, PricePerByte: tc.pricePerByte, PaymentInterval: tc.paymentInterval, @@ -163,7 +124,7 @@ func TestOfflineStorageRetrieval(t *testing.T) { pricePerByte: abi.NewTokenAmount(1000), paymentInterval: uint64(10000), paymentIntervalIncrease: uint64(1000), - voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10136000), abi.NewTokenAmount(9784000)}, + voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(10000000), abi.NewTokenAmount(19920000)}, }, "zero unseal, zero price per byte": { @@ -179,7 +140,7 @@ func TestOfflineStorageRetrieval(t *testing.T) { pricePerByte: abi.NewTokenAmount(1000), paymentInterval: uint64(10000), paymentIntervalIncrease: uint64(1000), - voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(1000), abi.NewTokenAmount(10136000), abi.NewTokenAmount(9784000)}, + voucherAmts: []abi.TokenAmount{abi.NewTokenAmount(1000), abi.NewTokenAmount(10001000), abi.NewTokenAmount(19921000)}, }, } @@ -318,37 +279,7 @@ func checkRetrieve(t *testing.T, bgCtx context.Context, rh *retrievalHarness, sh } }) - // **** Send the query for the Piece - // set up retrieval params - peers := rh.Client.FindProviders(sh.PayloadCid) - require.Len(t, peers, 1) - retrievalPeer := peers[0] - require.NotNil(t, retrievalPeer.PieceCID) - - rh.ClientNode.ExpectKnownAddresses(retrievalPeer, nil) - - resp, err := rh.Client.Query(bgCtx, retrievalPeer, sh.PayloadCid, retrievalmarket.QueryParams{}) - require.NoError(t, err) - require.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status) - - // testing V1 only - rmParams, err := retrievalmarket.NewParamsV1(rh.RetrievalParams.PricePerByte, rh.RetrievalParams.PaymentInterval, rh.RetrievalParams.PaymentIntervalIncrease, shared.AllSelector(), nil, - rh.RetrievalParams.UnsealPrice) - require.NoError(t, err) - - proof := []byte("") - for _, voucherAmt := range vAmts { - require.NoError(t, rh.ProviderNode.ExpectVoucher(*rh.ExpPaych, rh.ExpVoucher, proof, voucherAmt, voucherAmt, nil)) - } - // just make sure there is enough to cover the transfer - fsize := 19000 // this is the known file size of the test file lorem.txt - expectedTotal := big.Add(big.Mul(rh.RetrievalParams.PricePerByte, abi.NewTokenAmount(int64(fsize*2))), rh.RetrievalParams.UnsealPrice) - - // *** Retrieve the piece - - clientStoreID := sh.TestData.MultiStore1.Next() - _, err = rh.Client.Retrieve(bgCtx, sh.PayloadCid, rmParams, expectedTotal, retrievalPeer, *rh.ExpPaych, retrievalPeer.Address, &clientStoreID) - require.NoError(t, err) + fsize, clientStoreID := doRetrieve(t, bgCtx, rh, sh, vAmts) ctxTimeout, cancel := context.WithTimeout(bgCtx, 10*time.Second) defer cancel() @@ -407,6 +338,8 @@ type retrievalHarness struct { ExpPaychAmt, ActualPaychAmt *abi.TokenAmount ExpVoucher, ActualVoucher *paych.SignedVoucher RetrievalParams retrievalmarket.Params + + TestDataNet *shared_testutil.Libp2pTestData } func newRetrievalHarness(ctx context.Context, t *testing.T, sh *testharness.StorageHarness, deal storagemarket.ClientDeal, @@ -529,15 +462,105 @@ func newRetrievalHarness(ctx context.Context, t *testing.T, sh *testharness.Stor ProviderNode: providerNode, PieceStore: sh.PieceStore, RetrievalParams: p, + TestDataNet: sh.TestData, } } type fakeDTValidator struct{} -func (v *fakeDTValidator) ValidatePush(sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { +func (v *fakeDTValidator) ValidatePush(isRestart bool, sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { return nil, nil } -func (v *fakeDTValidator) ValidatePull(receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { +func (v *fakeDTValidator) ValidatePull(isRestart bool, receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { return nil, nil } + +func doStorage(t *testing.T, ctx context.Context, sh *testharness.StorageHarness) storagemarket.ClientDeal { + shared_testutil.StartAndWaitForReady(ctx, t, sh.Client) + shared_testutil.StartAndWaitForReady(ctx, t, sh.Provider) + + // set up a subscriber + providerDealChan := make(chan storagemarket.MinerDeal) + subscriber := func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { + providerDealChan <- deal + } + _ = sh.Provider.SubscribeToEvents(subscriber) + + clientDealChan := make(chan storagemarket.ClientDeal) + clientSubscriber := func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) { + clientDealChan <- deal + } + _ = sh.Client.SubscribeToEvents(clientSubscriber) + + // set ask price where we'll accept any price + err := sh.Provider.SetAsk(big.NewInt(0), big.NewInt(0), 50000) + assert.NoError(t, err) + + result := sh.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: sh.PayloadCid}, false, false) + require.False(t, result.ProposalCid.Equals(cid.Undef)) + + time.Sleep(time.Millisecond * 200) + + ctxTimeout, canc := context.WithTimeout(ctx, 25*time.Second) + defer canc() + + var storageProviderSeenDeal storagemarket.MinerDeal + var storageClientSeenDeal storagemarket.ClientDeal + for storageProviderSeenDeal.State != storagemarket.StorageDealExpired || + storageClientSeenDeal.State != storagemarket.StorageDealExpired { + select { + case storageProviderSeenDeal = <-providerDealChan: + case storageClientSeenDeal = <-clientDealChan: + case <-ctxTimeout.Done(): + t.Fatalf("never saw completed deal, client deal state: %s (%d), provider deal state: %s (%d)", + storagemarket.DealStates[storageClientSeenDeal.State], + storageClientSeenDeal.State, + storagemarket.DealStates[storageProviderSeenDeal.State], + storageProviderSeenDeal.State, + ) + } + } + // --------------- + fmt.Println("\n Storage is complete") + + return storageClientSeenDeal +} + +func doRetrieve(t *testing.T, ctx context.Context, rh *retrievalHarness, sh *testharness.StorageHarness, + voucherAmts []abi.TokenAmount) (int, multistore.StoreID) { + + proof := []byte("") + for _, voucherAmt := range voucherAmts { + require.NoError(t, rh.ProviderNode.ExpectVoucher(*rh.ExpPaych, rh.ExpVoucher, proof, voucherAmt, voucherAmt, nil)) + } + + peers := rh.Client.FindProviders(sh.PayloadCid) + require.Len(t, peers, 1) + retrievalPeer := peers[0] + require.NotNil(t, retrievalPeer.PieceCID) + + rh.ClientNode.ExpectKnownAddresses(retrievalPeer, nil) + + resp, err := rh.Client.Query(ctx, retrievalPeer, sh.PayloadCid, retrievalmarket.QueryParams{}) + require.NoError(t, err) + require.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status) + + // testing V1 only + rmParams, err := retrievalmarket.NewParamsV1(rh.RetrievalParams.PricePerByte, rh.RetrievalParams.PaymentInterval, rh.RetrievalParams.PaymentIntervalIncrease, + shared.AllSelector(), nil, + rh.RetrievalParams.UnsealPrice) + require.NoError(t, err) + + // just make sure there is enough to cover the transfer + fsize := 19000 // this is the known file size of the test file lorem.txt + expectedTotal := big.Add(big.Mul(rh.RetrievalParams.PricePerByte, abi.NewTokenAmount(int64(fsize*2))), rh.RetrievalParams.UnsealPrice) + + // *** Retrieve the piece + + clientStoreID := sh.TestData.MultiStore1.Next() + _, err = rh.Client.Retrieve(ctx, sh.PayloadCid, rmParams, expectedTotal, retrievalPeer, *rh.ExpPaych, retrievalPeer.Address, &clientStoreID) + require.NoError(t, err) + + return fsize, clientStoreID +} diff --git a/retrievalmarket/types.go b/retrievalmarket/types.go index 34680dd33..5da7138be 100644 --- a/retrievalmarket/types.go +++ b/retrievalmarket/types.go @@ -69,6 +69,10 @@ type ClientDealState struct { LegacyProtocol bool } +func (deal *ClientDealState) NextInterval() uint64 { + return deal.Params.NextInterval(deal.CurrentInterval) +} + // ProviderDealState is the current state of a deal from the point of view // of a retrieval provider type ProviderDealState struct { @@ -85,6 +89,14 @@ type ProviderDealState struct { LegacyProtocol bool } +func (deal *ProviderDealState) IntervalLowerBound() uint64 { + return deal.Params.IntervalLowerBound(deal.CurrentInterval) +} + +func (deal *ProviderDealState) NextInterval() uint64 { + return deal.Params.NextInterval(deal.CurrentInterval) +} + // Identifier provides a unique id for this provider deal func (pds ProviderDealState) Identifier() ProviderDealIdentifier { return ProviderDealIdentifier{Receiver: pds.Receiver, DealID: pds.ID} @@ -244,6 +256,28 @@ func (p Params) SelectorSpecified() bool { return p.Selector != nil && !bytes.Equal(p.Selector.Raw, cbg.CborNull) } +func (p Params) IntervalLowerBound(currentInterval uint64) uint64 { + intervalSize := p.PaymentInterval + var lowerBound uint64 + var target uint64 + for target < currentInterval { + lowerBound = target + target += intervalSize + intervalSize += p.PaymentIntervalIncrease + } + return lowerBound +} + +func (p Params) NextInterval(currentInterval uint64) uint64 { + intervalSize := p.PaymentInterval + var nextInterval uint64 + for nextInterval <= currentInterval { + nextInterval += intervalSize + intervalSize += p.PaymentIntervalIncrease + } + return nextInterval +} + // NewParamsV0 generates parameters for a retrieval deal, which is always a whole piece deal func NewParamsV0(pricePerByte abi.TokenAmount, paymentInterval uint64, paymentIntervalIncrease uint64) Params { return Params{ diff --git a/retrievalmarket/types_test.go b/retrievalmarket/types_test.go index 3bf186f22..774e7f715 100644 --- a/retrievalmarket/types_test.go +++ b/retrievalmarket/types_test.go @@ -7,6 +7,7 @@ import ( "github.com/ipld/go-ipld-prime/codec/dagcbor" basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" @@ -39,3 +40,52 @@ func TestParamsMarshalUnmarshal(t *testing.T) { sel := nb.Build() assert.Equal(t, sel, allSelector) } + +func TestParamsIntervalBounds(t *testing.T) { + testCases := []struct { + name string + currentInterval uint64 + paymentInterval uint64 + intervalIncrease uint64 + expLowerBound uint64 + expNextInterval uint64 + }{{ + currentInterval: 0, + paymentInterval: 10, + intervalIncrease: 5, + expLowerBound: 0, + expNextInterval: 10, + }, { + currentInterval: 10, + paymentInterval: 10, + intervalIncrease: 5, + expLowerBound: 0, + expNextInterval: 25, // 10 + (10 + 5) + }, { + currentInterval: 25, + paymentInterval: 10, + intervalIncrease: 5, + expLowerBound: 10, + expNextInterval: 45, // 10 + (10 + 5) + (10 + 5 + 5) + }, { + currentInterval: 45, + paymentInterval: 10, + intervalIncrease: 5, + expLowerBound: 25, + expNextInterval: 70, // 10 + (10 + 5) + (10 + 5 + 5) + (10 + 5 + 5 + 5) + }} + + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + params := retrievalmarket.Params{ + PaymentInterval: tc.paymentInterval, + PaymentIntervalIncrease: tc.intervalIncrease, + } + lowerBound := params.IntervalLowerBound(tc.currentInterval) + nextInterval := params.NextInterval(tc.currentInterval) + + require.Equal(t, tc.expLowerBound, lowerBound) + require.Equal(t, tc.expNextInterval, nextInterval) + }) + } +} diff --git a/shared_testutil/generators.go b/shared_testutil/generators.go index 92df574db..3cabdfc78 100644 --- a/shared_testutil/generators.go +++ b/shared_testutil/generators.go @@ -294,11 +294,11 @@ func RequireGenerateRetrievalPeers(t *testing.T, numPeers int) []retrievalmarket type FakeDTValidator struct{} -func (v *FakeDTValidator) ValidatePush(sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { +func (v *FakeDTValidator) ValidatePush(isRestart bool, sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { return nil, nil } -func (v *FakeDTValidator) ValidatePull(receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { +func (v *FakeDTValidator) ValidatePull(isRestart bool, receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { return nil, nil } diff --git a/storagemarket/events.go b/storagemarket/events.go index d773002b3..afd043ce1 100644 --- a/storagemarket/events.go +++ b/storagemarket/events.go @@ -1,5 +1,7 @@ package storagemarket +import "fmt" + // ClientEvent is an event that happens in the client's deal state machine type ClientEvent uint64 @@ -143,6 +145,14 @@ var ClientEvents = map[ClientEvent]string{ ClientEventDataTransferCancelled: "ClientEventDataTransferCancelled", } +func (e ClientEvent) String() string { + str, ok := ClientEvents[e] + if ok { + return str + } + return fmt.Sprintf("ClientEventUnknown - %d", e) +} + // ProviderEvent is an event that happens in the provider's deal state machine type ProviderEvent uint64 @@ -318,3 +328,11 @@ var ProviderEvents = map[ProviderEvent]string{ ProviderEventDataTransferStalled: "ProviderEventDataTransferStalled", ProviderEventDataTransferCancelled: "ProviderEventDataTransferCancelled", } + +func (e ProviderEvent) String() string { + str, ok := ProviderEvents[e] + if ok { + return str + } + return fmt.Sprintf("ProviderEventUnknown - %d", e) +} diff --git a/storagemarket/impl/requestvalidation/request_validation_test.go b/storagemarket/impl/requestvalidation/request_validation_test.go index 5a873ddd1..256679f67 100644 --- a/storagemarket/impl/requestvalidation/request_validation_test.go +++ b/storagemarket/impl/requestvalidation/request_validation_test.go @@ -131,7 +131,7 @@ func TestUnifiedRequestValidator(t *testing.T) { urv := rv.NewUnifiedRequestValidator(nil, &pullDeals{state}) t.Run("ValidatePush fails", func(t *testing.T) { - _, err := urv.ValidatePush(minerID, wrongDTType{}, block.Cid(), nil) + _, err := urv.ValidatePush(false, minerID, wrongDTType{}, block.Cid(), nil) if !xerrors.Is(err, rv.ErrNoPushAccepted) { t.Fatal("Push should fail for the client request validator for storage deals") } @@ -144,7 +144,7 @@ func TestUnifiedRequestValidator(t *testing.T) { urv := rv.NewUnifiedRequestValidator(&pushDeals{state}, nil) t.Run("ValidatePull fails", func(t *testing.T) { - _, err := urv.ValidatePull(clientID, wrongDTType{}, block.Cid(), nil) + _, err := urv.ValidatePull(false, clientID, wrongDTType{}, block.Cid(), nil) if !xerrors.Is(err, rv.ErrNoPullAccepted) { t.Fatal("Pull should fail for the provider request validator for storage deals") } @@ -171,7 +171,7 @@ func AssertPushValidator(t *testing.T, validator datatransfer.RequestValidator, if err != nil { t.Fatal("error serializing proposal") } - _, err = validator.ValidatePush(sender, &rv.StorageDataTransferVoucher{proposalNd.Cid()}, proposal.Proposal.PieceCID, nil) + _, err = validator.ValidatePush(false, sender, &rv.StorageDataTransferVoucher{proposalNd.Cid()}, proposal.Proposal.PieceCID, nil) if !xerrors.Is(err, rv.ErrNoDeal) { t.Fatal("Push should fail if there is no deal stored") } @@ -186,7 +186,7 @@ func AssertPushValidator(t *testing.T, validator datatransfer.RequestValidator, t.Fatal("deal tracking failed") } ref := minerDeal.Ref - _, err = validator.ValidatePush(sender, &rv.StorageDataTransferVoucher{minerDeal.ProposalCid}, ref.Root, nil) + _, err = validator.ValidatePush(false, sender, &rv.StorageDataTransferVoucher{minerDeal.ProposalCid}, ref.Root, nil) if !xerrors.Is(err, rv.ErrWrongPeer) { t.Fatal("Push should fail if miner address is incorrect") } @@ -199,7 +199,7 @@ func AssertPushValidator(t *testing.T, validator datatransfer.RequestValidator, if err := state.Begin(minerDeal.ProposalCid, &minerDeal); err != nil { t.Fatal("deal tracking failed") } - _, err = validator.ValidatePush(sender, &rv.StorageDataTransferVoucher{minerDeal.ProposalCid}, blockGenerator.Next().Cid(), nil) + _, err = validator.ValidatePush(false, sender, &rv.StorageDataTransferVoucher{minerDeal.ProposalCid}, blockGenerator.Next().Cid(), nil) if !xerrors.Is(err, rv.ErrWrongPiece) { t.Fatal("Push should fail if piece ref is incorrect") } @@ -213,7 +213,7 @@ func AssertPushValidator(t *testing.T, validator datatransfer.RequestValidator, t.Fatal("deal tracking failed") } ref := minerDeal.Ref - _, err = validator.ValidatePush(sender, &rv.StorageDataTransferVoucher{minerDeal.ProposalCid}, ref.Root, nil) + _, err = validator.ValidatePush(false, sender, &rv.StorageDataTransferVoucher{minerDeal.ProposalCid}, ref.Root, nil) if !xerrors.Is(err, rv.ErrInacceptableDealState) { t.Fatal("Push should fail if deal is in a state that cannot be data transferred") } @@ -227,7 +227,7 @@ func AssertPushValidator(t *testing.T, validator datatransfer.RequestValidator, t.Fatal("deal tracking failed") } ref := minerDeal.Ref - _, err = validator.ValidatePush(sender, &rv.StorageDataTransferVoucher{minerDeal.ProposalCid}, ref.Root, nil) + _, err = validator.ValidatePush(false, sender, &rv.StorageDataTransferVoucher{minerDeal.ProposalCid}, ref.Root, nil) if err != nil { t.Fatal("Push should should succeed when all parameters are correct") } @@ -244,7 +244,7 @@ func AssertValidatesPulls(t *testing.T, validator datatransfer.RequestValidator, if err != nil { t.Fatal("error serializing proposal") } - _, err = validator.ValidatePull(receiver, &rv.StorageDataTransferVoucher{proposalNd.Cid()}, proposal.Proposal.PieceCID, nil) + _, err = validator.ValidatePull(false, receiver, &rv.StorageDataTransferVoucher{proposalNd.Cid()}, proposal.Proposal.PieceCID, nil) if !xerrors.Is(err, rv.ErrNoDeal) { t.Fatal("Pull should fail if there is no deal stored") } @@ -259,7 +259,7 @@ func AssertValidatesPulls(t *testing.T, validator datatransfer.RequestValidator, t.Fatal("deal tracking failed") } payloadCid := clientDeal.DataRef.Root - _, err = validator.ValidatePull(receiver, &rv.StorageDataTransferVoucher{clientDeal.ProposalCid}, payloadCid, nil) + _, err = validator.ValidatePull(false, receiver, &rv.StorageDataTransferVoucher{clientDeal.ProposalCid}, payloadCid, nil) if !xerrors.Is(err, rv.ErrWrongPeer) { t.Fatal("Pull should fail if miner address is incorrect") } @@ -272,7 +272,7 @@ func AssertValidatesPulls(t *testing.T, validator datatransfer.RequestValidator, if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil { t.Fatal("deal tracking failed") } - _, err = validator.ValidatePull(receiver, &rv.StorageDataTransferVoucher{clientDeal.ProposalCid}, blockGenerator.Next().Cid(), nil) + _, err = validator.ValidatePull(false, receiver, &rv.StorageDataTransferVoucher{clientDeal.ProposalCid}, blockGenerator.Next().Cid(), nil) if !xerrors.Is(err, rv.ErrWrongPiece) { t.Fatal("Pull should fail if piece ref is incorrect") } @@ -286,7 +286,7 @@ func AssertValidatesPulls(t *testing.T, validator datatransfer.RequestValidator, t.Fatal("deal tracking failed") } payloadCid := clientDeal.DataRef.Root - _, err = validator.ValidatePull(receiver, &rv.StorageDataTransferVoucher{clientDeal.ProposalCid}, payloadCid, nil) + _, err = validator.ValidatePull(false, receiver, &rv.StorageDataTransferVoucher{clientDeal.ProposalCid}, payloadCid, nil) if !xerrors.Is(err, rv.ErrInacceptableDealState) { t.Fatal("Pull should fail if deal is in a state that cannot be data transferred") } @@ -300,7 +300,7 @@ func AssertValidatesPulls(t *testing.T, validator datatransfer.RequestValidator, t.Fatal("deal tracking failed") } payloadCid := clientDeal.DataRef.Root - _, err = validator.ValidatePull(receiver, &rv.StorageDataTransferVoucher{clientDeal.ProposalCid}, payloadCid, nil) + _, err = validator.ValidatePull(false, receiver, &rv.StorageDataTransferVoucher{clientDeal.ProposalCid}, payloadCid, nil) if err != nil { t.Fatal("Pull should should succeed when all parameters are correct") } diff --git a/storagemarket/impl/requestvalidation/unified_request_validator.go b/storagemarket/impl/requestvalidation/unified_request_validator.go index 065df6cb2..d94e6fc57 100644 --- a/storagemarket/impl/requestvalidation/unified_request_validator.go +++ b/storagemarket/impl/requestvalidation/unified_request_validator.go @@ -50,7 +50,7 @@ func (v *UnifiedRequestValidator) SetPullDeals(pullDeals PullDeals) { // ValidatePush implements the ValidatePush method of a data transfer request validator. // If no pushStore exists, it rejects the request // Otherwise, it calls the ValidatePush function to validate the deal -func (v *UnifiedRequestValidator) ValidatePush(sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { +func (v *UnifiedRequestValidator) ValidatePush(isRestart bool, sender peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { if v.pushDeals == nil { return nil, ErrNoPushAccepted } @@ -61,7 +61,7 @@ func (v *UnifiedRequestValidator) ValidatePush(sender peer.ID, voucher datatrans // ValidatePull implements the ValidatePull method of a data transfer request validator. // If no pullStore exists, it rejects the request // Otherwise, it calls the ValidatePull function to validate the deal -func (v *UnifiedRequestValidator) ValidatePull(receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { +func (v *UnifiedRequestValidator) ValidatePull(isRestart bool, receiver peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.VoucherResult, error) { if v.pullDeals == nil { return nil, ErrNoPullAccepted } diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index 83ce65a18..49a5da05e 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -299,14 +299,12 @@ func TestRestartOnlyProviderDataTransfer(t *testing.T) { // Configure data-transfer to restart after stalling restartConf := dtimpl.ChannelRestartConfig(channelmonitor.Config{ - MonitorPushChannels: true, - AcceptTimeout: 200 * time.Millisecond, - Interval: 100 * time.Millisecond, - MinBytesTransferred: 1, - ChecksPerInterval: 10, - RestartBackoff: 200 * time.Millisecond, + AcceptTimeout: 100 * time.Millisecond, + RestartBackoff: 100 * time.Millisecond, + RestartAckTimeout: 2 * time.Second, + RestartDebounce: 100 * time.Millisecond, MaxConsecutiveRestarts: 5, - CompleteTimeout: 200 * time.Millisecond, + CompleteTimeout: 100 * time.Millisecond, }) smState := testnodes.NewStorageMarketState() depGen := dependencies.NewDepGenerator() @@ -637,12 +635,10 @@ func TestBounceConnectionDataTransfer(t *testing.T) { // Configure data-transfer to automatically restart when connection goes down restartConf := dtimpl.ChannelRestartConfig(channelmonitor.Config{ - MonitorPushChannels: true, AcceptTimeout: 100 * time.Millisecond, - Interval: 100 * time.Millisecond, - MinBytesTransferred: 1, - ChecksPerInterval: 10, - RestartBackoff: 200 * time.Millisecond, + RestartBackoff: 100 * time.Millisecond, + RestartAckTimeout: 2 * time.Second, + RestartDebounce: 100 * time.Millisecond, MaxConsecutiveRestarts: 5, CompleteTimeout: 100 * time.Millisecond, })