Skip to content

Commit

Permalink
Add cancellation support for .NET framework
Browse files Browse the repository at this point in the history
  • Loading branch information
limebell committed May 26, 2021
1 parent 5460596 commit 1dc3a58
Showing 1 changed file with 54 additions and 9 deletions.
63 changes: 54 additions & 9 deletions Libplanet/Net/Transports/TcpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -385,13 +385,29 @@ public async Task<IEnumerable<Message>> SendMessageWithReplyAsync(
_logger.Error(e, logMsg, peer.Address, reqId, e.ExpectedVersion, e.ActualVersion);
throw;
}
catch (ArgumentException e)
{
// This exception is thrown on .NET framework when the given peer is invalid.
_logger.Error(
e,
"ArgumentException occurred during {FName} to {RequestId}. {E}",
nameof(SendMessageWithReplyAsync),
reqId,
e);

// To match with previous implementation, throws TimeoutException when it failed to
// find peer to send message.
throw new TimeoutException($"Cannot find peer {peer}.", e);
}
catch (SocketException e)
{
// This exception is thrown on .NET core when the given peer is invalid.
_logger.Error(
e,
"SocketException occurred during {FName} to {RequestId}.",
"SocketException occurred during {FName} to {RequestId}. {E}",
nameof(SendMessageWithReplyAsync),
reqId);
reqId,
e);

// To match with previous implementation, throws TimeoutException when it failed to
// find peer to send message.
Expand Down Expand Up @@ -453,7 +469,25 @@ internal async Task WriteMessageAsync(
BitConverter.GetBytes(length).CopyTo(buffer, 0);
serialized.CopyTo(buffer, 4);
NetworkStream stream = client.GetStream();
await stream.WriteAsync(buffer, 0, buffer.Length, cancellationToken);

// NOTE: Stream is forced to be closed because NetStream.WriteAsync()'s
// cancellation token never works.
using (cancellationToken.Register(() => stream.Close()))
{
try
{
await stream.WriteAsync(buffer, 0, buffer.Length, default);
}
catch (Exception)
{
if (cancellationToken.IsCancellationRequested)
{
throw new TaskCanceledException();
}

throw;
}
}
}

internal async Task<Message[]> ReadMessageAsync(
Expand All @@ -463,13 +497,24 @@ internal async Task<Message[]> ReadMessageAsync(
byte[] buffer = new byte[1000000];
NetworkStream stream = client.GetStream();
int bytesRead = 0;
try
{
bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken);
}
catch (OperationCanceledException)

// NOTE: Stream is forced to be closed because NetStream.ReadAsync()'s
// cancellation token never works.
using (cancellationToken.Register(() => stream.Close()))
{
throw new TaskCanceledException();
try
{
bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, default);
}
catch (Exception)
{
if (cancellationToken.IsCancellationRequested)
{
throw new TaskCanceledException();
}

throw;
}
}

_logger.Verbose("Received {Bytes} bytes from network stream.", bytesRead);
Expand Down

0 comments on commit 1dc3a58

Please sign in to comment.