Skip to content

Commit

Permalink
Make the nightly meter process smoother; avoid 1000s of conc db ops
Browse files Browse the repository at this point in the history
  • Loading branch information
j-berman committed Jul 28, 2023
1 parent 7b038fd commit ab925b2
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 24 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"build:server": "cd ./src/proof-of-concept && npm run build:server",
"build:admin-panel": "cd ./src/userbase-server/admin-panel && npm run build",
"copy:admin-panel:to-build": "mkdir ./src/proof-of-concept/build/node_modules/userbase-server/admin-panel && cp -R ./src/userbase-server/admin-panel/dist ./src/proof-of-concept/build/node_modules/userbase-server/admin-panel/dist",
"start": "npm-run-all -r -p watch:client watch:server watch:admin-panel watch:userbase-js",
"start": "LOG_LEVEL=debug npm-run-all -r -p watch:client watch:server watch:admin-panel watch:userbase-js",
"start:prod": "npm stop && node ./node_modules/forever/bin/forever --minUptime 5000 --spinSleepTime 1000 -l ../logs/encd.log -p $(pwd) -a start ./src/proof-of-concept/build/app.js",
"stop": "node ./node_modules/forever/bin/forever stopall",
"watch:client": "cd ./src/proof-of-concept && npm run watch:client",
Expand Down
105 changes: 89 additions & 16 deletions src/userbase-server/meter.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,54 @@ import logger from './logger'
import setup from './setup'
import connection from './connection'
import dbController from './db'
import { estimateSizeOfDdbItem } from './utils'
import { estimateSizeOfDdbItem, wait } from './utils'

// An object that already has a stored size and was last metered less than
// this many milliseconds ago is skipped instead of being metered again
const TIME_SINCE_LAST_METER = 12 * 60 * 60 * 1000 // 12 hours

// Upper bound on how many times a meter run is attempted when it keeps
// failing with a retryable error
const MAX_METER_ATTEMPTS = 3

// Users of the same app tend to have a similar data footprint, so users are
// the level that gets metered concurrently (in batches of this size), while
// every other level is metered one object at a time. Only one level is run
// concurrently because DDB throttles concurrent reads
const MAX_BATCH_SIZE = 200

// TODO: a queue for all DDB operations so the meter process isn't
// limited to a single bottleneck (metering users is the current bottleneck)
/**
 * Runs `meterFunc(nightlyId, obj)` for every object in `objectsToMeter`,
 * keeping at most `batchSize` calls in flight at the same time. A batch is
 * awaited in full before the next batch is started.
 *
 * @param {Iterable} objectsToMeter - objects to hand to `meterFunc`
 * @param {Function} meterFunc - called as `meterFunc(nightlyId, obj)`; its
 *   (possibly async) result is collected
 * @param {*} nightlyId - passed through to `meterFunc` untouched
 * @param {number} [batchSize=1] - max concurrent calls. Values below 1 or NaN
 *   are treated as 1 and fractional values are rounded down, so a bad value
 *   can never silently remove the concurrency limit. `Infinity` means no limit
 * @returns {Promise<Array>} results of `meterFunc`, in input order
 * @throws rejects with the first rejection of any `meterFunc` call in a batch
 */
const controlledMeter = async (objectsToMeter, meterFunc, nightlyId, batchSize = 1) => {
  // `NaN >= 1` is false, so NaN falls back to 1 as well
  const limit = batchSize >= 1 ? Math.floor(batchSize) : 1

  const sizes = []
  let batch = []
  for (const objToMeter of objectsToMeter) {
    batch.push(meterFunc(nightlyId, objToMeter))

    // `>=` rather than `===` so the batch is always flushed once it is full
    if (batch.length >= limit) {
      const batchResults = await Promise.all(batch)
      sizes.push(...batchResults)
      batch = []
    }
  }

  // flush the final, partially filled batch
  if (batch.length > 0) {
    const batchResults = await Promise.all(batch)
    sizes.push(...batchResults)
  }

  return sizes
}

