Skip to content

Commit

Permalink
feat(cli): add MongoDB restore
Browse files Browse the repository at this point in the history
  • Loading branch information
juanrgm committed Apr 12, 2024
1 parent 5d10395 commit 5ff640a
Show file tree
Hide file tree
Showing 6 changed files with 596 additions and 28 deletions.
5 changes: 5 additions & 0 deletions .changeset/fresh-dots-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@datatruck/cli": minor
---

Add MongoDB restore
4 changes: 3 additions & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@
"fast-glob": "^3.3.2",
"listr2": "^8.2.1",
"micromatch": "^4.0.5",
"mongodb": "^6.5.0",
"mysql2": "^3.9.4",
"tty-table": "^4.2.3",
"yaml": "^2.4.1"
},
"devDependencies": {
"@types/async": "^3.2.24",
"@types/micromatch": "^4.0.6"
"@types/micromatch": "^4.0.6",
"mongodb-memory-server": "^9.1.8"
},
"optionalDependencies": {
"ts-node": "^10.9.2"
Expand Down
119 changes: 98 additions & 21 deletions packages/cli/src/tasks/MongoDumpTask.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
import { AsyncProcess } from "../utils/async-process";
import { ensureEmptyDir, fetchData, mkdirIfNotExists } from "../utils/fs";
import {
ResolveDatabaseNameParams,
resolveDatabaseName,
} from "../utils/datatruck/config";
import { AppError } from "../utils/error";
import { ensureEmptyDir, mkdirIfNotExists } from "../utils/fs";
import { MongoUriObject, resolveMongoUri, toMongoUri } from "../utils/mongodb";
import { mkTmpDir } from "../utils/temp";
import { TaskBackupData, TaskRestoreData, TaskAbstract } from "./TaskAbstract";
import {
TaskBackupData,
TaskRestoreData,
TaskAbstract,
TaskPrepareRestoreData,
} from "./TaskAbstract";
import { readdir, rename, rmdir } from "fs/promises";
import { MongoClient } from "mongodb";
import { join } from "path";

export type MongoDumpTaskConfig = {
uri: string | MongoUriObject;
command?: string;
hostname?: string;
port?: number;
username?: string;
password?: string | { path: string };
compress?: boolean;
concurrency?: number;
targetDatabase?: {
name: string;
};
};

export const mongodumpTaskName = "mongo-dump";
Expand All @@ -31,28 +45,24 @@ export class MongoDumpTask extends TaskAbstract<MongoDumpTaskConfig> {
await mkdirIfNotExists(snapshotPath);
await ensureEmptyDir(snapshotPath);

const config = await resolveMongoUri(this.config.uri);
const p = new AsyncProcess(
this.command,
[
...(this.config.hostname ? ["/h", this.config.hostname] : []),
...(this.config.port ? ["/p", this.config.port] : []),
...(this.config.username ? ["/u", this.config.username] : []),
...(config.host ? ["/h", config.host] : []),
...(config.port ? [`/port:${config.port}`] : []),
...["/authenticationDatabase:admin"],
...["/d", config.database],
...(config.username ? ["/u", config.username] : []),
...(this.config.compress ? ["/gzip"] : []),
...(this.config.concurrency ? ["/j", this.config.concurrency] : []),
"/o",
snapshotPath,
],
{
$log: this.verbose,
},
{ $log: this.verbose },
);

const password =
this.config.password !== undefined
? (await fetchData(this.config.password, (p) => p.path)) ?? ""
: "";

p.stdin.writable.write(`${password}\n`);
p.stdin.writable.write(`${config.password ?? ""}\n`);

await p.stderr.parseLines((line) => {
data.onProgress({
Expand All @@ -61,11 +71,78 @@ export class MongoDumpTask extends TaskAbstract<MongoDumpTaskConfig> {
},
});
});

const tmpDir = join(snapshotPath, config.database);
for (const file of await readdir(tmpDir))
await rename(join(tmpDir, file), join(snapshotPath, file));
await rmdir(tmpDir);
return { snapshotPath };
}

override async prepareRestore(data: TaskPrepareRestoreData) {
return {
snapshotPath:
data.package.restorePath ??
(await mkTmpDir(mongodumpTaskName, "task", "restore", "snapshot")),
};
}
override async restore(data: TaskRestoreData) {
throw new Error("Not implemented");
this.verbose = data.options.verbose;

const config = await resolveMongoUri(this.config.uri);
const uri = toMongoUri(config);
const client = new MongoClient(`${uri}?authSource=admin`);

const params: ResolveDatabaseNameParams = {
packageName: data.package.name,
snapshotId: data.options.id,
snapshotDate: data.snapshot.date,
action: "restore",
database: undefined,
};

const database = {
name: resolveDatabaseName(config.database, params),
};

if (this.config.targetDatabase && !data.options.initial)
database.name = resolveDatabaseName(this.config.targetDatabase.name, {
...params,
database: database.name,
});

const restoreCollections = (await readdir(data.snapshotPath))
.filter((name) => name.endsWith(".bson"))
.map((name) => name.replace(/\.bson$/, ""));

const collections: string[] = (
await client.db(database.name).collections()
).map((v) => v.collectionName);

const duplicatedCollections = restoreCollections.filter((v) =>
collections.includes(v),
);

if (duplicatedCollections.length)
throw new AppError(
`Target collections already exists: ${duplicatedCollections.join(", ")}`,
);

const p = new AsyncProcess(
"mongorestore",
[
...(config.host ? ["/h", config.host] : []),
...(config.port ? [`/port:${config.port}`] : []),
...["/authenticationDatabase:admin"],
...["/d", database.name],
...(config.username ? ["/u", config.username] : []),
...(this.config.compress ? ["/gzip"] : []),
...(this.config.concurrency ? ["/j", this.config.concurrency] : []),
data.snapshotPath,
],
{ $log: this.verbose },
);

p.stdin.writable.write(`${config.password ?? ""}\n`);

await p.waitForClose();
}
}
43 changes: 43 additions & 0 deletions packages/cli/src/utils/mongodb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { fetchData } from "./fs";

