Commit

chore: cron and pinpin rewire
vasco-santos committed Oct 19, 2021
1 parent 819c9e0 commit f5f99bd
Showing 31 changed files with 2,308 additions and 1,292 deletions.
23 changes: 23 additions & 0 deletions .github/workflows/cron.yml
@@ -0,0 +1,23 @@
name: cron
on:
  push:
    branches:
      - main
    paths:
      - 'packages/cron/**'
      - '.github/workflows/cron.yml'
  pull_request:
    paths:
      - 'packages/cron/**'
      - '.github/workflows/cron.yml'
jobs:
  test:
    runs-on: ubuntu-latest
    name: Test
    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-node@v2
        with:
          node-version: 16
      - uses: bahmutov/npm-install@v1
      - run: npm test --workspace packages/cron
2,206 changes: 1,093 additions & 1,113 deletions package-lock.json

Large diffs are not rendered by default.

36 changes: 14 additions & 22 deletions packages/cron/README.md
@@ -15,8 +15,22 @@ To run this locally you will need the following in your `packages/cron/.env` file:

```ini
ENV=dev
DATABASE=postgres

# PostgREST API URL
DEV_PG_REST_URL=http://localhost:3000
# PostgREST API token, for role "postgres", using secret value PGRST_JWT_SECRET from './postgres/docker/docker-compose.yml'
# https://postgrest.org/en/v8.0/tutorials/tut1.html#step-3-sign-a-token
DEV_PG_REST_JWT=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoicG9zdGdyZXMifQ.oM0SXF31Vs1nfwCaDxjlczE237KcNKhTpKEYxMX-jEU

# Connection string for locally running postgres used in tests
DEV_PG_CONNECTION=postgres://postgres:postgres@127.0.0.1:5432/postgres

# Cluster
CLUSTER_API_URL=http://127.0.0.1:9094/
CLUSTER_IPFS_PROXY_API_URL=http://127.0.0.1:9095/api/v0/

# Fauna
DEV_FAUNA_KEY="<your key here>"
```
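
The `DEV_PG_REST_JWT` value above is just a token for the `postgres` role signed with `PGRST_JWT_SECRET` (see the linked PostgREST tutorial). If the secret changes, a matching token can be minted with any JWT library; a minimal sketch using the `jsonwebtoken` package, which is not a dependency of this package and is shown only for illustration:

```js
// Mint a PostgREST token for role "postgres". The secret must match
// PGRST_JWT_SECRET in ./postgres/docker/docker-compose.yml.
import jwt from 'jsonwebtoken'

const token = jwt.sign({ role: 'postgres' }, process.env.PGRST_JWT_SECRET)
console.log(token) // paste into DEV_PG_REST_JWT
```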

@@ -31,25 +45,3 @@ Run the job:
```sh
npm run start:pins
```

### pinata

Fetch the oldest 600 PinRequests from the DB and pin them on Piñata

To run this locally you will need the following in your `packages/cron/.env` file:

```ini
ENV=dev
DEV_FAUNA_KEY="<your key here>"
PINATA_JWT="<your jwt here>"
```

You also need to have:

- a dev account and db set up on FaunaDB with the latest schema imported as per [../db/README.md](../db/README.md)

Run the job:

```sh
npm run start:pinata
```
12 changes: 10 additions & 2 deletions packages/cron/package.json
@@ -9,7 +9,11 @@
    "start": "run-s start:*",
    "start:metrics": "node src/bin/metrics.js",
    "start:pins": "node src/bin/pins.js",
    "start:pinata": "node src/bin/pinata.js"
    "start:dagcargo:views": "NODE_TLS_REJECT_UNAUTHORIZED=0 node src/bin/dagcargo-views.js",
    "test": "npm-run-all -p -r mock:cluster mock:pgrest test:e2e",
    "test:e2e": "mocha test/*.spec.js --exit",
    "mock:cluster": "smoke -p 9094 test/mocks/cluster",
    "mock:pgrest": "smoke -p 9087 test/mocks/pgrest"
  },
  "author": "Alan Shaw",
  "license": "(Apache-2.0 AND MIT)",
@@ -20,12 +24,16 @@
    "debug": "^4.3.1",
    "dotenv": "^9.0.2",
    "limiter": "2.0.1",
    "pg": "^8.7.1",
    "node-fetch": "^2.6.1",
    "p-retry": "^4.6.1",
    "piggybacker": "^2.0.0"
  },
  "devDependencies": {
    "@types/node": "^16.3.1",
    "npm-run-all": "^4.1.5"
    "execa": "^5.1.1",
    "mocha": "^8.3.2",
    "npm-run-all": "^4.1.5",
    "smoke": "^3.1.1"
  }
}
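
In the `test` script, npm-run-all's `-p` runs the three scripts in parallel and `-r` kills the mock servers once `test:e2e` exits. The mocks themselves are served by `smoke` straight from files on disk; a hypothetical route file, assuming smoke's documented filename convention (`<method>_<route>`, with `#` standing in for `/` and `@` marking a path parameter):

```js
// test/mocks/cluster/get_pins#@cid.js (hypothetical example)
// Answers GET /pins/:cid on the mock cluster listening on port 9094.
module.exports = ({ params }) => ({
  statusCode: 200,
  body: {
    cid: params.cid,
    peer_map: {} // a real mock would return per-peer pin statuses here
  }
})
```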
19 changes: 19 additions & 0 deletions packages/cron/src/bin/dagcargo-views.js
@@ -0,0 +1,19 @@
#!/usr/bin/env node

import dotenv from 'dotenv'
import { refreshMaterializedViews } from '../jobs/dagcargo.js'
import { getPg } from '../lib/utils.js'

async function main () {
  const pg = getPg(process.env)
  await pg.connect()

  try {
    await refreshMaterializedViews({ pg })
  } finally {
    await pg.end()
  }
}

dotenv.config()
main()
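
`getPg` lives in `../lib/utils.js`, which is outside this diff. A minimal sketch of what it plausibly does, assuming it only selects the env-specific connection string (`PROD_PG_CONNECTION` is a hypothetical name; `DEV_PG_CONNECTION` is the variable documented in the README):

```js
import pg from 'pg'

// Hypothetical sketch: build a pg client from the connection string
// for the current environment.
export function getPg (env) {
  const connectionString =
    env.ENV === 'production' ? env.PROD_PG_CONNECTION : env.DEV_PG_CONNECTION
  return new pg.Client({ connectionString })
}
```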
26 changes: 26 additions & 0 deletions packages/cron/src/jobs/dagcargo.js
@@ -0,0 +1,26 @@
import debug from 'debug'

/**
 * Refreshes the materialized views.
 *
 * @param {{ pg: import('pg').Client }} config
 */
export async function refreshMaterializedViews ({ pg }) {
  const log = debug('dagcargo:refreshMaterializedViews')
  if (!log.enabled) {
    console.log(
      'ℹ️ Enable logging by setting DEBUG=dagcargo:refreshMaterializedViews'
    )
  }

  log('🔁 REFRESH MATERIALIZED VIEW CONCURRENTLY public.deal;')
  await pg.query('REFRESH MATERIALIZED VIEW CONCURRENTLY public.deal;')

  log('🔁 REFRESH MATERIALIZED VIEW CONCURRENTLY public.aggregate;')
  await pg.query('REFRESH MATERIALIZED VIEW CONCURRENTLY public.aggregate;')

  log('🔁 REFRESH MATERIALIZED VIEW CONCURRENTLY public.aggregate_entry;')
  await pg.query('REFRESH MATERIALIZED VIEW CONCURRENTLY public.aggregate_entry;')

  log('✅ Done')
}
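
A PostgreSQL detail worth knowing when reading this job: `REFRESH MATERIALIZED VIEW CONCURRENTLY` refreshes without locking out readers, but it is only allowed on materialized views that have at least one unique index covering all rows. A sketch of the one-time setup each view needs (`deal_id` is a hypothetical column name):

```js
// Per view, one time: CONCURRENTLY requires a unique index on the
// materialized view. This is a PostgreSQL requirement, not specific to this job.
await pg.query('CREATE UNIQUE INDEX IF NOT EXISTS deal_uidx ON public.deal (deal_id)')
```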
134 changes: 32 additions & 102 deletions packages/cron/src/jobs/pins.js
@@ -1,75 +1,18 @@
import debug from 'debug'
import { gql } from '@web3-storage/db'
import { toPinStatusEnum } from '@web3-storage/api/src/utils/pin.js'
import retry from 'p-retry'
import { piggyback } from 'piggybacker'

const MAX_PIN_REQUESTS_PER_RUN = 600
const log = debug('pins:updatePinStatuses')

const FIND_PIN_SYNC_REQUESTS = gql`
  query FindPinSyncRequests($to: Time, $after: String) {
    findPinSyncRequests(to: $to, _size: 1000, _cursor: $after) {
      data {
        _id
        pin {
          _id
          content {
            _id
            cid
            dagSize
          }
          location {
            peerId
          }
          status
          created
        }
      }
      after
    }
  }
`

const UPDATE_PINS = gql`
  mutation UpdatePins($pins: [UpdatePinInput!]!) {
    updatePins(pins: $pins) {
      _id
    }
  }
`

const UPDATE_CONTENT_DAG_SIZE = gql`
  mutation UpdateContentDagSize($content: ID!, $dagSize: Long!) {
    updateContentDagSize(content: $content, dagSize: $dagSize) {
      _id
    }
  }
`

const DELETE_PIN_SYNC_REQUESTS = gql`
  mutation DeletePinSyncRequests($requests: [ID!]!) {
    deletePinSyncRequests(requests: $requests) {
      _id
    }
  }
`

const CREATE_PIN_SYNC_REQUESTS = gql`
  mutation CreatePinSyncRequests($pins: [ID!]!) {
    createPinSyncRequests(pins: $pins) {
      _id
    }
  }
`

/**
 * @param {{
 *   cluster: import('@nftstorage/ipfs-cluster').Cluster
 *   db: import('@web3-storage/db').DBClient
 *   ipfs: import('../lib/ipfs').IPFS
 * }} config
 */
export async function updatePinStatuses ({ cluster, db, ipfs }) {
export async function updatePinStatuses ({ cluster, db }) {
  if (!log.enabled) {
    console.log('ℹ️ Enable logging by setting DEBUG=pins:updatePinStatuses')
  }
@@ -78,10 +21,6 @@ export async function updatePinStatuses ({ cluster, db, ipfs }) {
  // multiple times about the same CID.
  /** @type {Map<string, import('@nftstorage/ipfs-cluster').StatusResponse['peerMap']>} */
  const statusCache = new Map()
  // List of CIDs that we already updated the DAG size for and don't need to
  // get the size or update again.
  /** @type {Set<string>} */
  const updatedDagSizes = new Set()

  const getPinStatus = piggyback(
    async cid => {
@@ -99,18 +38,26 @@
    )

  const to = new Date().toISOString()
  const size = MAX_PIN_REQUESTS_PER_RUN
  let queryRes, after
  let i = 0
  while (true) {
    queryRes = await retry(() => db.query(FIND_PIN_SYNC_REQUESTS, { to, after }), { onFailedAttempt: log })
    const requests = queryRes.findPinSyncRequests.data
    queryRes = await retry(() => db.getPinSyncRequests({ to, after, size }), { onFailedAttempt: log })

    const requests = queryRes.data
    log(`📥 Processing ${i} -> ${i + requests.length}`)

    const checkDagSizePins = []
    const reSyncPins = []
    let pinUpdates = await Promise.all(requests.map(async req => {
      const { pin } = req
      const peerMap = await getPinStatus(pin.content.cid)
      let peerMap

      try {
        peerMap = await getPinStatus(pin.contentCid)
      } catch (err) {
        reSyncPins.push(pin)
        return null // cluster could not find the content yet; re-check on a later run
      }

      if (!peerMap[pin.location.peerId]) {
        return null // not tracked by our cluster
@@ -123,59 +70,42 @@
      }

      if (status === pin.status) {
        log(`🙅 ${pin.content.cid}@${pin.location.peerId}: No status change (${status})`)
        log(`🙅 ${pin.contentCid}@${pin.location.peerId}: No status change (${status})`)
        return null
      }

      if (status === 'Pinned' && !pin.content.dagSize && !updatedDagSizes.has(pin.content.cid)) {
        checkDagSizePins.push(pin)
        updatedDagSizes.add(pin.content.cid)
      }
      log(`📌 ${pin.contentCid}@${pin.location.peerId}: ${pin.status} => ${status}`)

      log(`📌 ${pin.content.cid}@${pin.location.peerId}: ${pin.status} => ${status}`)
      return { pin: pin._id, status: status }
      return {
        id: pin._id,
        status: status,
        content_cid: pin.contentCid,
        pin_location_id: pin.location._id,
        updated_at: new Date().toISOString()
      }
    }))

    pinUpdates = pinUpdates.filter(Boolean)

    log(`⏳ Updating ${pinUpdates.length} pins...`)
    if (pinUpdates.length) {
      await retry(() => db.query(UPDATE_PINS, {
        pins: pinUpdates
      }), { onFailedAttempt: log })
      await retry(() => db.upsertPins(pinUpdates), { onFailedAttempt: log })
    }
    log(`✅ Updated ${pinUpdates.filter(Boolean).length} pins...`)

    log(`⏳ Re-queuing ${reSyncPins.length} pin sync requests...`)
    if (reSyncPins.length) {
      await retry(() => db.query(CREATE_PIN_SYNC_REQUESTS, {
        pins: reSyncPins.map(p => p._id)
      }), { onFailedAttempt: log })
    }
    log(`✅ Re-queued ${reSyncPins.length} pin sync requests...`)

    log(`⏳ Removing ${requests.length} pin sync requests...`)
    if (requests.length) {
      await retry(() => db.query(DELETE_PIN_SYNC_REQUESTS, {
        requests: requests.map(r => r._id)
      }), { onFailedAttempt: log })
      await retry(() => db.deletePinSyncRequests(requests.map(r => r._id)), { onFailedAttempt: log })
    }
    log(`✅ Removed ${requests.length} pin sync requests...`)

    await Promise.all(checkDagSizePins.map(async pin => {
      log(`⏳ ${pin.content.cid}: Querying DAG size...`)
      let dagSize
      try {
        // Note: this will timeout for large DAGs
        dagSize = await ipfs.dagSize(pin.content.cid, { timeout: 10 * 60000 })
        log(`🛄 ${pin.content.cid}@${pin.location.peerId}: ${dagSize} bytes`)
        await retry(() => db.query(UPDATE_CONTENT_DAG_SIZE, { content: pin.content._id, dagSize }), { onFailedAttempt: log })
      } catch (err) {
        log(`💥 ${pin.content.cid}@${pin.location.peerId}: Failed to update DAG size`)
        log(err)
      }
    }))
    log(`⏳ Re-queuing ${reSyncPins.length} pin sync requests...`)
    if (reSyncPins.length) {
      await retry(() => db.createPinSyncRequests(reSyncPins.map(p => p._id)), { onFailedAttempt: log })
    }
    log(`✅ Re-queued ${reSyncPins.length} pin sync requests...`)

    after = queryRes.findPinSyncRequests.after
    after = queryRes.after
    if (!after) break
    i += requests.length
  }
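
For context on the truncated `getPinStatus` above: `piggyback` wraps an async function so that concurrent calls sharing a key reuse one in-flight promise, meaning a burst of requests for the same CID results in a single cluster status call. A sketch of the pattern, assuming piggybacker's `(fn, getKey)` signature as used here:

```js
import { piggyback } from 'piggybacker'

// Concurrent calls with the same CID resolve from one cluster request.
const getPinStatus = piggyback(
  async cid => {
    const { peerMap } = await cluster.status(cid) // cluster client assumed in scope
    return peerMap
  },
  cid => cid // key function: identical CIDs are coalesced
)
```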
