Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nbstore): init #7639

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const allPackages = [
'packages/common/debug',
'packages/common/env',
'packages/common/infra',
'packages/common/theme',
'packages/common/nbstore',
'tools/cli',
];

Expand Down
69 changes: 69 additions & 0 deletions packages/common/nbstore/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Space Storage

## Usage

### Independent Storage usage

```ts
import type { ConnectionStatus } from '@affine/nbstore';
import { IndexedDBDocStorage } from '@affine/nbstore/idb';

const storage = new IndexedDBDocStorage({
peer: 'local'
spaceId: 'my-new-workspace',
});

await storage.connect();
storage.connection.onStatusChange((status: ConnectionStatus, error?: Error) => {
ui.show(status, error);
});

// { docId: string, bin: Uint8Array, timestamp: Date, editor?: string } | null
const doc = await storage.getDoc('my-first-doc');
```

### Use All storages together

```ts
import { SpaceStorage } from '@affine/nbstore';
import type { ConnectionStatus } from '@affine/nbstore';
import { IndexedDBDocStorage } from '@affine/nbstore/idb';
import { SqliteBlobStorage } from '@affine/nbstore/sqlite';

const storage = new SpaceStorage([new IndexedDBDocStorage({}), new SqliteBlobStorage({})]);

await storage.connect();
storage.on('connection', ({ storage, status, error }) => {
ui.show(storage, status, error);
});

await storage.get('doc').pushDocUpdate({ docId: 'my-first-doc', bin: new Uint8Array(), editor: 'me' });
await storage.tryGet('blob')?.get('img');
```

### Put Storage behind Worker

```ts
import { SpaceStorageWorkerClient } from '@affine/nbstore/op';
import type { ConnectionStatus } from '@affine/nbstore';
import { IndexedDBDocStorage } from '@affine/nbstore/idb';

const client = new SpaceStorageWorkerClient();
client.addStorage(IndexedDBDocStorage, {
// options can only be structure-cloneable type
peer: 'local',
spaceType: 'workspace',
spaceId: 'my-new-workspace',
});

await client.connect();
client.ob$('connection', ({ storage, status, error }) => {
ui.show(storage, status, error);
});

await client.call('pushDocUpdate', { docId: 'my-first-doc', bin: new Uint8Array(), editor: 'me' });

// call unregistered op will leads to Error
// Error { message: 'Handler for operation [listHistory] is not registered.' }
await client.call('listHistories', { docId: 'my-first-doc' });
```
17 changes: 17 additions & 0 deletions packages/common/nbstore/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "@affine/nbstore",
"type": "module",
"version": "0.18.0",
"private": true,
"sideEffects": false,
"exports": {
".": "./src/index.ts"
},
"dependencies": {
"@toeverything/infra": "workspace:*",
"eventemitter2": "^6.4.9",
"lodash-es": "^4.17.21",
"rxjs": "^7.8.1",
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
}
}
132 changes: 132 additions & 0 deletions packages/common/nbstore/src/connection/connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import EventEmitter2 from 'eventemitter2';

export type ConnectionStatus =
| 'idle'
| 'connecting'
| 'connected'
| 'error'
| 'closed';

export abstract class Connection<T = any> {
private readonly event = new EventEmitter2();
private _inner: T | null = null;
private _status: ConnectionStatus = 'idle';
protected error?: Error;
private refCount = 0;

constructor() {
this.autoReconnect();
}

get shareId(): string | undefined {
return undefined;
}

get maybeConnection() {
return this._inner;
}

get inner(): T {
if (!this._inner) {
throw new Error(
`Connection ${this.constructor.name} has not been established.`
);
}

return this._inner;
}

protected set inner(inner: T | null) {
this._inner = inner;
}

get status() {
return this._status;
}

protected setStatus(status: ConnectionStatus, error?: Error) {
const shouldEmit = status !== this._status && error !== this.error;
this._status = status;
this.error = error;
if (shouldEmit) {
this.emitStatusChanged(status, error);
}
}

abstract doConnect(): Promise<T>;
abstract doDisconnect(conn: T): Promise<void>;

ref() {
this.refCount++;
}

deref() {
this.refCount = Math.max(0, this.refCount - 1);
}

async connect() {
if (this.status === 'idle' || this.status === 'error') {
this.setStatus('connecting');
try {
this._inner = await this.doConnect();
this.setStatus('connected');
} catch (error) {
this.setStatus('error', error as any);
}
}
}

async disconnect() {
this.deref();
if (this.refCount > 0) {
return;
}

if (this.status === 'connected') {
try {
if (this._inner) {
await this.doDisconnect(this._inner);
this._inner = null;
}
this.setStatus('closed');
} catch (error) {
this.setStatus('error', error as any);
}
}
}

private autoReconnect() {
// TODO:
// - maximum retry count
// - dynamic sleep time (attempt < 3 ? 1s : 1min)?
this.onStatusChanged(() => {
this.connect().catch(() => {});
});
}

onStatusChanged(
cb: (status: ConnectionStatus, error?: Error) => void
): () => void {
this.event.on('statusChanged', cb);
return () => {
this.event.off('statusChanged', cb);
};
}

private readonly emitStatusChanged = (
status: ConnectionStatus,
error?: Error
) => {
this.event.emit('statusChanged', status, error);
};
}

export class DummyConnection extends Connection<undefined> {
doConnect() {
return Promise.resolve(undefined);
}

doDisconnect() {
return Promise.resolve(undefined);
}
}
2 changes: 2 additions & 0 deletions packages/common/nbstore/src/connection/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './connection';
export * from './shared-connection';
22 changes: 22 additions & 0 deletions packages/common/nbstore/src/connection/shared-connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import type { Connection } from './connection';

const CONNECTIONS: Map<string, Connection<any>> = new Map();
export function share<T extends Connection<any>>(conn: T): T {
if (!conn.shareId) {
throw new Error(
`Connection ${conn.constructor.name} is not shareable.\nIf you want to make it shareable, please override [shareId].`
);
}

const existing = CONNECTIONS.get(conn.shareId);

if (existing) {
existing.ref();
return existing as T;
}

CONNECTIONS.set(conn.shareId, conn);
conn.ref();

return conn;
}
2 changes: 2 additions & 0 deletions packages/common/nbstore/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './connection';
export * from './storage';
29 changes: 29 additions & 0 deletions packages/common/nbstore/src/storage/blob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Storage, type StorageOptions } from './storage';

export interface BlobStorageOptions extends StorageOptions {}

export interface BlobRecord {
key: string;
data: Uint8Array;
mime: string;
createdAt: Date;
}

export interface ListedBlobRecord {
key: string;
mime: string;
size: number;
createdAt: Date;
}

export abstract class BlobStorage<
Options extends BlobStorageOptions = BlobStorageOptions,
> extends Storage<Options> {
override readonly storageType = 'blob';

abstract get(key: string): Promise<BlobRecord | null>;
abstract set(blob: BlobRecord): Promise<void>;
abstract delete(key: string, permanently: boolean): Promise<void>;
abstract release(): Promise<void>;
abstract list(): Promise<ListedBlobRecord[]>;
}
Loading
Loading