Skip to content

Commit

Permalink
feat: 优化RabbitMQ消息处理,和其他优化.性能大概能再翻倍.
Browse files Browse the repository at this point in the history
  • Loading branch information
joesdu committed Jun 20, 2024
1 parent 12f8892 commit 1222207
Show file tree
Hide file tree
Showing 15 changed files with 121 additions and 81 deletions.
9 changes: 9 additions & 0 deletions sample/WebApi.Test.Unit/Common/Constant.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace WebApi.Test.Unit.Common;

internal static class Constant
{
/// <summary>
/// InstanceName
/// </summary>
internal const string InstanceName = "EasilyNET";
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Net;

namespace WebApi.Test.Unit;
namespace WebApi.Test.Unit.Common;

/// <summary>
/// 返回对象实体
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Net;
using System.Text.Json;
using System.Text.Json.Serialization;
using WebApi.Test.Unit.Common;

// ReSharper disable UnusedType.Global
// ReSharper disable UnusedMember.Global
Expand Down
5 changes: 3 additions & 2 deletions sample/WebApi.Test.Unit/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
using Serilog.Sinks.OpenTelemetry;
using Serilog.Sinks.SystemConsole.Themes;
using WebApi.Test.Unit;
using WebApi.Test.Unit.Common;

Console.Title = "❤️ EasilyNET";
Console.Title = $"❤️ {Constant.InstanceName}";
AssemblyHelper.AddExcludeLibs("Npgsql.");
var builder = WebApplication.CreateBuilder(args);

Expand Down Expand Up @@ -48,7 +49,7 @@
};
c.ResourceAttributes = new Dictionary<string, object>
{
["service.name"] = hbc.Configuration["OTEL_SERVICE_NAME"] ?? "EasilyNET"
["service.name"] = hbc.Configuration["OTEL_SERVICE_NAME"] ?? Constant.InstanceName
};
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.AspNetCore.Mvc.Filters;
using System.Net;
using System.Text.Json.Serialization;
using WebApi.Test.Unit.Common;

// ReSharper disable UnusedType.Local
// ReSharper disable ClassNeverInstantiated.Global
Expand Down
3 changes: 2 additions & 1 deletion sample/WebApi.Test.Unit/ServiceModules/MemoryCacheModule.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using EasilyNET.AutoDependencyInjection.Contexts;
using EasilyNET.AutoDependencyInjection.Modules;
using WebApi.Test.Unit.Common;

namespace WebApi.Test.Unit;

Expand All @@ -13,7 +14,7 @@ public override void ConfigureServices(ConfigureServicesContext context)
context.Services.AddStackExchangeRedisCache(c =>
{
c.Configuration = config["CONNECTIONSTRINGS_GARNET"];
c.InstanceName = "EasilyNET";
c.InstanceName = Constant.InstanceName;
});
base.ConfigureServices(context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using EasilyNET.AutoDependencyInjection.Contexts;
using EasilyNET.AutoDependencyInjection.Modules;
using WebApi.Test.Unit.Common;

namespace WebApi.Test.Unit;

Expand All @@ -15,7 +16,7 @@ public override void ConfigureServices(ConfigureServicesContext context)
context.Services.AddStackExchangeRedisOutputCache(c =>
{
c.Configuration = config["CONNECTIONSTRINGS_GARNET"];
c.InstanceName = "EasilyNET";
c.InstanceName = Constant.InstanceName;
});
}

Expand Down
8 changes: 4 additions & 4 deletions src/EasilyNET.MongoGridFS.AspNetCore/Constant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ namespace EasilyNET.MongoGridFS.AspNetCore;
/// <summary>
/// 定义一些常量
/// </summary>
public static class Constant
internal static class Constant
{
/// <summary>
/// 默认配置名称
/// </summary>
public const string ConfigName = "EasilyNetGridfs";
internal const string ConfigName = "EasilyNetGridfs";

/// <summary>
/// 默认桶名称
/// </summary>
public const string BucketName = "easilyfs";
internal const string BucketName = "easilyfs";

/// <summary>
/// 默认数据库名称
/// </summary>
public const string DefaultDbName = "easily";
internal const string DefaultDbName = "easily";
}
61 changes: 29 additions & 32 deletions src/EasilyNET.RabbitBus.AspNetCore/EventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Reflection;

