Skip to content

Commit

Permalink
fix: balanced pool (#1114)
Browse files Browse the repository at this point in the history
* fix: balanced pool

* fixup

* fixup
  • Loading branch information
ronag committed Nov 27, 2021
1 parent caf9ec3 commit 7e971bc
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 284 deletions.
120 changes: 44 additions & 76 deletions lib/balanced-pool.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
'use strict'

const { BalancedPoolMissingUpstreamError } = require('./core/errors')
const Dispatcher = require('./dispatcher')
const {
BalancedPoolMissingUpstreamError,
ClientClosedError,
InvalidArgumentError,
ClientDestroyedError
} = require('./core/errors')
const {
PoolBase,
kClients,
kNeedDrain,
kOnDrain,
kOnConnect,
kOnDisconnect,
kDispatch,
kOnConnectionError
} = require('./pool-base')
const Pool = require('./pool')

const kPools = Symbol('kPools')
const kPoolOpts = Symbol('kPoolOpts')
const kUpstream = Symbol('kUpstream')
const kNeedDrain = Symbol('kNeedDrain')
const kOptions = Symbol('options')
const kUpstream = Symbol('upstream')

class BalancedPool extends Dispatcher {
class BalancedPool extends PoolBase {
constructor (upstreams = [], opts = {}) {
super()

this[kPools] = []
this[kPoolOpts] = opts
this[kNeedDrain] = false
this[kOptions] = opts

if (!Array.isArray(upstreams)) {
upstreams = [upstreams]
Expand All @@ -27,94 +37,52 @@ class BalancedPool extends Dispatcher {
}

addUpstream (upstream) {
if (this[kPools].find((pool) => pool[kUpstream] === upstream)) {
if (this[kClients].find((pool) => pool[kUpstream] === upstream)) {
return this
}

const pool = new Pool(upstream, Object.assign({}, this[kPoolOpts]))
const pool = new Pool(upstream, Object.assign({}, this[kOptions]))
.on('drain', this[kOnDrain])
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
.on('connectionError', this[kOnConnectionError])

pool[kUpstream] = upstream

pool.on('connect', (...args) => {
this.emit('connect', ...args)
})

pool.on('disconnect', (...args) => {
this.emit('disconnect', ...args)
})

pool.on('drain', (...args) => {
pool[kNeedDrain] = false

if (this[kNeedDrain]) {
this[kNeedDrain] = false
this.emit('drain', ...args)
}
})

this[kPools].push(pool)
this[kClients].push(pool)
return this
}

dispatch (opts, handler) {
// We validate that pools is greater than 0,
// otherwise we would have to wait until an upstream
// is added, which might never happen.
if (this[kPools].length === 0) {
throw new BalancedPoolMissingUpstreamError()
}

const pool = this[kPools].find(pool => !pool[kNeedDrain]) || this[kPools][0]

if (!pool.dispatch(opts, handler)) {
pool[kNeedDrain] = true
this[kNeedDrain] = true
}

this[kPools].splice(this[kPools].indexOf(pool), 1)
this[kPools].push(pool)

return !this[kNeedDrain]
}

removeUpstream (upstream) {
const pool = this[kPools].find((pool) => pool[kUpstream] === upstream)
const idx = this[kPools].indexOf(pool)
this[kPools].splice(idx, 1)
const pool = this[kClients].find((pool) => pool[kUpstream] === upstream)
const idx = this[kClients].indexOf(pool)
this[kClients].splice(idx, 1)
pool.close()
return this
}

get upstreams () {
return this[kPools].map((p) => p[kUpstream])
}

get destroyed () {
return this[kPools].reduce((acc, pool) => acc && pool.destroyed, true)
return this[kClients].map((p) => p[kUpstream])
}

get closed () {
return this[kPools].reduce((acc, pool) => acc && pool.closed, true)
}

close (cb) {
const p = Promise.all(this[kPools].map((p) => p.close()))

if (!cb) {
return p
[kDispatch] () {
// We validate that pools is greater than 0,
// otherwise we would have to wait until an upstream
// is added, which might never happen.
if (this[kClients].length === 0) {
throw new BalancedPoolMissingUpstreamError()
}

p.then(() => process.nextTick(cb), (err) => process.nextTick(cb, err))
}

destroy (err, cb) {
const p = Promise.all(this[kPools].map((p) => p.destroy(err)))
let dispatcher = this[kClients].find(dispatcher => !dispatcher[kNeedDrain])

if (!cb) {
return p
if (!dispatcher) {
return
}

p.then(() => process.nextTick(cb))
this[kClients].splice(this[kClients].indexOf(dispatcher), 1)
this[kClients].push(dispatcher)

return dispatcher
}
}

Expand Down
223 changes: 223 additions & 0 deletions lib/pool-base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
'use strict'

const Dispatcher = require('./dispatcher')
const {
ClientDestroyedError,
ClientClosedError,
InvalidArgumentError
} = require('./core/errors')
const FixedQueue = require('./node/fixed-queue')
const { kSize, kRunning, kPending, kBusy } = require('./core/symbols')

const kClients = Symbol('clients')
const kNeedDrain = Symbol('needDrain')
const kQueue = Symbol('queue')
const kDestroyed = Symbol('destroyed')
const kClosedPromise = Symbol('closed promise')
const kClosedResolve = Symbol('closed resolve')
const kOnDrain = Symbol('onDrain')
const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kOnConnectionError = Symbol('onConnectionError')
const kQueued = Symbol('queued')
const kDispatch = Symbol('dispatch')

class PoolBase extends Dispatcher {
constructor () {
super()

this[kQueue] = new FixedQueue()
this[kClosedPromise] = null
this[kClosedResolve] = null
this[kDestroyed] = false
this[kClients] = []
this[kNeedDrain] = false
this[kQueued] = 0

const pool = this

this[kOnDrain] = function onDrain (origin, targets) {
const queue = pool[kQueue]

let needDrain = false

while (!needDrain) {
const item = queue.shift()
if (!item) {
break
}
pool[kQueued]--
needDrain = !this.dispatch(item.opts, item.handler)
}

this[kNeedDrain] = needDrain

if (!this[kNeedDrain] && pool[kNeedDrain]) {
pool[kNeedDrain] = false
pool.emit('drain', origin, [pool, ...targets])
}

if (pool[kClosedResolve] && queue.isEmpty()) {
Promise
.all(pool[kClients].map(c => c.close()))
.then(pool[kClosedResolve])
}
}

this[kOnConnect] = (origin, targets) => {
pool.emit('connect', origin, [pool, ...targets])
}

this[kOnDisconnect] = (origin, targets, err) => {
pool.emit('disconnect', origin, [pool, ...targets], err)
}

this[kOnConnectionError] = (origin, targets, err) => {
pool.emit('connectionError', origin, [pool, ...targets], err)
}
}

get [kBusy] () {
return this[kNeedDrain]
}

get [kPending] () {
let ret = this[kQueued]
for (const { [kPending]: pending } of this[kClients]) {
ret += pending
}
return ret
}

get [kRunning] () {
let ret = 0
for (const { [kRunning]: running } of this[kClients]) {
ret += running
}
return ret
}

get [kSize] () {
let ret = this[kQueued]
for (const { [kSize]: size } of this[kClients]) {
ret += size
}
return ret
}

get destroyed () {
return this[kDestroyed]
}

get closed () {
return this[kClosedPromise] != null
}

close (cb) {
try {
if (this[kDestroyed]) {
throw new ClientDestroyedError()
}

if (!this[kClosedPromise]) {
if (this[kQueue].isEmpty()) {
this[kClosedPromise] = Promise.all(this[kClients].map(c => c.close()))
} else {
this[kClosedPromise] = new Promise((resolve) => {
this[kClosedResolve] = resolve
})
}
this[kClosedPromise] = this[kClosedPromise].then(() => {
this[kDestroyed] = true
})
}

if (cb) {
this[kClosedPromise].then(() => cb(null, null))
} else {
return this[kClosedPromise]
}
} catch (err) {
if (cb) {
cb(err)
} else {
return Promise.reject(err)
}
}
}

destroy (err, cb) {
this[kDestroyed] = true

if (typeof err === 'function') {
cb = err
err = null
}

if (!err) {
err = new ClientDestroyedError()
}

while (true) {
const item = this[kQueue].shift()
if (!item) {
break
}
item.handler.onError(err)
}

const promise = Promise.all(this[kClients].map(c => c.destroy(err)))
if (cb) {
promise.then(() => cb(null, null))
} else {
return promise
}
}

dispatch (opts, handler) {
if (!handler || typeof handler !== 'object') {
throw new InvalidArgumentError('handler must be an object')
}

try {
if (this[kDestroyed]) {
throw new ClientDestroyedError()
}

if (this[kClosedPromise]) {
throw new ClientClosedError()
}

const dispatcher = this[kDispatch]()

if (!dispatcher) {
this[kNeedDrain] = true
this[kQueue].push({ opts, handler })
this[kQueued]++
} else if (!dispatcher.dispatch(opts, handler)) {
dispatcher[kNeedDrain] = true
// TODO: return this[kClients].some(dispatcher => !dispatcher[kNeedDrain])
}
} catch (err) {
if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
}

handler.onError(err)
}

return !this[kNeedDrain]
}

}

module.exports = {
PoolBase,
kClients,
kNeedDrain,
kOnDrain,
kOnConnect,
kOnDisconnect,
kDispatch,
kOnConnectionError
}
Loading

0 comments on commit 7e971bc

Please sign in to comment.