Skip to content

Commit

Permalink
Fix reusing closed connection
Browse files Browse the repository at this point in the history
Rolled close logic into Http2ConnectionProcessor::shutdown() so the method is only called once when shutdown.
  • Loading branch information
trowski committed May 8, 2022
1 parent 5232d3d commit eb41f59
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 65 deletions.
48 changes: 23 additions & 25 deletions src/Connection/ConnectionLimitingPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private static function formatUri(Request $request): string

private ConnectionFactory $connectionFactory;

/** @var array<string, array<int, Future<Connection>>> */
/** @var array<string, \ArrayObject<int, Future<Connection>>> */
private array $connections = [];

/** @var Connection[] */
Expand Down Expand Up @@ -106,10 +106,6 @@ public function getStream(Request $request, Cancellation $cancellation): Stream

$uri = self::formatUri($request);

/**
* @var Stream $stream
* @psalm-suppress all
*/
[$connection, $stream] = $this->getStreamFor($uri, $request, $cancellation);

$connectionId = \spl_object_id($connection);
Expand Down Expand Up @@ -146,6 +142,9 @@ function () use ($connection, $uri): void {
);
}

/**
* @return array{Connection, Stream}
*/
private function getStreamFor(string $uri, Request $request, Cancellation $cancellation): array
{
$isHttps = $request->getUri()->getScheme() === 'https';
Expand Down Expand Up @@ -186,15 +185,15 @@ private function getStreamFor(string $uri, Request $request, Cancellation $cance
}

$deferred = new DeferredFuture;
$deferredFuture = $deferred->getFuture();
$futureFromDeferred = $deferred->getFuture();

$this->waiting[$uri][\spl_object_id($deferred)] = $deferred;

if ($this->isAdditionalConnectionAllowed($uri)) {
break;
}

$connection = $deferredFuture->await();
$connection = $futureFromDeferred->await();

\assert($connection instanceof Connection);

Expand All @@ -209,27 +208,23 @@ private function getStreamFor(string $uri, Request $request, Cancellation $cance

$this->totalConnectionAttempts++;

$connectionFuture = async(fn () => $this->connectionFactory->create($request, $cancellation));
$connectionFuture = async($this->connectionFactory->create(...), $request, $cancellation);

$promiseId = \spl_object_id($connectionFuture);
$this->connections[$uri] ??= [];
$this->connections[$uri][$promiseId] = $connectionFuture;
$futureId = \spl_object_id($connectionFuture);
$this->connections[$uri] ??= new \ArrayObject();
$this->connections[$uri][$futureId] = $connectionFuture;

EventLoop::queue(function () use (
&$deferred,
$connectionFuture,
$uri,
$promiseId,
$futureId,
$isHttps
): void {
try {
/** @var Connection $connection */
$connection = $connectionFuture->await();
} catch (\Throwable $exception) {
$this->dropConnection($uri, null, $promiseId);
if ($deferred !== null) {
$deferred->error($exception); // Fail DeferredFuture so Promise\first() below fails.
}
$this->dropConnection($uri, null, $futureId);
return;
}

Expand All @@ -242,35 +237,34 @@ private function getStreamFor(string $uri, Request $request, Cancellation $cance
$this->waitForPriorConnection[$uri] = \in_array('2', $connection->getProtocolVersions(), true);
}

$connection->onClose(function () use ($uri, $connectionId, $promiseId): void {
$connection->onClose(function () use ($uri, $connectionId, $futureId): void {
$this->openConnectionCount--;
$this->dropConnection($uri, $connectionId, $promiseId);
$this->dropConnection($uri, $connectionId, $futureId);
});
});

try {
$connection = Future\awaitFirst([$connectionFuture, $deferredFuture]);
// Await both new connection future and deferred to reuse an existing connection.
$connection = Future\awaitFirst([$connectionFuture, $futureFromDeferred]);
} catch (CompositeException $exception) {
[$exception] = $exception->getReasons(); // The first reason is why the connection failed.
throw $exception;
}

$this->removeWaiting($uri, \spl_object_id($deferred)); // DeferredFuture no longer needed for this request.
$deferred = null; // Null reference so connection promise handler does not double-resolve the DeferredFuture.

\assert($connection instanceof Connection);

$stream = $this->getStreamFromConnection($connection, $request);

if ($stream === null) {
// Reused connection did not have an available stream for the given request.
// Potentially reused connection did not have an available stream for the given request.
$connection = $connectionFuture->await(); // Wait for new connection request instead.

$stream = $this->getStreamFromConnection($connection, $request);

if ($stream === null) {
// Other requests used the new connection first, so we need to go around again.
// Using new Coroutine avoids a bug on PHP < 7.4, see #265
return $this->getStreamFor($uri, $request, $cancellation);
}
}
Expand All @@ -280,6 +274,10 @@ private function getStreamFor(string $uri, Request $request, Cancellation $cance

private function getStreamFromConnection(Connection $connection, Request $request): ?Stream
{
if ($connection->isClosed()) {
return null; // Connection closed during iteration over available connections.
}

if (!\array_intersect($request->getProtocolVersions(), $connection->getProtocolVersions())) {
return null; // Connection does not support any of the requested protocol versions.
}
Expand Down Expand Up @@ -340,9 +338,9 @@ private function removeWaiting(string $uri, int $deferredId): void
}
}

private function dropConnection(string $uri, ?int $connectionId, int $promiseId): void
private function dropConnection(string $uri, ?int $connectionId, int $futureId): void
{
unset($this->connections[$uri][$promiseId]);
unset($this->connections[$uri][$futureId]);
if ($connectionId !== null) {
unset($this->activeRequestCounts[$connectionId], $this->idleConnections[$connectionId]);
}
Expand Down
10 changes: 3 additions & 7 deletions src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ final class Http1Connection implements Connection
/** @var int Keep-Alive timeout from last response. */
private int $priorTimeout = self::MAX_KEEP_ALIVE_TIMEOUT;

/** @var callable[]|null */
/** @var list<\Closure():void>|null */
private ?array $onClose = [];

private float $timeoutGracePeriod;
Expand Down Expand Up @@ -162,7 +162,7 @@ private function free(): void
$this->onClose = null;

foreach ($onClose as $callback) {
EventLoop::defer(fn () => $callback($this));
EventLoop::queue($callback);
}
}
}
Expand Down Expand Up @@ -394,11 +394,7 @@ private function readResponse(
if ($this->socket === null) {
throw new SocketException('Socket closed prior to response completion');
}
} while (null !== $chunk = $timeout > 0
? async(fn () => $this->socket->read())
->await(new TimeoutCancellation($timeout))
: $this->socket->read()
);
} while (null !== $chunk = $this->socket->read($timeout > 0 ? new TimeoutCancellation($timeout) : null));
} catch (CancelledException $e) {
$this->close();
$originalCancellation->throwIfRequested();
Expand Down
63 changes: 30 additions & 33 deletions src/Connection/Internal/Http2ConnectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ final class Http2ConnectionProcessor implements Http2Processor

private int $idlePings = 0;

/** @var callable[]|null */
/** @var list<\Closure():void>|null */
private ?array $onClose = [];

private bool $hasTimeout = false;
Expand Down Expand Up @@ -144,10 +144,13 @@ public function initialize(): void
$future->await();
}

public function onClose(callable $onClose): void
/**
* @param \Closure():void $onClose
*/
public function onClose(\Closure $onClose): void
{
if ($this->onClose === null) {
EventLoop::defer(fn () => $onClose($this));
EventLoop::queue($onClose);
return;
}

Expand All @@ -156,21 +159,17 @@ public function onClose(callable $onClose): void

public function close(): void
{
$exception = new SocketException('Socket from \'' . $this->socket->getLocalAddress() . '\' to \'' .
$this->socket->getRemoteAddress() . '\' closed');

$this->shutdown($exception);

$this->socket->close();
if ($this->shutdown !== null) {
return;
}

if ($this->onClose !== null) {
$onClose = $this->onClose;
$this->onClose = null;
$exception = new SocketException(\sprintf(
"Socket from '%s' to '%s' closed",
$this->socket->getLocalAddress()->toString(),
$this->socket->getRemoteAddress()->toString(),
));

foreach ($onClose as $callback) {
EventLoop::defer(fn () => $callback($this));
}
}
$this->shutdown($exception);
}

public function handlePong(string $data): void
Expand Down Expand Up @@ -508,7 +507,7 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool

if (!$this->streams[$streamId]->originalCancellation->isRequested()) {
$this->hasTimeout = true;
async(fn () => $this->ping())->ignore(); // async ping, if other requests occur, they wait for it
async($this->ping(...))->ignore(); // async ping, if other requests occur, they wait for it

$transferTimeout = $this->streams[$streamId]->request->getTransferTimeout();

Expand Down Expand Up @@ -773,8 +772,6 @@ public function handleConnectionException(Http2ConnectionException $exception):
$this->shutdown(
new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception)
);

$this->close();
}

public function handleData(int $streamId, string $data): void
Expand Down Expand Up @@ -1209,11 +1206,9 @@ private function runReadFiber(): void
*/
$this->shutdown(new ClientHttp2ConnectionException(
"The HTTP/2 connection closed" . ($this->shutdown !== null ? ' unexpectedly' : ''),
$this->shutdown ?? Http2Parser::GRACEFUL_SHUTDOWN
$this->shutdown ?? Http2Parser::GRACEFUL_SHUTDOWN,
), 0);

$this->close();

return;
}

