
Rework kafka_franz batching and delivery mechanisms #2745

Open
Jeffail opened this issue Jul 29, 2024 · 2 comments
Labels
bughancement (It's kind of a bug, but mostly an enhancement.) · inputs (Any tasks or issues relating specifically to inputs)

Comments

Jeffail (Collaborator) commented Jul 29, 2024

The configuration of the kafka_franz input loosely resembles the current kafka one in that it has a checkpoint_limit field as well as a batching field. The batching mechanism operates per topic-partition, as does the checkpointer. However, one area where we currently lack parity is that the checkpointing mechanism can't guarantee ordered processing downstream.
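For context, the relevant portion of a kafka_franz config looks roughly like this (a sketch from memory of the documented fields; broker addresses and values are illustrative):

```yaml
input:
  kafka_franz:
    seed_brokers: [ localhost:9092 ]
    topics: [ foo ]
    consumer_group: example_group
    checkpoint_limit: 1024   # max unacked messages before a partition is paused
    batching:                # batches are formed per topic-partition
      count: 100
      period: 1s
```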

This limitation is due to how message requests are made and distributed out to the batching and checkpointing mechanisms of the input component. In order to correct it we would need to refactor this mechanism, which will likely also result in a change of behaviour.

We should therefore consider instead creating a new component, perhaps named the redpanda input, and then deprecating the kafka_franz input (and the old kafka input) once we're confident in it. We should also consider whether we want to ditch the advanced batching mechanisms that we currently support in favour of directly exposing, as a config field, the maximum message count that we receive from each poll request.

Jeffail added the inputs and bughancement labels Jul 29, 2024
emaxerrno (Contributor) commented
@twmb is this about reducing the in-flight to 1?

Jeffail (Collaborator, Author) commented Jul 29, 2024

It's complicated. We can have more than one message in flight from a partition, but they need to belong to a single batch, and we cannot have more than one batch in flight from a given partition (but only if we need to preserve ordering).

The kafka_franz input right now shares a single client connection for all topic-partitions we're consuming from, and partitions are removed from the general poll when they hit their checkpoint limit. This mechanism is optimised for when the checkpoint limit is quite high and you just want high throughput, but it can't really support a lockstep system where checkpoints are precise.

When you bring the checkpoint limit down to just 1 it's effectively still just "very low", which is why the kafka_franz input doesn't say anywhere that it supports strict ordering.
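The bookkeeping behind a checkpoint limit can be sketched in Go: acks may resolve out of order, so the only offset safe to commit is the highest contiguous acked one. The types below are hypothetical illustrations, not the actual kafka_franz implementation:

```go
package main

import "fmt"

// checkpointer tracks in-flight offsets for a single topic-partition and
// resolves the highest offset that is safe to commit, assuming acks may
// arrive out of order.
type checkpointer struct {
	pending []int64        // offsets dispatched, in order
	acked   map[int64]bool // offsets acknowledged but not yet committable
}

func newCheckpointer() *checkpointer {
	return &checkpointer{acked: map[int64]bool{}}
}

// Track registers an offset as in flight.
func (c *checkpointer) Track(offset int64) {
	c.pending = append(c.pending, offset)
}

// Ack marks an offset as processed and returns the highest contiguous
// offset that can now be committed, or -1 if the oldest in-flight offset
// is still unacked.
func (c *checkpointer) Ack(offset int64) int64 {
	c.acked[offset] = true
	commit := int64(-1)
	for len(c.pending) > 0 && c.acked[c.pending[0]] {
		commit = c.pending[0]
		delete(c.acked, c.pending[0])
		c.pending = c.pending[1:]
	}
	return commit
}

func main() {
	cp := newCheckpointer()
	for o := int64(0); o < 4; o++ {
		cp.Track(o)
	}
	fmt.Println(cp.Ack(1)) // out-of-order ack: nothing committable yet, -1
	fmt.Println(cp.Ack(0)) // offsets 0 and 1 now contiguous, commits 1
}
```

Note that even with a limit of 1 unacked offset this only bounds how far the checkpointer can lag; it doesn't by itself force downstream processing to happen in order.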

The regular kafka input uses the sarama client library, which has layers and layers of abstraction away from the polling, so it's trivial to apply checkpointing (and back pressure) per partition. The performance is worse and it's less stable, but the advantage there is that we can easily implement lockstepped checkpointing and guarantee ordering (as long as all the other caveats are addressed elsewhere in the config).

Solution A

We remove all of the complexity of the batching mechanisms such that users can no longer have period- or byte-size-based batching. Instead, there will only be a maximum message count, which is applied per topic-partition and dictates the maximum batch size. This is closer to what the underlying client poll is doing and has some performance advantages.

We then ensure all topic-partition batches are consumed, processed and acked in lockstep. This is a major disadvantage for performance compared to systems that spawn a single consume -> process -> write thread per partition, but when the processing pipeline has more than one thread and the output allows more than one batch in flight, it's the only way to ensure ordered delivery.

Solution B

We continue as we are currently, with batching and checkpointing per partition on a "best attempt" basis, but we add a new partitioning field to the pipeline configuration section. This would act similarly to Kafka partitions, where each message would be routed to a single processing thread based on its key, allowing you to ensure that all messages of a given partition are processed serially.

You would also need to ensure that the output is configured with a max in flight of 1 in order to guarantee ordered delivery. However, the performance would be much better and you can preserve the relationship between processing threads and CPU cores.