/**
 * Decides whether `objToMeter` can be skipped in this meter run.
 *
 * The object is skipped when it already has a positive stored `size` and
 * either its `skip-meter` attribute is truthy or it was last metered less
 * than TIME_SINCE_LAST_METER milliseconds before `start`.
 *
 * Side effect: copies the object's `last-metered` and `size` attributes onto
 * `logChildObject` as `lastMetered` and `lastSize`.
 *
 * @param {number} start - timestamp (ms) at which metering of this object began
 * @param {Object} objToMeter - item carrying `size`, `last-metered` and `skip-meter`
 * @param {string} type - label used in the "Skipping ..." log line
 * @param {Object} logChildObject - logging context, mutated as described above
 * @returns {boolean} true when the object should be skipped; false otherwise,
 *   including when the check itself throws (the error is logged as a warning)
 */
const skipMeter = (start, objToMeter, type, logChildObject) => {
  try {
    const previouslyMeteredAt = objToMeter['last-metered']
    logChildObject.lastMetered = previouslyMeteredAt

    const previousSize = objToMeter['size']
    logChildObject.lastSize = previousSize

    // no positive size on record yet, so the object must be metered
    if (!(previousSize > 0)) return false

    const msSinceLastMeter = start - new Date(previouslyMeteredAt)
    const shouldSkip = objToMeter['skip-meter'] || msSinceLastMeter < TIME_SINCE_LAST_METER
    if (!shouldSkip) return false

    logger.child(logChildObject).debug(`Skipping ${type}`)
    return true
  } catch (e) {
    // best effort: a failed check never blocks metering
    logger.child({ err: e, ...logChildObject }).warn('Error checking skip meter')
  }

  return false
}

