Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pipeline::buffer() #21

Merged
merged 1 commit into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/Internal/ConcurrentFlatMapIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@ final class ConcurrentFlatMapIterator implements ConcurrentIterator
* @param ConcurrentIterator<T> $iterator
* @param \Closure(T, int):iterable<R> $flatMap
*/
public function __construct(ConcurrentIterator $iterator, int $concurrency, bool $ordered, \Closure $flatMap)
{
$queue = new QueueState;
public function __construct(
ConcurrentIterator $iterator,
int $bufferSize,
int $concurrency,
bool $ordered,
\Closure $flatMap,
) {
$queue = new QueueState($bufferSize);
$this->iterator = new ConcurrentQueueIterator($queue);
$order = $ordered ? new Sequence : null;

Expand Down
4 changes: 2 additions & 2 deletions src/Internal/ConcurrentIterableIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ final class ConcurrentIterableIterator implements ConcurrentIterator
/**
* @param iterable<T> $iterable
*/
public function __construct(iterable $iterable)
public function __construct(iterable $iterable, int $bufferSize = 0)
{
if (\is_array($iterable)) {
$this->iterator = new ConcurrentArrayIterator($iterable);
Expand All @@ -36,7 +36,7 @@ public function __construct(iterable $iterable)
$iterable = $iterable->getIterator();
}

$queue = new QueueState();
$queue = new QueueState($bufferSize);
$this->iterator = new ConcurrentQueueIterator($queue);

async(static function () use ($queue, $iterable): void {
Expand Down
11 changes: 9 additions & 2 deletions src/Internal/FlatMapOperation.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public static function getStopMarker(): object
* @param \Closure(T, int):iterable<R> $flatMap
*/
public function __construct(
private readonly int $bufferSize,
private readonly int $concurrency,
private readonly bool $ordered,
private readonly \Closure $flatMap
Expand All @@ -45,9 +46,15 @@ public function __invoke(ConcurrentIterator $source): ConcurrentIterator
yield $item;
}
}
})());
})(), $this->bufferSize);
}

return new ConcurrentFlatMapIterator($source, $this->concurrency, $this->ordered, $this->flatMap);
return new ConcurrentFlatMapIterator(
$source,
$this->bufferSize,
$this->concurrency,
$this->ordered,
$this->flatMap,
);
}
}
24 changes: 23 additions & 1 deletion src/Pipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Amp\Pipeline\Internal\ConcurrentIterableIterator;
use Amp\Pipeline\Internal\ConcurrentMergedIterator;
use Amp\Pipeline\Internal\FlatMapOperation;
use Amp\Pipeline\Internal\IntermediateOperation;
use Amp\Pipeline\Internal\Sequence;
use Amp\Pipeline\Internal\SortOperation;
use function Amp\delay;
Expand Down Expand Up @@ -123,10 +124,15 @@ private static function mapToConcurrentIterators(array $iterables): array
return \array_map(static fn (iterable $pipeline) => self::fromIterable($pipeline)->getIterator(), $iterables);
}

/** @var non-negative-int */
private int $bufferSize = 0;

/** @var positive-int */
private int $concurrency = 1;

private bool $ordered = true;

/** @var list<IntermediateOperation> */
private array $intermediateOperations = [];

private bool $used = false;
Expand All @@ -146,6 +152,17 @@ public function __destruct()
}
}

public function buffer(int $bufferSize): self
{
if ($bufferSize < 0) {
throw new \ValueError('Argument #1 ($bufferSize) must be non-negative, got ' . $bufferSize);
}

$this->bufferSize = $bufferSize;

return $this;
}

public function concurrent(int $concurrency): self
{
if ($concurrency < 1) {
Expand Down Expand Up @@ -345,7 +362,12 @@ public function flatMap(\Closure $flatMap): self
throw new \Error('Pipeline consumption has already been started');
}

$this->intermediateOperations[] = new FlatMapOperation($this->concurrency, $this->ordered, $flatMap);
$this->intermediateOperations[] = new FlatMapOperation(
$this->bufferSize,
$this->concurrency,
$this->ordered,
$flatMap,
);

/** @var self<R> */
return $this;
Expand Down
Loading