Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poc/tcp transport ci #7

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
**/.idea/**/dynamic.xml

# Rider
# Rider auto-generates .iml files, contentModel.xml, and projectSettingsUpdater.xml
# Rider auto-generates .iml files, contentModel.xml, projectSettingsUpdater.xml, and indexLayout.xml
**/.idea/**/*.iml
**/.idea/**/contentModel.xml
**/.idea/**/modules.xml
**/.idea/**/projectSettingsUpdater.xml
**/.idea/**/indexLayout.xml

*.suo
*.user
Expand Down
17 changes: 17 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ To be released.
- `Swarm<T>.PeerStates`
- Return type of `RoutingTable.Neighbors()` became `IReadOnlyList<BoundPeer>`
(was `IEnumerable<BoundPeer>`). [[#1230], [#1271]]
- Removed `ITransport.RunAsync()` method.
`ITransport.StartAsync()` now conducts operation that
`ITransport.RunAsync()` used to conduct. [[#1288]]
- Removed `ITransport.ReplyMessage()` method which was non-blocking.
Instead, added `ITransport.ReplyMessageAsync()` asynchronous method
which is awaited until the reply message is sent. [[#1288]]
- The type of `ITransport.ProcessMessageHandler` became
`AsyncDelegate<T>` (which was `EventHandler`). [[#1288]]
- All methods pertaining to evaluating `IAction`s are moved
to a new `ActionEvaluator` class. [[#1301], [#1305]]
- Removed `Block<T>.Evaluate()` method.
Expand Down Expand Up @@ -183,6 +191,14 @@ To be released.
- Added `StaticPeers` as the last parameter to
`RoutingTable(Address, int, int)` constructor. [[#1230], [#1271]]
- Added `AtomicActionRenderer<T>` class. [[#1267], [#1275]]
- Added `ITransport.MessageHistory` property. [[#1288]]
- Added `ITransport.WaitForRunning()` method. [[#1288]]
- Added `TcpTransport` class which implements `ITransport` interface.
[[#1288]]
- Added `SwarmOptions.Type` property. [[#1288]]
- Added `SwarmOptions.TransportType` enum. [[#1288]]
- Added `AsyncDelegate<T>` class. [[#1288]]
- Added `InvalidMagicCookieException` class. [[#1288]]
- Added `TxExecution` abstract class. [[#1156], [#1289]]
- Added `TxSuccess` class. [[#1156], [#1289]]
- Added `TxFailure` class. [[#1156], [#1289]]
Expand Down Expand Up @@ -293,6 +309,7 @@ To be released.
[#1284]: https://github.com/planetarium/libplanet/issues/1284
[#1285]: https://github.com/planetarium/libplanet/issues/1285
[#1287]: https://github.com/planetarium/libplanet/pull/1287
[#1288]: https://github.com/planetarium/libplanet/pull/1288
[#1289]: https://github.com/planetarium/libplanet/pull/1289
[#1294]: https://github.com/planetarium/libplanet/issues/1294
[#1298]: https://github.com/planetarium/libplanet/pull/1298
Expand Down
5 changes: 3 additions & 2 deletions Libplanet.Extensions.Cocona/Commands/ApvCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Bencodex;
using Bencodex.Types;
using global::Cocona;
Expand Down Expand Up @@ -280,7 +281,7 @@ void TreeIntoTable(IValue tree, List<(string, string)> table, string key)
}

[Command(Description = "Query app protocol version (a.k.a. APV) of target node.")]
public void Query(
public async Task Query(
[Argument(
Name = "TARGET",
#pragma warning disable MEN002 // Line is too long
Expand All @@ -302,7 +303,7 @@ public void Query(

try
{
apv = peer.QueryAppProtocolVersion();
apv = await peer.QueryAppProtocolVersionTcp();
}
catch
{
Expand Down
30 changes: 29 additions & 1 deletion Libplanet.Tests/Net/Messages/MessageTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,36 @@ public void UseInvalidSignature()
validAppProtocolVersion,
ImmutableHashSet<PublicKey>.Empty,
null,
TimeSpan.FromSeconds(1));
TimeSpan.FromSeconds(5));
});
}

[Fact]
public void Serialize()
{
var privateKey = new PrivateKey();
var peer = new Peer(privateKey.PublicKey);
var dateTimeOffset = DateTimeOffset.UtcNow;
var appProtocolVersion = new AppProtocolVersion(
1,
new Bencodex.Types.Integer(0),
ImmutableArray<byte>.Empty,
default(Address));
var message = new Ping();
var id = Guid.NewGuid();
message.Identity = id.ToByteArray();
byte[] serialized =
message.Serialize(privateKey, peer, dateTimeOffset, appProtocolVersion);
Message deserialized = Message.Deserialize(
serialized,
appProtocolVersion,
null,
null,
null);
Assert.Equal(peer, deserialized.Remote);
Assert.Equal(appProtocolVersion, deserialized.Version);
Assert.Equal(dateTimeOffset, deserialized.Timestamp);
Assert.Equal(id.ToByteArray(), deserialized.Identity);
}
}
}
72 changes: 40 additions & 32 deletions Libplanet.Tests/Net/Protocols/ProtocolTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Libplanet.Crypto;
using Libplanet.Net;
using Libplanet.Net.Protocols;
using Libplanet.Net.Transports;
using Serilog;
using Xunit;
using Xunit.Abstractions;
Expand Down Expand Up @@ -58,7 +59,7 @@ public async Task Start()
var transportA = CreateTestTransport();
var transportB = CreateTestTransport();

Assert.Throws<SwarmException>(() => transportA.SendPing(transportB.AsPeer));
Assert.Throws<TransportException>(() => transportA.SendPing(transportB.AsPeer));
await StartTestTransportAsync(transportA);
await Assert.ThrowsAsync<TimeoutException>(() =>
transportA.AddPeersAsync(
Expand Down Expand Up @@ -148,9 +149,9 @@ await Assert.ThrowsAsync<TimeoutException>(

Assert.Contains(transportB.AsPeer, transportA.Peers);

await transportA.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
await transportC.StopAsync(TimeSpan.Zero);
transportA.Dispose();
transportB.Dispose();
transportC.Dispose();
}

[Fact(Timeout = Timeout)]
Expand All @@ -159,12 +160,15 @@ public async Task BootstrapException()
var transportA = CreateTestTransport();
var transportB = CreateTestTransport();

await Assert.ThrowsAsync<SwarmException>(
await Assert.ThrowsAsync<TransportException>(
() => transportB.BootstrapAsync(
new[] { transportA.AsPeer },
TimeSpan.FromSeconds(3),
TimeSpan.FromSeconds(3))
);

transportA.Dispose();
transportB.Dispose();
}

[Fact(Timeout = Timeout)]
Expand Down Expand Up @@ -198,9 +202,9 @@ public async Task BootstrapAsyncTest()
}
finally
{
await transportA.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
await transportC.StopAsync(TimeSpan.Zero);
transportA.Dispose();
transportB.Dispose();
transportC.Dispose();
}
}

Expand All @@ -220,7 +224,9 @@ public async Task RemoveStalePeers()
await Task.Delay(100);
await transportA.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken));
Assert.Empty(transportA.Peers);
await transportA.StopAsync(TimeSpan.Zero);

transportA.Dispose();
transportB.Dispose();
}

[Fact(Timeout = Timeout)]
Expand All @@ -245,10 +251,10 @@ public async Task RoutingTableFull()
Assert.DoesNotContain(transportB.AsPeer, transport.Peers);
Assert.DoesNotContain(transportC.AsPeer, transport.Peers);

await transport.StopAsync(TimeSpan.Zero);
await transportA.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
await transportC.StopAsync(TimeSpan.Zero);
transport.Dispose();
transportA.Dispose();
transportB.Dispose();
transportC.Dispose();
}

[Fact(Timeout = Timeout)]
Expand Down Expand Up @@ -282,9 +288,10 @@ public async Task ReplacementCache()
Assert.Contains(transportB.AsPeer, transport.Peers);
Assert.DoesNotContain(transportC.AsPeer, transport.Peers);

await transport.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
await transportC.StopAsync(TimeSpan.Zero);
transport.Dispose();
transportA.Dispose();
transportB.Dispose();
transportC.Dispose();
}

[Fact(Timeout = Timeout)]
Expand Down Expand Up @@ -319,8 +326,10 @@ public async Task RemoveDeadReplacementCache()
Assert.DoesNotContain(transportB.AsPeer, transport.Peers);
Assert.Contains(transportC.AsPeer, transport.Peers);

await transport.StopAsync(TimeSpan.Zero);
await transportC.StopAsync(TimeSpan.Zero);
transport.Dispose();
transportA.Dispose();
transportB.Dispose();
transportC.Dispose();
}

[Theory(Timeout = 2 * Timeout)]
Expand Down Expand Up @@ -358,10 +367,11 @@ public async Task BroadcastMessage(int count)
}
finally
{
seed.Dispose();
foreach (var transport in transports)
{
Assert.True(transport.ReceivedTestMessageOfData("foo"));
await transport.StopAsync(TimeSpan.Zero);
transport.Dispose();
}
}
}
Expand Down Expand Up @@ -449,9 +459,9 @@ public async Task BroadcastGuarantee()
}
finally
{
await seed.StopAsync(TimeSpan.Zero);
await t1.StopAsync(TimeSpan.Zero);
await t2.StopAsync(TimeSpan.Zero);
seed.Dispose();
t1.Dispose();
t2.Dispose();
}
}

Expand Down Expand Up @@ -480,9 +490,9 @@ public async Task DoNotBroadcastToSourcePeer()
}
finally
{
await transportA.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
await transportC.StopAsync(TimeSpan.Zero);
transportA.Dispose();
transportB.Dispose();
transportC.Dispose();
}
}

Expand Down Expand Up @@ -532,14 +542,12 @@ public async Task RefreshTable()
}
finally
{
await transport.StopAsync(TimeSpan.Zero);
transport.Dispose();
foreach (var t in transports)
{
await t.StopAsync(TimeSpan.Zero);
t.Dispose();
}
}

Assert.True(true);
}

private TestTransport CreateTestTransport(
Expand All @@ -558,12 +566,12 @@ private TestTransport CreateTestTransport(
networkDelay);
}

private async Task<Task> StartTestTransportAsync(
private async Task StartTestTransportAsync(
TestTransport transport,
CancellationToken cancellationToken = default(CancellationToken))
{
await transport.StartAsync(cancellationToken);
return transport.RunAsync(cancellationToken);
_ = transport.StartAsync(cancellationToken);
await transport.WaitForRunningAsync();
}
}
}
Loading