diff --git a/itests/deals_concurrent_test.go b/itests/deals_concurrent_test.go index 33a8218dd5c..44b25c7b36e 100644 --- a/itests/deals_concurrent_test.go +++ b/itests/deals_concurrent_test.go @@ -1,12 +1,22 @@ package itests import ( + "context" "fmt" + "sync" "testing" "time" + "github.com/stretchr/testify/require" + + datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/modules" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" ) func TestDealCyclesConcurrent(t *testing.T) { @@ -47,3 +57,71 @@ func TestDealCyclesConcurrent(t *testing.T) { t.Run(ns+"-stdretrieval-NoCAR", func(t *testing.T) { runTest(t, n, false, false) }) } } + +func TestSimultenousTransferLimit(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + kit.QuietMiningLogs() + + blockTime := 10 * time.Millisecond + + // For these tests where the block time is artificially short, just use + // a deal start epoch that is guaranteed to be far enough in the future + // so that the deal starts sealing in time + startEpoch := abi.ChainEpoch(2 << 12) + + runTest := func(t *testing.T) { + client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts( + node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(2))), + )) + ens.InterconnectAll().BeginMining(blockTime) + dh := kit.NewDealHarness(t, client, miner) + + ctx, cancel := context.WithCancel(context.Background()) + + du, err := miner.MarketDataTransferUpdates(ctx) + require.NoError(t, err) + + var maxOngoing int + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + ongoing := map[datatransfer.TransferID]struct{}{} + + for { + select { + case u := <-du: + t.Logf("%d - %s", u.TransferID, datatransfer.Statuses[u.Status]) + if u.Status == datatransfer.Ongoing { + ongoing[u.TransferID] = struct{}{} + } else { + delete(ongoing, u.TransferID) + } + + if len(ongoing) > maxOngoing { + maxOngoing = len(ongoing) + } + case <-ctx.Done(): + return + } + } + }() + + dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{ + N: 1, // TODO: set to 20 after https://github.com/ipfs/go-graphsync/issues/175 is fixed + FastRetrieval: true, + StartEpoch: startEpoch, + }) + + cancel() + wg.Wait() + + require.LessOrEqual(t, maxOngoing, 2) + } + + runTest(t) +} diff --git a/node/builder.go b/node/builder.go index f5294b8a359..884261a89b9 100644 --- a/node/builder.go +++ b/node/builder.go @@ -238,7 +238,7 @@ var LibP2P = Options( Override(ConnGaterKey, lp2p.ConnGaterOption), ) -func isType(t repo.RepoType) func(s *Settings) bool { +func IsType(t repo.RepoType) func(s *Settings) bool { return func(s *Settings) bool { return s.nodeType == t } } @@ -468,7 +468,7 @@ func Online() Option { LibP2P, ApplyIf(isFullOrLiteNode, ChainNode), - ApplyIf(isType(repo.StorageMiner), MinerNode), + ApplyIf(IsType(repo.StorageMiner), MinerNode), ) } @@ -680,8 +680,8 @@ func Repo(r repo.Repo) Option { Override(new(*dtypes.APIAlg), modules.APISecret), - ApplyIf(isType(repo.FullNode), ConfigFullNode(c)), - ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c)), + ApplyIf(IsType(repo.FullNode), ConfigFullNode(c)), + ApplyIf(IsType(repo.StorageMiner), ConfigStorageMiner(c)), )(settings) } }