Skip to content

Commit 3f3c424

Browse files
authored
KIP-70: Auto-commit offsets on consumer.unsubscribe(), defer assignment changes to rejoin (#2560)
1 parent 0720a52 commit 3f3c424

File tree

6 files changed

+131
-30
lines changed

6 files changed

+131
-30
lines changed

kafka/consumer/group.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -444,8 +444,15 @@ def assign(self, partitions):
444444
no rebalance operation triggered when group membership or cluster
445445
and topic metadata change.
446446
"""
447-
self._subscription.assign_from_user(partitions)
448-
self._client.set_topics([tp.topic for tp in partitions])
447+
if not partitions:
448+
self.unsubscribe()
449+
else:
450+
# make sure the offsets of topic partitions the consumer is unsubscribing from
451+
# are committed since there will be no following rebalance
452+
self._coordinator.maybe_auto_commit_offsets_now()
453+
self._subscription.assign_from_user(partitions)
454+
self._client.set_topics([tp.topic for tp in partitions])
455+
log.debug("Subscribed to partition(s): %s", partitions)
449456

450457
def assignment(self):
451458
"""Get the TopicPartitions currently assigned to this consumer.
@@ -959,8 +966,11 @@ def subscription(self):
959966

960967
def unsubscribe(self):
961968
"""Unsubscribe from all topics and clear all assigned partitions."""
969+
# make sure the offsets of topic partitions the consumer is unsubscribing from
970+
# are committed since there will be no following rebalance
971+
self._coordinator.maybe_auto_commit_offsets_now()
962972
self._subscription.unsubscribe()
963-
self._coordinator.close()
973+
self._coordinator.maybe_leave_group()
964974
self._client.cluster.need_all_topic_metadata = False
965975
self._client.set_topics([])
966976
log.debug("Unsubscribed all topics or patterns and assigned partitions")

kafka/consumer/subscription_state.py

+6-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
from __future__ import absolute_import
22

33
import abc
4+
try:
5+
from collections import Sequence
6+
except ImportError:
7+
from collections.abc import Sequence
48
import logging
59
import re
610

@@ -114,6 +118,8 @@ def subscribe(self, topics=(), pattern=None, listener=None):
114118
self.subscription = set()
115119
self.subscribed_pattern = re.compile(pattern)
116120
else:
121+
if isinstance(topics, str) or not isinstance(topics, Sequence):
122+
raise TypeError('Topics must be a list (or non-str sequence)')
117123
self.change_subscription(topics)
118124

119125
if listener and not isinstance(listener, ConsumerRebalanceListener):
@@ -151,11 +157,6 @@ def change_subscription(self, topics):
151157
self.subscription = set(topics)
152158
self._group_subscription.update(topics)
153159

154-
# Remove any assigned partitions which are no longer subscribed to
155-
for tp in set(self.assignment.keys()):
156-
if tp.topic not in self.subscription:
157-
del self.assignment[tp]
158-
159160
def group_subscribe(self, topics):
160161
"""Add topics to the current group subscription.
161162

kafka/coordinator/consumer.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -878,8 +878,15 @@ def _maybe_auto_commit_offsets_async(self):
878878
self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000
879879
elif time.time() > self.next_auto_commit_deadline:
880880
self.next_auto_commit_deadline = time.time() + self.auto_commit_interval
881-
self.commit_offsets_async(self._subscription.all_consumed_offsets(),
882-
self._commit_offsets_async_on_complete)
881+
self._do_auto_commit_offsets_async()
882+
883+
def maybe_auto_commit_offsets_now(self):
    """Kick off an asynchronous auto-commit immediately, when permitted.

    Nothing happens if the ``enable_auto_commit`` config entry is falsy or
    if ``coordinator_unknown()`` reports true; otherwise the call is handed
    to ``_do_auto_commit_offsets_async()`` without consulting any deadline.
    """
    auto_commit_enabled = self.config['enable_auto_commit']
    if not auto_commit_enabled or self.coordinator_unknown():
        return
    self._do_auto_commit_offsets_async()
886+
887+
def _do_auto_commit_offsets_async(self):
    """Issue one asynchronous commit for the subscription's consumed offsets.

    Forwards whatever ``self._subscription.all_consumed_offsets()`` returns
    to ``commit_offsets_async``, registering
    ``_commit_offsets_async_on_complete`` as the completion callback.
    """
    consumed_offsets = self._subscription.all_consumed_offsets()
    on_complete = self._commit_offsets_async_on_complete
    self.commit_offsets_async(consumed_offsets, on_complete)
883890

884891

885892
class ConsumerCoordinatorMetrics(object):

test/test_consumer.py

+45-19
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,52 @@
1+
from __future__ import absolute_import
2+
13
import pytest
24

3-
from kafka import KafkaConsumer
4-
from kafka.errors import KafkaConfigurationError
5+
from kafka import KafkaConsumer, TopicPartition
6+
from kafka.errors import KafkaConfigurationError, IllegalStateError
7+
8+
9+
def test_session_timeout_larger_than_request_timeout_raises():
    """Constructing a consumer whose session timeout exceeds the request timeout must fail."""
    config = dict(
        bootstrap_servers='localhost:9092',
        api_version=(0, 9),
        group_id='foo',
        session_timeout_ms=50000,
        request_timeout_ms=40000,
    )
    with pytest.raises(KafkaConfigurationError):
        KafkaConsumer(**config)
12+
13+
14+
def test_fetch_max_wait_larger_than_request_timeout_raises():
    """Constructing a consumer whose fetch max wait exceeds the request timeout must fail."""
    config = dict(
        bootstrap_servers='localhost:9092',
        fetch_max_wait_ms=50000,
        request_timeout_ms=40000,
    )
    with pytest.raises(KafkaConfigurationError):
        KafkaConsumer(**config)
17+
18+
19+
def test_request_timeout_larger_than_connections_max_idle_ms_raises():
    """Constructing a consumer whose request timeout exceeds the idle-connection limit must fail."""
    config = dict(
        bootstrap_servers='localhost:9092',
        api_version=(0, 9),
        request_timeout_ms=50000,
        connections_max_idle_ms=40000,
    )
    with pytest.raises(KafkaConfigurationError):
        KafkaConsumer(**config)
22+
523

24+
def test_subscription_copy():
    """subscription() returns a fresh set each call; mutating it leaves the consumer untouched."""
    expected_topics = set(['foo'])
    consumer = KafkaConsumer('foo', api_version=(0, 10, 0))

    snapshot = consumer.subscription()
    assert snapshot is not consumer.subscription()
    assert snapshot == expected_topics

    # Editing the returned set must not leak back into the consumer.
    snapshot.add('fizz')
    assert consumer.subscription() == expected_topics
631

7-
class TestKafkaConsumer:
8-
def test_session_timeout_larger_than_request_timeout_raises(self):
9-
with pytest.raises(KafkaConfigurationError):
10-
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), group_id='foo', session_timeout_ms=50000, request_timeout_ms=40000)
1132

12-
def test_fetch_max_wait_larger_than_request_timeout_raises(self):
13-
with pytest.raises(KafkaConfigurationError):
14-
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=50000, request_timeout_ms=40000)
33+
def test_assign():
34+
# Consumer w/ subscription to topic 'foo'
35+
consumer = KafkaConsumer('foo', api_version=(0, 10, 0))
36+
assert consumer.assignment() == set()
37+
# Cannot assign manually
38+
with pytest.raises(IllegalStateError):
39+
consumer.assign([TopicPartition('foo', 0)])
1540

16-
def test_request_timeout_larger_than_connections_max_idle_ms_raises(self):
17-
with pytest.raises(KafkaConfigurationError):
18-
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), request_timeout_ms=50000, connections_max_idle_ms=40000)
41+
assert 'foo' in consumer._client._topics
1942

20-
def test_subscription_copy(self):
21-
consumer = KafkaConsumer('foo', api_version=(0, 10, 0))
22-
sub = consumer.subscription()
23-
assert sub is not consumer.subscription()
24-
assert sub == set(['foo'])
25-
sub.add('fizz')
26-
assert consumer.subscription() == set(['foo'])
43+
consumer = KafkaConsumer(api_version=(0, 10, 0))
44+
assert consumer.assignment() == set()
45+
consumer.assign([TopicPartition('foo', 0)])
46+
assert consumer.assignment() == set([TopicPartition('foo', 0)])
47+
assert 'foo' in consumer._client._topics
48+
# Cannot subscribe
49+
with pytest.raises(IllegalStateError):
50+
consumer.subscribe(topics=['foo'])
51+
consumer.assign([])
52+
assert consumer.assignment() == set()

test/test_fetcher.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def test_update_fetch_positions(fetcher, topic, mocker):
148148

149149
def test__reset_offset(fetcher, mocker):
150150
tp = TopicPartition("topic", 0)
151-
fetcher._subscriptions.subscribe(topics="topic")
151+
fetcher._subscriptions.subscribe(topics=["topic"])
152152
fetcher._subscriptions.assign_from_subscribed([tp])
153153
fetcher._subscriptions.need_offset_reset(tp)
154154
mocked = mocker.patch.object(fetcher, '_retrieve_offsets')

test/test_subscription_state.py

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from __future__ import absolute_import
2+
3+
import pytest
4+
5+
from kafka import TopicPartition
6+
from kafka.consumer.subscription_state import SubscriptionState, TopicPartitionState
7+
from kafka.vendor import six
8+
9+
10+
def test_type_error():
    """subscribe() raises TypeError for a bare string but accepts a list of topics."""
    state = SubscriptionState()

    # A plain string is rejected.
    with pytest.raises(TypeError):
        state.subscribe(topics='foo')

    # The same topic wrapped in a list goes through without raising.
    state.subscribe(topics=['foo'])
16+
17+
18+
def test_change_subscription():
    """change_subscription() swaps the subscribed topic set for the new one."""
    state = SubscriptionState()

    state.subscribe(topics=['foo'])
    assert state.subscription == set(['foo'])

    state.change_subscription(['bar'])
    assert state.subscription == set(['bar'])
24+
25+
26+
def test_group_subscribe():
    """group_subscribe() grows only the group subscription; resetting restores it."""
    own_topics = set(['foo'])
    state = SubscriptionState()

    state.subscribe(topics=['foo'])
    assert state.subscription == own_topics

    # Group-level additions leave the consumer's own subscription alone.
    state.group_subscribe(['bar'])
    assert state.subscription == own_topics
    assert state._group_subscription == set(['foo', 'bar'])

    # After the reset the group subscription equals the own subscription again.
    state.reset_group_subscription()
    assert state.subscription == own_topics
    assert state._group_subscription == own_topics
37+
38+
39+
def test_assign_from_subscribed():
    """assign_from_subscribed() only accepts partitions of subscribed topics.

    Checks that a partition of an unsubscribed topic raises ValueError, that
    valid partitions become the keys of ``assignment``, and that every value
    is a TopicPartitionState without a valid position yet.
    """
    subscription_state = SubscriptionState()
    subscription_state.subscribe(topics=['foo'])
    with pytest.raises(ValueError):
        subscription_state.assign_from_subscribed([TopicPartition('bar', 0)])

    foo_partitions = (TopicPartition('foo', 0), TopicPartition('foo', 1))
    subscription_state.assign_from_subscribed(list(foo_partitions))
    assert set(subscription_state.assignment.keys()) == set(foo_partitions)

    # The loop variable deliberately differs from the SubscriptionState name:
    # on Python 2 a list comprehension leaks its loop variable into the
    # enclosing scope, so reusing the outer name would rebind it to a
    # partition state and break the following `.assignment` lookup.
    # Generator expressions never leak on either major version.
    partition_states = list(six.itervalues(subscription_state.assignment))
    assert all(isinstance(tps, TopicPartitionState) for tps in partition_states)
    assert all(not tps.has_valid_position for tps in partition_states)
49+
50+
51+
def test_change_subscription_after_assignment():
    """Changing the subscription keeps the current assignment in place."""
    assigned = [TopicPartition('foo', 0), TopicPartition('foo', 1)]
    state = SubscriptionState()
    state.subscribe(topics=['foo'])
    state.assign_from_subscribed([TopicPartition('foo', 0), TopicPartition('foo', 1)])

    # Changing subscription retains existing assignment until next rebalance
    state.change_subscription(['bar'])
    assert set(state.assignment.keys()) == set(assigned)

0 commit comments

Comments
 (0)