Skip to content

Commit

Permalink
apacheGH-945 Fixes issue to restore it on .NET 6 & 8
Browse files Browse the repository at this point in the history
  • Loading branch information
lillo42 committed Mar 2, 2025
1 parent f117e73 commit 984bd79
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//--------------------------------------------------------------------------
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// File: LimitedConcurrencyTaskScheduler.cs
//
//--------------------------------------------------------------------------

using System.Collections.Generic;
using System.Linq;

namespace System.Threading.Tasks.Schedulers;

/// <summary>
/// Provides a task scheduler that ensures a maximum concurrency level while
/// running on top of the ThreadPool.
/// </summary>
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
/// <summary>Whether the current thread is processing work items.</summary>
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;
/// <summary>The list of tasks to be executed.</summary>
private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
/// <summary>The maximum concurrency level allowed by this scheduler.</summary>
private readonly int _maxDegreeOfParallelism;
/// <summary>Whether the scheduler is currently processing work items.</summary>
private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)

/// <summary>
/// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
/// specified degree of parallelism.
/// </summary>
/// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}

/// <summary>Queues a task to the scheduler.</summary>
/// <param name="task">The task to be queued.</param>
protected sealed override void QueueTask(Task task)
{
// Add the task to the list of tasks to be processed. If there aren't enough
// delegates currently queued or running to process tasks, schedule another.
lock (_tasks)
{
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}

/// <summary>
/// Informs the ThreadPool that there's work to be executed for this scheduler.
/// </summary>
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
// Note that the current thread is now processing work items.
// This is necessary to enable inlining of tasks into this thread.
_currentThreadIsProcessingItems = true;
try
{
// Process all available items in the queue.
while (true)
{
Task item;
lock (_tasks)
{
// When there are no more items to be processed,
// note that we're done processing, and get out.
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}

// Get the next item from the queue
item = _tasks.First.Value;
_tasks.RemoveFirst();
}

// Execute the task we pulled out of the queue
base.TryExecuteTask(item);
}
}
// We're done processing items on the current thread
finally { _currentThreadIsProcessingItems = false; }
}, null);
}

/// <summary>Attempts to execute the specified task on the current thread.</summary>
/// <param name="task">The task to be executed.</param>
/// <param name="taskWasPreviouslyQueued"></param>
/// <returns>Whether the task could be executed on the current thread.</returns>
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If this thread isn't already processing a task, we don't support inlining
if (!_currentThreadIsProcessingItems) return false;

// If the task was previously queued, remove it from the queue
if (taskWasPreviouslyQueued) TryDequeue(task);

// Try to run the task.
return base.TryExecuteTask(task);
}

/// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
/// <param name="task">The task to be removed.</param>
/// <returns>Whether the task could be found and removed.</returns>
protected sealed override bool TryDequeue(Task task)
{
lock (_tasks) return _tasks.Remove(task);
}

/// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

/// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
/// <returns>An enumerable of the tasks currently scheduled.</returns>
protected sealed override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken) return _tasks.ToArray();
else throw new NotSupportedException();
}
finally
{
if (lockTaken) Monitor.Exit(_tasks);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
<PackageReference Include="OpenTelemetry" Version="1.3.1" />
<PackageReference Include="OpenTelemetry.Api" Version="1.3.1" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.3.1" />
<PackageReference Include="ParallelExtensionsExtras" Version="1.2.0" />

<Protobuf Include="..\..\protos\apache\rocketmq\v2\definition.proto" ProtoRoot="..\..\protos" GrpcServices="Client" />
<Protobuf Include="..\..\protos\apache\rocketmq\v2\service.proto" ProtoRoot="..\..\protos" GrpcServices="Both" />
Expand Down
43 changes: 43 additions & 0 deletions csharp/tests/Tasks/CurrentThreadTaskScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//--------------------------------------------------------------------------
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// File: CurrentThreadTaskScheduler.cs
//
//--------------------------------------------------------------------------

using System.Collections.Generic;
using System.Linq;

namespace System.Threading.Tasks.Schedulers
{
/// <summary>Provides a task scheduler that runs tasks on the current thread.</summary>
public sealed class CurrentThreadTaskScheduler : TaskScheduler
{
/// <summary>Runs the provided Task synchronously on the current thread.</summary>
/// <param name="task">The task to be executed.</param>
protected override void QueueTask(Task task)
{
TryExecuteTask(task);
}

/// <summary>Runs the provided Task synchronously on the current thread.</summary>
/// <param name="task">The task to be executed.</param>
/// <param name="taskWasPreviouslyQueued">Whether the Task was previously queued to the scheduler.</param>
/// <returns>True if the Task was successfully executed; otherwise, false.</returns>
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return TryExecuteTask(task);
}

/// <summary>Gets the Tasks currently scheduled to this scheduler.</summary>
/// <returns>An empty enumerable, as Tasks are never queued, only executed.</returns>
protected override IEnumerable<Task> GetScheduledTasks()
{
return Enumerable.Empty<Task>();
}

/// <summary>Gets the maximum degree of parallelism for this scheduler.</summary>
public override int MaximumConcurrencyLevel { get { return 1; } }
}
}

0 comments on commit 984bd79

Please sign in to comment.