Skip to content

Commit

Permalink
[feat][broker] Prevent auto-creation of topics using legacy cluster-b…
Browse files Browse the repository at this point in the history
…ased naming scheme (#23620)

Co-authored-by: zjxxzjwang <[email protected]>
  • Loading branch information
zjxxzjwang and zjxxzjwang authored Jan 2, 2025
1 parent 7619e2f commit b02d52c
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 0 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ allowAutoTopicCreation=true
# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned

# If 'allowAutoTopicCreation' is true and the name of the topic contains 'cluster',
# the topic cannot be automatically created.
allowAutoTopicCreationWithLegacyNamingScheme=true

# Enable subscription auto creation if new consumer connected (disable auto creation with value false)
allowAutoSubscriptionCreation=true

Expand Down
4 changes: 4 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,10 @@ allowAutoTopicCreation=true
# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
allowAutoTopicCreationType=non-partitioned

# If 'allowAutoTopicCreation' is true and the name of the topic contains 'cluster',
# the topic cannot be automatically created.
allowAutoTopicCreationWithLegacyNamingScheme=true

# Enable subscription auto creation if new consumer connected (disable auto creation with value false)
allowAutoSubscriptionCreation=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2154,6 +2154,11 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
doc = "The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)"
)
private TopicType allowAutoTopicCreationType = TopicType.NON_PARTITIONED;
@FieldContext(category = CATEGORY_SERVER, dynamic = true,
doc = "If 'allowAutoTopicCreation' is true and the name of the topic contains 'cluster',"
+ "the topic cannot be automatically created."
)
private boolean allowAutoTopicCreationWithLegacyNamingScheme = true;
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3536,6 +3536,13 @@ private CompletableFuture<Boolean> isAllowAutoTopicCreationAsync(final TopicName
return CompletableFuture.completedFuture(true);
}

//If 'allowAutoTopicCreation' is true, and the name of the topic contains 'cluster',
//the topic cannot be automatically created.
if (!pulsar.getConfiguration().isAllowAutoTopicCreationWithLegacyNamingScheme()
&& StringUtils.isNotBlank(topicName.getCluster())) {
return CompletableFuture.completedFuture(false);
}

final boolean allowed;
AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies);
if (autoTopicCreationOverride != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,4 +587,23 @@ public void testExtensibleLoadManagerImplInternalTopicAutoCreations()

}

@Test
public void testAutoPartitionedTopicNameWithClusterName() throws Exception {
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
pulsar.getConfiguration().setDefaultNumPartitions(3);

final String topicString = "persistent://prop/ns-abc/testTopic/1";
// When allowAutoTopicCreationWithLegacyNamingScheme as the default value is false,
// four-paragraph topic cannot be created.
pulsar.getConfiguration().setAllowAutoTopicCreationWithLegacyNamingScheme(false);
Assert.assertThrows(PulsarClientException.NotFoundException.class,
() -> pulsarClient.newProducer().topic(topicString).create());

pulsar.getConfiguration().setAllowAutoTopicCreationWithLegacyNamingScheme(true);
Producer producer = pulsarClient.newProducer().topic(topicString).create();
Assert.assertEquals(producer.getTopic(), topicString);
producer.close();
}

}

0 comments on commit b02d52c

Please sign in to comment.