Skip to content

Commit 70ca6d7

Browse files
authoredMar 24, 2025··
Add optional timeout_ms kwarg to consumer.close() / fix potential hang in test_group (#2564)
1 parent 3f3c424 commit 70ca6d7

File tree

2 files changed

+11
-8
lines changed

2 files changed

+11
-8
lines changed
 

‎kafka/consumer/group.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -470,19 +470,21 @@ def assignment(self):
470470
"""
471471
return self._subscription.assigned_partitions()
472472

473-
def close(self, autocommit=True):
473+
def close(self, autocommit=True, timeout_ms=None):
474474
"""Close the consumer, waiting indefinitely for any needed cleanup.
475475
476476
Keyword Arguments:
477477
autocommit (bool): If auto-commit is configured for this consumer,
478478
this optional flag causes the consumer to attempt to commit any
479479
pending consumed offsets prior to close. Default: True
480+
timeout_ms (num, optional): Milliseconds to wait for auto-commit.
481+
Default: None
480482
"""
481483
if self._closed:
482484
return
483485
log.debug("Closing the KafkaConsumer.")
484486
self._closed = True
485-
self._coordinator.close(autocommit=autocommit)
487+
self._coordinator.close(autocommit=autocommit, timeout_ms=timeout_ms)
486488
self._metrics.close()
487489
self._client.close()
488490
try:

‎test/test_consumer_group.py

+7-6
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def test_group(kafka_broker, topic):
4747
consumers = {}
4848
stop = {}
4949
threads = {}
50-
messages = collections.defaultdict(list)
50+
messages = collections.defaultdict(lambda: collections.defaultdict(list))
5151
group_id = 'test-group-' + random_string(6)
5252
def consumer_thread(i):
5353
assert i not in consumers
@@ -60,15 +60,15 @@ def consumer_thread(i):
6060
api_version_auto_timeout_ms=5000,
6161
heartbeat_interval_ms=500)
6262
while not stop[i].is_set():
63-
for tp, records in six.itervalues(consumers[i].poll(timeout_ms=200)):
63+
for tp, records in six.iteritems(consumers[i].poll(timeout_ms=200)):
6464
messages[i][tp].extend(records)
65-
consumers[i].close()
65+
consumers[i].close(timeout_ms=500)
6666
consumers[i] = None
6767
stop[i] = None
6868

6969
num_consumers = 4
7070
for i in range(num_consumers):
71-
t = threading.Thread(target=consumer_thread, args=(i,))
71+
t = threading.Thread(target=consumer_thread, args=(i,), daemon=True)
7272
t.start()
7373
threads[i] = t
7474

@@ -129,7 +129,8 @@ def consumer_thread(i):
129129
for c in range(num_consumers):
130130
logging.info('Stopping consumer %s', c)
131131
stop[c].set()
132-
threads[c].join()
132+
threads[c].join(timeout=5)
133+
assert not threads[c].is_alive()
133134
threads[c] = None
134135

135136

@@ -179,4 +180,4 @@ def test_heartbeat_thread(kafka_broker, topic):
179180
assert consumer._coordinator.heartbeat.last_poll == last_poll
180181
consumer.poll(timeout_ms=100)
181182
assert consumer._coordinator.heartbeat.last_poll > last_poll
182-
consumer.close()
183+
consumer.close(timeout_ms=100)

0 commit comments

Comments
 (0)
Please sign in to comment.