diff --git a/src/change_stream.ts b/src/change_stream.ts index 22330e8595..f99a18c501 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -705,6 +705,11 @@ export class ChangeStream< return this.cursor?.resumeToken; } + /** Returns the number of documents currently buffered by the underlying cursor. */ + bufferedCount(): number { + return this.cursor?.bufferedCount() ?? 0; + } + /** Check if there is any document still available in the Change Stream */ async hasNext(): Promise<TChange> { this._setIsIterator(); diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index fbdc5e4561..aab490f854 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1167,6 +1167,82 @@ describe('Change Streams', function () { ); }); }); + + describe('#bufferedCount()', function () { + it( + 'should return 0 before starting to consume the change stream (empty collection)', + { requires: { topology: 'replicaset' } }, + async function () { + changeStream = collection.watch([]); + expect(changeStream.bufferedCount()).to.equal(0); + await changeStream.close(); + } + ); + + it( + 'should return 0 before starting to consume the change stream (non-empty collection)', + { requires: { topology: 'replicaset' } }, + async function () { + // existing documents + await collection.insertOne({ a: 1 }); + await collection.insertOne({ a: 2 }); + + changeStream = collection.watch([]); + + // docs inserted after the change stream was created + await collection.insertOne({ a: 3 }); + await collection.insertOne({ a: 4 }); + + expect(changeStream.bufferedCount()).to.equal(0); + await changeStream.close(); + } + ); + + it('should return the underlying cursor buffered document count', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + await collection.insertOne({ a: 1 
}); + await collection.insertOne({ a: 2 }); + + expect(changeStream.bufferedCount()).to.equal(0); + + // hasNext will trigger a batch fetch, buffering the documents + const hasNext = await changeStream.hasNext(); + expect(hasNext).to.be.true; + + expect(changeStream.bufferedCount()).to.equal(2); + } + }); + + it(`decreases as buffered documents are consumed`, { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + await collection.insertOne({ a: 1 }); + await collection.insertOne({ a: 2 }); + + expect(changeStream.bufferedCount()).to.equal(0); + // `next` triggers a batch fetch, buffering the documents + // and then consumes one document from that buffer + await changeStream.next(); + expect(changeStream.bufferedCount()).to.equal(1); + await changeStream.next(); + expect(changeStream.bufferedCount()).to.equal(0); + + await collection.insertOne({ a: 1 }); + await collection.insertOne({ a: 2 }); + + // `tryNext` also triggers a batch fetch + await changeStream.tryNext(); + expect(changeStream.bufferedCount()).to.equal(1); + await changeStream.tryNext(); + expect(changeStream.bufferedCount()).to.equal(0); + } + }); + }); }); describe('startAfter', function () {