Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ protected function _combine_with(StreamCursor $other): StreamCursor
/** @var SizeLimitedStreamCursor $other */
return new self(
StreamCursor::combine_all([$this->cursor, $other->cursor]),
max($this->count, $other->count) + 1 // Magic! You should be able to prove it in math.
// count is the absolute cumulative position; combining is an idempotent max.
max($this->count, $other->count)
);
}

Expand Down
11 changes: 6 additions & 5 deletions lib/Tumblr/StreamBuilder/Streams/SizeLimitedStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,17 @@ protected function _enumerate(
$elements = $result->get_elements();
$selected_elements = array_slice($elements, 0, $balance);

$derived_elements = array_map(function (StreamElement $e) use ($cursor) {
$derived_elements = array_map(function (StreamElement $e, int $index) use ($cursor) {
// Tag each element with its absolute cumulative position across pages.
$new_cursor = new SizeLimitedStreamCursor(
$e->get_cursor(),
$cursor->get_current_size() + 1
$cursor->get_current_size() + $index + 1
);
return new DerivedStreamElement($e, $this->get_identity(), $new_cursor);
}, $selected_elements);
}, $selected_elements, array_keys($selected_elements));

// if this page size is greater than stream's limit, we don't want to keep paginating.
$is_exhausted = count($derived_elements) < $count || count($derived_elements) >= $this->limit;
// if this page size is greater than stream's limit, or inner is exhausted, we don't want to keep paginating.
$is_exhausted = $result->is_exhaustive() || count($derived_elements) >= $this->limit;
return new StreamResult($is_exhausted, $derived_elements);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ class SizeLimitedStreamCursorTest extends \PHPUnit\Framework\TestCase
public function combine_with_provider()
{
return [
[5, 8, 9],
[0, 0, 1],
[0, 15, 16],
[15, 0, 16],
[5, 8, 8],
[0, 0, 0],
[0, 15, 15],
[15, 0, 15],
];
}

Expand Down
71 changes: 61 additions & 10 deletions tests/unit/Tumblr/StreamBuilder/Streams/SizeLimitedStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
use Tumblr\StreamBuilder\Interfaces\Log;
use Tumblr\StreamBuilder\StreamContext;
use Tumblr\StreamBuilder\StreamCursors\MultiCursor;
use Tumblr\StreamBuilder\StreamCursors\SizeLimitedStreamCursor;
use Tumblr\StreamBuilder\StreamElements\StreamElement;
use Tumblr\StreamBuilder\StreamResult;
use Tumblr\StreamBuilder\Streams\ConcatenatedStream;
Expand Down Expand Up @@ -111,27 +112,39 @@ public function test_enumerate_exact_count()
}

/**
* Test enumerate when not enough elements are returned.
* Test enumerate is exhausted when the inner stream is exhausted, even below the limit.
*/
public function test_enumerate_less_count()
public function test_enumerate_inner_exhausted()
{
/** @var Stream|\PHPUnit\Framework\MockObject\MockObject $stream */
$stream = $this->getMockBuilder(Stream::class)->disableOriginalConstructor()->getMock();
$el = $this->getMockBuilder(StreamElement::class)->disableOriginalConstructor()->getMock();
$stream->expects($this->any())->method('_enumerate')
->will($this->onConsecutiveCalls(
new StreamResult(true, [$el, $el, $el, $el]),
new StreamResult(true, [])
));
$stream->expects($this->once())
->method('_enumerate')
->willReturn(new StreamResult(true, [$el, $el, $el, $el]));

$size_limited_stream = new SizeLimitedStream($stream, 5, 'amazing_size_limited_stream');
$result = $size_limited_stream->enumerate(5);
$this->assertSame(4, $result->get_size());
$this->assertTrue($result->is_exhaustive());
}

$result = $size_limited_stream->enumerate(4, $result->get_combined_cursor());
$this->assertSame(0, $result->get_size());
$this->assertTrue($result->is_exhaustive());
/**
* Test enumerate is not exhausted when the inner stream is not exhausted, even on a short page.
*/
public function test_enumerate_inner_not_exhausted()
{
/** @var Stream|\PHPUnit\Framework\MockObject\MockObject $stream */
$stream = $this->getMockBuilder(Stream::class)->disableOriginalConstructor()->getMock();
$el = $this->getMockBuilder(StreamElement::class)->disableOriginalConstructor()->getMock();
$stream->expects($this->once())
->method('_enumerate')
->willReturn(new StreamResult(false, [$el, $el, $el]));

$size_limited_stream = new SizeLimitedStream($stream, 10, 'amazing_size_limited_stream');
$result = $size_limited_stream->enumerate(5);
$this->assertSame(3, $result->get_size());
$this->assertFalse($result->is_exhaustive());
}

/**
Expand Down Expand Up @@ -188,6 +201,44 @@ public function testEnumerateWithSmallBalance(): void
$size_limited_stream->enumerate(20);
}

/**
* Paginating across many pages serves exactly `limit` elements in total, even when each page's
* combined cursor is re-combined with the request cursor (as ActionContext::getNextCursor does).
*/
public function testEnumerateReachesLimitAcrossPages(): void
{
$limit = 20;
$page_size = 3;

/** @var Stream|\PHPUnit\Framework\MockObject\MockObject $stream */
$stream = $this->getMockBuilder(Stream::class)->disableOriginalConstructor()->getMock();
$el = $this->getMockBuilder(StreamElement::class)->disableOriginalConstructor()->getMock();
$stream->expects($this->exactly(7))
->method('_enumerate')
->willReturn(new StreamResult(false, array_fill(0, $page_size, $el)));
Comment thread
lucila marked this conversation as resolved.

$size_limited_stream = new SizeLimitedStream($stream, $limit, 'amazing_size_limited_stream');

$cursor = null;
$cumulative = 0;
$page_sizes = [];
while ($cumulative < $limit) {
$result = $size_limited_stream->enumerate($page_size, $cursor);
$page_sizes[] = $result->get_size();
$cumulative += $result->get_size();

$combined = $result->get_combined_cursor();
$this->assertInstanceOf(SizeLimitedStreamCursor::class, $combined);
// Mirror ActionContext::getNextCursor: re-combine the page cursor with the request cursor.
$cursor = $combined->combine_with($cursor);
$this->assertSame($cumulative, $cursor->get_current_size());
}

// Full pages serve `page_size`; the last page is truncated to the remaining budget.
$this->assertSame([3, 3, 3, 3, 3, 3, 2], $page_sizes);
$this->assertSame($limit, $cumulative);
}

/**
* Test to_template
*/
Expand Down
Loading