Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Crash when sending message with headers #150

Open
blindspotbounty opened this issue Nov 29, 2023 · 1 comment
Open

Crash when sending message with headers #150

blindspotbounty opened this issue Nov 29, 2023 · 1 comment
Labels
kind/bug Feature doesn't work as expected.

Comments

@blindspotbounty
Copy link
Collaborator

For some situations, headers could be freed by librdkafka thus leading to crash:

Thread 5 Queue : com.apple.root.user-initiated-qos.cooperative (concurrent)
#0	0x0000000189d4d11c in __pthread_kill ()
#1	0x0000000189d84cc0 in pthread_kill ()
#2	0x0000000189c94a40 in abort ()
#3	0x0000000189babb08 in malloc_vreport ()
#4	0x0000000189baf3f4 in malloc_report ()
#5	0x0000000189bc3ebc in find_zone_and_free ()
#6	0x0000000106c1a644 in rd_free at <path>/Sources/Crdkafka/librdkafka/src/rd.h:151
#7	0x0000000106c1a570 in rd_list_destroy_elems at <path>/Sources/Crdkafka/librdkafka/src/rdlist.c:284
#8	0x0000000106c1a5dc in rd_list_destroy at <path>/Sources/Crdkafka/librdkafka/src/rdlist.c:298
#9	0x0000000106af0fe4 in rd_kafka_headers_destroy at <path>/Sources/Crdkafka/librdkafka/src/rdkafka_header.c:37
#10	0x0000000106b60008 in rd_kafka_produceva at <path>/Sources/Crdkafka/librdkafka/src/rdkafka_msg.c:520
#11	0x0000000106d0ee88 in RDKafkaClient._produceVariadic(topicHandle:partition:messageFlags:key:value:opaque:cHeaders:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:231
#12	0x0000000106d0df1c in closure #1 in closure #1 in closure #1 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:150
#13	0x0000000106d1667c in partial apply for closure #1 in closure #1 in closure #1 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) ()
#14	0x0000000106d0fbec in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:280
#15	0x0000000106d0feb4 in closure #1 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:301
#16	0x0000000106d1683c in partial apply for closure #1 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) ()
#17	0x0000000199692518 in String.withCString<τ_0_0>(_:) ()
#18	0x0000000106d0fb24 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:287
#19	0x0000000106d10220 in closure #1 in closure #1 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:292
#20	0x0000000106d1688c in partial apply for closure #1 in closure #1 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) ()
#21	0x00000001064fd70c in ByteBuffer.withUnsafeReadableBytes<τ_0_0>(_:) at /Users/pav/Library/Developer/Xcode/DerivedData/swift-kafka-ordo-bmzbbfqgoulzpcazwapzymnuanmm/SourcePackages/checkouts/swift-nio/Sources/NIOCore/ByteBuffer-core.swift:704
#22	0x0000000106d1001c in closure #1 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:289
#23	0x0000000106d1683c in partial apply for closure #1 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) ()
#24	0x0000000199692518 in String.withCString<τ_0_0>(_:) ()
#25	0x0000000106d0fb24 in static RDKafkaClient._withKafkaCHeadersRecursive<τ_0_0>(kafkaHeaders:cHeaders:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:287
#26	0x0000000106d0f06c in static RDKafkaClient.withKafkaCHeaders<τ_0_0>(for:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:267
#27	0x0000000106d0dc68 in closure #1 in closure #1 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:148
#28	0x0000000106d161a4 in partial apply for closure #1 in closure #1 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) ()
#29	0x0000000106d0f848 in closure #1 in closure #1 in static RDKafkaClient.withMessageKeyAndValueBuffer<τ_0_0, τ_0_1, τ_0_2>(for:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:249
#30	0x0000000106d16588 in partial apply for closure #1 in closure #1 in static RDKafkaClient.withMessageKeyAndValueBuffer<τ_0_0, τ_0_1, τ_0_2>(for:_:) ()
#31	0x0000000106ce4d10 in closure #1 in String.withUnsafeBytes<τ_0_0>(_:) at <path>/Sources/Kafka/Data/String+KafkaContiguousBytes.swift:22
#32	0x0000000106ce4d80 in partial apply for closure #1 in String.withUnsafeBytes<τ_0_0>(_:) ()
#33	0x000000019980ad80 in String.UTF8View.withContiguousStorageIfAvailable<τ_0_0>(_:) ()
#34	0x0000000106ce49b8 in String.withUnsafeBytes<τ_0_0>(_:) at <path>/Sources/Kafka/Data/String+KafkaContiguousBytes.swift:19
#35	0x0000000106ce4e20 in protocol witness for KafkaContiguousBytes.withUnsafeBytes<τ_0_0>(_:) in conformance String ()
#36	0x0000000106d0f6dc in closure #1 in static RDKafkaClient.withMessageKeyAndValueBuffer<τ_0_0, τ_0_1, τ_0_2>(for:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:248
#37	0x0000000106d1634c in partial apply for closure #1 in static RDKafkaClient.withMessageKeyAndValueBuffer<τ_0_0, τ_0_1, τ_0_2>(for:_:) ()
#38	0x0000000106ce4d10 in closure #1 in String.withUnsafeBytes<τ_0_0>(_:) at <path>/Sources/Kafka/Data/String+KafkaContiguousBytes.swift:22
#39	0x0000000106ce4d80 in partial apply for closure #1 in String.withUnsafeBytes<τ_0_0>(_:) ()
#40	0x000000019980ad80 in String.UTF8View.withContiguousStorageIfAvailable<τ_0_0>(_:) ()
#41	0x0000000106ce49b8 in String.withUnsafeBytes<τ_0_0>(_:) at <path>/Sources/Kafka/Data/String+KafkaContiguousBytes.swift:19
#42	0x0000000106ce4e20 in protocol witness for KafkaContiguousBytes.withUnsafeBytes<τ_0_0>(_:) in conformance String ()
#43	0x0000000106d0f30c in static RDKafkaClient.withMessageKeyAndValueBuffer<τ_0_0, τ_0_1, τ_0_2>(for:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:246
#44	0x0000000106d0d4dc in closure #1 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:133
#45	0x0000000106d14678 in partial apply for closure #1 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) ()
#46	0x0000000106d1a330 in RDKafkaTopicHandles.withTopicHandlePointer<τ_0_0>(topic:topicConfiguration:_:) at <path>/Sources/Kafka/RDKafka/RDKafkaTopicHandles.swift:51
#47	0x0000000106d0d120 in RDKafkaClient.produce<τ_0_0, τ_0_1>(message:newMessageID:topicConfiguration:topicHandles:) at <path>/Sources/Kafka/RDKafka/RDKafkaClient.swift:129
#48	0x0000000106d04f70 in KafkaProducer.send<τ_0_0, τ_0_1>(_:) at <path>/Sources/Kafka/KafkaProducer.swift:257
#49	0x00000001061360b8 in closure #2 in closure #1 in KafkaTests.testProduceAndConsumeWithMessageHeaders() at <path>/Tests/IntegrationTests/KafkaTests.swift:375
#50	0x000000010614b014 in partial apply for closure #2 in closure #1 in KafkaTests.testProduceAndConsumeWithMessageHeaders() ()

