Kafka Producer with Dapr Consumer Sample #223

Merged: 2 commits, Feb 4, 2025
414 changes: 414 additions & 0 deletions pubsub-raw-payload/.gitignore

Large diffs are not rendered by default.

120 changes: 120 additions & 0 deletions pubsub-raw-payload/README.md
@@ -0,0 +1,120 @@
# Kafka Producer with Dapr Consumer Sample

This sample demonstrates how to integrate a Kafka producer, built with the Confluent Kafka SDK, with a Dapr-powered consumer in .NET applications. The producer publishes messages directly to Kafka, while the consumer uses Dapr's pub/sub building block to receive them. The messages are not wrapped in CloudEvents, which is Dapr's default behaviour when publishing and subscribing to messages.

You can find more details about publishing and subscribing to messages without CloudEvents [here](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-raw).
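
For comparison, if the producer went through the Dapr sidecar instead of the Confluent SDK, a raw (non-CloudEvent) publish would pass the `rawPayload` metadata entry. A minimal sketch, assuming the Dapr .NET SDK (`Dapr.Client`) — this is not part of this sample:

```csharp
using Dapr.Client;

// Hypothetical alternative producer: publish through the Dapr sidecar
// and ask it to skip the CloudEvent envelope via the "rawPayload" metadata.
var daprClient = new DaprClientBuilder().Build();

await daprClient.PublishEventAsync(
    "pubsub",    // pub/sub component name (matches components/pubsub.yaml)
    "messages",  // topic
    new { id = Guid.NewGuid().ToString(), content = "Hello", timestamp = DateTime.UtcNow },
    new Dictionary<string, string> { { "rawPayload", "true" } });
```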

## Prerequisites

- [.NET 8 SDK](https://dotnet.microsoft.com/download)
- [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/)
- [Docker](https://www.docker.com/products/docker-desktop)

## Setup

1. Clone the repository
2. Navigate to the solution folder:

```bash
cd pubsub-raw-payload
```

3. Start Kafka using Docker Compose:

```bash
docker-compose up -d
```

## Running the Applications

1. Start the Dapr Subscriber:

```bash
dapr run --app-id subscriber \
  --app-port 5001 \
  --dapr-http-port 3501 \
  --resources-path ./components \
  -- dotnet run --project src/Subscriber/Subscriber.csproj
```

2. In a new terminal, start the Kafka Publisher:

```bash
dotnet run --project src/Publisher/Publisher.csproj
```

## Subscription Configuration

### Programmatic Subscription

The subscriber registers its subscription programmatically by exposing a `/dapr/subscribe` endpoint:

```csharp
app.MapGet("/dapr/subscribe", () =>
{
    var subscriptions = new[]
    {
        new
        {
            pubsubname = "pubsub",
            topic = "messages",
            route = "/messages",
            metadata = new Dictionary<string, string>
            {
                { "isRawPayload", "true" }
            }
        }
    };
    return Results.Ok(subscriptions);
});
```
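
At startup, the Dapr sidecar calls this endpoint to discover subscriptions and then delivers each Kafka message to the declared route as a plain HTTP POST. The subscriber in this sample handles that route by reading the raw JSON body directly (abridged from `src/Subscriber/Program.cs` in this PR):

```csharp
app.MapPost("/messages", async (HttpContext context) =>
{
    // Read the raw request body instead of binding to a CloudEvent envelope
    using var reader = new StreamReader(context.Request.Body);
    var json = await reader.ReadToEndAsync();

    var message = JsonSerializer.Deserialize<Message>(json);
    Console.WriteLine($"Received message: {message?.Id}");

    return Results.Ok();
});
```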

### Declarative Subscription

Alternatively, create a `subscription.yaml` in your components directory:

```yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: message-subscription
spec:
  topic: messages
  routes:
    default: /messages
  pubsubname: pubsub
  metadata:
    isRawPayload: "true"
```

When using declarative subscriptions:

1. Remove the `/dapr/subscribe` endpoint from your subscriber application
2. Place the `subscription.yaml` file in your components directory
3. The subscription is loaded automatically by the Dapr sidecar when you run the subscriber with `--resources-path ./components`

## Testing

To publish a message:

```bash
curl -X POST http://localhost:5000/publish
```

The subscriber will display received messages in its console output.
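
Behind that call, the publisher's `/publish` endpoint serializes a `Message` record and produces it straight to the Kafka `messages` topic with the Confluent.Kafka client, bypassing the Dapr sidecar entirely (abridged from `src/Publisher/Program.cs` in this PR):

```csharp
app.MapPost("/publish", async (HttpContext context) =>
{
    var message = new Message(Guid.NewGuid().ToString(), $"Hello at {DateTime.UtcNow}", DateTime.UtcNow);

    // Produce the JSON-serialized message directly to Kafka
    var deliveryResult = await producer.ProduceAsync(
        "messages",
        new Message<string, string> { Key = message.Id, Value = JsonSerializer.Serialize(message) });

    return Results.Ok(message);
});
```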

## Stopping the Applications

1. Stop the running applications using Ctrl+C in each terminal
2. Stop Kafka:

```bash
docker-compose down
```

## Important Notes

1. The `isRawPayload` metadata entry is required for receiving raw JSON messages in .NET applications
2. The publisher uses the Confluent.Kafka client directly to publish messages to Kafka
3. The subscriber uses Dapr's pub/sub building block to consume messages
4. Make sure your Kafka broker is running before starting the applications
16 changes: 16 additions & 0 deletions pubsub-raw-payload/components/pubsub.yaml
@@ -0,0 +1,16 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: localhost:9092
  - name: consumerGroup
    value: dapr
  - name: clientId
    value: dapr-pubsub-sample
  - name: authRequired
    value: "false"
13 changes: 13 additions & 0 deletions pubsub-raw-payload/components/subscription.yaml
@@ -0,0 +1,13 @@
# apiVersion: dapr.io/v2alpha1
# kind: Subscription
# metadata:
#   name: message-subscription
# spec:
#   topic: messages
#   routes:
#     default: /messages
#   pubsubname: pubsub
#   metadata:
#     isRawPayload: "true"
# scopes:
# - subscriber
21 changes: 21 additions & 0 deletions pubsub-raw-payload/docker-compose.yml
@@ -0,0 +1,21 @@
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
43 changes: 43 additions & 0 deletions pubsub-raw-payload/pubsub-raw-payload.sln
@@ -0,0 +1,43 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.5.2.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{827E0CD3-B72D-47B6-A68D-7590B98EB39B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Publisher", "src\Publisher\Publisher.csproj", "{B53A3F93-644F-077D-263C-2A9461829575}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Shared", "src\Shared\Shared.csproj", "{99507BA0-BB0B-3FD9-D8E2-2AE9B409CB73}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Subscriber", "src\Subscriber\Subscriber.csproj", "{B93D2770-CD58-5609-5939-2FC86CCE9651}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{B53A3F93-644F-077D-263C-2A9461829575}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B53A3F93-644F-077D-263C-2A9461829575}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B53A3F93-644F-077D-263C-2A9461829575}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B53A3F93-644F-077D-263C-2A9461829575}.Release|Any CPU.Build.0 = Release|Any CPU
{99507BA0-BB0B-3FD9-D8E2-2AE9B409CB73}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{99507BA0-BB0B-3FD9-D8E2-2AE9B409CB73}.Debug|Any CPU.Build.0 = Debug|Any CPU
{99507BA0-BB0B-3FD9-D8E2-2AE9B409CB73}.Release|Any CPU.ActiveCfg = Release|Any CPU
{99507BA0-BB0B-3FD9-D8E2-2AE9B409CB73}.Release|Any CPU.Build.0 = Release|Any CPU
{B93D2770-CD58-5609-5939-2FC86CCE9651}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B93D2770-CD58-5609-5939-2FC86CCE9651}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B93D2770-CD58-5609-5939-2FC86CCE9651}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B93D2770-CD58-5609-5939-2FC86CCE9651}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{B53A3F93-644F-077D-263C-2A9461829575} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
{99507BA0-BB0B-3FD9-D8E2-2AE9B409CB73} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
{B93D2770-CD58-5609-5939-2FC86CCE9651} = {827E0CD3-B72D-47B6-A68D-7590B98EB39B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3945FF2B-1CF3-4CB9-835C-A0E05C800F0F}
EndGlobalSection
EndGlobal
59 changes: 59 additions & 0 deletions pubsub-raw-payload/src/Publisher/Program.cs
@@ -0,0 +1,59 @@
using Confluent.Kafka;
using System.Text.Json;
using Shared;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

// Kafka producer config
var producerConfig = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    ClientId = "kafka-producer-sample"
};

// Create producer instance
using var producer = new ProducerBuilder<string, string>(producerConfig).Build();

app.MapGet("/", () => "Publisher API");

app.MapPost("/publish", async (HttpContext context) =>
{
    var message = new Message(
        Guid.NewGuid().ToString(),
        $"Hello at {DateTime.UtcNow}",
        DateTime.UtcNow
    );

    try
    {
        // Serialize the message to JSON
        var jsonMessage = JsonSerializer.Serialize(message);

        // Create the Kafka message
        var kafkaMessage = new Message<string, string>
        {
            Key = message.Id, // Using the message ID as the key
            Value = jsonMessage
        };

        // Publish to Kafka
        var deliveryResult = await producer.ProduceAsync(
            "messages", // topic name
            kafkaMessage
        );

        Console.WriteLine($"Delivered message to: {deliveryResult.TopicPartitionOffset}");
        return Results.Ok(message);
    }
    catch (ProduceException<string, string> ex)
    {
        Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
        return Results.StatusCode(500);
    }
});

app.Run();

// Ensure proper cleanup
AppDomain.CurrentDomain.ProcessExit += (s, e) => producer?.Dispose();
13 changes: 13 additions & 0 deletions pubsub-raw-payload/src/Publisher/Properties/launchSettings.json
@@ -0,0 +1,13 @@
{
"profiles": {
"Publisher": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": false,
"applicationUrl": "http://localhost:5000",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
16 changes: 16 additions & 0 deletions pubsub-raw-payload/src/Publisher/Publisher.csproj
@@ -0,0 +1,16 @@

<Project Sdk="Microsoft.NET.Sdk.Web">
  <PropertyGroup>
    <TargetFramework>net8.0</TargetFramework>
    <Nullable>enable</Nullable>
    <ImplicitUsings>enable</ImplicitUsings>
  </PropertyGroup>

  <ItemGroup>
    <ProjectReference Include="..\Shared\Shared.csproj" />
  </ItemGroup>

  <ItemGroup>
    <PackageReference Include="Confluent.Kafka" Version="2.8.0" />
  </ItemGroup>
</Project>
9 changes: 9 additions & 0 deletions pubsub-raw-payload/src/Shared/Message.cs
@@ -0,0 +1,9 @@
namespace Shared;

using System.Text.Json.Serialization;

public record Message(
[property: JsonPropertyName("id")] string Id,
[property: JsonPropertyName("content")] string Content,
[property: JsonPropertyName("timestamp")] DateTime Timestamp
);
7 changes: 7 additions & 0 deletions pubsub-raw-payload/src/Shared/Shared.csproj
@@ -0,0 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>net8.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>
</Project>
52 changes: 52 additions & 0 deletions pubsub-raw-payload/src/Subscriber/Program.cs
@@ -0,0 +1,52 @@
using System.Text.Json;
using Shared;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

app.MapGet("/", () => "Subscriber API");

app.MapGet("/dapr/subscribe", () =>
{
    var subscriptions = new[]
    {
        new
        {
            pubsubname = "pubsub",
            topic = "messages",
            route = "/messages",
            metadata = new Dictionary<string, string>
            {
                { "isRawPayload", "true" }
            }
        }
    };
    return Results.Ok(subscriptions);
});

app.MapPost("/messages", async (HttpContext context) =>
{
    using var reader = new StreamReader(context.Request.Body);
    var json = await reader.ReadToEndAsync();
    Console.WriteLine($"Raw message received: {json}"); // Debug log

    try
    {
        var message = JsonSerializer.Deserialize<Message>(json);
        if (message != null)
        {
            Console.WriteLine($"Received message: {message.Id}");
            Console.WriteLine($"Content: {message.Content}");
            Console.WriteLine($"Timestamp: {message.Timestamp}");
        }
    }
    catch (JsonException ex)
    {
        Console.WriteLine($"Error deserializing message: {ex.Message}");
        return Results.BadRequest("Invalid message format");
    }

    return Results.Ok();
});

app.Run();