const ddbWhileLoop = async (params, ddbQuery, action) => {
let itemsResponse = await ddbQuery(params)
Expand Down Expand Up @@ -99,7 +146,7 @@ const storeDatabaseSizes = async (dbId, size, transactionLogSize, dbStatesSize,
const meterDatabase = async (nightlyId, userDb) => {
const start = Date.now()
const logChildObject = { nightlyId, userId: userDb['user-id'], databaseId: userDb['database-id'] }
logger.child(logChildObject).info('Metering database')
logger.child(logChildObject).debug('Metering database')

let totalDatabaseSize = estimateSizeOfDdbItem(userDb)

Expand Down Expand Up @@ -129,12 +176,13 @@ const meterDatabase = async (nightlyId, userDb) => {

logChildObject.totalDatabaseSize = totalDatabaseSize

logger.child({ timeToMeter: Date.now() - start, ...logChildObject }).info('Finished metering database')
logger.child({ timeToMeter: Date.now() - start, ...logChildObject }).debug('Finished metering database')

return totalDatabaseSize
}

const storeSize = async (TableName, Key, size, lastMetered, ConditionExpression, ExpressionAttributeNames, ExpressionAttributeValues) => {
const storeSize = async (TableName, Key, size, lastMetered, ConditionExpression,
ExpressionAttributeNames, ExpressionAttributeValues, logChildObject) => {
await connection.ddbClient().update({
TableName,
Key,
Expand All @@ -151,12 +199,18 @@ const storeSize = async (TableName, Key, size, lastMetered, ConditionExpression,
...ExpressionAttributeValues
}
}).promise()

logChildObject.currentSize = size
}

const meterUser = async (nightlyId, user) => {
const start = Date.now()
const logChildObject = { nightlyId, userId: user['user-id'], appId: user['app-id'], username: user['username'] }
logger.child(logChildObject).info('Metering user')
if (skipMeter(start, user, "user", logChildObject)) {
return user['size']
}

logger.child(logChildObject).debug('Metering user')

// meter user's databases that user is an owner of
const params = {
Expand All @@ -173,7 +227,7 @@ const meterUser = async (nightlyId, user) => {
const ddbClient = connection.ddbClient()

const ddbQuery = (params) => ddbClient.query(params).promise()
const action = (userDbs) => Promise.all(userDbs.map(userDb => meterDatabase(nightlyId, userDb)))
const action = (userDbs) => controlledMeter(userDbs, meterDatabase, nightlyId)

const totalDataStoredByUser = await ddbWhileLoop(params, ddbQuery, action) + estimateSizeOfDdbItem(user)

Expand All @@ -188,8 +242,10 @@ const meterUser = async (nightlyId, user) => {
start,
'#userId = :userId',
{ '#userId': 'user-id' },
{ ':userId': user['user-id'] }
{ ':userId': user['user-id'] },
logChildObject,
)
logger.child({ timeToMeter: Date.now() - start, ...logChildObject }).debug('Finished metering user')
} catch (e) {
logger.child({ timeToMeter: Date.now() - start, err: e, ...logChildObject }).warn('Error metering user')
}
Expand All @@ -200,7 +256,11 @@ const meterUser = async (nightlyId, user) => {
const meterApp = async (nightlyId, app) => {
const start = Date.now()
const logChildObject = { nightlyId, userId: app['user-id'], appId: app['app-id'], adminId: app['admin-id'], appName: app['app-name'] }
logger.child(logChildObject).info('Metering app')
if (skipMeter(start, app, "app", logChildObject)) {
return app['size']
}

logger.child(logChildObject).debug('Metering app')

// meter all app's users
const params = {
Expand All @@ -218,7 +278,7 @@ const meterApp = async (nightlyId, app) => {
const ddbClient = connection.ddbClient()

const ddbQuery = (params) => ddbClient.query(params).promise()
const action = (users) => Promise.all(users.map(user => meterUser(nightlyId, user)))
const action = (users) => controlledMeter(users, meterUser, nightlyId, MAX_BATCH_SIZE)
const totalDataStoredByApp = await ddbWhileLoop(params, ddbQuery, action) + estimateSizeOfDdbItem(app)

try {
Expand All @@ -232,9 +292,10 @@ const meterApp = async (nightlyId, app) => {
start,
'#appId = :appId',
{ '#appId': 'app-id' },
{ ':appId': app['app-id'] }
{ ':appId': app['app-id'] },
logChildObject,
)
logger.child({ timeToMeter: Date.now() - start, totalDataStoredByApp, ...logChildObject }).info('Finished metering app')
logger.child({ timeToMeter: Date.now() - start, totalDataStoredByApp, ...logChildObject }).debug('Finished metering app')
} catch (e) {
logger.child({ timeToMeter: Date.now() - start, err: e, ...logChildObject }).warn('Error metering app')
}
Expand All @@ -255,7 +316,7 @@ const meterApps = async (nightlyId, admin) => {
}

const ddbQuery = (params) => connection.ddbClient().query(params).promise()
const action = (apps) => Promise.all(apps.map(app => meterApp(nightlyId, app)))
const action = (apps) => controlledMeter(apps, meterApp, nightlyId)
const totalStoredByApps = await ddbWhileLoop(params, ddbQuery, action)

return totalStoredByApps
Expand All @@ -264,6 +325,10 @@ const meterApps = async (nightlyId, admin) => {
const meterAdmin = async (nightlyId, admin) => {
const start = Date.now()
const logChildObject = { nightlyId, adminId: admin['admin-id'], email: admin['email'] }
if (skipMeter(start, admin, "admin", logChildObject)) {
return admin['size']
}

logger.child(logChildObject).info('Metering admin')

const totalDataStoredByAdmin = await meterApps(nightlyId, admin)
Expand All @@ -278,7 +343,8 @@ const meterAdmin = async (nightlyId, admin) => {
start,
'#adminId = :adminId',
{ '#adminId': 'admin-id' },
{ ':adminId': admin['admin-id'] }
{ ':adminId': admin['admin-id'] },
logChildObject,
)
logger.child({ timeToMeter: Date.now() - start, ...logChildObject }).info('Finished metering admin')
} catch (e) {
Expand All @@ -298,17 +364,17 @@ const meterAdmins = async (nightlyId) => {
}

const ddbQuery = (params) => connection.ddbClient().scan(params).promise()
const action = (admins) => Promise.all(admins.map(admin => meterAdmin(nightlyId, admin)))
const action = (admins) => controlledMeter(admins, meterAdmin, nightlyId)
const totalDataStored = await ddbWhileLoop(params, ddbQuery, action)

logger.child({ timeToMeter: Date.now() - start, ...logChildObject }).info('Finished metering admins')

return totalDataStored
}

const meter = async (nightlyId) => {
const meter = async (nightlyId, attempt = 1) => {
const start = Date.now()
const logChildObject = { nightlyId, start }
const logChildObject = { nightlyId, start, attempt }

try {
logger.child(logChildObject).info('Metering data storage')
Expand All @@ -318,6 +384,13 @@ const meter = async (nightlyId) => {
logger.child({ timeToMeter: Date.now() - start, totalDataStored, ...logChildObject }).info('Finished metering')
} catch (e) {
logger.child({ timeToMeter: Date.now() - start, err: e, ...logChildObject }).fatal('Failed metering')

if (e && e.retryable) {
if (attempt < MAX_METER_ATTEMPTS) {
await wait(1000 * 60 * 5) // 5 mins
await meter(nightlyId, attempt + 1)
}
}
}
}

Expand Down
21 changes: 14 additions & 7 deletions src/userbase-server/purge.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import userController from './user'
import appController from './app'
import adminController from './admin'
import stripe from './stripe'
import { wait } from './utils'

const MS_IN_AN_HOUR = 60 * 60 * 1000
const MS_IN_A_DAY = 24 * MS_IN_AN_HOUR
const TIME_TO_PURGE = 30 * MS_IN_A_DAY
const MAX_PURGE_ATTEMPTS = 10

const ddbWhileLoop = async (params, ddbQuery, action) => {
let itemsResponse = await ddbQuery(params)
Expand Down Expand Up @@ -559,19 +561,17 @@ const purgeDeletedAdmins = async (nightlyId) => {
logger.child({ timeToPurge: Date.now() - start, ...logChildObject }).info('Finished purging deleted admins')
}

const purge = async (nightlyId) => {
const purge = async (nightlyId, attempt = 1) => {
const start = Date.now()
const logChildObject = { nightlyId, start }
const logChildObject = { nightlyId, start, attempt }

try {
logger.child(logChildObject).info('Commencing purge')

// place deleted items in permanent deleted tables
await Promise.all([
scanForDeletedAdmins(nightlyId),
scanForDeletedApps(nightlyId),
scanForDeletedUsers(nightlyId),
])
await scanForDeletedAdmins(nightlyId)
await scanForDeletedApps(nightlyId)
await scanForDeletedUsers(nightlyId)

// purge items from permanent deleted tables. Do each synchronously because top level may delete level below it;
// for example, purging admins will purge apps and users, reducing the number of deleted apps and deleted users
Expand All @@ -582,6 +582,13 @@ const purge = async (nightlyId) => {
logger.child({ timeToPurge: Date.now() - start, ...logChildObject }).info('Finished purge')
} catch (e) {
logger.child({ timeToPurge: Date.now() - start, err: e, ...logChildObject }).fatal('Failed purge')

if (e && e.retryable) {
await wait(1000 * 60 * 5) // 5 mins
if (attempt < MAX_PURGE_ATTEMPTS) {
await purge(nightlyId, attempt + 1)
}
}
}
}

Expand Down

0 comments on commit ab925b2

Please sign in to comment.