diff --git a/lib/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursor.php b/lib/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursor.php index f8533c2..59fca5f 100644 --- a/lib/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursor.php +++ b/lib/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursor.php @@ -90,7 +90,8 @@ protected function _combine_with(StreamCursor $other): StreamCursor /** @var SizeLimitedStreamCursor $other */ return new self( StreamCursor::combine_all([$this->cursor, $other->cursor]), - max($this->count, $other->count) + 1 // Magic! You should be able to prove it in math. + // count is the absolute cumulative position; combining is an idempotent max. + max($this->count, $other->count) ); } diff --git a/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php b/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php index bc82eb6..f1ccd22 100644 --- a/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php +++ b/lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php @@ -93,16 +93,17 @@ protected function _enumerate( $elements = $result->get_elements(); $selected_elements = array_slice($elements, 0, $balance); - $derived_elements = array_map(function (StreamElement $e) use ($cursor) { + $derived_elements = array_map(function (StreamElement $e, int $index) use ($cursor) { + // Tag each element with its absolute cumulative position across pages. $new_cursor = new SizeLimitedStreamCursor( $e->get_cursor(), - $cursor->get_current_size() + 1 + $cursor->get_current_size() + $index + 1 ); return new DerivedStreamElement($e, $this->get_identity(), $new_cursor); - }, $selected_elements); + }, $selected_elements, array_keys($selected_elements)); - // if this page size is greater than stream's limit, we don't want to keep paginating. - $is_exhausted = count($derived_elements) < $count || count($derived_elements) >= $this->limit; + // if this page size is greater than stream's limit, or inner is exhausted, we don't want to keep paginating. + $is_exhausted = $result->is_exhaustive() || count($derived_elements) >= $this->limit; return new StreamResult($is_exhausted, $derived_elements); } diff --git a/tests/unit/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursorTest.php b/tests/unit/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursorTest.php index c9acc04..006b3eb 100644 --- a/tests/unit/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursorTest.php +++ b/tests/unit/Tumblr/StreamBuilder/StreamCursors/SizeLimitedStreamCursorTest.php @@ -37,10 +37,10 @@ class SizeLimitedStreamCursorTest extends \PHPUnit\Framework\TestCase public function combine_with_provider() { return [ - [5, 8, 9], - [0, 0, 1], - [0, 15, 16], - [15, 0, 16], + [5, 8, 8], + [0, 0, 0], + [0, 15, 15], + [15, 0, 15], ]; } diff --git a/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php b/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php index 31ccf39..5b829ef 100644 --- a/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php +++ b/tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php @@ -28,6 +28,7 @@ use Tumblr\StreamBuilder\Interfaces\Log; use Tumblr\StreamBuilder\StreamContext; use Tumblr\StreamBuilder\StreamCursors\MultiCursor; +use Tumblr\StreamBuilder\StreamCursors\SizeLimitedStreamCursor; use Tumblr\StreamBuilder\StreamElements\StreamElement; use Tumblr\StreamBuilder\StreamResult; use Tumblr\StreamBuilder\Streams\ConcatenatedStream; @@ -111,27 +112,39 @@ public function test_enumerate_exact_count() } /** - * Test enumerate when not enough elements are returned. + * Test enumerate is exhausted when the inner stream is exhausted, even below the limit. */ - public function test_enumerate_less_count() + public function test_enumerate_inner_exhausted() { /** @var Stream|\PHPUnit\Framework\MockObject\MockObject $stream */ $stream = $this->getMockBuilder(Stream::class)->disableOriginalConstructor()->getMock(); $el = $this->getMockBuilder(StreamElement::class)->disableOriginalConstructor()->getMock(); - $stream->expects($this->any())->method('_enumerate') - ->will($this->onConsecutiveCalls( - new StreamResult(true, [$el, $el, $el, $el]), - new StreamResult(true, []) - )); + $stream->expects($this->once()) + ->method('_enumerate') + ->willReturn(new StreamResult(true, [$el, $el, $el, $el])); $size_limited_stream = new SizeLimitedStream($stream, 5, 'amazing_size_limited_stream'); $result = $size_limited_stream->enumerate(5); $this->assertSame(4, $result->get_size()); $this->assertTrue($result->is_exhaustive()); + } - $result = $size_limited_stream->enumerate(4, $result->get_combined_cursor()); - $this->assertSame(0, $result->get_size()); - $this->assertTrue($result->is_exhaustive()); + /** + * Test enumerate is not exhausted when the inner stream is not exhausted, even on a short page. + */ + public function test_enumerate_inner_not_exhausted() + { + /** @var Stream|\PHPUnit\Framework\MockObject\MockObject $stream */ + $stream = $this->getMockBuilder(Stream::class)->disableOriginalConstructor()->getMock(); + $el = $this->getMockBuilder(StreamElement::class)->disableOriginalConstructor()->getMock(); + $stream->expects($this->once()) + ->method('_enumerate') + ->willReturn(new StreamResult(false, [$el, $el, $el])); + + $size_limited_stream = new SizeLimitedStream($stream, 10, 'amazing_size_limited_stream'); + $result = $size_limited_stream->enumerate(5); + $this->assertSame(3, $result->get_size()); + $this->assertFalse($result->is_exhaustive()); } /** @@ -188,6 +201,44 @@ public function testEnumerateWithSmallBalance(): void $size_limited_stream->enumerate(20); } + /** + * Paginating across many pages serves exactly `limit` elements in total, even when each page's + * combined cursor is re-combined with the request cursor (as ActionContext::getNextCursor does). + */ + public function testEnumerateReachesLimitAcrossPages(): void + { + $limit = 20; + $page_size = 3; + + /** @var Stream|\PHPUnit\Framework\MockObject\MockObject $stream */ + $stream = $this->getMockBuilder(Stream::class)->disableOriginalConstructor()->getMock(); + $el = $this->getMockBuilder(StreamElement::class)->disableOriginalConstructor()->getMock(); + $stream->expects($this->exactly(7)) + ->method('_enumerate') + ->willReturn(new StreamResult(false, array_fill(0, $page_size, $el))); + + $size_limited_stream = new SizeLimitedStream($stream, $limit, 'amazing_size_limited_stream'); + + $cursor = null; + $cumulative = 0; + $page_sizes = []; + while ($cumulative < $limit) { + $result = $size_limited_stream->enumerate($page_size, $cursor); + $page_sizes[] = $result->get_size(); + $cumulative += $result->get_size(); + + $combined = $result->get_combined_cursor(); + $this->assertInstanceOf(SizeLimitedStreamCursor::class, $combined); + // Mirror ActionContext::getNextCursor: re-combine the page cursor with the request cursor. + $cursor = $combined->combine_with($cursor); + $this->assertSame($cumulative, $cursor->get_current_size()); + } + + // Full pages serve `page_size`; the last page is truncated to the remaining budget. + $this->assertSame([3, 3, 3, 3, 3, 3, 2], $page_sizes); + $this->assertSame($limit, $cumulative); + } + /** * Test to_template */