From e10b8f3f1dfd6a039bbdd271ed3d3c6be5971b02 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Wed, 28 Apr 2021 15:48:20 +0900 Subject: [PATCH 01/17] Update .gitignore --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 31cb8c56f4e..6ae2611457b 100644 --- a/.gitignore +++ b/.gitignore @@ -15,11 +15,12 @@ **/.idea/**/dynamic.xml # Rider -# Rider auto-generates .iml files, contentModel.xml, and projectSettingsUpdater.xml +# Rider auto-generates .iml files, contentModel.xml, projectSettingsUpdater.xml, and indexLayout.xml **/.idea/**/*.iml **/.idea/**/contentModel.xml **/.idea/**/modules.xml **/.idea/**/projectSettingsUpdater.xml +**/.idea/**/indexLayout.xml *.suo *.user From 8d60517ec8b4b720bf8d68e8644d015ff44ba7c9 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Wed, 21 Apr 2021 16:24:50 +0900 Subject: [PATCH 02/17] Remove ITransport.RunAsync --- Libplanet.Tests/Net/Transports/TransportTest.cs | 12 +++--------- Libplanet/Net/Swarm.cs | 10 +++------- Libplanet/Net/Transports/ITransport.cs | 12 +----------- Libplanet/Net/Transports/NetMQTransport.cs | 15 --------------- 4 files changed, 7 insertions(+), 42 deletions(-) diff --git a/Libplanet.Tests/Net/Transports/TransportTest.cs b/Libplanet.Tests/Net/Transports/TransportTest.cs index e8047c1fd1d..4af7b3027b5 100644 --- a/Libplanet.Tests/Net/Transports/TransportTest.cs +++ b/Libplanet.Tests/Net/Transports/TransportTest.cs @@ -28,18 +28,13 @@ public abstract class TransportTest TransportConstructor { get; set; } [SkippableFact(Timeout = Timeout)] - public async Task RunAsync() + public void StartAsync() { ITransport transport = CreateTransport(); try { - // RunAsync() throws NRE if it is not started yet. - await Assert.ThrowsAsync( - async () => await transport.RunAsync()); - await transport.StartAsync(); - Assert.False(transport.Running); - _ = transport.RunAsync(); + _ = transport.StartAsync(); Assert.True(transport.Running); } finally @@ -235,8 +230,7 @@ protected async Task InitializeAsync( ITransport transport, CancellationToken cts = default) { - await transport.StartAsync(cts); - Task task = transport.RunAsync(cts); + Task task = transport.StartAsync(cts); while (!transport.Running) { diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index ac3d6180af6..0440fc28e3f 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -334,13 +334,9 @@ public async Task StartAsync( ).Token; BlockDemand = null; _demandTxIds = new ConcurrentDictionary(); - try - { - await Transport.StartAsync(_cancellationToken); - } - catch (TransportException te) + if (Transport.Running) { - throw new SwarmException("Swarm is already running.", innerException: te); + throw new SwarmException("Swarm is already running."); } _logger.Debug("Starting swarm..."); @@ -354,7 +350,7 @@ public async Task StartAsync( Options.RefreshLifespan, _cancellationToken)); tasks.Add(RebuildConnectionAsync(TimeSpan.FromMinutes(30), _cancellationToken)); - tasks.Add(Transport.RunAsync(_cancellationToken)); + tasks.Add(Transport.StartAsync(_cancellationToken)); tasks.Add(BroadcastTxAsync(broadcastTxInterval, _cancellationToken)); tasks.Add(ProcessFillBlocks(dialTimeout, _cancellationToken)); tasks.Add(ProcessFillTxs(_cancellationToken)); diff --git a/Libplanet/Net/Transports/ITransport.cs b/Libplanet/Net/Transports/ITransport.cs index f16241032da..ad48429fd3b 100644 --- a/Libplanet/Net/Transports/ITransport.cs +++ b/Libplanet/Net/Transports/ITransport.cs @@ -40,7 +40,7 @@ public interface ITransport : IDisposable bool Running { get; } /// - /// Initiates transport layer. + /// Initiates and runs transport layer. /// /// /// A cancellation token used to propagate notification that this @@ -48,16 +48,6 @@ public interface ITransport : IDisposable /// An awaitable task without value. Task StartAsync(CancellationToken cancellationToken = default); - /// - /// Starts running transport layer. To , you should call - /// first. - /// - /// - /// A cancellation token used to propagate notification that this - /// operation should be canceled. - /// An awaitable task without value. - Task RunAsync(CancellationToken cancellationToken = default); - /// /// Stops running transport layer. /// diff --git a/Libplanet/Net/Transports/NetMQTransport.cs b/Libplanet/Net/Transports/NetMQTransport.cs index 462a7789a8a..6fa104383b7 100644 --- a/Libplanet/Net/Transports/NetMQTransport.cs +++ b/Libplanet/Net/Transports/NetMQTransport.cs @@ -279,21 +279,6 @@ public async Task StartAsync(CancellationToken cancellationToken) _router.ReceiveReady += ReceiveMessage; _replyQueue.ReceiveReady += DoReply; _broadcastQueue.ReceiveReady += DoBroadcast; - } - - /// - public async Task RunAsync(CancellationToken cancellationToken) - { - if (Running) - { - throw new TransportException("Transport is already running."); - } - - if (_router is null) - { - throw new TransportException( - $"Transport needs to be started before {nameof(RunAsync)}()."); - } List tasks = new List(); From 0269727159a489b43a35110966993cd82f1edb8e Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 29 Apr 2021 17:23:46 +0900 Subject: [PATCH 03/17] Add serialization for message --- Libplanet.Tests/Net/Messages/MessageTest.cs | 28 +++++++++++ Libplanet/Net/Messages/Message.cs | 51 +++++++++++++++++++++ 2 files changed, 79 insertions(+) diff --git a/Libplanet.Tests/Net/Messages/MessageTest.cs b/Libplanet.Tests/Net/Messages/MessageTest.cs index e9315c6bde0..f0a662de2a6 100644 --- a/Libplanet.Tests/Net/Messages/MessageTest.cs +++ b/Libplanet.Tests/Net/Messages/MessageTest.cs @@ -193,5 +193,33 @@ public void UseInvalidSignature() TimeSpan.FromSeconds(1)); }); } + + [Fact] + public void Serialize() + { + var privateKey = new PrivateKey(); + var peer = new Peer(privateKey.PublicKey); + var dateTimeOffset = DateTimeOffset.UtcNow; + var appProtocolVersion = new AppProtocolVersion( + 1, + new Bencodex.Types.Integer(0), + ImmutableArray.Empty, + default(Address)); + var message = new Ping(); + var id = Guid.NewGuid(); + message.Identity = id.ToByteArray(); + byte[] serialized = + message.Serialize(privateKey, peer, dateTimeOffset, appProtocolVersion); + Message deserialized = Message.Deserialize( + serialized, + appProtocolVersion, + null, + null, + null); + Assert.Equal(peer, deserialized.Remote); + Assert.Equal(appProtocolVersion, deserialized.Version); + Assert.Equal(dateTimeOffset, deserialized.Timestamp); + Assert.Equal(id.ToByteArray(), deserialized.Identity); + } } } diff --git a/Libplanet/Net/Messages/Message.cs b/Libplanet/Net/Messages/Message.cs index 399d0f62fd8..d7774940901 100644 --- a/Libplanet/Net/Messages/Message.cs +++ b/Libplanet/Net/Messages/Message.cs @@ -326,6 +326,39 @@ public static Message Parse( return message; } + // TODO: reply is not required anymore. + public static Message Deserialize( + byte[] bytes, + AppProtocolVersion localVersion, + IImmutableSet trustedAppProtocolVersionSigners, + DifferentAppProtocolVersionEncountered differentAppProtocolVersionEncountered, + TimeSpan? lifetime) + { + using var stream = new MemoryStream(bytes); + var buffer = new byte[sizeof(int)]; + stream.Read(buffer, 0, sizeof(int)); + int frameCount = BitConverter.ToInt32(buffer, 0); + var frames = new List(); + for (var i = 0; i < frameCount; i++) + { + buffer = new byte[sizeof(int)]; + stream.Read(buffer, 0, sizeof(int)); + int frameSize = BitConverter.ToInt32(buffer, 0); + buffer = new byte[frameSize]; + stream.Read(buffer, 0, frameSize); + frames.Add(new NetMQFrame(buffer)); + } + + NetMQMessage message = new NetMQMessage(frames); + return Parse( + message, + false, + localVersion, + trustedAppProtocolVersionSigners, + differentAppProtocolVersionEncountered, + lifetime); + } + /// /// Casts the message to with given , /// and . @@ -381,6 +414,24 @@ public NetMQMessage ToNetMQMessage( return message; } + public byte[] Serialize( + PrivateKey key, + Peer peer, + DateTimeOffset timestamp, + AppProtocolVersion version) + { + NetMQMessage netMqMessage = ToNetMQMessage(key, peer, timestamp, version); + using var stream = new MemoryStream(); + stream.Write(BitConverter.GetBytes(netMqMessage.FrameCount), 0, sizeof(int)); + foreach (var frame in netMqMessage) + { + stream.Write(BitConverter.GetBytes(frame.BufferSize), 0, sizeof(int)); + stream.Write(frame.ToByteArray(), 0, frame.BufferSize); + } + + return stream.ToArray(); + } + protected static Peer DeserializePeer(byte[] bytes) { var formatter = new BinaryFormatter(); From d174b016c0d403b46a4fab5c99cbdab17486c225 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 13 May 2021 16:48:37 +0900 Subject: [PATCH 04/17] Prevent operation on disposed NetMQTransport instance --- Libplanet.Tests/Net/SwarmTest.Fixtures.cs | 11 ++- .../Net/Transports/TransportTest.cs | 70 +++++++++++++++++++ Libplanet/Net/Transports/NetMQTransport.cs | 25 +++++++ 3 files changed, 104 insertions(+), 2 deletions(-) diff --git a/Libplanet.Tests/Net/SwarmTest.Fixtures.cs b/Libplanet.Tests/Net/SwarmTest.Fixtures.cs index 1d401c90afe..a85ac5def81 100644 --- a/Libplanet.Tests/Net/SwarmTest.Fixtures.cs +++ b/Libplanet.Tests/Net/SwarmTest.Fixtures.cs @@ -135,8 +135,15 @@ private Swarm CreateSwarm( options); _finalizers.Add(async () => { - await StopAsync(swarm); - swarm.Dispose(); + try + { + await StopAsync(swarm); + swarm.Dispose(); + } + catch (ObjectDisposedException) + { + _logger.Debug("Swarm {Swarm} is already disposed.", swarm); + } }); return swarm; } diff --git a/Libplanet.Tests/Net/Transports/TransportTest.cs b/Libplanet.Tests/Net/Transports/TransportTest.cs index 4af7b3027b5..c9c0ff23e63 100644 --- a/Libplanet.Tests/Net/Transports/TransportTest.cs +++ b/Libplanet.Tests/Net/Transports/TransportTest.cs @@ -43,6 +43,76 @@ public void StartAsync() } } + [SkippableFact(Timeout = Timeout)] + public async void RestartAsync() + { + ITransport transport = CreateTransport(); + + try + { + _ = transport.StartAsync(); + await transport.WaitForRunningAsync(); + Assert.True(transport.Running); + await transport.StopAsync(TimeSpan.Zero); + Assert.False(transport.Running); + _ = transport.StartAsync(); + await transport.WaitForRunningAsync(); + Assert.True(transport.Running); + } + finally + { + transport.Dispose(); + } + } + + [SkippableFact(Timeout = Timeout)] + public async void Dispose() + { + ITransport transport = CreateTransport(); + + try + { + _ = transport.StartAsync(); + await transport.WaitForRunningAsync(); + Assert.True(transport.Running); + transport.Dispose(); + var boundPeer = new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint("localhost", 1234)); + var message = new Ping(); + await Assert.ThrowsAsync( + async () => await transport.StartAsync()); + await Assert.ThrowsAsync( + async () => await transport.StopAsync(TimeSpan.Zero)); + await Assert.ThrowsAsync( + async () => await transport.SendMessageAsync(boundPeer, message, default)); + await Assert.ThrowsAsync( + async () => await transport.SendMessageWithReplyAsync( + boundPeer, + message, + null, + default)); + await Assert.ThrowsAsync( + async () => await transport.SendMessageWithReplyAsync( + boundPeer, + message, + null, + 3, + default)); + Assert.Throws( + () => transport.BroadcastMessage(null, message)); + Assert.Throws( + () => transport.ReplyMessage(message)); + + // To check multiple Dispose() throws error or not. + transport.Dispose(); + } + finally + { + transport.Dispose(); + } + } + [SkippableFact(Timeout = Timeout)] public async Task AsPeer() { diff --git a/Libplanet/Net/Transports/NetMQTransport.cs b/Libplanet/Net/Transports/NetMQTransport.cs index 6fa104383b7..97b09f7463d 100644 --- a/Libplanet/Net/Transports/NetMQTransport.cs +++ b/Libplanet/Net/Transports/NetMQTransport.cs @@ -235,6 +235,11 @@ private set /// public async Task StartAsync(CancellationToken cancellationToken) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(NetMQTransport)); + } + if (Running) { throw new TransportException("Transport is already running."); @@ -299,6 +304,11 @@ public async Task StopAsync( CancellationToken cancellationToken = default ) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(NetMQTransport)); + } + if (Running) { await Task.Delay(waitFor, cancellationToken); @@ -393,6 +403,11 @@ public async Task> SendMessageWithReplyAsync( CancellationToken cancellationToken = default ) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(NetMQTransport)); + } + if (!(_turnClient is null) && _turnClient.BehindNAT) { await CreatePermission(peer); @@ -501,12 +516,22 @@ await _requests.Writer.WriteAsync( /// public void BroadcastMessage(Address? except, Message message) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(NetMQTransport)); + } + _broadcastQueue.Enqueue((except, message)); } /// public void ReplyMessage(Message message) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(NetMQTransport)); + } + string identityHex = ByteUtil.Hex(message.Identity); _logger.Debug("Reply {Message} to {Identity}...", message, identityHex); _replyQueue.Enqueue(message.ToNetMQMessage( From 0a94b48c159fb9a52bc45d21bcf03db597aaadc9 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 6 May 2021 14:21:32 +0900 Subject: [PATCH 05/17] Introduce ITransport.WaitForRunningAsync and ITransport.MessageHistory --- .../Net/Protocols/TestTransport.cs | 10 +++++++++ Libplanet.Tests/Net/SwarmTest.Broadcast.cs | 2 +- Libplanet.Tests/Net/SwarmTest.cs | 2 +- .../Net/Transports/TransportTest.cs | 21 ++++++------------- Libplanet/Net/Swarm.cs | 8 +++---- Libplanet/Net/Transports/ITransport.cs | 15 +++++++++++++ Libplanet/Net/Transports/NetMQTransport.cs | 10 +++------ 7 files changed, 40 insertions(+), 28 deletions(-) diff --git a/Libplanet.Tests/Net/Protocols/TestTransport.cs b/Libplanet.Tests/Net/Protocols/TestTransport.cs index 116d54a411f..5d62654ab0c 100644 --- a/Libplanet.Tests/Net/Protocols/TestTransport.cs +++ b/Libplanet.Tests/Net/Protocols/TestTransport.cs @@ -61,6 +61,7 @@ public TestTransport( _random = new Random(); Table = new RoutingTable(Address, tableSize, bucketSize); Protocol = new KademliaProtocol(Table, this, Address); + MessageHistory = new FixedSizedQueue(30); } public event EventHandler ProcessMessageHandler; @@ -79,6 +80,8 @@ public TestTransport( public bool Running => !(_swarmCancellationTokenSource is null); + public ConcurrentQueue MessageHistory { get; } + internal ConcurrentBag ReceivedMessages { get; } internal RoutingTable Table { get; } @@ -119,6 +122,11 @@ public async Task StopAsync( await Task.Delay(waitFor, cancellationToken); } + public Task WaitForRunningAsync() + { + return Task.CompletedTask; + } + public async Task BootstrapAsync( IEnumerable bootstrapPeers, TimeSpan? pingSeedTimeout = null, @@ -353,6 +361,7 @@ await _requests.AddAsync( message.Identity); LastMessageTimestamp = DateTimeOffset.UtcNow; ReceivedMessages.Add(reply); + MessageHistory.Enqueue(reply); MessageReceived.Set(); return reply; } @@ -433,6 +442,7 @@ private void ReceiveMessage(Message message) throw new ArgumentException("Sender of message is not a BoundPeer."); } + MessageHistory.Enqueue(message); if (message is TestMessage testMessage) { if (_ignoreTestMessageWithData.Contains(testMessage.Data)) diff --git a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs index 6d49d560202..5a6f519bb6c 100644 --- a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs +++ b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs @@ -113,7 +113,7 @@ public async Task BroadcastIgnoreFromDifferentGenesisHash() await receiverSwarm.AddPeersAsync(new[] { seedSwarm.AsPeer }, null); Block block = await seedChain.MineBlock(seedSwarm.Address); seedSwarm.BroadcastBlock(block); - while (!((NetMQTransport)receiverSwarm.Transport).MessageHistory + while (receiverSwarm.Transport.MessageHistory .Any(msg => msg is BlockHeaderMessage)) { await Task.Delay(100); diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index 3a58058718a..0ed2cc9dde0 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -1522,7 +1522,7 @@ public async Task DoNotFillMultipleTimes() // Awaits 1 second because receiver swarm may tried to fill again after filled. await Task.Delay(1000); - var transport = receiver.Transport as NetMQTransport; + var transport = receiver.Transport; Log.Debug("Messages: {@Message}", transport.MessageHistory); Assert.Single( transport.MessageHistory.Where(msg => msg is Libplanet.Net.Messages.Blocks)); diff --git a/Libplanet.Tests/Net/Transports/TransportTest.cs b/Libplanet.Tests/Net/Transports/TransportTest.cs index c9c0ff23e63..787dea516bb 100644 --- a/Libplanet.Tests/Net/Transports/TransportTest.cs +++ b/Libplanet.Tests/Net/Transports/TransportTest.cs @@ -50,13 +50,11 @@ public async void RestartAsync() try { - _ = transport.StartAsync(); - await transport.WaitForRunningAsync(); + await InitializeAsync(transport); Assert.True(transport.Running); await transport.StopAsync(TimeSpan.Zero); Assert.False(transport.Running); - _ = transport.StartAsync(); - await transport.WaitForRunningAsync(); + await InitializeAsync(transport); Assert.True(transport.Running); } finally @@ -72,8 +70,7 @@ public async void Dispose() try { - _ = transport.StartAsync(); - await transport.WaitForRunningAsync(); + await InitializeAsync(transport); Assert.True(transport.Running); transport.Dispose(); var boundPeer = new BoundPeer( @@ -296,18 +293,12 @@ EventHandler MessageHandler(TaskCompletionSource tcs) } } - protected async Task InitializeAsync( + protected async Task InitializeAsync( ITransport transport, CancellationToken cts = default) { - Task task = transport.StartAsync(cts); - - while (!transport.Running) - { - await Task.Delay(100, cts); - } - - return task; + _ = transport.StartAsync(cts); + await transport.WaitForRunningAsync(); } private ITransport CreateTransport( diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index 0440fc28e3f..d975899a605 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -163,7 +163,7 @@ internal Swarm( } } - public bool Running => Transport is NetMQTransport p && p.Running; + public bool Running => Transport.Running; public DnsEndPoint EndPoint => AsPeer is BoundPeer boundPeer ? boundPeer.EndPoint : null; @@ -221,10 +221,10 @@ internal Swarm( /// /// Waits until this instance gets started to run. /// - /// - /// A completed when + /// + /// A completed when /// property becomes true. - public Task WaitForRunningAsync() => (Transport as NetMQTransport)?.WaitForRunningAsync(); + public Task WaitForRunningAsync() => Transport?.WaitForRunningAsync(); public void Dispose() { diff --git a/Libplanet/Net/Transports/ITransport.cs b/Libplanet/Net/Transports/ITransport.cs index ad48429fd3b..d6bf27d3c4e 100644 --- a/Libplanet/Net/Transports/ITransport.cs +++ b/Libplanet/Net/Transports/ITransport.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; @@ -39,6 +40,13 @@ public interface ITransport : IDisposable [Pure] bool Running { get; } + /// + /// A fixed sized queue that keeps history of received. + /// It saves at most 30 recent s. + /// + [Pure] + ConcurrentQueue MessageHistory { get; } + /// /// Initiates and runs transport layer. /// @@ -61,6 +69,13 @@ Task StopAsync( TimeSpan waitFor, CancellationToken cancellationToken = default); + /// + /// Waits until this instance gets started to run. + /// + /// A completed when + /// property becomes true. + Task WaitForRunningAsync(); + /// /// Sends the to given . /// diff --git a/Libplanet/Net/Transports/NetMQTransport.cs b/Libplanet/Net/Transports/NetMQTransport.cs index 97b09f7463d..6c0cb31b28e 100644 --- a/Libplanet/Net/Transports/NetMQTransport.cs +++ b/Libplanet/Net/Transports/NetMQTransport.cs @@ -226,7 +226,8 @@ private set } } - internal FixedSizedQueue MessageHistory { get; } + /// + public ConcurrentQueue MessageHistory { get; } internal IPAddress PublicIPAddress => _turnClient?.PublicAddress; @@ -360,12 +361,7 @@ public void Dispose() } } - /// - /// Waits until this instance gets started to run. - /// - /// - /// A completed when - /// property becomes true. + /// public Task WaitForRunningAsync() => _runningEvent.Task; /// From 8c917b46e044e9fa8445c2c0acfe8d83b9af0a76 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Tue, 18 May 2021 17:38:43 +0900 Subject: [PATCH 06/17] Make ReplyMessage await reply end --- .../Net/Protocols/TestTransport.cs | 23 +++++++++---------- .../Net/Transports/TransportTest.cs | 16 +++++++------ Libplanet/Net/Protocols/KademliaProtocol.cs | 4 ++-- Libplanet/Net/Swarm.MessageHandlers.cs | 10 ++++---- Libplanet/Net/Transports/ITransport.cs | 6 ++++- Libplanet/Net/Transports/NetMQTransport.cs | 17 ++++++++++++-- 6 files changed, 47 insertions(+), 29 deletions(-) diff --git a/Libplanet.Tests/Net/Protocols/TestTransport.cs b/Libplanet.Tests/Net/Protocols/TestTransport.cs index 5d62654ab0c..3c097ec2676 100644 --- a/Libplanet.Tests/Net/Protocols/TestTransport.cs +++ b/Libplanet.Tests/Net/Protocols/TestTransport.cs @@ -388,7 +388,7 @@ await SendMessageWithReplyAsync(peer, message, timeout, cancellationToken), }; } - public void ReplyMessage(Message message) + public async Task ReplyMessageAsync(Message message, CancellationToken cancellationToken) { if (!Running) { @@ -397,12 +397,9 @@ public void ReplyMessage(Message message) _logger.Debug("Replying {Message}...", message); message.Remote = AsPeer; - Task.Run(async () => - { - await Task.Delay(_networkDelay); - _transports[_peersToReply[message.Identity]].ReceiveReply(message); - _peersToReply.TryRemove(message.Identity, out Address addr); - }); + await Task.Delay(_networkDelay, cancellationToken); + _transports[_peersToReply[message.Identity]].ReceiveReply(message); + _peersToReply.TryRemove(message.Identity, out Address addr); } public async Task WaitForTestMessageWithData( @@ -467,11 +464,13 @@ private void ReceiveMessage(Message message) if (message is Ping) { - ReplyMessage(new Pong - { - Identity = message.Identity, - Remote = AsPeer, - }); + _ = ReplyMessageAsync( + new Pong + { + Identity = message.Identity, + Remote = AsPeer, + }, + default); } LastMessageTimestamp = DateTimeOffset.UtcNow; diff --git a/Libplanet.Tests/Net/Transports/TransportTest.cs b/Libplanet.Tests/Net/Transports/TransportTest.cs index 787dea516bb..d5d17dcd182 100644 --- a/Libplanet.Tests/Net/Transports/TransportTest.cs +++ b/Libplanet.Tests/Net/Transports/TransportTest.cs @@ -98,8 +98,8 @@ await Assert.ThrowsAsync( default)); Assert.Throws( () => transport.BroadcastMessage(null, message)); - Assert.Throws( - () => transport.ReplyMessage(message)); + await Assert.ThrowsAsync( + async () => await transport.ReplyMessageAsync(message, default)); // To check multiple Dispose() throws error or not. transport.Dispose(); @@ -187,7 +187,7 @@ await transportA.SendMessageAsync( } } - // This also tests ITransport.ReplyMessage at the same time. + // This also tests ITransport.ReplyMessageAsync at the same time. [SkippableFact(Timeout = Timeout)] public async Task SendMessageWithReplyAsync() { @@ -198,10 +198,12 @@ public async Task SendMessageWithReplyAsync() { if (message is Ping) { - transportB.ReplyMessage(new Pong - { - Identity = message.Identity, - }); + _ = transportB.ReplyMessageAsync( + new Pong + { + Identity = message.Identity, + }, + CancellationToken.None); } }; diff --git a/Libplanet/Net/Protocols/KademliaProtocol.cs b/Libplanet/Net/Protocols/KademliaProtocol.cs index 9f9ccbff85f..ad06183136a 100644 --- a/Libplanet/Net/Protocols/KademliaProtocol.cs +++ b/Libplanet/Net/Protocols/KademliaProtocol.cs @@ -703,7 +703,7 @@ private void ReceivePing(Ping ping) Identity = ping.Identity, }; - _transport.ReplyMessage(pong); + _ = _transport.ReplyMessageAsync(pong, default); } /// @@ -852,7 +852,7 @@ private void ReceiveFindPeer(FindNeighbors findNeighbors) Identity = findNeighbors.Identity, }; - _transport.ReplyMessage(neighbors); + _ = _transport.ReplyMessageAsync(neighbors, default); } } } diff --git a/Libplanet/Net/Swarm.MessageHandlers.cs b/Libplanet/Net/Swarm.MessageHandlers.cs index 99865db2c57..7042f6d81a7 100644 --- a/Libplanet/Net/Swarm.MessageHandlers.cs +++ b/Libplanet/Net/Swarm.MessageHandlers.cs @@ -38,7 +38,7 @@ private void ProcessMessageHandler(object target, Message message) Identity = getChainStatus.Identity, }; - Transport.ReplyMessage(chainStatus); + _ = Transport.ReplyMessageAsync(chainStatus, default); break; } @@ -65,7 +65,7 @@ out IReadOnlyList hashes { Identity = getBlockHashes.Identity, }; - Transport.ReplyMessage(reply); + _ = Transport.ReplyMessageAsync(reply, default); break; } @@ -188,7 +188,7 @@ private void TransferTxs(GetTxs getTxs) { Identity = getTxs.Identity, }; - Transport.ReplyMessage(response); + _ = Transport.ReplyMessageAsync(response, default); } } @@ -268,7 +268,7 @@ private void TransferBlocks(GetBlocks getData) i, total ); - Transport.ReplyMessage(response); + _ = Transport.ReplyMessageAsync(response, default); blocks.Clear(); } @@ -287,7 +287,7 @@ private void TransferBlocks(GetBlocks getData) total, identityHex ); - Transport.ReplyMessage(response); + _ = Transport.ReplyMessageAsync(response, default); } _logger.Debug("Blocks were transferred to {Identity}.", identityHex); diff --git a/Libplanet/Net/Transports/ITransport.cs b/Libplanet/Net/Transports/ITransport.cs index d6bf27d3c4e..335a4291ce7 100644 --- a/Libplanet/Net/Transports/ITransport.cs +++ b/Libplanet/Net/Transports/ITransport.cs @@ -142,6 +142,10 @@ Task> SendMessageWithReplyAsync( /// . /// /// A to reply. - void ReplyMessage(Message message); + /// + /// A cancellation token used to propagate notification that this + /// operation should be canceled. + /// An awaitable task without value. + Task ReplyMessageAsync(Message message, CancellationToken cancellationToken); } } diff --git a/Libplanet/Net/Transports/NetMQTransport.cs b/Libplanet/Net/Transports/NetMQTransport.cs index 6c0cb31b28e..7f079db2516 100644 --- a/Libplanet/Net/Transports/NetMQTransport.cs +++ b/Libplanet/Net/Transports/NetMQTransport.cs @@ -59,6 +59,7 @@ public class NetMQTransport : ITransport private TaskCompletionSource _runningEvent; private CancellationToken _cancellationToken; private ConcurrentDictionary _dealers; + private ConcurrentDictionary> _replyCompletionSources; private RoutingTable _table; @@ -195,6 +196,8 @@ public NetMQTransport( MessageHistory = new FixedSizedQueue(MessageHistoryCapacity); _dealers = new ConcurrentDictionary(); + _replyCompletionSources = + new ConcurrentDictionary>(); } /// @@ -521,7 +524,7 @@ public void BroadcastMessage(Address? except, Message message) } /// - public void ReplyMessage(Message message) + public async Task ReplyMessageAsync(Message message, CancellationToken cancellationToken) { if (_disposed) { @@ -529,12 +532,19 @@ public void ReplyMessage(Message message) } string identityHex = ByteUtil.Hex(message.Identity); + var tcs = new TaskCompletionSource(); + using CancellationTokenRegistration ctr = + cancellationToken.Register(() => tcs.TrySetCanceled()); + _replyCompletionSources.TryAdd(identityHex, tcs); _logger.Debug("Reply {Message} to {Identity}...", message, identityHex); _replyQueue.Enqueue(message.ToNetMQMessage( _privateKey, AsPeer, DateTimeOffset.UtcNow, _appProtocolVersion)); + + await tcs.Task; + _replyCompletionSources.TryRemove(identityHex, out _); } private void ReceiveMessage(object sender, NetMQSocketEventArgs e) @@ -584,7 +594,7 @@ private void ReceiveMessage(object sender, NetMQSocketEventArgs e) { Identity = dapve.Identity, }; - ReplyMessage(differentVersion); + _ = ReplyMessageAsync(differentVersion, _cancellationToken); _logger.Debug("Message from peer with different version received."); } catch (InvalidTimestampException ite) @@ -680,6 +690,9 @@ private void DoReply(object sender, NetMQQueueEventArgs e) { _logger.Debug("Failed to reply to {Identity}", identityHex); } + + _replyCompletionSources.TryGetValue(identityHex, out TaskCompletionSource tcs); + tcs?.TrySetResult(null); } private async Task RefreshPermissions( From 3302b7d457d969dad90ca3c36f24bf7cace4b17b Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 22 Apr 2021 11:14:33 +0900 Subject: [PATCH 07/17] Introduce TCP based transport --- .../Net/Transports/TcpTransportTest.cs | 140 +++ .../Net/Transports/TransportTest.cs | 28 +- Libplanet/Net/Swarm.cs | 4 +- .../Net/Transports/BoundPeerExtensions.cs | 72 ++ Libplanet/Net/Transports/ITransport.cs | 14 + .../Transports/InvalidMagicCookieException.cs | 40 + Libplanet/Net/Transports/TcpTransport.cs | 834 ++++++++++++++++++ 7 files changed, 1129 insertions(+), 3 deletions(-) create mode 100644 Libplanet.Tests/Net/Transports/TcpTransportTest.cs create mode 100644 Libplanet/Net/Transports/InvalidMagicCookieException.cs create mode 100644 Libplanet/Net/Transports/TcpTransport.cs diff --git a/Libplanet.Tests/Net/Transports/TcpTransportTest.cs b/Libplanet.Tests/Net/Transports/TcpTransportTest.cs new file mode 100644 index 00000000000..76f8d29ad3d --- /dev/null +++ b/Libplanet.Tests/Net/Transports/TcpTransportTest.cs @@ -0,0 +1,140 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; +using Libplanet.Crypto; +using Libplanet.Net; +using Libplanet.Net.Messages; +using Libplanet.Net.Protocols; +using Libplanet.Net.Transports; +using Serilog; +using Xunit; +using Xunit.Abstractions; + +namespace Libplanet.Tests.Net.Transports +{ + public class TcpTransportTest : TransportTest + { + public TcpTransportTest(ITestOutputHelper testOutputHelper) + { + TransportConstructor = CreateTcpTransport; + + const string outputTemplate = + "{Timestamp:HH:mm:ss:ffffff}[{ThreadId}] - {Message}"; + Log.Logger = new LoggerConfiguration() + .MinimumLevel.Verbose() + .Enrich.WithThreadId() + .WriteTo.TestOutput(testOutputHelper, outputTemplate: outputTemplate) + .CreateLogger() + .ForContext(); + Logger = Log.ForContext(); + } + + [SkippableFact(Timeout = Timeout)] + public async Task ReadWriteMessageWithInvalidMagicCookieAsync() + { + byte[] invalidCookie = { 0x01, 0x02 }; + Assert.False(TcpTransport.MagicCookie.SequenceEqual(invalidCookie)); + + var listener = new TcpListener(IPAddress.Any, 0); + listener.Start(); + var client = new TcpClient(); + await client.ConnectAsync("127.0.0.1", ((IPEndPoint)listener.LocalEndpoint).Port); + TcpClient listenerSocket = await listener.AcceptTcpClientAsync(); + + AppProtocolVersion version = AppProtocolVersion.Sign(new PrivateKey(), 1); + TcpTransport transport = CreateTcpTransport(appProtocolVersion: version); + try + { + var message = new Ping + { + Identity = Guid.NewGuid().ToByteArray(), + }; + + byte[] serialized = message.Serialize( + new PrivateKey(), + transport.AsPeer, + DateTimeOffset.UtcNow, + version); + int length = serialized.Length; + var buffer = new byte[invalidCookie.Length + sizeof(int) + length]; + invalidCookie.CopyTo(buffer, 0); + BitConverter.GetBytes(length).CopyTo(buffer, invalidCookie.Length); + serialized.CopyTo(buffer, invalidCookie.Length + sizeof(int)); + NetworkStream stream = client.GetStream(); + await stream.WriteAsync(buffer, 0, buffer.Length, default); + await Assert.ThrowsAsync( + async () => await transport.ReadMessageAsync(listenerSocket, default)); + + byte[] shortBuffer = { 0x00 }; + await stream.WriteAsync(shortBuffer, 0, shortBuffer.Length, default); + await Assert.ThrowsAsync( + async () => await transport.ReadMessageAsync(listenerSocket, default)); + } + finally + { + listenerSocket.Dispose(); + client.Dispose(); + } + } + + private TcpTransport CreateTcpTransport( + PrivateKey privateKey = null, + int tableSize = 160, + int bucketSize = 16, + AppProtocolVersion appProtocolVersion = default, + IImmutableSet trustedAppProtocolVersionSigners = null, + string host = null, + int? listenPort = null, + IEnumerable iceServers = null, + DifferentAppProtocolVersionEncountered differentAppProtocolVersionEncountered = null, + TimeSpan? messageLifespan = null + ) + { + privateKey = privateKey ?? new PrivateKey(); + host = host ?? IPAddress.Loopback.ToString(); + trustedAppProtocolVersionSigners = trustedAppProtocolVersionSigners ?? + ImmutableHashSet.Empty; + iceServers = iceServers ?? new IceServer[] { }; + var table = new RoutingTable(privateKey.ToAddress(), tableSize, bucketSize); + + return CreateTcpTransport( + table, + privateKey, + appProtocolVersion, + trustedAppProtocolVersionSigners, + host, + listenPort, + iceServers, + differentAppProtocolVersionEncountered, + messageLifespan); + } + + private TcpTransport CreateTcpTransport( + RoutingTable table, + PrivateKey privateKey, + AppProtocolVersion appProtocolVersion, + IImmutableSet trustedAppProtocolVersionSigners, + string host, + int? listenPort, + IEnumerable iceServers, + DifferentAppProtocolVersionEncountered differentAppProtocolVersionEncountered, + TimeSpan? messageLifespan + ) + { + return new TcpTransport( + table, + privateKey, + appProtocolVersion, + trustedAppProtocolVersionSigners, + host, + listenPort, + iceServers, + differentAppProtocolVersionEncountered, + messageLifespan); + } + } +} diff --git a/Libplanet.Tests/Net/Transports/TransportTest.cs b/Libplanet.Tests/Net/Transports/TransportTest.cs index d5d17dcd182..869659631d8 100644 --- a/Libplanet.Tests/Net/Transports/TransportTest.cs +++ b/Libplanet.Tests/Net/Transports/TransportTest.cs @@ -215,7 +215,7 @@ public async Task SendMessageWithReplyAsync() Message reply = await transportA.SendMessageWithReplyAsync( (BoundPeer)transportB.AsPeer, new Ping(), - null, + TimeSpan.FromSeconds(3), CancellationToken.None); Assert.IsType(reply); @@ -227,6 +227,32 @@ public async Task SendMessageWithReplyAsync() } } + // This also tests ITransport.ReplyMessage at the same time. + [SkippableFact(Timeout = Timeout)] + public async Task SendMessageWithReplyAsyncTimeout() + { + ITransport transportA = CreateTransport(); + ITransport transportB = CreateTransport(); + + try + { + await InitializeAsync(transportA); + await InitializeAsync(transportB); + + await Assert.ThrowsAsync( + async () => await transportA.SendMessageWithReplyAsync( + (BoundPeer)transportB.AsPeer, + new Ping(), + TimeSpan.FromSeconds(3), + CancellationToken.None)); + } + finally + { + transportA.Dispose(); + transportB.Dispose(); + } + } + [SkippableFact(Timeout = Timeout)] public async Task BroadcastMessage() { diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index d975899a605..b1ff76883ea 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -163,13 +163,13 @@ internal Swarm( } } - public bool Running => Transport.Running; + public bool Running => Transport?.Running ?? false; public DnsEndPoint EndPoint => AsPeer is BoundPeer boundPeer ? boundPeer.EndPoint : null; public Address Address => _privateKey.ToAddress(); - public Peer AsPeer => Transport.AsPeer; + public Peer AsPeer => Transport?.AsPeer; /// /// The last time when any message was arrived. diff --git a/Libplanet/Net/Transports/BoundPeerExtensions.cs b/Libplanet/Net/Transports/BoundPeerExtensions.cs index 0ddb50bf85f..3f020f914f5 100644 --- a/Libplanet/Net/Transports/BoundPeerExtensions.cs +++ b/Libplanet/Net/Transports/BoundPeerExtensions.cs @@ -1,4 +1,7 @@ using System; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; using Libplanet.Crypto; using Libplanet.Net.Messages; using NetMQ; @@ -49,6 +52,75 @@ public static AppProtocolVersion QueryAppProtocolVersion( ); } + /// + /// Queries of given + /// specialized for TCP based transport. + /// + /// The to query + /// . + /// Timeout value for request. + /// + /// A cancellation token used to propagate notification that this + /// operation should be canceled. + /// of given peer. + public static async Task QueryAppProtocolVersionTcp( + this BoundPeer peer, + TimeSpan? timeout = null, + CancellationToken cancellationToken = default + ) + { + using var client = new TcpClient(); + await client.ConnectAsync(peer.EndPoint.Host, peer.EndPoint.Port); + client.ReceiveTimeout = timeout?.Milliseconds ?? 0; + using var stream = client.GetStream(); + var key = new PrivateKey(); + var ping = new Ping + { + Identity = Guid.NewGuid().ToByteArray(), + }; + + byte[] serialized = ping.Serialize( + key, + peer, + DateTimeOffset.UtcNow, + default); + int length = serialized.Length; + var buffer = new byte[TcpTransport.MagicCookie.Length + sizeof(int) + length]; + TcpTransport.MagicCookie.CopyTo(buffer, 0); + BitConverter.GetBytes(length).CopyTo(buffer, TcpTransport.MagicCookie.Length); + serialized.CopyTo(buffer, TcpTransport.MagicCookie.Length + sizeof(int)); + await stream.WriteAsync(buffer, 0, buffer.Length, cancellationToken); + + buffer = new byte[1000000]; + int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken); + var magicCookieBuffer = new byte[TcpTransport.MagicCookie.Length]; + Array.Copy(buffer, 0, magicCookieBuffer, 0, TcpTransport.MagicCookie.Length); + var sizeBuffer = new byte[sizeof(int)]; + Array.Copy(buffer, TcpTransport.MagicCookie.Length, sizeBuffer, 0, sizeof(int)); + length = BitConverter.ToInt32(sizeBuffer, 0); + var contentBuffer = new byte[length]; + Array.Copy( + buffer, + TcpTransport.MagicCookie.Length + sizeof(int), + contentBuffer, + 0, + length); + + // length of identity + Array.Copy(contentBuffer, 4, sizeBuffer, 0, 4); + length = BitConverter.ToInt32(sizeBuffer, 0); + + // length of apv token + Array.Copy(contentBuffer, 4 + 4 + length, sizeBuffer, 0, 4); + int apvLength = BitConverter.ToInt32(sizeBuffer, 0); + var apvBytes = new byte[apvLength]; + Array.Copy(contentBuffer, 4 + 4 + length + 4, apvBytes, 0, apvLength); + var frame = new NetMQFrame(apvBytes); + string token = frame.ConvertToString(); + AppProtocolVersion version = AppProtocolVersion.FromToken(token); + return version; + } + internal static string ToNetMQAddress(this BoundPeer peer) { return $"tcp://{peer.EndPoint.Host}:{peer.EndPoint.Port}"; diff --git a/Libplanet/Net/Transports/ITransport.cs b/Libplanet/Net/Transports/ITransport.cs index 335a4291ce7..4251e6a1ca9 100644 --- a/Libplanet/Net/Transports/ITransport.cs +++ b/Libplanet/Net/Transports/ITransport.cs @@ -54,6 +54,8 @@ public interface ITransport : IDisposable /// A cancellation token used to propagate notification that this /// operation should be canceled. /// An awaitable task without value. + /// + /// Thrown when instance is already disposed. Task StartAsync(CancellationToken cancellationToken = default); /// @@ -65,6 +67,8 @@ public interface ITransport : IDisposable /// A cancellation token used to propagate notification that this /// operation should be canceled. /// An awaitable task without value. + /// + /// Thrown when instance is already disposed. Task StopAsync( TimeSpan waitFor, CancellationToken cancellationToken = default); @@ -85,6 +89,8 @@ Task StopAsync( /// A cancellation token used to propagate notification that this /// operation should be canceled. /// An awaitable task without value. + /// + /// Thrown when instance is already disposed. Task SendMessageAsync(BoundPeer peer, Message message, CancellationToken cancellationToken); /// @@ -99,6 +105,8 @@ Task StopAsync( /// operation should be canceled. /// The replies of the /// sent by . + /// + /// Thrown when instance is already disposed. Task SendMessageWithReplyAsync( BoundPeer peer, Message message, @@ -118,6 +126,8 @@ Task SendMessageWithReplyAsync( /// operation should be canceled. /// The replies of the /// sent by . + /// + /// Thrown when instance is already disposed. Task> SendMessageWithReplyAsync( BoundPeer peer, Message message, @@ -131,6 +141,8 @@ Task> SendMessageWithReplyAsync( /// An to exclude from broadcasting. /// If null is given, no peers will be excluded. /// A to broadcast. + /// + /// Thrown when instance is already disposed. void BroadcastMessage(Address? except, Message message); /// @@ -146,6 +158,8 @@ Task> SendMessageWithReplyAsync( /// A cancellation token used to propagate notification that this /// operation should be canceled. /// An awaitable task without value. + /// + /// Thrown when instance is already disposed. Task ReplyMessageAsync(Message message, CancellationToken cancellationToken); } } diff --git a/Libplanet/Net/Transports/InvalidMagicCookieException.cs b/Libplanet/Net/Transports/InvalidMagicCookieException.cs new file mode 100644 index 00000000000..5e80e325725 --- /dev/null +++ b/Libplanet/Net/Transports/InvalidMagicCookieException.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; + +namespace Libplanet.Net.Transports +{ + public class InvalidMagicCookieException : Exception + { + public InvalidMagicCookieException(IEnumerable expected, IEnumerable actual) + { + Expected = expected.ToImmutableArray(); + Actual = actual.ToImmutableArray(); + } + + public InvalidMagicCookieException( + IEnumerable expected, + IEnumerable actual, + string message) + : base(message) + { + Expected = expected.ToImmutableArray(); + Actual = actual.ToImmutableArray(); + } + + public InvalidMagicCookieException( + IEnumerable expected, + IEnumerable actual, + string message, + Exception innerException) + : base(message, innerException) + { + Expected = expected.ToImmutableArray(); + Actual = actual.ToImmutableArray(); + } + + public ImmutableArray Expected { get; private set; } + + public ImmutableArray Actual { get; private set; } + } +} diff --git a/Libplanet/Net/Transports/TcpTransport.cs b/Libplanet/Net/Transports/TcpTransport.cs new file mode 100644 index 00000000000..67e34008886 --- /dev/null +++ b/Libplanet/Net/Transports/TcpTransport.cs @@ -0,0 +1,834 @@ +#nullable enable +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Libplanet.Crypto; +using Libplanet.Net.Messages; +using Libplanet.Net.Protocols; +using Libplanet.Stun; +using Nito.AsyncEx; +using Serilog; + +namespace Libplanet.Net.Transports +{ + public class TcpTransport : ITransport + { + public static readonly byte[] MagicCookie = { 0x4c, 0x50 }; // 'L', 'P' + + private const int MessageHistoryCapacity = 30; + private const int ListenerBacklog = 100; + private const int MaxReplyStreams = 3000; + + // TODO: Should be configurable (ex. SwarmOptions.MaxTimeout) + private static readonly TimeSpan ListenerLifetime = TimeSpan.FromMinutes(2); + + // TURN Permission lifetime was defined in RFC 5766 + // see also https://tools.ietf.org/html/rfc5766#section-8 + private static readonly TimeSpan TurnPermissionLifetime = + TimeSpan.FromMinutes(5); + + private readonly RoutingTable _table; + private readonly PrivateKey _privateKey; + private readonly AppProtocolVersion _appProtocolVersion; + private readonly IImmutableSet? _trustedAppProtocolVersionSigners; + private readonly string? _host; + private readonly DifferentAppProtocolVersionEncountered? + _differentAppProtocolVersionEncountered; + + private readonly IList? _iceServers; + private readonly ILogger _logger; + private readonly TimeSpan? _messageLifespan; + + private readonly CancellationTokenSource _turnCancellationTokenSource; + + private readonly ConcurrentDictionary _streams; + + private TaskCompletionSource _runningEvent; + private CancellationTokenSource _runtimeCancellationTokenSource; + private CancellationToken _cancellationToken; + + private int _listenPort; + private DnsEndPoint? _hostEndPoint; + private TcpListener? _listener; + private TurnClient? _turnClient; + + private bool _disposed; + + public TcpTransport( + RoutingTable table, + PrivateKey privateKey, + AppProtocolVersion appProtocolVersion, + IImmutableSet? trustedAppProtocolVersionSigners, + string? host, + int? listenPort, + IEnumerable iceServers, + DifferentAppProtocolVersionEncountered? differentAppProtocolVersionEncountered, + TimeSpan? messageLifespan = null) + { + _runningEvent = null!; + Running = false; + + _table = table; + _privateKey = privateKey; + _appProtocolVersion = appProtocolVersion; + _trustedAppProtocolVersionSigners = trustedAppProtocolVersionSigners; + _host = host; + _listenPort = listenPort ?? 0; + _differentAppProtocolVersionEncountered = differentAppProtocolVersionEncountered; + _iceServers = iceServers?.ToList(); + _messageLifespan = messageLifespan; + _streams = new ConcurrentDictionary(); + + if (!(_host is null) && listenPort is { } listenPortAsInt) + { + _hostEndPoint = new DnsEndPoint(_host, listenPortAsInt); + } + + if (_host == null && (_iceServers == null || !_iceServers.Any())) + { + throw new ArgumentException( + $"Swarm requires either {nameof(host)} or " + + $"{nameof(iceServers)}." + ); + } + + _logger = Log.ForContext(); + _runtimeCancellationTokenSource = new CancellationTokenSource(); + _turnCancellationTokenSource = new CancellationTokenSource(); + MessageHistory = new FixedSizedQueue(MessageHistoryCapacity); + } + + public event EventHandler ProcessMessageHandler = null!; + + /// + public Peer AsPeer => EndPoint is null + ? new Peer(_privateKey.PublicKey, PublicIPAddress) + : new BoundPeer(_privateKey.PublicKey, EndPoint, PublicIPAddress); + + /// + public DateTimeOffset? LastMessageTimestamp { get; private set; } + + public bool Running + { + get => _runningEvent?.Task.Status == TaskStatus.RanToCompletion; + + private set + { + if (value) + { + _runningEvent?.TrySetResult(null!); + } + else + { + _runningEvent = new TaskCompletionSource(); + } + } + } + + public ConcurrentQueue MessageHistory { get; } + + internal IPAddress? PublicIPAddress => _turnClient?.PublicAddress; + + internal DnsEndPoint? EndPoint => _turnClient?.EndPoint ?? _hostEndPoint; + + public void Dispose() + { + if (!_disposed) + { + _listener?.Stop(); + _runtimeCancellationTokenSource.Cancel(); + _turnCancellationTokenSource.Cancel(); + + _runtimeCancellationTokenSource.Dispose(); + _turnCancellationTokenSource.Dispose(); + StopAllStreams(); + Running = false; + _disposed = true; + } + } + + public async Task StartAsync(CancellationToken cancellationToken = default) + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TcpTransport)); + } + + if (Running) + { + throw new TransportException("Transport is already running."); + } + + _listener = new TcpListener(new IPEndPoint(IPAddress.Any, _listenPort)); + _listener.Start(ListenerBacklog); + + // _listenPort might be 0, which is any, so it should be re-set. + _listenPort = ((IPEndPoint)_listener.LocalEndpoint).Port; + + _logger.Information("Listen on {Port}", _listenPort); + + if (_host is null && !(_iceServers is null)) + { + _turnClient = await IceServer.CreateTurnClient(_iceServers); + await _turnClient.StartAsync(_listenPort, cancellationToken); + + _ = RefreshPermissions(cancellationToken); + } + + _runtimeCancellationTokenSource = + CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _cancellationToken = _runtimeCancellationTokenSource.Token; + + if (_turnClient is null || !_turnClient.BehindNAT) + { + string? host = _host ?? PublicIPAddress?.ToString(); + if (host is null) + { + throw new TransportException("Host is null."); + } + + _hostEndPoint = new DnsEndPoint(host, _listenPort); + } + + List tasks = new List(); + + tasks.Add(ReceiveMessageAsync(_cancellationToken)); + tasks.Add(ProcessRuntime(_cancellationToken)); + + Running = true; + _logger.Debug("Start to run. TurnClient: {Client}", _turnClient); + + await await Task.WhenAny(tasks); + } + + public async Task StopAsync(TimeSpan waitFor, CancellationToken cancellationToken = default) + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TcpTransport)); + } + + if (Running) + { + await Task.Delay(waitFor, cancellationToken); + _listener?.Stop(); + _runtimeCancellationTokenSource.Cancel(); + _turnCancellationTokenSource.Cancel(); + StopAllStreams(); + Running = false; + } + } + + /// + public Task WaitForRunningAsync() => _runningEvent.Task; + + public Task SendMessageAsync( + BoundPeer peer, + Message message, + CancellationToken cancellationToken) + => SendMessageWithReplyAsync( + peer, + message, + TimeSpan.FromSeconds(3), + 0, + cancellationToken); + + public async Task SendMessageWithReplyAsync( + BoundPeer peer, + Message message, + TimeSpan? timeout, + CancellationToken cancellationToken) + { + IEnumerable replies = + await SendMessageWithReplyAsync(peer, message, timeout, 1, cancellationToken); + Message reply = replies.First(); + + return reply; + } + + public async Task> SendMessageWithReplyAsync( + BoundPeer peer, + Message message, + TimeSpan? timeout, + int expectedResponses, + CancellationToken cancellationToken = default) + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TcpTransport)); + } + + if (!(_turnClient is null) && _turnClient.BehindNAT) + { + await CreatePermission(peer); + } + + if (expectedResponses < 0) + { + throw new ArgumentOutOfRangeException(nameof(expectedResponses)); + } + + Guid reqId; + if (message.Identity is null) + { + reqId = Guid.NewGuid(); + message.Identity = reqId.ToByteArray(); + } + else + { + reqId = new Guid(message.Identity); + } + + _logger.Verbose( + "Start to request {RequestId} to {Peer}: {Message}. (expected responses: {Count})", + reqId, + peer, + message, + expectedResponses + ); + + using var client = new TcpClient { LingerState = new LingerOption(true, 1) }; + try + { + await client.ConnectAsync(peer.EndPoint.Host, peer.EndPoint.Port); + client.ReceiveTimeout = timeout?.Milliseconds ?? 0; + await WriteMessageAsync(message, client, cancellationToken); + _logger.Verbose( + "Successfully sent request {RequestId} to {Peer}: {Message}.", + reqId, + peer, + message + ); + + var replies = new List(); + using var timeoutCts = new CancellationTokenSource(); + if (expectedResponses != 0 && !(timeout is null)) + { + timeoutCts.CancelAfter( + (int)timeout.Value.TotalMilliseconds * expectedResponses); + } + + using CancellationTokenSource linkedTcs = + CancellationTokenSource.CreateLinkedTokenSource( + timeoutCts.Token, + cancellationToken); + while (expectedResponses > 0) + { + try + { + // TODO: Should consider case where stream bytes exceeds buffer size. + Message received = await ReadMessageAsync( + client, + linkedTcs.Token); + _logger.Verbose( + "Received message was {Message}. Total: {Count}", + received, + replies.Count); + MessageHistory.Enqueue(received); + replies.Add(received); + expectedResponses--; + } + catch (TaskCanceledException) + { + if (timeoutCts.IsCancellationRequested) + { + var msg = + $"{nameof(SendMessageWithReplyAsync)}() timed out after " + + "{Timeout} of waiting a reply to {MessageType} ({RequestId}) " + + "from {PeerAddress}."; + _logger.Debug( + msg, + timeout, + message.GetType().Name, + reqId, + peer.Address); + throw new TimeoutException(); + } + + throw; + } + } + + if (replies.Any()) + { + const string logMsg = + "Received {ReplyMessageCount} reply messages to {RequestId} " + + "from the {Peer}: {ReplyMessages}."; + _logger.Debug(logMsg, replies.Count, reqId, peer, replies); + } + + return replies; + } + catch (DifferentAppProtocolVersionException e) + { + const string logMsg = + "{PeerAddress} sent a reply to {RequestId} with " + + "a different app protocol version; " + + "expected: {ExpectedVersion}; actual: {ActualVersion}."; + _logger.Error(e, logMsg, peer.Address, reqId, e.ExpectedVersion, e.ActualVersion); + throw; + } + catch (ArgumentException e) + { + // This exception is thrown on .NET framework when the given peer is invalid. + _logger.Error( + e, + "ArgumentException occurred during {FName} to {RequestId}. {E}", + nameof(SendMessageWithReplyAsync), + reqId, + e); + + // To match with previous implementation, throws TimeoutException when it failed to + // find peer to send message. + throw new TimeoutException($"Cannot find peer {peer}.", e); + } + catch (SocketException e) + { + // This exception is thrown on .NET core when the given peer is invalid. + _logger.Error( + e, + "SocketException occurred during {FName} to {RequestId}. {E}", + nameof(SendMessageWithReplyAsync), + reqId, + e); + + // To match with previous implementation, throws TimeoutException when it failed to + // find peer to send message. + throw new TimeoutException($"Cannot find peer {peer}.", e); + } + catch (InvalidMagicCookieException e) + { + _logger.Verbose( + "Magic cookie mismatch, ignored. (Expected: {Expected}, Actual: {Actual})", + e.Expected, + e.Actual); + throw; + } + } + + public void BroadcastMessage(Address? except, Message message) + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TcpTransport)); + } + + foreach (var peer in _table.PeersToBroadcast(except)) + { + _ = SendMessageAsync(peer, message, _cancellationToken); + } + } + + public async Task ReplyMessageAsync(Message message, CancellationToken cancellationToken) + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TcpTransport)); + } + + var id = new Guid(message.Identity); + _logger.Verbose("Trying to reply message. ID: {Id}", id); + if (_streams.TryGetValue(id, out ReplyStream stream)) + { + _logger.Verbose("Send reply message {Message}", message); + try + { + _logger.Verbose("Start to writing reply {Message}.", message); + using var @lock = stream.Lock.Lock(); + await WriteMessageAsync(message, stream.Client, _cancellationToken); + } + catch (IOException) + { + _logger.Verbose("Connection is lost"); + _ = TryRemoveStreamAsync(id, _cancellationToken); + } + } + } + + internal async Task WriteMessageAsync( + Message message, + TcpClient client, + CancellationToken cancellationToken) + { + byte[] serialized = message.Serialize( + _privateKey, + AsPeer, + DateTimeOffset.UtcNow, + _appProtocolVersion); + int length = serialized.Length; + var buffer = new byte[MagicCookie.Length + sizeof(int) + length]; + MagicCookie.CopyTo(buffer, 0); + BitConverter.GetBytes(length).CopyTo(buffer, MagicCookie.Length); + serialized.CopyTo(buffer, MagicCookie.Length + sizeof(int)); + NetworkStream stream = client.GetStream(); + + // NOTE: Stream is forced to be closed because NetStream.WriteAsync()'s + // cancellation token never works in .NET Framework. + using (cancellationToken.Register(() => stream.Close())) + { + try + { + await stream.WriteAsync(buffer, 0, buffer.Length, default); + } + catch (Exception) + { + if (cancellationToken.IsCancellationRequested) + { + throw new TaskCanceledException(); + } + + throw; + } + } + } + + internal async Task ReadMessageAsync( + TcpClient client, + CancellationToken cancellationToken) + { + var content = new List(); + byte[] buffer = new byte[1000]; + NetworkStream stream = client.GetStream(); + + // NOTE: Stream is forced to be closed because NetStream.ReadAsync()'s + // cancellation token never works in .NET Framework. + using (cancellationToken.Register(() => stream.Close())) + { + try + { + // May read shorter than MagicCookie's length just by network condition + int bytesRead = await stream.ReadAsync(buffer, 0, MagicCookie.Length, default); + if (bytesRead < MagicCookie.Length || + !buffer.Take(MagicCookie.Length).SequenceEqual(MagicCookie)) + { + throw new InvalidMagicCookieException(MagicCookie, buffer.Take(bytesRead)); + } + + await stream.ReadAsync(buffer, 0, sizeof(int), default); + int bytesLeft = BitConverter.ToInt32(buffer.Take(sizeof(int)).ToArray(), 0); + + while (bytesLeft != 0) + { + bytesRead = await stream.ReadAsync( + buffer, + 0, + bytesLeft < 1000 ? bytesLeft : 1000, + default); + content.AddRange(buffer.Take(bytesRead)); + bytesLeft -= bytesRead; + } + } + catch (InvalidMagicCookieException) + { + if (cancellationToken.IsCancellationRequested) + { + throw new TaskCanceledException(); + } + + throw; + } + catch (Exception) + { + if (cancellationToken.IsCancellationRequested) + { + throw new TaskCanceledException(); + } + + _logger.Error( + e, + "Error occurred during {FName}() {E}", + nameof(ReadMessageAsync), + e); + throw; + } + } + + _logger.Verbose("Received {Bytes} bytes from network stream.", content.Count); + + Message message = Message.Deserialize( + content.ToArray(), + _appProtocolVersion, + _trustedAppProtocolVersionSigners, + _differentAppProtocolVersionEncountered, + _messageLifespan); + + _logger.Verbose( + "ReadMessageAsync success. Received message {Message} from network stream.", + message); + return message; + } + + private async Task ReceiveMessageAsync(CancellationToken cancellationToken) + { + while (!(cancellationToken.IsCancellationRequested || _listener is null)) + { + try + { + TcpClient client = await _listener.AcceptTcpClientAsync(); + client.LingerState = new LingerOption(true, 1); + _logger.Verbose("Connected to tcp client."); + Guid guid = Guid.NewGuid(); + _logger.Verbose("Start to accept message of {Id}.", guid); + + // TODO: Maximum message processor should be limited. + _ = AcceptAsync(client, cancellationToken) + .ContinueWith( + async t => + { + _logger.Verbose("Ended to accept message of {Id}.", guid); + try + { + // Wait 15 seconds to close client to receive ACK from TURN. + await Task.Delay( + 15 * 1000, + _runtimeCancellationTokenSource.Token); + } + finally + { + client.Close(); + await TryRemoveStreamAsync(guid, cancellationToken); + } + }, + cancellationToken); + } + catch (ObjectDisposedException) + { + _logger.Warning("TCPListener is disposed."); + break; + } + } + } + + private async Task AcceptAsync( + TcpClient client, + CancellationToken cancellationToken) + { + try + { + _logger.Verbose("Trying to receive message"); + Message message = await ReadMessageAsync(client, cancellationToken); + MessageHistory.Enqueue(message); + LastMessageTimestamp = DateTimeOffset.UtcNow; + _logger.Debug( + "A message has parsed: {Message}, from {Remove}", + message, + message.Remote); + + var id = new Guid(message.Identity); + await TryAddStreamAsync(id, client, cancellationToken); + + ProcessMessageHandler?.Invoke(this, message); + } + catch (DifferentAppProtocolVersionException dapve) + { + _logger.Debug("Message from peer with different version received."); + var differentVersion = new DifferentVersion + { + Identity = dapve.Identity, + }; + + await WriteMessageAsync(differentVersion, client, cancellationToken); + } + catch (IOException) + { + _logger.Verbose("Connection is lost."); + } + catch (InvalidMagicCookieException e) + { + _logger.Verbose( + "Magic cookie mismatch, ignored. (Expected: {Expected}, Actual: {Actual})", + e.Expected, + e.Actual); + } + catch (Exception e) + { + _logger.Error( + e, + "Unexpected exception occurred during {FName}(). {E}", + nameof(AcceptAsync), + e); + throw; + } + } + + private async Task RefreshPermissions( + CancellationToken cancellationToken) + { + TimeSpan lifetime = TurnPermissionLifetime; + while (!cancellationToken.IsCancellationRequested) + { + try + { + await Task.Delay(lifetime - TimeSpan.FromMinutes(1), cancellationToken); + _logger.Debug("Refreshing permissions..."); + await Task.WhenAll(_table.Peers.Select(CreatePermission)); + cancellationToken.ThrowIfCancellationRequested(); + } + catch (OperationCanceledException e) + { + _logger.Warning(e, "{FName}() is cancelled.", nameof(RefreshPermissions)); + throw; + } + catch (Exception e) + { + _logger.Error( + e, + "An unexpected exception occurred during {FName}(): {E}", + nameof(RefreshPermissions), + e); + } + } + } + + private async Task CreatePermission(BoundPeer peer) + { + if (_turnClient is null) + { + throw new TransportException( + "CreatePermission should not be called when the turn client does not exists"); + } + + using var cts = new CancellationTokenSource(); + IPAddress[] ips; + + // Cancellation After 2.5 sec + cts.CancelAfter(2500); + if (peer.PublicIPAddress is null) + { + string peerHost = peer.EndPoint.Host; + if (IPAddress.TryParse(peerHost, out IPAddress asIp)) + { + ips = new[] { asIp }; + } + else + { + ips = await Dns.GetHostAddressesAsync(peerHost); + } + } + else + { + ips = new[] { peer.PublicIPAddress }; + } + + try + { + foreach (IPAddress ip in ips) + { + var ep = new IPEndPoint(ip, peer.EndPoint.Port); + if (IPAddress.IsLoopback(ip)) + { + // This translation is only used in test case because a + // seed node exposes loopback address as public address to + // other node in test case + ep = await _turnClient.GetMappedAddressAsync(cts.Token); + } + + // FIXME Can we really ignore IPv6 case? + if (ip.AddressFamily.Equals(AddressFamily.InterNetwork)) + { + await _turnClient.CreatePermissionAsync(ep, cts.Token); + } + } + } + catch (TaskCanceledException tce) + { + if (cts.IsCancellationRequested) + { + _logger.Debug( + tce, + "Timeout occurred during {FName}", + nameof(CreatePermission)); + } + else + { + throw; + } + } + } + + private async Task ProcessRuntime(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + foreach (KeyValuePair pair in _streams) + { + // FIXME: This lifetime related disposing logic can be tested? + if (pair.Value.SpawnAt + ListenerLifetime < DateTimeOffset.UtcNow) + { + _ = TryRemoveStreamAsync(pair.Key, cancellationToken); + } + } + + await Task.Delay(100, cancellationToken); + } + } + + private async Task TryAddStreamAsync( + Guid id, + TcpClient client, + CancellationToken cancellationToken) + { + while (_streams.Count > MaxReplyStreams && !cancellationToken.IsCancellationRequested) + { + await Task.Delay(100, cancellationToken); + } + + _streams.TryAdd(id, new ReplyStream(client, DateTimeOffset.UtcNow)); + _logger.Verbose("Added stream to the collection. {Id}", id); + } + + private async Task TryRemoveStreamAsync(Guid id, CancellationToken cancellationToken) + { + if (!_streams.TryRemove(id, out ReplyStream stream)) + { + return; + } + + using var @lock = await stream.Lock.LockAsync(cancellationToken); + try + { + stream.Client.Close(); + stream.Client.Dispose(); + } + catch (ObjectDisposedException) + { + } + + _logger.Verbose("Removed stream from the collection. {Id}", id); + } + + private void StopAllStreams() + { + List tasks = new List(); + foreach (var id in _streams.Keys) + { + tasks.Add(TryRemoveStreamAsync(id, default)); + } + + Task.WhenAll(tasks).Wait(); + } + + private struct ReplyStream + { + public ReplyStream(TcpClient client, DateTimeOffset spawnAt) + { + Client = client; + SpawnAt = spawnAt; + Lock = new AsyncLock(); + } + + public DateTimeOffset SpawnAt { get; } + + public TcpClient Client { get; } + + public AsyncLock Lock { get; } + } + } +} From 1a206045b27531f6049c90cc186e00b0343e4d95 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 13 May 2021 17:52:31 +0900 Subject: [PATCH 08/17] Fix unit tests which had NetMQ dependency --- Libplanet.Tests/Net/Protocols/ProtocolTest.cs | 72 +++--- .../Net/Protocols/TestTransport.cs | 138 +++++++--- Libplanet.Tests/Net/SwarmTest.Broadcast.cs | 2 + Libplanet.Tests/Net/SwarmTest.cs | 236 ++++++++---------- .../Net/Transports/BoundPeerExtensionsTest.cs | 19 +- .../Net/Transports/NetMQTransportTest.cs | 4 +- .../Net/Transports/TcpTransportTest.cs | 71 +++++- .../Net/Transports/TransportTest.cs | 109 ++++++++ .../Net/Transports/BoundPeerExtensions.cs | 3 +- 9 files changed, 450 insertions(+), 204 deletions(-) diff --git a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs index c21cdf24b39..78b106b2687 100644 --- a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs +++ b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs @@ -6,6 +6,7 @@ using Libplanet.Crypto; using Libplanet.Net; using Libplanet.Net.Protocols; +using Libplanet.Net.Transports; using Serilog; using Xunit; using Xunit.Abstractions; @@ -58,7 +59,7 @@ public async Task Start() var transportA = CreateTestTransport(); var transportB = CreateTestTransport(); - Assert.Throws(() => transportA.SendPing(transportB.AsPeer)); + Assert.Throws(() => transportA.SendPing(transportB.AsPeer)); await StartTestTransportAsync(transportA); await Assert.ThrowsAsync(() => transportA.AddPeersAsync( @@ -148,9 +149,9 @@ await Assert.ThrowsAsync( Assert.Contains(transportB.AsPeer, transportA.Peers); - await transportA.StopAsync(TimeSpan.Zero); - await transportB.StopAsync(TimeSpan.Zero); - await transportC.StopAsync(TimeSpan.Zero); + transportA.Dispose(); + transportB.Dispose(); + transportC.Dispose(); } [Fact(Timeout = Timeout)] @@ -159,12 +160,15 @@ public async Task BootstrapException() var transportA = CreateTestTransport(); var transportB = CreateTestTransport(); - await Assert.ThrowsAsync( + await Assert.ThrowsAsync( () => transportB.BootstrapAsync( new[] { transportA.AsPeer }, TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3)) ); + + transportA.Dispose(); + transportB.Dispose(); } [Fact(Timeout = Timeout)] @@ -198,9 +202,9 @@ public async Task BootstrapAsyncTest() } finally { - await transportA.StopAsync(TimeSpan.Zero); - await transportB.StopAsync(TimeSpan.Zero); - await transportC.StopAsync(TimeSpan.Zero); + transportA.Dispose(); + transportB.Dispose(); + transportC.Dispose(); } } @@ -220,7 +224,9 @@ public async Task RemoveStalePeers() await Task.Delay(100); await transportA.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken)); Assert.Empty(transportA.Peers); - await transportA.StopAsync(TimeSpan.Zero); + + transportA.Dispose(); + transportB.Dispose(); } [Fact(Timeout = Timeout)] @@ -245,10 +251,10 @@ public async Task RoutingTableFull() Assert.DoesNotContain(transportB.AsPeer, transport.Peers); Assert.DoesNotContain(transportC.AsPeer, transport.Peers); - await transport.StopAsync(TimeSpan.Zero); - await transportA.StopAsync(TimeSpan.Zero); - await transportB.StopAsync(TimeSpan.Zero); - await transportC.StopAsync(TimeSpan.Zero); + transport.Dispose(); + transportA.Dispose(); + transportB.Dispose(); + transportC.Dispose(); } [Fact(Timeout = Timeout)] @@ -282,9 +288,10 @@ public async Task ReplacementCache() Assert.Contains(transportB.AsPeer, transport.Peers); Assert.DoesNotContain(transportC.AsPeer, transport.Peers); - await transport.StopAsync(TimeSpan.Zero); - await transportB.StopAsync(TimeSpan.Zero); - await transportC.StopAsync(TimeSpan.Zero); + transport.Dispose(); + transportA.Dispose(); + transportB.Dispose(); + transportC.Dispose(); } [Fact(Timeout = Timeout)] @@ -319,8 +326,10 @@ public async Task RemoveDeadReplacementCache() Assert.DoesNotContain(transportB.AsPeer, transport.Peers); Assert.Contains(transportC.AsPeer, transport.Peers); - await transport.StopAsync(TimeSpan.Zero); - await transportC.StopAsync(TimeSpan.Zero); + transport.Dispose(); + transportA.Dispose(); + transportB.Dispose(); + transportC.Dispose(); } [Theory(Timeout = 2 * Timeout)] @@ -358,10 +367,11 @@ public async Task BroadcastMessage(int count) } finally { + seed.Dispose(); foreach (var transport in transports) { Assert.True(transport.ReceivedTestMessageOfData("foo")); - await transport.StopAsync(TimeSpan.Zero); + transport.Dispose(); } } } @@ -449,9 +459,9 @@ public async Task BroadcastGuarantee() } finally { - await seed.StopAsync(TimeSpan.Zero); - await t1.StopAsync(TimeSpan.Zero); - await t2.StopAsync(TimeSpan.Zero); + seed.Dispose(); + t1.Dispose(); + t2.Dispose(); } } @@ -480,9 +490,9 @@ public async Task DoNotBroadcastToSourcePeer() } finally { - await transportA.StopAsync(TimeSpan.Zero); - await transportB.StopAsync(TimeSpan.Zero); - await transportC.StopAsync(TimeSpan.Zero); + transportA.Dispose(); + transportB.Dispose(); + transportC.Dispose(); } } @@ -532,14 +542,12 @@ public async Task RefreshTable() } finally { - await transport.StopAsync(TimeSpan.Zero); + transport.Dispose(); foreach (var t in transports) { - await t.StopAsync(TimeSpan.Zero); + t.Dispose(); } } - - Assert.True(true); } private TestTransport CreateTestTransport( @@ -558,12 +566,12 @@ private TestTransport CreateTestTransport( networkDelay); } - private async Task StartTestTransportAsync( + private async Task StartTestTransportAsync( TestTransport transport, CancellationToken cancellationToken = default(CancellationToken)) { - await transport.StartAsync(cancellationToken); - return transport.RunAsync(cancellationToken); + _ = transport.StartAsync(cancellationToken); + await transport.WaitForRunningAsync(); } } } diff --git a/Libplanet.Tests/Net/Protocols/TestTransport.cs b/Libplanet.Tests/Net/Protocols/TestTransport.cs index 3c097ec2676..fc5219f34a7 100644 --- a/Libplanet.Tests/Net/Protocols/TestTransport.cs +++ b/Libplanet.Tests/Net/Protocols/TestTransport.cs @@ -32,8 +32,10 @@ internal class TestTransport : ITransport private readonly Random _random; private readonly bool _blockBroadcast; + private TaskCompletionSource _runningEvent; private CancellationTokenSource _swarmCancellationTokenSource; private TimeSpan _networkDelay; + private bool _disposed; public TestTransport( Dictionary transports, @@ -43,6 +45,7 @@ public TestTransport( int bucketSize, TimeSpan? networkDelay) { + _runningEvent = new TaskCompletionSource(); _privateKey = privateKey; _blockBroadcast = blockBroadcast; var loggerId = _privateKey.ToAddress().ToHex(); @@ -60,11 +63,12 @@ public TestTransport( _ignoreTestMessageWithData = new List(); _random = new Random(); Table = new RoutingTable(Address, tableSize, bucketSize); + ProcessMessageHandler = new AsyncDelegate(); Protocol = new KademliaProtocol(Table, this, Address); MessageHistory = new FixedSizedQueue(30); } - public event EventHandler ProcessMessageHandler; + public AsyncDelegate ProcessMessageHandler { get; } public AsyncAutoResetEvent MessageReceived { get; } @@ -78,7 +82,22 @@ public TestTransport( public DateTimeOffset? LastMessageTimestamp { get; private set; } - public bool Running => !(_swarmCancellationTokenSource is null); + public bool Running + { + get => _runningEvent.Task.Status == TaskStatus.RanToCompletion; + + private set + { + if (value) + { + _runningEvent.TrySetResult(null); + } + else + { + _runningEvent = new TaskCompletionSource(); + } + } + } public ConcurrentQueue MessageHistory { get; } @@ -90,26 +109,31 @@ public TestTransport( public void Dispose() { + if (!_disposed) + { + _swarmCancellationTokenSource?.Cancel(); + Running = false; + _disposed = true; + } } -#pragma warning disable CS1998 // Method need to implement ITransport but it isn't be async public async Task StartAsync( CancellationToken cancellationToken = default(CancellationToken)) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TestTransport)); + } + _logger.Debug("Starting transport of {Peer}.", AsPeer); _swarmCancellationTokenSource = new CancellationTokenSource(); - } -#pragma warning restore CS1998 - - public async Task RunAsync( - CancellationToken cancellationToken = default(CancellationToken)) - { CancellationToken token = cancellationToken.Equals(CancellationToken.None) ? _swarmCancellationTokenSource.Token : CancellationTokenSource .CreateLinkedTokenSource( _swarmCancellationTokenSource.Token, cancellationToken) .Token; + Running = true; await ProcessRuntime(token); } @@ -117,15 +141,23 @@ public async Task StopAsync( TimeSpan waitFor, CancellationToken cancellationToken = default(CancellationToken)) { - _logger.Debug("Stopping transport of {Peer}.", AsPeer); - _swarmCancellationTokenSource.Cancel(); + if (_disposed) + { + throw new ObjectDisposedException(nameof(TestTransport)); + } + + if (Running) + { + _logger.Debug("Stopping transport of {Peer}.", AsPeer); + _swarmCancellationTokenSource.Cancel(); + Running = false; + } + await Task.Delay(waitFor, cancellationToken); } - public Task WaitForRunningAsync() - { - return Task.CompletedTask; - } + /// + public Task WaitForRunningAsync() => _runningEvent.Task; public async Task BootstrapAsync( IEnumerable bootstrapPeers, @@ -134,6 +166,11 @@ public async Task BootstrapAsync( int depth = 3, CancellationToken cancellationToken = default(CancellationToken)) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TestTransport)); + } + IEnumerable peers = bootstrapPeers.OfType(); await BootstrapAsync( @@ -152,9 +189,14 @@ public async Task BootstrapAsync( int depth = 3, CancellationToken cancellationToken = default(CancellationToken)) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TestTransport)); + } + if (!Running) { - throw new SwarmException("Start swarm before use."); + throw new TransportException("Start transport before use."); } if (bootstrapPeers is null) @@ -185,9 +227,14 @@ public Task AddPeersAsync( TimeSpan? timeout, CancellationToken cancellationToken = default(CancellationToken)) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TestTransport)); + } + if (!Running) { - throw new SwarmException("Start swarm before use."); + throw new TransportException("Start transport before use."); } if (peers is null) @@ -247,9 +294,14 @@ async Task DoAddPeersAsync() public void SendPing(Peer target, TimeSpan? timeSpan = null) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TestTransport)); + } + if (!Running) { - throw new SwarmException("Start swarm before use."); + throw new TransportException("Start transport before use."); } if (!(target is BoundPeer boundPeer)) @@ -268,9 +320,14 @@ public void SendPing(Peer target, TimeSpan? timeSpan = null) public void BroadcastTestMessage(Address? except, string data) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TestTransport)); + } + if (!Running) { - throw new SwarmException("Start swarm before use."); + throw new TransportException("Start transport before use."); } var message = new TestMessage(data) { Remote = AsPeer }; @@ -280,6 +337,11 @@ public void BroadcastTestMessage(Address? except, string data) public void BroadcastMessage(Address? except, Message message) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TestTransport)); + } + var peers = Table.PeersToBroadcast(except); var peersString = string.Join(", ", peers.Select(peer => peer.Address)); _logger.Debug( @@ -305,9 +367,14 @@ public async Task SendMessageWithReplyAsync( TimeSpan? timeout, CancellationToken cancellationToken) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TestTransport)); + } + if (!Running) { - throw new SwarmException("Start swarm before use."); + throw new TransportException("Start transport before use."); } if (!(peer is BoundPeer boundPeer)) @@ -390,9 +457,14 @@ await SendMessageWithReplyAsync(peer, message, timeout, cancellationToken), public async Task ReplyMessageAsync(Message message, CancellationToken cancellationToken) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TestTransport)); + } + if (!Running) { - throw new SwarmException("Start swarm before use."); + throw new TransportException("Start transport before use."); } _logger.Debug("Replying {Message}...", message); @@ -406,9 +478,14 @@ public async Task WaitForTestMessageWithData( string data, CancellationToken token = default(CancellationToken)) { + if (_disposed) + { + throw new ObjectDisposedException(nameof(TestTransport)); + } + if (!Running) { - throw new SwarmException("Start swarm before use."); + throw new TransportException("Start transport before use."); } while (!token.IsCancellationRequested && !ReceivedTestMessageOfData(data)) @@ -419,9 +496,9 @@ public async Task WaitForTestMessageWithData( public bool ReceivedTestMessageOfData(string data) { - if (!Running) + if (_disposed) { - throw new SwarmException("Start swarm before use."); + throw new ObjectDisposedException(nameof(TestTransport)); } return ReceivedMessages.OfType().Any(msg => msg.Data == data); @@ -462,20 +539,9 @@ private void ReceiveMessage(Message message) _peersToReply[message.Identity] = boundPeer.Address; } - if (message is Ping) - { - _ = ReplyMessageAsync( - new Pong - { - Identity = message.Identity, - Remote = AsPeer, - }, - default); - } - LastMessageTimestamp = DateTimeOffset.UtcNow; ReceivedMessages.Add(message); - ProcessMessageHandler?.Invoke(this, message); + _ = ProcessMessageHandler.InvokeAsync(message); MessageReceived.Set(); } diff --git a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs index 5a6f519bb6c..d479292662b 100644 --- a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs +++ b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs @@ -276,6 +276,8 @@ public async Task BroadcastTxWhileMining() await StartAsync(swarmC); await swarmC.AddPeersAsync(new[] { swarmA.AsPeer }, null); + Assert.Contains(swarmC.AsPeer, swarmA.Peers); + Assert.Contains(swarmA.AsPeer, swarmC.Peers); for (var i = 0; i < 100; i++) { diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index 0ed2cc9dde0..0c797672fb6 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using System.Net; using System.Net.Sockets; @@ -23,12 +24,11 @@ using Libplanet.Tests.Store; using Libplanet.Tests.Store.Trie; using Libplanet.Tx; -using NetMQ; -using NetMQ.Sockets; using Serilog; using xRetry; using Xunit; using Xunit.Abstractions; +using Xunit.Sdk; namespace Libplanet.Tests.Net { @@ -68,9 +68,6 @@ public void Dispose() } Log.Logger.Debug("Finished to finalize {Resources} resources.", _finalizers.Count); - - NetMQConfig.Cleanup(false); - Log.Logger.Debug($"Finished to clean up the {nameof(NetMQConfig)} singleton."); } [Fact(Timeout = Timeout)] @@ -97,8 +94,9 @@ public async Task HandleReconnection() { Swarm seed = CreateSwarm(); - Swarm swarmA = CreateSwarm(); - Swarm swarmB = CreateSwarm(); + var privateKey = new PrivateKey(); + Swarm swarmA = CreateSwarm(privateKey: privateKey); + Swarm swarmB = CreateSwarm(privateKey: privateKey); try { @@ -400,43 +398,23 @@ public async Task GetMultipleBlocksAtOnce() null ).ToArrayAsync(); - var netMQAddress = $"tcp://{peer.EndPoint.Host}:{peer.EndPoint.Port}"; - using (var socket = new DealerSocket(netMQAddress)) - { - var request = new GetBlocks(hashes.Select(pair => pair.Item2), 2); - socket.SendMultipartMessage( - request.ToNetMQMessage( - privateKey, - swarmB.AsPeer, - DateTimeOffset.UtcNow, - swarmB.AppProtocolVersion) - ); - - NetMQMessage response = socket.ReceiveMultipartMessage(); - Message parsedMessage = Message.Parse( - response, - true, - swarmA.AppProtocolVersion, - swarmA.TrustedAppProtocolVersionSigners, - null, - null); - Libplanet.Net.Messages.Blocks blockMessage = - (Libplanet.Net.Messages.Blocks)parsedMessage; - - Assert.Equal(2, blockMessage.Payloads.Count); - - response = socket.ReceiveMultipartMessage(); - parsedMessage = Message.Parse( - response, - true, - swarmA.AppProtocolVersion, - swarmA.TrustedAppProtocolVersionSigners, - null, - null); - blockMessage = (Libplanet.Net.Messages.Blocks)parsedMessage; - - Assert.Single(blockMessage.Payloads); - } + ITransport transport = swarmB.Transport; + + var request = new GetBlocks(hashes.Select(pair => pair.Item2), 2); + Message[] responses = (await transport.SendMessageWithReplyAsync( + (BoundPeer)swarmA.AsPeer, + request, + null, + 2, + default)).ToArray(); + var blockMessage = (Libplanet.Net.Messages.Blocks)responses[0]; + + Assert.Equal(2, responses.Length); + Assert.Equal(2, blockMessage.Payloads.Count); + + blockMessage = (Libplanet.Net.Messages.Blocks)responses[1]; + + Assert.Single(blockMessage.Payloads); } finally { @@ -1666,8 +1644,8 @@ public async Task BlockDemand() BoundPeer peer = swarm.AsPeer as BoundPeer; Assert.NotNull(peer); _logger.Debug( - "{0}///{1}///{2}", - peer.PublicKey, + "{Address}///{EndPoint}///{PublicIPAddress}", + peer.Address, peer.EndPoint, peer.PublicIPAddress); @@ -1676,89 +1654,93 @@ public async Task BlockDemand() Assert.True(lowerBlock.TotalDifficulty < higherBlock.TotalDifficulty); var privateKey1 = new PrivateKey(); var privateKey2 = new PrivateKey(); - var sender1 = new BoundPeer( - privateKey1.PublicKey, - new DnsEndPoint("127.0.0.1", 20000)); - var sender2 = new BoundPeer( - privateKey2.PublicKey, - new DnsEndPoint("127.0.0.1", 20001)); - _logger.Debug( - "{0}///{1}///{2}", - sender1.PublicKey, - sender1.EndPoint, - sender1.PublicIPAddress); - _logger.Debug("STEP1"); - - var netMQAddress = $"tcp://{peer.EndPoint.Host}:{peer.EndPoint.Port}"; - using (var socket = new DealerSocket(netMQAddress)) + + ITransport CreateTransport(PrivateKey privateKey, int port) { - var request = - new BlockHeaderMessage(swarm.BlockChain.Genesis.Hash, higherBlock.Header); - socket.SendMultipartMessage( - request.ToNetMQMessage( - privateKey1, - sender1, - DateTimeOffset.UtcNow, - swarm.AppProtocolVersion) - ); - await swarm.BlockHeaderReceived.WaitAsync(); - await Task.Delay(100); - Assert.NotNull(swarm.BlockDemand); - var timestamp = swarm.BlockDemand.Value.Timestamp; - Assert.Equal(higherBlock.Index, swarm.BlockDemand.Value.Header.Index); - - request = - new BlockHeaderMessage(swarm.BlockChain.Genesis.Hash, lowerBlock.Header); - socket.SendMultipartMessage( - request.ToNetMQMessage( - privateKey2, - sender2, - DateTimeOffset.UtcNow, - swarm.AppProtocolVersion) - ); - await swarm.BlockHeaderReceived.WaitAsync(); - // Await for context change - await Task.Delay(100); - Assert.NotNull(swarm.BlockDemand); - // Demand will not be refreshed. - Assert.Equal(higherBlock.Index, swarm.BlockDemand.Value.Header.Index); - Assert.Equal(timestamp, swarm.BlockDemand.Value.Timestamp); - - await Task.Delay( - swarmOptions.BlockDemandLifespan + TimeSpan.FromMilliseconds(100)); - - request = - new BlockHeaderMessage(swarm.BlockChain.Genesis.Hash, higherBlock.Header); - socket.SendMultipartMessage( - request.ToNetMQMessage( - privateKey1, - sender1, - DateTimeOffset.UtcNow, - swarm.AppProtocolVersion) - ); - await swarm.BlockHeaderReceived.WaitAsync(); - await Task.Delay(100); - Assert.NotNull(swarm.BlockDemand); - // Demand will not be refreshed. - Assert.Equal(higherBlock.Index, swarm.BlockDemand.Value.Header.Index); - Assert.Equal(timestamp, swarm.BlockDemand.Value.Timestamp); - - request = - new BlockHeaderMessage(swarm.BlockChain.Genesis.Hash, lowerBlock.Header); - socket.SendMultipartMessage( - request.ToNetMQMessage( - privateKey2, - sender2, - DateTimeOffset.UtcNow, - swarm.AppProtocolVersion) - ); - await swarm.BlockHeaderReceived.WaitAsync(); - await Task.Delay(100); - _logger.Debug("STEP5"); - Assert.NotNull(swarm.BlockDemand); - // Demand will be replaced to lower one because it's stale. - Assert.Equal(lowerBlock.Index, swarm.BlockDemand.Value.Header.Index); + if (swarm.Transport is NetMQTransport) + { + return new NetMQTransport( + new RoutingTable(privateKey.ToAddress()), + privateKey, + swarm.AppProtocolVersion, + null, + 30, + "127.0.0.1", + port, + new IceServer[0], + null); + } + else if (swarm.Transport is TcpTransport) + { + return new TcpTransport( + new RoutingTable(privateKey.ToAddress()), + privateKey, + swarm.AppProtocolVersion, + null, + "127.0.0.1", + port, + new IceServer[0], + null); + } + else + { + throw new XunitException( + "Transport constructor is required for each transport type."); + } } + + ITransport transport1 = CreateTransport(privateKey1, 20000); + ITransport transport2 = CreateTransport(privateKey2, 20001); + _ = transport1.StartAsync(); + _ = transport2.StartAsync(); + await transport1.WaitForRunningAsync(); + await transport2.WaitForRunningAsync(); + + var request = + new BlockHeaderMessage(swarm.BlockChain.Genesis.Hash, higherBlock.Header); + await transport1.SendMessageAsync(peer, request, default); + await swarm.BlockHeaderReceived.WaitAsync(); + await Task.Delay(100); + Assert.NotNull(swarm.BlockDemand); + var timestamp = swarm.BlockDemand.Value.Timestamp; + Assert.Equal(higherBlock.Index, swarm.BlockDemand.Value.Header.Index); + + request = + new BlockHeaderMessage(swarm.BlockChain.Genesis.Hash, lowerBlock.Header); + await transport2.SendMessageAsync(peer, request, default); + await swarm.BlockHeaderReceived.WaitAsync(); + // Await for context change + await Task.Delay(100); + Assert.NotNull(swarm.BlockDemand); + // Demand will not be refreshed. + Assert.Equal(higherBlock.Index, swarm.BlockDemand.Value.Header.Index); + Assert.Equal(timestamp, swarm.BlockDemand.Value.Timestamp); + + await Task.Delay( + swarmOptions.BlockDemandLifespan + TimeSpan.FromMilliseconds(100)); + + request = + new BlockHeaderMessage(swarm.BlockChain.Genesis.Hash, higherBlock.Header); + await transport1.SendMessageAsync(peer, request, default); + await swarm.BlockHeaderReceived.WaitAsync(); + await Task.Delay(100); + Assert.NotNull(swarm.BlockDemand); + + Assert.Equal(higherBlock.Index, swarm.BlockDemand.Value.Header.Index); + // Demand will not be refreshed. + Assert.Equal(timestamp, swarm.BlockDemand.Value.Timestamp); + + await Task.Delay( + swarmOptions.BlockDemandLifespan + TimeSpan.FromMilliseconds(100)); + + request = + new BlockHeaderMessage(swarm.BlockChain.Genesis.Hash, lowerBlock.Header); + await transport2.SendMessageAsync(peer, request, default); + await swarm.BlockHeaderReceived.WaitAsync(); + await Task.Delay(100); + Assert.NotNull(swarm.BlockDemand); + // Demand will be replaced to lower one because it's stale. + Assert.Equal(lowerBlock.Index, swarm.BlockDemand.Value.Header.Index); } finally { diff --git a/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs b/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs index a17cb6cbb4d..466883f8325 100644 --- a/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs +++ b/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs @@ -9,12 +9,13 @@ using Libplanet.Tests.Common.Action; using Libplanet.Tests.Store; using Xunit; +using Xunit.Sdk; namespace Libplanet.Tests.Net.Transports { public class BoundPeerExtensionsTest { - [Fact] + [Fact(Timeout = 60 * 1000)] public async Task QueryAppProtocolVersion() { var fx = new DefaultStoreFixture(); @@ -43,7 +44,21 @@ public async Task QueryAppProtocolVersion() _ = swarm.StartAsync(); try { - AppProtocolVersion receivedAPV = peer.QueryAppProtocolVersion(); + AppProtocolVersion receivedAPV = default; + if (swarm.Transport is NetMQTransport) + { + receivedAPV = peer.QueryAppProtocolVersion(); + } + else if (swarm.Transport is TcpTransport) + { + receivedAPV = await peer.QueryAppProtocolVersionTcp(); + } + else + { + throw new XunitException( + "Each type of transport must have corresponding test case."); + } + Assert.Equal(apv, receivedAPV); } finally diff --git a/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs b/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs index e94ceda9fef..b7c1c1dfd19 100644 --- a/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs +++ b/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs @@ -50,8 +50,8 @@ public NetMQTransportTest(ITestOutputHelper testOutputHelper) .Enrich.WithThreadId() .WriteTo.TestOutput(testOutputHelper, outputTemplate: outputTemplate) .CreateLogger() - .ForContext(); - Logger = Log.ForContext(); + .ForContext(); + Logger = Log.ForContext(); } [SkippableFact(Timeout = Timeout, Skip = "Target method is broken.")] diff --git a/Libplanet.Tests/Net/Transports/TcpTransportTest.cs b/Libplanet.Tests/Net/Transports/TcpTransportTest.cs index 76f8d29ad3d..5eecc7dd8e0 100644 --- a/Libplanet.Tests/Net/Transports/TcpTransportTest.cs +++ b/Libplanet.Tests/Net/Transports/TcpTransportTest.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Net; using System.Net.Sockets; +using System.Threading; using System.Threading.Tasks; using Libplanet.Crypto; using Libplanet.Net; @@ -29,8 +30,72 @@ public TcpTransportTest(ITestOutputHelper testOutputHelper) .Enrich.WithThreadId() .WriteTo.TestOutput(testOutputHelper, outputTemplate: outputTemplate) .CreateLogger() - .ForContext(); - Logger = Log.ForContext(); + .ForContext(); + Logger = Log.ForContext(); + } + + [SkippableFact(Timeout = Timeout)] + public async Task ReadWriteMessageAsync() + { + var listener = new TcpListener(IPAddress.Any, 0); + listener.Start(); + var client = new TcpClient(); + await client.ConnectAsync("127.0.0.1", ((IPEndPoint)listener.LocalEndpoint).Port); + TcpClient listenerSocket = await listener.AcceptTcpClientAsync(); + + TcpTransport transport = CreateTcpTransport( + appProtocolVersion: AppProtocolVersion.Sign(new PrivateKey(), 1)); + try + { + var message1 = new Ping + { + Identity = Guid.NewGuid().ToByteArray(), + }; + + var message2 = new Pong + { + Identity = Guid.NewGuid().ToByteArray(), + }; + + await transport.WriteMessageAsync(message1, client, default); + await transport.WriteMessageAsync(message2, client, default); + Message[] messages = new Message[2]; + messages[0] = await transport.ReadMessageAsync(listenerSocket, default); + messages[1] = await transport.ReadMessageAsync(listenerSocket, default); + Assert.Equal(2, messages.Length); + Assert.Contains(messages, message => message is Ping); + Assert.Contains(messages, message => message is Pong); + } + finally + { + listenerSocket.Dispose(); + client.Dispose(); + } + } + + [SkippableFact(Timeout = Timeout)] + public async Task ReadMessageCancelAsync() + { + var cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromSeconds(3)); + var listener = new TcpListener(IPAddress.Any, 0); + listener.Start(); + var client = new TcpClient(); + await client.ConnectAsync("127.0.0.1", ((IPEndPoint)listener.LocalEndpoint).Port); + TcpClient listenerSocket = await listener.AcceptTcpClientAsync(); + + TcpTransport transport = CreateTcpTransport( + appProtocolVersion: AppProtocolVersion.Sign(new PrivateKey(), 1)); + try + { + await Assert.ThrowsAsync(async () => + await transport.ReadMessageAsync(listenerSocket, cts.Token)); + } + finally + { + listenerSocket.Dispose(); + client.Dispose(); + } } [SkippableFact(Timeout = Timeout)] @@ -96,8 +161,6 @@ private TcpTransport CreateTcpTransport( { privateKey = privateKey ?? new PrivateKey(); host = host ?? IPAddress.Loopback.ToString(); - trustedAppProtocolVersionSigners = trustedAppProtocolVersionSigners ?? - ImmutableHashSet.Empty; iceServers = iceServers ?? new IceServer[] { }; var table = new RoutingTable(privateKey.ToAddress(), tableSize, bucketSize); diff --git a/Libplanet.Tests/Net/Transports/TransportTest.cs b/Libplanet.Tests/Net/Transports/TransportTest.cs index 869659631d8..95ec3051c3a 100644 --- a/Libplanet.Tests/Net/Transports/TransportTest.cs +++ b/Libplanet.Tests/Net/Transports/TransportTest.cs @@ -10,6 +10,7 @@ using Libplanet.Net.Messages; using Libplanet.Net.Protocols; using Libplanet.Net.Transports; +using NetMQ; using Serilog; using Xunit; using Xunit.Sdk; @@ -54,6 +55,11 @@ public async void RestartAsync() Assert.True(transport.Running); await transport.StopAsync(TimeSpan.Zero); Assert.False(transport.Running); + if (transport is NetMQTransport) + { + NetMQConfig.Cleanup(false); + } + await InitializeAsync(transport); Assert.True(transport.Running); } @@ -227,6 +233,81 @@ public async Task SendMessageWithReplyAsync() } } + [SkippableFact(Timeout = Timeout)] + public async Task SendMessageWithReplyCancelAsync() + { + ITransport transportA = CreateTransport(); + ITransport transportB = CreateTransport(); + var cts = new CancellationTokenSource(); + + try + { + await InitializeAsync(transportA, default); + await InitializeAsync(transportB, default); + + cts.CancelAfter(TimeSpan.FromSeconds(1)); + await Assert.ThrowsAsync( + async () => await transportA.SendMessageWithReplyAsync( + (BoundPeer)transportB.AsPeer, + new Ping(), + null, + cts.Token)); + } + finally + { + transportA.Dispose(); + transportB.Dispose(); + cts.Dispose(); + } + } + + [SkippableFact(Timeout = Timeout)] + public async Task SendMessageWithMultipleRepliesAsync() + { + ITransport transportA = CreateTransport(); + ITransport transportB = CreateTransport(); + + transportB.ProcessMessageHandler += (sender, message) => + { + if (message is Ping) + { + _ = transportB.ReplyMessageAsync( + new Ping + { + Identity = message.Identity, + }, + default); + _ = transportB.ReplyMessageAsync( + new Pong + { + Identity = message.Identity, + }, + default); + } + }; + + try + { + await InitializeAsync(transportA); + await InitializeAsync(transportB); + + var replies = (await transportA.SendMessageWithReplyAsync( + (BoundPeer)transportB.AsPeer, + new Ping(), + TimeSpan.FromSeconds(3), + 2, + CancellationToken.None)).ToArray(); + + Assert.Contains(replies, message => message is Ping); + Assert.Contains(replies, message => message is Pong); + } + finally + { + transportA.Dispose(); + transportB.Dispose(); + } + } + // This also tests ITransport.ReplyMessage at the same time. [SkippableFact(Timeout = Timeout)] public async Task SendMessageWithReplyAsyncTimeout() @@ -253,6 +334,34 @@ await Assert.ThrowsAsync( } } + [SkippableFact(Timeout = Timeout)] + public async Task SendMessageToInvalidPeerAsync() + { + ITransport transport = CreateTransport(); + + try + { + await InitializeAsync(transport); + var peer = new BoundPeer( + new PrivateKey().PublicKey, + new DnsEndPoint( + "0.0.0.0", + ((BoundPeer)transport.AsPeer).EndPoint.Port + 1)); + Task task = transport.SendMessageWithReplyAsync( + peer, + new Ping(), + TimeSpan.FromSeconds(5), + default); + + // Sending request to the invalid peer throws TimeoutException. + await Assert.ThrowsAsync(async () => await task); + } + finally + { + transport.Dispose(); + } + } + [SkippableFact(Timeout = Timeout)] public async Task BroadcastMessage() { diff --git a/Libplanet/Net/Transports/BoundPeerExtensions.cs b/Libplanet/Net/Transports/BoundPeerExtensions.cs index 3f020f914f5..f080c031c01 100644 --- a/Libplanet/Net/Transports/BoundPeerExtensions.cs +++ b/Libplanet/Net/Transports/BoundPeerExtensions.cs @@ -16,7 +16,8 @@ namespace Libplanet.Net.Transports public static class BoundPeerExtensions { /// - /// Queries of given . + /// Queries of given + /// specialized for NetMQ based transport. /// /// The to query /// . From eccafc6b742cd2a44a110f1883f9a45d89c888ff Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 20 May 2021 19:14:58 +0900 Subject: [PATCH 09/17] Introduce AsyncDelegate, whch will replace ITransport.ProcessMessageHandler EventHandler --- .../Net/Transports/NetMQTransportTest.cs | 5 ++- .../Net/Transports/TransportTest.cs | 31 +++++++------- Libplanet/Net/AsyncDelegate.cs | 41 +++++++++++++++++++ Libplanet/Net/Protocols/KademliaProtocol.cs | 16 ++++---- Libplanet/Net/Swarm.MessageHandlers.cs | 25 +++++------ Libplanet/Net/Swarm.cs | 2 +- Libplanet/Net/Transports/ITransport.cs | 4 +- Libplanet/Net/Transports/NetMQTransport.cs | 5 ++- Libplanet/Net/Transports/TcpTransport.cs | 7 ++-- 9 files changed, 90 insertions(+), 46 deletions(-) create mode 100644 Libplanet/Net/AsyncDelegate.cs diff --git a/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs b/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs index b7c1c1dfd19..3372c095043 100644 --- a/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs +++ b/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs @@ -61,10 +61,11 @@ public async Task MessageHistory() var transportB = CreateNetMQTransport(); var messageReceived = new AsyncAutoResetEvent(); - transportB.ProcessMessageHandler += (sender, message) => + transportB.ProcessMessageHandler.Register(async message => { messageReceived.Set(); - }; + await Task.Yield(); + }); try { diff --git a/Libplanet.Tests/Net/Transports/TransportTest.cs b/Libplanet.Tests/Net/Transports/TransportTest.cs index 95ec3051c3a..cbf3c8495f1 100644 --- a/Libplanet.Tests/Net/Transports/TransportTest.cs +++ b/Libplanet.Tests/Net/Transports/TransportTest.cs @@ -164,10 +164,11 @@ public async Task SendMessageAsync() TaskCompletionSource tcs = new TaskCompletionSource(); - transportB.ProcessMessageHandler += (sender, message) => + transportB.ProcessMessageHandler.Register(async message => { tcs.SetResult(message); - }; + await Task.Yield(); + }); try { @@ -200,18 +201,18 @@ public async Task SendMessageWithReplyAsync() ITransport transportA = CreateTransport(); ITransport transportB = CreateTransport(); - transportB.ProcessMessageHandler += (sender, message) => + transportB.ProcessMessageHandler.Register(async message => { if (message is Ping) { - _ = transportB.ReplyMessageAsync( + await transportB.ReplyMessageAsync( new Pong { Identity = message.Identity, }, CancellationToken.None); } - }; + }); try { @@ -267,24 +268,24 @@ public async Task SendMessageWithMultipleRepliesAsync() ITransport transportA = CreateTransport(); ITransport transportB = CreateTransport(); - transportB.ProcessMessageHandler += (sender, message) => + transportB.ProcessMessageHandler.Register(async message => { if (message is Ping) { - _ = transportB.ReplyMessageAsync( + await transportB.ReplyMessageAsync( new Ping { Identity = message.Identity, }, default); - _ = transportB.ReplyMessageAsync( + await transportB.ReplyMessageAsync( new Pong { Identity = message.Identity, }, default); } - }; + }); try { @@ -378,18 +379,20 @@ public async Task BroadcastMessage() var tcsC = new TaskCompletionSource(); var tcsD = new TaskCompletionSource(); - transportB.ProcessMessageHandler += MessageHandler(tcsB); - transportC.ProcessMessageHandler += MessageHandler(tcsC); - transportD.ProcessMessageHandler += MessageHandler(tcsD); + transportB.ProcessMessageHandler.Register(MessageHandler(tcsB)); + transportC.ProcessMessageHandler.Register(MessageHandler(tcsC)); + transportD.ProcessMessageHandler.Register(MessageHandler(tcsD)); - EventHandler MessageHandler(TaskCompletionSource tcs) + Func MessageHandler(TaskCompletionSource tcs) { - return (sender, message) => + return async message => { if (message is Ping) { tcs.SetResult(message); } + + await Task.Yield(); }; } diff --git a/Libplanet/Net/AsyncDelegate.cs b/Libplanet/Net/AsyncDelegate.cs new file mode 100644 index 00000000000..44b8a9427d8 --- /dev/null +++ b/Libplanet/Net/AsyncDelegate.cs @@ -0,0 +1,41 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Libplanet.Net +{ + public class AsyncDelegate + { + private IEnumerable> _functions; + + public AsyncDelegate() + { + _functions = new List>(); + } + + public void Register(Func func) + { +#pragma warning disable PC002 + // Usage of a .NET Standard API that isn’t available on the .NET Framework 4.6.1 +#if NETFRAMEWORK && !NET48 && !NET472 && !NET471 + _functions = _functions.Concat(new[] { func }); +#else + _functions = _functions.Append(func); +#endif +#pragma warning restore PC002 + } + + public void Unregister(Func func) + { + _functions = _functions.Where(f => !f.Equals(func)); + } + + public async Task InvokeAsync(T arg) + { + IEnumerable tasks = _functions.Select(f => f(arg)); + await Task.WhenAll(tasks); + } + } +} diff --git a/Libplanet/Net/Protocols/KademliaProtocol.cs b/Libplanet/Net/Protocols/KademliaProtocol.cs index ad06183136a..9aa70fb67f9 100644 --- a/Libplanet/Net/Protocols/KademliaProtocol.cs +++ b/Libplanet/Net/Protocols/KademliaProtocol.cs @@ -57,7 +57,7 @@ public KademliaProtocol( _requestTimeout = requestTimeout ?? TimeSpan.FromMilliseconds(5000); - _transport.ProcessMessageHandler += ProcessMessageHandler; + _transport.ProcessMessageHandler.Register(ProcessMessageHandler); } /// @@ -488,19 +488,19 @@ internal async Task PingAsync( } } - private void ProcessMessageHandler(object target, Message message) + private async Task ProcessMessageHandler(Message message) { switch (message) { case Ping ping: { - ReceivePing(ping); + await ReceivePingAsync(ping); break; } case FindNeighbors findPeer: { - ReceiveFindPeer(findPeer); + await ReceiveFindPeerAsync(findPeer); break; } } @@ -690,7 +690,7 @@ private async Task> GetNeighbors( } // Send pong back to remote - private void ReceivePing(Ping ping) + private async Task ReceivePingAsync(Ping ping) { if (ping.Remote.Address.Equals(_address)) { @@ -703,7 +703,7 @@ private void ReceivePing(Ping ping) Identity = ping.Identity, }; - _ = _transport.ReplyMessageAsync(pong, default); + await _transport.ReplyMessageAsync(pong, default); } /// @@ -842,7 +842,7 @@ private async Task ProcessFoundAsync( // FIXME: this method is not safe from amplification attack // maybe ping/pong/ping/pong is required - private void ReceiveFindPeer(FindNeighbors findNeighbors) + private async Task ReceiveFindPeerAsync(FindNeighbors findNeighbors) { IEnumerable found = _table.Neighbors(findNeighbors.Target, _table.BucketSize, true); @@ -852,7 +852,7 @@ private void ReceiveFindPeer(FindNeighbors findNeighbors) Identity = findNeighbors.Identity, }; - _ = _transport.ReplyMessageAsync(neighbors, default); + await _transport.ReplyMessageAsync(neighbors, default); } } } diff --git a/Libplanet/Net/Swarm.MessageHandlers.cs b/Libplanet/Net/Swarm.MessageHandlers.cs index 7042f6d81a7..3c5c64be435 100644 --- a/Libplanet/Net/Swarm.MessageHandlers.cs +++ b/Libplanet/Net/Swarm.MessageHandlers.cs @@ -13,7 +13,7 @@ namespace Libplanet.Net { public partial class Swarm { - private void ProcessMessageHandler(object target, Message message) + private async Task ProcessMessageHandlerAsync(Message message) { switch (message) { @@ -38,7 +38,7 @@ private void ProcessMessageHandler(object target, Message message) Identity = getChainStatus.Identity, }; - _ = Transport.ReplyMessageAsync(chainStatus, default); + await Transport.ReplyMessageAsync(chainStatus, default); break; } @@ -65,16 +65,16 @@ out IReadOnlyList hashes { Identity = getBlockHashes.Identity, }; - _ = Transport.ReplyMessageAsync(reply, default); + await Transport.ReplyMessageAsync(reply, default); break; } case GetBlocks getBlocks: - TransferBlocks(getBlocks); + await TransferBlocksAsync(getBlocks); break; case GetTxs getTxs: - TransferTxs(getTxs); + await TransferTxsAsync(getTxs); break; case TxIds txIds: @@ -86,10 +86,7 @@ out IReadOnlyList hashes break; case BlockHeaderMessage blockHeader: - Task.Run( - async () => await ProcessBlockHeader(blockHeader, _cancellationToken), - _cancellationToken - ); + await ProcessBlockHeader(blockHeader, _cancellationToken); break; default: @@ -173,7 +170,7 @@ private async Task ProcessBlockHeader( } } - private void TransferTxs(GetTxs getTxs) + private async Task TransferTxsAsync(GetTxs getTxs) { foreach (TxId txid in getTxs.TxIds) { @@ -188,7 +185,7 @@ private void TransferTxs(GetTxs getTxs) { Identity = getTxs.Identity, }; - _ = Transport.ReplyMessageAsync(response, default); + await Transport.ReplyMessageAsync(response, default); } } @@ -231,7 +228,7 @@ private void ProcessTxIds(TxIds message) } } - private void TransferBlocks(GetBlocks getData) + private async Task TransferBlocksAsync(GetBlocks getData) { string identityHex = ByteUtil.Hex(getData.Identity); _logger.Verbose( @@ -268,7 +265,7 @@ private void TransferBlocks(GetBlocks getData) i, total ); - _ = Transport.ReplyMessageAsync(response, default); + await Transport.ReplyMessageAsync(response, default); blocks.Clear(); } @@ -287,7 +284,7 @@ private void TransferBlocks(GetBlocks getData) total, identityHex ); - _ = Transport.ReplyMessageAsync(response, default); + await Transport.ReplyMessageAsync(response, default); } _logger.Debug("Blocks were transferred to {Identity}.", identityHex); diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index b1ff76883ea..d8733aea24e 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -148,7 +148,7 @@ internal Swarm( iceServers, differentAppProtocolVersionEncountered, Options.MessageLifespan); - Transport.ProcessMessageHandler += ProcessMessageHandler; + Transport.ProcessMessageHandler.Register(ProcessMessageHandlerAsync); PeerDiscovery = new KademliaProtocol(RoutingTable, Transport, Address); } diff --git a/Libplanet/Net/Transports/ITransport.cs b/Libplanet/Net/Transports/ITransport.cs index 4251e6a1ca9..0eb4e1c490d 100644 --- a/Libplanet/Net/Transports/ITransport.cs +++ b/Libplanet/Net/Transports/ITransport.cs @@ -15,11 +15,11 @@ namespace Libplanet.Net.Transports public interface ITransport : IDisposable { /// - /// The invoked when a message that is not + /// The list of tasks invoked when a message that is not /// a reply is received. To handle reply, please use . /// - event EventHandler ProcessMessageHandler; + AsyncDelegate ProcessMessageHandler { get; } /// /// representation of . diff --git a/Libplanet/Net/Transports/NetMQTransport.cs b/Libplanet/Net/Transports/NetMQTransport.cs index 7f079db2516..3ab71d879db 100644 --- a/Libplanet/Net/Transports/NetMQTransport.cs +++ b/Libplanet/Net/Transports/NetMQTransport.cs @@ -195,13 +195,14 @@ public NetMQTransport( ); MessageHistory = new FixedSizedQueue(MessageHistoryCapacity); + ProcessMessageHandler = new AsyncDelegate(); _dealers = new ConcurrentDictionary(); _replyCompletionSources = new ConcurrentDictionary>(); } /// - public event EventHandler ProcessMessageHandler; + public AsyncDelegate ProcessMessageHandler { get; } /// public Peer AsPeer => EndPoint is null @@ -577,7 +578,7 @@ private void ReceiveMessage(object sender, NetMQSocketEventArgs e) try { - ProcessMessageHandler?.Invoke(this, message); + _ = ProcessMessageHandler.InvokeAsync(message); } catch (Exception exc) { diff --git a/Libplanet/Net/Transports/TcpTransport.cs b/Libplanet/Net/Transports/TcpTransport.cs index 67e34008886..7144691f40e 100644 --- a/Libplanet/Net/Transports/TcpTransport.cs +++ b/Libplanet/Net/Transports/TcpTransport.cs @@ -102,10 +102,12 @@ public TcpTransport( _logger = Log.ForContext(); _runtimeCancellationTokenSource = new CancellationTokenSource(); _turnCancellationTokenSource = new CancellationTokenSource(); + ProcessMessageHandler = new AsyncDelegate(); MessageHistory = new FixedSizedQueue(MessageHistoryCapacity); } - public event EventHandler ProcessMessageHandler = null!; + /// + public AsyncDelegate ProcessMessageHandler { get; } /// public Peer AsPeer => EndPoint is null @@ -623,8 +625,7 @@ private async Task AcceptAsync( var id = new Guid(message.Identity); await TryAddStreamAsync(id, client, cancellationToken); - - ProcessMessageHandler?.Invoke(this, message); + await ProcessMessageHandler.InvokeAsync(message); } catch (DifferentAppProtocolVersionException dapve) { From a1ed97b3297809e15903010b529c7977be790ee3 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 27 May 2021 14:58:11 +0900 Subject: [PATCH 10/17] Cancel public operation in ITransport when stop --- .../Net/Transports/TransportTest.cs | 32 +++++++ Libplanet/Net/Transports/NetMQTransport.cs | 31 ++++--- Libplanet/Net/Transports/TcpTransport.cs | 83 +++++++++++++------ 3 files changed, 112 insertions(+), 34 deletions(-) diff --git a/Libplanet.Tests/Net/Transports/TransportTest.cs b/Libplanet.Tests/Net/Transports/TransportTest.cs index cbf3c8495f1..a988e34ba69 100644 --- a/Libplanet.Tests/Net/Transports/TransportTest.cs +++ b/Libplanet.Tests/Net/Transports/TransportTest.cs @@ -363,6 +363,38 @@ public async Task SendMessageToInvalidPeerAsync() } } + [SkippableFact(Timeout = Timeout)] + public async Task SendMessageAsyncCancelWhenTransportStop() + { + ITransport transportA = CreateTransport(); + ITransport transportB = CreateTransport(); + + try + { + await InitializeAsync(transportA); + await InitializeAsync(transportB); + + Task t = transportA.SendMessageWithReplyAsync( + (BoundPeer)transportB.AsPeer, + new Ping(), + null, + CancellationToken.None); + + // For context change + await Task.Delay(100); + + await transportA.StopAsync(TimeSpan.Zero); + Assert.False(transportA.Running); + await Assert.ThrowsAsync(async () => await t); + Assert.True(t.IsCanceled); + } + finally + { + transportA.Dispose(); + transportB.Dispose(); + } + } + [SkippableFact(Timeout = Timeout)] public async Task BroadcastMessage() { diff --git a/Libplanet/Net/Transports/NetMQTransport.cs b/Libplanet/Net/Transports/NetMQTransport.cs index 3ab71d879db..10f75c793bf 100644 --- a/Libplanet/Net/Transports/NetMQTransport.cs +++ b/Libplanet/Net/Transports/NetMQTransport.cs @@ -52,12 +52,12 @@ public class NetMQTransport : ITransport private Channel _requests; private long _requestCount; + private CancellationTokenSource _runtimeProcessorCancellationTokenSource; private CancellationTokenSource _runtimeCancellationTokenSource; private CancellationTokenSource _turnCancellationTokenSource; private Task _runtimeProcessor; private TaskCompletionSource _runningEvent; - private CancellationToken _cancellationToken; private ConcurrentDictionary _dealers; private ConcurrentDictionary> _replyCompletionSources; @@ -152,6 +152,7 @@ public NetMQTransport( _logger = Log.ForContext(); _requests = Channel.CreateUnbounded(); + _runtimeProcessorCancellationTokenSource = new CancellationTokenSource(); _runtimeCancellationTokenSource = new CancellationTokenSource(); _turnCancellationTokenSource = new CancellationTokenSource(); _requestCount = 0; @@ -168,7 +169,7 @@ public NetMQTransport( for (int i = 0; i < workers; i++) { workerTasks[i] = ProcessRuntime( - _runtimeCancellationTokenSource.Token + _runtimeProcessorCancellationTokenSource.Token ); } @@ -263,17 +264,19 @@ public async Task StartAsync(CancellationToken cancellationToken) } _logger.Information($"Listen on {_listenPort}"); + _runtimeCancellationTokenSource = + CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _turnCancellationTokenSource = + CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); if (_host is null && !(_iceServers is null)) { _turnClient = await IceServer.CreateTurnClient(_iceServers); await _turnClient.StartAsync(_listenPort.Value, cancellationToken); - _ = RefreshPermissions(cancellationToken); + _ = RefreshPermissions(_runtimeCancellationTokenSource.Token); } - _cancellationToken = cancellationToken; - if (_turnClient is null || !_turnClient.BehindNAT) { _hostEndPoint = new DnsEndPoint( @@ -294,7 +297,7 @@ public async Task StartAsync(CancellationToken cancellationToken) tasks.Add(DisposeUnusedDealerSockets( TimeSpan.FromSeconds(10), - _cancellationToken)); + _runtimeCancellationTokenSource.Token)); tasks.Add(RunPoller(_routerPoller)); tasks.Add(RunPoller(_broadcastPoller)); @@ -345,6 +348,8 @@ public async Task StopAsync( _dealers.Clear(); + _runtimeCancellationTokenSource.Cancel(); + Running = false; } } @@ -355,10 +360,12 @@ public void Dispose() if (!_disposed) { _requests.Writer.Complete(); + _runtimeProcessorCancellationTokenSource.Cancel(); _runtimeCancellationTokenSource.Cancel(); _turnCancellationTokenSource.Cancel(); _runtimeProcessor.Wait(); + _runtimeProcessorCancellationTokenSource.Dispose(); _runtimeCancellationTokenSource.Dispose(); _turnCancellationTokenSource.Dispose(); _disposed = true; @@ -413,6 +420,10 @@ public async Task> SendMessageWithReplyAsync( await CreatePermission(peer); } + using CancellationTokenSource cts = + CancellationTokenSource.CreateLinkedTokenSource( + _runtimeCancellationTokenSource.Token, + cancellationToken); Guid reqId = Guid.NewGuid(); try { @@ -428,10 +439,10 @@ public async Task> SendMessageWithReplyAsync( // FIXME should we also cancel tcs sender side too? using CancellationTokenRegistration ctr = - cancellationToken.Register(() => tcs.TrySetCanceled()); + cts.Token.Register(() => tcs.TrySetCanceled()); await _requests.Writer.WriteAsync( new MessageRequest(reqId, message, peer, now, timeout, expectedResponses, tcs), - cancellationToken + cts.Token ); _logger.Verbose( "Enqueued a request {RequestId} to the peer {Peer}: {@Message}; " + @@ -555,7 +566,7 @@ private void ReceiveMessage(object sender, NetMQSocketEventArgs e) { try { - if (_cancellationToken.IsCancellationRequested) + if (_runtimeCancellationTokenSource.Token.IsCancellationRequested) { return; } @@ -595,7 +606,7 @@ private void ReceiveMessage(object sender, NetMQSocketEventArgs e) { Identity = dapve.Identity, }; - _ = ReplyMessageAsync(differentVersion, _cancellationToken); + _ = ReplyMessageAsync(differentVersion, _runtimeCancellationTokenSource.Token); _logger.Debug("Message from peer with different version received."); } catch (InvalidTimestampException ite) diff --git a/Libplanet/Net/Transports/TcpTransport.cs b/Libplanet/Net/Transports/TcpTransport.cs index 7144691f40e..c48667c4908 100644 --- a/Libplanet/Net/Transports/TcpTransport.cs +++ b/Libplanet/Net/Transports/TcpTransport.cs @@ -46,13 +46,11 @@ private readonly DifferentAppProtocolVersionEncountered? private readonly ILogger _logger; private readonly TimeSpan? _messageLifespan; - private readonly CancellationTokenSource _turnCancellationTokenSource; - private readonly ConcurrentDictionary _streams; private TaskCompletionSource _runningEvent; private CancellationTokenSource _runtimeCancellationTokenSource; - private CancellationToken _cancellationToken; + private CancellationTokenSource _turnCancellationTokenSource; private int _listenPort; private DnsEndPoint? _hostEndPoint; @@ -175,19 +173,19 @@ public async Task StartAsync(CancellationToken cancellationToken = default) _listenPort = ((IPEndPoint)_listener.LocalEndpoint).Port; _logger.Information("Listen on {Port}", _listenPort); + _runtimeCancellationTokenSource = + CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _turnCancellationTokenSource = + CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); if (_host is null && !(_iceServers is null)) { _turnClient = await IceServer.CreateTurnClient(_iceServers); await _turnClient.StartAsync(_listenPort, cancellationToken); - _ = RefreshPermissions(cancellationToken); + _ = RefreshPermissions(_runtimeCancellationTokenSource.Token); } - _runtimeCancellationTokenSource = - CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - _cancellationToken = _runtimeCancellationTokenSource.Token; - if (_turnClient is null || !_turnClient.BehindNAT) { string? host = _host ?? PublicIPAddress?.ToString(); @@ -201,8 +199,8 @@ public async Task StartAsync(CancellationToken cancellationToken = default) List tasks = new List(); - tasks.Add(ReceiveMessageAsync(_cancellationToken)); - tasks.Add(ProcessRuntime(_cancellationToken)); + tasks.Add(ReceiveMessageAsync(_runtimeCancellationTokenSource.Token)); + tasks.Add(ProcessRuntime(_runtimeCancellationTokenSource.Token)); Running = true; _logger.Debug("Start to run. TurnClient: {Client}", _turnClient); @@ -222,7 +220,6 @@ public async Task StopAsync(TimeSpan waitFor, CancellationToken cancellationToke await Task.Delay(waitFor, cancellationToken); _listener?.Stop(); _runtimeCancellationTokenSource.Cancel(); - _turnCancellationTokenSource.Cancel(); StopAllStreams(); Running = false; } @@ -296,12 +293,19 @@ public async Task> SendMessageWithReplyAsync( expectedResponses ); - using var client = new TcpClient { LingerState = new LingerOption(true, 1) }; + using CancellationTokenSource linkedTokenSource = + CancellationTokenSource.CreateLinkedTokenSource( + cancellationToken, + _runtimeCancellationTokenSource.Token); + var client = new TcpClient { LingerState = new LingerOption(true, 1) }; try { await client.ConnectAsync(peer.EndPoint.Host, peer.EndPoint.Port); + _logger.Debug( + "SendMessageAsync client: {EndPoint}", + (IPEndPoint)client.Client.LocalEndPoint); client.ReceiveTimeout = timeout?.Milliseconds ?? 0; - await WriteMessageAsync(message, client, cancellationToken); + await WriteMessageAsync(message, client, linkedTokenSource.Token); _logger.Verbose( "Successfully sent request {RequestId} to {Peer}: {Message}.", reqId, @@ -317,18 +321,17 @@ public async Task> SendMessageWithReplyAsync( (int)timeout.Value.TotalMilliseconds * expectedResponses); } - using CancellationTokenSource linkedTcs = + using CancellationTokenSource timeoutTokenSource = CancellationTokenSource.CreateLinkedTokenSource( timeoutCts.Token, - cancellationToken); + linkedTokenSource.Token); while (expectedResponses > 0) { try { - // TODO: Should consider case where stream bytes exceeds buffer size. Message received = await ReadMessageAsync( client, - linkedTcs.Token); + timeoutTokenSource.Token); _logger.Verbose( "Received message was {Message}. Total: {Count}", received, @@ -413,6 +416,23 @@ public async Task> SendMessageWithReplyAsync( e.Actual); throw; } + finally + { + _ = Task.Run( + async () => + { + try + { + // Wait 15 seconds to close client to receive ACK from TURN. + await Task.Delay(15 * 1000, _runtimeCancellationTokenSource.Token); + } + finally + { + client.Dispose(); + } + }, + _runtimeCancellationTokenSource.Token); + } } public void BroadcastMessage(Address? except, Message message) @@ -424,7 +444,7 @@ public void BroadcastMessage(Address? except, Message message) foreach (var peer in _table.PeersToBroadcast(except)) { - _ = SendMessageAsync(peer, message, _cancellationToken); + _ = SendMessageAsync(peer, message, _runtimeCancellationTokenSource.Token); } } @@ -436,6 +456,10 @@ public async Task ReplyMessageAsync(Message message, CancellationToken cancellat } var id = new Guid(message.Identity); + using CancellationTokenSource linkedTokenSource = + CancellationTokenSource.CreateLinkedTokenSource( + cancellationToken, + _runtimeCancellationTokenSource.Token); _logger.Verbose("Trying to reply message. ID: {Id}", id); if (_streams.TryGetValue(id, out ReplyStream stream)) { @@ -444,12 +468,12 @@ public async Task ReplyMessageAsync(Message message, CancellationToken cancellat { _logger.Verbose("Start to writing reply {Message}.", message); using var @lock = stream.Lock.Lock(); - await WriteMessageAsync(message, stream.Client, _cancellationToken); + await WriteMessageAsync(message, stream.Client, linkedTokenSource.Token); } catch (IOException) { _logger.Verbose("Connection is lost"); - _ = TryRemoveStreamAsync(id, _cancellationToken); + _ = TryRemoveStreamAsync(id, _runtimeCancellationTokenSource.Token); } } } @@ -459,6 +483,11 @@ internal async Task WriteMessageAsync( TcpClient client, CancellationToken cancellationToken) { + if (cancellationToken.IsCancellationRequested) + { + throw new TaskCanceledException(); + } + byte[] serialized = message.Serialize( _privateKey, AsPeer, @@ -495,6 +524,11 @@ internal async Task ReadMessageAsync( TcpClient client, CancellationToken cancellationToken) { + if (cancellationToken.IsCancellationRequested) + { + throw new TaskCanceledException(); + } + var content = new List(); byte[] buffer = new byte[1000]; NetworkStream stream = client.GetStream(); @@ -536,7 +570,7 @@ internal async Task ReadMessageAsync( throw; } - catch (Exception) + catch (Exception e) { if (cancellationToken.IsCancellationRequested) { @@ -575,7 +609,9 @@ private async Task ReceiveMessageAsync(CancellationToken cancellationToken) { TcpClient client = await _listener.AcceptTcpClientAsync(); client.LingerState = new LingerOption(true, 1); - _logger.Verbose("Connected to tcp client."); + _logger.Verbose( + "Connected to tcp client {Address}.", + (IPEndPoint)client.Client.RemoteEndPoint); Guid guid = Guid.NewGuid(); _logger.Verbose("Start to accept message of {Id}.", guid); @@ -659,8 +695,7 @@ private async Task AcceptAsync( } } - private async Task RefreshPermissions( - CancellationToken cancellationToken) + private async Task RefreshPermissions(CancellationToken cancellationToken) { TimeSpan lifetime = TurnPermissionLifetime; while (!cancellationToken.IsCancellationRequested) From 01da292146adc4ebd69ed417e8f2248fd812dc75 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 3 Jun 2021 14:47:42 +0900 Subject: [PATCH 11/17] Add SwarmOptions.Type to select transport used in Swarm --- .../Commands/ApvCommand.cs | 5 +- .../Net/Transports/BoundPeerExtensionsTest.cs | 14 ++++-- Libplanet/Net/Swarm.cs | 48 ++++++++++++++++--- Libplanet/Net/SwarmOptions.cs | 22 +++++++++ 4 files changed, 77 insertions(+), 12 deletions(-) diff --git a/Libplanet.Extensions.Cocona/Commands/ApvCommand.cs b/Libplanet.Extensions.Cocona/Commands/ApvCommand.cs index b96ae150e54..e6bcf1f6c5b 100644 --- a/Libplanet.Extensions.Cocona/Commands/ApvCommand.cs +++ b/Libplanet.Extensions.Cocona/Commands/ApvCommand.cs @@ -4,6 +4,7 @@ using System.IO; using System.Linq; using System.Text.Json; +using System.Threading.Tasks; using Bencodex; using Bencodex.Types; using global::Cocona; @@ -280,7 +281,7 @@ void TreeIntoTable(IValue tree, List<(string, string)> table, string key) } [Command(Description = "Query app protocol version (a.k.a. APV) of target node.")] - public void Query( + public async Task Query( [Argument( Name = "TARGET", #pragma warning disable MEN002 // Line is too long @@ -302,7 +303,7 @@ public void Query( try { - apv = peer.QueryAppProtocolVersion(); + apv = await peer.QueryAppProtocolVersionTcp(); } catch { diff --git a/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs b/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs index 466883f8325..4e858a4e7c0 100644 --- a/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs +++ b/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs @@ -15,8 +15,10 @@ namespace Libplanet.Tests.Net.Transports { public class BoundPeerExtensionsTest { - [Fact(Timeout = 60 * 1000)] - public async Task QueryAppProtocolVersion() + [Theory(Timeout = 60 * 1000)] + [InlineData(SwarmOptions.TransportType.NetMQTransport)] + [InlineData(SwarmOptions.TransportType.TcpTransport)] + public async Task QueryAppProtocolVersion(SwarmOptions.TransportType transportType) { var fx = new DefaultStoreFixture(); var policy = new BlockPolicy(); @@ -28,12 +30,18 @@ public async Task QueryAppProtocolVersion() string host = IPAddress.Loopback.ToString(); int port = FreeTcpPort(); + var option = new SwarmOptions + { + Type = transportType, + }; + using (var swarm = new Swarm( blockchain, swarmKey, apv, host: host, - listenPort: port)) + listenPort: port, + options: option)) { var peer = new BoundPeer(swarmKey.PublicKey, new DnsEndPoint(host, port)); // Before swarm starting... diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index d8733aea24e..94e568adc68 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -137,17 +137,12 @@ internal Swarm( Options = options ?? new SwarmOptions(); RoutingTable = new RoutingTable(Address, tableSize, bucketSize, Options.StaticPeers); - Transport = new NetMQTransport( - RoutingTable, - _privateKey, - _appProtocolVersion, - TrustedAppProtocolVersionSigners, + Transport = InitializeTransport( workers, host, listenPort, iceServers, - differentAppProtocolVersionEncountered, - Options.MessageLifespan); + differentAppProtocolVersionEncountered); Transport.ProcessMessageHandler.Register(ProcessMessageHandlerAsync); PeerDiscovery = new KademliaProtocol(RoutingTable, Transport, Address); } @@ -1268,6 +1263,45 @@ await blockHashes.FirstAsync(cancellationToken) is { } t) } } + private ITransport InitializeTransport( + int workers, + string host, + int? listenPort, + IEnumerable iceServers, + DifferentAppProtocolVersionEncountered differentAppProtocolVersionEncountered) + { + switch (Options.Type) + { + case SwarmOptions.TransportType.NetMQTransport: + return new NetMQTransport( + RoutingTable, + _privateKey, + _appProtocolVersion, + TrustedAppProtocolVersionSigners, + workers, + host, + listenPort, + iceServers ?? new IceServer[0], + differentAppProtocolVersionEncountered, + Options.MessageLifespan); + + case SwarmOptions.TransportType.TcpTransport: + return new TcpTransport( + RoutingTable, + _privateKey, + _appProtocolVersion, + TrustedAppProtocolVersionSigners, + host, + listenPort, + iceServers ?? new IceServer[0], + differentAppProtocolVersionEncountered, + Options.MessageLifespan); + + default: + throw new ArgumentException(nameof(SwarmOptions.Type)); + } + } + private void BroadcastBlock(Address? except, Block block) { _logger.Debug("Trying to broadcast blocks..."); diff --git a/Libplanet/Net/SwarmOptions.cs b/Libplanet/Net/SwarmOptions.cs index 9695c9303a0..0521dcfb637 100644 --- a/Libplanet/Net/SwarmOptions.cs +++ b/Libplanet/Net/SwarmOptions.cs @@ -5,12 +5,29 @@ using Libplanet.Blocks; using Libplanet.Net.Messages; using Libplanet.Net.Protocols; +using Libplanet.Net.Transports; using Libplanet.Tx; namespace Libplanet.Net { public class SwarmOptions { + /// + /// Enum represents the type of the . + /// + public enum TransportType : byte + { + /// + /// NetMQ based transport. + /// + NetMQTransport = 0x01, + + /// + /// TCP based transport. + /// + TcpTransport = 0x02, + } + /// /// The maximum timeout used in . /// @@ -75,5 +92,10 @@ public class SwarmOptions /// using an approximated value. /// public int BranchpointThreshold { get; set; } = 10; + + /// + /// The type of used in . + /// + public TransportType Type { get; set; } = TransportType.TcpTransport; } } From 64996fbab361892480622d6a34640e661b09d901 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 13 May 2021 18:35:35 +0900 Subject: [PATCH 12/17] Update CHANGES.md --- CHANGES.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 777c7515d1b..2c4611930d3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -144,6 +144,14 @@ To be released. - `Swarm.PeerStates` - Return type of `RoutingTable.Neighbors()` became `IReadOnlyList` (was `IEnumerable`). [[#1230], [#1271]] + - Removed `ITransport.RunAsync()` method. + `ITransport.StartAsync()` now conducts operation that + `ITransport.RunAsync()` used to conduct. [[#1288]] + - Removed `ITransport.ReplyMessage()` method which was non-blocking. + Instead, added `ITransport.ReplyMessageAsync()` asynchronous method + which is awaited until the reply message is sent. [[#1288]] + - The type of `ITransport.ProcessMessageHandler` became + `AsyncDelegate` (which was `EventHandler`). [[#1288]] - All methods pertaining to evaluating `IAction`s are moved to a new `ActionEvaluator` class. [[#1301], [#1305]] - Removed `Block.Evaluate()` method. @@ -183,6 +191,14 @@ To be released. - Added `StaticPeers` as the last parameter to `RoutingTable(Address, int, int)` constructor. [[#1230], [#1271]] - Added `AtomicActionRenderer` class. [[#1267], [#1275]] + - Added `ITransport.MessageHistory` property. [[#1288]] + - Added `ITransport.WaitForRunning()` method. [[#1288]] + - Added `TcpTransport` class which implements `ITransport` interface. + [[#1288]] + - Added `SwarmOptions.Type` property. [[#1288]] + - Added `SwarmOptions.TransportType` enum. [[#1288]] + - Added `AsyncDelegate` class. [[#1288]] + - Added `InvalidMagicCookieException` class. [[#1288]] - Added `TxExecution` abstract class. [[#1156], [#1289]] - Added `TxSuccess` class. [[#1156], [#1289]] - Added `TxFailure` class. [[#1156], [#1289]] @@ -293,6 +309,7 @@ To be released. [#1284]: https://github.com/planetarium/libplanet/issues/1284 [#1285]: https://github.com/planetarium/libplanet/issues/1285 [#1287]: https://github.com/planetarium/libplanet/pull/1287 +[#1288]: https://github.com/planetarium/libplanet/pull/1288 [#1289]: https://github.com/planetarium/libplanet/pull/1289 [#1294]: https://github.com/planetarium/libplanet/issues/1294 [#1298]: https://github.com/planetarium/libplanet/pull/1298 From 87f4f725ae234e61b95664e9d060fb85edc552f7 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 17 Jun 2021 15:29:43 +0900 Subject: [PATCH 13/17] Fix minor bugs --- Libplanet.Tests/Net/Messages/MessageTest.cs | 2 +- Libplanet.Tests/Net/Transports/TransportTest.cs | 8 +++++++- Libplanet/Net/Swarm.BlockSync.cs | 3 ++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/Libplanet.Tests/Net/Messages/MessageTest.cs b/Libplanet.Tests/Net/Messages/MessageTest.cs index f0a662de2a6..2d72e15e539 100644 --- a/Libplanet.Tests/Net/Messages/MessageTest.cs +++ b/Libplanet.Tests/Net/Messages/MessageTest.cs @@ -190,7 +190,7 @@ public void UseInvalidSignature() validAppProtocolVersion, ImmutableHashSet.Empty, null, - TimeSpan.FromSeconds(1)); + TimeSpan.FromSeconds(5)); }); } diff --git a/Libplanet.Tests/Net/Transports/TransportTest.cs b/Libplanet.Tests/Net/Transports/TransportTest.cs index a988e34ba69..b4aa6483d3f 100644 --- a/Libplanet.Tests/Net/Transports/TransportTest.cs +++ b/Libplanet.Tests/Net/Transports/TransportTest.cs @@ -3,6 +3,7 @@ using System.Collections.Immutable; using System.Linq; using System.Net; +using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; using Libplanet.Crypto; @@ -343,11 +344,16 @@ public async Task SendMessageToInvalidPeerAsync() try { await InitializeAsync(transport); + // Make sure the tcp port is invalid. + var l = new TcpListener(IPAddress.Loopback, 0); + l.Start(); + int port = ((IPEndPoint)l.LocalEndpoint).Port; + l.Stop(); var peer = new BoundPeer( new PrivateKey().PublicKey, new DnsEndPoint( "0.0.0.0", - ((BoundPeer)transport.AsPeer).EndPoint.Port + 1)); + port)); Task task = transport.SendMessageWithReplyAsync( peer, new Ping(), diff --git a/Libplanet/Net/Swarm.BlockSync.cs b/Libplanet/Net/Swarm.BlockSync.cs index 335934c6c7b..9c0e5a810a4 100644 --- a/Libplanet/Net/Swarm.BlockSync.cs +++ b/Libplanet/Net/Swarm.BlockSync.cs @@ -252,7 +252,8 @@ CancellationToken cancellationToken logSessionIds: (logSessionId, subSessionId), cancellationToken: cancellationToken ); - IEnumerable> hashes = await hashesAsync.ToArrayAsync(); + IEnumerable> hashes = + await hashesAsync.ToArrayAsync(cancellationToken); if (!hashes.Any()) { From be75178540dca400826110be8fe5fcc7162f5605 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 24 Jun 2021 15:27:26 +0900 Subject: [PATCH 14/17] temp --- .../Net/Transports/BoundPeerExtensionsTest.cs | 16 ++++++++++-- .../Net/Transports/TransportTest.cs | 2 +- .../Net/Transports/BoundPeerExtensions.cs | 25 +++++++++++++++---- Libplanet/Net/Transports/TcpTransport.cs | 22 +++++++--------- 4 files changed, 44 insertions(+), 21 deletions(-) diff --git a/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs b/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs index 4e858a4e7c0..06b4ef1a1b3 100644 --- a/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs +++ b/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs @@ -45,9 +45,21 @@ public async Task QueryAppProtocolVersion(SwarmOptions.TransportType transportTy { var peer = new BoundPeer(swarmKey.PublicKey, new DnsEndPoint(host, port)); // Before swarm starting... - Assert.Throws(() => + await Assert.ThrowsAsync(async () => { - peer.QueryAppProtocolVersion(timeout: TimeSpan.FromSeconds(1)); + if (swarm.Transport is NetMQTransport) + { + peer.QueryAppProtocolVersion(timeout: TimeSpan.FromSeconds(1)); + } + else if (swarm.Transport is TcpTransport) + { + await peer.QueryAppProtocolVersionTcp(timeout: TimeSpan.FromSeconds(1)); + } + else + { + throw new XunitException( + "Each type of transport must have corresponding test case."); + } }); _ = swarm.StartAsync(); try diff --git a/Libplanet.Tests/Net/Transports/TransportTest.cs b/Libplanet.Tests/Net/Transports/TransportTest.cs index b4aa6483d3f..bfd7fd41519 100644 --- a/Libplanet.Tests/Net/Transports/TransportTest.cs +++ b/Libplanet.Tests/Net/Transports/TransportTest.cs @@ -71,7 +71,7 @@ public async void RestartAsync() } [SkippableFact(Timeout = Timeout)] - public async void Dispose() + public async void DisposeTest() { ITransport transport = CreateTransport(); diff --git a/Libplanet/Net/Transports/BoundPeerExtensions.cs b/Libplanet/Net/Transports/BoundPeerExtensions.cs index f080c031c01..8484acf024a 100644 --- a/Libplanet/Net/Transports/BoundPeerExtensions.cs +++ b/Libplanet/Net/Transports/BoundPeerExtensions.cs @@ -39,14 +39,21 @@ public static AppProtocolVersion QueryAppProtocolVersion( ); TimeSpan timeoutNotNull = timeout ?? TimeSpan.FromSeconds(5); - if (dealerSocket.TrySendMultipartMessage(timeoutNotNull, request)) + try { - var response = new NetMQMessage(); - if (dealerSocket.TryReceiveMultipartMessage(timeoutNotNull, ref response)) + if (dealerSocket.TrySendMultipartMessage(timeoutNotNull, request)) { - return AppProtocolVersion.FromToken(response.First.ConvertToString()); + var response = new NetMQMessage(); + if (dealerSocket.TryReceiveMultipartMessage(timeoutNotNull, ref response)) + { + return AppProtocolVersion.FromToken(response.First.ConvertToString()); + } } } + catch (TerminatingException) + { + throw new TimeoutException($"Peer didn't respond."); + } throw new TimeoutException( $"Peer[{peer}] didn't respond within the specified time[{timeout}]." @@ -71,7 +78,15 @@ public static async Task QueryAppProtocolVersionTcp( ) { using var client = new TcpClient(); - await client.ConnectAsync(peer.EndPoint.Host, peer.EndPoint.Port); + try + { + await client.ConnectAsync(peer.EndPoint.Host, peer.EndPoint.Port); + } + catch (SocketException) + { + throw new TimeoutException("Cannot find peer."); + } + client.ReceiveTimeout = timeout?.Milliseconds ?? 0; using var stream = client.GetStream(); var key = new PrivateKey(); diff --git a/Libplanet/Net/Transports/TcpTransport.cs b/Libplanet/Net/Transports/TcpTransport.cs index c48667c4908..b019cfdf361 100644 --- a/Libplanet/Net/Transports/TcpTransport.cs +++ b/Libplanet/Net/Transports/TcpTransport.cs @@ -52,9 +52,8 @@ private readonly DifferentAppProtocolVersionEncountered? private CancellationTokenSource _runtimeCancellationTokenSource; private CancellationTokenSource _turnCancellationTokenSource; - private int _listenPort; private DnsEndPoint? _hostEndPoint; - private TcpListener? _listener; + private TcpListener _listener; private TurnClient? _turnClient; private bool _disposed; @@ -78,7 +77,6 @@ public TcpTransport( _appProtocolVersion = appProtocolVersion; _trustedAppProtocolVersionSigners = trustedAppProtocolVersionSigners; _host = host; - _listenPort = listenPort ?? 0; _differentAppProtocolVersionEncountered = differentAppProtocolVersionEncountered; _iceServers = iceServers?.ToList(); _messageLifespan = messageLifespan; @@ -100,6 +98,7 @@ public TcpTransport( _logger = Log.ForContext(); _runtimeCancellationTokenSource = new CancellationTokenSource(); _turnCancellationTokenSource = new CancellationTokenSource(); + _listener = new TcpListener(new IPEndPoint(IPAddress.Any, listenPort ?? 0)); ProcessMessageHandler = new AsyncDelegate(); MessageHistory = new FixedSizedQueue(MessageHistoryCapacity); } @@ -142,7 +141,7 @@ public void Dispose() { if (!_disposed) { - _listener?.Stop(); + _listener.Stop(); _runtimeCancellationTokenSource.Cancel(); _turnCancellationTokenSource.Cancel(); @@ -166,13 +165,10 @@ public async Task StartAsync(CancellationToken cancellationToken = default) throw new TransportException("Transport is already running."); } - _listener = new TcpListener(new IPEndPoint(IPAddress.Any, _listenPort)); _listener.Start(ListenerBacklog); + int listenPort = ((IPEndPoint)_listener.LocalEndpoint).Port; - // _listenPort might be 0, which is any, so it should be re-set. - _listenPort = ((IPEndPoint)_listener.LocalEndpoint).Port; - - _logger.Information("Listen on {Port}", _listenPort); + _logger.Information("Listen on {Port}", listenPort); _runtimeCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); _turnCancellationTokenSource = @@ -181,7 +177,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default) if (_host is null && !(_iceServers is null)) { _turnClient = await IceServer.CreateTurnClient(_iceServers); - await _turnClient.StartAsync(_listenPort, cancellationToken); + await _turnClient.StartAsync(listenPort, cancellationToken); _ = RefreshPermissions(_runtimeCancellationTokenSource.Token); } @@ -194,7 +190,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default) throw new TransportException("Host is null."); } - _hostEndPoint = new DnsEndPoint(host, _listenPort); + _hostEndPoint = new DnsEndPoint(host, listenPort); } List tasks = new List(); @@ -218,7 +214,7 @@ public async Task StopAsync(TimeSpan waitFor, CancellationToken cancellationToke if (Running) { await Task.Delay(waitFor, cancellationToken); - _listener?.Stop(); + _listener.Stop(); _runtimeCancellationTokenSource.Cancel(); StopAllStreams(); Running = false; @@ -603,7 +599,7 @@ internal async Task ReadMessageAsync( private async Task ReceiveMessageAsync(CancellationToken cancellationToken) { - while (!(cancellationToken.IsCancellationRequested || _listener is null)) + while (!cancellationToken.IsCancellationRequested) { try { From 461f7eeecf36bb66fb168dccd23e934b1399ae4b Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 24 Jun 2021 18:14:08 +0900 Subject: [PATCH 15/17] fix --- Libplanet.Tests/Net/Transports/TransportTest.cs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/Libplanet.Tests/Net/Transports/TransportTest.cs b/Libplanet.Tests/Net/Transports/TransportTest.cs index bfd7fd41519..48c78cece0c 100644 --- a/Libplanet.Tests/Net/Transports/TransportTest.cs +++ b/Libplanet.Tests/Net/Transports/TransportTest.cs @@ -11,7 +11,6 @@ using Libplanet.Net.Messages; using Libplanet.Net.Protocols; using Libplanet.Net.Transports; -using NetMQ; using Serilog; using Xunit; using Xunit.Sdk; @@ -30,13 +29,14 @@ public abstract class TransportTest TransportConstructor { get; set; } [SkippableFact(Timeout = Timeout)] - public void StartAsync() + public async void StartAsync() { ITransport transport = CreateTransport(); try { _ = transport.StartAsync(); + await transport.WaitForRunningAsync(); Assert.True(transport.Running); } finally @@ -56,10 +56,7 @@ public async void RestartAsync() Assert.True(transport.Running); await transport.StopAsync(TimeSpan.Zero); Assert.False(transport.Running); - if (transport is NetMQTransport) - { - NetMQConfig.Cleanup(false); - } + await Task.Delay(TimeSpan.FromSeconds(5)); await InitializeAsync(transport); Assert.True(transport.Running); From a10925e77127a1ba40d96bceec1a22441cc170ab Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 24 Jun 2021 18:30:24 +0900 Subject: [PATCH 16/17] fix2 --- Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs | 1 - Libplanet.Tests/Net/Transports/NetMQTransportTest.cs | 8 +++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs b/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs index 06b4ef1a1b3..a71dcd8d52c 100644 --- a/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs +++ b/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs @@ -16,7 +16,6 @@ namespace Libplanet.Tests.Net.Transports public class BoundPeerExtensionsTest { [Theory(Timeout = 60 * 1000)] - [InlineData(SwarmOptions.TransportType.NetMQTransport)] [InlineData(SwarmOptions.TransportType.TcpTransport)] public async Task QueryAppProtocolVersion(SwarmOptions.TransportType transportType) { diff --git a/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs b/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs index 3372c095043..327846d2cf0 100644 --- a/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs +++ b/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs @@ -9,6 +9,7 @@ using Libplanet.Net.Messages; using Libplanet.Net.Protocols; using Libplanet.Net.Transports; +using NetMQ; using Nito.AsyncEx; using Serilog; using Xunit; @@ -16,7 +17,7 @@ namespace Libplanet.Tests.Net.Transports { - public class NetMQTransportTest : TransportTest + public class NetMQTransportTest : TransportTest, IDisposable { public NetMQTransportTest(ITestOutputHelper testOutputHelper) { @@ -54,6 +55,11 @@ public NetMQTransportTest(ITestOutputHelper testOutputHelper) Logger = Log.ForContext(); } + public void Dispose() + { + NetMQConfig.Cleanup(false); + } + [SkippableFact(Timeout = Timeout, Skip = "Target method is broken.")] public async Task MessageHistory() { From fcf7d08e1434c746ced306411bb0a80309d16130 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 24 Jun 2021 18:48:28 +0900 Subject: [PATCH 17/17] fix3 --- Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs | 8 ++++++++ Libplanet.Tests/Net/Transports/NetMQTransportTest.cs | 1 + Libplanet.Tests/Net/Transports/TransportTest.cs | 7 ++++++- 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs b/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs index a71dcd8d52c..a878818e284 100644 --- a/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs +++ b/Libplanet.Tests/Net/Transports/BoundPeerExtensionsTest.cs @@ -8,14 +8,17 @@ using Libplanet.Net.Transports; using Libplanet.Tests.Common.Action; using Libplanet.Tests.Store; +using NetMQ; using Xunit; using Xunit.Sdk; namespace Libplanet.Tests.Net.Transports { + [Collection("NetMQConfiguration")] public class BoundPeerExtensionsTest { [Theory(Timeout = 60 * 1000)] + [InlineData(SwarmOptions.TransportType.NetMQTransport)] [InlineData(SwarmOptions.TransportType.TcpTransport)] public async Task QueryAppProtocolVersion(SwarmOptions.TransportType transportType) { @@ -85,6 +88,11 @@ await Assert.ThrowsAsync(async () => await swarm.StopAsync(); } } + + if (transportType == SwarmOptions.TransportType.NetMQTransport) + { + NetMQConfig.Cleanup(false); + } } private static int FreeTcpPort() diff --git a/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs b/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs index 327846d2cf0..99127c1ea35 100644 --- a/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs +++ b/Libplanet.Tests/Net/Transports/NetMQTransportTest.cs @@ -17,6 +17,7 @@ namespace Libplanet.Tests.Net.Transports { + [Collection("NetMQConfiguration")] public class NetMQTransportTest : TransportTest, IDisposable { public NetMQTransportTest(ITestOutputHelper testOutputHelper) diff --git a/Libplanet.Tests/Net/Transports/TransportTest.cs b/Libplanet.Tests/Net/Transports/TransportTest.cs index 48c78cece0c..870118d4194 100644 --- a/Libplanet.Tests/Net/Transports/TransportTest.cs +++ b/Libplanet.Tests/Net/Transports/TransportTest.cs @@ -11,6 +11,7 @@ using Libplanet.Net.Messages; using Libplanet.Net.Protocols; using Libplanet.Net.Transports; +using NetMQ; using Serilog; using Xunit; using Xunit.Sdk; @@ -56,7 +57,11 @@ public async void RestartAsync() Assert.True(transport.Running); await transport.StopAsync(TimeSpan.Zero); Assert.False(transport.Running); - await Task.Delay(TimeSpan.FromSeconds(5)); + + if (transport is NetMQTransport) + { + NetMQConfig.Cleanup(false); + } await InitializeAsync(transport); Assert.True(transport.Running);