Skip to content

Commit

Permalink
fix: remove dagcargo materialized views (#922)
Browse files Browse the repository at this point in the history
Sister PR to web3-storage/web3.storage#735

The dagcargo materialized views are taking too long to refresh and are timing out, because the entries table is now too big. Increasing the fetch size might have worked as a temporary fix, but instead we decided to remove the materialized views entirely and make the API tolerant of failure.
  • Loading branch information
Alan Shaw authored Dec 9, 2021
1 parent 039969c commit 3fe698f
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 567 deletions.
28 changes: 0 additions & 28 deletions .github/workflows/cron-dagcargo-views.yml

This file was deleted.

35 changes: 0 additions & 35 deletions packages/api/db/cargo.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,3 @@ IMPORT FOREIGN SCHEMA cargo
LIMIT TO (aggregate_entries, aggregates, deals, dags)
FROM SERVER dag_cargo_server
INTO cargo;

-- Create materialized view from cargo "aggregate_entries" table
CREATE MATERIALIZED VIEW public.aggregate_entry
AS
SELECT *
FROM cargo.aggregate_entries;

-- Indexes for "aggregate_entries" mat view
CREATE UNIQUE INDEX aggregate_entry_unique_cidv1_aggregate_cid
ON public.aggregate_entry (aggregate_cid, cid_v1);
CREATE INDEX aggregate_entry_cid_v1
ON public.aggregate_entry (cid_v1);

-- Create materialized view from cargo "deals" table
CREATE MATERIALIZED VIEW public.deal
AS
SELECT *
FROM cargo.deals;

-- Indexes for "deals" mat view
CREATE UNIQUE INDEX deal_unique_deal_id
ON public.deal (deal_id);
CREATE INDEX deal_aggregate_cid
ON public.deal (aggregate_cid);

-- Create materialized view from cargo "aggregates" table
CREATE MATERIALIZED VIEW public.aggregate
AS
SELECT *
FROM cargo.aggregates;

-- Indexes for "aggregate" mat view
CREATE UNIQUE INDEX aggregate_unique_aggregate_cid
ON public.aggregate (aggregate_cid);

47 changes: 47 additions & 0 deletions packages/api/db/cargo.testing.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
-- Create the cargo schema locally so tests can run without the
-- dag_cargo_server foreign data wrapper being configured.
CREATE SCHEMA IF NOT EXISTS cargo;

-- This is a copy of the dagcargo schema for testing purposes.
-- https://github.com/nftstorage/dagcargo/blob/master/maint/pg_schema.sql
-- NOTE(review): keep this file structurally identical to the upstream
-- schema — queries in functions.sql run against these same table and
-- column names in production via the FDW.

-- Membership of individual DAGs in aggregates: one row per
-- (aggregate, content CID) pair. No keys are declared here because the
-- upstream dagcargo schema declares none on this table.
CREATE TABLE IF NOT EXISTS cargo.aggregate_entries (
-- CID of the aggregate this entry was packed into
aggregate_cid TEXT NOT NULL,
-- v1 CID of the user content contained in the aggregate
cid_v1 TEXT NOT NULL,
-- IPLD selector locating the entry inside the aggregate DAG
datamodel_selector TEXT NOT NULL
);

-- One row per aggregate — presumably the Filecoin piece assembled from
-- many user DAGs (TODO confirm against dagcargo docs).
CREATE TABLE IF NOT EXISTS cargo.aggregates (
aggregate_cid TEXT NOT NULL UNIQUE,
-- CID of the piece as committed to in the storage deal
piece_cid TEXT UNIQUE NOT NULL,
-- hex-encoded sha256 of the exported CAR
sha256hex TEXT NOT NULL,
-- size in bytes of the exported CAR
export_size BIGINT NOT NULL,
metadata JSONB,
entry_created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

-- Storage deals made for aggregates; joined to cargo.aggregates on
-- aggregate_cid. The *_epoch columns are assumed to be Filecoin chain
-- epochs with the matching *_time wall-clock equivalents — TODO confirm.
CREATE TABLE IF NOT EXISTS cargo.deals (
deal_id BIGINT UNIQUE NOT NULL,
aggregate_cid TEXT NOT NULL,
-- storage client address
client TEXT NOT NULL,
-- storage provider (miner) address
provider TEXT NOT NULL,
-- deal lifecycle state, e.g. 'active' (see test data below)
status TEXT NOT NULL,
-- free-text detail accompanying status; nullable
status_meta TEXT,
start_epoch INTEGER NOT NULL,
start_time TIMESTAMP WITH TIME ZONE NOT NULL,
end_epoch INTEGER NOT NULL,
end_time TIMESTAMP WITH TIME ZONE NOT NULL,
-- when the containing sector went live; NULL until sealed
sector_start_epoch INTEGER,
sector_start_time TIMESTAMP WITH TIME ZONE,
entry_created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
entry_last_updated TIMESTAMP WITH TIME ZONE NOT NULL
);

-- Test data
--
-- A single aggregate containing a single entry, plus the deal storing
-- it, so queries joining all three tables return exactly one row.

INSERT INTO cargo.aggregate_entries (aggregate_cid, cid_v1, datamodel_selector) VALUES (
'bafybeiek5gau46j4dxoyty27qtirb3iuoq7aax4l3xt25mfk2igyt35bme',
'bafybeiaj5yqocsg5cxsuhtvclnh4ulmrgsmnfbhbrfxrc3u2kkh35mts4e',
'Links/19/Hash/Links/46/Hash/Links/0/Hash'
);

INSERT INTO cargo.aggregates (aggregate_cid, piece_cid, sha256hex, export_size, metadata, entry_created) VALUES (
'bafybeiek5gau46j4dxoyty27qtirb3iuoq7aax4l3xt25mfk2igyt35bme',
'baga6ea4seaqfanmqerzaiq7udm5wxx3hcmgapukudbadjarzkadudexamn5gwny',
'9ad34a5221cc171dcc61c0862680634ca065c32972ab59f92626b7f2f18ca3fc',
25515304172,
'{"Version": 1, "RecordType": "DagAggregate UnixFS"}',
'2021-09-09 14:41:14.099613+00'
);

-- Columns listed in table-definition order for readability.
INSERT INTO cargo.deals (deal_id, aggregate_cid, client, provider, status, status_meta, start_epoch, start_time, end_epoch, end_time, sector_start_epoch, sector_start_time, entry_created, entry_last_updated) VALUES (
2424132,
'bafybeiek5gau46j4dxoyty27qtirb3iuoq7aax4l3xt25mfk2igyt35bme',
'f144zep4gitj73rrujd3jw6iprljicx6vl4wbeavi',
'f0678914',
'active',
'containing sector active as of 2021-09-10 00:36:30 at epoch 1097593',
1102102,
'2021-09-11 14:11:00+00',
2570902,
'2023-02-03 14:11:00+00',
1097593,
'2021-09-10 00:36:30+00',
'2021-09-09 16:30:52.252233+00',
'2021-09-10 00:45:50.408956+00'
);
2 changes: 1 addition & 1 deletion packages/api/db/fdw.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ SERVER dag_cargo_server
);

CREATE
USER MAPPING FOR current_user
USER MAPPING FOR :NFT_STORAGE_USER
SERVER dag_cargo_server
OPTIONS (
user :'DAG_CARGO_USER',
Expand Down
6 changes: 3 additions & 3 deletions packages/api/db/functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ SELECT COALESCE(de.status, 'queued') as status,
a.piece_cid as pieceCid,
ae.aggregate_cid as batchRootCid,
ae.cid_v1 as contentCid
FROM public.aggregate_entry ae
join public.aggregate a using (aggregate_cid)
LEFT JOIN public.deal de USING (aggregate_cid)
FROM cargo.aggregate_entries ae
JOIN cargo.aggregates a USING (aggregate_cid)
LEFT JOIN cargo.deals de USING (aggregate_cid)
WHERE ae.cid_v1 = ANY (cids)
ORDER BY de.entry_last_updated
$$;
129 changes: 56 additions & 73 deletions packages/api/scripts/cmds/db-sql.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,63 +11,30 @@ const { Client } = pg
* @param {{ reset?: boolean; cargo?: boolean; testing?: boolean; }} opts
*/
export async function dbSqlCmd(opts) {
// Check required env vars are present
;[
'DAG_CARGO_HOST',
'DAG_CARGO_DATABASE',
'DAG_CARGO_USER',
'DAG_CARGO_PASSWORD',
'DATABASE_CONNECTION',
].forEach((v) => {
if (!process.env[v]) {
throw new Error(`missing environment variable ${v}`)
}
})
if (opts.cargo && !opts.testing) {
expectEnv('DAG_CARGO_HOST')
expectEnv('DAG_CARGO_DATABASE')
expectEnv('DAG_CARGO_USER')
expectEnv('DAG_CARGO_PASSWORD')
}
expectEnv('DATABASE_CONNECTION')

// read all the SQL files
const configSql = fs.readFileSync(
path.join(__dirname, '../../db/config.sql'),
'utf-8'
)
const tables = fs.readFileSync(
path.join(__dirname, '../../db/tables.sql'),
'utf-8'
)
const functions = fs.readFileSync(
path.join(__dirname, '../../db/functions.sql'),
'utf-8'
)
const reset = fs.readFileSync(
path.join(__dirname, '../../db/reset.sql'),
'utf-8'
)
let cargo = fs.readFileSync(
path.join(__dirname, '../../db/cargo.sql'),
'utf-8'
)
let fdw = fs.readFileSync(path.join(__dirname, '../../db/fdw.sql'), 'utf-8')
const { env } = process
const configSql = loadSql('config.sql')
const tables = loadSql('tables.sql')
const functions = loadSql('functions.sql')
const reset = loadSql('reset.sql')
const cargo = loadSql('cargo.sql')
const cargoTesting = loadSql('cargo.testing.sql')
const fdw = loadSql('fdw.sql')
// Replace secrets in the FDW sql file
.replace(":'DAG_CARGO_HOST'", `'${env.DAG_CARGO_HOST}'`)
.replace(":'DAG_CARGO_DATABASE'", `'${env.DAG_CARGO_DATABASE}'`)
.replace(":'DAG_CARGO_USER'", `'${env.DAG_CARGO_USER}'`)
.replace(":'DAG_CARGO_PASSWORD'", `'${env.DAG_CARGO_PASSWORD}'`)
.replace(':NFT_STORAGE_USER', env.NFT_STORAGE_USER || 'CURRENT_USER')

// Replace secrets in the FDW sql file
fdw = fdw.replace(":'DAG_CARGO_HOST'", `'${process.env.DAG_CARGO_HOST}'`)
fdw = fdw.replace(
":'DAG_CARGO_DATABASE'",
`'${process.env.DAG_CARGO_DATABASE}'`
)
fdw = fdw.replace(":'DAG_CARGO_USER'", `'${process.env.DAG_CARGO_USER}'`)
fdw = fdw.replace(
":'DAG_CARGO_PASSWORD'",
`'${process.env.DAG_CARGO_PASSWORD}'`
)

const connectionString = process.env.DATABASE_CONNECTION
const client = await retry(
async () => {
const c = new Client({ connectionString })
await c.connect()
return c
},
{ minTimeout: 100 }
)
const client = await getDbClient(env.DATABASE_CONNECTION)

if (opts.reset) {
await client.query(reset)
Expand All @@ -78,27 +45,43 @@ export async function dbSqlCmd(opts) {

if (opts.cargo) {
if (opts.testing) {
cargo = cargo.replace(
`
-- Create materialized view from cargo "aggregate_entries" table
CREATE MATERIALIZED VIEW public.aggregate_entry
AS
SELECT *
FROM cargo.aggregate_entries;`,
`
CREATE MATERIALIZED VIEW public.aggregate_entry
AS
SELECT *
FROM cargo.aggregate_entries
WHERE cid_v1 in ('bafybeiaj5yqocsg5cxsuhtvclnh4ulmrgsmnfbhbrfxrc3u2kkh35mts4e');
`
)
await client.query(cargoTesting)
} else {
await client.query(fdw)
await client.query(cargo)
}

await client.query(fdw)
await client.query(cargo)
}

await client.query(functions)
await client.end()
}

/**
 * Create a pg client and connect it, retrying failed connection
 * attempts (useful while a local database container is still booting).
 * @param {string|undefined} connectionString
 * @returns {Promise<Client>} resolves to a connected client
 */
function getDbClient(connectionString) {
  // Each attempt builds a fresh client: a Client that failed to
  // connect cannot be reused.
  const attempt = async () => {
    const db = new Client({ connectionString })
    await db.connect()
    return db
  }
  return retry(attempt, { minTimeout: 100 })
}

/**
 * Assert that the named environment variable is set (and non-empty).
 * @param {string} name environment variable to check
 * @throws {Error} when the variable is missing or falsy
 */
function expectEnv(name) {
  const value = process.env[name]
  if (!value) {
    throw new Error(`missing environment variable: ${name}`)
  }
}

/**
 * Read a SQL file from this package's db directory.
 * @param {string} file file name within packages/api/db
 * @returns {string} file contents decoded as UTF-8
 */
function loadSql(file) {
  const sqlPath = path.join(__dirname, '..', '..', 'db', file)
  return fs.readFileSync(sqlPath, 'utf8')
}
6 changes: 4 additions & 2 deletions packages/api/src/utils/db-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,9 @@ export class DBClient {
}

/**
* Get deals for multiple cids
* Get deals for multiple cids. This function is error tolerant as it uses
* the dagcargo FDW. It will return an empty object if any error is
* encountered fetching the data.
*
* @param {string[]} cids
*/
Expand All @@ -291,7 +293,7 @@ export class DBClient {
cids,
})
if (rsp.error) {
throw new DBError(rsp.error)
return {}
}

/** @type {Record<string, import('./../bindings').Deal[]>} */
Expand Down
Loading

0 comments on commit 3fe698f

Please sign in to comment.