Skip to content

Commit

Permalink
Resolve back-pressure when queue is completed and iterator subsequent…
Browse files Browse the repository at this point in the history
…ly disposed
  • Loading branch information
trowski committed Jan 19, 2025
1 parent 66c0956 commit 97cbf28
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
19 changes: 10 additions & 9 deletions src/Internal/QueueState.php
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,18 @@ public function hasPending(): bool
*/
public function dispose(): void
{
try {
if ($this->completed || $this->disposed) {
return; // Pipeline already completed or failed.
}
if ($this->disposed) {
return;
}

$this->finalize(new DisposedException, true);
} finally {
if ($this->disposed) {
$this->triggerDisposal();
}
if ($this->completed) {
$this->disposed = true;
$this->exception = new DisposedException;
$this->triggerDisposal();
return;
}

$this->finalize(new DisposedException, true);
}

/**
Expand Down
26 changes: 26 additions & 0 deletions test/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,32 @@ public function testFailWithDisposedException(): void
$this->queue->complete();
}

public function testBackpressureRelievedAfterCompletionThenDisposal(): void
{
$emit1 = $this->queue->pushAsync(1);
$emit2 = $this->queue->pushAsync(2);

$pipeline = $this->queue->pipe()->getIterator();
$consume = async(function () use ($pipeline) {
$pipeline->continue();
return $pipeline->getValue();
});

$this->queue->complete();

self::assertSame(1, $consume->await());
$emit1->await();

$pipeline->dispose();
self::assertTrue($this->queue->isDisposed());

try {
$emit2->await();
$this->fail(\sprintf('Expected instance of %s to be thrown', DisposedException::class));
} catch (DisposedException) {
}
}

public function testTraversable(): void
{
EventLoop::queue(function (): void {
Expand Down

0 comments on commit 97cbf28

Please sign in to comment.