diff --git a/src/Libplanet.Net/Protocols/KademliaProtocol.cs b/src/Libplanet.Net/Protocols/KademliaProtocol.cs index 2a0e9f035ac..37b117fdb6d 100644 --- a/src/Libplanet.Net/Protocols/KademliaProtocol.cs +++ b/src/Libplanet.Net/Protocols/KademliaProtocol.cs @@ -95,7 +95,7 @@ await PingAsync(peer, dialTimeout, cancellationToken) dialTimeout, cancellationToken)); } - catch (PingTimeoutException) + catch (PingFailedException) { _logger.Warning("A timeout exception occurred connecting to seed peer"); RemovePeer(peer); @@ -140,19 +140,25 @@ public async Task AddPeersAsync( var tasks = new List<Task>(); foreach (BoundPeer peer in peers) { - tasks.Add(PingAsync( - peer, - timeout: timeout, - cancellationToken: cancellationToken)); + tasks.Add( + PingAsync( + peer, + timeout: timeout, + cancellationToken: cancellationToken)); } _logger.Verbose("Trying to ping {PeerCount} peers", tasks.Count); await Task.WhenAll(tasks).ConfigureAwait(false); _logger.Verbose("Update complete"); } - catch (PingTimeoutException e) + catch (PingFailedException pfe) { - _logger.Debug(e, "Ping timed out"); + if (pfe.InnerException is { } e) + { + throw e; + } + + throw; } catch (TaskCanceledException e) { @@ -284,7 +290,7 @@ public async Task CheckReplacementCacheAsync(CancellationToken cancellationToken await PingAsync(replacement, _requestTimeout, cancellationToken) .ConfigureAwait(false); } - catch (PingTimeoutException) + catch (PingFailedException) { _logger.Verbose( "Removed stale peer {Peer} from replacement cache", @@ -327,7 +333,7 @@ await PingAsync(replacement, _requestTimeout, cancellationToken) await PingAsync(boundPeer, _requestTimeout, cancellationToken) .ConfigureAwait(false); } - catch (PingTimeoutException) + catch (PingFailedException) { var msg = "{BoundPeer}, a target peer, is in the routing table does not respond"; @@ -394,7 +400,7 @@ await PingAsync(found, _requestTimeout, cancellationToken) throw new TaskCanceledException( $"Task is cancelled during {nameof(FindSpecificPeerAsync)}()"); } - catch (PingTimeoutException) + catch (PingFailedException) { // Ignore peer not responding } @@ -439,11 +445,9 @@ internal async Task PingAsync( AddPeer(peer); } - catch (CommunicationFailException) + catch (Exception e) { - throw new PingTimeoutException( - $"Failed to send Ping to {peer}.", - peer); + throw new PingFailedException(peer, e); } } @@ -497,7 +501,7 @@ private async Task ValidateAsync( await PingAsync(peer, timeout, cancellationToken).ConfigureAwait(false); _table.Check(peer, check, DateTimeOffset.UtcNow); } - catch (PingTimeoutException) + catch (PingFailedException) { _logger.Verbose("Removing invalid peer {Peer}...", peer); RemovePeer(peer); @@ -711,7 +715,7 @@ private async Task ProcessFoundAsync( AggregateException aggregateException = aggregateTask.Exception!; foreach (Exception e in aggregateException.InnerExceptions) { - if (e is PingTimeoutException pte) + if (e is PingFailedException pte) { peers.Remove(pte.Target); } diff --git a/src/Libplanet.Net/Protocols/PingTimeoutException.cs b/src/Libplanet.Net/Protocols/PingFailedException.cs similarity index 67% rename from src/Libplanet.Net/Protocols/PingTimeoutException.cs rename to src/Libplanet.Net/Protocols/PingFailedException.cs index ec40b608c44..576d366de2f 100644 --- a/src/Libplanet.Net/Protocols/PingTimeoutException.cs +++ b/src/Libplanet.Net/Protocols/PingFailedException.cs @@ -4,27 +4,21 @@ namespace Libplanet.Net.Protocols { [Serializable] - public class PingTimeoutException : TimeoutException + public class PingFailedException : Exception { - public PingTimeoutException(BoundPeer target) - : base() + public PingFailedException(BoundPeer target, Exception innerException) + : base($"Failed to send ping to target peer {target}", innerException) { Target = target; } - public PingTimeoutException(string message, BoundPeer target) - : base(message) - { - Target = target; - } - - public PingTimeoutException(string message, BoundPeer target, Exception innerException) + public PingFailedException(string message, BoundPeer target, Exception innerException) : base(message, innerException) { Target = target; } - protected PingTimeoutException(SerializationInfo info, StreamingContext context) + protected PingFailedException(SerializationInfo info, StreamingContext context) : base(info, context) { Target = info.GetValue(nameof(Target), typeof(BoundPeer)) is BoundPeer target diff --git a/src/Libplanet.Net/Transports/NetMQChannel.cs b/src/Libplanet.Net/Transports/NetMQChannel.cs new file mode 100644 index 00000000000..b92b9b7657d --- /dev/null +++ b/src/Libplanet.Net/Transports/NetMQChannel.cs @@ -0,0 +1,205 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using NetMQ; +using NetMQ.Sockets; +using Serilog; + +namespace Libplanet.Net.Transports +{ + public class NetMQChannel + { + private readonly BoundPeer _peer; + private readonly Channel<MessageRequest> _requests; + private readonly CancellationTokenSource _cancellationTokenSource; + private readonly ILogger _logger; + + private DateTimeOffset _lastUpdated; + private bool _opened; + + public NetMQChannel(BoundPeer peer) + { + _peer = peer; + _requests = Channel.CreateUnbounded<MessageRequest>(); + _cancellationTokenSource = new CancellationTokenSource(); + _logger = Log.Logger + .ForContext<NetMQChannel>() + .ForContext("Source", nameof(NetMQTransport)); + } + + public event EventHandler? Closed; + + public event EventHandler<Exception>? Faulted; + + public event EventHandler? Opened; + + public void Abort() + { + if (!_opened) + { + throw new InvalidOperationException("Cannot abort an unopened channel."); + } + + _opened = false; + _cancellationTokenSource.Cancel(); + } + + public void Close() + { + if (!_opened) + { + throw new InvalidOperationException("Cannot close an unopened channel."); + } + + _opened = false; + _cancellationTokenSource.Cancel(); + Closed?.Invoke(this, EventArgs.Empty); + } + + public void Open() + { + _opened = true; + Opened?.Invoke(this, EventArgs.Empty); + DoOpen(); + } + + public async IAsyncEnumerable<NetMQMessage> SendMessageAsync( + NetMQMessage message, + TimeSpan? timeout, + int expectedResponses, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + if (!_opened) + { + throw new InvalidOperationException( + "Cannot send message with an unopened channel."); + } + + var channel = Channel.CreateUnbounded<NetMQMessage>(); + await _requests.Writer.WriteAsync( + new MessageRequest( + message, + timeout, + expectedResponses, + channel, + cancellationToken), + cancellationToken); + + foreach (var unused in Enumerable.Range(0, expectedResponses)) + { + // FIXME: Can be replaced with Channel.Reader.Completion? + yield return await channel.Reader.ReadAsync(cancellationToken); + } + } + + private void DoOpen() + { + TaskCreationOptions taskCreationOptions = + TaskCreationOptions.DenyChildAttach | + TaskCreationOptions.LongRunning | + TaskCreationOptions.HideScheduler; + Task.Factory.StartNew( + ProcessRuntime, + _cancellationTokenSource.Token, + taskCreationOptions, + TaskScheduler.Default); + } + + private async Task ProcessRuntime() + { + var ct = _cancellationTokenSource.Token; + using var dealer = new DealerSocket(); + dealer.Options.DisableTimeWait = true; + dealer.Options.Identity = Guid.NewGuid().ToByteArray(); + var address = await _peer.ResolveNetMQAddressAsync(); + try + { + dealer.Connect(address); + } + catch (Exception e) + { + Faulted?.Invoke(this, e); + Close(); + } + + while (!ct.IsCancellationRequested) + { + MessageRequest req = await _requests.Reader.ReadAsync(ct); + try + { + _lastUpdated = DateTimeOffset.UtcNow; + CancellationTokenSource linked = + CancellationTokenSource.CreateLinkedTokenSource(ct, req.CancellationToken); + if (!dealer.TrySendMultipartMessage(req.Message)) + { + _requests.Writer.Complete(); + dealer.Close(); + DoOpen(); + break; + } + + foreach (var i in Enumerable.Range(0, req.ExpectedResponses)) + { + var raw = new NetMQMessage(); + if (!dealer.TryReceiveMultipartMessage( + req.Timeout ?? TimeSpan.FromSeconds(1), + ref raw)) + { + break; + } + + _lastUpdated = DateTimeOffset.UtcNow; + + await req.Channel.Writer.WriteAsync(raw, linked.Token); + } + + req.Channel.Writer.Complete(); + } + catch (Exception) + { + req.Channel.Writer.Complete(); + dealer.Close(); + DoOpen(); + break; + } + } + } + + private bool HandShake(DealerSocket dealerSocket) + { + var msg = default(Msg); + return dealerSocket.TrySend(ref msg, TimeSpan.Zero, false); + } + + private readonly struct MessageRequest + { + public MessageRequest( + NetMQMessage message, + TimeSpan? timeout, + in int expectedResponses, + Channel<NetMQMessage> channel, + CancellationToken cancellationToken) + { + Message = message; + Timeout = timeout; + ExpectedResponses = expectedResponses; + Channel = channel; + CancellationToken = cancellationToken; + } + + public NetMQMessage Message { get; } + + public TimeSpan? Timeout { get; } + + public int ExpectedResponses { get; } + + public Channel<NetMQMessage> Channel { get; } + + public CancellationToken CancellationToken { get; } + } + } +} diff --git a/src/Libplanet.Net/Transports/NetMQTransport.cs b/src/Libplanet.Net/Transports/NetMQTransport.cs index 5d31d1839a0..28a0e5ec2cf 100644 --- a/src/Libplanet.Net/Transports/NetMQTransport.cs +++ b/src/Libplanet.Net/Transports/NetMQTransport.cs @@ -1,5 +1,6 @@ #nullable enable using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; using System.Diagnostics; @@ -32,11 +33,11 @@ public class NetMQTransport : ITransport private readonly HostOptions _hostOptions; private readonly MessageValidator _messageValidator; private readonly NetMQMessageCodec _messageCodec; - private readonly Channel<MessageRequest> _requests; - private readonly Task _runtimeProcessor; private readonly AsyncManualResetEvent _runningEvent; private readonly ActivitySource _activitySource; + private ConcurrentDictionary<BoundPeer, NetMQChannel> _channels; + private NetMQQueue<(AsyncManualResetEvent, NetMQMessage)>? _replyQueue; private RouterSocket? _router; @@ -47,9 +48,6 @@ public class NetMQTransport : ITransport private CancellationTokenSource _runtimeCancellationTokenSource; private CancellationTokenSource _turnCancellationTokenSource; - // Used only for logging. - private long _requestCount; - private long _socketCount; private bool _disposed = false; /// <summary> @@ -76,7 +74,6 @@ private NetMQTransport( .ForContext<NetMQTransport>() .ForContext("Source", nameof(NetMQTransport)); - _socketCount = 0; _privateKey = privateKey; _hostOptions = hostOptions; _appProtocolVersionOptions = appProtocolVersionOptions; @@ -84,35 +81,10 @@ private NetMQTransport( _appProtocolVersionOptions, messageTimestampBuffer); _messageCodec = new NetMQMessageCodec(); - _requests = Channel.CreateUnbounded<MessageRequest>(); _runtimeCancellationTokenSource = new CancellationTokenSource(); _turnCancellationTokenSource = new CancellationTokenSource(); _activitySource = new ActivitySource("Libplanet.Net.Transports.NetMQTransport"); - _requestCount = 0; - CancellationToken runtimeCt = _runtimeCancellationTokenSource.Token; - _runtimeProcessor = Task.Factory.StartNew( - () => - { - // Ignore NetMQ related exceptions during NetMQRuntime.Dispose() to stabilize - // tests - try - { - using var runtime = new NetMQRuntime(); - runtime.Run(ProcessRuntime(runtimeCt)); - } - catch (Exception e) - when (e is NetMQException || e is ObjectDisposedException) - { - _logger.Error( - e, - "An exception has occurred while running {TaskName}", - nameof(_runtimeProcessor)); - } - }, - runtimeCt, - TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning, - TaskScheduler.Default - ); + _channels = new ConcurrentDictionary<BoundPeer, NetMQChannel>(); _runningEvent = new AsyncManualResetEvent(); ProcessMessageHandler = new AsyncDelegate<Message>(); @@ -191,6 +163,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default) throw new TransportException("Transport is already running."); } + _channels = new ConcurrentDictionary<BoundPeer, NetMQChannel>(); _runtimeCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); _turnCancellationTokenSource = @@ -239,6 +212,11 @@ public async Task StopAsync( _routerPoller.Stop(); } + foreach (var channel in _channels.Values) + { + channel.Abort(); + } + _replyQueue.Dispose(); _runtimeCancellationTokenSource.Cancel(); @@ -256,10 +234,8 @@ public void Dispose() if (!_disposed) { - _requests.Writer.TryComplete(); _runtimeCancellationTokenSource.Cancel(); _turnCancellationTokenSource.Cancel(); - _runtimeProcessor.WaitWithoutException(); _runtimeCancellationTokenSource.Dispose(); _turnCancellationTokenSource.Dispose(); @@ -272,6 +248,11 @@ public void Dispose() _turnClient?.Dispose(); } + foreach (var channel in _channels.Values) + { + channel.Abort(); + } + _routerPoller?.Dispose(); _disposed = true; @@ -338,11 +319,8 @@ CancellationToken cancellationToken Guid reqId = Guid.NewGuid(); var replies = new List<Message>(); - Channel<NetMQMessage> channel = Channel.CreateUnbounded<NetMQMessage>(); - try { - DateTimeOffset now = DateTimeOffset.UtcNow; NetMQMessage rawMessage = _messageCodec.Encode( content, _privateKey, @@ -350,39 +328,35 @@ CancellationToken cancellationToken AsPeer, DateTimeOffset.UtcNow ); - var req = new MessageRequest( - reqId, - rawMessage, - peer, - now, - expectedResponses, - channel, - linkedCt); - Interlocked.Increment(ref _requestCount); - await _requests.Writer.WriteAsync( - req, - linkedCt).ConfigureAwait(false); - _logger.Verbose( - "Enqueued a request {RequestId} to the peer {Peer}: {@Message}; " + - "{LeftRequests} left", - reqId, - peer, - content, - Interlocked.Read(ref _requestCount) - ); + var stopwatch = new Stopwatch(); + stopwatch.Start(); + + NetMQChannel channel; + if (_channels.TryGetValue(peer, out var c)) + { + channel = c!; + } + else + { + channel = new NetMQChannel(peer); + channel.Open(); + _channels[peer] = channel; + } - foreach (var i in Enumerable.Range(0, expectedResponses)) + await foreach ( + var raw in channel.SendMessageAsync( + rawMessage, + timeout, + expectedResponses, + linkedCt)) { - NetMQMessage raw = await channel.Reader - .ReadAsync(linkedCt) - .ConfigureAwait(false); Message reply = _messageCodec.Decode(raw, true); _logger.Information( "Received {Reply} as a reply to request {Request} {RequestId} from {Peer}", reply.Content.Type, content.Type, - req.Id, + reqId, reply.Remote); try { @@ -400,8 +374,8 @@ await _requests.Writer.WriteAsync( reply.Content.Type, reply.Remote, content.Type, - req.Id); - channel.Writer.Complete(imte); + reqId); + linkedCts.Cancel(); } catch (DifferentAppProtocolVersionException dapve) { @@ -414,8 +388,8 @@ await _requests.Writer.WriteAsync( reply.Content.Type, reply.Remote, content.Type, - req.Id); - channel.Writer.Complete(dapve); + reqId); + linkedCts.Cancel(); } replies.Add(reply); @@ -429,6 +403,18 @@ await _requests.Writer.WriteAsync( reqId, peer, replies.Select(reply => reply.Content.Type)); + _logger + .ForContext("Tag", "Metric") + .ForContext("Subtag", "OutboundMessageReport") + .Information( + "Request {RequestId} {Message} " + + "processed in {DurationMs} ms with {ReceivedCount} replies received " + + "out of {ExpectedCount} expected replies", + reqId, + content.Type, + stopwatch.ElapsedMilliseconds, + replies.Count, + expectedResponses); a?.SetStatus(ActivityStatusCode.Ok); return replies; } @@ -457,7 +443,12 @@ await _requests.Writer.WriteAsync( "{MethodName}() was cancelled while waiting for a reply to " + "{Content} {RequestId} from {Peer}"; _logger.Debug( - oce2, dbgMsg, nameof(SendMessageAsync), content, reqId, peer); + oce2, + dbgMsg, + nameof(SendMessageAsync), + content, + reqId, + peer); a?.SetStatus(ActivityStatusCode.Error); a?.AddTag("Exception", nameof(TaskCanceledException)); @@ -477,15 +468,16 @@ await _requests.Writer.WriteAsync( "{MethodName}() encountered an unexpected exception while waiting for " + "a reply to {Content} {RequestId} from {Peer}"; _logger.Error( - e, errMsg, nameof(SendMessageAsync), content, reqId, peer.Address); + e, + errMsg, + nameof(SendMessageAsync), + content, + reqId, + peer.Address); a?.SetStatus(ActivityStatusCode.Error); a?.AddTag("Exception", e.GetType().ToString()); throw; } - finally - { - channel.Writer.TryComplete(); - } } /// <inheritdoc/> @@ -741,183 +733,6 @@ private void DoReply( ev.Set(); } - private async Task ProcessRuntime(CancellationToken cancellationToken) - { - const string waitMsg = "Waiting for a new request..."; - ChannelReader<MessageRequest> reader = _requests.Reader; -#if NETCOREAPP3_0 || NETCOREAPP3_1 || NET - _logger.Verbose(waitMsg); - await foreach (MessageRequest req in reader.ReadAllAsync(cancellationToken)) - { -#else - while (true) - { - cancellationToken.ThrowIfCancellationRequested(); - _logger.Verbose(waitMsg); - MessageRequest req = await reader.ReadAsync(cancellationToken); -#endif - string messageType = _messageCodec.ParseMessageType(req.Message, true).ToString(); - long left = Interlocked.Decrement(ref _requestCount); - _logger.Debug( - "Request {Message} {RequestId} taken for processing; {Count} requests left", - messageType, - req.Id, - left); - - _ = SynchronizationContext.Current.PostAsync( - () => ProcessRequest(req, req.CancellationToken) - ); - -#if NETCOREAPP3_0 || NETCOREAPP3_1 || NET - _logger.Verbose(waitMsg); -#endif - } - } - - private async Task ProcessRequest(MessageRequest req, CancellationToken cancellationToken) - { - string messageType = _messageCodec.ParseMessageType(req.Message, true).ToString(); - Stopwatch stopwatch = new Stopwatch(); - stopwatch.Start(); - - _logger.Debug( - "Request {Message} {RequestId} is ready to be processed in {TimeSpan}", - messageType, - req.Id, - DateTimeOffset.UtcNow - req.RequestedTime); - - Channel<NetMQMessage> channel = req.Channel; - - _logger.Debug( - "Trying to send request {Message} {RequestId} to {Peer}", - messageType, - req.Id, - req.Peer - ); - int receivedCount = 0; - long? incrementedSocketCount = null; - - // Normal OperationCanceledException initiated from outside should bubble up. - try - { - cancellationToken.ThrowIfCancellationRequested(); - - using var dealer = new DealerSocket(); - dealer.Options.DisableTimeWait = true; - dealer.Options.Identity = req.Id.ToByteArray(); - try - { - _logger.Debug( - "Trying to connect to {Peer} for request {Message} {RequestId}", - req.Peer, - messageType, - req.Id); - dealer.Connect(await req.Peer.ResolveNetMQAddressAsync()); - incrementedSocketCount = Interlocked.Increment(ref _socketCount); - _logger - .ForContext("Tag", "Metric") - .ForContext("Subtag", "SocketCount") - .Debug( - "{SocketCount} sockets open for processing request " + - "{Message} {RequestId}", - incrementedSocketCount, - messageType, - req.Id); - } - catch (NetMQException nme) - { - const string logMsg = - "{SocketCount} sockets open for processing requests; " + - "failed to create an additional socket for request {Message} {RequestId}"; - _logger - .ForContext("Tag", "Metric") - .ForContext("Subtag", "SocketCount") - .Debug( - nme, - logMsg, - Interlocked.Read(ref _socketCount), - messageType, - req.Id); - throw; - } - - if (dealer.TrySendMultipartMessage(req.Message)) - { - _logger.Debug( - "Request {RequestId} {Message} sent to {Peer}", - req.Id, - messageType, - req.Peer); - } - else - { - _logger.Debug( - "Failed to send {RequestId} {Message} to {Peer}", - req.Id, - messageType, - req.Peer); - - throw new SendMessageFailException( - $"Failed to send {messageType} to {req.Peer}.", - req.Peer); - } - - foreach (var i in Enumerable.Range(0, req.ExpectedResponses)) - { - NetMQMessage raw = await dealer.ReceiveMultipartMessageAsync( - cancellationToken: cancellationToken - ); - - _logger.Verbose( - "Received a raw message with {FrameCount} frames as a reply to " + - "request {RequestId} from {Peer}", - raw.FrameCount, - req.Id, - req.Peer - ); - await channel.Writer.WriteAsync(raw, cancellationToken); - receivedCount += 1; - } - - channel.Writer.Complete(); - } - catch (Exception e) - { - _logger.Error( - e, - "Failed to process {Message} {RequestId}; discarding it", - messageType, - req.Id); - channel.Writer.TryComplete(e); - } - finally - { - if (req.ExpectedResponses == 0) - { - // FIXME: Temporary fix to wait for a message to be sent. - await Task.Delay(1000); - } - - if (incrementedSocketCount is { }) - { - Interlocked.Decrement(ref _socketCount); - } - - _logger - .ForContext("Tag", "Metric") - .ForContext("Subtag", "OutboundMessageReport") - .Information( - "Request {RequestId} {Message} " + - "processed in {DurationMs} ms with {ReceivedCount} replies received " + - "out of {ExpectedCount} expected replies", - req.Id, - messageType, - stopwatch.ElapsedMilliseconds, - receivedCount, - req.ExpectedResponses); - } - } - private async Task RunPoller(NetMQPoller poller) { TaskCreationOptions taskCreationOptions = @@ -972,40 +787,5 @@ Guid reqId innerException ); } - - private readonly struct MessageRequest - { - public MessageRequest( - in Guid id, - NetMQMessage message, - BoundPeer peer, - DateTimeOffset requestedTime, - in int expectedResponses, - Channel<NetMQMessage> channel, - CancellationToken cancellationToken) - { - Id = id; - Message = message; - Peer = peer; - RequestedTime = requestedTime; - ExpectedResponses = expectedResponses; - Channel = channel; - CancellationToken = cancellationToken; - } - - public Guid Id { get; } - - public NetMQMessage Message { get; } - - public BoundPeer Peer { get; } - - public DateTimeOffset RequestedTime { get; } - - public int ExpectedResponses { get; } - - public Channel<NetMQMessage> Channel { get; } - - public CancellationToken CancellationToken { get; } - } } } diff --git a/test/Libplanet.Net.Tests/Protocols/TestTransport.cs b/test/Libplanet.Net.Tests/Protocols/TestTransport.cs index 730e94d32fb..33a82ab379d 100644 --- a/test/Libplanet.Net.Tests/Protocols/TestTransport.cs +++ b/test/Libplanet.Net.Tests/Protocols/TestTransport.cs @@ -255,7 +255,7 @@ async Task DoAddPeersAsync() "Different version encountered during {MethodName}()", nameof(AddPeersAsync)); } - catch (PingTimeoutException) + catch (PingFailedException) { var msg = $"Timeout occurred during {nameof(AddPeersAsync)}() after {timeout}"; diff --git a/test/Libplanet.Net.Tests/SwarmTest.AppProtocolVersion.cs b/test/Libplanet.Net.Tests/SwarmTest.AppProtocolVersion.cs index 8e39c2f777b..b179bd7ebb0 100644 --- a/test/Libplanet.Net.Tests/SwarmTest.AppProtocolVersion.cs +++ b/test/Libplanet.Net.Tests/SwarmTest.AppProtocolVersion.cs @@ -31,13 +31,12 @@ public async Task DetectAppProtocolVersion() await StartAsync(c); await StartAsync(d); - var peers = new[] { c.AsPeer, d.AsPeer }; - - foreach (var peer in peers) - { - await a.AddPeersAsync(new[] { peer }, null); - await b.AddPeersAsync(new[] { peer }, null); - } + await a.AddPeersAsync(new[] { c.AsPeer }, null); + await Assert.ThrowsAsync<InvalidMessageContentException>( + () => a.AddPeersAsync(new[] { d.AsPeer }, null)); + await Assert.ThrowsAsync<InvalidMessageContentException>( + () => b.AddPeersAsync(new[] { c.AsPeer }, null)); + await b.AddPeersAsync(new[] { d.AsPeer }, null); Assert.Equal(new[] { c.AsPeer }, a.Peers.ToArray()); Assert.Equal(new[] { d.AsPeer }, b.Peers.ToArray()); @@ -163,14 +162,20 @@ AppProtocolVersion localVersion await StartAsync(f); await a.AddPeersAsync(new[] { c.AsPeer }, TimeSpan.FromSeconds(1)); - await a.AddPeersAsync(new[] { d.AsPeer }, TimeSpan.FromSeconds(1)); - await a.AddPeersAsync(new[] { e.AsPeer }, TimeSpan.FromSeconds(1)); - await a.AddPeersAsync(new[] { f.AsPeer }, TimeSpan.FromSeconds(1)); - - await b.AddPeersAsync(new[] { c.AsPeer }, TimeSpan.FromSeconds(1)); + await Assert.ThrowsAsync<InvalidMessageContentException>( + () => a.AddPeersAsync(new[] { d.AsPeer }, TimeSpan.FromSeconds(1))); + await Assert.ThrowsAsync<InvalidMessageContentException>( + () => a.AddPeersAsync(new[] { e.AsPeer }, TimeSpan.FromSeconds(1))); + await Assert.ThrowsAsync<InvalidMessageContentException>( + () => a.AddPeersAsync(new[] { f.AsPeer }, TimeSpan.FromSeconds(1))); + + await Assert.ThrowsAsync<InvalidMessageContentException>( + () => b.AddPeersAsync(new[] { c.AsPeer }, TimeSpan.FromSeconds(1))); await b.AddPeersAsync(new[] { d.AsPeer }, TimeSpan.FromSeconds(1)); - await b.AddPeersAsync(new[] { e.AsPeer }, TimeSpan.FromSeconds(1)); - await b.AddPeersAsync(new[] { f.AsPeer }, TimeSpan.FromSeconds(1)); + await Assert.ThrowsAsync<InvalidMessageContentException>( + () => b.AddPeersAsync(new[] { e.AsPeer }, TimeSpan.FromSeconds(1))); + await Assert.ThrowsAsync<InvalidMessageContentException>( + () => b.AddPeersAsync(new[] { f.AsPeer }, TimeSpan.FromSeconds(1))); Assert.Equal(new[] { c.AsPeer }, a.Peers.ToArray()); Assert.Equal(new[] { d.AsPeer }, b.Peers.ToArray()); diff --git a/test/Libplanet.Net.Tests/Transports/NetMQChannelTest.cs b/test/Libplanet.Net.Tests/Transports/NetMQChannelTest.cs new file mode 100644 index 00000000000..0065a0e3759 --- /dev/null +++ b/test/Libplanet.Net.Tests/Transports/NetMQChannelTest.cs @@ -0,0 +1,121 @@ +using System; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using Libplanet.Crypto; +using Libplanet.Net.Messages; +using Libplanet.Net.Options; +using Libplanet.Net.Transports; +using Xunit; + +namespace Libplanet.Net.Tests.Transports +{ + // Test uses NetMQTransport as BoundPeer, which can be replaced with simple RouterSocket. + public class NetMQChannelTest + { + private const int Timeout = 60 * 1000; + + [Fact] + public void Abort() + { + var channel = new NetMQChannel( + new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0))); + Assert.Throws<InvalidOperationException>(() => channel.Abort()); + channel.Open(); + channel.Abort(); + Assert.True(true); + } + + [Fact] + public void Close() + { + var closed = false; + var channel = new NetMQChannel( + new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0))); + channel.Closed += (_, _) => closed = true; + Assert.Throws<InvalidOperationException>(() => channel.Close()); + channel.Open(); + channel.Close(); + Assert.True(closed); + } + + [Fact] + public void Open() + { + var opened = false; + var channel = new NetMQChannel( + new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0))); + channel.Opened += (_, _) => opened = true; + channel.Open(); + Assert.True(opened); + } + + [Fact] + public async Task Faulted() + { + var faulted = false; + var channel = new NetMQChannel( + new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0))); + channel.Faulted += (_, _) => faulted = true; + channel.Open(); + await Task.Delay(100); + Assert.True(faulted); + } + + [Fact(Timeout = Timeout)] + public async Task SendMessageAsync() + { + var received = false; + var receiverKey = new PrivateKey(); + var transport = await NetMQTransport.Create( + receiverKey, + new AppProtocolVersionOptions(), + new HostOptions( + IPAddress.Loopback.ToString(), + new IceServer[] { })); + _ = transport.StartAsync(); + await transport.WaitForRunningAsync(); + transport.ProcessMessageHandler.Register( + async msg => + { + received = msg.Content is PingMsg; + await transport.ReplyMessageAsync( + new PongMsg(), + msg.Identity!, + CancellationToken.None); + }); + var channel = new NetMQChannel(transport.AsPeer); + channel.Open(); + var key = new PrivateKey(); + await foreach ( + var reply in channel.SendMessageAsync( + new NetMQMessageCodec().Encode( + new PingMsg(), + key, + default, + new BoundPeer( + key.PublicKey, + new DnsEndPoint(IPAddress.Loopback.ToString(), 0)), + DateTimeOffset.UtcNow), + null, + 1, + CancellationToken.None)) + { + if (new NetMQMessageCodec().Decode(reply, true).Content is PongMsg) + { + received = true; + } + } + + Assert.True(received); + } + } +}