Expand All @@ -1235,10 +1230,8 @@ private function runReadFiber(): void
$this->shutdown(new ClientHttp2ConnectionException(
"The HTTP/2 connection from '" . $this->socket->getLocalAddress() . "' to '" . $this->socket->getRemoteAddress() .
"' closed" . ($this->shutdown === null ? ' unexpectedly' : ''),
$this->shutdown ?? Http2Parser::INTERNAL_ERROR
$this->shutdown ?? Http2Parser::INTERNAL_ERROR,
));

$this->close();
} catch (\Throwable $exception) {
/**
* @psalm-suppress DeprecatedClass
Expand All @@ -1250,8 +1243,6 @@ private function runReadFiber(): void
Http2Parser::INTERNAL_ERROR,
$exception
));

$this->close();
}
}

Expand Down Expand Up @@ -1515,10 +1506,6 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo
if (!$this->streams && !$this->socket->isClosed()) {
$this->socket->unreference();
}

if (!$this->streams && $this->shutdown !== null) {
$this->close();
}
}

private function setupPingIfIdle(): void
Expand All @@ -1545,15 +1532,14 @@ private function setupPingIfIdle(): void
// Connection idle for 10 minutes
if ($this->idlePings >= 1) {
$this->shutdown(new HttpException('Too many pending pings'));
$this->close();
return;
}

if ($this->ping()) {
$this->setupPingIfIdle();
}
} catch (\Throwable $exception) {
$this->close();
$this->shutdown(new HttpException('Exception when handling pings', 0, $exception));
}
});

Expand Down Expand Up @@ -1635,6 +1621,17 @@ private function shutdown(HttpException $reason, ?int $lastId = null): void
$this->releaseStream($id, $reason);
}
}

$this->socket->close();

if ($this->onClose !== null) {
$onClose = $this->onClose;
$this->onClose = null;

foreach ($onClose as $callback) {
EventLoop::queue($callback);
}
}
}

/**
Expand Down

0 comments on commit eb41f59

Please sign in to comment.