Skip to content

Commit

Permalink
Merge pull request #75 from lifeomic/partial-batch-responses
Browse files Browse the repository at this point in the history
feat!: support partial batch responses on all handlers
  • Loading branch information
swain authored May 24, 2024
2 parents 4c9edbe + 6a9ccce commit 96737df
Show file tree
Hide file tree
Showing 7 changed files with 391 additions and 175 deletions.
129 changes: 101 additions & 28 deletions src/dynamo-streams.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ describe('DynamoStreamHandler', () => {
createRunContext: () => ({}),
}).lambda();

const result = await lambda(
{ httpMethod: 'GET' } as any,
{} as any,
{} as any,
);
const result = await lambda({ httpMethod: 'GET' } as any, {} as any);

expect(result).toStrictEqual({
statusCode: 200,
Expand All @@ -65,11 +61,7 @@ describe('DynamoStreamHandler', () => {
createRunContext: () => ({}),
}).lambda();

const result = await lambda(
{ healthCheck: true } as any,
{} as any,
{} as any,
);
const result = await lambda({ healthCheck: true } as any, {} as any);

expect(result).toStrictEqual({
healthy: true,
Expand Down Expand Up @@ -111,7 +103,6 @@ describe('DynamoStreamHandler', () => {
],
},
{} as any,
{} as any,
);
} catch (e: any) {
expect(e.message).toContain('Failed to process new-insert-2');
Expand Down Expand Up @@ -145,7 +136,6 @@ describe('DynamoStreamHandler', () => {
],
},
{} as any,
{} as any,
);

expect(dataSources.doSomething).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -178,7 +168,6 @@ describe('DynamoStreamHandler', () => {
],
},
{} as any,
{} as any,
);

expect(dataSources.doSomething).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -211,7 +200,6 @@ describe('DynamoStreamHandler', () => {
],
},
{} as any,
{} as any,
);

expect(dataSources.doSomething).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -273,7 +261,6 @@ describe('DynamoStreamHandler', () => {
],
},
{} as any,
{} as any,
);

expect(dataSources.doSomething).toHaveBeenCalledTimes(5);
Expand Down Expand Up @@ -489,11 +476,7 @@ describe('DynamoStreamHandler', () => {
}).lambda();

