|
1 | 1 | package com.pinterest.psc.consumer.kafka;
|
2 | 2 |
|
3 | 3 | import com.google.common.annotations.VisibleForTesting;
|
| 4 | + |
4 | 5 | import com.pinterest.psc.common.BaseTopicUri;
|
5 | 6 | import com.pinterest.psc.common.MessageId;
|
6 | 7 | import com.pinterest.psc.common.PscCommon;
|
@@ -88,20 +89,7 @@ public void initializeBackend(ServiceDiscoveryConfig discoveryConfig, TopicUri t
|
88 | 89 | ConsumerConfig.CLIENT_ID_CONFIG,
|
89 | 90 | pscConfigurationInternal.getPscConsumerClientId() + "-" + UUID.randomUUID()
|
90 | 91 | );
|
91 |
| - |
92 |
| - String kafkaConsumerClassName = pscConfigurationInternal.getConfiguration().getString(PSC_CONSUMER_KAFKA_CONSUMER_CLASS); |
93 |
| - try { |
94 |
| - if (kafkaConsumerClassName != null) { |
95 |
| - Class<?> kafkaConsumerClass = Class.forName(kafkaConsumerClassName).asSubclass(Consumer.class); |
96 |
| - kafkaConsumer = (Consumer) kafkaConsumerClass.getDeclaredConstructor(Properties.class).newInstance(properties); |
97 |
| - } else { |
98 |
| - kafkaConsumer = new KafkaConsumer<>(properties); |
99 |
| - } |
100 |
| - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { |
101 |
| - logger.error("Error initializing consumer class: " + kafkaConsumerClassName, e); |
102 |
| - logger.info("Defaulting to native KafkaConsumer class", e); |
103 |
| - kafkaConsumer = new KafkaConsumer<>(properties); |
104 |
| - } |
| 92 | + initializeKafkaConsumer(); |
105 | 93 | kafkaPollTimeoutMs = pscConfigurationInternal.getPscConsumerPollTimeoutMs();
|
106 | 94 |
|
107 | 95 | // if using secure protocol (SSL), calculate cert expiry time
|
@@ -1123,12 +1111,40 @@ else if (topicUriOrPartition instanceof TopicUriPartition)
|
1123 | 1111 | return false;
|
1124 | 1112 | }
|
1125 | 1113 |
|
| 1114 | + /** |
| 1115 | + * Initializes the Kafka consumer. |
| 1116 | + * @throws ConsumerException |
| 1117 | + */ |
| 1118 | + protected void initializeKafkaConsumer() { |
| 1119 | + String |
| 1120 | + kafkaConsumerClassName = |
| 1121 | + pscConfigurationInternal.getConfiguration() |
| 1122 | + .getString(PSC_CONSUMER_KAFKA_CONSUMER_CLASS); |
| 1123 | + try { |
| 1124 | + if (kafkaConsumerClassName != null) { |
| 1125 | + Class<?> |
| 1126 | + kafkaConsumerClass = |
| 1127 | + Class.forName(kafkaConsumerClassName).asSubclass(Consumer.class); |
| 1128 | + kafkaConsumer = |
| 1129 | + (Consumer) kafkaConsumerClass.getDeclaredConstructor(Properties.class) |
| 1130 | + .newInstance(properties); |
| 1131 | + } else { |
| 1132 | + kafkaConsumer = new KafkaConsumer<>(properties); |
| 1133 | + } |
| 1134 | + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | |
| 1135 | + NoSuchMethodException | InvocationTargetException e) { |
| 1136 | + logger.error("Error initializing consumer class: " + kafkaConsumerClassName, e); |
| 1137 | + logger.info("Defaulting to native KafkaConsumer class", e); |
| 1138 | + kafkaConsumer = new KafkaConsumer<>(properties); |
| 1139 | + } |
| 1140 | + } |
| 1141 | + |
1126 | 1142 | @Override
|
1127 | 1143 | protected void resetBackendClient() throws ConsumerException {
|
1128 | 1144 | super.resetBackendClient();
|
1129 | 1145 | logger.warn("Resetting the backend Kafka consumer (potentially to retry an API if an earlier call failed).");
|
1130 | 1146 | executeBackendCallWithRetries(() -> kafkaConsumer.close());
|
1131 |
| - kafkaConsumer = new KafkaConsumer<>(properties); |
| 1147 | + initializeKafkaConsumer(); |
1132 | 1148 | if (!currentAssignment.isEmpty())
|
1133 | 1149 | assign(currentAssignment);
|
1134 | 1150 | else if (!currentSubscription.isEmpty())
|
|
0 commit comments