1.0.0 Beta 3
Pre-release
Pre-release
Pipeline
has been changed from an interface to a final class.ConcurrentIterator
acts as the interface replacementPipeline::pipe()
has been removed in favor of operator methods directly onPipeline
, such asmap()
andfilter()
Emitter
has been renamed toQueue
yield()
has been renamed topush()
emit()
has been renamed topushAsync()
- All functions in the
Amp\Pipeline
have been removed.fromIterable()
is available asPipeline::fromIterable()
concat()
is nowPipeline::concat()
- Most operators are available directly on
Pipeline
- Added
Pipeline::generate()
that invokes a closure to create each pipeline value.
Example of using Pipeline
for concurrency:
use Amp\Pipeline\Pipeline;
use function Amp\delay;
$pipeline = Pipeline::fromIterable(function (): \Generator {
for ($i = 0; $i < 100; ++$i) {
yield $i;
}
});
$results = $pipeline->concurrent(10)
->tap(fn () => delay(\random_int(1, 10) / 10)) // Delay for 0.1 to 1 seconds, simulating I/O.
->map(fn (int $input): int => $input * 10)
->filter(fn (int $input) => $input % 3 === 0); // Filter only values divisible by 3.
foreach ($results as $value) {
echo $value, "\n";
}