diff --git a/src/Internal/QueueState.php b/src/Internal/QueueState.php index a7aed36..61c2d03 100644 --- a/src/Internal/QueueState.php +++ b/src/Internal/QueueState.php @@ -177,17 +177,18 @@ public function hasPending(): bool */ public function dispose(): void { - try { - if ($this->completed || $this->disposed) { - return; // Pipeline already completed or failed. - } + if ($this->disposed) { + return; + } - $this->finalize(new DisposedException, true); - } finally { - if ($this->disposed) { - $this->triggerDisposal(); - } + if ($this->completed) { + $this->disposed = true; + $this->exception = new DisposedException; + $this->triggerDisposal(); + return; } + + $this->finalize(new DisposedException, true); } /** diff --git a/test/QueueTest.php b/test/QueueTest.php index f158e6f..8ebe47d 100644 --- a/test/QueueTest.php +++ b/test/QueueTest.php @@ -328,6 +328,32 @@ public function testFailWithDisposedException(): void $this->queue->complete(); } + public function testBackpressureRelievedAfterCompletionThenDisposal(): void + { + $emit1 = $this->queue->pushAsync(1); + $emit2 = $this->queue->pushAsync(2); + + $pipeline = $this->queue->pipe()->getIterator(); + $consume = async(function () use ($pipeline) { + $pipeline->continue(); + return $pipeline->getValue(); + }); + + $this->queue->complete(); + + self::assertSame(1, $consume->await()); + $emit1->await(); + + $pipeline->dispose(); + self::assertTrue($this->queue->isDisposed()); + + try { + $emit2->await(); + $this->fail(\sprintf('Expected instance of %s to be thrown', DisposedException::class)); + } catch (DisposedException) { + } + } + public function testTraversable(): void { EventLoop::queue(function (): void {