Skip to content

Commit

Permalink
Merge pull request #69 from lifeomic/feat/kinesis-parsing
Browse files Browse the repository at this point in the history
feat: handle kinesis parsing
  • Loading branch information
swain authored Mar 12, 2024
2 parents c5af1c4 + b5b7772 commit 5329d1d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 20 deletions.
70 changes: 53 additions & 17 deletions src/kinesis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ beforeEach(() => {
const testSerializer = {
parseEvent: (msg: string) => JSON.parse(msg),
stringifyEvent: (msg: any) => JSON.stringify(msg),
toKinesisNativeRecord: (msg: Record<string, unknown>) =>
Buffer.from(JSON.stringify(msg)).toString('base64'),
};

describe('KinesisEventHandler', () => {
Expand Down Expand Up @@ -64,7 +66,9 @@ describe('KinesisEventHandler', () => {
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-1' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-1',
}),
},
},
],
Expand Down Expand Up @@ -104,25 +108,33 @@ describe('KinesisEventHandler', () => {
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-1' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-1',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-2' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-2',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-3' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-3',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-4' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-4',
}),
},
},
],
Expand Down Expand Up @@ -229,19 +241,25 @@ describe('KinesisEventHandler', () => {
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-1' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-1',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-2' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-2',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-3' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-3',
}),
},
},
] as any,
Expand Down Expand Up @@ -320,19 +338,25 @@ describe('KinesisEventHandler', () => {
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-1' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-1',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-2' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-2',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-3' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-3',
}),
},
},
] as any,
Expand Down Expand Up @@ -371,37 +395,49 @@ describe('KinesisEventHandler', () => {
{
kinesis: {
partitionKey: 'group-id',
data: JSON.stringify({ data: 'test-event-1' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-1',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-2' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-2',
}),
},
},
{
kinesis: {
partitionKey: 'group-id-2',
data: JSON.stringify({ data: 'test-event-other-1' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-other-1',
}),
},
},
{
kinesis: {
partitionKey: 'group-id',
data: JSON.stringify({ data: 'test-event-3' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-3',
}),
},
},
{
kinesis: {
partitionKey: 'group-id-2',
data: JSON.stringify({ data: 'test-event-other-2' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-other-2',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-4' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-4',
}),
},
},
] as any,
Expand Down
7 changes: 4 additions & 3 deletions src/kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ export type KinesisEventHandlerHarnessOptions<Event, Context> = {
* A function for stringifying events.
*/
stringifyEvent: (event: Event) => string;

/**
* An optional override for the logger.
*/
Expand Down Expand Up @@ -89,7 +88,9 @@ export class KinesisEventHandler<Event, Context> {
eventId: record.eventID,
});

const parsedEvent = this.config.parseEvent(record.kinesis.data);
const parsedEvent = this.config.parseEvent(
Buffer.from(record.kinesis.data, 'base64').toString('utf8'),
);

for (const action of this.actions) {
await action({ ...context, logger: eventLogger }, parsedEvent);
Expand Down Expand Up @@ -126,7 +127,7 @@ export class KinesisEventHandler<Event, Context> {
eventID: uuid(),
kinesis: {
partitionKey: uuid(),
data: stringifyEvent(e),
data: Buffer.from(stringifyEvent(e)).toString('base64'),
},
})),
};
Expand Down

0 comments on commit 5329d1d

Please sign in to comment.