Skip to content

Commit

Permalink
Merge pull request #4280 from filecoin-project/fix/market-deal-race
Browse files Browse the repository at this point in the history
fix a race when retrieving pieces
  • Loading branch information
magik6k authored Oct 10, 2020
2 parents 09bff14 + 283fd05 commit 83ba7ec
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions node/impl/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,13 +462,19 @@ type retrievalSubscribeEvent struct {
state rm.ClientDealState
}

func readSubscribeEvents(ctx context.Context, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
func readSubscribeEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
for {
var subscribeEvent retrievalSubscribeEvent
select {
case <-ctx.Done():
return xerrors.New("Retrieval Timed Out")
case subscribeEvent = <-subscribeEvents:
if subscribeEvent.state.ID != dealID {
// we can't check the deal ID ahead of time because:
// 1. We need to subscribe before retrieving.
// 2. We won't know the deal ID until after retrieving.
continue
}
}

select {
Expand Down Expand Up @@ -531,19 +537,6 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return err
}*/

var dealID retrievalmarket.DealID
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
subscribeCtx, cancel := context.WithCancel(ctx)
defer cancel()
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
if state.PayloadCID.Equals(order.Root) && state.ID == dealID {
select {
case <-subscribeCtx.Done():
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
}
}
})

ppb := types.BigDiv(order.Total, types.NewInt(order.Size))

params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice)
Expand All @@ -562,7 +555,21 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
_ = a.RetrievalStoreMgr.ReleaseStore(store)
}()

dealID, err = a.Retrieval.Retrieve(
// Subscribe to events before retrieving to avoid losing events.
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
subscribeCtx, cancel := context.WithCancel(ctx)
defer cancel()
unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) {
// We'll check the deal IDs inside readSubscribeEvents.
if state.PayloadCID.Equals(order.Root) {
select {
case <-subscribeCtx.Done():
case subscribeEvents <- retrievalSubscribeEvent{event, state}:
}
}
})

dealID, err := a.Retrieval.Retrieve(
ctx,
order.Root,
params,
Expand All @@ -573,11 +580,12 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
store.StoreID())

if err != nil {
unsubscribe()
finish(xerrors.Errorf("Retrieve failed: %w", err))
return
}

err = readSubscribeEvents(ctx, subscribeEvents, events)
err = readSubscribeEvents(ctx, dealID, subscribeEvents, events)

unsubscribe()
if err != nil {
Expand Down

0 comments on commit 83ba7ec

Please sign in to comment.