Skip to content
This repository has been archived by the owner on Oct 2, 2024. It is now read-only.

feat: add jetstream fetch wrappers #479

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions components/channel/jetStreamFetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { camelCase, getMessageType, realizeParametersForChannelWrapper, renderJSDocParameters, messageHasNullPayload, realizeChannelName} from '../../utils/index';
// eslint-disable-next-line no-unused-vars
import { Message, ChannelParameter } from '@asyncapi/parser';
import { unwrap } from './ChannelParameterUnwrap';

/**
* Component which returns a subscribe to function for the client
*
* @param {string} defaultContentType
* @param {string} channelName to publish to
* @param {Message} message which is being received
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel
*/
export function JetstreamFetch(channelName, message, channelParameters) {
const messageType = getMessageType(message);
let parameters = [];
parameters = Object.entries(channelParameters).map(([parameterName]) => {
return `${camelCase(parameterName)}Param`;
});
const hasNullPayload = messageHasNullPayload(message.payload());

//Determine the callback process when receiving messages.
//If the message payload is null no hooks are called to process the received data.
let whenReceivingMessage = `onDataCallback(undefined, null ${parameters.length > 0 && `, ${parameters.join(',')}`});`;
if (!hasNullPayload) {
whenReceivingMessage = `
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, ${messageType}.unmarshal(receivedData) ${parameters.length > 0 && `, ${parameters.join(',')}`}, msg);
`;
}
return `
/**
* Internal functionality to setup jetstrema fetch on the \`${channelName}\` channel
*
* @param onDataCallback to call when messages are received
* @param js client to fetch with
* @param codec used to convert messages
${renderJSDocParameters(channelParameters)}
*/
export function jetsStreamFetch(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : ${messageType}
${realizeParametersForChannelWrapper(channelParameters, false)},
jetstreamMsg?: Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any >
${realizeParametersForChannelWrapper(channelParameters)},
durable: string, options?: Partial<Nats.PullOptions>
) {
const stream = ${realizeChannelName(channelParameters, channelName)};
(async () => {
let msgs = await js.fetch(stream, durable, options);
for await (const msg of msgs) {
${unwrap(channelName, channelParameters)}

${whenReceivingMessage}
}
})();
}
`;
}
39 changes: 39 additions & 0 deletions components/index/jetStreamFetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { camelCase, getMessageType, realizeParametersForChannelWrapper, renderJSDocParameters, realizeParametersForChannelWithoutType, pascalCase} from '../../utils/index';

