Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use bitswap fetch queue #216

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"promisify-es6": "^1.0.3",
"rimraf": "^3.0.0",
"safe-buffer": "^5.1.2",
"sinon": "^9.0.0",
"stats-lite": "^2.2.0",
"uuid": "^3.3.2"
},
Expand Down
178 changes: 123 additions & 55 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const DecisionEngine = require('./decision-engine')
const Notifications = require('./notifications')
const logger = require('./utils').logger
const Stats = require('./stats')
const CID = require('cids')

const defaultOptions = {
statsEnabled: false,
Expand Down Expand Up @@ -58,6 +59,8 @@ class Bitswap {
this.wm = new WantManager(this.peerInfo.id, this.network, this._stats)

this.notifications = new Notifications(this.peerInfo.id)

this.fetcher = new Fetcher(this)
}

get peerInfo () {
Expand Down Expand Up @@ -101,6 +104,11 @@ class Bitswap {
this._updateReceiveCounters(peerId.toB58String(), block, has)

if (has || !wasWanted) {
// When fetching a block, we register with the notifier and then check
// the blockstore, to catch syncing issues between the blockstore and
// wantlist. So inform the notifier that we got the block even though
// it may not have been in the wantlist.
this.notifications.hasBlock(block)
return
}

Expand Down Expand Up @@ -185,49 +193,11 @@ class Bitswap {
* Fetch a a list of blocks by cid. If the blocks are in the local
* blockstore they are returned, otherwise the blocks are added to the wantlist and returned once another node sends them to us.
*
* @param {Iterable<CID>} cids
* @param {AsyncIterable<CID>} cids
* @returns {Promise<AsyncIterator<Block>>}
*/
async * getMany (cids) {
let pendingStart = cids.length
const wantList = []
let promptedNetwork = false

const fetchFromNetwork = async (cid) => {
wantList.push(cid)

const blockP = this.notifications.wantBlock(cid)

if (!pendingStart) {
this.wm.wantBlocks(wantList)
}

const block = await blockP
this.wm.cancelWants([cid])

return block
}

for (const cid of cids) {
const has = await this.blockstore.has(cid)
pendingStart--
if (has) {
if (!pendingStart) {
this.wm.wantBlocks(wantList)
}
yield this.blockstore.get(cid)

continue
}

if (!promptedNetwork) {
promptedNetwork = true
this.network.findAndConnect(cids[0]).catch((err) => this._log.error(err))
}

// we don't have the block locally so fetch it from the network
yield fetchFromNetwork(cid)
}
async * getMany (cids) { // eslint-disable-line require-await
yield * this.fetcher.fetchBlocks(cids)
}

/**
Expand Down Expand Up @@ -279,22 +249,29 @@ class Bitswap {
async putMany (blocks) { // eslint-disable-line require-await
const self = this

return this.blockstore.putMany(async function * () {
for await (const block of blocks) {
if (await self.blockstore.has(block.cid)) {
continue
}
for await (const block of blocks) {
if (await self.blockstore.has(block.cid)) {
continue
}

yield block
await this.blockstore.put(block)

self.notifications.hasBlock(block)
self.engine.receivedBlocks([block.cid])
// Note: Don't wait for provide to finish before returning
self.network.provide(block.cid).catch((err) => {
self._log.error('Failed to provide: %s', err.message)
})
}
}())
self._sendHaveBlockNotifications(block)
}
}

/**
* Sends notifications about the arrival of a block
*
* @param {Block} block
*/
_sendHaveBlockNotifications (block) {
this.notifications.hasBlock(block)
this.engine.receivedBlocks([block.cid])
// Note: Don't wait for provide to finish before returning
this.network.provide(block.cid).catch((err) => {
this._log.error('Failed to provide: %s', err.message)
})
}

/**
Expand Down Expand Up @@ -348,4 +325,95 @@ class Bitswap {
}
}

/**
* Fetcher fetches blocks from the blockstore or the network, allowing
* multiple concurrent fetches for the same cid.
* @param {Bitswap} bitswap
*/
class Fetcher {
constructor (bitswap) {
this.bitswap = bitswap
this.live = new Map()
}

/**
* Fetch the list of cids.
*
* @param {Array<CID>} cids
* @returns {Iterator<Promise<Block>>}
*/
async * fetchBlocks (cids) { // eslint-disable-line require-await
const req = { rootCid: cids[0] }

// Queue up the requests for each CID
for await (const cid of cids) {
yield this.enqueueFetch(cid, req)
}
}

/**
* Add a cid to the fetch queue.
*
* @param {CID} cid
* @param {Object} req - used to keep state across a request for several blocks
* @returns {Promise<Block>}
*/
enqueueFetch (cid, req) {
if (!CID.isCID(cid)) {
throw new Error('Not a valid cid')
}

// Check if there is already a live fetch for the block
const cidstr = cid.toString()
let blockFetch = this.live.get(cidstr)
if (blockFetch) {
return blockFetch
}

// If not, add one
blockFetch = this.get(cid, req)
this.live.set(cidstr, blockFetch)

// Clean up the promise once the fetch has completed
blockFetch.finally(() => this.live.delete(cidstr))

return blockFetch
}

/**
* Get a block from the blockstore or the network.
*
* @param {CID} cid
* @param {Object} req - used to keep state across a request for several blocks
* @returns {Promise<Block>}
*/
async get (cid, req) {
// Register with the notifier, in case the block arrives while we're
// checking the blockstore for it
const blockNotification = this.bitswap.notifications.wantBlock(cid)

// If the block is in the local blockstore, return it
const has = await this.bitswap.blockstore.has(cid)
if (has) {
// Deregister with the notifier
this.bitswap.notifications.unwantBlock(cid)
// Get the block from the blockstore
return this.bitswap.blockstore.get(cid)
}

// Otherwise query content routing for the block
if (!req.promptedNetwork) {
this.bitswap.network.findAndConnect(req.rootCid).catch((err) => this.bitswap._log.error(err))
req.promptedNetwork = true
}

// Add the block CID to the wantlist
this.bitswap.wm.wantBlocks([cid])
// Remove it from the wantlist when the block arrives
blockNotification.then(() => this.bitswap.wm.cancelWants([cid]))

return blockNotification
}
}

module.exports = Bitswap
18 changes: 17 additions & 1 deletion test/bitswap-mock-internals.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ describe('bitswap with mocks', function () {
expect(retrievedBlocks).to.be.eql([b1, b2, b3])
})

it('getMany', async () => {
it('multiple get', async () => {
const b1 = blocks[5]
const b2 = blocks[6]
const b3 = blocks[7]
Expand All @@ -208,6 +208,22 @@ describe('bitswap with mocks', function () {
expect(block3).to.eql(b3)
})

it('getMany with iterator', async () => {
const blocks = await makeBlock(3)

await repo.blocks.putMany(blocks)
const bs = new Bitswap(mockLibp2pNode(), repo.blocks)

function * it () {
for (const b of blocks) {
yield b.cid
}
}
const fetched = await all(bs.getMany(it()))

expect(fetched).to.eql(blocks)
})

it('block is added locally afterwards', async () => {
const finish = orderedFinish(2)
const block = blocks[9]
Expand Down
64 changes: 64 additions & 0 deletions test/bitswap.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect
const delay = require('delay')
const PeerId = require('peer-id')
const sinon = require('sinon')

const Bitswap = require('../src')

const createTempRepo = require('./utils/create-temp-repo-nodejs')
const createLibp2pNode = require('./utils/create-libp2p-node')
const makeBlock = require('./utils/make-block')
const orderedFinish = require('./utils/helpers').orderedFinish
const Message = require('../src/types/message')

// Creates a repo + libp2pNode + Bitswap with or without DHT
async function createThing (dht) {
Expand Down Expand Up @@ -70,6 +73,67 @@ describe('bitswap without DHT', function () {

finish.assert()
})

it('wants a block, receives a block, wants it again before the blockstore has it, receives it after the blockstore has it', async () => {
// the block we want
const block = await makeBlock()

// id of a peer with the block we want
const peerId = await PeerId.create({ bits: 512 })

// incoming message with requested block from the other peer
const message = new Message(false)
message.addEntry(block.cid, 1, false)
message.addBlock(block)

// Control when the put completes
const realBlockstore = nodes[0].bitswap.blockstore
let putResolver
const allowPutToProceed = () => {
putResolver(realBlockstore.put(block))
}
// Create a promise that resolves when the put starts
let onPutCalled
const blockstorePutCalled = new Promise((resolve) => {
onPutCalled = resolve
})
const unresolvedPut = new Promise((resolve) => {
onPutCalled()
putResolver = resolve
})
nodes[0].bitswap.blockstore = {
...nodes[0].bitswap.blockstore,
put: sinon.stub().withArgs(block).returns(unresolvedPut)
}

// add the block to our want list
const wantBlockPromise1 = nodes[0].bitswap.get(block.cid)

// oh look, a peer has sent it to us - this will trigger a `blockstore.put` which
// is an async operation
nodes[0].bitswap._receiveMessage(peerId, message)

// Wait for the call to blockstore.put()
// (but don't allow it to proceed yet)
await blockstorePutCalled

// another context wants the same block
const wantBlockPromise2 = nodes[0].bitswap.get(block.cid)

// Restore the real blockstore
nodes[0].bitswap.blockstore = realBlockstore

// Allow the first put to proceed
allowPutToProceed()

// receive the block again
await nodes[0].bitswap._receiveMessage(peerId, message)

// both requests should get the block
const res = await Promise.all([wantBlockPromise1, wantBlockPromise2])
expect(res[0]).to.deep.equal(block)
expect(res[1]).to.deep.equal(block)
})
})

describe('bitswap with DHT', function () {
Expand Down