From 984bd79006019ccb473daa4ef85abaec24ad2eba Mon Sep 17 00:00:00 2001 From: Rafael Lillo Date: Sun, 2 Mar 2025 20:47:49 +0000 Subject: [PATCH] GH-945 Fixes issue to restore it on .NET 6 & 8 --- .../LimitedConcurrencyLevelTaskScheduler.cs | 141 ++++++++++++++++++ .../rocketmq-client-csharp.csproj | 1 - .../tests/Tasks/CurrentThreadTaskScheduler.cs | 43 ++++++ 3 files changed, 184 insertions(+), 1 deletion(-) create mode 100644 csharp/rocketmq-client-csharp/Tasks/LimitedConcurrencyLevelTaskScheduler.cs create mode 100644 csharp/tests/Tasks/CurrentThreadTaskScheduler.cs diff --git a/csharp/rocketmq-client-csharp/Tasks/LimitedConcurrencyLevelTaskScheduler.cs b/csharp/rocketmq-client-csharp/Tasks/LimitedConcurrencyLevelTaskScheduler.cs new file mode 100644 index 00000000..153ab111 --- /dev/null +++ b/csharp/rocketmq-client-csharp/Tasks/LimitedConcurrencyLevelTaskScheduler.cs @@ -0,0 +1,141 @@ +//-------------------------------------------------------------------------- +// +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// File: LimitedConcurrencyTaskScheduler.cs +// +//-------------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Linq; + +namespace System.Threading.Tasks.Schedulers; + +/// +/// Provides a task scheduler that ensures a maximum concurrency level while +/// running on top of the ThreadPool. +/// +public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler +{ + /// Whether the current thread is processing work items. + [ThreadStatic] + private static bool _currentThreadIsProcessingItems; + /// The list of tasks to be executed. + private readonly LinkedList _tasks = new LinkedList(); // protected by lock(_tasks) + /// The maximum concurrency level allowed by this scheduler. + private readonly int _maxDegreeOfParallelism; + /// Whether the scheduler is currently processing work items. + private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks) + + /// + /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the + /// specified degree of parallelism. + /// + /// The maximum degree of parallelism provided by this scheduler. + public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism) + { + if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism"); + _maxDegreeOfParallelism = maxDegreeOfParallelism; + } + + /// Queues a task to the scheduler. + /// The task to be queued. + protected sealed override void QueueTask(Task task) + { + // Add the task to the list of tasks to be processed. If there aren't enough + // delegates currently queued or running to process tasks, schedule another. + lock (_tasks) + { + _tasks.AddLast(task); + if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) + { + ++_delegatesQueuedOrRunning; + NotifyThreadPoolOfPendingWork(); + } + } + } + + /// + /// Informs the ThreadPool that there's work to be executed for this scheduler. + /// + private void NotifyThreadPoolOfPendingWork() + { + ThreadPool.UnsafeQueueUserWorkItem(_ => + { + // Note that the current thread is now processing work items. + // This is necessary to enable inlining of tasks into this thread. + _currentThreadIsProcessingItems = true; + try + { + // Process all available items in the queue. + while (true) + { + Task item; + lock (_tasks) + { + // When there are no more items to be processed, + // note that we're done processing, and get out. + if (_tasks.Count == 0) + { + --_delegatesQueuedOrRunning; + break; + } + + // Get the next item from the queue + item = _tasks.First.Value; + _tasks.RemoveFirst(); + } + + // Execute the task we pulled out of the queue + base.TryExecuteTask(item); + } + } + // We're done processing items on the current thread + finally { _currentThreadIsProcessingItems = false; } + }, null); + } + + /// Attempts to execute the specified task on the current thread. + /// The task to be executed. + /// + /// Whether the task could be executed on the current thread. + protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + // If this thread isn't already processing a task, we don't support inlining + if (!_currentThreadIsProcessingItems) return false; + + // If the task was previously queued, remove it from the queue + if (taskWasPreviouslyQueued) TryDequeue(task); + + // Try to run the task. + return base.TryExecuteTask(task); + } + + /// Attempts to remove a previously scheduled task from the scheduler. + /// The task to be removed. + /// Whether the task could be found and removed. + protected sealed override bool TryDequeue(Task task) + { + lock (_tasks) return _tasks.Remove(task); + } + + /// Gets the maximum concurrency level supported by this scheduler. + public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } } + + /// Gets an enumerable of the tasks currently scheduled on this scheduler. + /// An enumerable of the tasks currently scheduled. + protected sealed override IEnumerable GetScheduledTasks() + { + bool lockTaken = false; + try + { + Monitor.TryEnter(_tasks, ref lockTaken); + if (lockTaken) return _tasks.ToArray(); + else throw new NotSupportedException(); + } + finally + { + if (lockTaken) Monitor.Exit(_tasks); + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj index 8d7d8012..6f1f9ccd 100644 --- a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj +++ b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj @@ -34,7 +34,6 @@ - diff --git a/csharp/tests/Tasks/CurrentThreadTaskScheduler.cs b/csharp/tests/Tasks/CurrentThreadTaskScheduler.cs new file mode 100644 index 00000000..e773afa1 --- /dev/null +++ b/csharp/tests/Tasks/CurrentThreadTaskScheduler.cs @@ -0,0 +1,43 @@ +//-------------------------------------------------------------------------- +// +// Copyright (c) Microsoft Corporation. All rights reserved. +// +// File: CurrentThreadTaskScheduler.cs +// +//-------------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Linq; + +namespace System.Threading.Tasks.Schedulers +{ + /// Provides a task scheduler that runs tasks on the current thread. + public sealed class CurrentThreadTaskScheduler : TaskScheduler + { + /// Runs the provided Task synchronously on the current thread. + /// The task to be executed. + protected override void QueueTask(Task task) + { + TryExecuteTask(task); + } + + /// Runs the provided Task synchronously on the current thread. + /// The task to be executed. + /// Whether the Task was previously queued to the scheduler. + /// True if the Task was successfully executed; otherwise, false. + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + return TryExecuteTask(task); + } + + /// Gets the Tasks currently scheduled to this scheduler. + /// An empty enumerable, as Tasks are never queued, only executed. + protected override IEnumerable GetScheduledTasks() + { + return Enumerable.Empty(); + } + + /// Gets the maximum degree of parallelism for this scheduler. + public override int MaximumConcurrencyLevel { get { return 1; } } + } +} \ No newline at end of file