diff --git a/package.json b/package.json index 10c587a7..0c86bc8f 100644 --- a/package.json +++ b/package.json @@ -69,6 +69,7 @@ "promisify-es6": "^1.0.3", "rimraf": "^3.0.0", "safe-buffer": "^5.1.2", + "sinon": "^9.0.0", "stats-lite": "^2.2.0", "uuid": "^3.3.2" }, diff --git a/src/index.js b/src/index.js index dca53913..f5e71073 100644 --- a/src/index.js +++ b/src/index.js @@ -6,6 +6,7 @@ const DecisionEngine = require('./decision-engine') const Notifications = require('./notifications') const logger = require('./utils').logger const Stats = require('./stats') +const CID = require('cids') const defaultOptions = { statsEnabled: false, @@ -58,6 +59,8 @@ class Bitswap { this.wm = new WantManager(this.peerInfo.id, this.network, this._stats) this.notifications = new Notifications(this.peerInfo.id) + + this.fetcher = new Fetcher(this) } get peerInfo () { @@ -101,6 +104,11 @@ class Bitswap { this._updateReceiveCounters(peerId.toB58String(), block, has) if (has || !wasWanted) { + // When fetching a block, we register with the notifier and then check + // the blockstore, to catch syncing issues between the blockstore and + // wantlist. So inform the notifier that we got the block even though + // it may not have been in the wantlist. + this.notifications.hasBlock(block) return } @@ -185,49 +193,11 @@ class Bitswap { * Fetch a a list of blocks by cid. If the blocks are in the local * blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us. * - * @param {Iterable} cids + * @param {AsyncIterable} cids * @returns {Promise>} */ - async * getMany (cids) { - let pendingStart = cids.length - const wantList = [] - let promptedNetwork = false - - const fetchFromNetwork = async (cid) => { - wantList.push(cid) - - const blockP = this.notifications.wantBlock(cid) - - if (!pendingStart) { - this.wm.wantBlocks(wantList) - } - - const block = await blockP - this.wm.cancelWants([cid]) - - return block - } - - for (const cid of cids) { - const has = await this.blockstore.has(cid) - pendingStart-- - if (has) { - if (!pendingStart) { - this.wm.wantBlocks(wantList) - } - yield this.blockstore.get(cid) - - continue - } - - if (!promptedNetwork) { - promptedNetwork = true - this.network.findAndConnect(cids[0]).catch((err) => this._log.error(err)) - } - - // we don't have the block locally so fetch it from the network - yield fetchFromNetwork(cid) - } + async * getMany (cids) { // eslint-disable-line require-await + yield * this.fetcher.fetchBlocks(cids) } /** @@ -279,22 +249,29 @@ class Bitswap { async putMany (blocks) { // eslint-disable-line require-await const self = this - return this.blockstore.putMany(async function * () { - for await (const block of blocks) { - if (await self.blockstore.has(block.cid)) { - continue - } + for await (const block of blocks) { + if (await self.blockstore.has(block.cid)) { + continue + } - yield block + await this.blockstore.put(block) - self.notifications.hasBlock(block) - self.engine.receivedBlocks([block.cid]) - // Note: Don't wait for provide to finish before returning - self.network.provide(block.cid).catch((err) => { - self._log.error('Failed to provide: %s', err.message) - }) - } - }()) + self._sendHaveBlockNotifications(block) + } + } + + /** + * Sends notifications about the arrival of a block + * + * @param {Block} block + */ + _sendHaveBlockNotifications (block) { + this.notifications.hasBlock(block) + this.engine.receivedBlocks([block.cid]) + // Note: Don't wait for provide to finish before returning + this.network.provide(block.cid).catch((err) => { + this._log.error('Failed to provide: %s', err.message) + }) } /** @@ -348,4 +325,95 @@ class Bitswap { } } +/** + * Fetcher fetches blocks from the blockstore or the network, allowing + * multiple concurrent fetches for the same cid. + * @param {Bitswap} bitswap + */ +class Fetcher { + constructor (bitswap) { + this.bitswap = bitswap + this.live = new Map() + } + + /** + * Fetch the list of cids. + * + * @param {Array} cids + * @returns {Iterator>} + */ + async * fetchBlocks (cids) { // eslint-disable-line require-await + const req = { rootCid: cids[0] } + + // Queue up the requests for each CID + for await (const cid of cids) { + yield this.enqueueFetch(cid, req) + } + } + + /** + * Add a cid to the fetch queue. + * + * @param {CID} cid + * @param {Object} req - used to keep state across a request for several blocks + * @returns {Promise} + */ + enqueueFetch (cid, req) { + if (!CID.isCID(cid)) { + throw new Error('Not a valid cid') + } + + // Check if there is already a live fetch for the block + const cidstr = cid.toString() + let blockFetch = this.live.get(cidstr) + if (blockFetch) { + return blockFetch + } + + // If not, add one + blockFetch = this.get(cid, req) + this.live.set(cidstr, blockFetch) + + // Clean up the promise once the fetch has completed + blockFetch.finally(() => this.live.delete(cidstr)) + + return blockFetch + } + + /** + * Get a block from the blockstore or the network. + * + * @param {CID} cid + * @param {Object} req - used to keep state across a request for several blocks + * @returns {Promise} + */ + async get (cid, req) { + // Register with the notifier, in case the block arrives while we're + // checking the blockstore for it + const blockNotification = this.bitswap.notifications.wantBlock(cid) + + // If the block is in the local blockstore, return it + const has = await this.bitswap.blockstore.has(cid) + if (has) { + // Deregister with the notifier + this.bitswap.notifications.unwantBlock(cid) + // Get the block from the blockstore + return this.bitswap.blockstore.get(cid) + } + + // Otherwise query content routing for the block + if (!req.promptedNetwork) { + this.bitswap.network.findAndConnect(req.rootCid).catch((err) => this.bitswap._log.error(err)) + req.promptedNetwork = true + } + + // Add the block CID to the wantlist + this.bitswap.wm.wantBlocks([cid]) + // Remove it from the wantlist when the block arrives + blockNotification.then(() => this.bitswap.wm.cancelWants([cid])) + + return blockNotification + } +} + module.exports = Bitswap diff --git a/test/bitswap-mock-internals.js b/test/bitswap-mock-internals.js index f4e56e00..0f52364c 100644 --- a/test/bitswap-mock-internals.js +++ b/test/bitswap-mock-internals.js @@ -190,7 +190,7 @@ describe('bitswap with mocks', function () { expect(retrievedBlocks).to.be.eql([b1, b2, b3]) }) - it('getMany', async () => { + it('multiple get', async () => { const b1 = blocks[5] const b2 = blocks[6] const b3 = blocks[7] @@ -208,6 +208,22 @@ describe('bitswap with mocks', function () { expect(block3).to.eql(b3) }) + it('getMany with iterator', async () => { + const blocks = await makeBlock(3) + + await repo.blocks.putMany(blocks) + const bs = new Bitswap(mockLibp2pNode(), repo.blocks) + + function * it () { + for (const b of blocks) { + yield b.cid + } + } + const fetched = await all(bs.getMany(it())) + + expect(fetched).to.eql(blocks) + }) + it('block is added locally afterwards', async () => { const finish = orderedFinish(2) const block = blocks[9] diff --git a/test/bitswap.js b/test/bitswap.js index de4150db..8acbfa75 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -5,6 +5,8 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const delay = require('delay') +const PeerId = require('peer-id') +const sinon = require('sinon') const Bitswap = require('../src') @@ -12,6 +14,7 @@ const createTempRepo = require('./utils/create-temp-repo-nodejs') const createLibp2pNode = require('./utils/create-libp2p-node') const makeBlock = require('./utils/make-block') const orderedFinish = require('./utils/helpers').orderedFinish +const Message = require('../src/types/message') // Creates a repo + libp2pNode + Bitswap with or without DHT async function createThing (dht) { @@ -70,6 +73,67 @@ describe('bitswap without DHT', function () { finish.assert() }) + + it('wants a block, receives a block, wants it again before the blockstore has it, receives it after the blockstore has it', async () => { + // the block we want + const block = await makeBlock() + + // id of a peer with the block we want + const peerId = await PeerId.create({ bits: 512 }) + + // incoming message with requested block from the other peer + const message = new Message(false) + message.addEntry(block.cid, 1, false) + message.addBlock(block) + + // Control when the put completes + const realBlockstore = nodes[0].bitswap.blockstore + let putResolver + const allowPutToProceed = () => { + putResolver(realBlockstore.put(block)) + } + // Create a promise that resolves when the put starts + let onPutCalled + const blockstorePutCalled = new Promise((resolve) => { + onPutCalled = resolve + }) + const unresolvedPut = new Promise((resolve) => { + onPutCalled() + putResolver = resolve + }) + nodes[0].bitswap.blockstore = { + ...nodes[0].bitswap.blockstore, + put: sinon.stub().withArgs(block).returns(unresolvedPut) + } + + // add the block to our want list + const wantBlockPromise1 = nodes[0].bitswap.get(block.cid) + + // oh look, a peer has sent it to us - this will trigger a `blockstore.put` which + // is an async operation + nodes[0].bitswap._receiveMessage(peerId, message) + + // Wait for the call to blockstore.put() + // (but don't allow it to proceed yet) + await blockstorePutCalled + + // another context wants the same block + const wantBlockPromise2 = nodes[0].bitswap.get(block.cid) + + // Restore the real blockstore + nodes[0].bitswap.blockstore = realBlockstore + + // Allow the first put to proceed + allowPutToProceed() + + // receive the block again + await nodes[0].bitswap._receiveMessage(peerId, message) + + // both requests should get the block + const res = await Promise.all([wantBlockPromise1, wantBlockPromise2]) + expect(res[0]).to.deep.equal(block) + expect(res[1]).to.deep.equal(block) + }) }) describe('bitswap with DHT', function () {