Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Queue.QueueCancellableInvocableWithPayload method #209

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions DocsV2/docs/Queuing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,33 @@ while(!this.Token.IsCancellationRequested)
await ProcessNextRecord();
}
```

If you need supply a payload/parameters to your `ICancellableTask` you should add `IInvocableWithPayload<T>` to your invocable and implement the Payload property:

```csharp
public UserModel Payload { get; set; }
```

Now, your `Invoke` method will have access to `Token` and `Payload`:

```csharp
while(!this.Token.IsCancellationRequested)
{
await ProcessNextRecord(this.Payload.UserId);
}
```

To queue this invocable use the `QueueCancellableInvocableWithPayload` method:

```csharp
var userModel = await _userService.Get(userId);
var (taskGuid, token) = queue.QueueCancellableInvocableWithPayload<CancellableInvocable, UserModel>(userModel);

// Somewhere else....

token.Cancel();
```

## Metrics

You can gain some insight into how the queue is doing at a given moment in time.
Expand Down
7 changes: 6 additions & 1 deletion Src/Coravel/Queuing/Interfaces/IQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,15 @@ public interface IQueue
/// </summary>
Guid QueueInvocableWithPayload<T, TParams>(TParams payload) where T : IInvocable, IInvocableWithPayload<TParams>;

/// <summary>
/// Queue an invocable that will be given the payload supplied to this method.
/// </summary>
(Guid, CancellationTokenSource) QueueCancellableInvocableWithPayload<T, TParams>(TParams payload) where T : IInvocable, IInvocableWithPayload<TParams>, ICancellableTask;

/// <summary>
/// View metrics given the queue's current executing state.
/// </summary>
/// <returns></returns>
QueueMetrics GetMetrics();
}
}
}
13 changes: 12 additions & 1 deletion Src/Coravel/Queuing/Queue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ public Guid QueueInvocableWithPayload<T, TParams>(TParams payload) where T : IIn
return (func.Guid, tokenSource);
}

public (Guid, CancellationTokenSource) QueueCancellableInvocableWithPayload<T, TParams>(TParams payload) where T : IInvocable, IInvocableWithPayload<TParams>, ICancellableTask
{
var tokenSource = new CancellationTokenSource();
var func = this.EnqueueInvocable<T>((invocable) => {
(invocable as ICancellableTask).Token = tokenSource.Token;
(invocable as IInvocableWithPayload<TParams>).Payload = payload;
});
this._tokens.TryAdd(func.Guid, tokenSource);
return (func.Guid, tokenSource);
}

public Guid QueueAsyncTask(Func<Task> asyncTask)
{
var job = new ActionOrAsyncFunc(asyncTask);
Expand Down Expand Up @@ -223,4 +234,4 @@ private async Task InvokeTask(ActionOrAsyncFunc task)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Coravel.Invocable;
using Coravel.Queuing;
using Coravel.Queuing.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using UnitTests.Scheduling.Stubs;
using Xunit;

namespace UnitTests.Queuing
{
public class CancellableInvocableWithParamsForQueueTests
{
[Fact]
public async Task CanCancelInvocable()
{
var services = new ServiceCollection();
services.AddTransient<TestCancellableInvocable>();
var provider = services.BuildServiceProvider();

Queue queue = new Queue(provider.GetRequiredService<IServiceScopeFactory>(), new DispatcherStub());

var payload = "Test";
var (_, token1) = queue.QueueCancellableInvocableWithPayload<TestCancellableInvocable, string>(payload);
var (_, token2) = queue.QueueCancellableInvocableWithPayload<TestCancellableInvocable, string>(payload);
var (_, token3) = queue.QueueCancellableInvocableWithPayload<TestCancellableInvocable, string>(payload);

token1.Cancel();
token3.Cancel();

TestCancellableInvocable.TokensCancelled = 0;
await queue.ConsumeQueueAsync();

Assert.Equal(2, TestCancellableInvocable.TokensCancelled);
}

[Fact]
public async Task CanCancelInvocablesForShutdown()
{
var services = new ServiceCollection();
services.AddTransient<TestCancellableInvocable>();
var provider = services.BuildServiceProvider();

Queue queue = new Queue(provider.GetRequiredService<IServiceScopeFactory>(), new DispatcherStub());

var payload = "Test";
var token1 = queue.QueueCancellableInvocableWithPayload<TestCancellableInvocable, string>(payload);
var token2 = queue.QueueCancellableInvocableWithPayload<TestCancellableInvocable, string>(payload);
var token3 = queue.QueueCancellableInvocableWithPayload<TestCancellableInvocable, string>(payload);

TestCancellableInvocable.TokensCancelled = 0;
await queue.ConsumeQueueOnShutdown();

Assert.Equal(3, TestCancellableInvocable.TokensCancelled);
}

[Fact]
public async Task TestQueueCancellableInvocableWithPrimitiveParams()
{
string testString = "";

var parameters = "This is valid";

var services = new ServiceCollection();
services.AddScoped<Action<string>>(p => str => testString = str);
services.AddScoped<TestCancellableInvocableWithStringParam>();
var provider = services.BuildServiceProvider();

var queue = new Queue(provider.GetRequiredService<IServiceScopeFactory>(), new DispatcherStub());
queue.QueueCancellableInvocableWithPayload<TestCancellableInvocableWithStringParam, string>(parameters);
await queue.ConsumeQueueAsync();

Assert.Equal("This is valid", testString);
}

[Fact]
public async Task TestQueueCancellableInvocableWithComplexParams()
{
int testNumber = 0;
string testString = "";

var parameters = new TestParams
{
Name = "This is valid",
Number = 999
};

var services = new ServiceCollection();
services.AddScoped<Action<string, int>>(p => (str, num) =>
{
testNumber = num;
testString = str;
});
services.AddScoped<TestCancellableInvocableWithComplexParams>();
var provider = services.BuildServiceProvider();

var queue = new Queue(provider.GetRequiredService<IServiceScopeFactory>(), new DispatcherStub());
queue.QueueCancellableInvocableWithPayload<TestCancellableInvocableWithComplexParams, TestParams>(parameters);
await queue.ConsumeQueueAsync();

Assert.Equal(999, testNumber);
Assert.Equal("This is valid", testString);
}

public class TestParams
{
public string Name { get; set; }
public int Number { get; set; }
}

public class TestCancellableInvocableWithComplexParams : IInvocable, IInvocableWithPayload<TestParams>, ICancellableTask
{
public TestParams Payload { get; set; }
private Action<string, int> _func;
public CancellationToken Token { get; set; }

public TestCancellableInvocableWithComplexParams(Action<string, int> func) => this._func = func;

public Task Invoke()
{
this._func(this.Payload.Name, this.Payload.Number);
return Task.CompletedTask;
}
}

private class TestCancellableInvocable : IInvocable, ICancellableTask, IInvocableWithPayload<string>
{
/// <summary>
/// Static fields keeps track of all cancelled tokens count.
/// </summary>
public static int TokensCancelled = 0;

public TestCancellableInvocable() {}

public CancellationToken Token { get; set; }

public string Payload { get; set; }

public Task Invoke()
{
if(this.Token.IsCancellationRequested)
{
Interlocked.Increment(ref TokensCancelled);
}

return Task.CompletedTask;
}
}

public class TestCancellableInvocableWithStringParam : IInvocable, IInvocableWithPayload<string>, ICancellableTask
{
public string Payload { get; set; }

private Action<string> _func;

public CancellationToken Token { get; set; }

public TestCancellableInvocableWithStringParam(Action<string> func) => this._func = func;

public Task Invoke()
{
this._func(this.Payload);
return Task.CompletedTask;
}
}
}
}