Expand All @@ -21,6 +22,7 @@ namespace EasilyNET.RabbitBus;
internal sealed class EventBus(IPersistentConnection conn, IOptionsMonitor<RabbitConfig> options, ISubscriptionsManager subsManager, IBusSerializer serializer, IServiceProvider sp, ILogger<EventBus> logger) : IBus, IDisposable
{
private const string HandleName = nameof(IEventHandler<IEvent>.HandleAsync);
private readonly ConcurrentDictionary<(Type HandlerType, Type EventType), Delegate> _handleAsyncDelegateCache = [];

private readonly RabbitConfig config = options.Get(Constant.OptionName);

Expand All @@ -31,7 +33,7 @@ public async Task Publish<T>(T @event, string? routingKey = null, byte? priority
{
if (!conn.IsConnected) await conn.TryConnect();
var type = @event.GetType();
var exc = type.GetCustomAttribute<ExchangeAttribute>() ?? throw new($"{nameof(@event)}未设置<{nameof(ExchangeAttribute)}>,无法创建发布事件");
var exc = type.GetCustomAttribute<ExchangeAttribute>() ?? throw new($"{nameof(@event)}未设置<{nameof(ExchangeAttribute)}>,无法创建消息");
if (!exc.Enable) return;
var channel = await conn.GetChannel();
var properties = new BasicProperties
Expand All @@ -53,9 +55,9 @@ public async Task Publish<T>(T @event, string? routingKey = null, byte? priority
// 创建Policy规则
var policy = Policy.Handle<BrokerUnreachableException>()
.Or<SocketException>()
.WaitAndRetry(config.RetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
.WaitAndRetryAsync(config.RetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
logger.LogError(ex, "无法发布: {EventId} 超时 {Timeout}s ({ExceptionMessage})", @event.EventId, $"{time.TotalSeconds:n1}", ex.Message));
await policy.Execute(async () =>
await policy.ExecuteAsync(async () =>
{
logger.LogTrace("发布: {EventId}", @event.EventId);
await channel.BasicPublishAsync(exc.ExchangeName, routingKey ?? exc.RoutingKey, properties, body, true).ConfigureAwait(false);
Expand All @@ -68,9 +70,9 @@ public async Task Publish<T>(T @event, uint ttl, string? routingKey = null, byte
{
if (!conn.IsConnected) await conn.TryConnect();
var type = @event.GetType();
var exc = type.GetCustomAttribute<ExchangeAttribute>() ?? throw new($"{nameof(@event)}未设置<{nameof(ExchangeAttribute)}>,无法发布事件");
var exc = type.GetCustomAttribute<ExchangeAttribute>() ?? throw new($"{nameof(@event)}未设置<{nameof(ExchangeAttribute)}>,无法创建消息");
if (!exc.Enable) return;
if (exc is not { WorkModel: EModel.Delayed } | exc.IsDlx is not true) throw new($"延时队列的交换机类型必须为{nameof(EModel.Delayed)}isDlx 参数必须为 true");
if (exc is not { WorkModel: EModel.Delayed } | exc.IsDlx is not true) throw new($"延时队列的交换机类型必须为{nameof(EModel.Delayed)}{nameof(ExchangeAttribute.IsDlx)} 必须为 true");
var channel = await conn.GetChannel();
var properties = new BasicProperties
{
Expand Down Expand Up @@ -105,9 +107,9 @@ public async Task Publish<T>(T @event, uint ttl, string? routingKey = null, byte
// 创建Policy规则
var policy = Policy.Handle<BrokerUnreachableException>()
.Or<SocketException>()
.WaitAndRetry(config.RetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
.WaitAndRetryAsync(config.RetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
logger.LogError(ex, "无法发布: {EventId} 超时 {Timeout}s ({ExceptionMessage})", @event.EventId, $"{time.TotalSeconds:n1}", ex.Message));
await policy.Execute(async () =>
await policy.ExecuteAsync(async () =>
{
logger.LogTrace("发布: {EventId}", @event.EventId);
await channel.BasicPublishAsync(exc.ExchangeName, routingKey ?? exc.RoutingKey, properties, body, true).ConfigureAwait(false);
Expand All @@ -132,11 +134,7 @@ internal async Task Subscribe()
private async Task InitialRabbit()
{
var events = AssemblyHelper.FindTypes(o => o is { IsClass: true, IsAbstract: false } && o.IsBaseOn(typeof(IEvent)) && o.HasAttribute<ExchangeAttribute>());
var handlers = AssemblyHelper.FindTypes(o => o is
{
IsClass: true,
IsAbstract: false
} &&
var handlers = AssemblyHelper.FindTypes(o => o is { IsClass: true, IsAbstract: false } &&
o.IsBaseOn(typeof(IEventHandler<>)) &&
!o.HasAttribute<IgnoreHandlerAttribute>()).Select(s => s.GetTypeInfo()).ToList();
foreach (var @event in events)
Expand Down Expand Up @@ -209,10 +207,6 @@ private async Task StartBasicConsume(Type eventType, ExchangeAttribute exc, ICha
{
try
{
//if (message.Contains("throw-fake-exception", StringComparison.InvariantCultureIgnoreCase))
//{
// throw new InvalidOperationException($"假异常请求:{message}");
//}
await ProcessEvent(eventType, ea.Body.Span.ToArray(), exc.IsDlx, async () => await channel.BasicAckAsync(ea.DeliveryTag, false).ConfigureAwait(false));
}
catch (Exception ex)
Expand All @@ -234,33 +228,34 @@ private async Task ProcessEvent(Type eventType, byte[] message, bool isDlx, Func
logger.LogTrace("处理事件: {EventName}", eventType.Name);
var policy = Policy.Handle<BrokerUnreachableException>()
.Or<SocketException>()
.WaitAndRetry(config.RetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
.WaitAndRetryAsync(config.RetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
logger.LogError(ex, "无法消费: {EventName} 超时 {Timeout}s ({ExceptionMessage})", eventType.Name, $"{time.TotalSeconds:n1}", ex.Message));
if (subsManager.HasSubscriptionsForEvent(eventType.Name, isDlx))
{
var @event = serializer.Deserialize(message, eventType);
var handlerTypes = subsManager.GetHandlersForEvent(eventType.Name, isDlx);
using var scope = sp.GetService<IServiceScopeFactory>()?.CreateScope();
await policy.Execute(async () =>
await policy.ExecuteAsync(async () =>
{
foreach (var handlerType in handlerTypes)
{
if (@event is null)
var key = (handlerType, eventType);
if (!_handleAsyncDelegateCache.TryGetValue(key, out var cachedDelegate))
{
throw new($"{nameof(@event)}不能为空");
var method = handlerType.GetMethod(HandleName, [eventType]);
if (method is null)
{
logger.LogError($"无法找到{nameof(@event)}事件处理器");
continue; // 或者抛出异常
}
var delegateType = typeof(HandleAsyncDelegate<>).MakeGenericType(eventType);
var handler = scope?.ServiceProvider.GetService(handlerType);
if (handler is null) continue;
var handleAsyncDelegate = Delegate.CreateDelegate(delegateType, handler, method);
_handleAsyncDelegateCache[key] = handleAsyncDelegate;
cachedDelegate = handleAsyncDelegate;
}
var method = typeof(IEventHandler<>).MakeGenericType(eventType).GetMethod(HandleName);
if (method is null)
{
logger.LogError($"无法找到{nameof(@event)}事件处理器下处理方法");
throw new($"无法找到{nameof(@event)}事件处理器下处理方法");
}
var handler = scope?.ServiceProvider.GetService(handlerType);
if (handler is null) continue;
await Task.Yield();
var obj = method.Invoke(handler, [@event]);
if (obj is null) continue;
await (Task)obj;
if (cachedDelegate.DynamicInvoke(@event) is Task task) await task;
}
await ack.Invoke();
});
Expand All @@ -270,4 +265,6 @@ await policy.Execute(async () =>
logger.LogError("没有订阅事件:{EventName}", eventType.Name);
}
}

private delegate Task HandleAsyncDelegate<in TEvent>(TEvent @event) where TEvent : IEvent;
}
15 changes: 11 additions & 4 deletions src/EasilyNET.RabbitBus.AspNetCore/Manager/ChannelPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,37 @@ namespace EasilyNET.RabbitBus.AspNetCore.Manager;
internal sealed class ChannelPool(IConnection connection, uint poolCount) : IChannelPool, IDisposable
{
private readonly ConcurrentBag<IChannel> _channels = [];
private int _currentCount; // 使用原子计数器来跟踪池中的通道数量

private bool _disposed; // To detect redundant calls

/// <inheritdoc />
public async Task<IChannel> GetChannel() => _channels.TryTake(out var channel) ? await Task.FromResult(channel) : await connection.CreateChannelAsync();
public async Task<IChannel> GetChannel()
{
if (!_channels.TryTake(out var channel)) return await connection.CreateChannelAsync();
Interlocked.Decrement(ref _currentCount); // 安全地减少计数
return channel;
}

/// <inheritdoc />
public async Task ReturnChannel(IChannel channel)
{
if (_channels.Count >= poolCount)
// 在32和64位平台上,对int类型的读取操作都是原子的,所以不需要Interlocked.Read(ref _currentCount)
if (_currentCount >= poolCount)
{
await channel.CloseAsync();
channel.Dispose();
}
else
{
_channels.Add(channel);
Interlocked.Increment(ref _currentCount); // 安全地增加计数
}
}

public void Dispose()
{
if (_disposed)
return;
if (_disposed) return;
foreach (var channel in _channels)
{
try
Expand Down
48 changes: 32 additions & 16 deletions src/EasilyNET.RabbitBus.AspNetCore/Manager/PersistentConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ internal sealed class PersistentConnection : IPersistentConnection, IDisposable
private readonly SemaphoreSlim _connLock = new(1, 1);
private readonly ILogger<PersistentConnection> _logger;
private readonly uint _poolCount;
private readonly SemaphoreSlim _reconnectLock = new(1, 1); // 用于控制重连并发的信号量
private readonly RabbitConfig config;
private ChannelPool? _channelPool;
private IConnection? _connection;
private bool _disposed;
private bool _isReconnecting; // 用于控制重连尝试的标志位

public PersistentConnection(IConnectionFactory connFactory, IOptionsMonitor<RabbitConfig> options, ILogger<PersistentConnection> logger)
{
Expand Down Expand Up @@ -78,7 +80,11 @@ internal override async Task<IChannel> GetChannel() =>
: await _channelPool.GetChannel();

/// <inheritdoc />
internal override async Task ReturnChannel(IChannel channel) => await _channelPool!.ReturnChannel(channel);
internal override async Task ReturnChannel(IChannel channel)
{
if (_channelPool is null) throw new("通道池为空");
await _channelPool.ReturnChannel(channel);
}

/// <inheritdoc />
internal override async Task TryConnect()
Expand All @@ -89,9 +95,9 @@ internal override async Task TryConnect()
{
var policy = Policy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetry(config.RetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
.WaitAndRetryAsync(config.RetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
_logger.LogWarning(ex, "RabbitMQ客户端在 {TimeOut}s 超时后无法创建链接,({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message));
await policy.Execute(async () => _connection = config.AmqpTcpEndpoints is not null && config.AmqpTcpEndpoints.Count > 0 ? await _connFactory.CreateConnectionAsync(config.AmqpTcpEndpoints) : await _connFactory.CreateConnectionAsync());
await policy.ExecuteAsync(async () => _connection = config.AmqpTcpEndpoints is not null && config.AmqpTcpEndpoints.Count > 0 ? await _connFactory.CreateConnectionAsync(config.AmqpTcpEndpoints) : await _connFactory.CreateConnectionAsync());
}
finally
{
Expand All @@ -112,21 +118,31 @@ internal override async Task TryConnect()
}
}

private void OnConnectionBlocked(object? sender, ConnectionBlockedEventArgs e)
/// <summary>
/// 重连
/// </summary>
/// <returns></returns>
private async Task TryReconnect()
{
_logger.LogWarning("RabbitMQ连接关闭,正在尝试重新连接");
Task.Factory.StartNew(async () => await TryConnect());
if (_isReconnecting) return; // 如果已经在尝试重连,则直接返回
await _reconnectLock.WaitAsync(); // 请求重连锁
try
{
if (_isReconnecting) return; // 再次检查标志位,以避免重复重连
_isReconnecting = true; // 设置标志位,表示开始重连尝试
_logger.LogWarning("RabbitMQ客户端尝试重新连接");
await TryConnect(); // 尝试重新连接
}
finally
{
_isReconnecting = false; // 重置标志位,表示重连尝试结束
_reconnectLock.Release(); // 释放重连锁
}
}

private void OnCallbackException(object? sender, CallbackExceptionEventArgs e)
{
_logger.LogWarning("RabbitMQ连接抛出异常,正在重试");
Task.Factory.StartNew(async () => await TryConnect());
}
private async void OnConnectionBlocked(object? sender, ConnectionBlockedEventArgs e) => await TryReconnect();

private void OnConnectionShutdown(object? sender, ShutdownEventArgs reason)
{
_logger.LogWarning("RabbitMQ连接处于关闭状态,正在尝试重新连接");
Task.Factory.StartNew(async () => await TryConnect());
}
private async void OnCallbackException(object? sender, CallbackExceptionEventArgs e) => await TryReconnect();

private async void OnConnectionShutdown(object? sender, ShutdownEventArgs reason) => await TryReconnect();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ internal sealed class SubscribeService(IServiceProvider serviceProvider) : Backg
protected override async Task ExecuteAsync(CancellationToken cancelToken)
{
using var scope = serviceProvider.CreateScope();
var bus = scope.ServiceProvider.GetService<IBus>() as EventBus ?? throw new("RabbitMQ集成事件总线没有注册");
var bus = scope.ServiceProvider.GetRequiredService<IBus>() as EventBus ?? throw new("ibus service not register");
await bus.Subscribe();
while (!cancelToken.IsCancellationRequested) await Task.Delay(5000, cancelToken);
}
Expand Down
Loading

0 comments on commit 1222207

Please sign in to comment.