Skip to content

Commit

Permalink
feat: implement PubsubSwarm mode for Node
Browse files Browse the repository at this point in the history
  • Loading branch information
limebell committed Nov 1, 2024
1 parent b7441a0 commit a14d0e9
Show file tree
Hide file tree
Showing 19 changed files with 675 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,14 @@ public static ILibplanetNodeBuilder AddLibplanetNode(
services.AddSingleton<IValidateOptions<ValidatorOptions>, ValidatorOptionsValidator>();

services.AddOptions<SoloOptions>()
.Bind(configuration.GetSection(SoloOptions.Position));
.Bind(configuration.GetSection(SoloOptions.Position));
services.AddSingleton<IConfigureOptions<SoloOptions>, SoloOptionsConfigurator>();

services.AddOptions<PubsubSwarmOptions>()
.Bind(configuration.GetSection(PubsubSwarmOptions.Position));
services
.AddSingleton<IConfigureOptions<PubsubSwarmOptions>, PubsubSwarmOptionsConfigurator>();

services.AddSingleton<PolicyService>();
services.AddSingleton<StoreService>();
services.AddSingleton(s => (IStoreService)s.GetRequiredService<StoreService>());
Expand All @@ -61,6 +66,12 @@ public static ILibplanetNodeBuilder AddLibplanetNode(
nodeBuilder.WithSolo();
}

if (configuration.IsOptionsEnabled(PubsubSwarmOptions.Position))
{
Console.WriteLine("Start PubsubSwarm");
nodeBuilder.WithPubsubSwarm();
}

if (configuration.IsOptionsEnabled(SwarmOptions.Position))
{
nodeBuilder.WithSwarm();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using Libplanet.Node.Protocols;
using Libplanet.Node.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Nethermind.Libp2p.Stack;

namespace Libplanet.Node.Extensions.NodeBuilder;

Expand Down Expand Up @@ -39,4 +41,14 @@ public ILibplanetNodeBuilder WithValidator()
_scopeList.Add("Validator");
return this;
}

public ILibplanetNodeBuilder WithPubsubSwarm()
{
Services.AddSingleton<IMessageChannel, MessageChannel>();
Services.AddLibp2p(builder => builder.AddAppLayerProtocol<PingPongProtocol>());
Services.AddScoped<PubsubSwarm>();
Services.AddHostedService<PubsubSwarmService>();
_scopeList.Add("PubsubSwarm");
return this;
}
}
3 changes: 2 additions & 1 deletion sdk/node/Libplanet.Node/Libplanet.Node.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
<ProjectReference Include="..\..\..\src\Libplanet.Store\Libplanet.Store.csproj" />
<ProjectReference Include="..\..\..\src\Libplanet\Libplanet.csproj" />
<ProjectReference Include="..\..\..\src\Libplanet.Net\Libplanet.Net.csproj" />
<ProjectReference Include="..\..\..\..\..\suho\suho-lib9c\.Lib9c.Plugin.Shared\Lib9c.Plugin.Shared.csproj" />
<ProjectReference Include="..\..\..\dotnet-libp2p\src\libp2p\Libp2p\Libp2p.csproj" />
<ProjectReference Include="..\..\..\dotnet-libp2p\src\libp2p\Libp2p.Core\Libp2p.Core.csproj" />
</ItemGroup>

</Project>
18 changes: 18 additions & 0 deletions sdk/node/Libplanet.Node/Options/PubsubSwarmOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System.ComponentModel;
using Libplanet.Node.DataAnnotations;

namespace Libplanet.Node.Options;

[Options(Position)]
public class PubsubSwarmOptions : OptionsBase<PubsubSwarmOptions>
{
public const string Position = "PubsubSwarm";

public bool Proposer { get; set; }

public long BlockInterval { get; set; } = 4000;

[PrivateKey]
[Description("The private key of the node.")]
public string PrivateKey { get; set; } = string.Empty;
}
21 changes: 21 additions & 0 deletions sdk/node/Libplanet.Node/Options/PubsubSwarmOptionsConfigurator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Libplanet.Common;
using Libplanet.Crypto;
using Microsoft.Extensions.Logging;

namespace Libplanet.Node.Options;

internal sealed class PubsubSwarmOptionsConfigurator(
ILogger<PubsubSwarmOptionsConfigurator> logger)
: OptionsConfiguratorBase<PubsubSwarmOptions>
{
protected override void OnConfigure(PubsubSwarmOptions options)
{
if (options.PrivateKey == string.Empty)
{
options.PrivateKey = ByteUtil.Hex(new PrivateKey().ByteArray);
logger.LogWarning(
"Node's private key is not set. A new private key is generated: {PrivateKey}",
options.PrivateKey);
}
}
}
39 changes: 39 additions & 0 deletions sdk/node/Libplanet.Node/Protocols/MessageContentCodec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using Libplanet.Net.Messages;

namespace Libplanet.Node.Protocols;

