Skip to content

Commit 56d4298

Browse files
authored
Merge pull request #4427 from passuied/feature/add-kafka-pubsub-headers
Added missing information about kafka-pubsub special metadata headers
2 parents 6f8fcb2 + 224c7de commit 56d4298

File tree

1 file changed

+46
-2
lines changed

1 file changed

+46
-2
lines changed

daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md

+46-2
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ Apache Kafka supports the following bulk metadata options:
468468

469469
When invoking the Kafka pub/sub, its possible to provide an optional partition key by using the `metadata` query param in the request url.
470470

471-
The param name is `partitionKey`.
471+
The param name can either be `partitionKey` or `__key`
472472

473473
Example:
474474

@@ -484,7 +484,7 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partiti
484484

485485
### Message headers
486486

487-
All other metadata key/value pairs (that are not `partitionKey`) are set as headers in the Kafka message. Here is an example setting a `correlationId` for the message.
487+
All other metadata key/value pairs (that are not `partitionKey` or `__key`) are set as headers in the Kafka message. Here is an example setting a `correlationId` for the message.
488488

489489
```shell
490490
curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1 \
@@ -495,7 +495,51 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla
495495
}
496496
}'
497497
```
498+
### Kafka Pubsub special message headers received on consumer side
498499

500+
When consuming messages, special message metadata are being automatically passed as headers. These are:
501+
- `__key`: the message key if available
502+
- `__topic`: the topic for the message
503+
- `__partition`: the partition number for the message
504+
- `__offset`: the offset of the message in the partition
505+
- `__timestamp`: the timestamp for the message
506+
507+
You can access them within the consumer endpoint as follows:
508+
{{< tabs "Python (FastAPI)" >}}
509+
510+
{{% codetab %}}
511+
512+
```python
513+
from fastapi import APIRouter, Body, Response, status
514+
import json
515+
import sys
516+
517+
app = FastAPI()
518+
519+
router = APIRouter()
520+
521+
522+
@router.get('/dapr/subscribe')
523+
def subscribe():
524+
subscriptions = [{'pubsubname': 'pubsub',
525+
'topic': 'my-topic',
526+
'route': 'my_topic_subscriber',
527+
}]
528+
return subscriptions
529+
530+
@router.post('/my_topic_subscriber')
531+
def my_topic_subscriber(
532+
key: Annotated[str, Header(alias="__key")],
533+
offset: Annotated[int, Header(alias="__offset")],
534+
event_data=Body()):
535+
print(f"key={key} - offset={offset} - data={event_data}", flush=True)
536+
return Response(status_code=status.HTTP_200_OK)
537+
538+
app.include_router(router)
539+
540+
```
541+
542+
{{% /codetab %}}
499543
## Receiving message headers with special characters
500544

501545
The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors.

0 commit comments

Comments
 (0)