Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use binary message RPC protocol for plugin API #11261

Merged
merged 1 commit into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<a name="breaking_changes_1.33.0">[Breaking Changes:](#breaking_changes_1.33.0)</a>

- [core] returns of many methods of `MenuModelRegistry` changed from `CompositeMenuNode` to `MutableCompoundMenuNode`. To mutate a menu, use the `updateOptions` method or add a check for `instanceof CompositeMenuNode`, which will be true in most cases.
- [plugin-ext] refactored the plugin RPC API - now also reuses the msgpackR based RPC protocol that is better suited for handling binary data and enables message tunneling [#11228](https://github.com/eclipse-theia/theia/pull/11261). All plugin protocol types now use `UInt8Array` as type for message parameters instead of `string` - Contributed on behalf of STMicroelectronics.

## v1.32.0 - 11/24/2022

Expand Down
25 changes: 20 additions & 5 deletions packages/core/src/common/message-rpc/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { injectable } from '../../../shared/inversify';
import { Disposable, DisposableCollection } from '../disposable';
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';
Expand Down Expand Up @@ -72,7 +71,6 @@ export type MessageProvider = () => ReadBuffer;
* Reusable abstract {@link Channel} implementation that sets up
* the basic channel event listeners and offers a generic close method.
*/
@injectable()
export abstract class AbstractChannel implements Channel {

onCloseEmitter: Emitter<ChannelCloseEvent> = new Emitter();
Expand Down Expand Up @@ -101,7 +99,21 @@ export abstract class AbstractChannel implements Channel {
}

abstract getWriteBuffer(): WriteBuffer;
}

/**
* A very basic {@link AbstractChannel} implementation which takes a function
* for retrieving the {@link WriteBuffer} as constructor argument.
*/
export class BasicChannel extends AbstractChannel {

constructor(protected writeBufferProvider: () => WriteBuffer) {
super();
}

getWriteBuffer(): WriteBuffer {
return this.writeBufferProvider();
}
}

/**
Expand Down Expand Up @@ -194,7 +206,7 @@ export class ChannelMultiplexer implements Disposable {
return this.handleClose(id);
}
case MessageTypes.Data: {
return this.handleData(id, buffer.sliceAtReadPosition());
return this.handleData(id, buffer);
}
}
}
Expand All @@ -206,7 +218,7 @@ export class ChannelMultiplexer implements Disposable {
const channel = this.createChannel(id);
this.pendingOpen.delete(id);
this.openChannels.set(id, channel);
resolve!(channel);
resolve(channel);
this.onOpenChannelEmitter.fire({ id, channel });
}
}
Expand Down Expand Up @@ -236,7 +248,7 @@ export class ChannelMultiplexer implements Disposable {
protected handleData(id: string, data: ReadBuffer): void {
const channel = this.openChannels.get(id);
if (channel) {
channel.onMessageEmitter.fire(() => data);
channel.onMessageEmitter.fire(() => data.sliceAtReadPosition());
}
}

Expand All @@ -263,6 +275,9 @@ export class ChannelMultiplexer implements Disposable {
}

open(id: string): Promise<Channel> {
if (this.openChannels.has(id)) {
throw new Error(`Another channel with the id '${id}' is already open.`);
}
const result = new Promise<Channel>((resolve, reject) => {
this.pendingOpen.set(id, resolve);
});
Expand Down
8 changes: 6 additions & 2 deletions packages/core/src/common/message-rpc/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol';
export { Channel, AbstractChannel, ChannelCloseEvent, MessageProvider } from './channel';
export { AbstractChannel, Channel, ChannelCloseEvent, MessageProvider } from './channel';
export { ReadBuffer, WriteBuffer } from './message-buffer';
export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol';

import { registerMsgPackExtensions } from './rpc-message-encoder';

registerMsgPackExtensions();
70 changes: 70 additions & 0 deletions packages/core/src/common/message-rpc/msg-pack-extension-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// *****************************************************************************
// Copyright (C) 2022 STMicroelectronics and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0.
//
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License v. 2.0 are satisfied: GNU General Public License, version 2
// with the GNU Classpath Exception which is available at
// https://www.gnu.org/software/classpath/license.html.
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { addExtension } from 'msgpackr';

/**
* Handles the global registration of custom MsgPackR extensions
* required for the default RPC communication. MsgPackR extensions
* are installed globally on both ends of the communication channel.
* (frontend-backend, pluginExt-pluginMain).
* Is implemented as singleton as it is also used in plugin child processes which have no access to inversify.
*/
export class MsgPackExtensionManager {
tsmaeder marked this conversation as resolved.
Show resolved Hide resolved
private static readonly INSTANCE = new MsgPackExtensionManager();
public static getInstance(): MsgPackExtensionManager {
return this.INSTANCE;
}
tortmayr marked this conversation as resolved.
Show resolved Hide resolved

private extensions = new Map<number, MsgPackExtension>();

private constructor() {
}

registerExtensions(...extensions: MsgPackExtension[]): void {
extensions.forEach(extension => {
if (extension.tag < 1 || extension.tag > 100) {
// MsgPackR reserves the tag range 1-100 for custom extensions.
throw new Error(`MsgPack extension tag should be a number from 1-100 but was '${extension.tag}'`);
}
if (this.extensions.has(extension.tag)) {
throw new Error(`Another MsgPack extension with the tag '${extension.tag}' is already registered`);
}
this.extensions.set(extension.tag, extension);
addExtension({
Class: extension.class,
type: extension.tag,
write: extension.serialize,
read: extension.deserialize
});
});
}

getExtension(tag: number): MsgPackExtension | undefined {
return this.extensions.get(tag);
}
}

export interface MsgPackExtension {
class: Function,
tag: number,
serialize(instance: unknown): unknown,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
deserialize(serialized: any): unknown
}

export type Constructor<T> = new (...params: unknown[]) => T;

46 changes: 24 additions & 22 deletions packages/core/src/common/message-rpc/rpc-message-encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
// *****************************************************************************
/* eslint-disable @typescript-eslint/no-explicit-any */

import { addExtension, Packr as MsgPack } from 'msgpackr';
import { Packr as MsgPack } from 'msgpackr';
import { ReadBuffer, WriteBuffer } from './message-buffer';
import { MsgPackExtensionManager } from './msg-pack-extension-manager';

/**
* This code lets you encode rpc protocol messages (request/reply/notification/error/cancel)
Expand Down Expand Up @@ -121,27 +122,10 @@ export interface RpcMessageEncoder {
}

export const defaultMsgPack = new MsgPack({ moreTypes: true, encodeUndefinedAsNil: false, bundleStrings: false });
// Add custom msgpackR extension for ResponseErrors.
addExtension({
Class: ResponseError,
type: 1,
write: (instance: ResponseError) => {
const { code, data, message, name, stack } = instance;
return { code, data, message, name, stack };
},
read: data => {
const error = new ResponseError(data.code, data.message, data.data);
error.name = data.name;
error.stack = data.stack;
return error;
}
});

export class MsgPackMessageEncoder implements RpcMessageEncoder {

constructor(protected readonly msgPack: MsgPack = defaultMsgPack) {

}
constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { }

cancel(buf: WriteBuffer, requestId: number): void {
this.encode<CancelMessage>(buf, { type: RpcMessageType.Cancel, id: requestId });
Expand Down Expand Up @@ -169,13 +153,11 @@ export class MsgPackMessageEncoder implements RpcMessageEncoder {
throw err;
}
}

}

export class MsgPackMessageDecoder implements RpcMessageDecoder {
constructor(protected readonly msgPack: MsgPack = defaultMsgPack) {
constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { }

}
decode<T = any>(buf: ReadBuffer): T {
const bytes = buf.readBytes();
return this.msgPack.decode(bytes);
Expand All @@ -184,5 +166,25 @@ export class MsgPackMessageDecoder implements RpcMessageDecoder {
parse(buffer: ReadBuffer): RpcMessage {
return this.decode(buffer);
}
}

export function registerMsgPackExtensions(): void {
// Register custom msgPack extension for Errors.
MsgPackExtensionManager.getInstance().registerExtensions({
class: Error,
tag: 1,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
serialize: (error: any) => {
const { code, data, message, name } = error;
const stack = error.stacktrace ?? error.stack;
const isResponseError = error instanceof ResponseError;
return { code, data, message, name, stack, isResponseError };
},
deserialize: data => {
const error = data.isResponseError ? new ResponseError(data.code, data.message, data.data) : new Error(data.message);
error.name = data.name;
error.stack = data.stack;
return error;
}
});
}
Loading