Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preliminary Scopes Implementation #102

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
17 changes: 17 additions & 0 deletions main/contracts/contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ export interface IMessage<T = any> {
* Represents a messagebroker and provides access to the core features which includes publishing/subscribing to messages and RSVP.
*/
export interface IMessageBroker<T> {
/**
* A reference to the parent scope if this is not the root node in the tree of scopes. If this is the root, it's undefined.
*/
readonly parent?: IMessageBroker<T>;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the parent may have different generic params. We are assuming all the tree is uniform. Its fine but something to considerr.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm understand right, do you mean supporting something like this?

const broker1 = messagebroker<Contract1>();
const broker2 = broker1.createScope<Contract2>('my-scope');

in this case, would Contract2 be required to extend Contract 1? Otherwise a message sent to broker1 could not necessarily be passed down to broker2.

Copy link

@Davidhanson90 Davidhanson90 Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah so I think this is governed by the propergation logic you plan to implement. For example say create scope and that scope extends the parents channels type, then it would mean my scope would essentially be a superset of the parent and messages subsribed or publishes essentially propergate upwards.

interface Contract1{
 "foo": boolean;
}

const broker1 = messagebroker<Contract1>();
const broker2 = broker1.createScope<Contract2 extends Contract1>('my-scope');

broker2.publish('foo') //Works

However if you do not extend the parents type then you could it this way

const broker1 = messagebroker<Contract1>();
const broker2 = broker1.createScope<Contract2 extends Contract1>('my-scope');

broker2.publish('foo') //Compile error
broker2.parent.publish('foo') //Works

I prefer the extended types option as it makes more sense that channels delegate upwards

/**
* A list of all child scopes that have been created on this instance of the broker.
*/
readonly scopes: IMessageBroker<T>[];
aidanm3341 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates a new channel with the provided channelName. An optional config object can be passed that specifies how many messages to cache.
* No caching is set by default
Expand Down Expand Up @@ -96,6 +105,14 @@ export interface IMessageBroker<T> {
* This RSVP function is used by responders and is analogous to the 'Get' function. Responders when invoked must return the required response value type.
*/
rsvp<K extends keyof RSVPOf<T>>(channelName: K, handler: RSVPHandler<T>): IResponderRef;

/**
* Creates a new scope with the given scopeName with this instance of the MessageBroker as its parent.
* If a scope with this name already exists, it returns that instance instead of creating a new one.
* @param scopeName The name to use for the scope to create
* @returns An instance of the messagebroker that matches the scopeName provided
*/
createScope(scopeName: string): IMessageBroker<T>;
}

/**
Expand Down
31 changes: 29 additions & 2 deletions main/core/messagebroker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { get, Injectable } from '@morgan-stanley/needle';
import { get, getRootInjector, Injectable } from '@morgan-stanley/needle';
import { defer, Observable, Subject, Subscription } from 'rxjs';
import { filter, shareReplay } from 'rxjs/operators';
import { v4 as uuid } from 'uuid';
Expand Down Expand Up @@ -29,15 +29,18 @@ export function messagebroker<T = any>(): IMessageBroker<T> {
return instance;
}

const rootInjector = getRootInjector();

/**
* Represents a messagebroker. Using the 'new' operator is discouraged, instead use the messagebroker() function or dependency injection.
*/
@Injectable()
export class MessageBroker<T = any> implements IMessageBroker<T> {
private channelLookup: ChannelModelLookup<T> = {};
private messagePublisher = new Subject<IMessage<any>>();
private _scopes: IMessageBroker<T>[] = [];

constructor(private rsvpMediator: RSVPMediator<T>) {}
constructor(private rsvpMediator: RSVPMediator<T>, private _parent?: IMessageBroker<T>) {}

/**
* Creates a new channel with the provided channelName. An optional config object can be passed that specifies how many messages to cache.
Expand Down Expand Up @@ -99,6 +102,21 @@ export class MessageBroker<T = any> implements IMessageBroker<T> {
delete this.channelLookup[channelName];
}

/**
* Creates a new scope with the given scopeName with this instance of the MessageBroker as its parent.
* If a scope with this name already exists, it returns that instance instead of creating a new one.
* @param scopeName The name to use for the scope to create
* @returns An instance of the messagebroker that matches the scopeName provided
*/
public createScope(scopeName: string): IMessageBroker<T> {
const scope = rootInjector.createScope(scopeName);
aidanm3341 marked this conversation as resolved.
Show resolved Hide resolved
scope.registerInstance(MessageBroker, new MessageBroker<T>(new RSVPMediator(), this));
aidanm3341 marked this conversation as resolved.
Show resolved Hide resolved

const instance = scope.get(MessageBroker);
this._scopes.push(instance);
return instance;
}

/**
* Return a deferred observable as the channel config may have been updated before the subscription
* @param channelName name of channel to subscribe to
Expand Down Expand Up @@ -143,6 +161,7 @@ export class MessageBroker<T = any> implements IMessageBroker<T> {
}

const publishFunction = (data?: T[K], type?: string): void => {
this._scopes.forEach((scope) => scope.create(channelName).publish(data), type);
this.messagePublisher.next(this.createMessage(channelName, data, type));
};

Expand Down Expand Up @@ -180,4 +199,12 @@ export class MessageBroker<T = any> implements IMessageBroker<T> {
): channel is RequiredPick<IChannelModel<T[K]>, 'config' | 'subscription'> {
return channel != null && channel.subscription != null;
}

public get parent(): IMessageBroker<T> | undefined {
aidanm3341 marked this conversation as resolved.
Show resolved Hide resolved
return this._parent;
}

public get scopes(): IMessageBroker<T>[] {
return this._scopes;
}
}
73 changes: 73 additions & 0 deletions spec/core/messagebroker.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,79 @@ describe('MessageBroker', () => {
});
});

describe('Scopes', () => {
it('should return a new messagebroker instance when creating a new scope', () => {
const instance = getInstance<IMySampleBroker>();
const scope = instance.createScope('scope1');

expect(scope).not.toEqual(instance);
});

it('should return same scope if same name is used', () => {
const instance = getInstance<IMySampleBroker>();
const scope = instance.createScope('scope1');
const sameScope = instance.createScope('scope1');

expect(scope).toEqual(sameScope);
});

it('should return itself when getting the parent of its child', () => {
const instance = getInstance<IMySampleBroker>();
const scope = instance.createScope('scope1');

expect(scope.parent).toEqual(instance);
});

it('should return a list of children scopes via scopes property', () => {
const instance = getInstance<IMySampleBroker>();
const scope1 = instance.createScope('scope1');
const scope2 = instance.createScope('scope2');
const scope3 = instance.createScope('scope3');

expect(instance.scopes).toEqual([scope1, scope2, scope3]);
});

it('should publish messages from parent to children', () => {
const parentMessages: Array<IMessage<string>> = [];
const childMessages: Array<IMessage<string>> = [];
const parent = getInstance();
const child = parent.createScope('scope1');

parent.get('channel').subscribe((message) => parentMessages.push(message));
child.get('channel').subscribe((message) => childMessages.push(message));

parent.create('channel').publish('both should get this');
child.create('channel').publish('only the child should get this');

expect(parentMessages.length).toEqual(1);
verifyMessage(parentMessages[0], 'both should get this');

expect(childMessages.length).toEqual(2);
verifyMessage(childMessages[0], 'both should get this');
verifyMessage(childMessages[1], 'only the child should get this');
});

it('should not publish messages to "sibling" scopes', () => {
const brotherMessages: Array<IMessage<string>> = [];
const sisterMessages: Array<IMessage<string>> = [];
const parent = getInstance();
const brother = parent.createScope('scope1');
const sister = parent.createScope('scope2');

brother.get('channel').subscribe((message) => brotherMessages.push(message));
sister.get('channel').subscribe((message) => sisterMessages.push(message));

brother.create('channel').publish('brother should get this');
sister.create('channel').publish('sister should get this');

expect(brotherMessages.length).toEqual(1);
verifyMessage(brotherMessages[0], 'brother should get this');

expect(sisterMessages.length).toEqual(1);
verifyMessage(sisterMessages[0], 'sister should get this');
});
});

function verifyMessage<T>(message: IMessage<T>, expectedData: T, expectedType?: string) {
expect(message).toBeDefined();
expect(message.data).toEqual(expectedData);
Expand Down