feat: storage backup
vasco-santos committed Sep 2, 2021
1 parent c790401 commit 0b84c98
Showing 13 changed files with 7,336 additions and 5,209 deletions.
12,271 changes: 7,098 additions & 5,173 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions packages/api/README.md
@@ -46,6 +46,10 @@ One time set up of your cloudflare worker subdomain for dev:
wrangler secret put FAUNA_KEY --env $(whoami) # Get from fauna.com after creating a dev Classic DB
wrangler secret put CLUSTER_BASIC_AUTH_TOKEN --env $(whoami) # Get from web3.storage vault in 1password (not required for dev)
wrangler secret put SENTRY_DSN --env $(whoami) # Get from Sentry (not required for dev)
wrangler secret put S3_BUCKET_REGION --env $(whoami) # Get from Amazon S3 (not required for dev)
wrangler secret put S3_ACCESS_KEY_ID --env $(whoami) # Get from Amazon S3 (not required for dev)
wrangler secret put S3_SECRET_ACCESS_KEY_ID --env $(whoami) # Get from Amazon S3 (not required for dev)
wrangler secret put S3_BUCKET_NAME --env $(whoami) # Get from Amazon S3 (not required for dev)
```
- `npm run publish` - Publish the worker under your env. An alias for `wrangler publish --env $(whoami)`
@@ -181,3 +185,7 @@ SENTRY_UPLOAD=false # toggle for sentry source/sourcemaps upload (capture will s
```
Production vars should be set in Github Actions secrets.
## S3 Setup
We use [S3](https://aws.amazon.com/s3/) for backup storage. For production, an AWS account and bucket need to be created.
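
As a quick sanity check before running `wrangler secret put` for these values, something like the following can confirm the bucket and credentials work (a sketch only; `verify-s3.mjs` and the env var usage are illustrative, not part of this commit):

```js
// verify-s3.mjs (hypothetical helper, run with `node verify-s3.mjs`)
import { S3Client, HeadBucketCommand } from '@aws-sdk/client-s3'

const client = new S3Client({
  region: process.env.S3_BUCKET_REGION,
  credentials: {
    accessKeyId: process.env.S3_ACCESS_KEY_ID,
    secretAccessKey: process.env.S3_SECRET_ACCESS_KEY_ID
  }
})

// HeadBucket succeeds only if the bucket exists and these credentials can reach it.
await client.send(new HeadBucketCommand({ Bucket: process.env.S3_BUCKET_NAME }))
console.log('S3 backup bucket is reachable')
```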
11 changes: 7 additions & 4 deletions packages/api/package.json
@@ -12,11 +12,12 @@
"dev": "wrangler dev --env $(whoami)",
"publish": "wrangler publish --env $(whoami)",
"build": "WEBPACK_CLI_FORCE_LOAD_ESM_CONFIG=true webpack",
"test": "npm-run-all -p -r mock:cluster mock:db test:e2e -s test:size",
"test": "npm-run-all -p -r mock:cluster mock:db mock:backup test:e2e -s test:size",
"test:size": "bundlesize",
"test:e2e": "playwright-test \"test/**/*.spec.js\" --sw src/index.js -b webkit",
"mock:cluster": "smoke -p 9094 test/mocks/cluster",
"mock:db": "smoke -p 9086 test/mocks/db"
"mock:db": "smoke -p 9086 test/mocks/db",
"mock:backup": "smoke -p 9096 test/mocks/backup"
},
"devDependencies": {
"@sentry/webpack-plugin": "^1.16.0",
@@ -38,6 +39,7 @@
"webpack-cli": "^4.7.2"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.28.0",
"@ipld/car": "^3.1.4",
"@ipld/dag-cbor": "^6.0.3",
"@ipld/dag-pb": "^2.0.2",
@@ -48,12 +50,13 @@
"@web3-storage/multipart-parser": "^1.0.0",
"itty-router": "^2.3.10",
"multiformats": "^9.0.4",
"p-retry": "^4.6.1"
"p-retry": "^4.6.1",
"uint8arrays": "^3.0.0"
},
"bundlesize": [
{
"path": "./dist/main.js",
"maxSize": "1 MB",
"maxSize": "1.4 MB",
"compression": "none"
}
]
49 changes: 36 additions & 13 deletions packages/api/src/car.js
@@ -7,6 +7,7 @@ import * as cbor from '@ipld/dag-cbor'
import * as pb from '@ipld/dag-pb'
import retry from 'p-retry'
import { GATEWAY, LOCAL_ADD_THRESHOLD, DAG_SIZE_CALC_LIMIT, MAX_BLOCK_SIZE } from './constants.js'
import { backup } from './utils/backup.js'
import { JSONResponse } from './utils/json-response.js'
import { toPinStatusEnum } from './utils/pin.js'

@@ -144,19 +145,10 @@ export async function carPost (request, env, ctx) {
// Ensure car blob.type is set; it is used by the cluster client to set the format=car flag on the /add call.
const content = blob.slice(0, blob.size, 'application/car')

const { cid } = await env.cluster.add(content, {
metadata: { size: content.size.toString() },
// When >2.5MB, use local add, because waiting for blocks to be sent to
// other cluster nodes can take a long time. Replication to other nodes
// will be done async by bitswap instead.
local: blob.size > LOCAL_ADD_THRESHOLD
})

const { peerMap } = await env.cluster.status(cid)
const pins = toPins(peerMap)
if (!pins.length) { // should not happen
throw new Error('not pinning on any node')
}
const [{ cid, pins }, backupKey] = await Promise.all([
addToCluster(content, blob.size, env),
backup(content, env)
])

// Store in DB
// Retried because it's possible to receive the error:
@@ -169,6 +161,12 @@
cid,
name,
type: 'Car',
backupData: backupKey
? [{
key: backupKey,
name: env.s3BucketName
}]
: [],
pins
}
})
@@ -259,6 +257,31 @@ export async function sizeOf (response) {
return size
}

/**
 * Adds content to the local cluster and returns its CID and pins
*
* @param {Blob} content
* @param {number} size
* @param {import('./env').Env} env
*/
async function addToCluster (content, size, env) {
const { cid } = await env.cluster.add(content, {
metadata: { size: content.size.toString() },
// When >2.5MB, use local add, because waiting for blocks to be sent to
// other cluster nodes can take a long time. Replication to other nodes
// will be done async by bitswap instead.
local: size > LOCAL_ADD_THRESHOLD
})

const { peerMap } = await env.cluster.status(cid)
const pins = toPins(peerMap)
if (!pins.length) { // should not happen
throw new Error('not pinning on any node')
}

return { cid, pins }
}

/**
* Returns the sum of all block sizes and total blocks. Throws if the CAR does
* not conform to our idea of a valid CAR i.e.
31 changes: 29 additions & 2 deletions packages/api/src/env.js
@@ -1,12 +1,21 @@
/* global MAGIC_SECRET_KEY FAUNA_ENDPOINT FAUNA_KEY SALT CLUSTER_BASIC_AUTH_TOKEN CLUSTER_API_URL SENTRY_DSN, VERSION DANGEROUSLY_BYPASS_MAGIC_AUTH */
/* global MAGIC_SECRET_KEY FAUNA_ENDPOINT FAUNA_KEY SALT CLUSTER_BASIC_AUTH_TOKEN CLUSTER_API_URL SENTRY_DSN, VERSION DANGEROUSLY_BYPASS_MAGIC_AUTH S3_BUCKET_ENDPOINT S3_BUCKET_NAME S3_BUCKET_REGION S3_ACCESS_KEY_ID S3_SECRET_ACCESS_KEY_ID */
import Toucan from 'toucan-js'
import { S3Client } from '@aws-sdk/client-s3'
import { Magic } from '@magic-sdk/admin'
import { DBClient } from '@web3-storage/db'
import { Cluster } from '@nftstorage/ipfs-cluster'

import pkg from '../package.json'

/** @typedef {{ magic: Magic, db: DBClient, SALT: string }} Env */
/**
* @typedef {object} Env
* @property {Cluster} cluster
* @property {Magic} magic
* @property {DBClient} db
* @property {string} SALT
* @property {S3Client} [s3Client]
* @property {string} [s3BucketName]
*/

/**
* @param {Request} req
@@ -49,4 +58,22 @@ export function envAll (_, env, event) {
const clusterAuthToken = env.CLUSTER_BASIC_AUTH_TOKEN || (typeof CLUSTER_BASIC_AUTH_TOKEN === 'undefined' ? undefined : CLUSTER_BASIC_AUTH_TOKEN)
const headers = clusterAuthToken ? { Authorization: `Basic ${clusterAuthToken}` } : {}
env.cluster = new Cluster(env.CLUSTER_API_URL || CLUSTER_API_URL, { headers })

try {
if ((env.S3_ACCESS_KEY_ID || S3_ACCESS_KEY_ID) && (env.S3_SECRET_ACCESS_KEY_ID || S3_SECRET_ACCESS_KEY_ID)) {
const s3Endpoint = env.S3_BUCKET_ENDPOINT || (typeof S3_BUCKET_ENDPOINT === 'undefined' ? undefined : S3_BUCKET_ENDPOINT)
env.s3Client = new S3Client({
endpoint: s3Endpoint,
forcePathStyle: !!s3Endpoint, // Force path if endpoint provided
region: env.S3_BUCKET_REGION || S3_BUCKET_REGION,
credentials: {
accessKeyId: env.S3_ACCESS_KEY_ID || S3_ACCESS_KEY_ID,
secretAccessKey: env.S3_SECRET_ACCESS_KEY_ID || S3_SECRET_ACCESS_KEY_ID
}
})
env.s3BucketName = env.S3_BUCKET_NAME || S3_BUCKET_NAME
}
} catch { // not required in dev mode
console.log('no setup for backups')
}
}
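
For reference, the two client configurations this block ends up producing look roughly like this (a sketch; the dev values come from `test/scripts/worker-globals.js` later in this commit, and the production credentials are placeholders):

```js
import { S3Client } from '@aws-sdk/client-s3'

// Tests / local dev: S3_BUCKET_ENDPOINT points at the smoke mock on :9096, so
// path-style addressing is forced and objects land at PUT http://localhost:9096/bucket/<key>.
const devClient = new S3Client({
  endpoint: 'http://localhost:9096',
  forcePathStyle: true,
  region: 'eu-central-1',
  credentials: { accessKeyId: 'access-key-id', secretAccessKey: 'secret-access-key' }
})

// Production: no endpoint override, so the client targets AWS S3 directly using
// virtual-hosted-style URLs for S3_BUCKET_NAME (credentials shown are placeholders).
const prodClient = new S3Client({
  region: 'eu-central-1',
  credentials: { accessKeyId: '<S3_ACCESS_KEY_ID>', secretAccessKey: '<S3_SECRET_ACCESS_KEY_ID>' }
})
```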
69 changes: 52 additions & 17 deletions packages/api/src/upload.js
@@ -1,5 +1,6 @@
/* eslint-env serviceworker */
import { gql } from '@web3-storage/db'
import { backup } from './utils/backup.js'
import { JSONResponse } from './utils/json-response.js'
import { toFormData } from './utils/form-data.js'
import { LOCAL_ADD_THRESHOLD } from './constants.js'
@@ -35,6 +36,7 @@ export async function uploadPost (request, env, ctx) {

let cid
let dagSize
let backupData = []
let name = headers.get('x-name')
let type
if (!name || typeof name !== 'string') {
@@ -44,17 +46,16 @@
if (contentType.includes('multipart/form-data')) {
const form = await toFormData(request)
const files = /** @type {File[]} */ (form.getAll('file'))
const dirSize = files.reduce((total, f) => total + f.size, 0)

const entries = await env.cluster.addDirectory(files, {
metadata: { size: dirSize.toString() },
// When >2.5MB, use local add, because waiting for blocks to be sent to
// other cluster nodes can take a long time. Replication to other nodes
// will be done async by bitswap instead.
local: dirSize > LOCAL_ADD_THRESHOLD
})
const dir = entries[entries.length - 1]
const [entries, ...keys] = await Promise.all([
addDirectoryToCluster(files, env),
...files.map((f) => backup(f, env))
])

const dir = entries[entries.length - 1]
backupData = keys.map((key) => ({
key,
name: env.s3BucketName
}))
cid = dir.cid
dagSize = dir.size
type = 'Multipart'
@@ -64,14 +65,17 @@
throw new Error('Empty payload')
}

const entry = await env.cluster.add(blob, {
metadata: { size: blob.size.toString() },
// When >2.5MB, use local add, because waiting for blocks to be sent to
// other cluster nodes can take a long time. Replication to other nodes
// will be done async by bitswap instead.
local: blob.size > LOCAL_ADD_THRESHOLD
})
const [entry, backupKey] = await Promise.all([
addFileToCluster(blob, env),
backup(blob, env)
])

backupData = backupKey
? [{
key: backupKey,
name: env.s3BucketName
}]
: []
cid = entry.cid
dagSize = entry.size
type = 'Blob'
@@ -97,6 +101,7 @@
name,
type,
pins,
backupData,
dagSize
}
})
@@ -116,3 +121,33 @@

return new JSONResponse({ cid })
}

/**
* @param {File[]} files
* @param {import('./env').Env} env
*/
async function addDirectoryToCluster (files, env) {
const dirSize = files.reduce((total, f) => total + f.size, 0)

return env.cluster.addDirectory(files, {
metadata: { size: dirSize.toString() },
// When >2.5MB, use local add, because waiting for blocks to be sent to
// other cluster nodes can take a long time. Replication to other nodes
// will be done async by bitswap instead.
local: dirSize > LOCAL_ADD_THRESHOLD
})
}

/**
* @param {File} file
* @param {import('./env').Env} env
*/
async function addFileToCluster (file, env) {
return env.cluster.add(file, {
metadata: { size: file.size.toString() },
// When >2.5MB, use local add, because waiting for blocks to be sent to
// other cluster nodes can take a long time. Replication to other nodes
// will be done async by bitswap instead.
local: file.size > LOCAL_ADD_THRESHOLD
})
}
24 changes: 24 additions & 0 deletions packages/api/src/utils/backup.js
@@ -0,0 +1,24 @@
import { PutObjectCommand } from '@aws-sdk/client-s3'
import { sha256 } from 'multiformats/hashes/sha2'
import { toString } from 'uint8arrays'

/**
* @param {Blob} content
* @param {import('../env').Env} env
*/
export async function backup (content, env) {
if (!env.s3Client) {
return undefined
}

const data = await content.arrayBuffer()
const key = await sha256.digest(new Uint8Array(data))
const keyStr = toString(key.bytes, 'base32')
const bucketParams = {
Bucket: env.s3BucketName,
Key: keyStr,
Body: content
}
await env.s3Client.send(new PutObjectCommand(bucketParams))
return keyStr
}
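
Since the object key is derived purely from the content, it can be recomputed outside the worker to locate an upload's backup in the bucket. A minimal sketch (`backupKeyFor` is a hypothetical helper, not part of this commit):

```js
import { sha256 } from 'multiformats/hashes/sha2'
import { toString } from 'uint8arrays'

// Recompute the S3 object key exactly as backup() derives it:
// the sha2-256 multihash of the raw bytes, base32-encoded.
async function backupKeyFor (bytes) {
  const digest = await sha256.digest(bytes)
  return toString(digest.bytes, 'base32')
}

// Usage: the backed-up bytes live at s3://<S3_BUCKET_NAME>/<key>
// const key = await backupKeyFor(new Uint8Array(await blob.arrayBuffer()))
```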
1 change: 1 addition & 0 deletions packages/api/test/car.spec.js
@@ -4,6 +4,7 @@ import { CID } from 'multiformats/cid'
import { sha256 } from 'multiformats/hashes/sha2'
import * as pb from '@ipld/dag-pb'
import { CarWriter } from '@ipld/car'

import { endpoint } from './scripts/constants.js'
import * as JWT from '../src/utils/jwt.js'
import { SALT } from './scripts/worker-globals.js'
9 changes: 9 additions & 0 deletions packages/api/test/mocks/backup/put_bucket#@id.js
@@ -0,0 +1,9 @@
/**
* https://github.com/sinedied/smoke#javascript-mocks
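 *
 * Handles `PUT /bucket/:id`, the path-style PutObject request the worker's
 * S3 client issues when S3_BUCKET_ENDPOINT points at this mock server.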
*/
module.exports = () => {
return {
statusCode: 200,
headers: { 'Content-Type': 'application/json' }
}
}
5 changes: 5 additions & 0 deletions packages/api/test/scripts/worker-globals.js
@@ -11,6 +11,11 @@ export const FAUNA_KEY = 'test-fauna-key'
export const MAGIC_SECRET_KEY = 'test-magic-secret-key'
export const CLUSTER_API_URL = 'http://localhost:9094'
export const CLUSTER_BASIC_AUTH_TOKEN = 'test'
export const S3_BUCKET_ENDPOINT = 'http://localhost:9096'
export const S3_BUCKET_NAME = 'bucket'
export const S3_BUCKET_REGION = 'eu-central-1'
export const S3_ACCESS_KEY_ID = 'access-key-id'
export const S3_SECRET_ACCESS_KEY_ID = 'secret-access-key'

// Can be removed once we get a test mode for admin magic sdk.
export const DANGEROUSLY_BYPASS_MAGIC_AUTH = true
28 changes: 28 additions & 0 deletions packages/db/fauna/resources/Function/createUpload.js
@@ -103,6 +103,20 @@ const body = Query(
)
)
),
Foreach(
Select('backupData', Var('data')),
Lambda(
['data'],
Create('Backup', {
data: {
upload: Select('ref', Var('upload')),
key: Select('key', Var('data')),
name: Select('name', Var('data')),
created: Now()
}
})
)
),
Var('upload')
)
)
@@ -163,6 +177,20 @@
)
)
),
Foreach(
Select('backupData', Var('data')),
Lambda(
['data'],
Create('Backup', {
data: {
upload: Select('ref', Var('upload')),
key: Select('key', Var('data')),
name: Select('name', Var('data')),
created: Now()
}
})
)
),
Var('upload')
)
)
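
For context, the `backupData` array this `Foreach` consumes is built by the API (see `car.js` and `upload.js` above); each entry becomes one `Backup` document linked to the upload. An illustrative shape, with placeholder values:

```js
const backupData = [
  {
    key: 'ciqexample...base32-sha256-of-the-bytes', // S3 object key returned by backup()
    name: 'my-backup-bucket'                        // env.s3BucketName the object was written to
  }
]
```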