export function JetstreamFetch(channelName, message, messageDescription, channelParameters) {
return `
/**
* JetStream fetch function.
*
* Pull message from \`${channelName}\`
*
* ${messageDescription}
*
* @param onDataCallback to call when messages are received
${renderJSDocParameters(channelParameters)}
* @param options to pull message with, bindings from the AsyncAPI document overwrite these if specified
*/
public jetStreamFetch${pascalCase(channelName)}(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg?: ${getMessageType(message)}
${realizeParametersForChannelWrapper(channelParameters, false)},
jetstreamMsg?: Nats.JsMsg) => void
${realizeParametersForChannelWrapper(channelParameters)},
durable: string, options?: Partial<Nats.PullOptions>
): void {
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) {
${camelCase(channelName)}Channel.jetsStreamFetch(
onDataCallback,
this.js,
this.codec
${Object.keys(channelParameters).length ? ` ,${realizeParametersForChannelWithoutType(channelParameters)}` : ''},
durable,
options
);
} else {
throw NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED);
}
}
`;
}
18 changes: 18 additions & 0 deletions examples/simple-publish/asyncapi-nats-client/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ The test/mirror client which is the reverse to the normal NatsAsyncApiClient.
* [.jetStreamPullStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, options)](#NatsAsyncApiTestClient+jetStreamPullStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiTestClient+jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiTestClient+jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon)
* [.jetStreamFetchStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, options)](#NatsAsyncApiTestClient+jetStreamFetchStreetlightStreetlightIdCommandTurnon)

<a name="NatsAsyncApiTestClient+connect"></a>

Expand Down Expand Up @@ -330,3 +331,20 @@ Channel for the turn on command which should turn on the streetlight
| flush | ensure client is force flushed after subscribing |
| options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified |

<a name="NatsAsyncApiTestClient+jetStreamFetchStreetlightStreetlightIdCommandTurnon"></a>

### natsAsyncApiTestClient.jetStreamFetchStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, options)
JetStream fetch function.

Pull message from `streetlight/{streetlight_id}/command/turnon`

Channel for the turn on command which should turn on the streetlight

**Kind**: instance method of [<code>NatsAsyncApiTestClient</code>](#NatsAsyncApiTestClient)

| Param | Description |
| --- | --- |
| onDataCallback | to call when messages are received |
| streetlight_id | parameter to use in topic |
| options | to pull message with, bindings from the AsyncAPI document overwrite these if specified |

Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,34 @@ export class NatsAsyncApiTestClient {
}
});
}
/**
* JetStream fetch function.
*
* Pull message from `streetlight/{streetlight_id}/command/turnon`
*
* Channel for the turn on command which should turn on the streetlight
*
* @param onDataCallback to call when messages are received
* @param streetlight_id parameter to use in topic
* @param options to pull message with, bindings from the AsyncAPI document overwrite these if specified
*/
public jetStreamFetchStreetlightStreetlightIdCommandTurnon(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void, streetlight_id: string,
durable: string, options ? : Partial < Nats.PullOptions >
): void {
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) {
streetlightStreetlightIdCommandTurnonChannel.jetsStreamFetch(
onDataCallback,
this.js,
this.codec, streetlight_id,
durable,
options
);
} else {
throw NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,40 @@ export function jetStreamPullSubscribe(
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
/**
* Internal functionality to setup jetstrema fetch on the `streetlight/{streetlight_id}/command/turnon` channel
*
* @param onDataCallback to call when messages are received
* @param js client to fetch with
* @param codec used to convert messages
* @param streetlight_id parameter to use in topic
*/
export function jetsStreamFetch(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any > , streetlight_id: string,
durable: string, options ? : Partial < Nats.PullOptions >
) {
const stream = `streetlight.${streetlight_id}.command.turnon`;
(async () => {
let msgs = await js.fetch(stream, durable, options);
for await (const msg of msgs) {
const unmodifiedChannel = `streetlight.{streetlight_id}.command.turnon`;
let channel = msg.subject;
const streetlightIdSplit = unmodifiedChannel.split("{streetlight_id}");
const splits = [
streetlightIdSplit[0],
streetlightIdSplit[1]
];
channel = channel.substring(splits[0].length);
const streetlightIdEnd = channel.indexOf(splits[1]);
const streetlightIdParam = "" + channel.substring(0, streetlightIdEnd);
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, TurnOn.unmarshal(receivedData), streetlightIdParam, msg);
}
})();
}
33 changes: 33 additions & 0 deletions examples/simple-subscribe/asyncapi-nats-client/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Module which wraps functionality for the `streetlight/{streetlight_id}/command/t
* [~jetStreamPull(onDataCallback, js, codec, streetlight_id)](#module_streetlightStreetlightIdCommandTurnon..jetStreamPull)
* [~jetStreamPushSubscribe(onDataCallback, nc, codec, streetlight_id, options)](#module_streetlightStreetlightIdCommandTurnon..jetStreamPushSubscribe)
* [~jetStreamPullSubscribe(onDataCallback, nc, codec, streetlight_id)](#module_streetlightStreetlightIdCommandTurnon..jetStreamPullSubscribe)
* [~jetsStreamFetch(onDataCallback, js, codec, streetlight_id)](#module_streetlightStreetlightIdCommandTurnon..jetsStreamFetch)

<a name="module_streetlightStreetlightIdCommandTurnon..subscribe"></a>

Expand Down Expand Up @@ -89,6 +90,20 @@ Internal functionality to setup jetstream pull subscription on the `streetlight/
| codec | used to convert messages |
| streetlight_id | parameter to use in topic |

<a name="module_streetlightStreetlightIdCommandTurnon..jetsStreamFetch"></a>

### streetlightStreetlightIdCommandTurnon~jetsStreamFetch(onDataCallback, js, codec, streetlight_id)
Internal functionality to setup jetstrema fetch on the `streetlight/{streetlight_id}/command/turnon` channel

**Kind**: inner method of [<code>streetlightStreetlightIdCommandTurnon</code>](#module_streetlightStreetlightIdCommandTurnon)

| Param | Description |
| --- | --- |
| onDataCallback | to call when messages are received |
| js | client to fetch with |
| codec | used to convert messages |
| streetlight_id | parameter to use in topic |

<a name="NatsAsyncApiClient"></a>

## NatsAsyncApiClient
Expand All @@ -110,6 +125,7 @@ The generated client based on your AsyncAPI document.
* [.jetStreamPullStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, options)](#NatsAsyncApiClient+jetStreamPullStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiClient+jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiClient+jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon)
* [.jetStreamFetchStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, options)](#NatsAsyncApiClient+jetStreamFetchStreetlightStreetlightIdCommandTurnon)

<a name="NatsAsyncApiClient+connect"></a>

Expand Down Expand Up @@ -243,6 +259,23 @@ Channel for the turn on command which should turn on the streetlight
| flush | ensure client is force flushed after subscribing |
| options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified |

<a name="NatsAsyncApiClient+jetStreamFetchStreetlightStreetlightIdCommandTurnon"></a>

### natsAsyncApiClient.jetStreamFetchStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, options)
JetStream fetch function.

Pull message from `streetlight/{streetlight_id}/command/turnon`

Channel for the turn on command which should turn on the streetlight

**Kind**: instance method of [<code>NatsAsyncApiClient</code>](#NatsAsyncApiClient)

| Param | Description |
| --- | --- |
| onDataCallback | to call when messages are received |
| streetlight_id | parameter to use in topic |
| options | to pull message with, bindings from the AsyncAPI document overwrite these if specified |

<a name="NatsAsyncApiTestClient"></a>

## NatsAsyncApiTestClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,40 @@ export function jetStreamPullSubscribe(
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
/**
* Internal functionality to setup jetstrema fetch on the `streetlight/{streetlight_id}/command/turnon` channel
*
* @param onDataCallback to call when messages are received
* @param js client to fetch with
* @param codec used to convert messages
* @param streetlight_id parameter to use in topic
*/
export function jetsStreamFetch(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any > , streetlight_id: string,
durable: string, options ? : Partial < Nats.PullOptions >
) {
const stream = `streetlight.${streetlight_id}.command.turnon`;
(async () => {
let msgs = await js.fetch(stream, durable, options);
for await (const msg of msgs) {
const unmodifiedChannel = `streetlight.{streetlight_id}.command.turnon`;
let channel = msg.subject;
const streetlightIdSplit = unmodifiedChannel.split("{streetlight_id}");
const splits = [
streetlightIdSplit[0],
streetlightIdSplit[1]
];
channel = channel.substring(splits[0].length);
const streetlightIdEnd = channel.indexOf(splits[1]);
const streetlightIdParam = "" + channel.substring(0, streetlightIdEnd);
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, TurnOn.unmarshal(receivedData), streetlightIdParam, msg);
}
})();
}
30 changes: 30 additions & 0 deletions examples/simple-subscribe/asyncapi-nats-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,34 @@ export class NatsAsyncApiClient {
}
});
}
/**
* JetStream fetch function.
*
* Pull message from `streetlight/{streetlight_id}/command/turnon`
*
* Channel for the turn on command which should turn on the streetlight
*
* @param onDataCallback to call when messages are received
* @param streetlight_id parameter to use in topic
* @param options to pull message with, bindings from the AsyncAPI document overwrite these if specified
*/
public jetStreamFetchStreetlightStreetlightIdCommandTurnon(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void, streetlight_id: string,
durable: string, options ? : Partial < Nats.PullOptions >
): void {
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) {
streetlightStreetlightIdCommandTurnonChannel.jetsStreamFetch(
onDataCallback,
this.js,
this.codec, streetlight_id,
durable,
options
);
} else {
throw NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED);
}
}
}
8 changes: 7 additions & 1 deletion template/src/channels/$$channel$$.ts.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { General } from '../../../components/channel/general';
import { pascalCase, isRequestReply, isReplier, isRequester, isPubsub, camelCase} from '../../../utils/index';
// eslint-disable-next-line no-unused-vars
import { AsyncAPIDocument, Channel } from '@asyncapi/parser';
import { JetstreamFetch } from '../../../components/channel/jetStreamFetch';
import { JetstreamPushSubscription } from '../../../components/channel/jetstreamPushSubscription';
import { JetstreamPull } from '../../../components/channel/jetstreamPull';
import { JetstreamPullSubscription } from '../../../components/channel/jetStreamPullSubscription';
Expand Down Expand Up @@ -90,7 +91,12 @@ function getChannelCode(channel, channelName, params) {
channelName,
publishMessage,
channel.parameters());
channelcode = `${normalSubscribeCode}\n${jetstreamPullCode}\n${jetstreamPushSubscriptionCode}\n${jetstreamPullSubscriptionCode}`;
const jetstreamFetchCode = JetstreamFetch(
channelName,
publishMessage,
channel.parameters(),
publishOperation);
channelcode = `${normalSubscribeCode}\n${jetstreamPullCode}\n${jetstreamPushSubscriptionCode}\n${jetstreamPullSubscriptionCode}\n${jetstreamFetchCode}`;
}
}
return channelcode;
Expand Down
8 changes: 7 additions & 1 deletion template/src/index.ts.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { AsyncAPIDocument } from '@asyncapi/parser';
import { JetstreamPushSubscription } from '../../components/index/jetstreamPushSubscription';
import { JetstreamPull } from '../../components/index/jetstreamPull';
import { JetstreamPullSubscribe } from '../../components/index/jetStreamPullSubscription';
import { JetstreamFetch } from '../../components/index/jetStreamFetch';
import { JetstreamPublish } from '../../components/index/jetstreamPublish';

/**
Expand Down Expand Up @@ -83,6 +84,11 @@ function getChannelWrappers(asyncapi, params) {
publishMessage,
channelDescription,
channelParameters);
const jetstreamFetchCode = JetstreamFetch(
channelName,
publishMessage,
channelDescription,
channelParameters);
const jetstreamPullSubscribe = JetstreamPullSubscribe(
channelName,
publishMessage,
Expand All @@ -98,7 +104,7 @@ function getChannelWrappers(asyncapi, params) {
publishMessage,
channelDescription,
channelParameters);
return `${normalSubscribeCode}\n${jetstreamPullCode}\n${jetstreamPushSubscriptionCode}\n${jetstreamPullSubscribe}`;
return `${normalSubscribeCode}\n${jetstreamPullCode}\n${jetstreamPushSubscriptionCode}\n${jetstreamPullSubscribe}\n${jetstreamFetchCode}`;
}
}
});
Expand Down
Loading