Skip to content

Commit

Permalink
Added binary serializer with support for Stream, Memory<byte>, IMemor…
Browse files Browse the repository at this point in the history
…yOwner<byte>
  • Loading branch information
kekekeks committed Mar 23, 2023
1 parent e73804b commit a937911
Show file tree
Hide file tree
Showing 20 changed files with 1,027 additions and 39 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
*/bin
*/obj
**/bin
**/obj
*.suo
*.DotSettings.user
*.user
Expand Down
12 changes: 12 additions & 0 deletions CoreRPC.sln
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CoreRPC.AspNetCore", "CoreR
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CoreRPC.Testing", "CoreRPC.Testing\CoreRPC.Testing.csproj", "{AE2104C8-05E5-4D23-8711-21CFB7EDF9AD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CoreRPC.JsonLikeBinarySerializer", "src\CoreRPC.JsonLikeBinarySerializer\CoreRPC.JsonLikeBinarySerializer.csproj", "{DE2F5A9C-192D-421C-AAF2-49334EFD2B90}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CoreRPC.JsonLikeBinaryReaderWriter", "src\CoreRPC.JsonLikeBinaryReaderWriter\CoreRPC.JsonLikeBinaryReaderWriter.csproj", "{1F21E069-DDE1-4ADF-8247-D1104991D7DE}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -33,6 +37,14 @@ Global
{AE2104C8-05E5-4D23-8711-21CFB7EDF9AD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AE2104C8-05E5-4D23-8711-21CFB7EDF9AD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AE2104C8-05E5-4D23-8711-21CFB7EDF9AD}.Release|Any CPU.Build.0 = Release|Any CPU
{DE2F5A9C-192D-421C-AAF2-49334EFD2B90}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DE2F5A9C-192D-421C-AAF2-49334EFD2B90}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DE2F5A9C-192D-421C-AAF2-49334EFD2B90}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DE2F5A9C-192D-421C-AAF2-49334EFD2B90}.Release|Any CPU.Build.0 = Release|Any CPU
{1F21E069-DDE1-4ADF-8247-D1104991D7DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1F21E069-DDE1-4ADF-8247-D1104991D7DE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1F21E069-DDE1-4ADF-8247-D1104991D7DE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1F21E069-DDE1-4ADF-8247-D1104991D7DE}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
2 changes: 1 addition & 1 deletion CoreRPC/Engine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public Engine (IMethodCallSerializer serializer, IMethodBinder binder)
}

