Skip to content

Commit

Permalink
chore(doc-storage): organize code
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Oct 22, 2024
1 parent b0c2dd3 commit a4bc542
Show file tree
Hide file tree
Showing 33 changed files with 1,481 additions and 785 deletions.
21 changes: 1 addition & 20 deletions packages/backend/server/src/core/sync/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ interface UpdateAwarenessMessage {
docId: string;
awarenessUpdate: string;
}

@WebSocketGateway()
export class SpaceSyncGateway
implements OnGatewayConnection, OnGatewayDisconnect
Expand Down Expand Up @@ -181,26 +182,6 @@ export class SpaceSyncGateway
}
}

async joinWorkspace(
client: Socket,
room: `${string}:${'sync' | 'awareness'}`
) {
await client.join(room);
}

async leaveWorkspace(
client: Socket,
room: `${string}:${'sync' | 'awareness'}`
) {
await client.leave(room);
}

assertInWorkspace(client: Socket, room: `${string}:${'sync' | 'awareness'}`) {
if (!client.rooms.has(room)) {
throw new NotInSpace({ spaceId: room.split(':')[0] });
}
}

// v3
@SubscribeMessage('space:join')
async onJoinSpace(
Expand Down
97 changes: 97 additions & 0 deletions packages/common/doc-storage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# @affine/doc-storage

## Storages

### StorageNode

```ts
import { StorageManager } from '@affine/doc-storage';

class CloudStorageManager extends StorageManager {
private readonly socket = io('http://endpoint');

constructor(options: StorageOptions) {
super(options);
this.add(
'doc',
new CloudDocStorage({
...this.options,
socket: this.socket,
})
);
this.add(
'blob',
new CloudBlobStorage({
...this.options,
socket: this.socket,
})
);
}

override async doConnect() {
await this.socket.connect();
}

override async doDisconnect() {
await this.socket.close();
}
}
```

### StorageEdgeNode

```ts
interface SqliteStorageOptions extends StorageOptions {
dbPath: string;
}

class SqliteStorage extends CloudStorageManager<SqliteStorageOptions> {
constructor(options: StorageOptions) {
super(options);
this.db = new Sqlite(this.options.dbPath);

this.add('doc', new SqliteDocStorage({ ...this.options, db: this.db }));
this.add('blob', new SqliteBlobStorage({ ...this.options, db: this.db }));
this.add('sync', new SqliteSyncStorage({ ...this.options, db: this.db }));
}

override async doConnect() {
await this.db.connect();
}

override async doDisconnect() {
await this.db.close();
}
}
```

## Compose storages

```ts
interface SqliteStorageOptions extends StorageOptions {
dbPath: string;
}

class SqliteStorage extends CloudStorageManager<SqliteStorageOptions> {
idb!: SpaceIDB | null = null;

constructor(options: StorageOptions) {
super(options);
this.db = new Sqlite(this.options.dbPath);

this.add('doc', new SqliteDocStorage({ ...this.options, db: this.db }));
}

override async doConnect() {
await this.db.connect();
this.idb = await IDBProtocol.open(`${this.spaceType}:${this.spaceId}`);
this.add('blob', new IDBBlobStorage({ ...this.options, idb: this.idb }));
}

override async doDisconnect() {
await this.db.close();
await this.idb?.close();
this.remove('blob');
}
}
```
75 changes: 23 additions & 52 deletions packages/common/doc-storage/src/impls/cloud/blob.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,27 @@
import {
type Blob,
BlobStorage,
type DocStorageOptions,
type BlobStorageOptions,
type ListedBlob,
} from '../../storage';
import type { Socket } from './socket';
import {
base64ToUint8Array,
type ServerEventsMap,
type Socket,
uint8ArrayToBase64,
} from './socket';

interface CloudBlobStorageOptions extends DocStorageOptions {
interface CloudBlobStorageOptions extends BlobStorageOptions {
socket: Socket;
}

export class CloudBlobStorage extends BlobStorage<CloudBlobStorageOptions> {
get socket() {
private get socket() {
return this.options.socket;
}

override async connect(): Promise<void> {
// the event will be polled, there is no need to wait for socket to be connected
await this.clientHandShake();
// this.socket.on('space:broadcast-blob-update', this.onServerUpdates);
}

private async clientHandShake() {
const res = await this.socket.emitWithAck('space:join', {
override async doConnect(): Promise<void> {
const res = await this.socket.emitWithAck('space:join-blob', {
spaceType: this.spaceType,
spaceId: this.spaceId,
clientVersion: BUILD_CONFIG.appVersion,
Expand All @@ -32,24 +31,25 @@ export class CloudBlobStorage extends BlobStorage<CloudBlobStorageOptions> {
// TODO(@forehalo): use [UserFriendlyError]
throw new Error(res.error.message);
}
this.socket.on('space:broadcast-blob-update', this.onServerUpdate);
}

override async disconnect(): Promise<void> {
this.socket.emit('space:leave', {
override async doDisconnect(): Promise<void> {
this.socket.emit('space:leave-blob', {
spaceType: this.spaceType,
spaceId: this.spaceId,
});
// this.socket.off('space:broadcast-doc-updates', this.onServerUpdate);
this.socket.off('space:broadcast-blob-update', this.onServerUpdate);
}

// onServerUpdate: ServerEventsMap['space:broadcast-blob-update'] = message => {
// if (
// this.spaceType === message.spaceType &&
// this.spaceId === message.spaceId
// ) {
// // how do we deal with the data?
// }
// };
onServerUpdate: ServerEventsMap['space:broadcast-blob-update'] = message => {
if (
this.spaceType === message.spaceType &&
this.spaceId === message.spaceId
) {
// how do we deal with the data?
}
};

override async getBlob(key: string): Promise<Blob | null> {
const res = await this.socket.emitWithAck('space:get-blob', {
Expand Down Expand Up @@ -109,32 +109,3 @@ export class CloudBlobStorage extends BlobStorage<CloudBlobStorageOptions> {
return res.data;
}
}

export function uint8ArrayToBase64(array: Uint8Array): Promise<string> {
return new Promise<string>(resolve => {
// Create a blob from the Uint8Array
const blob = new Blob([array]);

const reader = new FileReader();
reader.onload = function () {
const dataUrl = reader.result as string | null;
if (!dataUrl) {
resolve('');
return;
}
// The result includes the `data:` URL prefix and the MIME type. We only want the Base64 data
const base64 = dataUrl.split(',')[1];
resolve(base64);
};

reader.readAsDataURL(blob);
});
}

export function base64ToUint8Array(base64: string) {
const binaryString = atob(base64);
const binaryArray = binaryString.split('').map(function (char) {
return char.charCodeAt(0);
});
return new Uint8Array(binaryArray);
}
58 changes: 14 additions & 44 deletions packages/common/doc-storage/src/impls/cloud/doc.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,44 @@
import { DocStorage, type DocStorageOptions } from '../../storage';
import type { ServerEventsMap, Socket } from './socket';
import {
base64ToUint8Array,
type ServerEventsMap,
type Socket,
uint8ArrayToBase64,
} from './socket';

interface CloudDocStorageOptions extends DocStorageOptions {
endpoint: string;
socket: Socket;
}

export class CloudDocStorage extends DocStorage<CloudDocStorageOptions> {
get name() {
// @ts-expect-error we need it
return this.options.socket.io.uri;
}

get socket() {
private get socket() {
return this.options.socket;
}

override async connect(): Promise<void> {
// the event will be polled, there is no need to wait for socket to be connected
await this.clientHandShake();
this.socket.on('space:broadcast-doc-updates', this.onServerUpdates);
get name() {
return this.options.endpoint;
}

private async clientHandShake() {
override async doConnect(): Promise<void> {
const res = await this.socket.emitWithAck('space:join', {
spaceType: this.spaceType,
spaceId: this.spaceId,
clientVersion: BUILD_CONFIG.appVersion,
});

if ('error' in res) {
// TODO(@forehalo): use [UserFriendlyError]
throw new Error(res.error.message);
}
this.socket?.on('space:broadcast-doc-updates', this.onServerUpdates);
}

override async disconnect(): Promise<void> {
override async doDisconnect(): Promise<void> {
this.socket.emit('space:leave', {
spaceType: this.spaceType,
spaceId: this.spaceId,
});
this.socket.off('space:broadcast-doc-updates', this.onServerUpdates);
this.socket?.off('space:broadcast-doc-updates', this.onServerUpdates);
}

onServerUpdates: ServerEventsMap['space:broadcast-doc-updates'] = message => {
Expand Down Expand Up @@ -152,32 +151,3 @@ export class CloudDocStorage extends DocStorage<CloudDocStorageOptions> {
return false;
}
}

export function uint8ArrayToBase64(array: Uint8Array): Promise<string> {
return new Promise<string>(resolve => {
// Create a blob from the Uint8Array
const blob = new Blob([array]);

const reader = new FileReader();
reader.onload = function () {
const dataUrl = reader.result as string | null;
if (!dataUrl) {
resolve('');
return;
}
// The result includes the `data:` URL prefix and the MIME type. We only want the Base64 data
const base64 = dataUrl.split(',')[1];
resolve(base64);
};

reader.readAsDataURL(blob);
});
}

export function base64ToUint8Array(base64: string) {
const binaryString = atob(base64);
const binaryArray = binaryString.split('').map(function (char) {
return char.charCodeAt(0);
});
return new Uint8Array(binaryArray);
}
1 change: 1 addition & 0 deletions packages/common/doc-storage/src/impls/cloud/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './blob';
export * from './doc';
Loading

0 comments on commit a4bc542

Please sign in to comment.