Kafka
A distributed event streaming platform.
- Apache Kafka: back-end application, the Kafka Cluster
- A Kafka Cluster is a collection of one or more Kafka Brokers.
- Client Applications: two forms:
- Kafka Connect: A data integration framework, supports Producer and Consumer.
- Kafka Streams: A Java API to read data from a Topic, process it, then write the result to another Topic (or another system.)
A stream of data, identified by its name. The sequence of Messages sent to a Topic is a Data Stream.
Split into a pre-determined number of Partitions.
Immutable: Messages written to a Topic cannot be changed. (They can be deleted, but Offsets cannot be re-used.)
Data is stored for a limited time (default: 1 week, but configurable.)
Messages written to a Topic are split into Partitions.
Messages are ordered within a Partition. They are not ordered across Partitions.
aka Event, Record
Offset: The ordered ID of Messages in a Topic Partition. i.e. Different Partitions can use the same Offset.
Producers choose the Key for a Message; it can be null.
Messages with Key=null are sent round-robin to Partitions.
Messages with the same (non-null) Key value are sent to the same Partition, as long as the number of Partitions stays the same. (The Partition is determined using a hash of the Key, which takes the number of Partitions into account.)
Choice of Key: for which entity do you want Messages to be ordered?
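A minimal Java Producer sketch illustrating Key-based partitioning (the broker address, topic name and keys are hypothetical):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same (non-null) Key -> same Partition, so events for one customer stay ordered.
            producer.send(new ProducerRecord<>("orders", "customer-42", "order created"));
            producer.send(new ProducerRecord<>("orders", "customer-42", "order shipped"));
            // Key = null -> Partition chosen by the Producer (round-robin / sticky batching).
            producer.send(new ProducerRecord<>("orders", null, "heartbeat"));
        }
    }
}
```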
- Key (can be null)
- Value (can be null)
- Compression Type (can be none)
- Headers (optional)
- Key (null if the Producer sent a null Key)
- Value
- Partition + Offset
- Timestamp
Producers write/send Messages to Topics.
- acks=0 → Don't wait for acknowledgement (possible data loss)
- acks=1 → Wait for Leader acknowledgement (limited data loss)
- acks=all/-1 → Wait for Leader and all ISRs acknowledgement (no data loss) (Default since Kafka 3.0)
- Works well (in Production, RF>=3) with min.insync.replicas=2, i.e. make sure "all" means at least one replica beyond the Leader.
Default "retries" since Kafka 2.1 is a very high number, but that's bounded by delivery.timeout.ms
(2 minutes by default.)
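A minimal sketch of these reliability settings as Java Producer properties (the values shown are just the defaults described above; min.insync.replicas is a Broker/Topic setting, not a Producer one):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ReliabilityConfigSketch {
    // Builds the reliability-related Producer settings discussed above.
    static Properties reliabilityProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.ACKS_CONFIG, "all");                  // wait for Leader + all ISRs (default since Kafka 3.0)
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);   // default since Kafka 2.1 is already very high
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000); // bounds the retrying (2 minutes by default)
        // Note: min.insync.replicas=2 is set on the Broker or Topic, not on the Producer.
        return props;
    }
}
```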
Prevents duplicates of the same message when the Kafka Ack doesn't reach the Producer (e.g. due to a network error.) Since Kafka 3.0, enable.idempotence defaults to true in the Producer properties.
compression.type (the default is none.)
Compressed messages are faster to send over the network, and are also stored compressed on Brokers, resulting in less disk utilization.
linger.ms → How long to delay sending a batch to see if more records arrive. Default is 0 (no delay.) Trades higher latency for higher throughput.
batch.size → Controls the upper bound on the size (in bytes) of batches.
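A minimal sketch of these batching settings as Java Producer properties (snappy and the sizes shown are illustrative choices, not recommendations):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ThroughputConfigSketch {
    // Builds the compression/batching Producer settings discussed above.
    static Properties throughputProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // default is "none"
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);              // wait up to 20 ms to fill a batch (default 0)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);      // per-batch upper bound in bytes (default 16 KB)
        return props;
    }
}
```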
Consumers read Messages from Topics.
Consumer Groups are identified by a unique ID.
Consumers in a Group each read from different Partitions. (Though each may read from multiple Partitions. A Consumer in the Group not reading from any Partition is considered Inactive.)
Multiple Consumer Groups can read from the same Topic.
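A minimal Java Consumer sketch joining a Consumer Group (the broker address, group ID and topic name are hypothetical):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("group.id", "order-processors");        // the Consumer Group ID
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders")); // Kafka assigns this member a subset of the Partitions
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```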
Consumer Group members cooperate to read from all Partitions in a Topic. Kafka balances (and re-balances) which Partitions are read by which members. Rebalance happens when:
- A Consumer joins the Group
- A Consumer leaves the Group
- A new Partition is added to the Topic
There are different rebalance (partition assignment) strategies:
- Eager: All Consumers stop for a period of time, and there is no guarantee that they resume with any of the same Partition(s) as before.
- RangeAssignor (Kafka Consumer default)
- RoundRobin
- StickyAssignor
- Cooperative (aka Incremental): Reassign a small subset. Consumers that aren't reassigned can continue processing. Iterates, so there can be multiple rounds of changes before it's done.
- CooperativeStickyAssignor (Kafka Consumer best)
Can prevent/delay rebalancing. By default joining a Group results in a new "member ID", and triggers re-balancing.
But you can instead specify a "group member ID" (group.instance.id), which triggers a rebalance only if it's new. When that member leaves, rebalancing is delayed for a (configurable) period of time. If the same "group member ID" re-joins before the timeout, it gets the same Partitions. If not, rebalancing occurs.
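A sketch of the corresponding Consumer settings: the cooperative assignor goes in partition.assignment.strategy, the static member ID in group.instance.id (the ID and timeout shown are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

public class RebalanceConfigSketch {
    static Properties rebalanceProps() {
        Properties props = new Properties();
        // Cooperative (incremental) rebalancing instead of the default eager RangeAssignor.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());
        // Static membership: re-joining with the same ID within session.timeout.ms
        // keeps the same Partitions and avoids a rebalance.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "worker-1"); // hypothetical ID
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60_000);
        return props;
    }
}
```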
The Offsets of processed Messages are committed to the Kafka Broker. (Allows re-reading of a Message if the Consumer dies before processing it. Stored in the internal Kafka __consumer_offsets Topic.)
A Commit applies to the Consumer Group, i.e. once a Consumer Group commits a Message it won't re-read that Message, but of course other Consumer Groups still can.
Java: By default, automatic (enable.auto.commit=true), At Least Once. After auto.commit.interval.ms has elapsed, calling the next poll() triggers auto-commit of the previous batch. (So don't call poll() again until the previous batch of messages has been processed.)
Alternatively: enable.auto.commit=false, and manually commit the offsets, e.g. if the processing of messages is asynchronous.
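A minimal sketch of manual commits: process the whole batch, then commitSync() (broker address, topic, group ID and processing logic are hypothetical):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-committers");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // we commit ourselves
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // must be idempotent: the batch may be re-read after a crash
                }
                consumer.commitSync(); // commit only after the whole batch is processed -> At Least Once
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) { /* application logic */ }
}
```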
- Committed after the Message is processed.
- The same Message may be processed multiple times, so processing must be idempotent.
- Java Kafka Client: Process all messages received by one KafkaConsumer.poll() before calling poll() again.
- Committed as soon as the Message is received.
- The Message can't be read again, so it's lost if processing fails.
- Achieved only with Kafka→Kafka, and with the Transactional API
- Easy with the Kafka Streams API
Composed of multiple Brokers.
A server, identified with a unique integer ID.
Clients connect to any single Broker and from it learn how to connect to every Broker in the Cluster.
For each Topic Partition, exactly one Broker is the Leader. Other Broker(s) can be an In-Sync Replica (ISR), and can become Leader if the original Leader Broker is lost.
The total number of data replicas, including the Leader.
Replication Factor = 1 (Leader) + # ISRs
Topic Durability: A Cluster with a Replication Factor of N can withstand the loss of N-1 Brokers without data loss.
Production: At least two, usually three, not more than four.
When used without --group, a temporary Group is created.
To get all available Messages not yet committed by the Group, use --from-beginning. (When a temporary Group is used, there will of course not have been any commits for it yet.)
https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients - Producer and Consumer
Exit gracefully, otherwise re-balancing will be delayed and re-starting the consumer will encounter delays in getting new messages.
// This causes consumer.poll() to throw a WakeupException (extends KafkaException)
consumer.wakeup();
// i.e. Call it from Runtime.getRuntime().addShutdownHook(...
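A fuller sketch of that shutdown pattern (the topic name is hypothetical):

```java
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulShutdownSketch {
    public static void run(KafkaConsumer<String, String> consumer) {
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();     // makes the blocked poll() throw WakeupException
            try {
                mainThread.join(); // wait for the poll loop to close the consumer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            consumer.subscribe(List.of("orders"));
            while (true) {
                consumer.poll(Duration.ofSeconds(1)); // process records here
            }
        } catch (WakeupException e) {
            // expected on shutdown, nothing to do
        } finally {
            consumer.close(); // leaves the Group cleanly, so rebalancing isn't delayed
        }
    }
}
```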
Solves:
- External "Source" → Kafka
- Kafka → External "Sink"
100s of pre-written "Connectors" to various existing external systems. e.g. PostgreSQL, JDBC, HTTP, HTML SSE API, AWS Redshift, AWS S3, ActiveMQ, etc. See: https://www.confluent.io/hub
Monitor a DB for changes → Events
Solves Kafka→Kafka. i.e. Transformation of data
Official: https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
Quarkus: smallrye-reactive-messaging-kafka extension, and Overview
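A minimal Kafka Streams sketch: read a Topic, transform the values, write to another Topic (topic names and application ID are hypothetical):

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic"); // read a Topic
        input.mapValues(value -> value.toUpperCase())                  // process it
             .to("output-topic");                                      // write to another Topic

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```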
Perform SQL queries on Kafka data.
Schemas change over time. Kafka Brokers don't verify/parse messages. Producers and Consumers use the Schema Registry to maintain a common data format.
Apache Avro is the data serialization framework. (Others are supported.) Used to define a schema for a compact binary format, and to map data to/from it.
How it works: Producers send the schema to the Schema Registry, then send Avro data to Kafka (smaller, since it doesn't contain the schema). Consumers then retrieve the corresponding schema from the Schema Registry, and can then parse the binary data.
Free under Confluent Community License (has restrictions, not open-source, has Enterprise version): Confluent Schema Registry
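A sketch of the Producer configuration when using Confluent's Avro serializer, assuming the kafka-avro-serializer dependency from Confluent's repository and a Schema Registry on its default port:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class AvroProducerConfigSketch {
    static Properties avroProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // serializer registers/looks up schemas here
        return props;
    }
}
```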
Optionally compact older (closed) Segments into fewer Segments, by keeping only the latest message for each Key. (The Active Segment is not compacted.)
Useful when you only care about the last message for each Key. Only affects Consumers that are "behind", e.g. read "from beginning". A "realtime" Consumer will still see all the messages.
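A sketch of creating a compacted Topic with the Java AdminClient (topic name, partition count and replication factor are hypothetical):

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CompactedTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("customer-latest-address", 3, (short) 1)
                    .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(List.of(topic)).all().get(); // only the latest value per Key is retained long-term
        }
    }
}
```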
https://kafka.apache.org/documentation/streams/developer-guide/testing.html
In server.properties:
listeners=REMOTE://172.18.152.219:9093,LOCAL://127.0.0.1:9092
advertised.listeners=REMOTE://172.18.152.219:9093,LOCAL://127.0.0.1:9092
listener.security.protocol.map=REMOTE:PLAINTEXT,LOCAL:PLAINTEXT
inter.broker.listener.name=LOCAL
See: https://issues.apache.org/jira/browse/KAFKA-9257 Allow listeners to bind on same port but different interfaces
Don't use it. KRaft is the replacement, and has been production-ready since 3.3.1.
For geographically distributed sites, where Kafka's Cluster replication would be too slow/costly. Just a Producer and a Consumer to copy the data, but the copied Messages do not keep the same Offsets. There are a number of open-source implementations, including MirrorMaker 2 which is part of base Kafka.