Crash is around:

rdkafka_msg.c:521:17
   518 	                rd_kafka_topic_destroy0(rkt);
   519 	
   520 	        if (hdrs)
-> 521 	                rd_kafka_headers_destroy(hdrs);
   522 	
   523 	        rd_assert(error != NULL);
   524 	        return error;

I used slightly modified test for headers:

diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift
index e6cf82e..4b92338 100644
--- a/Tests/IntegrationTests/KafkaTests.swift
+++ b/Tests/IntegrationTests/KafkaTests.swift
@@ -332,7 +332,7 @@ final class KafkaTests: XCTestCase {
 
     func testProduceAndConsumeWithMessageHeaders() async throws {
         let testMessages = Self.createTestMessages(
-            topic: self.uniqueTestTopic,
+            topic: "unknowntopic",
             headers: [
                 KafkaHeader(key: "some.header", value: ByteBuffer(string: "some-header-value")),
                 KafkaHeader(key: "some.null.header", value: nil),
@@ -340,7 +340,7 @@ final class KafkaTests: XCTestCase {
             count: 10
         )
 
-        let (producer, events) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)
+        let producer = try KafkaProducer(configuration: self.producerConfig, logger: .kafkaTest)
 
         var consumerConfig = KafkaConsumerConfiguration(
             consumptionStrategy: .group(id: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]),
@@ -366,11 +366,12 @@ final class KafkaTests: XCTestCase {
 
             // Producer Task
             group.addTask {
-                try await Self.sendAndAcknowledgeMessages(
-                    producer: producer,
-                    events: events,
-                    messages: testMessages
-                )
+                let sleepInterval: Duration = .seconds(120)
+
+                for message in testMessages {
+                    try await Task.sleep(for: sleepInterval)
+                    try producer.send(message)
+                }
             }
 
             // Consumer Task

Expected behaviour: exception thrown instead of crash

@blindspotbounty
Copy link
Collaborator Author

Prepared an issue with asan stacks: confluentinc/librdkafka#4627

@FranzBusch FranzBusch added the kind/bug Feature doesn't work as expected. label Oct 31, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/bug Feature doesn't work as expected.
Projects
None yet
Development

No branches or pull requests

2 participants