Skip to content

Commit

Permalink
feat(NODE-5243): add change stream split event (#3745)
Browse files Browse the repository at this point in the history
  • Loading branch information
durran authored and malikj2000 committed Jun 29, 2023
1 parent 94dd36e commit 1bad8a0
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 0 deletions.
15 changes: 15 additions & 0 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ export interface ChangeStreamDocumentKey<TSchema extends Document = Document> {
documentKey: { _id: InferIdType<TSchema>; [shardKey: string]: any };
}

/** @public */
export interface ChangeStreamSplitEvent {
/** Which fragment of the change this is. */
fragment: number;
/** The total number of fragments. */
of: number;
}

/** @public */
export interface ChangeStreamDocumentCommon {
/**
Expand Down Expand Up @@ -192,6 +200,13 @@ export interface ChangeStreamDocumentCommon {
* Only present if the operation is part of a multi-document transaction.
*/
lsid?: ServerSessionId;

/**
* When the change stream's backing aggregation pipeline contains the $changeStreamSplitLargeEvent
* stage, events larger than 16MB will be split into multiple events and contain the
* following information about which fragment the current event is.
*/
splitEvent?: ChangeStreamSplitEvent;
}

/** @public */
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ export type {
ChangeStreamReplaceDocument,
ChangeStreamReshardCollectionDocument,
ChangeStreamShardCollectionDocument,
ChangeStreamSplitEvent,
ChangeStreamUpdateDocument,
OperationTime,
ResumeOptions,
Expand Down
60 changes: 60 additions & 0 deletions test/integration/change-streams/change_streams.prose.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { expect } from 'chai';
import { once } from 'events';
import * as sinon from 'sinon';
import { setTimeout } from 'timers';
import { promisify } from 'util';

import {
AbstractCursor,
type ChangeStream,
type CommandFailedEvent,
type CommandStartedEvent,
Expand All @@ -16,6 +19,7 @@ import {
Timestamp
} from '../../mongodb';
import * as mock from '../../tools/mongodb-mock/index';
import { getSymbolFrom } from '../../tools/utils';
import { setupDatabase } from '../shared';

/**
Expand Down Expand Up @@ -68,6 +72,14 @@ function triggerResumableError(
triggerError();
}

const initIteratorMode = async (cs: ChangeStream) => {
const init = getSymbolFrom(AbstractCursor.prototype, 'kInit');
const initEvent = once(cs.cursor, 'init');
await promisify(cs.cursor[init].bind(cs.cursor))();
await initEvent;
return;
};

/** Waits for a change stream to start */
function waitForStarted(changeStream, callback) {
changeStream.cursor.once('init', () => {
Expand Down Expand Up @@ -938,4 +950,52 @@ describe('Change Stream prose tests', function () {
}
});
});

describe('19. Validate that large ChangeStream events are split when using $changeStreamSplitLargeEvent', function () {
let client;
let db;
let collection;
let changeStream;

beforeEach(async function () {
const configuration = this.configuration;
client = configuration.newClient();
db = client.db('test');
// Create a new collection _C_ with changeStreamPreAndPostImages enabled.
await db.createCollection('changeStreamSplitTests', {
changeStreamPreAndPostImages: { enabled: true }
});
collection = db.collection('changeStreamSplitTests');
});

afterEach(async function () {
await changeStream.close();
await collection.drop();
await client.close();
});

it('splits the event into multiple fragments', {
metadata: { requires: { topology: '!single', mongodb: '>=7.0.0' } },
test: async function () {
// Insert into _C_ a document at least 10mb in size, e.g. { "value": "q"*10*1024*1024 }
await collection.insertOne({ value: 'q'.repeat(10 * 1024 * 1024) });
// Create a change stream _S_ by calling watch on _C_ with pipeline
// [{ "$changeStreamSplitLargeEvent": {} }] and fullDocumentBeforeChange=required.
changeStream = collection.watch([{ $changeStreamSplitLargeEvent: {} }], {
fullDocumentBeforeChange: 'required'
});
await initIteratorMode(changeStream);
// Call updateOne on _C_ with an empty query and an update setting the field to a new
// large value, e.g. { "$set": { "value": "z"*10*1024*1024 } }.
await collection.updateOne({}, { $set: { value: 'z'.repeat(10 * 1024 * 1024) } });
// Collect two events from _S_.
const eventOne = await changeStream.next();
const eventTwo = await changeStream.next();
// Assert that the events collected have splitEvent fields { "fragment": 1, "of": 2 }
// and { "fragment": 2, "of": 2 }, in that order.
expect(eventOne.splitEvent).to.deep.equal({ fragment: 1, of: 2 });
expect(eventTwo.splitEvent).to.deep.equal({ fragment: 2, of: 2 });
}
});
});
});

0 comments on commit 1bad8a0

Please sign in to comment.