Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow specifying an Event Hub to use as the default EntityPath for namespace connection string #7105

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion playground/AspireEventHub/EventHubs.AppHost/Program.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
#define EMULATOR

var builder = DistributedApplication.CreateBuilder(args);

// required for the event processor client which will use the connectionName to get the connectionString.
var blob = builder.AddAzureStorage("ehstorage")
#if EMULATOR
.RunAsEmulator()
#endif
.AddBlobs("checkpoints");

var eventHub = builder.AddAzureEventHubs("eventhubns")
#if EMULATOR
.RunAsEmulator()
.WithHub("hub");
#endif
.WithHub("hub")
.WithHub("hub2")
.WithDefaultEntity("hub");

builder.AddProject<Projects.EventHubsConsumer>("consumer")
.WithReference(eventHub).WaitFor(eventHub)
Expand Down
13 changes: 9 additions & 4 deletions playground/AspireEventHub/EventHubsApi/Program.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
//#define AZCLI
#if AZCLI
using Azure.Identity;
#endif
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

var builder = WebApplication.CreateBuilder(args);

builder.AddServiceDefaults();

builder.AddAzureEventHubProducerClient("eventhubns", settings =>
{
settings.EventHubName = "hub";
});
builder.AddAzureEventHubProducerClient("eventhubns"
#if AZCLI
, settings => settings.Credential = new AzureCliCredential()
#endif
);

var app = builder.Build();

Expand Down
32 changes: 20 additions & 12 deletions playground/AspireEventHub/EventHubsConsumer/Program.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//#define AZCLI
#if AZCLI
using Azure.Identity;
#endif
using EventHubsConsumer;

var builder = Host.CreateApplicationBuilder(args);
Expand All @@ -10,25 +14,29 @@

if (useConsumer)
{
builder.AddAzureEventHubConsumerClient("eventhubns",
settings =>
{
settings.EventHubName = "hub";
});

builder.AddAzureEventHubConsumerClient("eventhubns"
#if AZCLI
, settings => settings.Credential = new AzureCliCredential()
#endif
);
builder.Services.AddHostedService<Consumer>();
Console.WriteLine("Starting EventHubConsumerClient...");
}
else
{
// required for checkpointing our position in the event stream
builder.AddAzureBlobClient("checkpoints");
builder.AddAzureBlobClient("checkpoints"
#if AZCLI
, settings => settings.Credential = new AzureCliCredential()
#endif
);

builder.AddAzureEventProcessorClient("eventhubns"
#if AZCLI
, settings => settings.Credential = new AzureCliCredential()
#endif
);

builder.AddAzureEventProcessorClient("eventhubns",
settings =>
{
settings.EventHubName = "hub";
});
builder.Services.AddHostedService<Processor>();
Console.WriteLine("Starting EventProcessorClient...");
}
Expand Down
37 changes: 35 additions & 2 deletions src/Aspire.Hosting.Azure.EventHubs/AzureEventHubsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using AzureProvisioning = Azure.Provisioning.EventHubs;
using Microsoft.Extensions.DependencyInjection;
using System.Text.Json.Nodes;
using Azure.Messaging.EventHubs;

namespace Aspire.Hosting;

Expand Down Expand Up @@ -116,6 +117,32 @@ public static IResourceBuilder<AzureEventHubsResource> WithHub(this IResourceBui
return builder;
}

/// <summary>
/// Specifies that the named EventHub should be used as the EntityPath in the resource's connection string.
/// <remarks>Only one EventHub can be set as the default entity. If more than one is configured as default, an Exception will be raised at runtime.</remarks>
/// </summary>
/// <param name="builder">The Azure Event Hubs resource builder.</param>
/// <param name="name">The name of the Event Hub.</param>
/// <returns>A reference to the <see cref="IResourceBuilder{T}"/>.</returns>
public static IResourceBuilder<AzureEventHubsResource> WithDefaultEntity(this IResourceBuilder<AzureEventHubsResource> builder, [ResourceName] string name)
{
// Only one event hub can be the default entity
if (builder.Resource.Hubs.Any(h => h.IsDefaultEntity && h.Name != name))
{
throw new DistributedApplicationException("Only one EventHub can be configured as the default entity.");
}

// We need to ensure that the hub exists before we can set it as the default entity.
if (builder.Resource.Hubs.Any(h => h.Name == name))
{
// WithHub is idempotent with respect to enrolling for creation of the hub, but configuration can be applied.
return WithHub(builder, name, hub => hub.IsDefaultEntity = true);
}

throw new DistributedApplicationException(
$"The specified EventHub does not exist in the Azure Event Hubs resource. Please ensure there is a call to WithHub(\"{name}\") before this call.");
}

