fix: change batching strategy for user storage cron (#1419)
Co-authored-by: Paolo <[email protected]>
adamalton and flea89 authored Jun 15, 2022
1 parent 9649b35 commit ac10cb0
Showing 6 changed files with 52 additions and 34 deletions.
62 changes: 38 additions & 24 deletions packages/cron/src/jobs/storage.js
@@ -9,8 +9,17 @@ const USER_BY_USED_STORAGE_QUERY = `
   SELECT * FROM users_by_storage_used($1, $2, $3, $4)
 `
 
-const MIN_MAX_USER_ID = `
-  SELECT min(id)::TEXT as min, max(id)::TEXT as max from public.user
+const MAX_USER_ID_QUERY = `
+  SELECT max(id)::TEXT as max from public.user
+`
+
+const ID_RANGE_QUERY = `
+  SELECT max(id)::TEXT as max FROM (
+    SELECT id FROM public.user
+    WHERE id > $1
+    ORDER BY id
+    LIMIT $2
+  ) as user_range
 `
 
 const USER_BY_EMAIL_QUERY = `
@@ -80,35 +89,34 @@ export async function checkStorageUsed ({ roPg, emailService, userBatchSize = 10
   if (!log.enabled) {
     console.log('ℹ️ Enable logging by setting DEBUG=storage:checkStorageUsed')
   }
 
-  /** @type {{ rows: Array.<{min: string, max: string}> }} */
-  const { rows: minMax } = await roPg.query(MIN_MAX_USER_ID)
-
-  const min = BigInt(minMax[0].min)
-  const max = BigInt(minMax[0].max)
-  const batchSize = BigInt(userBatchSize)
-
   log('🗄 Checking users storage quotas')
 
+  /** @type {{ rows: Array.<{max: string}> }} */
+  const { rows: maxIdResult } = await roPg.query(MAX_USER_ID_QUERY)
+  const maxId = BigInt(maxIdResult[0].max)
+
   for (const email of STORAGE_QUOTA_EMAILS) {
     const usersOverQuota = []
-    let start = min
-    let isLastBatch = false
-
-    while (!isLastBatch) {
-      let to = start + batchSize
+    let startId = BigInt(0)
+
+    while (true) {
+      // We iterate in batches, but we can't use a simple LIMIT/OFFSET approach, because
+      // the `users_by_storage_used` SQL function in turn calls `user_used_storage` for
+      // each user that it iterates, even the ones that it doesn't return, so a LIMIT of
+      // 1000 users could still involve `user_used_storage` being run on many thousands
+      // of users. Hence we get batches of user ID ranges to ensure that we only inflict
+      // a small amount of pain on the DB in each query.
+      const { rows: maxIdOfBatchResult } = await roPg.query(ID_RANGE_QUERY, [
+        startId,
+        userBatchSize
+      ])
+      const maxIdOfBatch = BigInt(maxIdOfBatchResult[0].max)
 
-      if (to > max) {
-        // set `to` for the last batch to be null so that we take instances that might have been created
-        // since the initial query.
-        to = null
-        isLastBatch = true
-      }
       const { rows: results } = await roPg.query(USER_BY_USED_STORAGE_QUERY, [
         email.fromPercent,
         email.toPercent,
-        start,
-        to
+        startId,
+        maxIdOfBatch
       ])
 
       const users = results
@@ -137,7 +145,13 @@
           }
         }
       }
-      start = to
+
+      if (maxIdOfBatch >= maxId) {
+        log('🗄 Reached last user')
+        break
+      } else {
+        startId = maxIdOfBatch
+      }
     }
 
     if (usersOverQuota.length > 0) {
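For context, the new strategy is keyset pagination over user IDs: each iteration first asks the database for the highest ID among the next `userBatchSize` users (ID_RANGE_QUERY), then runs the expensive `users_by_storage_used` call over just the half-open range (startId, maxIdOfBatch]. Below is a condensed, standalone sketch of the pattern, assuming a node-postgres-style client; the `userIdRanges` helper and connection handling are illustrative, not part of this commit.

import pg from 'pg'

// Keyset-range batching: unlike LIMIT/OFFSET, each round trip makes Postgres
// walk only the next `batchSize` ids, never the rows it has already handed out.
async function * userIdRanges (client, batchSize = 1000) {
  let startId = 0n
  while (true) {
    const { rows } = await client.query(
      `SELECT max(id)::TEXT as max FROM (
        SELECT id FROM public.user WHERE id > $1 ORDER BY id LIMIT $2
      ) as user_range`,
      [startId.toString(), batchSize]
    )
    // The cron job above pre-fetches max(id) and breaks on reaching it; this
    // sketch instead stops when the sub-select comes back empty (max is NULL).
    if (rows[0].max === null) break
    const maxIdOfBatch = BigInt(rows[0].max)
    yield { gt: startId, lte: maxIdOfBatch } // half-open range (gt, lte]
    startId = maxIdOfBatch
  }
}

const client = new pg.Client({ connectionString: process.env.DATABASE_CONNECTION })
await client.connect()
for await (const { gt, lte } of userIdRanges(client, 1000)) {
  console.log(`processing users with ${gt} < id <= ${lte}`)
}
await client.end()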
9 changes: 4 additions & 5 deletions packages/db/postgres/functions.sql
@@ -296,8 +296,8 @@ $$;
 CREATE OR REPLACE FUNCTION users_by_storage_used(
   from_percent INTEGER,
   to_percent INTEGER DEFAULT NULL,
-  start_id BIGINT DEFAULT 0,
-  end_id BIGINT DEFAULT NULL
+  user_id_gt BIGINT DEFAULT 0,
+  user_id_lte BIGINT DEFAULT NULL
 )
 RETURNS TABLE
 (
@@ -333,9 +333,8 @@ BEGIN
       AND r.value ILIKE 'true'
       AND r.deleted_at IS NULL
     )
-    AND u.id >= start_id
-    AND (end_id is NULL OR u.id < end_id)
-    ORDER BY u.inserted_at
+    AND u.id > user_id_gt
+    AND u.id <= user_id_lte
   )
   SELECT *
   FROM user_account
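A note on the renamed parameters: the names now encode their comparison operators, and the function filters on the half-open range (user_id_gt, user_id_lte], which chains across batches with no user skipped or double-counted at a boundary. Two behavioural changes ride along: the old `end_id IS NULL` escape hatch is gone (with the default `user_id_lte = NULL`, the predicate `u.id <= user_id_lte` is never true, so callers must now always pass an explicit upper bound), and the `ORDER BY u.inserted_at` inside the subquery is dropped, avoiding an unnecessary sort. A usage sketch from the caller's side, with percentage thresholds made up for illustration:

// Hypothetical thresholds: users between 75% and 100% of their quota,
// restricted to the user-ID range computed for the current batch.
const { rows } = await roPg.query(
  'SELECT * FROM users_by_storage_used($1, $2, $3, $4)',
  [75, 100, startId.toString(), maxIdOfBatch.toString()]
)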
(file name not shown in this view: a migration that drops and recreates users_by_storage_used)
@@ -4,8 +4,8 @@ DROP FUNCTION IF EXISTS users_by_storage_used;
 CREATE OR REPLACE FUNCTION users_by_storage_used(
   from_percent INTEGER,
   to_percent INTEGER DEFAULT NULL,
-  start_id BIGINT DEFAULT 0,
-  end_id BIGINT DEFAULT NULL
+  user_id_gt BIGINT DEFAULT 0,
+  user_id_lte BIGINT DEFAULT NULL
 )
 RETURNS TABLE
 (
@@ -41,9 +41,8 @@ BEGIN
       AND r.value ILIKE 'true'
       AND r.deleted_at IS NULL
     )
-    AND u.id >= start_id
-    AND (end_id is NULL OR u.id < end_id)
-    ORDER BY u.inserted_at
+    AND u.id > user_id_gt
+    AND u.id <= user_id_lte
   )
   SELECT *
   FROM user_account
(file name not shown in this view: a new migration adding upload_user_id_idx)
@@ -0,0 +1 @@
+CREATE INDEX IF NOT EXISTS upload_user_id_idx ON upload (user_id);
4 changes: 4 additions & 0 deletions packages/db/postgres/pg-rest-api-types.d.ts
@@ -1626,6 +1626,10 @@ export interface paths {
           to_percent?: number;
           /** Format: integer */
           from_percent: number;
+          /** Format: integer */
+          user_id_gt?: number;
+          /** Format: integer */
+          user_id_lte?: number;
         };
       };
       header: {
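These are the regenerated PostgREST types picking up the renamed parameters. PostgREST exposes Postgres functions as RPC endpoints, so a direct call would look roughly like the sketch below; the URL, token, and argument values are placeholders, not from this commit.

// Hypothetical direct PostgREST call; the generated types above describe
// the JSON body parameters accepted by this endpoint.
const res = await fetch(`${PGREST_URL}/rpc/users_by_storage_used`, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    Authorization: `Bearer ${PGREST_JWT}`
  },
  body: JSON.stringify({
    from_percent: 75,
    to_percent: 100,
    user_id_gt: 0,
    user_id_lte: 1000
  })
})
const users = await res.json()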
1 change: 1 addition & 0 deletions packages/db/postgres/tables.sql
@@ -269,6 +269,7 @@ CREATE TABLE IF NOT EXISTS upload
 CREATE INDEX IF NOT EXISTS upload_auth_key_id_idx ON upload (auth_key_id);
 CREATE INDEX IF NOT EXISTS upload_content_cid_idx ON upload (content_cid);
 CREATE INDEX IF NOT EXISTS upload_updated_at_idx ON upload (updated_at);
+CREATE INDEX IF NOT EXISTS upload_user_id_idx ON upload (user_id);
 
 -- Tracks requests to replicate content to more nodes.
 CREATE TABLE IF NOT EXISTS pin_request
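The new index supports the per-user lookups on `upload` that the storage accounting performs for every user in a batch; without an index on `user_id`, each lookup would scan the whole table. A sketch of the query shape it serves (the real aggregation lives in `user_used_storage`, which isn't shown in this diff):

// Illustrative only: with upload_user_id_idx in place, per-user queries
// like this become index scans instead of sequential scans over `upload`.
const { rows } = await roPg.query(
  'SELECT count(*) AS uploads FROM upload WHERE user_id = $1',
  [userId]
)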
