Skip to content

Commit 1c92dfe

Browse files
committed
KAFKA-18641: Add timeout to reconcileAndAutoCommit, offsetsReady future
1 parent 21eaaae commit 1c92dfe

File tree

1 file changed

+2
-2
lines changed

1 file changed

+2
-2
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
755755
applicationEventHandler.add(event);
756756
// Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests
757757
// retrieve the positions to commit before proceeding with fetching new records
758-
ConsumerUtils.getResult(event.reconcileAndAutoCommit());
758+
ConsumerUtils.getResult(event.reconcileAndAutoCommit(), defaultApiTimeoutMs.toMillis());
759759

760760
// We must not allow wake-ups between polling for fetches and returning the records.
761761
// If the polled fetches are not empty the consumed position has already been updated in the polling
@@ -853,7 +853,7 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commit(final C
853853

854854
// This blocks until the background thread retrieves allConsumed positions to commit if none were explicitly specified.
855855
// This operation will ensure that the offsets to commit are not affected by fetches which may start after this
856-
ConsumerUtils.getResult(commitEvent.offsetsReady());
856+
ConsumerUtils.getResult(commitEvent.offsetsReady(), defaultApiTimeoutMs.toMillis());
857857
return commitEvent.future();
858858
}
859859

0 commit comments

Comments
 (0)