|
1 | 1 | package com.pinterest.psc.consumer.kafka;
|
2 | 2 |
|
3 | 3 | import com.google.common.annotations.VisibleForTesting;
|
| 4 | + |
4 | 5 | import com.pinterest.psc.common.BaseTopicUri;
|
5 | 6 | import com.pinterest.psc.common.MessageId;
|
6 | 7 | import com.pinterest.psc.common.PscCommon;
|
@@ -88,20 +89,7 @@ public void initializeBackend(ServiceDiscoveryConfig discoveryConfig, TopicUri t
|
88 | 89 | ConsumerConfig.CLIENT_ID_CONFIG,
|
89 | 90 | pscConfigurationInternal.getPscConsumerClientId() + "-" + UUID.randomUUID()
|
90 | 91 | );
|
91 |
| - |
92 |
| - String kafkaConsumerClassName = pscConfigurationInternal.getConfiguration().getString(PSC_CONSUMER_KAFKA_CONSUMER_CLASS); |
93 |
| - try { |
94 |
| - if (kafkaConsumerClassName != null) { |
95 |
| - Class<?> kafkaConsumerClass = Class.forName(kafkaConsumerClassName).asSubclass(Consumer.class); |
96 |
| - kafkaConsumer = (Consumer) kafkaConsumerClass.getDeclaredConstructor(Properties.class).newInstance(properties); |
97 |
| - } else { |
98 |
| - kafkaConsumer = new KafkaConsumer<>(properties); |
99 |
| - } |
100 |
| - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { |
101 |
| - logger.error("Error initializing consumer class: " + kafkaConsumerClassName, e); |
102 |
| - logger.info("Defaulting to native KafkaConsumer class", e); |
103 |
| - kafkaConsumer = new KafkaConsumer<>(properties); |
104 |
| - } |
| 92 | + initializeKafkaConsumer(); |
105 | 93 | kafkaPollTimeoutMs = pscConfigurationInternal.getPscConsumerPollTimeoutMs();
|
106 | 94 |
|
107 | 95 | // if using secure protocol (SSL), calculate cert expiry time
|
@@ -1148,12 +1136,40 @@ else if (topicUriOrPartition instanceof TopicUriPartition)
|
1148 | 1136 | return false;
|
1149 | 1137 | }
|
1150 | 1138 |
|
| 1139 | + /** |
| 1140 | + * Initializes the Kafka consumer. |
| 1141 | + * @throws ConsumerException |
| 1142 | + */ |
| 1143 | + protected void initializeKafkaConsumer() { |
| 1144 | + String |
| 1145 | + kafkaConsumerClassName = |
| 1146 | + pscConfigurationInternal.getConfiguration() |
| 1147 | + .getString(PSC_CONSUMER_KAFKA_CONSUMER_CLASS); |
| 1148 | + try { |
| 1149 | + if (kafkaConsumerClassName != null) { |
| 1150 | + Class<?> |
| 1151 | + kafkaConsumerClass = |
| 1152 | + Class.forName(kafkaConsumerClassName).asSubclass(Consumer.class); |
| 1153 | + kafkaConsumer = |
| 1154 | + (Consumer) kafkaConsumerClass.getDeclaredConstructor(Properties.class) |
| 1155 | + .newInstance(properties); |
| 1156 | + } else { |
| 1157 | + kafkaConsumer = new KafkaConsumer<>(properties); |
| 1158 | + } |
| 1159 | + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | |
| 1160 | + NoSuchMethodException | InvocationTargetException e) { |
| 1161 | + logger.error("Error initializing consumer class: " + kafkaConsumerClassName, e); |
| 1162 | + logger.info("Defaulting to native KafkaConsumer class", e); |
| 1163 | + kafkaConsumer = new KafkaConsumer<>(properties); |
| 1164 | + } |
| 1165 | + } |
| 1166 | + |
1151 | 1167 | @Override
|
1152 | 1168 | protected void resetBackendClient() throws ConsumerException {
|
1153 | 1169 | super.resetBackendClient();
|
1154 | 1170 | logger.warn("Resetting the backend Kafka consumer (potentially to retry an API if an earlier call failed).");
|
1155 | 1171 | executeBackendCallWithRetries(() -> kafkaConsumer.close());
|
1156 |
| - kafkaConsumer = new KafkaConsumer<>(properties); |
| 1172 | + initializeKafkaConsumer(); |
1157 | 1173 | if (!currentAssignment.isEmpty())
|
1158 | 1174 | assign(currentAssignment);
|
1159 | 1175 | else if (!currentSubscription.isEmpty())
|
|
0 commit comments