diff --git a/example/console/.gitignore b/example/console/.gitignore new file mode 100644 index 0000000..25c8fdb --- /dev/null +++ b/example/console/.gitignore @@ -0,0 +1,2 @@ +node_modules +package-lock.json \ No newline at end of file diff --git a/example/console/Dockerfile b/example/console/Dockerfile new file mode 100644 index 0000000..80998ca --- /dev/null +++ b/example/console/Dockerfile @@ -0,0 +1,5 @@ +FROM node:14.15.1-alpine3.12 +WORKDIR /app +COPY package.json . +RUN npm i --quiet +COPY sign-server.js ./ \ No newline at end of file diff --git a/example/console/config.json b/example/console/config.json new file mode 100644 index 0000000..6c3957e --- /dev/null +++ b/example/console/config.json @@ -0,0 +1,27 @@ +{ + "secret": "secret", + "api_key": "strong_secret_for_api_auth", + "admin": true, + "admin_password": "password", + "admin_secret": "strong_secret_key_to_sign_authorization_token", + "internal_port": "10001", + "max_channel_length": 500, + "connection_lifetime": 60, + "log_level": "debug", + "namespaces": [ + { + "name": "public", + "publish": true, + "anonymous": true, + "presence": true, + "join_leave": false + }, + { + "name": "user", + "anonymous": false, + "publish": true, + "presence": false, + "join_leave": false + } + ] +} \ No newline at end of file diff --git a/example/console/docker-compose.yml b/example/console/docker-compose.yml new file mode 100644 index 0000000..462c025 --- /dev/null +++ b/example/console/docker-compose.yml @@ -0,0 +1,22 @@ +version: "3" +services: + centrifugo: + image: centrifugo/centrifugo:v2.3.1 + container_name: centrifugo + restart: always + ports: + - 8000:8000 + volumes: + - ./:/centrifugo + command: centrifugo --config=config.json + sign_server: + container_name: sign_server + build: . + command: npm start + volumes: + - ./:/app + - /app/node_modules + ports: + - 5000:5000 + environment: + - PORT=5000 diff --git a/example/console/package.json b/example/console/package.json new file mode 100644 index 0000000..345fb9c --- /dev/null +++ b/example/console/package.json @@ -0,0 +1,17 @@ +{ + "name": "console", + "version": "1.0.0", + "description": "Before running example make sure you created `chat` namespace in Centrifugo configuration and allowed publishing into channel - i.e. using config like this:", + "main": "sign-server.js", + "scripts": { + "start": "node sign-server.js" + }, + "keywords": [], + "author": "", + "license": "ISC", + "dependencies": { + "body-parser": "^1.19.0", + "express": "^4.17.1", + "jsonwebtoken": "^8.5.1" + } +} \ No newline at end of file diff --git a/example/console/readme.md b/example/console/readme.md index 0486428..aa525e6 100644 --- a/example/console/readme.md +++ b/example/console/readme.md @@ -1,23 +1,23 @@ -Before running example make sure you created `chat` namespace in Centrifugo configuration and allowed publishing into channel - i.e. using config like this: +# centrifuge-dart console app -```json -{ - ... - "namespaces": [ - { - "name": "chat", - "anonymous": false, - "publish": true, - "join_leave": true, - "presence": true, - "presence_stats": true - } - ] -} -``` +You can run environment in two ways: -Also run Centrifugo in insecure client mode so it does not expect JWT token from client: +1. Docker (recommended) + ```bash + docker-compose up [-d] + ``` +2. Manually + ```bash + # first terminal session + npm i + npm start + + # second terminal session + ./centrifugo --config config.json --client_insecure + ``` + +To run console app itself: ```bash -./centrifugo --config config.json --client_insecure -``` +dart ./simple.dart +``` \ No newline at end of file diff --git a/example/console/sign-server.js b/example/console/sign-server.js new file mode 100644 index 0000000..d134a1c --- /dev/null +++ b/example/console/sign-server.js @@ -0,0 +1,26 @@ +const express = require('express') +const bodyParser = require('body-parser') +const jwt = require('jsonwebtoken') + +const app = express() +const PORT = process.env.PORT || 5000 + +app.use(bodyParser.json()) + +app.post('/auth', (req, res) => { + const channels = [] + + for (const channel of req.body.channels) { + const token = jwt.sign({ + client: req.body.client, + channel, + }, 'secret') + channels.push({ channel, token }) + } + + res.json({ channels }) +}) + +app.listen(PORT, () => { + console.log(`Sign server listening on port: ${ PORT }`) +}) \ No newline at end of file diff --git a/example/console/simple.dart b/example/console/simple.dart index 4cb2bf9..c8050fe 100644 --- a/example/console/simple.dart +++ b/example/console/simple.dart @@ -1,45 +1,68 @@ import 'dart:async'; import 'dart:convert'; import 'dart:io'; +import 'package:http/http.dart' as http; import 'package:centrifuge/centrifuge.dart' as centrifuge; void main() async { final url = 'ws://localhost:8000/connection/websocket?format=protobuf'; - final channel = 'chat:index'; + final channel = 'public:test'; - final onEvent = (dynamic event) { + // Uncomment to subscribe to private channels + // final channel = r'$user:test'; + + // Uncomment to use batching + // final channels = [ + // r'$usert:test1', + // r'$user:test2', + // r'$user:test3', + // r'public:test1', + // ]; + + final onEvent = (String channel, dynamic event) { print('$channel> $event'); }; try { + final httpClient = http.Client(); final client = centrifuge.createClient( url, config: centrifuge.ClientConfig( - headers: <String, dynamic>{'user-id': 42, 'user-name': 'The Answer'}, - onPrivateSub: (centrifuge.PrivateSubEvent event) { - return Future.value('<SUBSCRIPTION JWT>'); - }), + onPrivateSub: (event) => + _auth(httpClient, event.clientID, event.channels), + ), ); - client.connectStream.listen(onEvent); - client.disconnectStream.listen(onEvent); + client.connectStream.listen((e) => onEvent('', e)); + client.disconnectStream.listen((e) => onEvent('', e)); // Uncomment to use example token based on secret key `secret`. - // client.setToken('eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0c3VpdGVfand0In0.hPmHsVqvtY88PvK4EmJlcdwNuKFuy3BGaF7dMaKdPlw'); - client.connect(); + // client.setToken( + // 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0c3VpdGVfand0In0.hPmHsVqvtY88PvK4EmJlcdwNuKFuy3BGaF7dMaKdPlw'); + await client.connect(); + // Uncomment to use batching + // client.startBatching(); + // client.startSubscribeBatching(); + // for (final channel in channels) { final subscription = client.getSubscription(channel); - subscription.publishStream.map((e) => utf8.decode(e.data)).listen(onEvent); - subscription.joinStream.listen(onEvent); - subscription.leaveStream.listen(onEvent); + subscription.publishStream + .map((e) => utf8.decode(e.data)) + .listen((e) => onEvent(channel, e)); + subscription.joinStream.listen((e) => onEvent(channel, e)); + subscription.leaveStream.listen((e) => onEvent(channel, e)); - subscription.subscribeSuccessStream.listen(onEvent); - subscription.subscribeErrorStream.listen(onEvent); - subscription.unsubscribeStream.listen(onEvent); + subscription.subscribeSuccessStream.listen((e) => onEvent(channel, e)); + subscription.subscribeErrorStream.listen((e) => onEvent(channel, e)); + subscription.unsubscribeStream.listen((e) => onEvent(channel, e)); subscription.subscribe(); + // Uncomment to use batching + // } + // client.stopSubscribeBatching(); + // client.stopBatching(); final handler = _handleUserInput(client, subscription); @@ -72,7 +95,7 @@ Function(String) _handleUserInput( print('RPC result: ' + utf8.decode(result.data)); break; case '#disconnect': - client.disconnect(); + await client.disconnect(); break; default: final output = jsonEncode({'input': message}); @@ -87,3 +110,20 @@ Function(String) _handleUserInput( return; }; } + +Future<centrifuge.PrivateSubSign> _auth( + http.Client httpClient, String clientID, List<String> channels) async { + final body = json.encode(<String, dynamic>{ + 'client': clientID, + 'channels': channels, + }); + final res = await httpClient.post( + 'http://localhost:5000/auth', + headers: <String, String>{ + 'content-type': 'application/json', + 'accept': 'application/json', + }, + body: body, + ); + return centrifuge.PrivateSubSign.fromRawJson(res.body); +} diff --git a/lib/centrifuge.dart b/lib/centrifuge.dart index deebc70..3787023 100644 --- a/lib/centrifuge.dart +++ b/lib/centrifuge.dart @@ -2,4 +2,4 @@ export 'src/client.dart' show Client, createClient; export 'src/client_config.dart'; export 'src/error.dart'; export 'src/events.dart'; -export 'src/subscription.dart' show Subscription; +export 'src/subscription.dart' show Subscription, PrivateSubSign; // TODO diff --git a/lib/src/client.dart b/lib/src/client.dart index 2286ef1..f9d49be 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -5,6 +5,7 @@ import 'package:meta/meta.dart'; import 'package:protobuf/protobuf.dart'; import 'client_config.dart'; +import 'error.dart' as errors; import 'events.dart'; import 'proto/client.pb.dart'; import 'subscription.dart'; @@ -25,7 +26,7 @@ abstract class Client { /// Connect to the server. /// - void connect(); + Future<void> connect(); /// Set token for connection request. /// @@ -56,7 +57,22 @@ abstract class Client { /// Disconnect from the server. /// - void disconnect(); + Future<void> disconnect(); + + /// Start collecting messages without sending them to Centrifuge until stopBatching + /// method called + void startBatching(); + + /// Stop collecting messages and send them + void stopBatching(); + + /// Start collecting private channels to create bulk authentication + /// Call config.onPrivateSub when stopSubscribeBatching will be called + void startSubscribeBatching(); + + /// Call config.onPrivateSub with collected private channels + /// to ask if this client can subscribe on each channel + Future<void> stopSubscribeBatching(); /// Detect that the subscription already exists. /// @@ -82,6 +98,16 @@ class ClientImpl implements Client, GeneratedMessageSender { Transport _transport; String _token; + final _messages = <TransportMessage>[]; + bool _isBatching = false; + bool get isBatching => _isBatching; + + final _privateChannels = <String>{}; + Set<String> get privateChannels => _privateChannels; + + bool _isSubscribeBatching = false; + bool get isSubscribeBatching => _isSubscribeBatching; + final String _url; ClientConfig _config; @@ -105,9 +131,7 @@ class ClientImpl implements Client, GeneratedMessageSender { Stream<MessageEvent> get messageStream => _messageController.stream; @override - void connect() async { - return _connect(); - } + Future<void> connect() async => _connect(); bool get connected => _state == _ClientState.connected; @@ -118,19 +142,34 @@ class ClientImpl implements Client, GeneratedMessageSender { void setConnectData(List<int> connectData) => _connectData = connectData; @override - Future publish(String channel, List<int> data) async { + Future<PublishResult> publish(String channel, List<int> data) async { final request = PublishRequest() ..channel = channel ..data = data; - await _transport.sendMessage(request, PublishResult()); + final completer = Completer<PublishResult>(); + await addMessage( + TransportMessage( + req: request, + res: PublishResult(), + onResult: completer.complete, + ), + ); + return completer.future; } @override - Future<RPCResult> rpc(List<int> data) => _transport.sendMessage( - RPCRequest()..data = data, - RPCResult(), - ); + Future<RPCResult> rpc(List<int> data) async { + final completer = Completer<RPCResult>(); + await addMessage( + TransportMessage( + req: RPCRequest()..data = data, + res: RPCResult(), + onResult: completer.complete, + ), + ); + return completer.future; + } @override @alwaysThrows @@ -139,7 +178,7 @@ class ClientImpl implements Client, GeneratedMessageSender { } @override - void disconnect() async { + Future<void> disconnect() async { _processDisconnect(reason: 'manual disconnect', reconnect: false); await _transport.close(); } @@ -173,15 +212,21 @@ class ClientImpl implements Client, GeneratedMessageSender { Future<UnsubscribeEvent> unsubscribe(String channel) async { final request = UnsubscribeRequest()..channel = channel; - await _transport.sendMessage(request, UnsubscribeResult()); + final completer = Completer<UnsubscribeResult>(); + await addMessage(TransportMessage( + req: request, + res: UnsubscribeResult(), + onResult: completer.complete, + )); + await completer.future; return UnsubscribeEvent(); } @override - Future<Rep> - sendMessage<Req extends GeneratedMessage, Rep extends GeneratedMessage>( - Req request, Rep result) => - _transport.sendMessage(request, result); + Future<void> + sendMessages<Req extends GeneratedMessage, Res extends GeneratedMessage>( + List<TransportMessage<Req, Res>> messages) => + _transport.sendMessages(messages); int _retryCount = 0; @@ -234,10 +279,15 @@ class ClientImpl implements Client, GeneratedMessageSender { request.data = _connectData; } - final result = await _transport.sendMessage( - request, - ConnectResult(), + final completer = Completer<ConnectResult>(); + await addMessage( + TransportMessage( + req: request, + res: ConnectResult(), + onResult: completer.complete, + ), ); + final result = await completer.future; _clientID = result.client; _retryCount = 0; @@ -294,19 +344,81 @@ class ClientImpl implements Client, GeneratedMessageSender { } } - Future<String> getToken(String channel) async { - if (_isPrivateChannel(channel)) { - final event = PrivateSubEvent(_clientID, channel); - return _onPrivateSub(event); + @override + void startBatching() { + _isBatching = true; + } + + @override + Future<void> stopBatching() async { + _isBatching = false; + await _flush(); + } + + Future<void> _flush() async { + if (_messages.isEmpty) { + return; + } + final messages = _messages.sublist(0); + _messages.clear(); + await sendMessages(messages); + } + + Future<void> + addMessage<Req extends GeneratedMessage, Res extends GeneratedMessage>( + TransportMessage<Req, Res> message) async { + if (_isBatching) { + _messages.add(message); + } else { + await sendMessages([message]); } - return null; } - Future<String> _onPrivateSub(PrivateSubEvent event) => - _config.onPrivateSub(event); + @override + void startSubscribeBatching() { + _isSubscribeBatching = true; + } - bool _isPrivateChannel(String channel) => - channel.startsWith(_config.privateChannelPrefix); + @override + Future<void> stopSubscribeBatching() async { + _isSubscribeBatching = false; + final authChannels = _privateChannels.toList(); + _privateChannels.clear(); + + final event = PrivateSubEvent(_clientID, authChannels); + final sign = await _onPrivateSub(event); + + startBatching(); + + for (final ch in sign.channels) { + final req = SubscribeRequest() + ..channel = ch.channel + ..token = ch.token; + + final SubscriptionImpl sub = getSubscription(ch.channel); + + final message = TransportMessage( + req: req, + res: SubscribeResult(), + onResult: (dynamic result) { + final event = SubscribeSuccessEvent.from(result, false); + final SubscriptionImpl sub = getSubscription(ch.channel); + sub.onSubscribeSuccess(event); + sub.recover(result); + }, + onError: (err) { + sub.onSubscribeError(SubscribeErrorEvent(err.message, err.code)); + }, + ); + + await addMessage(message); + } + + stopBatching(); + } + + Future<PrivateSubSign> _onPrivateSub(PrivateSubEvent event) => + _config.onPrivateSub(event); } enum _ClientState { connected, disconnected, connecting } diff --git a/lib/src/client_config.dart b/lib/src/client_config.dart index 4dd51ea..bfa3d62 100644 --- a/lib/src/client_config.dart +++ b/lib/src/client_config.dart @@ -5,17 +5,17 @@ import 'package:centrifuge/centrifuge.dart'; import 'package:centrifuge/src/events.dart'; class ClientConfig { - ClientConfig( - {this.timeout = const Duration(seconds: 5), - this.debug = false, - this.headers = const <String, dynamic>{}, - this.tlsSkipVerify = false, - this.maxReconnectDelay = const Duration(seconds: 20), - this.privateChannelPrefix = "\$", - this.pingInterval = const Duration(seconds: 25), - this.onPrivateSub = _defaultPrivateSubCallback, - WaitRetry retry}) - : retry = retry ?? _defaultRetry(maxReconnectDelay.inSeconds); + ClientConfig({ + this.timeout = const Duration(seconds: 5), + this.debug = false, + this.headers = const <String, dynamic>{}, + this.tlsSkipVerify = false, + this.maxReconnectDelay = const Duration(seconds: 20), + this.privateChannelPrefix = "\$", + this.pingInterval = const Duration(seconds: 25), + this.onPrivateSub = _defaultPrivateSubCallback, + WaitRetry retry, + }) : retry = retry ?? _defaultRetry(maxReconnectDelay.inSeconds); final Duration timeout; final bool debug; @@ -32,12 +32,12 @@ class ClientConfig { typedef WaitRetry = Future Function(int); -typedef PrivateSubCallback = Future<String> Function(PrivateSubEvent); +typedef PrivateSubCallback = Future<PrivateSubSign> Function(PrivateSubEvent); WaitRetry _defaultRetry(int maxReconnectDelay) => (int count) { final seconds = min(0.5 * pow(2, count), maxReconnectDelay).toInt(); return Future<void>.delayed(Duration(seconds: seconds)); }; -Future<String> _defaultPrivateSubCallback(PrivateSubEvent event) => - Future.value(""); +Future<PrivateSubSign> _defaultPrivateSubCallback(PrivateSubEvent event) => + Future.value(PrivateSubSign.fromRawJson('{"channels":[]}')); diff --git a/lib/src/codec.dart b/lib/src/codec.dart index da08c02..9575421 100644 --- a/lib/src/codec.dart +++ b/lib/src/codec.dart @@ -3,20 +3,23 @@ import 'dart:convert'; import 'package:centrifuge/src/proto/client.pb.dart'; import 'package:protobuf/protobuf.dart'; -abstract class CommandEncoder extends Converter<Command, List<int>> {} +abstract class CommandEncoder extends Converter<List<Command>, List<int>> {} abstract class ReplyDecoder extends Converter<List<int>, List<Reply>> {} class ProtobufCommandEncoder extends CommandEncoder { @override - List<int> convert(Command input) { - final commandData = input.writeToBuffer(); - final length = commandData.lengthInBytes; - + List<int> convert(List<Command> commands) { final writer = CodedBufferWriter(); - writer.writeInt32NoTag(length); - return writer.toBuffer() + commandData; + for (final cmd in commands) { + final cmdData = cmd.writeToBuffer(); + writer + ..writeInt32NoTag(cmdData.lengthInBytes) + ..writeRawBytes(cmdData); + } + + return writer.toBuffer(); } } @@ -38,8 +41,12 @@ class ProtobufReplyDecoder extends ReplyDecoder { class JsonCommandEncoder extends CommandEncoder { @override - List<int> convert(Command input) { - return utf8.encode(input.writeToJson()); + List<int> convert(List<Command> commands) { + const jsonCommands = <String>[]; + for (final c in commands) { + jsonCommands.add(c.writeToJson()); + } + return utf8.encode(jsonCommands.join('\n')); } } diff --git a/lib/src/events.dart b/lib/src/events.dart index 818f8e2..8e8a0ba 100644 --- a/lib/src/events.dart +++ b/lib/src/events.dart @@ -3,14 +3,14 @@ import 'dart:convert'; import 'proto/client.pb.dart' as proto; class PrivateSubEvent { - PrivateSubEvent(this.clientID, this.channel); + PrivateSubEvent(this.clientID, this.channels); final String clientID; - final String channel; + final List<String> channels; @override String toString() { - return 'PrivateSubEvent{clientID: $clientID, channel: $channel}'; + return 'PrivateSubEvent{clientID: $clientID, channels: $channels}'; } } diff --git a/lib/src/subscription.dart b/lib/src/subscription.dart index c653708..66eaee6 100644 --- a/lib/src/subscription.dart +++ b/lib/src/subscription.dart @@ -1,9 +1,10 @@ import 'dart:async'; +import 'dart:convert'; +import 'package:centrifuge/src/transport.dart'; import 'package:meta/meta.dart'; import 'client.dart'; -import 'error.dart' as errors; import 'events.dart'; import 'proto/client.pb.dart'; @@ -74,12 +75,12 @@ class SubscriptionImpl implements Subscription { Future publish(List<int> data) => _client.publish(channel, data); @override - void subscribe() { + Future<void> subscribe() async { _state = _SubscriptionState.subscribed; if (!_client.connected) { return; } - _resubscribe(isResubscribed: false); + await _resubscribe(isResubscribed: false); } void resubscribeIfNeeded() { @@ -90,7 +91,7 @@ class SubscriptionImpl implements Subscription { } @override - void unsubscribe() async { + Future<void> unsubscribe() async { if (_state != _SubscriptionState.subscribed) { return; } @@ -101,7 +102,15 @@ class SubscriptionImpl implements Subscription { } final request = UnsubscribeRequest()..channel = channel; - await _client.sendMessage(request, UnsubscribeResult()); + final completer = Completer<UnsubscribeResult>(); + await _client.addMessage( + TransportMessage( + req: request, + res: UnsubscribeResult(), + onResult: completer.complete, + ), + ); + await completer.future; final event = UnsubscribeEvent(); addUnsubscribe(event); } @@ -117,7 +126,15 @@ class SubscriptionImpl implements Subscription { @override Future<List<HistoryEvent>> history() async { final request = HistoryRequest()..channel = channel; - final result = await _client.sendMessage(request, HistoryResult()); + final completer = Completer<HistoryResult>(); + await _client.addMessage( + TransportMessage( + req: request, + res: HistoryResult(), + onResult: completer.complete, + ), + ); + final result = await completer.future; final events = result.publications.map(HistoryEvent.from).toList(); return events; } @@ -128,42 +145,87 @@ class SubscriptionImpl implements Subscription { void addLeave(LeaveEvent event) => _leaveController.add(event); - void _onSubscribeSuccess(SubscribeSuccessEvent event) => + void onSubscribeSuccess(SubscribeSuccessEvent event) => _subscribeSuccessController.add(event); - void _onSubscribeError(SubscribeErrorEvent event) => + void onSubscribeError(SubscribeErrorEvent event) => _subscribeErrorController.add(event); void addUnsubscribe(UnsubscribeEvent event) => _unsubscribeController.add(event); - Future _resubscribe({@required bool isResubscribed}) async { - try { - final token = await _client.getToken(channel); - final request = SubscribeRequest() - ..channel = channel - ..token = token ?? ''; - - final result = await _client.sendMessage(request, SubscribeResult()); - final event = SubscribeSuccessEvent.from(result, isResubscribed); - _onSubscribeSuccess(event); - _recover(result); - } catch (exception) { - if (exception is errors.Error) { - _onSubscribeError( - SubscribeErrorEvent(exception.message, exception.code)); + Future<void> _resubscribe({@required bool isResubscribed}) async { + if (_isPrivateChannel(channel)) { + if (_client.isSubscribeBatching) { + _client.privateChannels.add(channel); } else { - _onSubscribeError(SubscribeErrorEvent(exception.toString(), -1)); + _client.startSubscribeBatching(); + _resubscribe(isResubscribed: isResubscribed); + _client.stopSubscribeBatching(); } + } else { + final request = SubscribeRequest() + ..channel = channel + ..token = ''; + + await _client.addMessage( + TransportMessage( + req: request, + res: SubscribeResult(), + onResult: (dynamic result) { + final event = SubscribeSuccessEvent.from(result, isResubscribed); + onSubscribeSuccess(event); + recover(result); + }, + onError: (err) { + onSubscribeError(SubscribeErrorEvent(err.message, err.code)); + }, + ), + ); } } - void _recover(SubscribeResult result) { + void recover(SubscribeResult result) { for (Publication publication in result.publications) { final event = PublishEvent.from(publication); addPublish(event); } } + + bool _isPrivateChannel(String channel) => + channel.startsWith(_client.config.privateChannelPrefix); } enum _SubscriptionState { subscribed, unsubscribed } + +class PrivateSubSign { + PrivateSubSign._({ + this.channels, + }); + + factory PrivateSubSign.fromRawJson(String str) => + PrivateSubSign._fromJson(json.decode(str)); + + factory PrivateSubSign._fromJson(Map<String, dynamic> json) => + PrivateSubSign._( + channels: List<_Channel>.from( + json['channels'].map((dynamic x) => _Channel.fromJson(x))), + ); + + final List<_Channel> channels; +} + +class _Channel { + _Channel._({ + this.channel, + this.token, + }); + + String channel; + String token; + + factory _Channel.fromJson(Map<String, dynamic> json) => _Channel._( + channel: json['channel'], + token: json['token'], + ); +} diff --git a/lib/src/transport.dart b/lib/src/transport.dart index 52c6925..4f44031 100644 --- a/lib/src/transport.dart +++ b/lib/src/transport.dart @@ -16,17 +16,42 @@ typedef Transport TransportBuilder({ typedef Future<WebSocket> WebSocketBuilder(); +typedef void TransportMessageResultCallback<T extends GeneratedMessage>( + T result); +typedef void TransportMessageErrorCallback(centrifuge.Error error); + +class TransportMessage<Req extends GeneratedMessage, + Res extends GeneratedMessage> { + TransportMessage({ + @required this.res, + @required this.req, + @required this.onResult, + this.onError, + }); + final Req req; + final Res res; + + final TransportMessageResultCallback<Res> onResult; + + /// Transport will call this method (if present) + /// instead of throwing exception + final TransportMessageErrorCallback onError; +} + class TransportConfig { - TransportConfig( - {this.pingInterval = const Duration(seconds: 25), - this.headers = const <String, dynamic>{}}); + TransportConfig({ + this.pingInterval = const Duration(seconds: 25), + this.headers = const <String, dynamic>{}, + }); final Duration pingInterval; final Map<String, dynamic> headers; } -Transport protobufTransportBuilder( - {@required String url, @required TransportConfig config}) { +Transport protobufTransportBuilder({ + @required String url, + @required TransportConfig config, +}) { final replyDecoder = ProtobufReplyDecoder(); final commandEncoder = ProtobufCommandEncoder(); @@ -44,14 +69,18 @@ Transport protobufTransportBuilder( } abstract class GeneratedMessageSender { - Future<Rep> - sendMessage<Req extends GeneratedMessage, Rep extends GeneratedMessage>( - Req request, Rep result); + Future<void> + sendMessages<Req extends GeneratedMessage, Res extends GeneratedMessage>( + List<TransportMessage<Req, Res>> messages); } class Transport implements GeneratedMessageSender { - Transport(this._socketBuilder, this._config, this._commandEncoder, - this._replyDecoder); + Transport( + this._socketBuilder, + this._config, + this._commandEncoder, + this._replyDecoder, + ); final WebSocketBuilder _socketBuilder; WebSocket _socket; @@ -59,9 +88,11 @@ class Transport implements GeneratedMessageSender { final ReplyDecoder _replyDecoder; final TransportConfig _config; - Future open(void onPush(Push push), - {Function onError, - void onDone(String reason, bool shouldReconnect)}) async { + Future open( + void onPush(Push push), { + Function onError, + void onDone(String reason, bool shouldReconnect), + }) async { _socket = await _socketBuilder(); if (_config.pingInterval != Duration.zero) { _socket.pingInterval = _config.pingInterval; @@ -79,14 +110,21 @@ class Transport implements GeneratedMessageSender { final _completers = <int, Completer<GeneratedMessage>>{}; @override - Future<Rep> - sendMessage<Req extends GeneratedMessage, Rep extends GeneratedMessage>( - Req request, Rep result) async { - final command = _createCommand(request); - final reply = await _sendCommand(command); - - final filledResult = _processResult(result, reply); - return filledResult; + Future<void> + sendMessages<Req extends GeneratedMessage, Res extends GeneratedMessage>( + List<TransportMessage<Req, Res>> messages) async { + final commands = messages.map((msg) => _createCommand(msg.req)).toList(); + final replies = await _sendCommands(commands); + + for (var i = 0; i < replies.length; i++) { + final m = messages[i]; + _processResult( + m.res, + replies[i], + onResult: m.onResult, + onError: m.onError, + ); + } } Future close() { @@ -98,28 +136,44 @@ class Transport implements GeneratedMessageSender { ..method = _getType(request) ..params = request.writeToBuffer(); - Future<Reply> _sendCommand(Command command) { - final completer = Completer<Reply>.sync(); + Future<List<Reply>> _sendCommands(List<Command> commands) { + // List of completers each reffering to one of the passed commands + final ctxCompleters = <Completer<Reply>>[]; - _completers[command.id] = completer; + for (final cmd in commands) { + final completer = Completer<Reply>.sync(); + _completers[cmd.id] = completer; + ctxCompleters.add(completer); + } - final data = _commandEncoder.convert(command); + final data = _commandEncoder.convert(commands); if (_socket == null) { throw centrifuge.ClientDisconnectedError; } _socket.add(data); - - return completer.future; + return Future.wait(ctxCompleters.map((c) => c.future)); } - T _processResult<T extends GeneratedMessage>(T result, Reply reply) { + void _processResult<T extends GeneratedMessage>( + T result, + Reply reply, { + TransportMessageResultCallback onResult, + TransportMessageErrorCallback onError, + }) { if (reply.hasError()) { - throw centrifuge.Error.custom(reply.error.code, reply.error.message); + final err = + centrifuge.Error.custom(reply.error.code, reply.error.message); + if (onError != null) { + onError(err); + return; + } else { + throw err; + } } result.mergeFromBuffer(reply.result); - return result; + onResult(result); } MethodType _getType(GeneratedMessage request) { diff --git a/pubspec.yaml b/pubspec.yaml index 235a7e0..1d7b9fb 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: centrifuge -version: 0.5.1 +version: 0.6.0 description: > Dart client to communicate with Centrifuge and Centrifugo from Flutter and VM over dart:io WebSocket @@ -8,11 +8,11 @@ author: German Saprykin <saprykin.h@gmail.com> homepage: http://github.com/centrifugal/centrifuge-dart environment: - sdk: '>=2.0.0 <3.0.0' + sdk: ">=2.2.0 <3.0.0" dependencies: - meta: ^1.1.5 - protobuf: ^1.0.1 + meta: ^1.1.5 + protobuf: ^1.0.1 dev_dependencies: test: ^1.3.0