Skip to content

Commit

Permalink
final touches
Browse files Browse the repository at this point in the history
Remove redundant abstract classes `AbstractMessageReader` and
`AbstractMessageWriter`.

Cleanup some code to be easier to read and follow.
  • Loading branch information
paul-marechal committed Oct 22, 2021
1 parent 398f1a3 commit 55f7b0f
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 116 deletions.
6 changes: 3 additions & 3 deletions packages/debug/src/node/debug-adapter-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import { DisposableCollection, Disposable } from '@theia/core/lib/common/disposa
export class DebugAdapterSessionImpl implements DebugAdapterSession {

private readonly toDispose = new DisposableCollection();
private channel?: Channel;
private channel?: Channel<string>;

constructor(
readonly id: string,
Expand All @@ -53,12 +53,12 @@ export class DebugAdapterSessionImpl implements DebugAdapterSession {

}

async start(channel: Channel): Promise<void> {
async start(channel: Channel<string>): Promise<void> {
if (this.channel) {
throw new Error('The session has already been started, id: ' + this.id);
}
this.channel = channel;
this.channel.onMessage((message: string) => this.write(message));
this.channel.onMessage(message => this.write(message));
this.channel.onClose(() => this.channel = undefined);

}
Expand Down
23 changes: 13 additions & 10 deletions packages/plugin-ext/src/common/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import type { Message, MessageReader, MessageWriter } from '@theia/core/shared/v
import { Disposable } from './disposable-util';
import { PluginMessageReader } from './plugin-message-reader';
import { PluginMessageWriter } from './plugin-message-writer';
import { PluginMessage } from './plugin-message';

/**
* The interface for describing the connection between plugins and main side.
Expand Down Expand Up @@ -53,23 +54,19 @@ export class PluginConnection implements Connection {
}
}

export interface PluginMessage extends Message {
jsonrpc: '0.0'
content: string
}

/**
* [Channel](#Channel) implementation over RPC.
* Wrapper around a [PluginConnection](#PluginConnection) to match the [Channel](#Channel) interface.
*/
export class PluginWebSocketChannel implements Channel<string> {
export class PluginChannel implements Channel<string> {

constructor(
protected readonly connection: PluginConnection
) { }

send(content: string): void {
// vscode-jsonrpc's MessageReader/Writer expect to send JSON-RPC messages.
// Use a bogus jsonrpc version and pass along the content to send.
// Use a bogus jsonrpc version and pass along the `content` to send.
// `content` here is opaque: it could be any string.
const message: PluginMessage = { jsonrpc: '0.0', content };
this.connection.writer.write(message);
}
Expand All @@ -78,8 +75,7 @@ export class PluginWebSocketChannel implements Channel<string> {
this.connection.reader.listen((message: PluginMessage) => cb(message.content));
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
onError(cb: (reason: any) => void): void {
onError(cb: (reason: unknown) => void): void {
this.connection.reader.onError(cb);
}

Expand All @@ -91,3 +87,10 @@ export class PluginWebSocketChannel implements Channel<string> {
this.connection.dispose();
}
}

/**
* Use `PluginChannel` instead.
*
* @deprecated since 1.19.0
*/
export const PluginWebSocketChannel = PluginChannel;
94 changes: 36 additions & 58 deletions packages/plugin-ext/src/common/plugin-message-reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,100 +14,78 @@
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/

import { DataCallback, Disposable, Emitter, Event, PartialMessageInfo } from '@theia/core/shared/vscode-languageserver-protocol';

export abstract class AbstractMessageReader {
protected errorEmitter = new Emitter<Error>();
protected closeEmitter = new Emitter<void>();
protected partialMessageEmitter = new Emitter<PartialMessageInfo>();
dispose(): void {
this.errorEmitter.dispose();
this.closeEmitter.dispose();
}
get onError(): Event<Error> {
return this.errorEmitter.event;
}
fireError(error: Error): void {
this.errorEmitter.fire(this.asError(error));
}
get onClose(): Event<void> {
return this.closeEmitter.event;
}
fireClose(): void {
this.closeEmitter.fire(undefined);
}
get onPartialMessage(): Event<PartialMessageInfo> {
return this.partialMessageEmitter.event;
}
firePartialMessage(info: PartialMessageInfo): void {
this.partialMessageEmitter.fire(info);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
asError(error: any): Error {
if (error instanceof Error) {
return error;
} else {
return new Error(`Reader received error. Reason: ${typeof error.message === 'string' ? error.message : 'unknown'}`);
}
}
}
import { AbstractMessageReader, Disposable } from '@theia/core/shared/vscode-languageserver-protocol';
import { PluginMessage } from './plugin-message';

/**
* Support for reading string message through RPC protocol.
* Support for reading string messages through RPC protocol.
*
* Buffers events until a listener is registered.
*/
export class PluginMessageReader extends AbstractMessageReader {

protected state: 'initial' | 'listening' | 'closed' = 'initial';
protected callback: DataCallback | undefined;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
protected readonly events: { message?: any, error?: any }[] = [];

constructor() {
super();
}
protected callback?: (message: PluginMessage) => void;

/**
* Buffered events until `.listen` is called. Becomes `undefined` after.
*/
protected bufferedEvents?: { message?: string, error?: unknown }[] = [];

listen(callback: DataCallback): Disposable {
listen(callback: (message: PluginMessage) => void): Disposable {
if (this.state === 'initial') {
this.state = 'listening';
this.callback = callback;
while (this.events.length !== 0) {
const event = this.events.pop()!;
if (event.message) {
this.readMessage(event.message);
} else if (event.error) {
this.fireError(event.error);
for (const { message, error } of this.bufferedEvents!) {
if (!this.callback) {
break; // We got disposed.
} if (message) {
this.emitMessage(message);
} else if (error) {
this.fireError(error);
} else {
this.fireClose();
}
}
this.bufferedEvents = undefined;
return { dispose: () => this.callback = undefined };
}
return { dispose: () => { } };
}

/**
* Notify the listener (`this.callback`) that a new message was received.
*
* If a listener isn't registered yet we will queue the messages (FIFO).
*/
readMessage(message: string): void {
if (this.state === 'initial') {
this.events.unshift({ message });
this.bufferedEvents!.push({ message });
} else if (this.state === 'listening') {
const data = JSON.parse(message);
this.callback!(data);
this.emitMessage(message);
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
fireError(error: any): void {
fireError(error: unknown): void {
if (this.state === 'initial') {
this.events.unshift({ error });
this.bufferedEvents!.push({ error });
} else if (this.state === 'listening') {
super.fireError(error);
}
}

fireClose(): void {
if (this.state === 'initial') {
this.events.unshift({});
this.bufferedEvents!.push({});
} else if (this.state === 'listening') {
super.fireClose();
}
this.state = 'closed';
}

protected emitMessage(message: string): void {
const data = JSON.parse(message);
this.callback!(data);
}
}
41 changes: 4 additions & 37 deletions packages/plugin-ext/src/common/plugin-message-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,24 @@
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/

import { Message, MessageWriter, Emitter, Event } from '@theia/core/shared/vscode-languageserver-protocol';
import { AbstractMessageWriter, Message, MessageWriter } from '@theia/core/shared/vscode-languageserver-protocol';
import { ConnectionMain, ConnectionExt } from './plugin-api-rpc';

export abstract class AbstractMessageWriter {
protected errorEmitter = new Emitter<[Error, Message | undefined, number | undefined]>();
protected closeEmitter = new Emitter<void>();
dispose(): void {
this.errorEmitter.dispose();
this.closeEmitter.dispose();
}
get onError(): Event<[Error, Message | undefined, number | undefined]> {
return this.errorEmitter.event;
}
fireError(error: Error, message: Message | undefined, count: number | undefined): void {
this.errorEmitter.fire([this.asError(error), message, count]);
}
get onClose(): Event<void> {
return this.closeEmitter.event;
}
fireClose(): void {
this.closeEmitter.fire(undefined);
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
asError(error: any): Error {
if (error instanceof Error) {
return error;
} else {
return new Error(`Writer received error. Reason: ${typeof error.message === 'string' ? error.message : 'unknown'}`);
}
}
}

/**
* Support for writing string message through RPC protocol.
*/
export class PluginMessageWriter extends AbstractMessageWriter implements MessageWriter {

constructor(
protected readonly id: string,
protected readonly proxy: ConnectionMain | ConnectionExt
) {
super();
}

write(message: Message): Promise<void> {
async write(message: Message): Promise<void> {
const content = JSON.stringify(message);
try {
this.proxy.$sendMessage(this.id, content);
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
this.proxy.$sendMessage(this.id, content);
}

end(): void { }
Expand Down
28 changes: 28 additions & 0 deletions packages/plugin-ext/src/common/plugin-message.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/********************************************************************************
* Copyright (C) 2021 Ericsson and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/

import { Message } from '@theia/core/shared/vscode-languageserver-protocol';

export interface PluginMessage extends Message {
/**
* Bogus JSON-RPC version because we don't actually implement JSON-RPC here.
*/
jsonrpc: '0.0'
/**
* Actual string payload being transmitted.
*/
content: string
}
4 changes: 2 additions & 2 deletions packages/plugin-ext/src/main/browser/debug/debug-main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import { PluginDebugAdapterContribution } from './plugin-debug-adapter-contribut
import { PluginDebugSessionContributionRegistrator, PluginDebugSessionContributionRegistry } from './plugin-debug-session-contribution-registry';
import { Disposable, DisposableCollection } from '@theia/core/lib/common/disposable';
import { PluginDebugSessionFactory } from './plugin-debug-session-factory';
import { PluginWebSocketChannel } from '../../../common/connection';
import { PluginChannel } from '../../../common/connection';
import { PluginDebugAdapterContributionRegistrator, PluginDebugService } from './plugin-debug-service';
import { HostedPluginSupport } from '../../../hosted/browser/hosted-plugin';
import { DebugFunctionBreakpoint } from '@theia/debug/lib/browser/model/debug-function-breakpoint';
Expand Down Expand Up @@ -149,7 +149,7 @@ export class DebugMainImpl implements DebugMain, Disposable {
this.debugPreferences,
async (sessionId: string) => {
const connection = await this.connectionMain.ensureConnection(sessionId);
return new PluginWebSocketChannel(connection);
return new PluginChannel(connection);
},
this.fileService,
terminalOptionsExt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class PluginDebugSessionFactory extends DefaultDebugSessionFactory {
protected readonly messages: MessageClient,
protected readonly outputChannelManager: OutputChannelManager,
protected readonly debugPreferences: DebugPreferences,
protected readonly connectionFactory: (sessionId: string) => Promise<Channel>,
protected readonly connectionFactory: (sessionId: string) => Promise<Channel<string>>,
protected readonly fileService: FileService,
protected readonly terminalOptionsExt: TerminalOptionsExt | undefined,
protected readonly debugContributionProvider: ContributionProvider<DebugContribution>
Expand All @@ -80,8 +80,8 @@ export class PluginDebugSessionFactory extends DefaultDebugSessionFactory {
const connection = new DebugSessionConnection(
sessionId,
this.connectionFactory,
this.getTraceOutputChannel());

this.getTraceOutputChannel()
);
return new PluginDebugSession(
sessionId,
options,
Expand Down
4 changes: 2 additions & 2 deletions packages/plugin-ext/src/plugin/node/debug/debug.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { Breakpoint } from '../../../common/plugin-api-rpc-model';
import { DebugConfigurationProviderTriggerKind, DebugExt, DebugMain, PLUGIN_RPC_CONTEXT as Ext, TerminalOptionsExt } from '../../../common/plugin-api-rpc';
import { PluginPackageDebuggersContribution } from '../../../common/plugin-protocol';
import { RPCProtocol } from '../../../common/rpc-protocol';
import { PluginWebSocketChannel } from '../../../common/connection';
import { PluginChannel } from '../../../common/connection';
import { CommandRegistryImpl } from '../../command-registry';
import { ConnectionExtImpl } from '../../connection-ext';
import {
Expand Down Expand Up @@ -307,7 +307,7 @@ export class DebugExtImpl implements DebugExt {
this.sessions.set(sessionId, debugAdapterSession);

const connection = await this.connectionExt!.ensureConnection(sessionId);
debugAdapterSession.start(new PluginWebSocketChannel(connection));
debugAdapterSession.start(new PluginChannel(connection));

return sessionId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class PluginDebugAdapterSession extends DebugAdapterSessionImpl implement
this.configuration = theiaSession.configuration;
}

async start(channel: Channel): Promise<void> {
async start(channel: Channel<string>): Promise<void> {
if (this.tracker.onWillStartSession) {
this.tracker.onWillStartSession();
}
Expand Down

0 comments on commit 55f7b0f

Please sign in to comment.