Skip to content

Commit

Permalink
Works as a standalone library now. Pending review to merge to staging.
Browse files Browse the repository at this point in the history
callers and handlers are now refactored

* WIP - Newline now works, refers issue #1

node v20 fix

feat: handlers implementations are now abstract arrow functions

* Fixes #5

[ci skip]

* resolves issue 5, makes RPC handlers abstract arrow function properties

feat: rename to uppercase

[ci skip]

fix: handler export fix

[ci skip]

fix: tsconf from quic

[ci skip]

fix: dependencies (js quic), events and errors versions, changing to relative imports, jest dev dependency, js-quic tsconfig

[ci skip]

fix: tests imports, using @

[ci skip]

chore: removed sysexits

chore: fix default exports for callers and handlers
Fixed index for handlers

fix: remove @matrixai/id

fix: remove @matrixai/id and ix

chore : diagram

[ci skip]

chore : lintfix
fix: errors now extend AbstractError

[ci skip]

fix: undoing fix #1

[ci skip]

replacd errorCode with just code, references std error codes from rpc spec

feat: events based createDestroy

[ci skip]

chore: img format fix

[ci skip]

chore: img in README.md

[ci skip]

feat: allows the user to pass in a generator function if the user wishes to specify a particular id

[ci skip]

fix: fixes #7

* Removes graceTimer and related jests

chore: idGen name change. idGen parameter in creation and constructor. No longer optional. Only defaulted in one place.

wip: added idgen to jests, was missing.

[ci skip]

wip: reimported ix, since a few tests rely on it.
removed, matrixai/id

wip: jests for #4
removed, matrixai/id

wip: * Implements custom RPC Error codes.
     * Fixed jest for concurrent timeouts
     * All errors now have a cause
     * All errors now use custom error codes.

wip: *Client uses ctx timer now

wip: *Jests to test concurrency

wip: *custom RPC based errors for RPC Client, now all errors have a cause and an error message

WIP: * Refactor out sensitiveReplacer

WIP: * Refactor out sensitiveReplacer

WIP: * Update to latest async init and events
* set default timeout to Infinity
* jest to check server and client with infinite timeout
* fixing jests which broke after changing default timeout to infinity

WIP: f1x #4

WIP: f1x #11

f1x: parameterize toError, fromError and replacer

wip: tofrom

fix: parameterize toError, fromError and replacer

fix: Makes concurrent jests non deterministic

* Related #4

fix: parameterize replacer toError and fromError, change fromError to return JSONValue, stringify fromError usages

* Related #10

fix: Converted global state for fromError to handle it internally.

*Related: #10
Reviewed-by: @tegefaulkes
[ci skip]

chore: Jests for fromError and toError, and using a custom replacer.

related: #10

[ci skip]
  • Loading branch information
