Skip to content

Commit

Permalink
Fix subscription calling unsubscribe if only iterator is retained
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed May 9, 2022
1 parent e396442 commit ae98358
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 20 deletions.
14 changes: 7 additions & 7 deletions src/Subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,28 @@ final class Subscription implements \IteratorAggregate

public function __construct(
private readonly ConcurrentIterator $iterator,
\Closure $unsubscribe
\Closure $unsubscribe,
) {
$this->unsubscribe = $unsubscribe;
}

public function __destruct()
{
if ($this->unsubscribe) {
$unsubscribe = $this->unsubscribe;
EventLoop::queue($unsubscribe);
}
$this->unsubscribe();
}

/**
* Using a Generator to maintain a reference to $this.
*/
public function getIterator(): \Traversable
{
return $this->iterator;
yield from $this->iterator;
}

public function unsubscribe(): void
{
if ($this->unsubscribe) {
($this->unsubscribe)();
EventLoop::queue($this->unsubscribe);
$this->unsubscribe = null;
}

Expand Down
43 changes: 30 additions & 13 deletions test/PubSubTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@

class PubSubTest extends IntegrationTest
{
private Subscriber $subscriber;

protected function setUp(): void
{
parent::setUp();
$this->setTimeout(1);

$this->subscriber = new Subscriber(RedisConfig::fromUri($this->getUri()));
}

private function getNextValue(ConcurrentIterator $concurrentIterator): string
Expand All @@ -27,9 +31,7 @@ private function getNextValue(ConcurrentIterator $concurrentIterator): string

public function testBasic(): void
{
$subscriber = new Subscriber(RedisConfig::fromUri($this->getUri()));

$subscription = $subscriber->subscribe('foo');
$subscription = $this->subscriber->subscribe('foo');
$iterator = new ConcurrentIterableIterator($subscription);

// Use async() to not block, because we publish in the same coroutine
Expand All @@ -50,9 +52,7 @@ public function testBasic(): void

public function testDoubleCancel(): void
{
$subscriber = new Subscriber(RedisConfig::fromUri($this->getUri()));

$subscription = $subscriber->subscribe('foo');
$subscription = $this->subscriber->subscribe('foo');
$subscription->unsubscribe();
$subscription->unsubscribe();

Expand All @@ -63,11 +63,9 @@ public function testDoubleCancel(): void

public function testMulti(): void
{
$subscriber = new Subscriber(RedisConfig::fromUri($this->getUri()));

$subscription1 = $subscriber->subscribe('foo');
$subscription1 = $this->subscriber->subscribe('foo');
$iterator1 = new ConcurrentIterableIterator($subscription1);
$subscription2 = $subscriber->subscribe('foo');
$subscription2 = $this->subscriber->subscribe('foo');
$iterator2 = new ConcurrentIterableIterator($subscription2);

delay(0.1); // Enter event loop so subscriber has time to connect.
Expand All @@ -90,9 +88,7 @@ public function testMulti(): void

public function testStream(): void
{
$subscriber = new Subscriber(RedisConfig::fromUri($this->getUri()));

$subscription = $subscriber->subscribe('foo');
$subscription = $this->subscriber->subscribe('foo');
$iterator = new ConcurrentIterableIterator($subscription);

$producer = EventLoop::repeat(0.1, function (): void {
Expand All @@ -116,4 +112,25 @@ public function testStream(): void

delay(0.1); // Ensure cancel request has completed.
}

public function testIteratorReferenceOnlyDoesNotUnsubscribe(): void
{
$iterator = $this->subscriber->subscribe('foo')->getIterator();

$producer = EventLoop::repeat(0.1, function (): void {
$this->redis->publish('foo', 'bar');
});

try {
foreach ($iterator as $value) {
self::assertSame('bar', $value);
// We only need to consume a single item to confirm the subscription was not automatically cancelled.
return;
}

self::fail('Subscription cancelled by destructor');
} finally {
EventLoop::cancel($producer);
}
}
}

0 comments on commit ae98358

Please sign in to comment.