diff --git a/src/Internal/ConcurrentFlatMapIterator.php b/src/Internal/ConcurrentFlatMapIterator.php index 8f28fdf..ef4f705 100644 --- a/src/Internal/ConcurrentFlatMapIterator.php +++ b/src/Internal/ConcurrentFlatMapIterator.php @@ -24,9 +24,14 @@ final class ConcurrentFlatMapIterator implements ConcurrentIterator * @param ConcurrentIterator $iterator * @param \Closure(T, int):iterable $flatMap */ - public function __construct(ConcurrentIterator $iterator, int $concurrency, bool $ordered, \Closure $flatMap) - { - $queue = new QueueState; + public function __construct( + ConcurrentIterator $iterator, + int $bufferSize, + int $concurrency, + bool $ordered, + \Closure $flatMap, + ) { + $queue = new QueueState($bufferSize); $this->iterator = new ConcurrentQueueIterator($queue); $order = $ordered ? new Sequence : null; diff --git a/src/Internal/ConcurrentIterableIterator.php b/src/Internal/ConcurrentIterableIterator.php index 4065b6e..9d7fac7 100644 --- a/src/Internal/ConcurrentIterableIterator.php +++ b/src/Internal/ConcurrentIterableIterator.php @@ -20,7 +20,7 @@ final class ConcurrentIterableIterator implements ConcurrentIterator /** * @param iterable $iterable */ - public function __construct(iterable $iterable) + public function __construct(iterable $iterable, int $bufferSize = 0) { if (\is_array($iterable)) { $this->iterator = new ConcurrentArrayIterator($iterable); @@ -36,7 +36,7 @@ public function __construct(iterable $iterable) $iterable = $iterable->getIterator(); } - $queue = new QueueState(); + $queue = new QueueState($bufferSize); $this->iterator = new ConcurrentQueueIterator($queue); async(static function () use ($queue, $iterable): void { diff --git a/src/Internal/FlatMapOperation.php b/src/Internal/FlatMapOperation.php index 71fe2b2..f9ca7e6 100644 --- a/src/Internal/FlatMapOperation.php +++ b/src/Internal/FlatMapOperation.php @@ -23,6 +23,7 @@ public static function getStopMarker(): object * @param \Closure(T, int):iterable $flatMap */ public function __construct( + private readonly int $bufferSize, private readonly int $concurrency, private readonly bool $ordered, private readonly \Closure $flatMap @@ -45,9 +46,15 @@ public function __invoke(ConcurrentIterator $source): ConcurrentIterator yield $item; } } - })()); + })(), $this->bufferSize); } - return new ConcurrentFlatMapIterator($source, $this->concurrency, $this->ordered, $this->flatMap); + return new ConcurrentFlatMapIterator( + $source, + $this->bufferSize, + $this->concurrency, + $this->ordered, + $this->flatMap, + ); } } diff --git a/src/Pipeline.php b/src/Pipeline.php index 3d23ba5..504cdee 100644 --- a/src/Pipeline.php +++ b/src/Pipeline.php @@ -8,6 +8,7 @@ use Amp\Pipeline\Internal\ConcurrentClosureIterator; use Amp\Pipeline\Internal\ConcurrentIterableIterator; use Amp\Pipeline\Internal\FlatMapOperation; +use Amp\Pipeline\Internal\IntermediateOperation; use Amp\Pipeline\Internal\Sequence; use Amp\Pipeline\Internal\SortOperation; use function Amp\delay; @@ -96,10 +97,15 @@ public static function concat(array $pipelines): self )); } + /** @var non-negative-int */ + private int $bufferSize = 0; + + /** @var positive-int */ private int $concurrency = 1; private bool $ordered = true; + /** @var list */ private array $intermediateOperations = []; private bool $used = false; @@ -119,6 +125,17 @@ public function __destruct() } } + public function buffer(int $bufferSize): self + { + if ($bufferSize < 0) { + throw new \ValueError('Argument #1 ($bufferSize) must be non-negative, got ' . $bufferSize); + } + + $this->bufferSize = $bufferSize; + + return $this; + } + public function concurrent(int $concurrency): self { if ($concurrency < 1) { @@ -318,7 +335,12 @@ public function flatMap(\Closure $flatMap): self throw new \Error('Pipeline consumption has already been started'); } - $this->intermediateOperations[] = new FlatMapOperation($this->concurrency, $this->ordered, $flatMap); + $this->intermediateOperations[] = new FlatMapOperation( + $this->bufferSize, + $this->concurrency, + $this->ordered, + $flatMap, + ); /** @var self */ return $this;