Skip to content

Commit

Permalink
feat: 将Polly替换为Microsoft.Extensions.Resilience,使用更优雅.同时修复消费消息太慢的问题. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
joesdu authored Jun 20, 2024
2 parents e9551da + 4d2fcdd commit 4c4bd54
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 51 deletions.
2 changes: 2 additions & 0 deletions sample/WebApi.Test.Unit/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
lc.ReadFrom.Configuration(hbc.Configuration)
.MinimumLevel.Override("Microsoft", logLevel)
.MinimumLevel.Override("System", logLevel)
// 添加下面这行来过滤掉 Microsoft.Extensions.Resilience 的日志
.MinimumLevel.Override("Polly", LogEventLevel.Warning)
.Enrich.FromLogContext()
.WriteTo.Async(wt =>
{
Expand Down
3 changes: 2 additions & 1 deletion src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
<PackageVersion Include="Swashbuckle.AspNetCore.SwaggerGen" Version="6.6.2" />
<PackageVersion Include="Swashbuckle.AspNetCore.SwaggerUI" Version="6.6.2" />
<!--microsoft asp.net core -->
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="9.0.0-preview.5.24306.7" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.0-preview.5.24306.7" />
<PackageVersion Include="Microsoft.Extensions.DependencyModel" Version="9.0.0-preview.5.24306.7" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="9.0.0-preview.5.24306.7" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.0-preview.5.24306.7" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0-preview.5.24306.7" />
<PackageVersion Include="Microsoft.Extensions.Resilience" Version="9.0.0-preview.5.24311.7" />
</ItemGroup>
</Project>
2 changes: 2 additions & 0 deletions src/EasilyNET.RabbitBus.AspNetCore/Constant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@
internal static class Constant
{
internal const string OptionName = "easily_net_rabbit_bus";

internal const string ResiliencePipelineName = "easilynet-default-resilience-pipeline";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

<ItemGroup>
<PackageReference Include="MessagePack" />
<PackageReference Include="Polly" />
<PackageReference Include="Microsoft.Extensions.Resilience" />
<PackageReference Include="RabbitMQ.Client" />
<PackageReference Include="Microsoft.Extensions.DependencyModel" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
Expand Down
49 changes: 16 additions & 33 deletions src/EasilyNET.RabbitBus.AspNetCore/EventBus.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,25 @@
using EasilyNET.Core.Misc;
using EasilyNET.RabbitBus.AspNetCore;
using EasilyNET.RabbitBus.AspNetCore.Abstraction;
using EasilyNET.RabbitBus.AspNetCore.Configs;
using EasilyNET.RabbitBus.AspNetCore.Extensions;
using EasilyNET.RabbitBus.Core.Abstraction;
using EasilyNET.RabbitBus.Core.Attributes;
using EasilyNET.RabbitBus.Core.Enums;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Polly.Registry;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Reflection;

namespace EasilyNET.RabbitBus;

internal sealed class EventBus(IPersistentConnection conn, IOptionsMonitor<RabbitConfig> options, ISubscriptionsManager subsManager, IBusSerializer serializer, IServiceProvider sp, ILogger<EventBus> logger) : IBus, IDisposable
internal sealed class EventBus(IPersistentConnection conn, ISubscriptionsManager subsManager, IBusSerializer serializer, IServiceProvider sp, ILogger<EventBus> logger, ResiliencePipelineProvider<string> pipelineProvider) : IBus, IDisposable
{
private const string HandleName = nameof(IEventHandler<IEvent>.HandleAsync);
private readonly ConcurrentDictionary<(Type HandlerType, Type EventType), Delegate> _handleAsyncDelegateCache = [];

private readonly RabbitConfig config = options.Get(Constant.OptionName);

private bool disposed;

/// <inheritdoc />
Expand All @@ -52,15 +46,11 @@ public async Task Publish<T>(T @event, string? routingKey = null, byte? priority
// 在发布事件前检查是否已经取消发布
if (cancellationToken is not null && cancellationToken.Value.IsCancellationRequested) return;
var body = serializer.Serialize(@event, @event.GetType());
// 创建Policy规则
var policy = Policy.Handle<BrokerUnreachableException>()
.Or<SocketException>()
.WaitAndRetryAsync(config.RetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
logger.LogError(ex, "无法发布: {EventId} 超时 {Timeout}s ({ExceptionMessage})", @event.EventId, $"{time.TotalSeconds:n1}", ex.Message));
await policy.ExecuteAsync(async () =>
var pipeline = pipelineProvider.GetPipeline(Constant.ResiliencePipelineName);
await pipeline.ExecuteAsync(async ct =>
{
logger.LogTrace("发布: {EventId}", @event.EventId);
await channel.BasicPublishAsync(exc.ExchangeName, routingKey ?? exc.RoutingKey, properties, body, true).ConfigureAwait(false);
await channel.BasicPublishAsync(exc.ExchangeName, routingKey ?? exc.RoutingKey, properties, body, true, ct).ConfigureAwait(false);
await conn.ReturnChannel(channel).ConfigureAwait(false);
}).ConfigureAwait(false);
}
Expand Down Expand Up @@ -104,15 +94,11 @@ public async Task Publish<T>(T @event, uint ttl, string? routingKey = null, byte
// 在发布事件前检查是否已经取消发布
if (cancellationToken is not null && cancellationToken.Value.IsCancellationRequested) return;
var body = serializer.Serialize(@event, @event.GetType());
// 创建Policy规则
var policy = Policy.Handle<BrokerUnreachableException>()
.Or<SocketException>()
.WaitAndRetryAsync(config.RetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
logger.LogError(ex, "无法发布: {EventId} 超时 {Timeout}s ({ExceptionMessage})", @event.EventId, $"{time.TotalSeconds:n1}", ex.Message));
await policy.ExecuteAsync(async () =>
var pipeline = pipelineProvider.GetPipeline(Constant.ResiliencePipelineName);
await pipeline.ExecuteAsync(async ct =>
{
logger.LogTrace("发布: {EventId}", @event.EventId);
await channel.BasicPublishAsync(exc.ExchangeName, routingKey ?? exc.RoutingKey, properties, body, true).ConfigureAwait(false);
await channel.BasicPublishAsync(exc.ExchangeName, routingKey ?? exc.RoutingKey, properties, body, true, ct).ConfigureAwait(false);
await conn.ReturnChannel(channel).ConfigureAwait(false);
}).ConfigureAwait(false);
}
Expand Down Expand Up @@ -226,18 +212,15 @@ private async Task StartBasicConsume(Type eventType, ExchangeAttribute exc, ICha
private async Task ProcessEvent(Type eventType, byte[] message, bool isDlx, Func<ValueTask> ack)
{
logger.LogTrace("处理事件: {EventName}", eventType.Name);
var policy = Policy.Handle<BrokerUnreachableException>()
.Or<SocketException>()
.WaitAndRetryAsync(config.RetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
logger.LogError(ex, "无法消费: {EventName} 超时 {Timeout}s ({ExceptionMessage})", eventType.Name, $"{time.TotalSeconds:n1}", ex.Message));
if (subsManager.HasSubscriptionsForEvent(eventType.Name, isDlx))
{
var @event = serializer.Deserialize(message, eventType);
var handlerTypes = subsManager.GetHandlersForEvent(eventType.Name, isDlx);
using var scope = sp.GetService<IServiceScopeFactory>()?.CreateScope();
await policy.ExecuteAsync(async () =>
var pipeline = pipelineProvider.GetPipeline(Constant.ResiliencePipelineName);
foreach (var handlerType in handlerTypes)
{
foreach (var handlerType in handlerTypes)
await pipeline.ExecuteAsync(async _ =>
{
var key = (handlerType, eventType);
if (!_handleAsyncDelegateCache.TryGetValue(key, out var cachedDelegate))
Expand All @@ -246,19 +229,19 @@ await policy.ExecuteAsync(async () =>
if (method is null)
{
logger.LogError($"无法找到{nameof(@event)}事件处理器");
continue; // 或者抛出异常
return; // 或者抛出异常
}
var delegateType = typeof(HandleAsyncDelegate<>).MakeGenericType(eventType);
var handler = scope?.ServiceProvider.GetService(handlerType);
if (handler is null) continue;
if (handler is null) return;
var handleAsyncDelegate = Delegate.CreateDelegate(delegateType, handler, method);
_handleAsyncDelegateCache[key] = handleAsyncDelegate;
cachedDelegate = handleAsyncDelegate;
}
if (cachedDelegate.DynamicInvoke(@event) is Task task) await task;
}
await ack.Invoke();
});
await ack.Invoke();
});
}
}
else
{
Expand Down
25 changes: 16 additions & 9 deletions src/EasilyNET.RabbitBus.AspNetCore/Manager/PersistentConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
using EasilyNET.RabbitBus.AspNetCore.Configs;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Polly.Registry;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System.Net.Sockets;

namespace EasilyNET.RabbitBus.AspNetCore.Manager;

Expand All @@ -18,6 +16,7 @@ internal sealed class PersistentConnection : IPersistentConnection, IDisposable
private readonly IConnectionFactory _connFactory;
private readonly SemaphoreSlim _connLock = new(1, 1);
private readonly ILogger<PersistentConnection> _logger;
private readonly ResiliencePipelineProvider<string> _pipelineProvider;
private readonly uint _poolCount;
private readonly SemaphoreSlim _reconnectLock = new(1, 1); // 用于控制重连并发的信号量
private readonly RabbitConfig config;
Expand All @@ -26,12 +25,13 @@ internal sealed class PersistentConnection : IPersistentConnection, IDisposable
private bool _disposed;
private bool _isReconnecting; // 用于控制重连尝试的标志位

public PersistentConnection(IConnectionFactory connFactory, IOptionsMonitor<RabbitConfig> options, ILogger<PersistentConnection> logger)
public PersistentConnection(IConnectionFactory connFactory, IOptionsMonitor<RabbitConfig> options, ILogger<PersistentConnection> logger, ResiliencePipelineProvider<string> pipelineProvider)
{
_connFactory = connFactory ?? throw new ArgumentNullException(nameof(connFactory));
config = options.Get(Constant.OptionName);
_poolCount = config.PoolCount < 1 ? (uint)Environment.ProcessorCount : config.PoolCount;
_logger = logger;
_pipelineProvider = pipelineProvider;
}

/// <inheritdoc />
Expand Down Expand Up @@ -93,14 +93,21 @@ internal override async Task TryConnect()
await _connLock.WaitAsync();
try
{
var policy = Policy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(config.RetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
_logger.LogWarning(ex, "RabbitMQ客户端在 {TimeOut}s 超时后无法创建链接,({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message));
await policy.ExecuteAsync(async () => _connection = config.AmqpTcpEndpoints is not null && config.AmqpTcpEndpoints.Count > 0 ? await _connFactory.CreateConnectionAsync(config.AmqpTcpEndpoints) : await _connFactory.CreateConnectionAsync());
var pipeline = _pipelineProvider.GetPipeline(Constant.ResiliencePipelineName);
await pipeline.ExecuteAsync(async ct =>
_connection = config.AmqpTcpEndpoints is not null && config.AmqpTcpEndpoints.Count > 0
? await _connFactory.CreateConnectionAsync(config.AmqpTcpEndpoints, ct)
: await _connFactory.CreateConnectionAsync(ct));
}
finally
{
// 先移除事件,以避免重复注册
if (_connection is not null)
{
_connection.ConnectionShutdown -= OnConnectionShutdown;
_connection.CallbackException -= OnCallbackException;
_connection.ConnectionBlocked -= OnConnectionBlocked;
}
if (IsConnected && _connection is not null)
{
_connection.ConnectionShutdown += OnConnectionShutdown;
Expand Down
42 changes: 35 additions & 7 deletions src/EasilyNET.RabbitBus.AspNetCore/ServiceCollectionExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@
using EasilyNET.RabbitBus.Core.Abstraction;
using EasilyNET.RabbitBus.Core.Attributes;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Polly.Timeout;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using System.Diagnostics.CodeAnalysis;
using System.Net.Sockets;

// ReSharper disable UnusedMember.Global

Expand Down Expand Up @@ -49,13 +54,14 @@ public static void AddRabbitBus(this IServiceCollection services, IConfiguration

private static void InjectHandler(this IServiceCollection services)
{
var handlers = AssemblyHelper.FindTypes(o => o is
{
IsClass: true,
IsAbstract: false
} &&
o.IsBaseOn(typeof(IEventHandler<>)) &&
!o.HasAttribute<IgnoreHandlerAttribute>());
var handlers = AssemblyHelper.FindTypes(o =>
o is
{
IsClass: true,
IsAbstract: false
} &&
o.IsBaseOn(typeof(IEventHandler<>)) &&
!o.HasAttribute<IgnoreHandlerAttribute>());
foreach (var handler in handlers) services.AddSingleton(handler);
}

Expand Down Expand Up @@ -99,6 +105,28 @@ private static IServiceCollection RabbitPersistentConnection(this IServiceCollec
}
throw new("无法从配置中创建链接");
});
services.AddResiliencePipeline(Constant.ResiliencePipelineName, (builder, context) =>
{
var conf = context.ServiceProvider.GetRequiredService<IOptionsMonitor<RabbitConfig>>();
var config = conf.Get(Constant.OptionName);
var logger = context.ServiceProvider.GetRequiredService<ILogger<IPersistentConnection>>();
builder.AddRetry(new()
{
ShouldHandle = new PredicateBuilder().Handle<BrokerUnreachableException>().Handle<SocketException>().Handle<TimeoutRejectedException>(),
MaxRetryAttempts = config.RetryCount,
Delay = TimeSpan.FromMilliseconds(500),
BackoffType = DelayBackoffType.Exponential,
UseJitter = true,
MaxDelay = TimeSpan.FromSeconds(10),
OnRetry = args =>
{
var ex = args.Outcome.Exception!;
logger.LogWarning(ex, "RabbitMQ客户端在 {TimeOut}s 超时后失败,({ExceptionMessage})", $"{args.Duration:n1}", ex.Message);
return ValueTask.CompletedTask;
}
});
builder.AddTimeout(TimeSpan.FromMinutes(1));
});
services.AddSingleton<IPersistentConnection, PersistentConnection>();
return services;
}
Expand Down

0 comments on commit 4c4bd54

Please sign in to comment.