Skip to content

Commit 8465c55

Browse files
authored
Fix reconsume broken while using non-FQDN topics (apache#386)
### Issue Retry policy not effective with non-FQDN topic. - reproduction ```go client, _ := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"}) consumer, _ := client.Subscribe(pulsar.ConsumerOptions{ Topic: "topic-01", SubscriptionName: "my-sub", RetryEnable: true, DLQ: &pulsar.DLQPolicy{MaxDeliveries: 2}, }) msg, _ := consumer.Receive(context.Background()) consumer.ReconsumeLater(msg, 5*time.Second) ``` - logs ``` RN[0000] consumer of topic [persistent://public/default/topic-01] not exist unexpectedly topic="[topic-01 persistent://public/default/my-sub-RETRY]" ``` ### Cause For MultiTopicConsumer `consumers` map filed: - key: user provided topic, maybe non-FQDN. - value: consumer instance. `ReconsumeLater` using msg's FQDN topic as key to find `consumer` in `consumers`, if mismatch with non-FQDN topic, this invoke will be ignored, lead to Retry policy not effective. ### Modifications - Normalize user provided topics as FQDN topics before initializing consumers. - Add non-FQDN topic consumption case in Retry policy tests. ### Verifying this change - [x] Make sure that the change passes the CI checks.
1 parent 793ea91 commit 8465c55

File tree

3 files changed

+19
-13
lines changed

3 files changed

+19
-13
lines changed

pulsar/consumer_impl.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -160,24 +160,29 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
160160
return nil, err
161161
}
162162

163+
// normalize as FQDN topics
164+
var tns []*internal.TopicName
163165
// single topic consumer
164166
if options.Topic != "" || len(options.Topics) == 1 {
165167
topic := options.Topic
166168
if topic == "" {
167169
topic = options.Topics[0]
168170
}
169171

170-
if err := validateTopicNames(topic); err != nil {
172+
if tns, err = validateTopicNames(topic); err != nil {
171173
return nil, err
172174
}
173-
175+
topic = tns[0].Name
174176
return topicSubscribe(client, options, topic, messageCh, dlq, rlq)
175177
}
176178

177179
if len(options.Topics) > 1 {
178-
if err := validateTopicNames(options.Topics...); err != nil {
180+
if tns, err = validateTopicNames(options.Topics...); err != nil {
179181
return nil, err
180182
}
183+
for i := range options.Topics {
184+
options.Topics[i] = tns[i].Name
185+
}
181186

182187
return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
183188
}

pulsar/consumer_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1147,7 +1147,7 @@ func TestDLQMultiTopics(t *testing.T) {
11471147
}
11481148

11491149
func TestRLQ(t *testing.T) {
1150-
topic := "persistent://public/default/" + newTopicName()
1150+
topic := newTopicName()
11511151
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
11521152
maxRedeliveries := 2
11531153
N := 100
@@ -1243,7 +1243,7 @@ func TestRLQ(t *testing.T) {
12431243
func TestRLQMultiTopics(t *testing.T) {
12441244
now := time.Now().Unix()
12451245
topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now)
1246-
topic02 := fmt.Sprintf("persistent://public/default/topic-%d-2", now)
1246+
topic02 := fmt.Sprintf("topic-%d-2", now)
12471247
topics := []string{topic01, topic02}
12481248

12491249
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
@@ -1270,7 +1270,7 @@ func TestRLQMultiTopics(t *testing.T) {
12701270

12711271
// subscribe DLQ Topic
12721272
dlqConsumer, err := client.Subscribe(ConsumerOptions{
1273-
Topic: "persistent://public/default/" + subName + "-DLQ",
1273+
Topic: subName + "-DLQ",
12741274
SubscriptionName: subName,
12751275
SubscriptionInitialPosition: SubscriptionPositionEarliest,
12761276
})

pulsar/helper.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,16 @@ func (e *unexpectedErrMsg) Error() string {
5353
return msg
5454
}
5555

56-
func validateTopicNames(topics ...string) error {
57-
var errs error
58-
for _, t := range topics {
59-
if _, err := internal.ParseTopicName(t); err != nil {
60-
errs = pkgerrors.Wrapf(err, "invalid topic name: %s", t)
56+
func validateTopicNames(topics ...string) ([]*internal.TopicName, error) {
57+
tns := make([]*internal.TopicName, len(topics))
58+
for i, t := range topics {
59+
tn, err := internal.ParseTopicName(t)
60+
if err != nil {
61+
return nil, pkgerrors.Wrapf(err, "invalid topic name: %s", t)
6162
}
63+
tns[i] = tn
6264
}
63-
64-
return errs
65+
return tns, nil
6566
}
6667

6768
func toKeyValues(metadata map[string]string) []*pb.KeyValue {

0 commit comments

Comments
 (0)