Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,11 @@ export class ChangeStream<
return this.cursor?.resumeToken;
}

/** Returns the currently buffered documents length of the underlying cursor. */
bufferedCount(): number {
return this.cursor?.bufferedCount() ?? 0;
}
Comment on lines +709 to +711
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This introduces a new public surface on ChangeStream. Please add test coverage (unit or integration) that validates it reports the underlying cursor’s buffered document count (and returns 0 when nothing is buffered), similar to the existing AbstractCursor bufferedCount tests.

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@typesafe Seems worth doing, ideally capturing your use case in some minor form would make sure the driver doesn't regress, I'm thinking specifically about your expectation of what tryNext does when bufferedCount is zero or non-zero.

This file is probably a good spot: test/integration/change-streams/change_stream.test.ts

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nbbeeken Good idea, done!


/** Check if there is any document still available in the Change Stream */
async hasNext(): Promise<boolean> {
this._setIsIterator();
Expand Down
76 changes: 76 additions & 0 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,82 @@ describe('Change Streams', function () {
);
});
});

describe('#bufferedCount()', function () {
it(
'should return 0 before starting to consume the change stream (empty collection)',
{ requires: { topology: 'replicaset' } },
async function () {
changeStream = collection.watch([]);
expect(changeStream.bufferedCount()).to.equal(0);
await changeStream.close();
}
);

it(
'should return 0 before starting to consume the change stream (non-empty collection)',
{ requires: { topology: 'replicaset' } },
async function () {
// existing documents
await collection.insertOne({ a: 1 });
await collection.insertOne({ a: 2 });

changeStream = collection.watch([]);

// docs inserted after the change stream was created
await collection.insertOne({ a: 3 });
await collection.insertOne({ a: 4 });

expect(changeStream.bufferedCount()).to.equal(0);
await changeStream.close();
}
);

it('should return the underlying cursor buffered document count', {
metadata: { requires: { topology: 'replicaset' } },
async test() {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
await collection.insertOne({ a: 1 });
await collection.insertOne({ a: 2 });

expect(changeStream.bufferedCount()).to.equal(0);

// hasNext will trigger a batch fetch, buffering the documents
const hasNext = await changeStream.hasNext();
expect(hasNext).to.be.true;

expect(changeStream.bufferedCount()).to.equal(2);
}
});

it(`decreases as buffered documents are consumed`, {
metadata: { requires: { topology: 'replicaset' } },
async test() {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
await collection.insertOne({ a: 1 });
await collection.insertOne({ a: 2 });

expect(changeStream.bufferedCount()).to.equal(0);
// `next` triggers a batch fetch, buffering the documents
// and then consumes one document from that buffer
await changeStream.next();
expect(changeStream.bufferedCount()).to.equal(1);
await changeStream.next();
expect(changeStream.bufferedCount()).to.equal(0);

await collection.insertOne({ a: 1 });
await collection.insertOne({ a: 2 });

// `tryNext` also triggers a batch fetch
await changeStream.tryNext();
expect(changeStream.bufferedCount()).to.equal(1);
await changeStream.tryNext();
expect(changeStream.bufferedCount()).to.equal(0);
}
});
});
});

describe('startAfter', function () {
Expand Down