Skip to content

Commit

Permalink
fix: race condition when requesting the same block twice
Browse files Browse the repository at this point in the history
When we call `blockstore.putMany`, some implementations will batch
up all the `put`s and write them at once.  This means that
`blockstore.has` might not return `true` for a little while - if
another request for a given block comes in before `blockstore.has`
returns `true` it'll get added to the want list.  If the block then
finishes it's batch and finally a remote peer supplies the wanted
block, the notifications that complete the second block request
will never get sent and the process will hang idefinately.

The change made here is to separate the sending of notifications
out from putting things into the blockstore.  If the blockstore has
a block, but the block is still in the wantlist, send notifications
that we now have the block.
  • Loading branch information
achingbrain committed May 4, 2020
1 parent 7e7f36c commit 57185ea
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 22 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
"peer-info": "^0.17.0",
"promisify-es6": "^1.0.3",
"rimraf": "^3.0.0",
"sinon": "^9.0.0",
"stats-lite": "^2.2.0",
"uuid": "^3.3.2"
},
Expand Down
47 changes: 25 additions & 22 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ class Bitswap {
this._updateReceiveCounters(peerId.toB58String(), block, has)

if (has || !wasWanted) {
if (wasWanted) {
this._sendHaveBlockNotifications(block)
}

return
}

Expand Down Expand Up @@ -282,32 +286,31 @@ class Bitswap {
async putMany (blocks) { // eslint-disable-line require-await
const self = this

// Add any new blocks to the blockstore
const newBlocks = []
await this.blockstore.putMany(async function * () {
for await (const block of blocks) {
if (await self.blockstore.has(block.cid)) {
continue
}

yield block
newBlocks.push(block)
for await (const block of blocks) {
if (await self.blockstore.has(block.cid)) {
continue
}
}())

// Notify engine that we have new blocks
this.engine.receivedBlocks(newBlocks)

// Notify listeners that we have received the new blocks
for (const block of newBlocks) {
this.notifications.hasBlock(block)
// Note: Don't wait for provide to finish before returning
this.network.provide(block.cid).catch((err) => {
self._log.error('Failed to provide: %s', err.message)
})

await this.blockstore.put(block)

self._sendHaveBlockNotifications(block)
}
}

/**
* Sends notifications about the arrival of a block
*
* @param {Block} block
*/
_sendHaveBlockNotifications (block) {
this.notifications.hasBlock(block)
this.engine.receivedBlocks([block])
// Note: Don't wait for provide to finish before returning
this.network.provide(block.cid).catch((err) => {
this._log.error('Failed to provide: %s', err.message)
})
}

/**
* Get the current list of wants.
*
Expand Down
49 changes: 49 additions & 0 deletions test/bitswap.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect
const delay = require('delay')
const PeerId = require('peer-id')
const sinon = require('sinon')

const Bitswap = require('../src')

const createTempRepo = require('./utils/create-temp-repo-nodejs')
const createLibp2pNode = require('./utils/create-libp2p-node')
const makeBlock = require('./utils/make-block')
const orderedFinish = require('./utils/helpers').orderedFinish
const Message = require('../src/types/message')

// Creates a repo + libp2pNode + Bitswap with or without DHT
async function createThing (dht) {
Expand Down Expand Up @@ -70,6 +73,52 @@ describe('bitswap without DHT', function () {

finish.assert()
})

it('wants a block, receives a block, wants it again before the blockstore has it, receives it after the blockstore has it', async () => {
// the block we want
const block = await makeBlock()

// id of a peer with the block we want
const peerId = await PeerId.create({ bits: 512 })

// incoming message with requested block from the other peer
const message = new Message(false)
message.addEntry(block.cid, 1, false)
message.addBlock(block)

// slow blockstore
nodes[0].bitswap.blockstore = {
has: sinon.stub().withArgs(block.cid).returns(false),
put: sinon.stub()
}

// add the block to our want list
const wantBlockPromise1 = nodes[0].bitswap.get(block.cid)

// oh look, a peer has sent it to us - this will trigger a `blockstore.put` which
// is an async operation so `self.blockstore.has(cid)` will still return false
// until the write has completed
await nodes[0].bitswap._receiveMessage(peerId, message)

// block store did not have it
expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true()

// another context wants the same block
const wantBlockPromise2 = nodes[0].bitswap.get(block.cid)

// meanwhile the blockstore finishes it's batch
nodes[0].bitswap.blockstore.has = sinon.stub().withArgs(block.cid).returns(true)

// here it comes again
await nodes[0].bitswap._receiveMessage(peerId, message)

// block store had it this time
expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true()

// both requests should get the block
expect(await wantBlockPromise1).to.deep.equal(block)
expect(await wantBlockPromise2).to.deep.equal(block)
})
})

describe('bitswap with DHT', function () {
Expand Down

0 comments on commit 57185ea

Please sign in to comment.