Skip to content

Commit

Permalink
feat(nbstore): introduce standalone space store abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Nov 18, 2024
1 parent ffa4d54 commit b23d3df
Show file tree
Hide file tree
Showing 18 changed files with 885 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const allPackages = [
'packages/common/debug',
'packages/common/env',
'packages/common/infra',
'packages/common/theme',
'packages/common/nbstore',
'tools/cli',
];

Expand Down
69 changes: 69 additions & 0 deletions packages/common/nbstore/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Space Storage

## Usage

### Independent Storage usage

```ts
import type { ConnectionStatus } from '@affine/nbstore';
import { IndexedDBDocStorage } from '@affine/nbstore/idb';

const storage = new IndexedDBDocStorage({
peer: 'local'
spaceId: 'my-new-workspace',
});

await storage.connect();
storage.connection.onStatusChange((status: ConnectionStatus, error?: Error) => {
ui.show(status, error);
});

// { docId: string, bin: Uint8Array, timestamp: Date, editor?: string } | null
const doc = await storage.getDoc('my-first-doc');
```

### Use All storages together

```ts
import { SpaceStorage } from '@affine/nbstore';
import type { ConnectionStatus } from '@affine/nbstore';
import { IndexedDBDocStorage } from '@affine/nbstore/idb';
import { SqliteBlobStorage } from '@affine/nbstore/sqlite';

const storage = new SpaceStorage([new IndexedDBDocStorage({}), new SqliteBlobStorage({})]);

await storage.connect();
storage.on('connection', ({ storage, status, error }) => {
ui.show(storage, status, error);
});

await storage.get('doc').pushDocUpdate({ docId: 'my-first-doc', bin: new Uint8Array(), editor: 'me' });
await storage.tryGet('blob')?.get('img');
```

### Put Storage behind Worker

```ts
import { SpaceStorageWorkerClient } from '@affine/nbstore/op';
import type { ConnectionStatus } from '@affine/nbstore';
import { IndexedDBDocStorage } from '@affine/nbstore/idb';

const client = new SpaceStorageWorkerClient();
client.addStorage(IndexedDBDocStorage, {
// options can only be structure-cloneable type
peer: 'local',
spaceType: 'workspace',
spaceId: 'my-new-workspace',
});

await client.connect();
client.ob$('connection', ({ storage, status, error }) => {
ui.show(storage, status, error);
});

await client.call('pushDocUpdate', { docId: 'my-first-doc', bin: new Uint8Array(), editor: 'me' });

// call unregistered op will leads to Error
// Error { message: 'Handler for operation [listHistory] is not registered.' }
await client.call('listHistories', { docId: 'my-first-doc' });
```
17 changes: 17 additions & 0 deletions packages/common/nbstore/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "@affine/nbstore",
"type": "module",
"version": "0.18.0",
"private": true,
"sideEffects": false,
"exports": {
".": "./src/index.ts"
},
"dependencies": {
"@toeverything/infra": "workspace:*",
"eventemitter2": "^6.4.9",
"lodash-es": "^4.17.21",
"rxjs": "^7.8.1",
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
}
}
132 changes: 132 additions & 0 deletions packages/common/nbstore/src/connection/connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import EventEmitter2 from 'eventemitter2';

export type ConnectionStatus =
| 'idle'
| 'connecting'
| 'connected'
| 'error'
| 'closed';

export abstract class Connection<T = any> {
private readonly event = new EventEmitter2();
private _inner: T | null = null;
private _status: ConnectionStatus = 'idle';
protected error?: Error;
private refCount = 0;

constructor() {
this.autoReconnect();
}

get shareId(): string | undefined {
return undefined;
}

get maybeConnection() {
return this._inner;
}

get inner(): T {
if (!this._inner) {
throw new Error(
`Connection ${this.constructor.name} has not been established.`
);
}

return this._inner;
}

protected set inner(inner: T | null) {
this._inner = inner;
}

get status() {
return this._status;
}

protected setStatus(status: ConnectionStatus, error?: Error) {
const shouldEmit = status !== this._status && error !== this.error;
this._status = status;
this.error = error;
if (shouldEmit) {
this.emitStatusChanged(status, error);
}
}

abstract doConnect(): Promise<T>;
abstract doDisconnect(conn: T): Promise<void>;

ref() {
this.refCount++;
}

deref() {
this.refCount = Math.max(0, this.refCount - 1);
}

async connect() {
if (this.status === 'idle' || this.status === 'error') {
this.setStatus('connecting');
try {
this._inner = await this.doConnect();
this.setStatus('connected');
} catch (error) {
this.setStatus('error', error as any);
}
}
}

async disconnect() {
this.deref();
if (this.refCount > 0) {
return;
}

if (this.status === 'connected') {
try {
if (this._inner) {
await this.doDisconnect(this._inner);
this._inner = null;
}
this.setStatus('closed');
} catch (error) {
this.setStatus('error', error as any);
}
}
}

private autoReconnect() {
// TODO:
// - maximum retry count
// - dynamic sleep time (attempt < 3 ? 1s : 1min)?
this.onStatusChanged(() => {
this.connect().catch(() => {});
});
}

onStatusChanged(
cb: (status: ConnectionStatus, error?: Error) => void
): () => void {
this.event.on('statusChanged', cb);
return () => {
this.event.off('statusChanged', cb);
};
}

private readonly emitStatusChanged = (
status: ConnectionStatus,
error?: Error
) => {
this.event.emit('statusChanged', status, error);
};
}

export class DummyConnection extends Connection<undefined> {
doConnect() {
return Promise.resolve(undefined);
}

doDisconnect() {
return Promise.resolve(undefined);
}
}
2 changes: 2 additions & 0 deletions packages/common/nbstore/src/connection/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './connection';
export * from './shared-connection';
22 changes: 22 additions & 0 deletions packages/common/nbstore/src/connection/shared-connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import type { Connection } from './connection';

const CONNECTIONS: Map<string, Connection<any>> = new Map();
export function share<T extends Connection<any>>(conn: T): T {
if (!conn.shareId) {
throw new Error(
`Connection ${conn.constructor.name} is not shareable.\nIf you want to make it shareable, please override [shareId].`
);
}

const existing = CONNECTIONS.get(conn.shareId);

if (existing) {
existing.ref();
return existing as T;
}

CONNECTIONS.set(conn.shareId, conn);
conn.ref();

return conn;
}
2 changes: 2 additions & 0 deletions packages/common/nbstore/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './connection';
export * from './storage';
29 changes: 29 additions & 0 deletions packages/common/nbstore/src/storage/blob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Storage, type StorageOptions } from './storage';

export interface BlobStorageOptions extends StorageOptions {}

export interface BlobRecord {
key: string;
data: Uint8Array;
mime: string;
createdAt: Date;
}

export interface ListedBlobRecord {
key: string;
mime: string;
size: number;
createdAt: Date;
}

export abstract class BlobStorage<
Options extends BlobStorageOptions = BlobStorageOptions,
> extends Storage<Options> {
override readonly storageType = 'blob';

abstract get(key: string): Promise<BlobRecord | null>;
abstract set(blob: BlobRecord): Promise<void>;
abstract delete(key: string, permanently: boolean): Promise<void>;
abstract release(): Promise<void>;
abstract list(): Promise<ListedBlobRecord[]>;
}
Loading

0 comments on commit b23d3df

Please sign in to comment.