Skip to content

Commit

Permalink
fix: 修复一些bug以及性能优化. (#504)
Browse files Browse the repository at this point in the history
  • Loading branch information
joesdu authored Aug 12, 2024
2 parents 45adf10 + d10dd24 commit e0aaefc
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 28 deletions.
2 changes: 1 addition & 1 deletion sample/WebApi.Test.Unit/AppWebModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace WebApi.Test.Unit;
typeof(CorsModule),
typeof(ControllersModule),
typeof(OutPutCachingModule),
typeof(MemoryCacheModule),
typeof(GarnetCacheModule),
typeof(MongoModule),
typeof(MongoFSModule),
typeof(DistributedLockModule),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace WebApi.Test.Unit.ServiceModules;

/// <inheritdoc />
internal sealed class MemoryCacheModule : AppModule
internal sealed class GarnetCacheModule : AppModule
{
/// <inheritdoc />
public override void ConfigureServices(ConfigureServicesContext context)
Expand Down
4 changes: 2 additions & 2 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
<PackageVersion Include="MongoDB.Driver" Version="2.28.0" />
<PackageVersion Include="MongoDB.Driver.Core" Version="2.28.0" />
<PackageVersion Include="MongoDB.Driver.GridFS" Version="2.28.0" />
<PackageVersion Include="RabbitMQ.Client" Version="7.0.0-rc.7" />
<PackageVersion Include="RabbitMQ.Client" Version="7.0.0-rc.8" />
<PackageVersion Include="Serilog" Version="4.0.2-dev-02220" />
<PackageVersion Include="Spectre.Console.Json" Version="0.49.2-preview.0.15" />
<PackageVersion Include="Spectre.Console.Json" Version="0.49.2-preview.0.16" />
<PackageVersion Include="Swashbuckle.AspNetCore.SwaggerGen" Version="6.7.0" />
<PackageVersion Include="Swashbuckle.AspNetCore.SwaggerUI" Version="6.7.0" />
<!--microsoft asp.net core -->
Expand Down
30 changes: 20 additions & 10 deletions src/EasilyNET.AutoDependencyInjection/Modules/AppModule.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Collections.Frozen;
using System.Reflection;
using EasilyNET.AutoDependencyInjection.Abstractions;
using EasilyNET.AutoDependencyInjection.Contexts;
Expand Down Expand Up @@ -33,17 +32,28 @@ public virtual void ApplicationInitialization(ApplicationContext context) { }
public IEnumerable<Type> GetDependedTypes(Type? moduleType = null)
{
moduleType ??= GetType();
var dependedTypes = moduleType.GetCustomAttributes().OfType<IDependedTypesProvider>().ToArray();
if (dependedTypes.Length == 0) return [];
List<Type> dependList = [];
foreach (var dependedType in dependedTypes)
var dependedTypes = moduleType.GetCustomAttributes().OfType<IDependedTypesProvider>();
if (!dependedTypes.Any()) return [];
var dependSet = new HashSet<Type>();
var stack = new Stack<Type>([moduleType]);
while (stack.Count > 0)
{
var depends = dependedType.GetDependedTypes().ToFrozenSet();
if (depends.Count == 0) continue;
dependList.AddRange(depends);
foreach (var type in depends) dependList.AddRange(GetDependedTypes(type));
var currentType = stack.Pop();
if (!dependSet.Add(currentType)) continue;
var cdt = currentType.GetCustomAttributes().OfType<IDependedTypesProvider>();
foreach (var dependedType in cdt)
{
var depends = dependedType.GetDependedTypes();
foreach (var type in depends)
{
if (dependSet.Add(type))
{
stack.Push(type);
}
}
}
}
return dependList.Distinct();
return dependSet;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ protected ModuleApplicationBase(Type startupModuleType, IServiceCollection servi
Services = services;
services.AddSingleton<IModuleApplication>(this);
services.TryAddObjectAccessor<IServiceProvider>();
// TODO:希望替换成使用SG的方式来注入服务,将GetAllEnabledModule函数替换掉.
Source = GetAllEnabledModule(services);
Modules = LoadModules;
}
Expand All @@ -43,9 +42,11 @@ private IReadOnlyList<IAppModule> LoadModules
var depends = module.GetDependedTypes();
foreach (var dependType in depends)
{
var dependModule = Source.ToList().Find(m => m.GetType() == dependType);
if (dependModule is null) continue;
if (!modules.Contains(dependModule)) modules.Add(dependModule);
var dependModule = Source.FirstOrDefault(m => m.GetType() == dependType);
if (dependModule is not null && !modules.Contains(dependModule))
{
modules.Add(dependModule);
}
}
return modules;
}
Expand Down
2 changes: 1 addition & 1 deletion src/EasilyNET.Core/DeepCopy/DeepCopyByExpressionTrees.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private static void MemberwiseCloneInputToOutputExpression(Type type, ParameterE
///// Intended code:
/////
///// var output = (<type>)input.MemberwiseClone();
var memberwiseCloneMethod = ObjectType.GetMethod("MemberwiseClone", BindingFlags.NonPublic | BindingFlags.Instance)!;
var memberwiseCloneMethod = ObjectType.GetMethod(nameof(MemberwiseClone), BindingFlags.NonPublic | BindingFlags.Instance)!;
var memberwiseCloneInputExpression = Expression.Assign(outputVariable, Expression.Convert(Expression.Call(inputParameter, memberwiseCloneMethod), type));
expressions.Add(memberwiseCloneInputExpression);
}
Expand Down
2 changes: 1 addition & 1 deletion src/EasilyNET.Core/DeepCopy/DeepCopyByReflection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace EasilyNET.Core.DeepCopy;
/// </summary>
public static class DeepCopyByReflection
{
private static readonly MethodInfo? CloneMethod = typeof(object).GetMethod("Memberwise Clone", BindingFlags.NonPublic | BindingFlags.Instance);
private static readonly MethodInfo? CloneMethod = typeof(object).GetMethod(nameof(MemberwiseClone), BindingFlags.NonPublic | BindingFlags.Instance);

/// <summary>
/// 这个函数接收一个Type类型的参数,然后检查这个类型是否为string类型,或者这个类型是否为值类型并且是原始类型。如果满足这些条件中的任何一个,那么这个函数就会返回true,否则返回false。
Expand Down
97 changes: 97 additions & 0 deletions src/EasilyNET.Core/System/AsyncBarrier.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// ReSharper disable UnusedType.Global
// ReSharper disable UnusedMember.Global

namespace EasilyNET.Core.System;

/// <summary>
/// 表示一个异步屏障,它会阻塞一组任务,直到所有任务都到达屏障,通常用于需要协调多个异步任务的场景,确保所有任务在某个同步点之前都已完成特定的工作,然后再继续执行
/// </summary>
/// <example>
/// <code>
/// <![CDATA[
/// var barrier = new AsyncBarrier(3);
/// var cts = new CancellationTokenSource();
///
/// // 启动三个任务,每个任务都会在屏障处等待
/// var tasks = new List&lt;Task&gt;
/// {
/// Task.Run(async () =>
/// {
/// Console.WriteLine("任务1到达屏障");
/// await barrier.SignalAndWait(cts.Token);
/// Console.WriteLine("任务1继续执行");
/// }),
/// Task.Run(async () =>
/// {
/// Console.WriteLine("任务2到达屏障");
/// await barrier.SignalAndWait(cts.Token);
/// Console.WriteLine("任务2继续执行");
/// }),
/// Task.Run(async () =>
/// {
/// Console.WriteLine("任务3到达屏障");
/// await barrier.SignalAndWait(cts.Token);
/// Console.WriteLine("任务3继续执行");
/// })
/// };
///
/// // 等待所有任务完成
/// await Task.WhenAll(tasks);
/// ]]>
/// </code>
/// </example>
public sealed class AsyncBarrier
{
private readonly int participantCount;
private readonly Stack<Waiter> waiters;

/// <summary>
/// 使用指定数量的参与者初始化 <see cref="AsyncBarrier" /> 类的新实例
/// </summary>
/// <param name="participants">屏障的参与者数量</param>
/// <exception cref="ArgumentOutOfRangeException">当参与者数量小于或等于零时抛出</exception>
public AsyncBarrier(int participants)
{
if (participants <= 0)
throw new ArgumentOutOfRangeException(nameof(participants), $"参数 {nameof(participants)} 必须是一个正数。");
participantCount = participants;
waiters = new(participants - 1);
}

/// <summary>
/// 表示一个参与者已到达屏障,并等待所有其他参与者到达屏障
/// </summary>
/// <param name="cancellationToken">用于监视取消请求的令牌</param>
/// <returns>表示异步操作的任务</returns>
public ValueTask SignalAndWait(CancellationToken cancellationToken)
{
lock (waiters)
{
if (waiters.Count + 1 == participantCount)
{
while (waiters.Count > 0)
{
var waiter = waiters.Pop();
waiter.CompletionSource.TrySetResult(default);
waiter.CancellationRegistration.Dispose();
}
return new(cancellationToken.IsCancellationRequested
? Task.FromCanceled(cancellationToken)
: Task.CompletedTask);
}
TaskCompletionSource<EmptyStruct> tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
var ctr = cancellationToken.CanBeCanceled
? cancellationToken.Register(static (tcs, ct) => ((TaskCompletionSource<EmptyStruct>)tcs!).TrySetCanceled(ct), tcs)
: default;
waiters.Push(new(tcs, ctr));
return new(tcs.Task);
}
}

private readonly struct Waiter(TaskCompletionSource<EmptyStruct> completionSource, CancellationTokenRegistration cancellationRegistration)
{
internal TaskCompletionSource<EmptyStruct> CompletionSource => completionSource;

internal CancellationTokenRegistration CancellationRegistration => cancellationRegistration;
}
}
20 changes: 20 additions & 0 deletions src/EasilyNET.Core/System/EmptyStruct.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

// ReSharper disable UnusedMember.Global

namespace EasilyNET.Core.System;

/// <summary>
/// 一个空结构体
/// </summary>
/// <remarks>
/// 当泛型类型需要一个类型参数但完全不使用时,这可以节省 4 个字节,相对于 System.Object
/// </remarks>
internal readonly struct EmptyStruct
{
/// <summary>
/// 获取空结构体的一个实例
/// </summary>
internal static EmptyStruct Instance => default;
}
38 changes: 30 additions & 8 deletions src/EasilyNET.RabbitBus.AspNetCore/EventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace EasilyNET.RabbitBus.AspNetCore;
internal sealed class EventBus(IPersistentConnection conn, ISubscriptionsManager subsManager, IBusSerializer serializer, IServiceProvider sp, ILogger<EventBus> logger, ResiliencePipelineProvider<string> pipelineProvider) : IBus
{
private const string HandleName = nameof(IEventHandler<IEvent>.HandleAsync);
private readonly ConcurrentDictionary<(Type HandlerType, Type EventType), Delegate> _handleAsyncDelegateCache = [];
private readonly ConcurrentDictionary<(Type HandlerType, Type EventType), Func<object, Task>?> _handleAsyncDelegateCache = [];

public async Task Publish<T>(T @event, string? routingKey = null, byte? priority = 0, CancellationToken? cancellationToken = null) where T : IEvent
{
Expand Down Expand Up @@ -86,7 +86,7 @@ public async Task Publish<T>(T @event, uint ttl, string? routingKey = null, byte
exc_args["x-delayed-type"] = !xDelayedType || delayedType is null ? "direct" : delayedType;
}
//创建延时交换机,type类型为x-delayed-message
await channel.ExchangeDeclareAsync(exc.ExchangeName, exc.WorkModel.ToDescription()!, true, false, exc_args);
await channel.ExchangeDeclareAsync(exc.ExchangeName, exc.WorkModel.ToDescription(), true, false, exc_args);
// 在发布事件前检查是否已经取消发布
if (cancellationToken is not null && cancellationToken.Value.IsCancellationRequested) return;
var body = serializer.Serialize(@event, @event.GetType());
Expand Down Expand Up @@ -157,7 +157,7 @@ private async Task<IChannel> CreateConsumerChannel(ExchangeAttribute exc, Type @
if (!success && exc is { WorkModel: EModel.Delayed }) exchange_args.Add("x-delayed-type", "direct"); //x-delayed-type必须加
}
//创建交换机
await channel.ExchangeDeclareAsync(exc.ExchangeName, exc.WorkModel.ToDescription()!, true, false, exchange_args);
await channel.ExchangeDeclareAsync(exc.ExchangeName, exc.WorkModel.ToDescription(), true, false, exchange_args);
}
//创建队列
await channel.QueueDeclareAsync(exc.Queue, true, false, false, queue_args);
Expand Down Expand Up @@ -224,21 +224,30 @@ private async Task ProcessEvent(Type eventType, byte[] message, EKindOfHandler h
logger.LogError($"无法找到{nameof(@event)}事件处理器");
return; // 或者抛出异常
}
var delegateType = typeof(HandleAsyncDelegate<>).MakeGenericType(eventType);
// var delegateType = typeof(HandleAsyncDelegate<>).MakeGenericType(eventType);
var handler = scope?.ServiceProvider.GetService(handlerType);
if (handler is null) return;
var handleAsyncDelegate = Delegate.CreateDelegate(delegateType, handler, method);
//var handleAsyncDelegate = Delegate.CreateDelegate(typeof(Func<object, Task>), handler, method) as Func<object, Task>;
var handleAsyncDelegate = CreateHandleAsyncDelegate(handler, method, eventType);
_handleAsyncDelegateCache[key] = handleAsyncDelegate;
cachedDelegate = handleAsyncDelegate;
}
if (cachedDelegate.DynamicInvoke(@event) is Task handleTask)
if (cachedDelegate is not null && @event is not null)
{
await pipeline.ExecuteAsync(async _ =>
{
await handleTask;
await cachedDelegate(@event);
await ack.Invoke();
}).ConfigureAwait(false);
}
//if (cachedDelegate?.DynamicInvoke(@event) is Task handleTask)
//{
// await pipeline.ExecuteAsync(async _ =>
// {
// await handleTask;
// await ack.Invoke();
// }).ConfigureAwait(false);
//}
}
}
else
Expand All @@ -247,5 +256,18 @@ await pipeline.ExecuteAsync(async _ =>
}
}

