From 968bfb31547bdca72f5f199b43486817565f688f Mon Sep 17 00:00:00 2001 From: Lesian Lengare Date: Tue, 26 May 2026 12:38:09 -0400 Subject: [PATCH 1/5] Fix SizeLimitedStream cursor over-counting elements across pages --- .../StreamCursors/SizeLimitedStreamCursor.php | 3 +- .../Streams/SizeLimitedStream.php | 7 ++-- .../SizeLimitedStreamCursorTest.php | 8 ++-- .../Streams/SizeLimitedStreamTest.php | 42 +++++++++++++++++++ 4 files changed, 52 insertions(+), 8 deletions(-) diff --git a/lib/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursor.php b/lib/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursor.php index f8533c2..59fca5f 100644 --- a/lib/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursor.php +++ b/lib/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursor.php @@ -90,7 +90,8 @@ protected function _combine_with(StreamCursor $other): StreamCursor /** @var SizeLimitedStreamCursor $other */ return new self( StreamCursor::combine_all([$this->cursor, $other->cursor]), - max($this->count, $other->count) + 1 // Magic! You should be able to prove it in math. + // count is the absolute cumulative position; combining is an idempotent max. + max($this->count, $other->count) ); } diff --git a/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php b/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php index bc82eb6..9db4f24 100644 --- a/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php +++ b/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php @@ -93,13 +93,14 @@ protected function _enumerate( $elements = $result->get_elements(); $selected_elements = array_slice($elements, 0, $balance); - $derived_elements = array_map(function (StreamElement $e) use ($cursor) { + $derived_elements = array_map(function (StreamElement $e, int $index) use ($cursor) { + // Tag each element with its absolute cumulative position across pages. $new_cursor = new SizeLimitedStreamCursor( $e->get_cursor(), - $cursor->get_current_size() + 1 + $cursor->get_current_size() + $index + 1 ); return new DerivedStreamElement($e, $this->get_identity(), $new_cursor); - }, $selected_elements); + }, $selected_elements, array_keys($selected_elements)); // if this page size is greater than stream's limit, we don't want to keep paginating. $is_exhausted = count($derived_elements) < $count || count($derived_elements) >= $this->limit; diff --git a/tests/unit/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursorTest.php b/tests/unit/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursorTest.php index c9acc04..006b3eb 100644 --- a/tests/unit/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursorTest.php +++ b/tests/unit/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursorTest.php @@ -37,10 +37,10 @@ class SizeLimitedStreamCursorTest extends \PHPUnit\Framework\TestCase public function combine_with_provider() { return [ - [5, 8, 9], - [0, 0, 1], - [0, 15, 16], - [15, 0, 16], + [5, 8, 8], + [0, 0, 0], + [0, 15, 15], + [15, 0, 15], ]; } diff --git a/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php b/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php index 31ccf39..8e28060 100644 --- a/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php +++ b/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php @@ -28,6 +28,7 @@ use Tumblr\StreamBuilder\Interfaces\Log; use Tumblr\StreamBuilder\StreamContext; use Tumblr\StreamBuilder\StreamCursors\MultiCursor; +use Tumblr\StreamBuilder\StreamCursors\SizeLimitedStreamCursor; use Tumblr\StreamBuilder\StreamElements\StreamElement; use Tumblr\StreamBuilder\StreamResult; use Tumblr\StreamBuilder\Streams\ConcatenatedStream; @@ -188,6 +189,47 @@ public function testEnumerateWithSmallBalance(): void $size_limited_stream->enumerate(20); } + /** + * Paginating in equal pages serves exactly `limit` elements across the session, even when each + * page's combined cursor is re-combined with the request cursor (as ActionContext::getNextCursor + * does). The cursor's count tracks the true cumulative size after every page. + */ + public function testEnumerateReachesLimitAcrossPages(): void + { + $limit = 20; + $page_size = 5; + + /** @var Stream|\PHPUnit\Framework\MockObject\MockObject $stream */ + $stream = $this->getMockBuilder(Stream::class)->disableOriginalConstructor()->getMock(); + $el = $this->getMockBuilder(StreamElement::class)->disableOriginalConstructor()->getMock(); + // Inner stream always has more than enough; queried once per non-empty page. + $stream->expects($this->exactly($limit / $page_size)) + ->method('_enumerate') + ->willReturn(new StreamResult(false, array_fill(0, $page_size, $el))); + + $size_limited_stream = new SizeLimitedStream($stream, $limit, 'amazing_size_limited_stream'); + + $cursor = null; + $cumulative = 0; + while ($cumulative < $limit) { + $result = $size_limited_stream->enumerate($page_size, $cursor); + $this->assertSame($page_size, $result->get_size()); + $cumulative += $result->get_size(); + + $combined = $result->get_combined_cursor(); + $this->assertInstanceOf(SizeLimitedStreamCursor::class, $combined); + // Mirror ActionContext::getNextCursor: re-combine the page cursor with the request cursor. + $cursor = $combined->combine_with($cursor); + $this->assertSame($cumulative, $cursor->get_current_size()); + } + + // Limit reached: next page is empty and exhaustive, and the inner stream is not queried again. + $result = $size_limited_stream->enumerate($page_size, $cursor); + $this->assertSame(0, $result->get_size()); + $this->assertTrue($result->is_exhaustive()); + $this->assertSame($limit, $cumulative); + } + /** * Test to_template */ From d1de523283609577a5c67ff6c9413baeca6b3d61 Mon Sep 17 00:00:00 2001 From: Lesian Lengare Date: Tue, 26 May 2026 13:12:17 -0400 Subject: [PATCH 2/5] Update tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php Co-authored-by: Lucila Stancato --- .../unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php b/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php index 8e28060..705467a 100644 --- a/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php +++ b/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php @@ -203,7 +203,7 @@ public function testEnumerateReachesLimitAcrossPages(): void $stream = $this->getMockBuilder(Stream::class)->disableOriginalConstructor()->getMock(); $el = $this->getMockBuilder(StreamElement::class)->disableOriginalConstructor()->getMock(); // Inner stream always has more than enough; queried once per non-empty page. - $stream->expects($this->exactly($limit / $page_size)) + $stream->expects($this->exactly(4)) ->method('_enumerate') ->willReturn(new StreamResult(false, array_fill(0, $page_size, $el))); From 567542e1306ab7a038498e013ee36efd9a8e6b93 Mon Sep 17 00:00:00 2001 From: Lesian Lengare Date: Tue, 26 May 2026 13:27:49 -0400 Subject: [PATCH 3/5] Cover truncated final page in pagination test --- .../Streams/SizeLimitedStreamTest.php | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php b/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php index 705467a..448e4e2 100644 --- a/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php +++ b/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php @@ -190,20 +190,18 @@ public function testEnumerateWithSmallBalance(): void } /** - * Paginating in equal pages serves exactly `limit` elements across the session, even when each - * page's combined cursor is re-combined with the request cursor (as ActionContext::getNextCursor - * does). The cursor's count tracks the true cumulative size after every page. + * Paginating across many pages serves exactly `limit` elements in total, even when each page's + * combined cursor is re-combined with the request cursor (as ActionContext::getNextCursor does). */ public function testEnumerateReachesLimitAcrossPages(): void { $limit = 20; - $page_size = 5; + $page_size = 3; /** @var Stream|\PHPUnit\Framework\MockObject\MockObject $stream */ $stream = $this->getMockBuilder(Stream::class)->disableOriginalConstructor()->getMock(); $el = $this->getMockBuilder(StreamElement::class)->disableOriginalConstructor()->getMock(); - // Inner stream always has more than enough; queried once per non-empty page. - $stream->expects($this->exactly(4)) + $stream->expects($this->exactly(7)) ->method('_enumerate') ->willReturn(new StreamResult(false, array_fill(0, $page_size, $el))); @@ -211,9 +209,10 @@ public function testEnumerateReachesLimitAcrossPages(): void $cursor = null; $cumulative = 0; + $page_sizes = []; while ($cumulative < $limit) { $result = $size_limited_stream->enumerate($page_size, $cursor); - $this->assertSame($page_size, $result->get_size()); + $page_sizes[] = $result->get_size(); $cumulative += $result->get_size(); $combined = $result->get_combined_cursor(); @@ -223,10 +222,8 @@ public function testEnumerateReachesLimitAcrossPages(): void $this->assertSame($cumulative, $cursor->get_current_size()); } - // Limit reached: next page is empty and exhaustive, and the inner stream is not queried again. - $result = $size_limited_stream->enumerate($page_size, $cursor); - $this->assertSame(0, $result->get_size()); - $this->assertTrue($result->is_exhaustive()); + // Full pages serve `page_size`; the last page is truncated to the remaining budget. + $this->assertSame([3, 3, 3, 3, 3, 3, 2], $page_sizes); $this->assertSame($limit, $cumulative); } From fe469c7bf740ec9b40ddeda81bab051b5d975c6e Mon Sep 17 00:00:00 2001 From: Lesian Lengare Date: Tue, 26 May 2026 14:50:37 -0400 Subject: [PATCH 4/5] Follow inner stream exhaustion in SizeLimitedStream SizeLimitedStream inferred exhaustion from count(derived) < requested count, which prematurely ended pagination whenever an inner stream returned a short-but-not-exhausted page (e.g. a FilteredStream that spent its retries). Trust the inner stream's is_exhaustive() signal instead; the limit is still enforced by the >= limit clause and the balance check. Replace test_enumerate_less_count (which only covered the deleted heuristic) with test_enumerate_inner_exhausted / _inner_not_exhausted. --- .../Streams/SizeLimitedStream.php | 3 +- .../Streams/SizeLimitedStreamTest.php | 32 +++++++++++++------ 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php b/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php index 9db4f24..5d30694 100644 --- a/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php +++ b/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php @@ -102,8 +102,7 @@ protected function _enumerate( return new DerivedStreamElement($e, $this->get_identity(), $new_cursor); }, $selected_elements, array_keys($selected_elements)); - // if this page size is greater than stream's limit, we don't want to keep paginating. - $is_exhausted = count($derived_elements) < $count || count($derived_elements) >= $this->limit; + $is_exhausted = $result->is_exhaustive() || count($derived_elements) >= $this->limit; return new StreamResult($is_exhausted, $derived_elements); } diff --git a/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php b/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php index 448e4e2..5b829ef 100644 --- a/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php +++ b/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php @@ -112,27 +112,39 @@ public function test_enumerate_exact_count() } /** - * Test enumerate when not enough elements are returned. + * Test enumerate is exhausted when the inner stream is exhausted, even below the limit. */ - public function test_enumerate_less_count() + public function test_enumerate_inner_exhausted() { /** @var Stream|\PHPUnit\Framework\MockObject\MockObject $stream */ $stream = $this->getMockBuilder(Stream::class)->disableOriginalConstructor()->getMock(); $el = $this->getMockBuilder(StreamElement::class)->disableOriginalConstructor()->getMock(); - $stream->expects($this->any())->method('_enumerate') - ->will($this->onConsecutiveCalls( - new StreamResult(true, [$el, $el, $el, $el]), - new StreamResult(true, []) - )); + $stream->expects($this->once()) + ->method('_enumerate') + ->willReturn(new StreamResult(true, [$el, $el, $el, $el])); $size_limited_stream = new SizeLimitedStream($stream, 5, 'amazing_size_limited_stream'); $result = $size_limited_stream->enumerate(5); $this->assertSame(4, $result->get_size()); $this->assertTrue($result->is_exhaustive()); + } - $result = $size_limited_stream->enumerate(4, $result->get_combined_cursor()); - $this->assertSame(0, $result->get_size()); - $this->assertTrue($result->is_exhaustive()); + /** + * Test enumerate is not exhausted when the inner stream is not exhausted, even on a short page. + */ + public function test_enumerate_inner_not_exhausted() + { + /** @var Stream|\PHPUnit\Framework\MockObject\MockObject $stream */ + $stream = $this->getMockBuilder(Stream::class)->disableOriginalConstructor()->getMock(); + $el = $this->getMockBuilder(StreamElement::class)->disableOriginalConstructor()->getMock(); + $stream->expects($this->once()) + ->method('_enumerate') + ->willReturn(new StreamResult(false, [$el, $el, $el])); + + $size_limited_stream = new SizeLimitedStream($stream, 10, 'amazing_size_limited_stream'); + $result = $size_limited_stream->enumerate(5); + $this->assertSame(3, $result->get_size()); + $this->assertFalse($result->is_exhaustive()); } /** From fc320e9a19470fee9704fe415a39f943a93c5c5f Mon Sep 17 00:00:00 2001 From: Lesian Lengare Date: Tue, 26 May 2026 14:53:28 -0400 Subject: [PATCH 5/5] Restore comment on exhaustion condition --- lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php b/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php index 5d30694..f1ccd22 100644 --- a/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php +++ b/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php @@ -102,6 +102,7 @@ protected function _enumerate( return new DerivedStreamElement($e, $this->get_identity(), $new_cursor); }, $selected_elements, array_keys($selected_elements)); + // if this page size is greater than stream's limit, or inner is exhausted, we don't want to keep paginating. $is_exhausted = $result->is_exhaustive() || count($derived_elements) >= $this->limit; return new StreamResult($is_exhausted, $derived_elements); }