swift-kafka-client consumer is very slow (was: Provide bulk messages from KafkaConsumer) #132

Closed
blindspotbounty opened this issue Sep 13, 2023 · 10 comments
Labels
area/performance Improvements to performance.

Comments

@blindspotbounty
Collaborator

Currently, KafkaConsumer delivers messages one by one.
That is convenient, but it is inefficient for reading large topics, especially on service recovery.

I've made a small experiment: I changed the consumerMessages enum case to accept an array instead of a single message and packed all messages from a single poll into one event.

// RDKafkaClient.swift
    /// Swift wrapper for events from `librdkafka`'s event queue.
    enum KafkaEvent {
        case deliveryReport(results: [KafkaDeliveryReport])
        case consumerMessages(results: [KafkaConsumerMessage])
        case error(result: Error)
...        
    func eventPoll(maxEvents: Int, consumer: Bool = false) -> [KafkaEvent] {
        var events = [KafkaEvent]()
        events.reserveCapacity(maxEvents)
        // ...
        var msgs = [KafkaConsumerMessage]()
        if consumer {
            msgs.reserveCapacity(maxEvents)
        }
        
        for _ in 0..<maxEvents {
            // ...
            switch eventType {
            // ...
            case .fetch:
                do {
                    if let msg = try self.handleFetchEvent(event) {
                        msgs.append(msg)
                        shouldSleep = false
                    }
                } catch {
                    events.append(.error(result: error))
                }
            // ...
            case .none:
                if !msgs.isEmpty {
                    events.append(.consumerMessages(results: msgs))
                }
                // Finished reading events, return early
                return events
            default:
                break // Ignored Event
            }
        }

        if !msgs.isEmpty {
            events.append(.consumerMessages(results: msgs))
        }
        return events
    }

Also changed messages and KafkaConsumerMessages to provide bulks.
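
To make the batching idea easier to follow in isolation, here is a minimal, self-contained sketch of the same accumulation pattern. It is not the library's code: `RawEvent`, `Event`, and the `nextRawEvent` closure are hypothetical stand-ins for the librdkafka event queue, and messages are plain strings.

```swift
/// Hypothetical stand-in for events pulled from the native event queue.
enum RawEvent {
    case fetch(String)   // a consumed message
    case error(String)   // an error reported by the queue
    case other           // any event type this sketch ignores
}

/// Hypothetical stand-in for the Swift-side event enum.
enum Event: Equatable {
    case consumerMessages([String])
    case error(String)
}

/// Polls up to `maxEvents` raw events and packs every fetched message
/// from this poll into a single `.consumerMessages` event.
func eventPoll(maxEvents: Int, nextRawEvent: () -> RawEvent?) -> [Event] {
    var events: [Event] = []
    var messages: [String] = []
    messages.reserveCapacity(maxEvents)

    for _ in 0..<maxEvents {
        // `nil` models an empty queue: stop polling early.
        guard let raw = nextRawEvent() else { break }
        switch raw {
        case .fetch(let message):
            messages.append(message)
        case .error(let description):
            events.append(.error(description))
        case .other:
            break // ignored event
        }
    }

    // Flush once, whether the loop ended early or hit `maxEvents`.
    if !messages.isEmpty {
        events.append(.consumerMessages(messages))
    }
    return events
}
```

One consequence of this shape, which also applies to the snippet above, is that errors seen during a poll are emitted before the message batch, so the relative order of errors and messages within one poll is not preserved.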

I then tested this with simple consumer applications.
For single messages:

var i = 0
var ctr: UInt64 = 0
var tmpCtr: UInt64 = 0

let interval: UInt64 = 1_000_000

var counter = ProcessingRateLocal(interval: interval)
var startDate = Date.now
var bytes: UInt64 = 0

let totalStartDate = Date.now
var totalBytes: UInt64 = 0

for try await record in consumer.messages {
    i = record.offset.offset()
    ctr += 1
    bytes += UInt64(record.msg.value.readableBytes)
    totalBytes += UInt64(record.msg.value.readableBytes)

    tmpCtr += 1
    if tmpCtr >= interval {
        let timeInterval = -startDate.timeIntervalSinceNow
        let rate = Int64(Double(tmpCtr) / timeInterval)
        let rateMb = Double(bytes) / timeInterval / 1024 / 1024

        let timeIntervalTotal = -totalStartDate.timeIntervalSinceNow
        let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 / 1024
        
        print("read up to \(record.offset.offset()) in partition \(record.partition.idx()), ctr: \(ctr), rate: \(rate) (\(Int(rateMb))MB/s), avgRate: (\(Int(avgRateMb))MB/s), timePassed: \(Int(timeIntervalTotal))sec")

        tmpCtr = 0
        bytes = 0
        startDate = .now
    }
}

With results:

read up to 9319749 in partition 5, ctr: 1000000, rate: 115862 (25MB/s), avgRate: (25MB/s), timePassed: 8sec
read up to 9880212 in partition 5, ctr: 2000000, rate: 121114 (23MB/s), avgRate: (24MB/s), timePassed: 16sec
read up to 8904245 in partition 2, ctr: 3000000, rate: 120674 (23MB/s), avgRate: (24MB/s), timePassed: 25sec
read up to 8979310 in partition 2, ctr: 4000000, rate: 122616 (22MB/s), avgRate: (24MB/s), timePassed: 33sec
read up to 9214895 in partition 4, ctr: 5000000, rate: 120713 (23MB/s), avgRate: (23MB/s), timePassed: 41sec
read up to 9222254 in partition 1, ctr: 6000000, rate: 122643 (21MB/s), avgRate: (23MB/s), timePassed: 49sec
read up to 9257242 in partition 2, ctr: 7000000, rate: 123706 (20MB/s), avgRate: (23MB/s), timePassed: 57sec
read up to 9701009 in partition 1, ctr: 8000000, rate: 121007 (20MB/s), avgRate: (22MB/s), timePassed: 66sec
read up to 9731942 in partition 2, ctr: 9000000, rate: 121474 (20MB/s), avgRate: (22MB/s), timePassed: 74sec
read up to 10128593 in partition 0, ctr: 10000000, rate: 121398 (20MB/s), avgRate: (22MB/s), timePassed: 82sec
read up to 10478118 in partition 4, ctr: 11000000, rate: 109100 (18MB/s), avgRate: (21MB/s), timePassed: 91sec
read up to 10446066 in partition 1, ctr: 12000000, rate: 120968 (20MB/s), avgRate: (21MB/s), timePassed: 100sec
read up to 10700779 in partition 2, ctr: 13000000, rate: 120796 (20MB/s), avgRate: (21MB/s), timePassed: 108sec
read up to 10921598 in partition 2, ctr: 14000000, rate: 119480 (20MB/s), avgRate: (21MB/s), timePassed: 116sec
read up to 11358237 in partition 4, ctr: 15000000, rate: 119992 (20MB/s), avgRate: (21MB/s), timePassed: 124sec
read up to 11544553 in partition 4, ctr: 16000000, rate: 119870 (20MB/s), avgRate: (21MB/s), timePassed: 133sec
read up to 11966376 in partition 3, ctr: 17000000, rate: 119556 (20MB/s), avgRate: (21MB/s), timePassed: 141sec
read up to 11400495 in partition 1, ctr: 18000000, rate: 117463 (20MB/s), avgRate: (21MB/s), timePassed: 150sec
read up to 11834628 in partition 1, ctr: 19000000, rate: 119794 (20MB/s), avgRate: (21MB/s), timePassed: 158sec

For bulk:

var i = 0
var ctr: UInt64 = 0
var tmpCtr: UInt64 = 0

let interval: UInt64 = 1_000_000

var counter = ProcessingRateLocal(interval: interval)
var startDate = Date.now
var bytes: UInt64 = 0

let totalStartDate = Date.now
var totalBytes: UInt64 = 0

for try await bulk in consumer.bulkMessages {
    if let offset = bulk.last?.offset.offset() {
        i = offset
    }
    ctr += UInt64(bulk.count)
    bulk.forEach { record in
        bytes += UInt64(record.msg.value.readableBytes)
        totalBytes += UInt64(record.msg.value.readableBytes)
    }
    tmpCtr += UInt64(bulk.count)
    if tmpCtr >= interval {
        let timeInterval = -startDate.timeIntervalSinceNow
        let rate = Int64(Double(tmpCtr) / timeInterval)
        let rateMb = Double(bytes) / timeInterval / 1024 / 1024
        
        let timeIntervalTotal = -totalStartDate.timeIntervalSinceNow
        let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 / 1024
        print("read up to \(bulk.last!.offset.offset()) in partition \(bulk.last!.partition.idx()), ctr: \(ctr), rate: \(rate) (\(Int(rateMb))MB/s), avgRate: (\(Int(avgRateMb))MB/s), timePassed: \(Int(timeIntervalTotal))sec")

        tmpCtr = 0
        bytes = 0
        startDate = .now
    }
}

Results:

read up to 9346205 in partition 5, ctr: 1000098, rate: 378051 (83MB/s), avgRate: (83MB/s), timePassed: 2sec
read up to 9937226 in partition 5, ctr: 2000115, rate: 633834 (123MB/s), avgRate: (98MB/s), timePassed: 4sec
read up to 10265507 in partition 5, ctr: 3000197, rate: 458818 (93MB/s), avgRate: (96MB/s), timePassed: 6sec
read up to 8969720 in partition 2, ctr: 4000283, rate: 750584 (140MB/s), avgRate: (104MB/s), timePassed: 7sec
read up to 9370086 in partition 0, ctr: 5000321, rate: 636718 (118MB/s), avgRate: (106MB/s), timePassed: 9sec
read up to 10288374 in partition 3, ctr: 6000369, rate: 918704 (162MB/s), avgRate: (112MB/s), timePassed: 10sec
read up to 9291663 in partition 1, ctr: 7000412, rate: 1017207 (175MB/s), avgRate: (117MB/s), timePassed: 11sec
read up to 10165390 in partition 0, ctr: 8000416, rate: 1289984 (218MB/s), avgRate: (124MB/s), timePassed: 12sec
read up to 9612901 in partition 2, ctr: 9000486, rate: 1028644 (172MB/s), avgRate: (127MB/s), timePassed: 13sec
read up to 11031623 in partition 5, ctr: 10000548, rate: 1000979 (167MB/s), avgRate: (130MB/s), timePassed: 14sec
read up to 10267946 in partition 1, ctr: 11000590, rate: 647163 (109MB/s), avgRate: (128MB/s), timePassed: 15sec
read up to 10599079 in partition 0, ctr: 12000591, rate: 595727 (100MB/s), avgRate: (125MB/s), timePassed: 17sec
read up to 10614768 in partition 1, ctr: 13000606, rate: 552293 (93MB/s), avgRate: (122MB/s), timePassed: 19sec
read up to 10867621 in partition 0, ctr: 14000681, rate: 538282 (91MB/s), avgRate: (119MB/s), timePassed: 21sec
read up to 11014089 in partition 0, ctr: 15000754, rate: 497392 (84MB/s), avgRate: (116MB/s), timePassed: 23sec
read up to 11211448 in partition 2, ctr: 16000776, rate: 520717 (89MB/s), avgRate: (114MB/s), timePassed: 24sec
read up to 11950906 in partition 5, ctr: 17000866, rate: 483280 (82MB/s), avgRate: (112MB/s), timePassed: 27sec
read up to 11601443 in partition 1, ctr: 18000878, rate: 395840 (67MB/s), avgRate: (108MB/s), timePassed: 29sec
read up to 11959260 in partition 4, ctr: 19000952, rate: 375111 (64MB/s), avgRate: (104MB/s), timePassed: 32sec

The latter shows results that are close to the limit of a 1 Gbps network.
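
For reference, a quick conversion of a 1 Gbps link into the unit the benchmark prints (bytes divided by 1024 twice). This is only a back-of-the-envelope sketch and assumes the raw line rate, ignoring protocol overhead:

```swift
// 1 Gbps expressed in the same unit as the benchmark output.
let linkBitsPerSecond = 1_000_000_000.0
let linkBytesPerSecond = linkBitsPerSecond / 8            // 125_000_000 bytes/s
let linkMiBPerSecond = linkBytesPerSecond / 1024 / 1024   // roughly 119.2

print("1 Gbps is about \((linkMiBPerSecond * 10).rounded() / 10) MiB/s")
```

The average rates reported for the bulk run (104 to 130) are in the neighbourhood of that figure.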

This is interesting because most of the work is already done inside the library, and batching fits naturally with the current poll implementation (as in some librdkafka examples), but it is not exposed to the end user.
From our perspective, this is especially useful when an application has to recover from one or more huge topics and needs to cache the data, e.g. in a database, so it can receive and use the data in bulk.

@blindspotbounty blindspotbounty changed the title Provide bulk messages Provide bulk messages from KafkaConsumer Sep 13, 2023
@FranzBusch
Contributor

I am curious where this slowdown comes from. Is it from how we are polling Kafka, or just the fact that every next call on the iterator has to go through the lock? If it is the latter, we need to come up with something in the stdlib. Potentially, a few @inlinable annotations could also go a long way here.

@blindspotbounty
Collaborator Author

Hi @FranzBusch! Thank you for your thoughts.
Sorry for the late response, I was on vacation for a while.
Unfortunately, I did not profile the code, so I can't say for sure right away.

My assumption is that two factors coincide (I have to profile and check to prove it, though):

  1. lock for sequence
  2. losing context on every await in for-loop

However, there is something else going on here, because I think I tried making a sequence that returns messages one by one but operates on arrays internally, and it was not as good as providing the arrays as is. I need to double-check that as well, though.
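
One way to probe the per-element cost in isolation is a small micro-benchmark like the sketch below. It uses `AsyncStream` purely as a stand-in (an assumption; the sequence type used inside the library may behave differently), and the function names are made up for illustration. All elements are buffered before consumption starts, so the timing mostly reflects the cost of `next()` calls rather than producer speed.

```swift
import Foundation

/// Consumes `count` integers delivered one element per `next()` call.
func consumeOneByOne(count: Int) async -> Int {
    let stream = AsyncStream<Int> { continuation in
        for value in 0..<count { continuation.yield(value) }
        continuation.finish()
    }
    var sum = 0
    for await value in stream { sum += value }
    return sum
}

/// Consumes the same integers delivered as arrays of up to `batchSize` elements.
func consumeInBatches(count: Int, batchSize: Int) async -> Int {
    let stream = AsyncStream<[Int]> { continuation in
        var start = 0
        while start < count {
            let end = min(start + batchSize, count)
            continuation.yield(Array(start..<end))
            start = end
        }
        continuation.finish()
    }
    var sum = 0
    for await batch in stream {
        for value in batch { sum += value }
    }
    return sum
}

/// Runs `body` and returns its result together with the elapsed seconds.
func timed(_ body: () async -> Int) async -> (result: Int, seconds: Double) {
    let start = Date()
    let result = await body()
    return (result, Date().timeIntervalSince(start))
}

let single = await timed { await consumeOneByOne(count: 1_000_000) }
let batched = await timed { await consumeInBatches(count: 1_000_000, batchSize: 1_000) }
print("one by one: \(single.seconds)s, batched: \(batched.seconds)s")
```

Build in release mode when comparing the two numbers. A large gap would point at per-`next()` overhead (locking and suspension), while a small gap would suggest the time is spent elsewhere.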

@blindspotbounty
Collaborator Author

I've made two examples in the fork for single and multiple messages transferred through an async sequence. Note that I could not see the difference in Docker because the overall execution was too slow. So the tests were performed on bare metal with Redpanda (Redpanda just because it was installed).

  1. Provide bulks (https://github.com/ordo-one/swift-kafka-client/tree/bulk-variants):
bulk messages
Start consuming
read up to 127730 in partition 2, ctr: 750031, rate: 231962 (10839KB/s), avgRate: (10839KB/s), timePassed: 3sec
read up to 260882 in partition 5, ctr: 1500101, rate: 187978 (8933KB/s), avgRate: (9786KB/s), timePassed: 7sec
read up to 371852 in partition 4, ctr: 2250147, rate: 187244 (8959KB/s), avgRate: (9491KB/s), timePassed: 11sec
read up to 492048 in partition 3, ctr: 3000200, rate: 187757 (8984KB/s), avgRate: (9358KB/s), timePassed: 15sec
read up to 621339 in partition 2, ctr: 3750202, rate: 187910 (8991KB/s), avgRate: (9282KB/s), timePassed: 19sec
read up to 749399 in partition 1, ctr: 4500242, rate: 188128 (9002KB/s), avgRate: (9234KB/s), timePassed: 23sec
read up to 872846 in partition 0, ctr: 5250322, rate: 241929 (11576KB/s), avgRate: (9510KB/s), timePassed: 26sec
read up to 999513 in partition 4, ctr: 6000339, rate: 187922 (8992KB/s), avgRate: (9442KB/s), timePassed: 30sec
read up to 1113565 in partition 3, ctr: 6750400, rate: 187136 (8954KB/s), avgRate: (9385KB/s), timePassed: 34sec
read up to 1253273 in partition 5, ctr: 7500486, rate: 187697 (8981KB/s), avgRate: (9343KB/s), timePassed: 38sec
read up to 1376083 in partition 3, ctr: 8250571, rate: 189206 (9053KB/s), avgRate: (9315KB/s), timePassed: 42sec
read up to 1495120 in partition 2, ctr: 9000604, rate: 187918 (8992KB/s), avgRate: (9288KB/s), timePassed: 46sec
read up to 1636007 in partition 0, ctr: 9750698, rate: 188045 (8998KB/s), avgRate: (9265KB/s), timePassed: 50sec
read up to 1753974 in partition 0, ctr: 10500766, rate: 241701 (11723KB/s), avgRate: (9408KB/s), timePassed: 53sec
read up to 1872605 in partition 0, ctr: 11250822, rate: 186572 (9109KB/s), avgRate: (9387KB/s), timePassed: 57sec
read up to 1996171 in partition 0, ctr: 12000853, rate: 188279 (9193KB/s), avgRate: (9374KB/s), timePassed: 61sec
read up to 2128611 in partition 5, ctr: 12750944, rate: 187247 (9142KB/s), avgRate: (9360KB/s), timePassed: 65sec
read up to 2252451 in partition 5, ctr: 13500975, rate: 187131 (9137KB/s), avgRate: (9347KB/s), timePassed: 69sec
read up to 2383333 in partition 5, ctr: 14250999, rate: 187491 (9154KB/s), avgRate: (9336KB/s), timePassed: 73sec
All read up to ctr: 15000000, avgRate: (9222KB/s), timePassed: 78sec
Finish consuming
  2. Passing arrays and wrapping them as single messages (same branch):
single messages
Start consuming
read up to 127699 in partition 2, ctr: 750000, rate: 232178 (10849KB/s), avgRate: (10849KB/s), timePassed: 3sec
read up to 260781 in partition 5, ctr: 1500000, rate: 188189 (8943KB/s), avgRate: (9797KB/s), timePassed: 7sec
read up to 371705 in partition 4, ctr: 2250000, rate: 187622 (8978KB/s), avgRate: (9505KB/s), timePassed: 11sec
read up to 491848 in partition 3, ctr: 3000000, rate: 187857 (8989KB/s), avgRate: (9369KB/s), timePassed: 15sec
read up to 621137 in partition 2, ctr: 3750000, rate: 187526 (8973KB/s), avgRate: (9287KB/s), timePassed: 19sec
read up to 749157 in partition 1, ctr: 4500000, rate: 187730 (8983KB/s), avgRate: (9234KB/s), timePassed: 23sec
read up to 872524 in partition 0, ctr: 5250000, rate: 242031 (11581KB/s), avgRate: (9511KB/s), timePassed: 26sec
read up to 999174 in partition 4, ctr: 6000000, rate: 187677 (8980KB/s), avgRate: (9441KB/s), timePassed: 30sec
read up to 1130808 in partition 2, ctr: 6750000, rate: 188368 (9013KB/s), avgRate: (9391KB/s), timePassed: 34sec
read up to 1252787 in partition 5, ctr: 7500000, rate: 187679 (8980KB/s), avgRate: (9348KB/s), timePassed: 38sec
read up to 1375512 in partition 3, ctr: 8250000, rate: 188217 (9006KB/s), avgRate: (9316KB/s), timePassed: 42sec
read up to 1494516 in partition 2, ctr: 9000000, rate: 188002 (8996KB/s), avgRate: (9288KB/s), timePassed: 46sec
read up to 1635309 in partition 0, ctr: 9750000, rate: 188564 (9023KB/s), avgRate: (9267KB/s), timePassed: 50sec
read up to 1753208 in partition 0, ctr: 10500000, rate: 241318 (11704KB/s), avgRate: (9409KB/s), timePassed: 53sec
read up to 1871783 in partition 0, ctr: 11250000, rate: 186998 (9130KB/s), avgRate: (9390KB/s), timePassed: 57sec
read up to 1995318 in partition 0, ctr: 12000000, rate: 187522 (9156KB/s), avgRate: (9375KB/s), timePassed: 61sec
read up to 2127667 in partition 5, ctr: 12750000, rate: 187701 (9165KB/s), avgRate: (9362KB/s), timePassed: 65sec
read up to 2251476 in partition 5, ctr: 13500000, rate: 187220 (9141KB/s), avgRate: (9349KB/s), timePassed: 69sec
read up to 2382334 in partition 5, ctr: 14250000, rate: 187237 (9142KB/s), avgRate: (9338KB/s), timePassed: 73sec
read up to 2501196 in partition 1, ctr: 15000000, rate: 187098 (9135KB/s), avgRate: (9327KB/s), timePassed: 77sec
All read up to ctr: 15000000, avgRate: (9327KB/s), timePassed: 77sec
  3. Original implementation, i.e. single message transferred through async sequence (https://github.com/ordo-one/swift-kafka-client/tree/test-no-bulk):
single messages
Start consuming
read up to 132363 in partition 1, ctr: 750000, rate: 42409 (1981KB/s), avgRate: (1981KB/s), timePassed: 17sec
read up to 248823 in partition 0, ctr: 1500000, rate: 42277 (2009KB/s), avgRate: (1995KB/s), timePassed: 35sec
read up to 377336 in partition 4, ctr: 2250000, rate: 43744 (2093KB/s), avgRate: (2027KB/s), timePassed: 52sec
read up to 500301 in partition 3, ctr: 3000000, rate: 42628 (2039KB/s), avgRate: (2030KB/s), timePassed: 70sec
read up to 624243 in partition 2, ctr: 3750000, rate: 42295 (2023KB/s), avgRate: (2029KB/s), timePassed: 87sec
read up to 748529 in partition 1, ctr: 4500000, rate: 42859 (2050KB/s), avgRate: (2032KB/s), timePassed: 105sec
read up to 877719 in partition 0, ctr: 5250000, rate: 42891 (2052KB/s), avgRate: (2035KB/s), timePassed: 122sec
read up to 1000660 in partition 5, ctr: 6000000, rate: 42502 (2033KB/s), avgRate: (2035KB/s), timePassed: 140sec
read up to 1123272 in partition 3, ctr: 6750000, rate: 43075 (2061KB/s), avgRate: (2038KB/s), timePassed: 157sec
read up to 1245166 in partition 2, ctr: 7500000, rate: 43042 (2059KB/s), avgRate: (2040KB/s), timePassed: 175sec
read up to 1377618 in partition 1, ctr: 8250000, rate: 43203 (2067KB/s), avgRate: (2042KB/s), timePassed: 192sec
read up to 1502962 in partition 0, ctr: 9000000, rate: 43525 (2082KB/s), avgRate: (2046KB/s), timePassed: 209sec
read up to 1625774 in partition 5, ctr: 9750000, rate: 43286 (2071KB/s), avgRate: (2047KB/s), timePassed: 227sec
read up to 1749433 in partition 4, ctr: 10500000, rate: 43500 (2109KB/s), avgRate: (2052KB/s), timePassed: 244sec
read up to 1873427 in partition 4, ctr: 11250000, rate: 43004 (2099KB/s), avgRate: (2055KB/s), timePassed: 261sec
read up to 1995651 in partition 4, ctr: 12000000, rate: 42952 (2097KB/s), avgRate: (2058KB/s), timePassed: 279sec
read up to 2118730 in partition 4, ctr: 12750000, rate: 43041 (2101KB/s), avgRate: (2060KB/s), timePassed: 296sec
read up to 2246627 in partition 4, ctr: 13500000, rate: 42915 (2095KB/s), avgRate: (2062KB/s), timePassed: 314sec
read up to 2373631 in partition 5, ctr: 14250000, rate: 41108 (2007KB/s), avgRate: (2059KB/s), timePassed: 332sec
read up to 2502202 in partition 4, ctr: 15000000, rate: 41532 (2027KB/s), avgRate: (2057KB/s), timePassed: 350sec
All read up to ctr: 15000000, avgRate: (2057KB/s), timePassed: 350sec
Finish consuming

The slight difference between 1 and 2 is, I guess, related to the rate calculation, while for 3 the async sequence overhead seems to have the biggest impact.

Even though we know that the performance problem is somewhere in AsyncSequence, which might require better synchronisation primitives or other improvements in the stdlib, it would still be nice to have bulk updates.
Our use case is to write those updates directly to our cache database, for which it would be preferable to have them in bulk.
Since eventPoll accumulates an array of events, it can aggregate all consumer messages into an array. Additionally, if the end user needs a per-message iterator, the bulk sequence can easily be wrapped (as shown in the first branch).
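
As an illustration of such a wrapper, here is a minimal sketch of an adapter that turns any async sequence of batches into per-element iteration. It is not the code from the branch: `FlattenedBatches` and `collect` are hypothetical names, and an `AsyncStream<[Int]>` stands in for the bulk message sequence.

```swift
/// Hypothetical adapter: iterates the elements of each batch one by one,
/// awaiting the underlying sequence only when the current batch is exhausted.
struct FlattenedBatches<Base: AsyncSequence>: AsyncSequence where Base.Element: Sequence {
    typealias Element = Base.Element.Element

    let base: Base

    struct AsyncIterator: AsyncIteratorProtocol {
        var baseIterator: Base.AsyncIterator
        var current: Base.Element.Iterator?

        mutating func next() async throws -> Base.Element.Element? {
            while true {
                // Fast path: serve from the batch already in hand, no suspension.
                if let element = current?.next() {
                    return element
                }
                // Slow path: fetch the next batch (empty batches are skipped).
                guard let batch = try await baseIterator.next() else {
                    return nil
                }
                current = batch.makeIterator()
            }
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(baseIterator: base.makeAsyncIterator(), current: nil)
    }
}

/// Small helper used for the demo: drains a sequence into an array.
func collect<S: AsyncSequence>(_ sequence: S) async throws -> [S.Element] {
    var output: [S.Element] = []
    for try await element in sequence { output.append(element) }
    return output
}

// Demo with a stand-in for the bulk sequence.
let batches = AsyncStream<[Int]> { continuation in
    continuation.yield([1, 2, 3])
    continuation.yield([])
    continuation.yield([4])
    continuation.finish()
}
let flattened = try await collect(FlattenedBatches(base: batches))
print(flattened)
```

With this shape, the caller still writes a per-message loop, but the underlying sequence is only awaited once per batch.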

@FranzBusch, could you share what you have in mind on that, please?

@mr-swifter
Collaborator

Hi @felixschlegel, @FranzBusch! I think I need to take a step back and elaborate a bit on why @blindspotbounty suggests implementing reading in batches.

We did some measurements internally with a small app that just reads data from Kafka and does nothing else, just to understand the maximum reading throughput we can get from Kafka. At the moment the Swift client is slower (@blindspotbounty will provide exact numbers).

@blindspotbounty, maybe it's even better to add a benchmark test to the swift-kafka-client library so that @felixschlegel and @FranzBusch have a reproducible test at hand.

So we are thinking about what kind of improvements we can suggest for the Swift client to reach parity with native librdkafka reading throughput.

As one possible solution, as @FranzBusch mentioned, we can optimise AsyncStream to make it much more efficient for reading and writing. Another solution, much more extreme and maybe not desired, is for the Swift client to expose the native poll interface instead of the Swift-concurrency-friendly interface with streams. Yet another possible approach is to provide data to the consumer in batches, which seems to give per-message reading performance quite close to that of librdkafka.

So, to summarise, I would rename the issue to "Optimise reading performance to get as close as possible to librdkafka performance". I hope we all agree that would be very desirable! As a next step, it would be great to get your input on how you envision this optimisation. Based on that, we can do some work in our fork, which we can later share with you as PR(s).

@FranzBusch
Contributor

I do understand the necessity to improve performance, and it is something we haven't focused on yet. I agree that the first step would be to set up a benchmarking target similar to what we have done in swift-nio and swift-certificates. This will give us a good baseline to see where we are starting from.

On the different approaches, I think the poll-based one is really a last resort, and I am quite sure we can do better than that. I do think that batch consumption is a very pressing topic for AsyncSequences in general; we want to solve this holistically, and I would love to avoid having special code in Kafka just to cater for it.

First things first, though: let's add a benchmark and then look at the instrument traces to see where we are spending our time and allocations.

@blindspotbounty
Collaborator Author

@mr-swifter thank you for your input!

Bulks are not the goal but rather one possible way to speed up the current implementation and make it comparable with librdkafka.

Sorry that I didn't provide this before. To show the difference, I've made two tests in one executable (available at https://github.com/ordo-one/swift-kafka-client/tree/pure-librdkafka-poll). The branch is based on swift-kafka-client/main.

The test creates a unique topic (similar to the tests) and produces 15_000_000 messages. It then reads them in one of the following ways:

  1. Runs the current swift-kafka-client implementation
  2. Uses the pure librdkafka interface through swift-kafka.

To reproduce locally, run the following command:

swift run -c release

Results for (1):

All read up to ctr: 15000000, avgRate: (1979KB/s), timePassed: 364sec

Results for (2):

All read up to ctr: 15000000, avgRate: (133266KB/s), timePassed: 5sec

I've also added docker-compose-consumer-performance.yaml to the branch. Unfortunately, generating 15_000_000 messages in Docker is too slow, so I've set the number of messages to 3_000_000 (this can be tweaked with the MESSAGES_NUMBER environment variable).
The Docker command is similar to the one for the tests:

docker-compose -f docker/docker-compose-consumer-performance.yaml run test

Results for (1) in docker:

All read up to ctr: 3000000, avgRate: (2442KB/s), timePassed: 58sec

Results for (2) in docker:

All read up to ctr: 3000000, avgRate: (159903KB/s), timePassed: 0sec

Overall, reading through the native librdkafka interface is about 65x faster than through the swift-kafka-client interface (roughly 67x on bare metal and 65x in Docker, going by the average rates above).

I believe there are two places where swift-kafka-client spends a lot of time:

  1. Async sequence yield/next are slow
  2. Construction of KafkaConsumerMessage

The above branch is main with a slight modification made exclusively for this test. Therefore, it is possible to test and profile the swift-kafka-client code directly using that test executable.

@FranzBusch, @felixschlegel maybe that code will help with benchmarking, and possibly with a comparison against librdkafka as a baseline.

@blindspotbounty blindspotbounty changed the title Provide bulk messages from KafkaConsumer swift-kafka-client consumer is very slow (was: Provide bulk messages from KafkaConsumer) Oct 11, 2023
@hassila

hassila commented Oct 11, 2023

@blindspotbounty we should set up an embedded benchmark target as suggested - please coordinate with @mr-swifter as I’ll be away a couple of days.

@blindspotbounty
Collaborator Author

@hassila, @mr-swifter, I've added a draft PR with both tests. I hope that will be helpful :)

@FranzBusch
Contributor

@blindspotbounty are we happy with the performance now?

@FranzBusch FranzBusch added the area/performance Improvements to performance. label Oct 31, 2024
@blindspotbounty
Collaborator Author

@FranzBusch yeah, after moving to direct access to librdkafka it is not a problem anymore: #158

So we should consider this issue closed!
