Skip to content

Commit

Permalink
Add ability to reference TimeoutCancellation and SignalCancellation e…
Browse files Browse the repository at this point in the history
…vent-loop callbacks

Closes #421.
  • Loading branch information
trowski committed Jan 25, 2025
1 parent a0d5549 commit 722f46b
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 20 deletions.
8 changes: 4 additions & 4 deletions src/Interval.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ final class Interval
* @param float $interval Invoke the function every $interval seconds.
* @param \Closure():void $closure Use {@see weakClosure()} to avoid a circular reference if storing this object
* as a property of another object.
* @param bool $reference If false, unreference the underlying watcher.
* @param bool $reference If false, unreference the underlying event-loop callback.
*/
public function __construct(float $interval, \Closure $closure, bool $reference = true)
{
Expand All @@ -33,15 +33,15 @@ public function __destruct()
}

/**
* @return bool True if the internal watcher is referenced.
* @return bool True if the internal event-loop callback is referenced.
*/
public function isReferenced(): bool
{
return EventLoop::isReferenced($this->callbackId);
}

/**
* References the internal watcher in the event loop, keeping the loop running while the repeat loop is enabled.
* References the internal event-loop callback, keeping the loop running while the repeat loop is enabled.
*
* @return $this
*/
Expand All @@ -53,7 +53,7 @@ public function reference(): self
}

/**
* Unreferences the internal watcher in the event loop, allowing the loop to stop while the repeat loop is enabled.
* Unreferences the internal event-loop callback, allowing the loop to stop while the repeat loop is enabled.
*
* @return $this
*/
Expand Down
72 changes: 62 additions & 10 deletions src/SignalCancellation.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@ final class SignalCancellation implements Cancellation
use ForbidSerialization;

/** @var list<string> */
private readonly array $watchers;
private readonly array $callbackIds;

private readonly Cancellation $cancellation;

