diff --git a/lib/redis_client/redis_connection.dart b/lib/redis_client/redis_connection.dart index 6bbdf72..1f9b8f4 100644 --- a/lib/redis_client/redis_connection.dart +++ b/lib/redis_client/redis_connection.dart @@ -217,14 +217,16 @@ class _RedisConnection extends RedisConnection { Future subscribe(List channels, Function onMessage){ Completer subscribeCompleter = new Completer(); - List args = new List () - ..add("SUBSCRIBE") - ..addAll(channels); - - send(args).receive().then((val){ - _subscriptionHandler = onMessage; - subscribeCompleter.complete(); + List> args = new List >() + ..add("SUBSCRIBE".codeUnits) + ..addAll(channels.map((String channel) => UTF8.encode(channel))); + + Future.wait(rawSendMultipleResponses(args, channels.length) + .map((Receiver r) => r.receiveMultiBulk())).then((_) { + _subscriptionHandler = onMessage; + subscribeCompleter.complete(); }); + return subscribeCompleter.future; } @@ -322,17 +324,17 @@ class _RedisConnection extends RedisConnection { */ Receiver rawSend(List> cmdWithArgs) { var response = new Receiver(); - - + + if( logger.level <= Level.FINEST){ logger.finest("Sending message ${UTF8.decode(cmdWithArgs[0])}"); } - + //we call _socket.add only once and we try to avoid string concat List buffer = new List(); buffer.addAll("*".codeUnits); buffer.addAll(cmdWithArgs.length.toString().codeUnits); - buffer.addAll(_lineEnd); + buffer.addAll(_lineEnd); for( var line in cmdWithArgs) { buffer.addAll("\$".codeUnits); buffer.addAll(line.length.toString().codeUnits); @@ -346,6 +348,28 @@ class _RedisConnection extends RedisConnection { return response; } + /** + * This is the same as [rawSend] except it expects more than 1 response. + * + * + * Eg.: + * Future.wait( + * rawSendMultipleResponses(["SUBSCRIBE".codeUnits, "CHAN1".codeUnits, "CHAN2".codeUnits], 2) + * .map((Receiver r) => r.receiveMultiBulkStrings()); + */ + List rawSendMultipleResponses(List> cmdWithArgs, int expectedResponseCount) { + List receivers = new List() + ..add(rawSend(cmdWithArgs)); + + Receiver receiver; + for (int i=1; i> messages = new List>(); + async( + client.subscribe(["chan0", "chan1"], (Receiver message) { + message.receiveMultiBulkStrings().then((List message) { + messages.add(message); + if (messages.length == 2) { + expect(messages[0][1], equals('chan1')); + expect(messages[0][2], equals('Hello')); + expect(messages[1][1], equals('chan0')); + expect(messages[1][2], equals('World')); + } + }); + }) + .then((_) { + client1.publish("chan1", "Hello"); + client1.publish("chan0", "World"); + }) + ); + }); + + test("partial unsubscribe", () { + + List> responses = new List>(); + async( + client.subscribe(["chan0", "chan1", "chan2"], (Receiver message) { + message.receiveMultiBulkStrings().then((List response) { + responses.add(response); + if (responses.length == 1) { + client.unsubscribe(["chan0", "chan1"]).then((_) { + client1.publish("chan0", "Hello chan0"); + client1.publish("chan1", "Hello chan1"); + client1.publish("chan2", "Hello chan2"); + }); + } + if (responses.length == 4) { + expect(responses[0], equals(["chan0", "Hello chan0"])); + expect(responses[1], equals(["chan1", "Hello chan1"])); + expect(responses[2], equals(["chan2", "Hello chan2"])); + expect(responses[3], equals(["chan2", "Hello chan2"])); + } + }); + + }).then((_) { + client1.publish("chan0","Hello chan0"); + client1.publish("chan1","Hello chan1"); + client1.publish("chan2","Hello chan2"); + }) + ); + + }); + }); }