private delegate Task HandleAsyncDelegate<in TEvent>(TEvent @event) where TEvent : IEvent;
private static Func<object, Task> CreateHandleAsyncDelegate(object handler, MethodInfo method, Type eventType)
{
var delegateType = typeof(Func<,>).MakeGenericType(eventType, typeof(Task));
var handleAsyncDelegate = Delegate.CreateDelegate(delegateType, handler, method);
return async @event =>
{
if (handleAsyncDelegate.DynamicInvoke(@event) is Task task)
{
await task.ConfigureAwait(false);
}
};
}

//private delegate Task HandleAsyncDelegate<in TEvent>(TEvent @event) where TEvent : IEvent;
}
64 changes: 64 additions & 0 deletions test/EasilyNET.Test.Unit/System/AsyncBarrierTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using EasilyNET.Core.System;

// ReSharper disable MethodSupportsCancellation

namespace EasilyNET.Test.Unit.System;

[TestClass]
public class AsyncBarrierTest
{
[TestMethod]
public async Task AllParticipantsReachBarrier_ShouldContinueExecution()
{
var barrier = new AsyncBarrier(3);
var cts = new CancellationTokenSource();
var tasks = new List<Task>
{
Task.Run(async () => { await barrier.SignalAndWait(cts.Token); }),
Task.Run(async () => { await barrier.SignalAndWait(cts.Token); }),
Task.Run(async () => { await barrier.SignalAndWait(cts.Token); })
};
await Task.WhenAll(tasks);
}

[TestMethod]
public async Task CancellationToken_ShouldCancelTask()
{
var barrier = new AsyncBarrier(3);
var cts = new CancellationTokenSource();
var tasks = new List<Task>
{
Task.Run(async () => { await barrier.SignalAndWait(cts.Token); }),
Task.Run(async () => { await barrier.SignalAndWait(cts.Token); }),
Task.Run(async () =>
{
await cts.CancelAsync(); // 取消令牌
await Assert.ThrowsExceptionAsync<TaskCanceledException>(async () => { await barrier.SignalAndWait(cts.Token); });
})
};
try
{
await Task.WhenAll(tasks);
}
catch (TaskCanceledException)
{
// 预期的异常,不需要处理
}
}

[TestMethod]
public async Task LessParticipants_ShouldWaitIndefinitely()
{
var barrier = new AsyncBarrier(3);
var cts = new CancellationTokenSource();
var tasks = new List<Task>
{
Task.Run(async () => { await barrier.SignalAndWait(cts.Token); }),
Task.Run(async () => { await barrier.SignalAndWait(cts.Token); })
};
var delayTask = Task.Delay(1000, cts.Token);
var allTasks = Task.WhenAll(tasks);
var completedTask = await Task.WhenAny(allTasks, delayTask);
Assert.AreEqual(delayTask, completedTask); // 确保任务在等待
}
}

0 comments on commit e0aaefc

Please sign in to comment.