Skip to content

Commit

Permalink
Fix reconnect if initial connect fails
Browse files Browse the repository at this point in the history
Fixes #70.
  • Loading branch information
kelunik committed Oct 12, 2021
1 parent 1b4b07f commit b8910b1
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 5 deletions.
13 changes: 11 additions & 2 deletions src/RemoteExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Amp\Socket;
use function Amp\asyncCall;
use function Amp\call;
use function Amp\delay;

final class RemoteExecutor implements QueryExecutor
{
Expand Down Expand Up @@ -87,8 +88,16 @@ private function connect(): Promise
}

return $this->connect = call(function () {
/** @var RespSocket $resp */
$resp = yield connect($this->config->withDatabase($this->database), $this->connector);
try {
/** @var RespSocket $resp */
$resp = yield connect($this->config->withDatabase($this->database), $this->connector);
} catch (\Throwable $connectException) {
yield delay(0); // ensure $this->connect is already assigned above in case of immediate failure

$this->connect = null;

throw $connectException;
}

asyncCall(function () use ($resp) {
try {
Expand Down
13 changes: 11 additions & 2 deletions src/Subscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Amp\Promise;
use function Amp\asyncCall;
use function Amp\call;
use function Amp\delay;

final class Subscriber
{
Expand Down Expand Up @@ -89,8 +90,16 @@ private function connect(): Promise
}

return $this->connect = call(function () {
/** @var RespSocket $resp */
$resp = yield connect($this->config);
try {
/** @var RespSocket $resp */
$resp = yield connect($this->config);
} catch (\Throwable $connectException) {
yield delay(0); // ensure $this->connect is already assigned above in case of immediate failure

$this->connect = null;

throw $connectException;
}

asyncCall(function () use ($resp) {
try {
Expand Down
2 changes: 1 addition & 1 deletion test/IntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ protected function setUp(): void
}


final protected function createInstance(): Redis
protected function createInstance(): Redis
{
return new Redis(new RemoteExecutor(Config::fromUri($this->getUri())));
}
Expand Down
43 changes: 43 additions & 0 deletions test/RemoteExecutorTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

namespace Amp\Redis;

use Amp\Redis\Trainer\ConnectorTrainer;

class RemoteExecutorTest extends IntegrationTest
{
private $connectorTrainer;

public function testAutomaticReconnectIfConnectFails(): \Generator
{
// Create new instance, because flushAll() is executed first otherwise
$this->redis = $this->createInstance();

$this->connectorTrainer->givenConnectFails();

try {
yield $this->redis->get('foobar');

$this->fail('Expected exception');
} catch (\Throwable $e) {
$this->assertStringStartsWith('Failed to connect to redis instance', $e->getMessage());
}

$this->connectorTrainer->givenConnectIsNotIntercepted();

// Should not throw the same error again but retry
$this->assertNull(yield $this->redis->get('foobar'));
}

protected function setUp(): void
{
$this->connectorTrainer = new ConnectorTrainer;

parent::setUp();
}

protected function createInstance(): Redis
{
return new Redis(new RemoteExecutor(Config::fromUri($this->getUri()), $this->connectorTrainer));
}
}
50 changes: 50 additions & 0 deletions test/Trainer/ConnectorTrainer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

namespace Amp\Redis\Trainer;

use Amp\CancellationToken;
use Amp\Failure;
use Amp\Promise;
use Amp\Socket\ConnectContext;
use Amp\Socket\Connector;
use Amp\Socket\SocketException;
use function Amp\Socket\connector;

final class ConnectorTrainer implements Connector
{
/** @var Connector */
private $connector;

public function __construct()
{
$this->givenConnectIsNotIntercepted();
}

public function givenConnector(Connector $connector): void
{
$this->connector = $connector;
}

public function givenConnectFails(): void
{
$this->givenConnector(new class implements Connector {
public function connect(
string $uri,
?ConnectContext $context = null,
?CancellationToken $token = null
): Promise {
return new Failure(new SocketException('Connect to ' . $uri . ' failed'));
}
});
}

public function givenConnectIsNotIntercepted(): void
{
$this->givenConnector(connector());
}

public function connect(string $uri, ?ConnectContext $context = null, ?CancellationToken $token = null): Promise
{
return $this->connector->connect($uri, $context, $token);
}
}

0 comments on commit b8910b1

Please sign in to comment.