Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for commands with multiple responses #64

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions lib/redis_client/redis_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,16 @@ class _RedisConnection extends RedisConnection {
Future subscribe(List<String> channels, Function onMessage){

Completer subscribeCompleter = new Completer();
List<String> args = new List <String>()
..add("SUBSCRIBE")
..addAll(channels);

send(args).receive().then((val){
_subscriptionHandler = onMessage;
subscribeCompleter.complete();
List<List<int>> args = new List <List<int>>()
..add("SUBSCRIBE".codeUnits)
..addAll(channels.map((String channel) => UTF8.encode(channel)));

Future.wait(rawSendMultipleResponses(args, channels.length)
.map((Receiver r) => r.receiveMultiBulk())).then((_) {
_subscriptionHandler = onMessage;
subscribeCompleter.complete();
});

return subscribeCompleter.future;
}

Expand Down Expand Up @@ -322,17 +324,17 @@ class _RedisConnection extends RedisConnection {
*/
Receiver rawSend(List<List<int>> cmdWithArgs) {
var response = new Receiver();


if( logger.level <= Level.FINEST){
logger.finest("Sending message ${UTF8.decode(cmdWithArgs[0])}");
}

//we call _socket.add only once and we try to avoid string concat
List<int> buffer = new List<int>();
buffer.addAll("*".codeUnits);
buffer.addAll(cmdWithArgs.length.toString().codeUnits);
buffer.addAll(_lineEnd);
buffer.addAll(_lineEnd);
for( var line in cmdWithArgs) {
buffer.addAll("\$".codeUnits);
buffer.addAll(line.length.toString().codeUnits);
Expand All @@ -346,6 +348,28 @@ class _RedisConnection extends RedisConnection {
return response;
}

/**
* This is the same as [rawSend] except it expects more than 1 response.
*
*
* Eg.:
* Future.wait(
* rawSendMultipleResponses(["SUBSCRIBE".codeUnits, "CHAN1".codeUnits, "CHAN2".codeUnits], 2)
* .map((Receiver r) => r.receiveMultiBulkStrings());
*/
List<Receiver> rawSendMultipleResponses(List<List<int>> cmdWithArgs, int expectedResponseCount) {
List<Receiver> receivers = new List<Receiver>()
..add(rawSend(cmdWithArgs));

Receiver receiver;
for (int i=1; i<expectedResponseCount; i++) {
receiver = new Receiver();
receivers.add(receiver);
_pendingResponses.add(receiver);
}

return receivers;
}
}


Expand Down
55 changes: 53 additions & 2 deletions test/redis_pubsub_tests.dart
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,59 @@ main() {
);

});



test("Subscribe multiple channels", () {
List<List<String>> messages = new List<List<String>>();
async(
client.subscribe(["chan0", "chan1"], (Receiver message) {
message.receiveMultiBulkStrings().then((List<String> message) {
messages.add(message);
if (messages.length == 2) {
expect(messages[0][1], equals('chan1'));
expect(messages[0][2], equals('Hello'));
expect(messages[1][1], equals('chan0'));
expect(messages[1][2], equals('World'));
}
});
})
.then((_) {
client1.publish("chan1", "Hello");
client1.publish("chan0", "World");
})
);
});

test("partial unsubscribe", () {

List<List<String>> responses = new List<List<String>>();
async(
client.subscribe(["chan0", "chan1", "chan2"], (Receiver message) {
message.receiveMultiBulkStrings().then((List<String> response) {
responses.add(response);
if (responses.length == 1) {
client.unsubscribe(["chan0", "chan1"]).then((_) {
client1.publish("chan0", "Hello chan0");
client1.publish("chan1", "Hello chan1");
client1.publish("chan2", "Hello chan2");
});
}
if (responses.length == 4) {
expect(responses[0], equals(["chan0", "Hello chan0"]));
expect(responses[1], equals(["chan1", "Hello chan1"]));
expect(responses[2], equals(["chan2", "Hello chan2"]));
expect(responses[3], equals(["chan2", "Hello chan2"]));
}
});

}).then((_) {
client1.publish("chan0","Hello chan0");
client1.publish("chan1","Hello chan1");
client1.publish("chan2","Hello chan2");
})
);

});

});

}