Skip to content

Commit

Permalink
feat: implemented pipelines (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
GerardSmit authored Jan 3, 2024
1 parent 5b965fa commit ac8ab49
Show file tree
Hide file tree
Showing 102 changed files with 3,290 additions and 884 deletions.
4 changes: 2 additions & 2 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project>

<PropertyGroup>
<Version>1.0.0-alpha.8</Version>
<PackageVersion>1.0.0-alpha.8</PackageVersion>
<Version>1.0.0-alpha.9</Version>
<PackageVersion>1.0.0-alpha.9</PackageVersion>
<Authors>Zapto</Authors>
<RepositoryUrl>https://github.com/zapto-dev/Mediator</RepositoryUrl>
<Copyright>Copyright © 2023 Zapto</Copyright>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace Zapto.Mediator;

public partial class MediatorBuilder : IMediatorBuilder
{
public IMediatorBuilder AddDefaultRequestHandler(IDefaultRequestHandler handler)
{
_services.AddSingleton(handler);
return this;
}

public IMediatorBuilder AddDefaultRequestHandler(Type handlerType, RegistrationScope scope = RegistrationScope.Transient)
{
_services.Add(new ServiceDescriptor(typeof(IDefaultRequestHandler), handlerType, GetLifetime(scope)));
return this;
}

public IMediatorBuilder AddDefaultRequestHandler<THandler>(RegistrationScope scope = RegistrationScope.Transient) where THandler : class, IDefaultRequestHandler
{
_services.Add(new ServiceDescriptor(typeof(IDefaultRequestHandler), typeof(THandler), GetLifetime(scope)));
return this;
}

public IMediatorBuilder AddDefaultNotificationHandler(IDefaultNotificationHandler handler)
{
_services.AddSingleton(handler);
return this;
}

public IMediatorBuilder AddDefaultNotificationHandler(Type handlerType, RegistrationScope scope = RegistrationScope.Transient)
{
_services.Add(new ServiceDescriptor(typeof(IDefaultNotificationHandler), handlerType, GetLifetime(scope)));
return this;
}

public IMediatorBuilder AddDefaultNotificationHandler<THandler>(RegistrationScope scope = RegistrationScope.Transient) where THandler : class, IDefaultNotificationHandler
{
_services.Add(new ServiceDescriptor(typeof(IDefaultNotificationHandler), typeof(THandler), GetLifetime(scope)));
return this;
}

public IMediatorBuilder AddDefaultStreamRequestHandler(IDefaultStreamRequestHandler handler)
{
_services.AddSingleton(handler);
return this;
}

public IMediatorBuilder AddDefaultStreamRequestHandler(Type handlerType, RegistrationScope scope = RegistrationScope.Transient)
{
_services.Add(new ServiceDescriptor(typeof(IDefaultStreamRequestHandler), handlerType, GetLifetime(scope)));
return this;
}

public IMediatorBuilder AddDefaultStreamRequestHandler<THandler>(RegistrationScope scope = RegistrationScope.Transient) where THandler : class, IDefaultStreamRequestHandler
{
_services.Add(new ServiceDescriptor(typeof(IDefaultStreamRequestHandler), typeof(THandler), GetLifetime(scope)));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ namespace Zapto.Mediator;

public partial class MediatorBuilder
{
public IMediatorBuilder AddNotificationHandler<TNotification, THandler>()
public IMediatorBuilder AddNotificationHandler<TNotification, THandler>(RegistrationScope scope = RegistrationScope.Transient)
where TNotification : INotification
where THandler : class, INotificationHandler<TNotification>
{
if (_ns == null)
{
_services.AddTransient<INotificationHandler<TNotification>, THandler>();
_services.Add(new ServiceDescriptor(typeof(INotificationHandler<TNotification>), typeof(THandler), GetLifetime(scope)));
}
else
{
_services.TryAddTransient<THandler>();
_services.TryAdd(new ServiceDescriptor(typeof(THandler), typeof(THandler), GetLifetime(scope)));
_services.AddSingleton<INamespaceNotificationHandler<TNotification>>(new NamespaceNotificationHandlerProvider<TNotification, THandler>(_ns.Value));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,28 @@ namespace Zapto.Mediator;

public partial class MediatorBuilder
{
public IMediatorBuilder AddRequestHandler<TRequest, TResponse, THandler>()
public IMediatorBuilder AddRequestHandler<TRequest, TResponse, THandler>(RegistrationScope scope = RegistrationScope.Transient)
where TRequest : IRequest<TResponse>
where THandler : class, IRequestHandler<TRequest, TResponse>
{
if (_ns == null)
{
_services.AddTransient<IRequestHandler<TRequest, TResponse>, THandler>();
_services.Add(new ServiceDescriptor(typeof(IRequestHandler<TRequest, TResponse>), typeof(THandler), GetLifetime(scope)));
}
else
{
_services.TryAddTransient<THandler>();
_services.TryAdd(new ServiceDescriptor(typeof(THandler), typeof(THandler), GetLifetime(scope)));
_services.AddSingleton<INamespaceRequestHandler<TRequest, TResponse>>(new NamespaceRequestHandlerProvider<TRequest, TResponse, THandler>(_ns.Value));
}

return this;
}

public IMediatorBuilder AddRequestHandler<TRequest, THandler>()
public IMediatorBuilder AddRequestHandler<TRequest, THandler>(RegistrationScope scope = RegistrationScope.Transient)
where TRequest : IRequest<Unit>
where THandler : class, IRequestHandler<TRequest, Unit>
{
return AddRequestHandler<TRequest, Unit, THandler>();
return AddRequestHandler<TRequest, Unit, THandler>(scope);
}

public IMediatorBuilder AddRequestHandler<TRequest, TResponse>(IRequestHandler<TRequest, TResponse> handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,28 @@ namespace Zapto.Mediator;

public partial class MediatorBuilder
{
public IMediatorBuilder AddStreamRequestHandler<TRequest, TResponse, THandler>()
public IMediatorBuilder AddStreamRequestHandler<TRequest, TResponse, THandler>(RegistrationScope scope = RegistrationScope.Transient)
where TRequest : IStreamRequest<TResponse>
where THandler : class, IStreamRequestHandler<TRequest, TResponse>
{
if (_ns == null)
{
_services.AddTransient<IStreamRequestHandler<TRequest, TResponse>, THandler>();
_services.Add(new ServiceDescriptor(typeof(IStreamRequestHandler<TRequest, TResponse>), typeof(THandler), GetLifetime(scope)));
}
else
{
_services.TryAddTransient<THandler>();
_services.TryAdd(new ServiceDescriptor(typeof(THandler), typeof(THandler), GetLifetime(scope)));
_services.AddSingleton<INamespaceStreamRequestHandler<TRequest, TResponse>>(new NamespaceStreamRequestHandlerProvider<TRequest, TResponse, THandler>(_ns.Value));
}

return this;
}

public IMediatorBuilder AddStreamRequestHandler<TRequest, THandler>()
public IMediatorBuilder AddStreamRequestHandler<TRequest, THandler>(RegistrationScope scope = RegistrationScope.Transient)
where TRequest : IStreamRequest<Unit>
where THandler : class, IStreamRequestHandler<TRequest, Unit>
{
return AddStreamRequestHandler<TRequest, Unit, THandler>();
return AddStreamRequestHandler<TRequest, Unit, THandler>(scope);
}

public IMediatorBuilder AddStreamRequestHandler<TRequest, TResponse>(IStreamRequestHandler<TRequest, TResponse> handler)
Expand Down
11 changes: 11 additions & 0 deletions src/Mediator.DependencyInjection/Builder/MediatorBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,15 @@ public IMediatorBuilder AddNamespace(MediatorNamespace ns)
{
return new MediatorBuilder(_services, ns);
}

private ServiceLifetime GetLifetime(RegistrationScope scope)
{
return scope switch
{
RegistrationScope.Transient => ServiceLifetime.Transient,
RegistrationScope.Singleton => ServiceLifetime.Singleton,
RegistrationScope.Scoped => ServiceLifetime.Scoped,
_ => throw new ArgumentOutOfRangeException(nameof(scope), scope, null)
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public static IMediatorBuilder AddMediator(this IServiceCollection services)
services.TryAddSingleton(typeof(GenericNotificationCache<>));
services.TryAddTransient(typeof(GenericNotificationHandler<>));

services.TryAddSingleton(typeof(GenericPipelineBehavior<,>));

services.TryAddSingleton(typeof(GenericStreamRequestCache<,>));
services.TryAddTransient(typeof(IStreamRequestHandler<,>), typeof(GenericStreamRequestHandler<,>));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ internal sealed class GenericNotificationCache<TNotification> : INotificationCac
public SemaphoreSlim Lock { get; } = new(1, 1);
}

internal sealed class GenericNotificationHandler<TNotification> : INotificationHandler<TNotification>
internal sealed class GenericNotificationHandler<TNotification>
where TNotification : INotification
{
private readonly GenericNotificationCache<TNotification> _cache;
Expand All @@ -58,7 +58,7 @@ public GenericNotificationHandler(IEnumerable<GenericNotificationRegistration> e
_cache = cache;
}

public async ValueTask Handle(IServiceProvider provider, TNotification notification, CancellationToken ct)
public async ValueTask<bool> Handle(IServiceProvider provider, TNotification notification, CancellationToken ct)
{
if (_cache.Registrations.Count > 0)
{
Expand All @@ -81,14 +81,14 @@ public async ValueTask Handle(IServiceProvider provider, TNotification notificat
await ((INotificationHandler<TNotification>)handler).Handle(provider, notification, ct);
}

return;
return cachedTypes.Count > 0;
}

var notificationType = typeof(TNotification);

if (!notificationType.IsGenericType)
{
return;
return false;
}

var arguments = notificationType.GetGenericArguments();
Expand All @@ -111,5 +111,6 @@ public async ValueTask Handle(IServiceProvider provider, TNotification notificat
}

_cache.HandlerTypes = handlerTypes;
return handlerTypes.Count > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,18 @@ internal sealed class GenericRequestHandler<TRequest, TResponse> : IRequestHandl
private readonly GenericRequestCache<TRequest, TResponse> _cache;
private readonly IEnumerable<GenericRequestRegistration> _enumerable;
private readonly IServiceProvider _serviceProvider;
private readonly IDefaultRequestHandler? _defaultHandler;

public GenericRequestHandler(IEnumerable<GenericRequestRegistration> enumerable, IServiceProvider serviceProvider, GenericRequestCache<TRequest, TResponse> cache)
public GenericRequestHandler(
IEnumerable<GenericRequestRegistration> enumerable,
IServiceProvider serviceProvider,
GenericRequestCache<TRequest, TResponse> cache,
IDefaultRequestHandler? defaultHandler = null)
{
_enumerable = enumerable;
_serviceProvider = serviceProvider;
_cache = cache;
_defaultHandler = defaultHandler;
}

public async ValueTask<TResponse> Handle(IServiceProvider provider, TRequest request, CancellationToken ct)
Expand All @@ -57,7 +63,12 @@ public async ValueTask<TResponse> Handle(IServiceProvider provider, TRequest req

if (!requestType.IsGenericType)
{
throw new InvalidCastException($"No handler found for request type {requestType.FullName}.");
if (_defaultHandler is null)
{
throw new HandlerNotFoundException($"No handler found for request type {requestType.FullName}.");
}

return await _defaultHandler.Handle<TRequest, TResponse>(_serviceProvider, request, ct);
}

var arguments = requestType.GetGenericArguments();
Expand Down Expand Up @@ -85,6 +96,17 @@ public async ValueTask<TResponse> Handle(IServiceProvider provider, TRequest req
return await handler.Handle(provider, request, ct);
}

throw new InvalidCastException($"No handler found for request type {requestType.FullName}.");
if (_defaultHandler is null)
{
throw new HandlerNotFoundException($"No handler found for request type {requestType.FullName}.");
}

var method = _defaultHandler
.GetType()
.GetMethod(nameof(IDefaultRequestHandler.Handle))!
.MakeGenericMethod(arguments);

return await (ValueTask<TResponse>) method.Invoke(_defaultHandler, new object[] {_serviceProvider, request, ct})!;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,18 @@ internal sealed class GenericStreamRequestHandler<TRequest, TResponse> : IStream
private readonly GenericStreamRequestCache<TRequest, TResponse> _cache;
private readonly IEnumerable<GenericStreamRequestRegistration> _enumerable;
private readonly IServiceProvider _serviceProvider;
private readonly IDefaultStreamRequestHandler? _defaultHandler;

public GenericStreamRequestHandler(IEnumerable<GenericStreamRequestRegistration> enumerable, IServiceProvider serviceProvider, GenericStreamRequestCache<TRequest, TResponse> cache)
public GenericStreamRequestHandler(
IEnumerable<GenericStreamRequestRegistration> enumerable,
IServiceProvider serviceProvider,
GenericStreamRequestCache<TRequest, TResponse> cache,
IDefaultStreamRequestHandler? defaultHandler = null)
{
_enumerable = enumerable;
_serviceProvider = serviceProvider;
_cache = cache;
_defaultHandler = defaultHandler;
}

public IAsyncEnumerable<TResponse> Handle(IServiceProvider provider, TRequest request, CancellationToken ct)
Expand All @@ -57,7 +63,12 @@ public IAsyncEnumerable<TResponse> Handle(IServiceProvider provider, TRequest re

if (!requestType.IsGenericType)
{
throw new InvalidCastException($"No handler found for request type {requestType.FullName}.");
if (_defaultHandler is null)
{
throw new HandlerNotFoundException($"No handler found for request type {requestType.FullName}.");
}

return _defaultHandler.Handle<TRequest, TResponse>(provider, request, ct);
}

var arguments = requestType.GetGenericArguments();
Expand Down Expand Up @@ -85,6 +96,11 @@ public IAsyncEnumerable<TResponse> Handle(IServiceProvider provider, TRequest re
return handler.Handle(provider, request, ct);
}

throw new InvalidCastException($"No handler found for request type {requestType.FullName}.");
if (_defaultHandler is null)
{
throw new HandlerNotFoundException($"No handler found for request type {requestType.FullName}.");
}

return _defaultHandler.Handle<TRequest, TResponse>(provider, request, ct);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,23 @@ public partial class MediatorBuilder
{
public IMediatorBuilder AddNotificationHandler(
Type notificationType,
Type handlerType)
Type handlerType,
RegistrationScope scope = RegistrationScope.Transient)
{
if (notificationType.IsGenericType)
{
_services.AddTransient(handlerType);
_services.Add(new ServiceDescriptor(handlerType, handlerType, GetLifetime(scope)));
_services.AddSingleton(new GenericNotificationRegistration(notificationType, handlerType));
}
else
{
_services.AddTransient(typeof(INotificationHandler<>).MakeGenericType(notificationType), handlerType);
_services.Add(new ServiceDescriptor(typeof(INotificationHandler<>).MakeGenericType(notificationType), handlerType, GetLifetime(scope)));
}

return this;
}

public IMediatorBuilder AddNotificationHandler(Type handlerType)
public IMediatorBuilder AddNotificationHandler(Type handlerType, RegistrationScope scope = RegistrationScope.Transient)
{
var handlers = handlerType.GetInterfaces()
.Where(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(INotificationHandler<>))
Expand All @@ -41,16 +42,17 @@ public IMediatorBuilder AddNotificationHandler(Type handlerType)

AddNotificationHandler(
notificationType,
handlerType);
handlerType,
scope);
}

return this;
}

public IMediatorBuilder AddNotificationHandler<THandler>()
public IMediatorBuilder AddNotificationHandler<THandler>(RegistrationScope scope = RegistrationScope.Transient)
where THandler : INotificationHandler
{
AddNotificationHandler(typeof(THandler));
AddNotificationHandler(typeof(THandler), scope);
return this;
}
}
Loading

0 comments on commit ac8ab49

Please sign in to comment.