export type MongoUriObject<Resolved = false> = {
host: string;
username?: string;
password?: [Resolved] extends [true] ? string : string | { path: string };
port?: number;
database: string;
};

export function toMongoUri(object: MongoUriObject<true>) {
const url = new URL(`mongodb://${object.host}`);
if (typeof object.username === "string") url.username = object.username;
if (typeof object.password === "string") url.password = object.password;
if (typeof object.port === "number") url.port = object.port.toString();
url.pathname = `/${object.database}`;
return url.href;
}

export async function resolveMongoUri(
input: string | MongoUriObject,
): Promise<MongoUriObject<true>> {
let object: MongoUriObject;
if (typeof input === "string") {
const url = new URL(input);
object = {
host: url.hostname,
password: url.password,
port: url.port ? Number(url.port) : undefined,
username: url.username,
database: url.pathname.slice(1),
};
} else {
object = input;
}
return {
...object,
password:
object.password !== undefined
? (await fetchData(object.password, (p) => p.path)) ?? ""
: "",
};
}
130 changes: 130 additions & 0 deletions packages/cli/test/mongo-dump-task.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { createCommands } from "../src/utils/datatruck/command";
import { parseStringList } from "../src/utils/string";
import { makeConfig, makeRepositoryConfig, testRepositoryTypes } from "./util";
import { MongoClient } from "mongodb";
import { MongoMemoryServer } from "mongodb-memory-server";
import { describe, expect, it } from "vitest";

const repositoryTypes = parseStringList(
process.env.DTT_REPO,
testRepositoryTypes,
true,
);

const dataFormats = parseStringList(
process.env.DTT_DATA_FORMAT,
["defaults" as const],
process.env.CI ? ["defaults"] : true,
);

describe(
"mongo-dump-task",
{
timeout: 300_000,
},
() => {
it.each(
repositoryTypes.flatMap((repositoryType) =>
dataFormats.map((dataFormat) => ({
repositoryType,
dataFormat,
})),
),
)("backup and restore $repositoryType", async ({ repositoryType }) => {
const verbose = 1;
const dbName = `tmp_dtt_db`;
const mongoServer = await MongoMemoryServer.create({
auth: {
enable: true,
extraUsers: [
{
createUser: "test",
pwd: "test",
roles: [{ role: "root", db: "admin" }],
},
],
},
});
const port = Number(new URL(mongoServer.getUri()).port);
try {
const client = new MongoClient(`mongodb://test:[email protected]:${port}`);

const db = client.db(dbName);
const sourceData: Record<string, Record<string, any>[]> = {
col1: [
{ _id: 1, value: null },
{ _id: 2, value: "a" },
{ _id: 3, value: '"with\' quotes"' },
{ _id: 4, value: '"with\nline\r\nsalts"' },
{ _id: 5, value: '"\ttext' },
{ _id: 6, value: null },
{ _id: 7, value: "a\nb" },
{ _id: 8, value: "»finish" },
],
col2: [{ _id: 3, value: "b" }],
};

for (const collection of Object.keys(sourceData)) {
for (const row of sourceData[collection]) {
db.collection(collection).insertOne(row);
}
}

const config = await makeConfig({
repositories: [await makeRepositoryConfig(repositoryType)],
packages: [
{
name: "main/mongo-dump",
repositoryNames: [repositoryType],
task: {
name: "mongo-dump",
config: {
uri: {
database: dbName,
host: "127.0.0.1",
port,
username: "test",
password: "test",
},
targetDatabase: {
name: `${dbName}_{snapshotId}`,
},
},
},
},
],
});

const dtt = createCommands({ config, verbose });
await dtt.init({});
await dtt.backup({});
const snapshots = await dtt.snapshots({});
expect(snapshots).toHaveLength(1);
const [snapshot] = snapshots;

await dtt.restore({ id: snapshot.id });

const restoredDbName = `${dbName}_${snapshot.id}`;
const collections = (await client.db(restoredDbName).collections())
.map((col) => col.collectionName)
.sort();

expect(collections.join()).toBe(Object.keys(sourceData).sort().join());

for (const collection in sourceData) {
const rows = await client
.db(restoredDbName)
.collection(collection)
.aggregate([{ $sort: { _id: 1 } }])
.toArray();

expect(JSON.stringify(rows)).toBe(
JSON.stringify(sourceData[collection]),
);
}
} finally {
await mongoServer.stop();
}
});
},
);
Loading

0 comments on commit 5ff640a

Please sign in to comment.