public static class MessageContentCodec
{
public static MessageContent Deserialize(byte[] bytes)
{
var rawFrames = bytes;
var type = rawFrames[0];
rawFrames = rawFrames.Skip(1).ToArray();
var dataFrames = new List<byte[]>();
var frameCount = BitConverter.ToInt32(rawFrames[..4]);
rawFrames = rawFrames.Skip(4).ToArray();
for (int i = 0; i < frameCount; i++)
{
var frameSize = BitConverter.ToInt32(rawFrames[..4]);
dataFrames.Add(rawFrames.Skip(4).Take(frameSize).ToArray());
rawFrames = rawFrames.Skip(4 + frameSize).ToArray();
}

return NetMQMessageCodec.CreateMessage(
(MessageContent.MessageType)type,
dataFrames.ToArray());
}

public static byte[] Serialize(MessageContent messageContent)
{
IEnumerable<byte> ba = [(byte)messageContent.Type];
ba = ba.Concat(BitConverter.GetBytes(messageContent.DataFrames.Count()));
foreach (var bytes in messageContent.DataFrames)
{
ba = ba.Concat(BitConverter.GetBytes(bytes.Length));
ba = ba.Concat(bytes);
}

return ba.ToArray();
}
}
75 changes: 75 additions & 0 deletions sdk/node/Libplanet.Node/Protocols/PingPongProtocol.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using System.Buffers;
using Libplanet.Node.Services;
using Nethermind.Libp2p.Core;

namespace Libplanet.Node.Protocols
{
internal class PingPongProtocol(IMessageChannel messageChannel) : IProtocol
{
public string Id => "/blockchain/ping-pong/1.0.0";

public async Task DialAsync(
IChannel downChannel,
IChannelFactory? upChannelFactory,
IPeerContext context)
{
TimeSpan elapsed = TimeSpan.Zero;

_ = Task.Run(
async () =>
{
while (true)
{
ReadOnlySequence<byte> read =
await downChannel.ReadAsync(0, ReadBlockingMode.WaitAny).OrThrow();
messageChannel.ReceiveMessage(
context.RemotePeer.Address,
read.ToArray(),
reply => _ =
downChannel.WriteAsync(
new ReadOnlySequence<byte>(reply)));
}
});

// Keep connection for 5 seconds
while (true)
{
if (messageChannel.TryGetMessageToSend(
context.RemotePeer.Address,
out byte[]? msg))
{
if (msg is not null)
{
await downChannel.WriteAsync(
new ReadOnlySequence<byte>(msg));
}

elapsed = TimeSpan.Zero;
}
else
{
await Task.Delay(50);
elapsed += TimeSpan.FromMilliseconds(50);
}
}
}

public async Task ListenAsync(
IChannel downChannel,
IChannelFactory? upChannelFactory,
IPeerContext context)
{
while (true)
{
ReadOnlySequence<byte> read =
await downChannel.ReadAsync(0, ReadBlockingMode.WaitAny).OrThrow();
messageChannel.ReceiveMessage(
context.RemotePeer.Address,
read.ToArray(),
msg => _ =
downChannel.WriteAsync(
new ReadOnlySequence<byte>(msg)));
}
}
}
}
23 changes: 23 additions & 0 deletions sdk/node/Libplanet.Node/Services/IMessageChannel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Libplanet.Net.Messages;
using Libplanet.Node.Protocols;
using Multiformats.Address;

namespace Libplanet.Node.Services;

public interface IMessageChannel
{
EventHandler<(Multiaddress, byte[], Action<byte[]>)>? OnMessageReceived
{
get;
set;
}

void SendMessage(Multiaddress address, byte[] message);

void ReceiveMessage(
Multiaddress sender,
byte[] message,
Action<byte[]> callback);

bool TryGetMessageToSend(Multiaddress address, out byte[]? message);
}
55 changes: 55 additions & 0 deletions sdk/node/Libplanet.Node/Services/MessageChannel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: MIT

using System.Collections.Concurrent;
using Multiformats.Address;

namespace Libplanet.Node.Services;

public class MessageChannel : IMessageChannel
{
private readonly ConcurrentDictionary<Multiaddress, ConcurrentBag<byte[]>>
_messagesToSend = new();

public EventHandler<(Multiaddress, byte[], Action<byte[]>)>? OnMessageReceived
{
get;
set;
}

public void SendMessage(Multiaddress address, byte[] message)
{
if (!_messagesToSend.TryGetValue(address, out ConcurrentBag<byte[]>? bag))
{
bag = [];
_messagesToSend.TryAdd(address, bag);
}

bag.Add(message);
}

public void ReceiveMessage(
Multiaddress sender,
byte[] message,
Action<byte[]> callback)
{
OnMessageReceived?.Invoke(this, (sender, message, callback));
}

public bool TryGetMessageToSend(Multiaddress address, out byte[]? message)
{
if (_messagesToSend.TryGetValue(address, out ConcurrentBag<byte[]>? bag))
{
if (bag.IsEmpty)
{
message = null;
return false;
}

return bag.TryTake(out message);
}

message = null;
return false;
}
}
Loading

0 comments on commit a14d0e9

Please sign in to comment.