Skip to content

Commit

Permalink
Fix code style, expose connection state
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Aug 27, 2015
1 parent d5ec78c commit d565a98
Show file tree
Hide file tree
Showing 17 changed files with 432 additions and 395 deletions.
10 changes: 5 additions & 5 deletions bench/callback.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
class BenchCase {
private $callable;

public function __construct (callable $callable) {
public function __construct(callable $callable) {
$this->callable = $callable;
}

public function bench () {
public function bench() {
$time = PHP_INT_MAX;

for ($x = 0; $x < 10; $x++) {
Expand All @@ -26,21 +26,21 @@ public function bench () {
return $time;
}

protected function onData ($data) {
protected function onData($data) {
// empty on purpose!
}
}

class AnonymousFunctionCase extends BenchCase {
public function __construct () {
public function __construct() {
parent::__construct(function ($data) {
$this->onData($data);
});
}
}

class ReflectionCase extends BenchCase {
public function __construct () {
public function __construct() {
$reflection = new ReflectionClass(self::class);
$closure = $reflection->getMethod("onData")->getClosure($this);

Expand Down
8 changes: 4 additions & 4 deletions bench/parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
class BenchCase {
private $parser;

public function __construct (RespParser $parser) {
public function __construct(RespParser $parser) {
$this->parser = $parser;
}

public function bench () {
public function bench() {
$time = PHP_INT_MAX;

for ($x = 0; $x < 10; $x++) {
Expand All @@ -32,14 +32,14 @@ public function bench () {
return $time;
}

protected function onData ($data) {
protected function onData($data) {
// empty on purpose!
}
}

class AnonymousFunctionCase extends BenchCase {
public function __construct() {
parent::__construct(new RespParser(function($data) {
parent::__construct(new RespParser(function ($data) {
$this->onData($data);
}));
}
Expand Down
21 changes: 13 additions & 8 deletions lib/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@

use Amp\Deferred;
use Amp\Promise;
use function Amp\promises;
use Amp\Promisor;
use Amp\Reactor;
use DomainException;
use Exception;
use function Amp\all;
use function Amp\pipe;
use function Amp\promises;

class Client extends Redis {
/** @var Promisor[] */
Expand All @@ -24,9 +23,9 @@ class Client extends Redis {

/**
* @param string $uri
* @param array $options
* @param array $options
*/
public function __construct ($uri, array $options = []) {
public function __construct($uri, array $options = []) {
$this->applyOptions($options);
$this->promisors = [];

Expand Down Expand Up @@ -55,6 +54,7 @@ public function __construct ($uri, array $options = []) {
$this->connection->addEventHandler("connect", function () {
// SELECT must be called for every new connection if another database than 0 is used
array_unshift($this->promisors, new Deferred);

return "*2\r\n$6\r\rSELECT\r\n$" . strlen($this->database) . "\r\n{$this->database}\r\n";
});
}
Expand All @@ -63,12 +63,13 @@ public function __construct ($uri, array $options = []) {
$this->connection->addEventHandler("connect", function () {
// AUTH must be before any other command, so we unshift it here
array_unshift($this->promisors, new Deferred);

return "*2\r\n$4\r\rAUTH\r\n$" . strlen($this->password) . "\r\n{$this->password}\r\n";
});
}
}

private function applyOptions (array $options) {
private function applyOptions(array $options) {
$this->password = isset($options["password"]) ? $options["password"] : null;

if (!is_string($this->password) && !is_null($this->password)) {
Expand All @@ -91,14 +92,14 @@ private function applyOptions (array $options) {
/**
* @return Transaction
*/
public function transaction () {
public function transaction() {
return new Transaction($this);
}

/**
* @return Promise
*/
public function close () {
public function close() {
/** @var Promise $promise */
$promise = all(promises($this->promisors));
$promise->when(function () {
Expand All @@ -113,7 +114,7 @@ public function close () {
* @param callable $transform
* @return Promise
*/
protected function send (array $args, callable $transform = null) {
protected function send(array $args, callable $transform = null) {
$promisor = new Deferred;
$this->promisors[] = $promisor;
$this->connection->send($args);
Expand All @@ -122,4 +123,8 @@ protected function send (array $args, callable $transform = null) {
? pipe($promisor->promise(), $transform)
: $promisor->promise();
}

public function getConnectionState() {
return $this->connection->getState();
}
}
114 changes: 67 additions & 47 deletions lib/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@
namespace Amp\Redis;

use Amp\Deferred;
use function Amp\enable;
use Amp\Promise;
use Amp\Promisor;
use Amp\Success;
use DomainException;
use Exception;
use function Amp\cancel;
use function Amp\disable;
use function Amp\enable;
use function Amp\onReadable;
use function Amp\onWritable;
use function Amp\pipe;
use function Amp\Socket\connect;

class Connection {
const STATE_DISCONNECTED = 0;
const STATE_CONNECTING = 1;
const STATE_CONNECTED = 2;

/** @var Promisor */
private $connectPromisor;
/** @var RespParser */
Expand All @@ -39,10 +43,13 @@ class Connection {
/** @var array */
private $handlers;

/** @var int */
private $state;

/**
* @param string $uri
*/
public function __construct ($uri) {
public function __construct($uri) {
if (!is_string($uri)) {
throw new DomainException(sprintf(
"URI must be string, %s given",
Expand All @@ -57,12 +64,13 @@ public function __construct ($uri) {
$this->uri = $uri;
$this->outputBufferLength = 0;
$this->outputBuffer = "";
$this->state = self::STATE_DISCONNECTED;

$this->handlers = [
"connect" => [],
"response" => [],
"error" => [],
"close" => []
"close" => [],
];

$this->parser = new RespParser(function ($response) {
Expand All @@ -72,7 +80,42 @@ public function __construct ($uri) {
});
}

private function connect () {
public function addEventHandler($event, callable $callback) {
$events = (array) $event;

foreach ($events as $event) {
if (!isset($this->handlers[$event])) {
throw new DomainException("Unknown event: " . $event);
}

$this->handlers[$event][] = $callback;
}
}

/**
* @param array $strings
* @return Promise
*/
public function send(array $strings) {
return pipe($this->connect(), function () use ($strings) {
$payload = "";

foreach ($strings as $string) {
$payload .= "$" . strlen($string) . "\r\n{$string}\r\n";
}

$payload = "*" . count($strings) . "\r\n{$payload}";

$this->outputBuffer .= $payload;
$this->outputBufferLength += strlen($payload);

if ($this->writeWatcher !== null) {
enable($this->writeWatcher);
}
});
}

private function connect() {
// If we're in the process of connecting already return that same promise
if ($this->connectPromisor) {
return $this->connectPromisor->promise();
Expand All @@ -83,18 +126,21 @@ private function connect () {
return new Success($this);
}

$this->state = self::STATE_CONNECTING;
$this->connectPromisor = new Deferred;
$socketPromise = connect($this->uri, ["timeout" => 1000]);

$onWrite = function ($watcherId) {
if ($this->outputBufferLength === 0) {
disable($watcherId);

return;
}

$bytes = fwrite($this->socket, $this->outputBuffer);

if ($bytes === 0) {
$this->state = self::STATE_DISCONNECTED;
$this->onError(new ConnectException("Connection went away (write)", $code = 1));
} else {
$this->outputBuffer = (string) substr($this->outputBuffer, $bytes);
Expand All @@ -107,13 +153,15 @@ private function connect () {
$this->connectPromisor = null;

if ($error) {
$this->state = self::STATE_DISCONNECTED;
$connectPromisor->fail(new ConnectException(
"Connection attempt failed", $code = 0, $error
));

return;
}

$this->state = self::STATE_CONNECTED;
$this->socket = $socket;

foreach ($this->handlers["connect"] as $handler) {
Expand All @@ -131,6 +179,7 @@ private function connect () {
if ($read != "") {
$this->parser->append($read);
} elseif (!is_resource($this->socket) || @feof($this->socket)) {
$this->state = self::STATE_DISCONNECTED;
$this->onError(new ConnectException("Connection went away (read)", $code = 2));
}
});
Expand All @@ -142,7 +191,15 @@ private function connect () {
return $this->connectPromisor->promise();
}

private function closeSocket () {
private function onError(Exception $exception) {
foreach ($this->handlers["error"] as $handler) {
$handler($exception);
}

$this->closeSocket();
}

private function closeSocket() {
cancel($this->readWatcher);
cancel($this->writeWatcher);

Expand All @@ -160,56 +217,19 @@ private function closeSocket () {
foreach ($this->handlers["close"] as $handler) {
$handler();
}
}

public function addEventHandler ($event, callable $callback) {
$events = (array) $event;

foreach ($events as $event) {
if (!isset($this->handlers[$event])) {
throw new DomainException("Unknown event: " . $event);
}

$this->handlers[$event][] = $callback;
}
$this->state = self::STATE_DISCONNECTED;
}

/**
* @param array $strings
* @return Promise
*/
public function send (array $strings) {
return pipe($this->connect(), function () use ($strings) {
$payload = "";

foreach ($strings as $string) {
$payload .= "$" . strlen($string) . "\r\n{$string}\r\n";
}

$payload = "*" . count($strings) . "\r\n{$payload}";

$this->outputBuffer .= $payload;
$this->outputBufferLength += strlen($payload);

if ($this->writeWatcher !== null) {
enable($this->writeWatcher);
}
});
}

public function close () {
public function close() {
$this->closeSocket();
}

private function onError (Exception $exception) {
foreach ($this->handlers["error"] as $handler) {
$handler($exception);
}

$this->closeSocket();
public function getState() {
return $this->state;
}

public function __destruct () {
public function __destruct() {
$this->closeSocket();
}
}
Loading

0 comments on commit d565a98

Please sign in to comment.