test('no dynamodb property', async () => {
await lambda(
{ Records: [{ eventName: 'INSERT' }] },
{} as any,
{} as any,
);
await lambda({ Records: [{ eventName: 'INSERT' }] }, {} as any);

expect(logger.error).toHaveBeenCalledWith(
'The dynamodb property was not present on event',
Expand All @@ -504,7 +487,6 @@ describe('DynamoStreamHandler', () => {
await lambda(
{ Records: [{ eventName: 'INSERT', dynamodb: {} }] },
{} as any,
{} as any,
);

expect(logger.error).toHaveBeenCalledWith(
Expand All @@ -523,7 +505,6 @@ describe('DynamoStreamHandler', () => {
],
},
{} as any,
{} as any,
);

expect(logger.error).toHaveBeenCalledWith(
Expand All @@ -550,7 +531,6 @@ describe('DynamoStreamHandler', () => {
],
},
{} as any,
{} as any,
);

expect(logger.error).toHaveBeenCalledWith(
Expand All @@ -570,7 +550,6 @@ describe('DynamoStreamHandler', () => {
await lambda(
{ Records: [{ eventName: 'REMOVE', dynamodb: {} }] },
{} as any,
{} as any,
);

expect(logger.error).toHaveBeenCalledWith(
Expand Down Expand Up @@ -674,7 +653,6 @@ describe('DynamoStreamHandler', () => {
],
},
{} as any,
null as any,
);

const end = Date.now();
Expand Down Expand Up @@ -792,7 +770,6 @@ describe('DynamoStreamHandler', () => {
],
},
{} as any,
null as any,
);

const end = Date.now();
Expand Down Expand Up @@ -826,7 +803,6 @@ describe('DynamoStreamHandler', () => {
],
},
{} as any,
{} as any,
);

expect(logger.error).toHaveBeenCalledWith(
Expand Down Expand Up @@ -875,7 +851,6 @@ describe('DynamoStreamHandler', () => {
],
},
{} as any,
{} as any,
);

expect(dataSources.doSomething).toHaveBeenCalledTimes(1);
Expand All @@ -885,4 +860,102 @@ describe('DynamoStreamHandler', () => {
);
});
});

describe('partial batch responses', () => {
test('returns partial batch response when setting is enabled', async () => {
const { sendEvent } = new DynamoStreamHandler({
logger,
loggerObfuscateImageKeys: ['otherMap', 'otherValue'],
parse: testSerializer.parse,
createRunContext: () => ({ logger, dataSources }),
usePartialBatchResponses: true,
// Make sure partial batch responses are returned in order even
// when using concurrency.
concurrency: 2,
})
.onInsert((ctx, entity) => {
// let 2 pass
if (entity.id.includes('2')) {
return;
}
throw new Error('Failed to process insert: ' + entity.id);
})
.harness();

const result = await sendEvent({
records: [
{
sequenceNumber: 'one',
type: 'insert',
entity: { id: 'entity-1', otherValue: 'other' },
},
{
sequenceNumber: 'two',
type: 'insert',
entity: { id: 'entity-2' },
},
{
sequenceNumber: 'three',
type: 'insert',
entity: { id: 'entity-3' },
},
],
});

// Expect that redaction is working
expect(logger.error).toHaveBeenCalledWith(
expect.objectContaining({
identifier: 'one',
failedRecord: {
eventName: 'INSERT',
dynamodb: {
SequenceNumber: 'one',
NewImage: {
id: { S: 'entity-1' },
otherValue: { S: 'obfuscated' },
},
},
},
err: expect.objectContaining({
message: 'Failed to process insert: entity-1',
}),
}),
'Failed to process record',
);

// expect that third event is logged
expect(logger.error).toHaveBeenCalledWith(
expect.objectContaining({
identifier: 'three',
failedRecord: {
eventName: 'INSERT',
dynamodb: {
SequenceNumber: 'three',
NewImage: { id: { S: 'entity-3' } },
},
},
err: expect.objectContaining({
message: 'Failed to process insert: entity-3',
}),
}),
'Failed to process record',
);

const batchItemFailures = [
{ itemIdentifier: 'one' },
{ itemIdentifier: 'three' },
];

expect(result).toEqual({
batchItemFailures,
});
expect(logger.info).not.toHaveBeenCalledWith(
'Successfully processed all messages',
);
expect(logger.info).toHaveBeenCalledWith(
{ batchItemFailures },
'Completing with partial batch response',
);
});
});
});
45 changes: 31 additions & 14 deletions src/dynamo-streams.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { LoggerInterface } from '@lifeomic/logging';
import { v4 as uuid } from 'uuid';
import {
Context as AWSContext,
DynamoDBStreamEvent,
DynamoDBStreamHandler,
DynamoDBRecord,
} from 'aws-lambda';
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb';
import {
BaseContext,
BaseHandlerConfig,
PartialBatchResponse,
handleUnprocessedRecords,
processWithOrdering,
withHealthCheckHandling,
} from './utils';
Expand Down Expand Up @@ -66,13 +68,16 @@ export type DynamoStreamHandlerHarnessConfig<Context> = {
};

export type DynamoStreamHandlerHarnessContext<Entity> = {
sendEvent: (event: TestEvent<Entity>) => Promise<void>;
sendEvent: (event: TestEvent<Entity>) => Promise<PartialBatchResponse>;
};

export type TestRecord<Entity> =
export type TestRecord<Entity> = {
sequenceNumber?: string;
} & (
| { type: 'insert'; entity: Entity }
| { type: 'modify'; oldEntity: Entity; newEntity: Entity }
| { type: 'remove'; entity: Entity };
| { type: 'remove'; entity: Entity }
);

export type TestEvent<Entity> = {
records: TestRecord<Entity>[];
Expand Down Expand Up @@ -106,10 +111,8 @@ export class DynamoStreamHandler<Entity, Context> {
>,
): DynamoStreamHandler<Entity, Context> {
const copy = new DynamoStreamHandler({
parse: this.config.parse,
logger: overrides.logger ?? this.config.logger,
createRunContext:
overrides.createRunContext ?? this.config.createRunContext,
...this.config,
...overrides,
});

for (const action of this.actions.insert) {
Expand Down Expand Up @@ -198,7 +201,10 @@ export class DynamoStreamHandler<Entity, Context> {
* Returns a DynamoDB stream lambda handler that will perform the configured
* actions.
*/
lambda(): DynamoDBStreamHandler {
lambda(): (
event: DynamoDBStreamEvent,
context: AWSContext,
) => Promise<PartialBatchResponse> {
return withHealthCheckHandling(async (event, ctx) => {
const correlationId = uuid();

Expand All @@ -220,7 +226,7 @@ export class DynamoStreamHandler<Entity, Context> {
'Processing DynamoDB stream event',
);

const processingResult = await processWithOrdering(
const { unprocessedRecords } = await processWithOrdering(
{
items: event.Records,
orderBy: (record) => {
Expand Down Expand Up @@ -314,8 +320,16 @@ export class DynamoStreamHandler<Entity, Context> {
},
);

processingResult.throwOnUnprocessedRecords();
context.logger.info('Successfully processed all DynamoDB stream records');
return handleUnprocessedRecords({
logger: context.logger,
unprocessedRecords,
usePartialBatchResponses: !!this.config.usePartialBatchResponses,
getItemIdentifier: (record) =>
// Why ignore: in practice, there are always SequenceNumbers
/* istanbul ignore next */
record.dynamodb?.SequenceNumber ?? uuid(),
redactRecord: (record) => this.obfuscateRecord(record),
});
});
}

Expand All @@ -329,7 +343,7 @@ export class DynamoStreamHandler<Entity, Context> {
const lambda = this.withOverrides(options ?? {}).lambda();

return {
sendEvent: async (event) => {
sendEvent: (event) => {
const dynamoEvent: DynamoDBStreamEvent = {
Records: event.records.map<DynamoDBStreamEvent['Records'][number]>(
(record) => {
Expand All @@ -338,13 +352,15 @@ export class DynamoStreamHandler<Entity, Context> {
return {
eventName: 'INSERT',
dynamodb: {
SequenceNumber: record.sequenceNumber,
NewImage: marshall(record.entity) as any,
},
};
case 'modify':
return {
eventName: 'MODIFY',
dynamodb: {
SequenceNumber: record.sequenceNumber,
OldImage: marshall(record.oldEntity) as any,
NewImage: marshall(record.newEntity) as any,
},
Expand All @@ -353,6 +369,7 @@ export class DynamoStreamHandler<Entity, Context> {
return {
eventName: 'REMOVE',
dynamodb: {
SequenceNumber: record.sequenceNumber,
OldImage: marshall(record.entity) as any,
},
};
Expand All @@ -362,7 +379,7 @@ export class DynamoStreamHandler<Entity, Context> {
};

// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
await lambda(dynamoEvent, {} as any, null as any);
return lambda(dynamoEvent, {} as any);
},
};
}
Expand Down
Loading

0 comments on commit 96737df

Please sign in to comment.