This repository has been archived by the owner on Oct 2, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add jetstream push wrappers (#483)
- Loading branch information
1 parent
7d8c4cc
commit ac89aa6
Showing
20 changed files
with
314 additions
and
4,465 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
import { realizeChannelName, getMessageType, realizeParametersForChannelWrapper, messageHasNullPayload, renderJSDocParameters } from '../../utils/index'; | ||
// eslint-disable-next-line no-unused-vars | ||
import { Message, ChannelParameter } from '@asyncapi/parser'; | ||
|
||
/** | ||
* Component which returns a function which publishes to the given channel | ||
* | ||
* @param {string} channelName to publish to | ||
* @param {Message} message which is being published | ||
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel | ||
*/ | ||
export function JetstreamPublish(channelName, message, channelParameters) { | ||
const messageType = getMessageType(message); | ||
const hasNullPayload = messageHasNullPayload(message.payload()); | ||
//Determine the publish operation based on whether the message type is null | ||
let publishOperation = `await js.publish(${realizeChannelName(channelParameters, channelName)}, Nats.Empty);`; | ||
if (!hasNullPayload) { | ||
publishOperation = ` | ||
let dataToSend : any = message.marshal(); | ||
dataToSend = codec.encode(dataToSend); | ||
js.publish(${realizeChannelName(channelParameters, channelName)}, dataToSend, options);`; | ||
} | ||
return ` | ||
/** | ||
* Internal functionality to publish message to jetstream channel | ||
* ${channelName} | ||
* | ||
* @param message to publish | ||
* @param js to publish with | ||
* @param codec used to convert messages | ||
${renderJSDocParameters(channelParameters)} | ||
* @param options to publish with | ||
*/ | ||
export function jetStreamPublish( | ||
message: ${messageType}, | ||
js: Nats.JetStreamClient, | ||
codec: Nats.Codec<any> | ||
${realizeParametersForChannelWrapper(channelParameters)}, | ||
options?: Nats.PublishOptions | ||
): Promise<void> { | ||
return new Promise<void>(async (resolve, reject) => { | ||
try{ | ||
${publishOperation} | ||
resolve(); | ||
}catch(e: any){ | ||
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e)); | ||
} | ||
}); | ||
}; | ||
`; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import { pascalCase, camelCase, getMessageType, realizeParametersForChannelWrapper, realizeParametersForChannelWithoutType, renderJSDocParameters } from '../../utils/index'; | ||
// eslint-disable-next-line no-unused-vars | ||
import { Message, ChannelParameter } from '@asyncapi/parser'; | ||
|
||
/** | ||
* Component which returns a publish to function for the client | ||
* | ||
* @param {string} defaultContentType | ||
* @param {string} channelName to publish to | ||
* @param {Message} message which is being published | ||
* @param {string} messageDescription | ||
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel | ||
*/ | ||
export function JetstreamPublish(channelName, message, messageDescription, channelParameters) { | ||
return ` | ||
/** | ||
* Publish to the \`${channelName}\` jetstream channel | ||
* | ||
* ${messageDescription} | ||
* | ||
* @param message to publish | ||
${renderJSDocParameters(channelParameters)} | ||
*/ | ||
public jetStreamPublishTo${pascalCase(channelName)}( | ||
message: ${getMessageType(message)} | ||
${realizeParametersForChannelWrapper(channelParameters)}, | ||
options?: Nats.PublishOptions | ||
): Promise<void> { | ||
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { | ||
return ${camelCase(channelName)}Channel.jetStreamPublish( | ||
message, | ||
this.js, | ||
this.codec | ||
${Object.keys(channelParameters).length ? `,${realizeParametersForChannelWithoutType(channelParameters)}` : ''}, | ||
options | ||
); | ||
}else{ | ||
return Promise.reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED)); | ||
} | ||
} | ||
`; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.