addievo committed Sep 26, 2023
1 parent c162788 commit f30da25
Show file tree
Hide file tree
Showing 37 changed files with 1,944 additions and 1,247 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,6 @@ npm publish --access public
git push
git push --tags
```

Domains Diagram:
![diagram_encapuslated.svg](images%2Fdiagram_encapuslated.svg)
17 changes: 17 additions & 0 deletions images/diagram_encapuslated.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@
"ts-node": "^10.9.1",
"tsconfig-paths": "^3.9.0",
"typedoc": "^0.23.21",
"typescript": "^4.9.3"
"typescript": "^4.9.3",
"@fast-check/jest": "^1.1.0"
},
"dependencies": {
"@fast-check/jest": "^1.7.2",
"@matrixai/async-init": "^1.9.1",
"@matrixai/async-init": "^1.9.4",
"@matrixai/contexts": "^1.2.0",
"@matrixai/id": "^3.3.6",
"@matrixai/logger": "^3.1.0",
"@matrixai/errors": "^1.2.0",
"@matrixai/events": "^3.2.0",
"@streamparser/json": "^0.0.17",
"ix": "^5.0.0"
}
Expand Down
114 changes: 91 additions & 23 deletions src/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,45 @@ import type {
RPCStream,
JSONRPCResponseResult,
} from './types';
import type { JSONValue } from './types';
import type { JSONValue, IdGen } from './types';
import type {
JSONRPCRequest,
JSONRPCResponse,
MiddlewareFactory,
MapCallers,
} from './types';
import type { ErrorRPCRemote } from './errors';
import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy';
import Logger from '@matrixai/logger';
import { Timer } from '@matrixai/timer';
import { createDestroy } from '@matrixai/async-init';
import * as rpcUtilsMiddleware from './utils/middleware';
import * as rpcErrors from './errors';
import * as rpcUtils from './utils/utils';
import { promise } from './utils';
import { never } from './errors';
import { ErrorRPCStreamEnded, never } from './errors';
import * as events from './events';

const timerCleanupReasonSymbol = Symbol('timerCleanUpReasonSymbol');

// eslint-disable-next-line
interface RPCClient<M extends ClientManifest> extends CreateDestroy {}
@CreateDestroy()
/**
* Events:
* - {@link events.Event}
*/
interface RPCClient<M extends ClientManifest>
extends createDestroy.CreateDestroy {}
/**
* You must provide an error handler `addEventListener('error')`.
* Otherwise, errors will just be ignored.
*
* Events:
* - {@link events.EventRPCClientDestroy}
* - {@link events.EventRPCClientDestroyed}
*/
@createDestroy.CreateDestroy({
eventDestroy: events.EventRPCClientDestroy,
eventDestroyed: events.EventRPCClientDestroyed,
})
class RPCClient<M extends ClientManifest> {
/**
* @param obj
Expand All @@ -49,8 +67,9 @@ class RPCClient<M extends ClientManifest> {
manifest,
streamFactory,
middlewareFactory = rpcUtilsMiddleware.defaultClientMiddlewareWrapper(),
streamKeepAliveTimeoutTime = 60_000, // 1 minute
streamKeepAliveTimeoutTime = Infinity, // 1 minute
logger = new Logger(this.name),
idGen = () => Promise.resolve(null),
}: {
manifest: M;
streamFactory: StreamFactory;
Expand All @@ -62,6 +81,8 @@ class RPCClient<M extends ClientManifest> {
>;
streamKeepAliveTimeoutTime?: number;
logger?: Logger;
idGen: IdGen;
toError?: (errorData, metadata?: JSONValue) => ErrorRPCRemote<unknown>;
}) {
logger.info(`Creating ${this.name}`);
const rpcClient = new this({
Expand All @@ -70,11 +91,13 @@ class RPCClient<M extends ClientManifest> {
middlewareFactory,
streamKeepAliveTimeoutTime: streamKeepAliveTimeoutTime,
logger,
idGen,
});
logger.info(`Created ${this.name}`);
return rpcClient;
}

protected onTimeoutCallback?: () => void;
protected idGen: IdGen;
protected logger: Logger;
protected streamFactory: StreamFactory;
protected middlewareFactory: MiddlewareFactory<
Expand All @@ -84,6 +107,10 @@ class RPCClient<M extends ClientManifest> {
Uint8Array
>;
protected callerTypes: Record<string, HandlerType>;
toError: (errorData: any, metadata?: JSONValue) => Error;
public registerOnTimeoutCallback(callback: () => void) {
this.onTimeoutCallback = callback;
}
// Method proxies
public readonly streamKeepAliveTimeoutTime: number;
public readonly methodsProxy = new Proxy(
Expand Down Expand Up @@ -116,6 +143,8 @@ class RPCClient<M extends ClientManifest> {
middlewareFactory,
streamKeepAliveTimeoutTime,
logger,
idGen = () => Promise.resolve(null),
toError,
}: {
manifest: M;
streamFactory: StreamFactory;
Expand All @@ -127,20 +156,39 @@ class RPCClient<M extends ClientManifest> {
>;
streamKeepAliveTimeoutTime: number;
logger: Logger;
idGen: IdGen;
toError?: (errorData, metadata?: JSONValue) => ErrorRPCRemote<unknown>;
}) {
this.idGen = idGen;
this.callerTypes = rpcUtils.getHandlerTypes(manifest);
this.streamFactory = streamFactory;
this.middlewareFactory = middlewareFactory;
this.streamKeepAliveTimeoutTime = streamKeepAliveTimeoutTime;
this.logger = logger;
this.toError = toError || rpcUtils.toError;
}

public async destroy(): Promise<void> {
public async destroy({
errorCode = rpcErrors.JSONRPCErrorCode.RPCStopping,
errorMessage = '',
force = true,
}: {
errorCode?: number;
errorMessage?: string;
force?: boolean;
} = {}): Promise<void> {
this.logger.info(`Destroying ${this.constructor.name}`);

// You can dispatch an event before the actual destruction starts
this.dispatchEvent(new events.EventRPCClientDestroy());

// Dispatch an event after the client has been destroyed
this.dispatchEvent(new events.EventRPCClientDestroyed());

this.logger.info(`Destroyed ${this.constructor.name}`);
}

@ready(new rpcErrors.ErrorRPCDestroyed())
@ready(new rpcErrors.ErrorRPCCallerFailed())
public get methods(): MapCallers<M> {
return this.methodsProxy as MapCallers<M>;
}
Expand All @@ -154,7 +202,7 @@ class RPCClient<M extends ClientManifest> {
* the provided I type.
* @param ctx - ContextTimed used for timeouts and cancellation.
*/
@ready(new rpcErrors.ErrorRPCDestroyed())
@ready(new rpcErrors.ErrorMissingCaller())
public async unaryCaller<I extends JSONValue, O extends JSONValue>(
method: string,
parameters: I,
Expand All @@ -167,7 +215,9 @@ class RPCClient<M extends ClientManifest> {
await writer.write(parameters);
const output = await reader.read();
if (output.done) {
throw new rpcErrors.ErrorRPCMissingResponse();
throw new rpcErrors.ErrorMissingCaller('Missing response', {
cause: ctx.signal?.reason,
});
}
await reader.cancel();
await writer.close();
Expand All @@ -189,7 +239,7 @@ class RPCClient<M extends ClientManifest> {
* the provided I type.
* @param ctx - ContextTimed used for timeouts and cancellation.
*/
@ready(new rpcErrors.ErrorRPCDestroyed())
@ready(new rpcErrors.ErrorRPCCallerFailed())
public async serverStreamCaller<I extends JSONValue, O extends JSONValue>(
method: string,
parameters: I,
Expand Down Expand Up @@ -218,7 +268,7 @@ class RPCClient<M extends ClientManifest> {
* @param method - Method name of the RPC call
* @param ctx - ContextTimed used for timeouts and cancellation.
*/
@ready(new rpcErrors.ErrorRPCDestroyed())
@ready(new rpcErrors.ErrorRPCCallerFailed())
public async clientStreamCaller<I extends JSONValue, O extends JSONValue>(
method: string,
ctx: Partial<ContextTimedInput> = {},
Expand All @@ -230,7 +280,9 @@ class RPCClient<M extends ClientManifest> {
const reader = callerInterface.readable.getReader();
const output = reader.read().then(({ value, done }) => {
if (done) {
throw new rpcErrors.ErrorRPCMissingResponse();
throw new rpcErrors.ErrorMissingCaller('Missing response', {
cause: ctx.signal?.reason,
});
}
return value;
});
Expand All @@ -251,7 +303,7 @@ class RPCClient<M extends ClientManifest> {
* @param method - Method name of the RPC call
* @param ctx - ContextTimed used for timeouts and cancellation.
*/
@ready(new rpcErrors.ErrorRPCDestroyed())
@ready(new rpcErrors.ErrorRPCCallerFailed())
public async duplexStreamCaller<I extends JSONValue, O extends JSONValue>(
method: string,
ctx: Partial<ContextTimedInput> = {},
Expand Down Expand Up @@ -294,10 +346,16 @@ class RPCClient<M extends ClientManifest> {
signal.addEventListener('abort', abortRacePromHandler);
};
// Setting up abort events for timeout
const timeoutError = new rpcErrors.ErrorRPCTimedOut();
const timeoutError = new rpcErrors.ErrorRPCTimedOut(
'Error RPC has timed out',
{ cause: ctx.signal?.reason },
);
void timer.then(
() => {
abortController.abort(timeoutError);
if (this.onTimeoutCallback) {
this.onTimeoutCallback();
}
},
() => {}, // Ignore cancellation error
);
Expand All @@ -310,13 +368,17 @@ class RPCClient<M extends ClientManifest> {
} catch (e) {
cleanUp();
void streamFactoryProm.then((stream) =>
stream.cancel(Error('TMP stream timed out early')),
stream.cancel(ErrorRPCStreamEnded),
);
throw e;
}
void timer.then(
() => {
rpcStream.cancel(new rpcErrors.ErrorRPCTimedOut());
rpcStream.cancel(
new rpcErrors.ErrorRPCTimedOut('RPC has timed out', {
cause: ctx.signal?.reason,
}),
);
},
() => {}, // Ignore cancellation error
);
Expand Down Expand Up @@ -379,8 +441,9 @@ class RPCClient<M extends ClientManifest> {
* single RPC message that is sent to specify the method for the RPC call.
* Any metadata of extra parameters is provided here.
* @param ctx - ContextTimed used for timeouts and cancellation.
* @param id - Id is generated only once, and used throughout the stream for the rest of the communication
*/
@ready(new rpcErrors.ErrorRPCDestroyed())
@ready(new rpcErrors.ErrorRPCCallerFailed())
public async rawStreamCaller(
method: string,
headerParams: JSONValue,
Expand Down Expand Up @@ -430,7 +493,9 @@ class RPCClient<M extends ClientManifest> {
signal.addEventListener('abort', abortRacePromHandler);
};
// Setting up abort events for timeout
const timeoutError = new rpcErrors.ErrorRPCTimedOut();
const timeoutError = new rpcErrors.ErrorRPCTimedOut('RPC has timed out', {
cause: ctx.signal?.reason,
});
void timer.then(
() => {
abortController.abort(timeoutError);
Expand All @@ -457,11 +522,12 @@ class RPCClient<M extends ClientManifest> {
abortProm.p,
]);
const tempWriter = rpcStream.writable.getWriter();
const id = await this.idGen();
const header: JSONRPCRequestMessage = {
jsonrpc: '2.0',
method,
params: headerParams,
id: null,
id,
};
await tempWriter.write(Buffer.from(JSON.stringify(header)));
tempWriter.releaseLock();
Expand All @@ -484,11 +550,13 @@ class RPCClient<M extends ClientManifest> {
...(rpcStream.meta ?? {}),
command: method,
};
throw rpcUtils.toError(messageValue.error.data, metadata);
throw this.toError(messageValue.error.data, metadata);
}
leadingMessage = messageValue;
} catch (e) {
rpcStream.cancel(Error('TMP received error in leading response'));
rpcStream.cancel(
new ErrorRPCStreamEnded('RPC Stream Ended', { cause: e }),
);
throw e;
}
tempReader.releaseLock();
Expand Down
Loading

0 comments on commit f30da25

Please sign in to comment.