/**
* @param int|int[] $signals Signal number or array of signal numbers.
* @param string $message Message for SignalException. Default is "Operation cancelled by signal".
* @param bool $reference If false, unreference the underlying event-loop callback.
*/
public function __construct(int|array $signals, string $message = "Operation cancelled by signal")
{
public function __construct(
int|array $signals,
string $message = "Operation cancelled by signal",
private bool $reference = false,
) {
if (\is_int($signals)) {
$signals = [$signals];
}
Expand All @@ -32,11 +36,11 @@ public function __construct(int|array $signals, string $message = "Operation can
$trace = null; // Defined in case assertions are disabled.
\assert((bool) ($trace = \debug_backtrace(0)));

$watchers = [];
$callbackIds = [];

$callback = static function () use (&$watchers, $source, $message, $trace): void {
foreach ($watchers as $watcher) {
EventLoop::cancel($watcher);
$callback = static function () use (&$callbackIds, $source, $message, $trace): void {
foreach ($callbackIds as $callbackId) {
EventLoop::cancel($callbackId);
}

if ($trace) {
Expand All @@ -49,18 +53,22 @@ public function __construct(int|array $signals, string $message = "Operation can
};

foreach ($signals as $signal) {
$watchers[] = EventLoop::unreference(EventLoop::onSignal($signal, $callback));
$callbackIds[] = $callbackId = EventLoop::onSignal($signal, $callback);

if (!$reference) {
EventLoop::unreference($callbackId);
}
}

$this->watchers = $watchers;
$this->callbackIds = $callbackIds;
}

/**
* Cancels the delay watcher.
*/
public function __destruct()
{
foreach ($this->watchers as $watcher) {
foreach ($this->callbackIds as $watcher) {
EventLoop::cancel($watcher);
}
}
Expand All @@ -84,4 +92,48 @@ public function throwIfRequested(): void
{
$this->cancellation->throwIfRequested();
}

/**
* @return bool True if the internal event-loop callback is referenced, false if not or if the cancellation has
* occurred.
*/
public function isReferenced(): bool
{
return $this->reference && !$this->cancellation->isRequested();
}

/**
* References the internal event-loop callback, keeping the loop running while the timeout is applicable.
* If the timeout has expired (cancellation has been requested), this method is a no-op.
*
* @return $this
*/
public function reference(): self
{
if (!$this->cancellation->isRequested()) {
foreach ($this->callbackIds as $callbackId) {
EventLoop::reference($callbackId);
}
}

$this->reference = true;

return $this;
}

/**
* Unreferences the internal event-loop callback, allowing the loop to stop while the repeat loop is enabled.
*
* @return $this
*/
public function unreference(): self
{
foreach ($this->callbackIds as $callbackId) {
EventLoop::unreference($callbackId);
}

$this->reference = false;

return $this;
}
}
58 changes: 52 additions & 6 deletions src/TimeoutCancellation.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,26 @@ final class TimeoutCancellation implements Cancellation
use ForbidCloning;
use ForbidSerialization;

private readonly string $watcher;
private readonly string $callbackId;

private readonly Cancellation $cancellation;

/**
* @param float $timeout Seconds until cancellation is requested.
* @param string $message Message for TimeoutException. Default is "Operation timed out".
* @param bool $reference If false, unreference the underlying event-loop callback.
*/
public function __construct(float $timeout, string $message = "Operation timed out")
{
public function __construct(
float $timeout,
string $message = "Operation timed out",
bool $reference = false,
) {
$this->cancellation = $source = new Internal\Cancellable;

$trace = null; // Defined in case assertions are disabled.
\assert((bool) ($trace = \debug_backtrace(0)));

$this->watcher = EventLoop::delay($timeout, static function () use ($source, $message, $trace): void {
$this->callbackId = EventLoop::delay($timeout, static function () use ($source, $message, $trace): void {
if ($trace) {
$message .= \sprintf("\r\n%s was created here: %s", self::class, Internal\formatStacktrace($trace));
} else {
Expand All @@ -37,15 +41,17 @@ public function __construct(float $timeout, string $message = "Operation timed o
$source->cancel(new TimeoutException($message));
});

EventLoop::unreference($this->watcher);
if (!$reference) {
EventLoop::unreference($this->callbackId);
}
}

/**
* Cancels the delay watcher.
*/
public function __destruct()
{
EventLoop::cancel($this->watcher);
EventLoop::cancel($this->callbackId);
}

public function subscribe(\Closure $callback): string
Expand All @@ -67,4 +73,44 @@ public function throwIfRequested(): void
{
$this->cancellation->throwIfRequested();
}

/**
* @return bool True if the internal event-loop callback is referenced, false if not or if the cancellation has
* occurred.
*/
public function isReferenced(): bool
{
if ($this->cancellation->isRequested()) {
return false;
}

return EventLoop::isReferenced($this->callbackId);
}

/**
* References the internal event-loop callback, keeping the loop running while the timeout is applicable.
* If the timeout has expired (cancellation has been requested), this method is a no-op.
*
* @return $this
*/
public function reference(): self
{
if (!$this->cancellation->isRequested()) {
EventLoop::reference($this->callbackId);
}

return $this;
}

/**
* Unreferences the internal event-loop callback, allowing the loop to stop while the repeat loop is enabled.
*
* @return $this
*/
public function unreference(): self
{
EventLoop::unreference($this->callbackId);

return $this;
}
}
17 changes: 17 additions & 0 deletions test/Cancellation/SignalCancellationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Amp\Cancellation;

use Amp\CancelledException;
use Amp\DeferredFuture;
use Amp\SignalCancellation;
use Amp\SignalException;
use Amp\TestCase;
Expand Down Expand Up @@ -52,4 +53,20 @@ public function testWatcherCancellation(): void
unset($cancellation);
self::assertSame($identifiers, EventLoop::getIdentifiers());
}

public function testWatcherUnreference(): void
{
$this->expectException(CancelledException::class);

$cancellation = new SignalCancellation(\SIGUSR1, reference: true);

self::assertTrue($cancellation->isReferenced());

EventLoop::defer(function (): void {
\posix_kill(\getmypid(), \SIGUSR1);
});

$deferred = new DeferredFuture();
$deferred->getFuture()->await($cancellation);
}
}
13 changes: 13 additions & 0 deletions test/Cancellation/TimeoutCancellationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Amp\Cancellation;

use Amp\CancelledException;
use Amp\DeferredFuture;
use Amp\TestCase;
use Amp\TimeoutCancellation;
use Amp\TimeoutException;
Expand Down Expand Up @@ -42,4 +43,16 @@ public function testWatcherCancellation(): void
unset($cancellation);
self::assertSame($identifiers, EventLoop::getIdentifiers());
}

public function testWatcherUnreference(): void
{
$this->expectException(CancelledException::class);

$cancellation = new TimeoutCancellation(0.001, reference: true);

self::assertTrue($cancellation->isReferenced());

$deferred = new DeferredFuture();
$deferred->getFuture()->await($cancellation);
}
}

0 comments on commit 722f46b

Please sign in to comment.