/// <summary>
/// Configures an Azure Event Hubs resource to be emulated. This resource requires an <see cref="AzureEventHubsResource"/> to be added to the application model.
/// </summary>
Expand Down Expand Up @@ -200,7 +227,13 @@ public static IResourceBuilder<AzureEventHubsResource> RunAsEmulator(this IResou
// an event hub namespace without an event hub? :)
if (builder.Resource.Hubs is { Count: > 0 } && builder.Resource.Hubs[0] is { } hub)
oising marked this conversation as resolved.
Show resolved Hide resolved
{
var healthCheckConnectionString = $"{connectionString};EntityPath={hub.Name};";
// Endpoint=... format
var props = EventHubsConnectionStringProperties.Parse(connectionString);

var healthCheckConnectionString = string.IsNullOrEmpty(props.EventHubName)
? $"{connectionString};EntityPath={hub.Name};"
: connectionString;

client = new EventHubProducerClient(healthCheckConnectionString);
}
else
Expand Down Expand Up @@ -365,7 +398,7 @@ public static IResourceBuilder<AzureEventHubsEmulatorResource> WithHostPort(this
/// <param name="path">Path to the file on the AppHost where the emulator configuration is located.</param>
/// <returns>A reference to the <see cref="IResourceBuilder{T}"/>.</returns>
public static IResourceBuilder<AzureEventHubsEmulatorResource> WithConfigurationFile(this IResourceBuilder<AzureEventHubsEmulatorResource> builder, string path)
{
{
// Update the existing mount
var configFileMount = builder.Resource.Annotations.OfType<ContainerMountAnnotation>().LastOrDefault(v => v.Target == AzureEventHubsEmulatorResource.EmulatorConfigJsonPath);
if (configFileMount != null)
Expand Down
41 changes: 37 additions & 4 deletions src/Aspire.Hosting.Azure.EventHubs/AzureEventHubsResource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,43 @@ public class AzureEventHubsResource(string name, Action<AzureResourceInfrastruct
/// <summary>
/// Gets the connection string template for the manifest for the Azure Event Hubs endpoint.
/// </summary>
public ReferenceExpression ConnectionStringExpression =>
IsEmulator
? ReferenceExpression.Create($"Endpoint=sb://{EmulatorEndpoint.Property(EndpointProperty.Host)}:{EmulatorEndpoint.Property(EndpointProperty.Port)};SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;")
: ReferenceExpression.Create($"{EventHubsEndpoint}");
public ReferenceExpression ConnectionStringExpression => BuildConnectionString();

private ReferenceExpression BuildConnectionString()
{
var builder = new ReferenceExpressionBuilder();

if (IsEmulator)
{
// ConnectionString: Endpoint=...
builder.Append($"Endpoint=sb://{EmulatorEndpoint.Property(EndpointProperty.Host)}:{EmulatorEndpoint.Property(EndpointProperty.Port)};SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true");
}
else
{
// FQNS: Uri format, e.g. https://...
builder.Append($"{EventHubsEndpoint}");
}

if (!Hubs.Any(hub => hub.IsDefaultEntity))
{
// Of zero or more hubs, none are flagged as default
return builder.Build();
}

// Of one or more hubs, only one may be flagged as default
var defaultEntity = Hubs.Single(hub => hub.IsDefaultEntity);

if (IsEmulator)
{
builder.Append($";EntityPath={defaultEntity.Name}");
}
else
{
builder.Append($"?EntityPath={defaultEntity.Name}");
}

return builder.Build();
}

void IResourceWithAzureFunctionsConfig.ApplyAzureFunctionsConfiguration(IDictionary<string, object> target, string connectionName)
{
Expand Down
2 changes: 2 additions & 0 deletions src/Aspire.Hosting.Azure.EventHubs/EventHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public EventHub(string name)
/// </summary>
public List<EventHubConsumerGroup> ConsumerGroups { get; } = [];

internal bool IsDefaultEntity { get; set; }

/// <summary>
/// Converts the current instance to a provisioning entity.
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions src/Aspire.Hosting.Azure.EventHubs/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ static Aspire.Hosting.AzureEventHubsExtensions.RunAsEmulator(this Aspire.Hosting
static Aspire.Hosting.AzureEventHubsExtensions.WithConfigurationFile(this Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsEmulatorResource!>! builder, string! path) -> Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsEmulatorResource!>!
static Aspire.Hosting.AzureEventHubsExtensions.WithDataBindMount(this Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsEmulatorResource!>! builder, string? path = null) -> Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsEmulatorResource!>!
static Aspire.Hosting.AzureEventHubsExtensions.WithDataVolume(this Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsEmulatorResource!>! builder, string? name = null) -> Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsEmulatorResource!>!
static Aspire.Hosting.AzureEventHubsExtensions.WithDefaultEntity(this Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsResource!>! builder, string! name) -> Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsResource!>!
static Aspire.Hosting.AzureEventHubsExtensions.WithGatewayPort(this Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsEmulatorResource!>! builder, int? port) -> Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsEmulatorResource!>!
static Aspire.Hosting.AzureEventHubsExtensions.WithHostPort(this Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsEmulatorResource!>! builder, int? port) -> Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsEmulatorResource!>!
static Aspire.Hosting.AzureEventHubsExtensions.WithHub(this Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsResource!>! builder, string! name, System.Action<Aspire.Hosting.Azure.EventHubs.EventHub!>? configure = null) -> Aspire.Hosting.ApplicationModel.IResourceBuilder<Aspire.Hosting.Azure.AzureEventHubsResource!>!
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Security.Cryptography;
using Aspire.Azure.Common;
using Aspire.Azure.Messaging.EventHubs;
Expand Down Expand Up @@ -58,7 +59,7 @@ protected static string GetNamespaceFromSettings(AzureMessagingEventHubsSettings
// This is likely to be similar to {yournamespace}.servicebus.windows.net or {yournamespace}.servicebus.chinacloudapi.cn
if (ns.Contains(".servicebus", StringComparison.OrdinalIgnoreCase))
{
ns = ns[..ns.IndexOf(".servicebus")];
ns = ns[..ns.IndexOf(".servicebus", StringComparison.OrdinalIgnoreCase)];
}
else
{
Expand Down Expand Up @@ -89,30 +90,52 @@ protected static void EnsureConnectionStringOrNamespaceProvided(AzureMessagingEv
$"'ConnectionStrings:{connectionName}' or specify a 'ConnectionString' or 'FullyQualifiedNamespace' in the '{configurationSectionName}' configuration section.");
}

// If we have a connection string, ensure there's an EntityPath if settings.EventHubName is missing
// Emulator: If we have a connection string, ensure there's an EntityPath if settings.EventHubName is missing
if (!string.IsNullOrWhiteSpace(settings.ConnectionString))
{
// We have a connection string -- do we have an EventHubName?
if (string.IsNullOrWhiteSpace(settings.EventHubName))
{
// look for EntityPath
{
// look for EntityPath in the Endpoint style connection string
var props = EventHubsConnectionStringProperties.Parse(connectionString);

// if EntityPath is missing, throw
if (string.IsNullOrWhiteSpace(props.EventHubName))
// if EntityPath is found, capture it
if (!string.IsNullOrWhiteSpace(props.EventHubName))
{
throw new InvalidOperationException(
$"A {typeof(TClient).Name} could not be configured. Ensure a valid EventHubName was provided in " +
$"the '{configurationSectionName}' configuration section, or include an EntityPath in the ConnectionString.");
// this is used later to create the checkpoint blob container
settings.EventHubName = props.EventHubName;
}
}
}
// If we have a namespace and no connection string, ensure there's an EventHubName
// Live: If we have a namespace and no connection string, ensure there's an EventHubName (also look for hint in FQNS)
else if (!string.IsNullOrWhiteSpace(settings.FullyQualifiedNamespace) && string.IsNullOrWhiteSpace(settings.EventHubName))
{
if (Uri.TryCreate(settings.FullyQualifiedNamespace, UriKind.Absolute, out var fqns))
{
var query = fqns.Query.AsSpan().TrimStart('?');

var key = "EntityPath=";
int startIndex = query.IndexOf(key);

if (startIndex != -1)
{
var valueSpan = query.Slice(startIndex + key.Length);
int endIndex = valueSpan.IndexOf('&');
var entityPath = endIndex == -1 ? valueSpan :
valueSpan.Slice(0, endIndex);

settings.EventHubName = entityPath.ToString();

Debug.Assert(!string.IsNullOrWhiteSpace(settings.EventHubName));
}
}
}

if (string.IsNullOrWhiteSpace(settings.EventHubName))
{
throw new InvalidOperationException(
$"A {typeof(TClient).Name} could not be configured. Ensure a valid EventHubName was provided in " +
$"the '{configurationSectionName}' configuration section.");
$"the '{configurationSectionName}' configuration section, or assign one in the settings callback for this client.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ private static BlobContainerClient GetBlobContainerClient(
}

var containerClient = blobClient.GetBlobContainerClient(settings.BlobContainerName);


if (shouldTryCreateIfNotExists)
{
Expand Down
2 changes: 2 additions & 0 deletions src/Components/Aspire.Azure.Messaging.EventHubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ builder.AddAzureEventHubProducerClient("eventHubsConnectionName",
});
```

NOTE: Earlier versions of Aspire (&lt;9.1) required you to always set the EventHubName here because the Azure Event Hubs Hosting component did not provide a way to specify which Event Hub was to be included in the connection string. Beginning in 9.1, it is now possible to specify which Event Hub is to be used by way of calling `WithDefaultEntity(string)` with the name of a hub you have added via `WithHub(string)`. Only one Event Hub can be the default and attempts to flag multiple will elicit an Exception at runtime.

And then the connection information will be retrieved from the `ConnectionStrings` configuration section. Two connection formats are supported:

#### Fully Qualified Namespace
Expand Down
51 changes: 51 additions & 0 deletions tests/Aspire.Hosting.Azure.Tests/AzureEventHubsExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,57 @@ public async Task VerifyWaitForOnEventHubsEmulatorBlocksDependentResources()
await app.StopAsync();
}

[Fact]
[RequiresDocker]
[ActiveIssue("https://github.com/dotnet/aspire/issues/7093")]
public async Task VerifyEntityPathInConnectionStringForIsDefaultEntity()
{
using var builder = TestDistributedApplicationBuilder.Create(testOutputHelper);
var eventHub = builder.AddAzureEventHubs("eventhubns")
.RunAsEmulator()
.WithHub("hub")
.WithDefaultEntity("hub");

using var app = builder.Build();
await app.StartAsync();

// since we're running in Docker, this only tests the ConnectionString with the Emulator
// when using the real service, we pass a hint in the FQNS to the client. We can't test that here.
string? connectionString =
await eventHub.Resource.ConnectionStringExpression.GetValueAsync(CancellationToken.None);

// has EntityPath?
Assert.Contains(";EntityPath=hub", connectionString);

// well-formed connection string?
var props = EventHubsConnectionStringProperties.Parse(connectionString);
Assert.NotNull(props);
Assert.Equal("hub", props.EventHubName);
}

[Fact]
[RequiresDocker]
[ActiveIssue("https://github.com/dotnet/aspire/issues/7093")]
public Task VerifyMultipleDefaultEntityThrowsException()
{
using var builder = TestDistributedApplicationBuilder.Create(testOutputHelper);
var eventHub = builder.AddAzureEventHubs("eventhubns")
.RunAsEmulator()
.WithHub("hub")
.WithHub("hub2")
.WithDefaultEntity("hub");

// should throw for a second hub with default entity
Assert.Throws<DistributedApplicationException>(() => eventHub.WithDefaultEntity("hub2"));

// should not throw for same hub again
eventHub.WithDefaultEntity("hub");

using var app = builder.Build();

return Task.CompletedTask;
}

[Fact]
[RequiresDocker]
[ActiveIssue("https://github.com/dotnet/aspire/issues/6751")]
Expand Down
Loading