public Engine()
: this(new JsonMethodCallSerializer(false), new DefaultMethodBinder())
: this(new JsonMethodCallSerializer(), new DefaultMethodBinder())
{

}
Expand Down
13 changes: 8 additions & 5 deletions CoreRPC/RequestHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,16 @@ async Task IRequestHandler.HandleRequest (IRequest req)
object res = null;
try
{
if (_interceptor != null)
using (call)
{
res = _interceptor.Intercept(call, req.Context,
() => ConvertToTask(call.Method.Invoke(call.Target, call.Arguments)));
if (_interceptor != null)
{
res = _interceptor.Intercept(call, req.Context,
() => ConvertToTask(call.Method.Invoke(call.Target, call.Arguments)));
}
else
res = call.Method.Invoke(call.Target, call.Arguments);
}
else
res = call.Method.Invoke(call.Target, call.Arguments);
}
catch (Exception e)
{
Expand Down
74 changes: 51 additions & 23 deletions CoreRPC/Serialization/JsonMethodCallSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using System.Reflection;
using System.Text;
Expand All @@ -14,31 +15,26 @@ namespace CoreRPC.Serialization
public class JsonMethodCallSerializer : IMethodCallSerializer
{
private readonly JsonSerializer _serializer;
private readonly bool _useBson;

public JsonMethodCallSerializer(JsonSerializer serializer, bool useBson = false)
public JsonMethodCallSerializer(JsonSerializer serializer)
{
_serializer = serializer;
_useBson = useBson;
}

public JsonMethodCallSerializer(bool useBson = false) : this(new JsonSerializer(), useBson)
public JsonMethodCallSerializer() : this(new JsonSerializer())
{

}

private static readonly Encoding Utf8 = new UTF8Encoding(false);

JsonWriter CreateWriter(Stream stream) => _useBson
? (JsonWriter) new BsonWriter(stream) {CloseOutput = false}
: new JsonTextWriter(new StreamWriter(stream, Utf8, 1024, true));
protected virtual JsonWriter CreateWriter(Stream stream) =>
new JsonTextWriter(new StreamWriter(stream, Utf8, 1024, true));

JsonReader CreateReader(Stream stream) => _useBson
? (JsonReader) new BsonReader(stream)
: new JsonTextReader(new StreamReader(stream))
{
DateParseHandling = DateParseHandling.None
};
protected virtual JsonReader CreateReader(Stream stream) => new JsonTextReader(new StreamReader(stream))
{
DateParseHandling = DateParseHandling.None
};

public void SerializeCall(Stream stream, IMethodBinder binder, string target, MethodCall call)
{
Expand All @@ -49,7 +45,7 @@ public void SerializeCall(Stream stream, IMethodBinder binder, string target, Me
writer.WritePropertyName("Target");
writer.WriteValue(target);
writer.WritePropertyName("MethodSignature");
writer.WriteValue(_useBson ? (object) sig : Convert.ToBase64String(sig));
writer.WriteValue(sig);
writer.WritePropertyName("Arguments");
_serializer.Serialize(writer, call.Arguments);
writer.WriteEndObject();
Expand All @@ -58,18 +54,32 @@ public void SerializeCall(Stream stream, IMethodBinder binder, string target, Me



public MethodCall DeserializeCall(Stream stream, IMethodBinder binder,
public virtual MethodCall DeserializeCall(Stream stream, IMethodBinder binder,
ITargetSelector selector, object callContext)
{
var reader = CreateReader(stream).MoveToContent();
var rv = new MethodCall
{
Target = selector.GetTarget(reader.ReadProperty("Target").ToString(), callContext)
};
var call = new MethodCall();
DeserializeCallCore(call, CreateReader(stream), binder, selector, callContext);
return call;
}

protected virtual void DeserializeCallCore(MethodCall rv, JsonReader reader,
IMethodBinder binder, ITargetSelector selector, object callContext)
{
reader.MoveToContent();

var osig = reader.ReadProperty("MethodSignature");
var sig = osig as byte[] ?? Convert.FromBase64String((string) osig);
rv.Target = selector.GetTarget(reader.ReadProperty("Target").ToString(), callContext);

reader.ExpectProperty("MethodSignature");
var osig = reader.Value;
byte[] sig;
if (osig is byte[])
sig = (byte[])osig;
else if (osig is string)
sig = Convert.FromBase64String((string)osig);
else
sig = (byte[])TypeDescriptor.GetConverter(osig).ConvertTo(osig, typeof(byte[]));
reader.Next();

rv.Method = binder.GetInfoProviderFor(rv.Target).GetMethod(sig);
reader.ExpectProperty("Arguments");

Expand All @@ -96,7 +106,6 @@ public MethodCall DeserializeCall(Stream stream, IMethodBinder binder,
reader.Next();
}
reader.MoveToEnd();
return rv;
}

public void SerializeResult(Stream stream, object result)
Expand Down Expand Up @@ -181,4 +190,23 @@ public static JsonReader MoveToEnd(this JsonReader reader)
return reader;
}
}

public class BsonMethodCallSerializer : JsonMethodCallSerializer
{
protected override JsonWriter CreateWriter(Stream stream) =>
new BsonWriter(stream) { CloseOutput = false };

protected override JsonReader CreateReader(Stream stream) => new BsonReader(stream)
{ DateParseHandling = DateParseHandling.None };

public BsonMethodCallSerializer()
{

}

public BsonMethodCallSerializer(JsonSerializer serializer) : base(serializer)
{

}
}
}
8 changes: 6 additions & 2 deletions CoreRPC/Transferable/MethodCall.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
using System.Reflection;
using System;
using System.Reflection;

namespace CoreRPC.Transferable
{
public class MethodCall
public class MethodCall : IDisposable
{
public object Target { get; set; }
public MethodInfo Method { get; set; }
public object[] Arguments { get; set; }
public virtual void Dispose()
{
}
}
}
97 changes: 97 additions & 0 deletions Tests/End2EndTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading.Tasks;
using CoreRPC;
using CoreRPC.Binding.Default;
using CoreRPC.JsonLikeBinarySerializer;
using CoreRPC.Routing;
using CoreRPC.Transport;
using CoreRPC.Utility;
using Microsoft.IO;
using Xunit;

namespace Tests
{
public class BinarySerializedEnd2EndTests
{
public class BinaryResponse
{
public IMemoryOwner<byte> Memory { get; set; }
public Stream Stream { get; set; }
public byte[] Bytes { get; set; }
}

public interface IMyRpc
{
Task<BinaryResponse> Test(Stream foo, List<Stream> bar, List<Memory<byte>> memories, byte[] bytes);
}

public class MyRpc : IMyRpc
{
public async Task<BinaryResponse> Test(Stream foo, List<Stream> bar, List<Memory<byte>> memories,
byte[] bytes)
{
var ms = new MemoryStream();
foo.CopyTo(ms);
foreach(var s in bar)
s.CopyTo(ms);
foreach (var mem in memories)
{
MemoryMarshal.TryGetArray(mem, out ArraySegment<byte> seg);
ms.Write(seg.Array, seg.Offset, seg.Count);
}

ms.Write(bytes, 0, bytes.Length);

ms.Position = 0;
var rev = ms.ToArray().Reverse().ToArray();
return new BinaryResponse
{
Stream = ms,
Memory = new ArrayMemoryOwner(rev),
Bytes = rev
};
}
}


[Fact]
public void ModernMemoryObjectsCanBePassedViaRpc()
{
var engine = new Engine(new BinaryJsonLikeMethodCallSerializer(), new DefaultMethodBinder());
var handler = engine.CreateProxy<IMyRpc>(
new InternalThreadPoolTransport(engine.CreateRequestHandler(new Selector())));

var memoryStream = new MemoryStream(Encoding.UTF8.GetBytes("Stream1"));
var memoryStreams = new List<Stream>
{
new MemoryStream(Encoding.UTF8.GetBytes("Stream2"))
};
var result = handler.Test(memoryStream, memoryStreams, new List<Memory<byte>>
{
new Memory<byte>(Encoding.UTF8.GetBytes("Memory1")),
new Memory<byte>(Encoding.UTF8.GetBytes("Memory2"))
}, Encoding.UTF8.GetBytes("Bytes")).Result;
Assert.Equal("Stream1Stream2Memory1Memory2Bytes", Encoding.UTF8.GetString(
((RecyclableMemoryStream)result.Stream).ReadAsBytes()));
Assert.Equal("Stream1Stream2Memory1Memory2Bytes".Reverse(), Encoding.UTF8.GetString(
result.Memory.Memory.ToArray()));
Assert.Equal("Stream1Stream2Memory1Memory2Bytes".Reverse(), Encoding.UTF8.GetString(
result.Bytes));
}


class Selector : ITargetSelector
{
public object GetTarget(string target, object callContext)
{
return new MyRpc();
}
}
}
}
Loading

0 comments on commit a937911

Please sign in to comment.