Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move ensure_valid_topic_name to kafka.util; use in client and producer #2561

Merged
merged 3 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from kafka.metrics.stats.rate import TimeUnit
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
from kafka.protocol.metadata import MetadataRequest
from kafka.util import Dict, WeakMethod
from kafka.util import Dict, WeakMethod, ensure_valid_topic_name
# Although this looks unused, it actually monkey-patches socket.socketpair()
# and should be left in as long as we're using socket.socketpair() in this file
from kafka.vendor import socketpair # noqa: F401
Expand Down Expand Up @@ -909,7 +909,13 @@ def add_topic(self, topic):

Returns:
Future: resolves after metadata request/response

Raises:
TypeError: if topic is not a string
ValueError: if topic is invalid: must be chars (a-zA-Z0-9._-), and less than 250 length
"""
ensure_valid_topic_name(topic)

if topic in self._topics:
return Future().success(set(self._topics))

Expand Down
25 changes: 2 additions & 23 deletions kafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from kafka.errors import IllegalStateError
from kafka.protocol.list_offsets import OffsetResetStrategy
from kafka.structs import OffsetAndMetadata
from kafka.util import ensure_valid_topic_name

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -43,10 +44,6 @@ class SubscriptionState(object):
" (2) subscribe to topics matching a regex pattern,"
" (3) assign itself specific topic-partitions.")

# Taken from: https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29
_MAX_NAME_LENGTH = 249
_TOPIC_LEGAL_CHARS = re.compile('^[a-zA-Z0-9._-]+$')

def __init__(self, offset_reset_strategy='earliest'):
"""Initialize a SubscriptionState instance

Expand Down Expand Up @@ -123,24 +120,6 @@ def subscribe(self, topics=(), pattern=None, listener=None):
raise TypeError('listener must be a ConsumerRebalanceListener')
self.listener = listener

def _ensure_valid_topic_name(self, topic):
    """Ensure that the topic name is valid according to the kafka source.

    Arguments:
        topic (str): topic name to validate

    Raises:
        TypeError: if topic is None or is not a string
        ValueError: if topic is empty, is "." or "..", is longer than
            _MAX_NAME_LENGTH characters, or contains a character other
            than ASCII alphanumerics, ".", "_" and "-"
    """

    # See Kafka Source:
    # https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
    if topic is None:
        raise TypeError('All topics must not be None')
    if not isinstance(topic, six.string_types):
        raise TypeError('All topics must be strings')
    if len(topic) == 0:
        raise ValueError('All topics must be non-empty strings')
    if topic == '.' or topic == '..':
        raise ValueError('Topic name cannot be "." or ".."')
    if len(topic) > self._MAX_NAME_LENGTH:
        raise ValueError('Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'.format(self._MAX_NAME_LENGTH, topic))
    # A "$" anchor also matches just before a trailing newline, so a name
    # such as "topic\n" would satisfy the pattern. Require the match to
    # consume the entire string so that such names are rejected.
    legal = self._TOPIC_LEGAL_CHARS.match(topic)
    if legal is None or legal.end() != len(topic):
        raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic))

def change_subscription(self, topics):
"""Change the topic subscription.

Expand All @@ -166,7 +145,7 @@ def change_subscription(self, topics):
return

for t in topics:
self._ensure_valid_topic_name(t)
ensure_valid_topic_name(t)

log.info('Updating subscribed topics to: %s', topics)
self.subscription = set(topics)
Expand Down
5 changes: 5 additions & 0 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from kafka.record.legacy_records import LegacyRecordBatchBuilder
from kafka.serializer import Serializer
from kafka.structs import TopicPartition
from kafka.util import ensure_valid_topic_name


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -593,11 +594,15 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
Raises:
KafkaTimeoutError: if unable to fetch topic metadata, or unable
to obtain memory buffer prior to configured max_block_ms
TypeError: if topic is not a string
ValueError: if topic is invalid: must be chars (a-zA-Z0-9._-), and less than 250 length
AssertionError: if KafkaProducer is closed, or key and value are both None
"""
assert not self._closed, 'KafkaProducer already closed!'
assert value is not None or self.config['api_version'] >= (0, 8, 1), (
'Null messages require kafka >= 0.8.1')
assert not (value is None and key is None), 'Need at least one: key or value'
ensure_valid_topic_name(topic)
key_bytes = value_bytes = None
try:
assigned_partition = None
Expand Down
24 changes: 24 additions & 0 deletions kafka/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import absolute_import

import binascii
import re
import time
import weakref

Expand Down Expand Up @@ -43,6 +44,29 @@ def inner_timeout_ms(fallback=None):
return inner_timeout_ms


# Taken from: https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29
TOPIC_MAX_LENGTH = 249
# Anchored with \Z rather than $: in Python, $ also matches just before a
# trailing newline, which would let a name such as "topic\n" pass validation.
TOPIC_LEGAL_CHARS = re.compile(r'^[a-zA-Z0-9._-]+\Z')


def ensure_valid_topic_name(topic):
    """Ensure that the topic name is valid according to the kafka source.

    Arguments:
        topic (str): topic name to validate

    Raises:
        TypeError: if topic is None or is not a string
        ValueError: if topic is empty, is "." or "..", is longer than
            TOPIC_MAX_LENGTH characters, or contains a character other
            than ASCII alphanumerics, ".", "_" and "-"
    """

    # See Kafka Source:
    # https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
    if topic is None:
        raise TypeError('All topics must not be None')
    if not isinstance(topic, six.string_types):
        raise TypeError('All topics must be strings')
    if not topic:
        raise ValueError('All topics must be non-empty strings')
    if topic in ('.', '..'):
        raise ValueError('Topic name cannot be "." or ".."')
    if len(topic) > TOPIC_MAX_LENGTH:
        raise ValueError('Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'.format(TOPIC_MAX_LENGTH, topic))
    if not TOPIC_LEGAL_CHARS.match(topic):
        raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic))


class WeakMethod(object):
"""
Callable that weakly references a method and the object it is bound to. It
Expand Down
5 changes: 2 additions & 3 deletions test/test_subscription_state.py → test/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest

from kafka.consumer.subscription_state import SubscriptionState
from kafka.util import ensure_valid_topic_name

@pytest.mark.parametrize(('topic_name', 'expectation'), [
(0, pytest.raises(TypeError)),
Expand All @@ -20,6 +20,5 @@
('name+with+plus', pytest.raises(ValueError)),
])
def test_topic_name_validation(topic_name, expectation):
state = SubscriptionState()
with expectation:
state._ensure_valid_topic_name(topic_name)
ensure_valid_topic_name(topic_name)
Loading