From d09acca4c4b83f98a55a02524de3ebc9e3b746ce Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Wed, 4 Dec 2024 11:12:06 -0800 Subject: [PATCH 1/8] * add tests for ISubscriptionsGrain; * make GetSubscriptions(null) possible --- .../Services/Orleans/ISubscriptionsGrain.cs | 5 +- .../ISubscriptionsGrainTests.cs | 81 +++++++++++++++++++ 2 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs index d3af459bb7f..e9ca031a96b 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs @@ -4,7 +4,10 @@ namespace Microsoft.AutoGen.Agents; public interface ISubscriptionsGrain : IGrainWithIntegerKey { + [Alias("SubscribeAsync")] ValueTask SubscribeAsync(string agentType, string topic); + [Alias("UnsubscribeAsync")] ValueTask UnsubscribeAsync(string agentType, string topic); - ValueTask>> GetSubscriptions(string agentType); + [Alias("GetSubscriptions")] + ValueTask>> GetSubscriptions(string? agentType = null); } diff --git a/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs b/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs new file mode 100644 index 00000000000..e9b0237589d --- /dev/null +++ b/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs @@ -0,0 +1,81 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ISubscriptionsGrainTests.cs + +using Moq; +using Xunit; + +namespace Microsoft.AutoGen.Agents.Tests; + +public class ISubscriptionsGrainTests +{ + private readonly Mock _mockSubscriptionsGrain; + + public ISubscriptionsGrainTests() + { + _mockSubscriptionsGrain = new Mock(); + } + + [Fact] + public async Task GetSubscriptions_ReturnsAllSubscriptions_WhenAgentTypeIsNull() + { + // Arrange + var subscriptions = new Dictionary> + { + { "topic1", new List { "agentType1" } }, + { "topic2", new List { "agentType2" } } + }; + _mockSubscriptionsGrain.Setup(grain => grain.GetSubscriptions(null)).ReturnsAsync(subscriptions); + + // Act + var result = await _mockSubscriptionsGrain.Object.GetSubscriptions(); + + // Assert + Assert.Equal(2, result.Count); + Assert.Contains("topic1", result.Keys); + Assert.Contains("topic2", result.Keys); + } + + [Fact] + public async Task GetSubscriptions_ReturnsFilteredSubscriptions_WhenAgentTypeIsNotNull() + { + // Arrange + var subscriptions = new Dictionary> + { + { "topic1", new List { "agentType1" } } + }; + _mockSubscriptionsGrain.Setup(grain => grain.GetSubscriptions("agentType1")).ReturnsAsync(subscriptions); + + // Act + var result = await _mockSubscriptionsGrain.Object.GetSubscriptions("agentType1"); + + // Assert + Assert.Single(result); + Assert.Contains("topic1", result.Keys); + } + + [Fact] + public async Task SubscribeAsync_AddsSubscription() + { + // Arrange + _mockSubscriptionsGrain.Setup(grain => grain.SubscribeAsync("agentType1", "topic1")).Returns(ValueTask.CompletedTask); + + // Act + await _mockSubscriptionsGrain.Object.SubscribeAsync("agentType1", "topic1"); + + // Assert + _mockSubscriptionsGrain.Verify(grain => grain.SubscribeAsync("agentType1", "topic1"), Times.Once); + } + + [Fact] + public async Task UnsubscribeAsync_RemovesSubscription() + { + // Arrange + _mockSubscriptionsGrain.Setup(grain => grain.UnsubscribeAsync("agentType1", "topic1")).Returns(ValueTask.CompletedTask); + + // Act + await _mockSubscriptionsGrain.Object.UnsubscribeAsync("agentType1", "topic1"); + + // Assert + _mockSubscriptionsGrain.Verify(grain => grain.UnsubscribeAsync("agentType1", "topic1"), Times.Once); + } +} \ No newline at end of file From cb270d8a8507b1c8c4c0a149727fbbf5a75bca90 Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Wed, 4 Dec 2024 12:13:20 -0800 Subject: [PATCH 2/8] refactor subscriptions grain; remove dulicate code, persist correctly to orleans, introduce SubscriptionsState class --- .../Abstractions/SubscriptionsState.cs | 12 ++++ .../Services/Orleans/ISubscriptionsGrain.cs | 6 +- .../Services/Orleans/SubscriptionsGrain.cs | 59 ++++++++++--------- .../ISubscriptionsGrainTests.cs | 23 ++++---- 4 files changed, 57 insertions(+), 43 deletions(-) create mode 100644 dotnet/src/Microsoft.AutoGen/Abstractions/SubscriptionsState.cs diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/SubscriptionsState.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/SubscriptionsState.cs new file mode 100644 index 00000000000..2488c17d4e1 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/SubscriptionsState.cs @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// SubscriptionsState.cs + +using System.Collections.Concurrent; + +namespace Microsoft.AutoGen.Abstractions; +public sealed class SubscriptionsState +{ + public ConcurrentDictionary _subscriptionsByAgentType = new(); + public ConcurrentDictionary> _subscriptionsByTopic = new(); + public ConcurrentDictionary> Subscriptions { get; set; } = new(); +} \ No newline at end of file diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs index e9ca031a96b..41d76a99377 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // ISubscriptionsGrain.cs +using System.Collections.Concurrent; + namespace Microsoft.AutoGen.Agents; public interface ISubscriptionsGrain : IGrainWithIntegerKey { @@ -8,6 +10,6 @@ public interface ISubscriptionsGrain : IGrainWithIntegerKey ValueTask SubscribeAsync(string agentType, string topic); [Alias("UnsubscribeAsync")] ValueTask UnsubscribeAsync(string agentType, string topic); - [Alias("GetSubscriptions")] - ValueTask>> GetSubscriptions(string? agentType = null); + [Alias("GetSubscriptionsAsync")] + ValueTask>> GetSubscriptionsAsync(string? agentType = null); } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs index 0e647dbab98..08c56437693 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs @@ -1,50 +1,53 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // SubscriptionsGrain.cs +using System.Collections.Concurrent; +using Microsoft.AutoGen.Abstractions; namespace Microsoft.AutoGen.Agents; -internal sealed class SubscriptionsGrain([PersistentState("state", "PubSubStore")] IPersistentState state) : Grain, ISubscriptionsGrain +internal sealed class SubscriptionsGrain([PersistentState("state", "PubSubStore")] IPersistentState subscriptionsState) : Grain, ISubscriptionsGrain { - private readonly Dictionary> _subscriptions = new(); - public ValueTask>> GetSubscriptions(string? agentType = null) + private readonly IPersistentState _subscriptionsState = subscriptionsState; + + public ValueTask>> GetSubscriptionsAsync(string? agentType = null) { + var _subscriptions = _subscriptionsState.State.Subscriptions; //if agentType is null, return all subscriptions else filter on agentType if (agentType != null) { - return new ValueTask>>(_subscriptions.Where(x => x.Value.Contains(agentType)).ToDictionary(x => x.Key, x => x.Value)); + var filteredSubscriptions = _subscriptions.Where(x => x.Value.Contains(agentType)); + return new ValueTask>>((ConcurrentDictionary>)filteredSubscriptions); } - return new ValueTask>>(_subscriptions); + return new ValueTask>>(_subscriptions); } public async ValueTask SubscribeAsync(string agentType, string topic) { - if (!_subscriptions.TryGetValue(topic, out var subscriptions)) - { - subscriptions = _subscriptions[topic] = []; - } - if (!subscriptions.Contains(agentType)) - { - subscriptions.Add(agentType); - } - _subscriptions[topic] = subscriptions; - state.State.Subscriptions = _subscriptions; - await state.WriteStateAsync().ConfigureAwait(false); + await WriteSubscriptionsAsync(agentType: agentType, topic: topic, subscribe: true).ConfigureAwait(false); } public async ValueTask UnsubscribeAsync(string agentType, string topic) { - if (!_subscriptions.TryGetValue(topic, out var subscriptions)) + await WriteSubscriptionsAsync(agentType: agentType, topic: topic, subscribe: false).ConfigureAwait(false); + } + private async ValueTask WriteSubscriptionsAsync(string agentType, string topic, bool subscribe=true) + { + var _subscriptions = _subscriptionsState.State.Subscriptions; + if (!_subscriptions.TryGetValue(topic, out var agentTypes)) { - subscriptions = _subscriptions[topic] = []; + agentTypes = _subscriptions[topic] = []; } - if (!subscriptions.Contains(agentType)) + if (!agentTypes.Contains(agentType)) { - subscriptions.Remove(agentType); + if (subscribe) + { + agentTypes.Add(agentType); + } + else + { + agentTypes.Remove(agentType); + } } - _subscriptions[topic] = subscriptions; - state.State.Subscriptions = _subscriptions; - await state.WriteStateAsync(); + _subscriptions[topic] = agentTypes; + _subscriptionsState.State.Subscriptions = _subscriptions; + await _subscriptionsState.WriteStateAsync().ConfigureAwait(false); } -} -public sealed class SubscriptionsState -{ - public Dictionary> Subscriptions { get; set; } = new(); -} +} \ No newline at end of file diff --git a/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs b/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs index e9b0237589d..218579833aa 100644 --- a/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs +++ b/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // ISubscriptionsGrainTests.cs +using System.Collections.Concurrent; using Moq; using Xunit; @@ -19,15 +20,13 @@ public ISubscriptionsGrainTests() public async Task GetSubscriptions_ReturnsAllSubscriptions_WhenAgentTypeIsNull() { // Arrange - var subscriptions = new Dictionary> - { - { "topic1", new List { "agentType1" } }, - { "topic2", new List { "agentType2" } } - }; - _mockSubscriptionsGrain.Setup(grain => grain.GetSubscriptions(null)).ReturnsAsync(subscriptions); + var subscriptions = new ConcurrentDictionary>(); + subscriptions.TryAdd("topic1", new List { "agentType1" }); + subscriptions.TryAdd("topic2", new List { "agentType2" }); + _mockSubscriptionsGrain.Setup(grain => grain.GetSubscriptionsAsync(null)).ReturnsAsync(subscriptions); // Act - var result = await _mockSubscriptionsGrain.Object.GetSubscriptions(); + var result = await _mockSubscriptionsGrain.Object.GetSubscriptionsAsync(); // Assert Assert.Equal(2, result.Count); @@ -39,14 +38,12 @@ public async Task GetSubscriptions_ReturnsAllSubscriptions_WhenAgentTypeIsNull() public async Task GetSubscriptions_ReturnsFilteredSubscriptions_WhenAgentTypeIsNotNull() { // Arrange - var subscriptions = new Dictionary> - { - { "topic1", new List { "agentType1" } } - }; - _mockSubscriptionsGrain.Setup(grain => grain.GetSubscriptions("agentType1")).ReturnsAsync(subscriptions); + var subscriptions = new ConcurrentDictionary>(); + subscriptions.TryAdd("topic1", new List { "agentType1" }); + _mockSubscriptionsGrain.Setup(grain => grain.GetSubscriptionsAsync("agentType1")).ReturnsAsync(subscriptions); // Act - var result = await _mockSubscriptionsGrain.Object.GetSubscriptions("agentType1"); + var result = await _mockSubscriptionsGrain.Object.GetSubscriptionsAsync("agentType1"); // Assert Assert.Single(result); From 140a594644ae0286a8d087df0d2468a4d2978250 Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Wed, 4 Dec 2024 15:11:20 -0800 Subject: [PATCH 3/8] refactor and cleanup --- .../Agents/Services/Grpc/GrpcGateway.cs | 20 ++++++------ .../Services/Grpc/GrpcWorkerConnection.cs | 2 +- .../Services/Orleans/AgentStateGrain.cs | 2 +- .../Services/Orleans/ISubscriptionsGrain.cs | 11 ++++++- .../Services/Orleans/SubscriptionsGrain.cs | 21 +++++++++++- .../Services/Orleans}/SubscriptionsState.cs | 3 +- .../ISubscriptionsGrainTests.cs | 32 ++++++++++++++++--- 7 files changed, 73 insertions(+), 18 deletions(-) rename dotnet/src/Microsoft.AutoGen/{Abstractions => Agents/Services/Orleans}/SubscriptionsState.cs (92%) diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs index 9ba36410a30..d54ccd7a7e1 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs @@ -16,13 +16,12 @@ public sealed class GrpcGateway : BackgroundService, IGateway private readonly IClusterClient _clusterClient; private readonly ConcurrentDictionary _agentState = new(); private readonly IRegistryGrain _gatewayRegistry; - private readonly ISubscriptionsGrain _subscriptions; + private readonly ISubscriptionsGrain _subscriptionsGrain; private readonly IGateway _reference; // The agents supported by each worker process. + private SubscriptionsState _subscriptionsState = new(); private readonly ConcurrentDictionary> _supportedAgentTypes = []; public readonly ConcurrentDictionary _workers = new(); - private readonly ConcurrentDictionary _subscriptionsByAgentType = new(); - private readonly ConcurrentDictionary> _subscriptionsByTopic = new(); // The mapping from agent id to worker process. private readonly ConcurrentDictionary<(string Type, string Key), GrpcWorkerConnection> _agentDirectory = new(); @@ -36,7 +35,7 @@ public GrpcGateway(IClusterClient clusterClient, ILogger logger) _clusterClient = clusterClient; _reference = clusterClient.CreateObjectReference(this); _gatewayRegistry = clusterClient.GetGrain(0); - _subscriptions = clusterClient.GetGrain(0); + _subscriptionsGrain = clusterClient.GetGrain(0); } public async ValueTask BroadcastEvent(CloudEvent evt) { @@ -123,6 +122,7 @@ private async ValueTask RespondBadRequestAsync(GrpcWorkerConnection connection, // {genttype}:rpc_response={request_id} private async ValueTask AddSubscriptionAsync(GrpcWorkerConnection connection, AddSubscriptionRequest request) { + _subscriptionsState = await _subscriptionsGrain.GetSubscriptionsStateAsync().ConfigureAwait(true); var topic = ""; var agentType = ""; if (request.Subscription.TypePrefixSubscription is not null) @@ -135,10 +135,10 @@ private async ValueTask AddSubscriptionAsync(GrpcWorkerConnection connection, Ad topic = request.Subscription.TypeSubscription.TopicType; agentType = request.Subscription.TypeSubscription.AgentType; } - _subscriptionsByAgentType[agentType] = request.Subscription; - _subscriptionsByTopic.GetOrAdd(topic, _ => []).Add(agentType); - await _subscriptions.SubscribeAsync(topic, agentType); - //var response = new AddSubscriptionResponse { RequestId = request.RequestId, Error = "", Success = true }; + _subscriptionsState._subscriptionsByAgentType[agentType] = request.Subscription; + _subscriptionsState._subscriptionsByTopic.GetOrAdd(topic, _ => []).Add(agentType); + await _subscriptionsGrain.SubscribeAsync(topic, agentType).ConfigureAwait(true); + await _subscriptionsGrain.WriteSubscriptionsStateAsync(_subscriptionsState).ConfigureAwait(true); Message response = new() { AddSubscriptionResponse = new() @@ -169,12 +169,14 @@ private async ValueTask RegisterAgentTypeAsync(GrpcWorkerConnection connection, } private async ValueTask DispatchEventAsync(CloudEvent evt) { + _subscriptionsState = await _subscriptionsGrain.GetSubscriptionsStateAsync().ConfigureAwait(true); + var _subscriptionsByTopic = _subscriptionsState._subscriptionsByTopic; // get the event type and then send to all agents that are subscribed to that event type var eventType = evt.Type; // ensure that we get agentTypes as an async enumerable list - try to get the value of agentTypes by topic and then cast it to an async enumerable list if (_subscriptionsByTopic.TryGetValue(eventType, out var agentTypes)) { - await DispatchEventToAgentsAsync(agentTypes, evt); + await DispatchEventToAgentsAsync(agentTypes, evt).ConfigureAwait(false); } // instead of an exact match, we can also check for a prefix match where key starts with the eventType else if (_subscriptionsByTopic.Keys.Any(key => key.StartsWith(eventType))) diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcWorkerConnection.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcWorkerConnection.cs index f2eb81c4360..fdb9a5c0d87 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcWorkerConnection.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcWorkerConnection.cs @@ -7,7 +7,7 @@ namespace Microsoft.AutoGen.Agents; -internal sealed class GrpcWorkerConnection : IAsyncDisposable, IConnection +public sealed class GrpcWorkerConnection : IAsyncDisposable, IConnection { private static long s_nextConnectionId; private readonly Task _readTask; diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/AgentStateGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/AgentStateGrain.cs index 9905f6aebac..4ee3d4abcae 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/AgentStateGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/AgentStateGrain.cs @@ -17,7 +17,7 @@ public async ValueTask WriteStateAsync(AgentState newState, string eTag, if ((string.IsNullOrEmpty(state.Etag)) || (string.IsNullOrEmpty(eTag)) || (string.Equals(state.Etag, eTag, StringComparison.Ordinal))) { state.State = newState; - await state.WriteStateAsync().ConfigureAwait(false); + await state.WriteStateAsync().ConfigureAwait(true); } else { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs index 41d76a99377..566ba8d84f4 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/ISubscriptionsGrain.cs @@ -2,8 +2,11 @@ // ISubscriptionsGrain.cs using System.Collections.Concurrent; +using Microsoft.AutoGen.Abstractions; namespace Microsoft.AutoGen.Agents; + +[Alias("Microsoft.AutoGen.Agents.ISubscriptionsGrain")] public interface ISubscriptionsGrain : IGrainWithIntegerKey { [Alias("SubscribeAsync")] @@ -11,5 +14,11 @@ public interface ISubscriptionsGrain : IGrainWithIntegerKey [Alias("UnsubscribeAsync")] ValueTask UnsubscribeAsync(string agentType, string topic); [Alias("GetSubscriptionsAsync")] - ValueTask>> GetSubscriptionsAsync(string? agentType = null); + ValueTask>> GetSubscriptionsByAgentTypeAsync(string? agentType = null); + [Alias ("GetSubscriptionsByTopicAsync")] + ValueTask>> GetSubscriptionsByTopicAsync(string? topic = null); + [Alias("GetSubscriptionsByAgentTypeAsync")] + ValueTask GetSubscriptionsStateAsync(); + [Alias("WriteSubscriptionsStateAsync")] + ValueTask WriteSubscriptionsStateAsync(SubscriptionsState subscriptionsState); } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs index 08c56437693..77e26b209f1 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs @@ -9,17 +9,30 @@ internal sealed class SubscriptionsGrain([PersistentState("state", "PubSubStore" { private readonly IPersistentState _subscriptionsState = subscriptionsState; - public ValueTask>> GetSubscriptionsAsync(string? agentType = null) + public ValueTask>> GetSubscriptionsByAgentTypeAsync(string? agentType = null) { var _subscriptions = _subscriptionsState.State.Subscriptions; //if agentType is null, return all subscriptions else filter on agentType if (agentType != null) { var filteredSubscriptions = _subscriptions.Where(x => x.Value.Contains(agentType)); + return ValueTask.FromResult>>((ConcurrentDictionary>)filteredSubscriptions); + } + return ValueTask.FromResult>>(_subscriptions); + } + public ValueTask>> GetSubscriptionsByTopicAsync(string? topic = null) + { + var _subscriptions = _subscriptionsState.State.Subscriptions; + //if topic is null, return all subscriptions else filter on topic + if (topic != null) + { + var filteredSubscriptions = _subscriptions.Where(x => x.Key == topic); return new ValueTask>>((ConcurrentDictionary>)filteredSubscriptions); } return new ValueTask>>(_subscriptions); } + public ValueTask GetSubscriptionsStateAsync() => ValueTask.FromResult(_subscriptionsState.State); + public async ValueTask SubscribeAsync(string agentType, string topic) { await WriteSubscriptionsAsync(agentType: agentType, topic: topic, subscribe: true).ConfigureAwait(false); @@ -28,6 +41,12 @@ public async ValueTask UnsubscribeAsync(string agentType, string topic) { await WriteSubscriptionsAsync(agentType: agentType, topic: topic, subscribe: false).ConfigureAwait(false); } + public async ValueTask WriteSubscriptionsStateAsync(SubscriptionsState subscriptionsState) + { + _subscriptionsState.State = subscriptionsState; + await _subscriptionsState.WriteStateAsync().ConfigureAwait(true); + } + private async ValueTask WriteSubscriptionsAsync(string agentType, string topic, bool subscribe=true) { var _subscriptions = _subscriptionsState.State.Subscriptions; diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/SubscriptionsState.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsState.cs similarity index 92% rename from dotnet/src/Microsoft.AutoGen/Abstractions/SubscriptionsState.cs rename to dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsState.cs index 2488c17d4e1..ec3478c58f7 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/SubscriptionsState.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsState.cs @@ -1,9 +1,10 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // SubscriptionsState.cs - using System.Collections.Concurrent; namespace Microsoft.AutoGen.Abstractions; +[GenerateSerializer] +[Serializable] public sealed class SubscriptionsState { public ConcurrentDictionary _subscriptionsByAgentType = new(); diff --git a/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs b/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs index 218579833aa..87a9bd32212 100644 --- a/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs +++ b/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs @@ -2,6 +2,7 @@ // ISubscriptionsGrainTests.cs using System.Collections.Concurrent; +using Microsoft.AutoGen.Abstractions; using Moq; using Xunit; @@ -16,6 +17,29 @@ public ISubscriptionsGrainTests() _mockSubscriptionsGrain = new Mock(); } + [Fact] + public async Task GetSubscriptionsStateAsync_ReturnsCorrectState() + { + // Arrange + var subscriptionsState = new SubscriptionsState + { + Subscriptions = new ConcurrentDictionary> + { + ["topic1"] = ["agentType1"], + ["topic2"] = ["agentType2"] + } + }; + _mockSubscriptionsGrain.Setup(grain => grain.GetSubscriptionsStateAsync()).ReturnsAsync(subscriptionsState); + + // Act + var result = await _mockSubscriptionsGrain.Object.GetSubscriptionsStateAsync(); + + // Assert + Assert.Equal(2, result.Subscriptions.Count); + Assert.Contains("topic1", result.Subscriptions.Keys); + Assert.Contains("topic2", result.Subscriptions.Keys); + } + [Fact] public async Task GetSubscriptions_ReturnsAllSubscriptions_WhenAgentTypeIsNull() { @@ -23,10 +47,10 @@ public async Task GetSubscriptions_ReturnsAllSubscriptions_WhenAgentTypeIsNull() var subscriptions = new ConcurrentDictionary>(); subscriptions.TryAdd("topic1", new List { "agentType1" }); subscriptions.TryAdd("topic2", new List { "agentType2" }); - _mockSubscriptionsGrain.Setup(grain => grain.GetSubscriptionsAsync(null)).ReturnsAsync(subscriptions); + _mockSubscriptionsGrain.Setup(grain => grain.GetSubscriptionsByAgentTypeAsync(null)).ReturnsAsync(subscriptions); // Act - var result = await _mockSubscriptionsGrain.Object.GetSubscriptionsAsync(); + var result = await _mockSubscriptionsGrain.Object.GetSubscriptionsByAgentTypeAsync(); // Assert Assert.Equal(2, result.Count); @@ -40,10 +64,10 @@ public async Task GetSubscriptions_ReturnsFilteredSubscriptions_WhenAgentTypeIsN // Arrange var subscriptions = new ConcurrentDictionary>(); subscriptions.TryAdd("topic1", new List { "agentType1" }); - _mockSubscriptionsGrain.Setup(grain => grain.GetSubscriptionsAsync("agentType1")).ReturnsAsync(subscriptions); + _mockSubscriptionsGrain.Setup(grain => grain.GetSubscriptionsByAgentTypeAsync("agentType1")).ReturnsAsync(subscriptions); // Act - var result = await _mockSubscriptionsGrain.Object.GetSubscriptionsAsync("agentType1"); + var result = await _mockSubscriptionsGrain.Object.GetSubscriptionsByAgentTypeAsync("agentType1"); // Assert Assert.Single(result); From eb9d5aa00000efe17c1b340fa63bbe763e64b543 Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Wed, 4 Dec 2024 15:17:25 -0800 Subject: [PATCH 4/8] autocomplete fail --- .../Agents/Services/Orleans/SubscriptionsGrain.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs index 77e26b209f1..5ccdad52776 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs @@ -27,9 +27,9 @@ public ValueTask>> GetSubscriptionsByT if (topic != null) { var filteredSubscriptions = _subscriptions.Where(x => x.Key == topic); - return new ValueTask>>((ConcurrentDictionary>)filteredSubscriptions); + return ValueTask.FromResult>>((ConcurrentDictionary>)filteredSubscriptions); } - return new ValueTask>>(_subscriptions); + return ValueTask.FromResult>>(_subscriptions); } public ValueTask GetSubscriptionsStateAsync() => ValueTask.FromResult(_subscriptionsState.State); From c4a0effe63ae32078466637d9342ce14c6baff4c Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Wed, 4 Dec 2024 15:34:18 -0800 Subject: [PATCH 5/8] more cleanup and simplification --- .../Agents/Services/Grpc/GrpcGateway.cs | 3 --- .../Agents/Services/Orleans/SubscriptionsGrain.cs | 11 ++++++----- .../Agents/Services/Orleans/SubscriptionsState.cs | 7 +++---- .../ISubscriptionsGrainTests.cs | 8 ++++---- 4 files changed, 13 insertions(+), 16 deletions(-) diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs index d54ccd7a7e1..cc83107b8f1 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs @@ -135,10 +135,7 @@ private async ValueTask AddSubscriptionAsync(GrpcWorkerConnection connection, Ad topic = request.Subscription.TypeSubscription.TopicType; agentType = request.Subscription.TypeSubscription.AgentType; } - _subscriptionsState._subscriptionsByAgentType[agentType] = request.Subscription; - _subscriptionsState._subscriptionsByTopic.GetOrAdd(topic, _ => []).Add(agentType); await _subscriptionsGrain.SubscribeAsync(topic, agentType).ConfigureAwait(true); - await _subscriptionsGrain.WriteSubscriptionsStateAsync(_subscriptionsState).ConfigureAwait(true); Message response = new() { AddSubscriptionResponse = new() diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs index 5ccdad52776..ed10ae18e09 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs @@ -11,7 +11,7 @@ internal sealed class SubscriptionsGrain([PersistentState("state", "PubSubStore" public ValueTask>> GetSubscriptionsByAgentTypeAsync(string? agentType = null) { - var _subscriptions = _subscriptionsState.State.Subscriptions; + var _subscriptions = _subscriptionsState.State.SubscriptionsByAgentType; //if agentType is null, return all subscriptions else filter on agentType if (agentType != null) { @@ -22,7 +22,7 @@ public ValueTask>> GetSubscriptionsByA } public ValueTask>> GetSubscriptionsByTopicAsync(string? topic = null) { - var _subscriptions = _subscriptionsState.State.Subscriptions; + var _subscriptions = _subscriptionsState.State.SubscriptionsByTopic; //if topic is null, return all subscriptions else filter on topic if (topic != null) { @@ -49,7 +49,7 @@ public async ValueTask WriteSubscriptionsStateAsync(SubscriptionsState subscript private async ValueTask WriteSubscriptionsAsync(string agentType, string topic, bool subscribe=true) { - var _subscriptions = _subscriptionsState.State.Subscriptions; + var _subscriptions = await GetSubscriptionsByAgentTypeAsync().ConfigureAwait(true); if (!_subscriptions.TryGetValue(topic, out var agentTypes)) { agentTypes = _subscriptions[topic] = []; @@ -65,8 +65,9 @@ private async ValueTask WriteSubscriptionsAsync(string agentType, string topic, agentTypes.Remove(agentType); } } - _subscriptions[topic] = agentTypes; - _subscriptionsState.State.Subscriptions = _subscriptions; + _subscriptionsState.State.SubscriptionsByAgentType = _subscriptions; + var _subsByTopic = await GetSubscriptionsByTopicAsync().ConfigureAwait(true); + _subsByTopic.GetOrAdd(topic, _ => []).Add(agentType); await _subscriptionsState.WriteStateAsync().ConfigureAwait(false); } } \ No newline at end of file diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsState.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsState.cs index ec3478c58f7..eb1bcddc38a 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsState.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsState.cs @@ -6,8 +6,7 @@ namespace Microsoft.AutoGen.Abstractions; [GenerateSerializer] [Serializable] public sealed class SubscriptionsState -{ - public ConcurrentDictionary _subscriptionsByAgentType = new(); - public ConcurrentDictionary> _subscriptionsByTopic = new(); - public ConcurrentDictionary> Subscriptions { get; set; } = new(); +{ + public ConcurrentDictionary> SubscriptionsByTopic = new(); + public ConcurrentDictionary> SubscriptionsByAgentType { get; set; } = new(); } \ No newline at end of file diff --git a/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs b/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs index 87a9bd32212..02dcb75571c 100644 --- a/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs +++ b/dotnet/test/Microsoft.AutoGen.Agents.Tests/ISubscriptionsGrainTests.cs @@ -23,7 +23,7 @@ public async Task GetSubscriptionsStateAsync_ReturnsCorrectState() // Arrange var subscriptionsState = new SubscriptionsState { - Subscriptions = new ConcurrentDictionary> + SubscriptionsByAgentType = new ConcurrentDictionary> { ["topic1"] = ["agentType1"], ["topic2"] = ["agentType2"] @@ -35,9 +35,9 @@ public async Task GetSubscriptionsStateAsync_ReturnsCorrectState() var result = await _mockSubscriptionsGrain.Object.GetSubscriptionsStateAsync(); // Assert - Assert.Equal(2, result.Subscriptions.Count); - Assert.Contains("topic1", result.Subscriptions.Keys); - Assert.Contains("topic2", result.Subscriptions.Keys); + Assert.Equal(2, result.SubscriptionsByAgentType.Count); + Assert.Contains("topic1", result.SubscriptionsByAgentType.Keys); + Assert.Contains("topic2", result.SubscriptionsByAgentType.Keys); } [Fact] From 5074c8d75bf8846d6150f9c9f56328e598787b80 Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Wed, 4 Dec 2024 16:11:10 -0800 Subject: [PATCH 6/8] fix dispatch --- .../src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs index cc83107b8f1..fe2741cb421 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs @@ -166,8 +166,7 @@ private async ValueTask RegisterAgentTypeAsync(GrpcWorkerConnection connection, } private async ValueTask DispatchEventAsync(CloudEvent evt) { - _subscriptionsState = await _subscriptionsGrain.GetSubscriptionsStateAsync().ConfigureAwait(true); - var _subscriptionsByTopic = _subscriptionsState._subscriptionsByTopic; + var _subscriptionsByTopic = await _subscriptionsGrain.GetSubscriptionsByTopicAsync().ConfigureAwait(true); // get the event type and then send to all agents that are subscribed to that event type var eventType = evt.Type; // ensure that we get agentTypes as an async enumerable list - try to get the value of agentTypes by topic and then cast it to an async enumerable list From 2b74b125f398ca453419d6d42a4b861b95d5ce24 Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Wed, 4 Dec 2024 16:19:44 -0800 Subject: [PATCH 7/8] missed write --- .../Agents/Services/Orleans/SubscriptionsGrain.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs index ed10ae18e09..e8a6dc17553 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Orleans/SubscriptionsGrain.cs @@ -68,6 +68,7 @@ private async ValueTask WriteSubscriptionsAsync(string agentType, string topic, _subscriptionsState.State.SubscriptionsByAgentType = _subscriptions; var _subsByTopic = await GetSubscriptionsByTopicAsync().ConfigureAwait(true); _subsByTopic.GetOrAdd(topic, _ => []).Add(agentType); + _subscriptionsState.State.SubscriptionsByTopic = _subsByTopic; await _subscriptionsState.WriteStateAsync().ConfigureAwait(false); } } \ No newline at end of file From 246ac5e4758f2bc59f51b12f9aeb462506c8641b Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Thu, 5 Dec 2024 08:11:17 -0800 Subject: [PATCH 8/8] move read --- .../src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs index fe2741cb421..9865f8ea810 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs @@ -8,7 +8,6 @@ using Microsoft.Extensions.Logging; namespace Microsoft.AutoGen.Agents; - public sealed class GrpcGateway : BackgroundService, IGateway { private static readonly TimeSpan s_agentResponseTimeout = TimeSpan.FromSeconds(30); @@ -122,7 +121,6 @@ private async ValueTask RespondBadRequestAsync(GrpcWorkerConnection connection, // {genttype}:rpc_response={request_id} private async ValueTask AddSubscriptionAsync(GrpcWorkerConnection connection, AddSubscriptionRequest request) { - _subscriptionsState = await _subscriptionsGrain.GetSubscriptionsStateAsync().ConfigureAwait(true); var topic = ""; var agentType = ""; if (request.Subscription.TypePrefixSubscription is not null) @@ -136,6 +134,8 @@ private async ValueTask AddSubscriptionAsync(GrpcWorkerConnection connection, Ad agentType = request.Subscription.TypeSubscription.AgentType; } await _subscriptionsGrain.SubscribeAsync(topic, agentType).ConfigureAwait(true); + _subscriptionsState = await _subscriptionsGrain.GetSubscriptionsStateAsync().ConfigureAwait(true); + Message response = new() { AddSubscriptionResponse = new()