Skip to content

Commit 07793b2

Browse files
committed
add tiered storage consumer dependency
1 parent 275df6d commit 07793b2

File tree

2 files changed

+36
-15
lines changed

2 files changed

+36
-15
lines changed

psc/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@
3434
</exclusion>
3535
</exclusions>
3636
</dependency>
37+
<dependency>
38+
<groupId>com.pinterest.kafka.tieredstorage</groupId>
39+
<artifactId>ts-consumer</artifactId>
40+
<version>0.0.2</version>
41+
</dependency>
3742
<dependency>
3843
<groupId>org.reflections</groupId>
3944
<artifactId>reflections</artifactId>

psc/src/main/java/com/pinterest/psc/consumer/kafka/PscKafkaConsumer.java

+31-15
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.pinterest.psc.consumer.kafka;
22

33
import com.google.common.annotations.VisibleForTesting;
4+
45
import com.pinterest.psc.common.BaseTopicUri;
56
import com.pinterest.psc.common.MessageId;
67
import com.pinterest.psc.common.PscCommon;
@@ -88,20 +89,7 @@ public void initializeBackend(ServiceDiscoveryConfig discoveryConfig, TopicUri t
8889
ConsumerConfig.CLIENT_ID_CONFIG,
8990
pscConfigurationInternal.getPscConsumerClientId() + "-" + UUID.randomUUID()
9091
);
91-
92-
String kafkaConsumerClassName = pscConfigurationInternal.getConfiguration().getString(PSC_CONSUMER_KAFKA_CONSUMER_CLASS);
93-
try {
94-
if (kafkaConsumerClassName != null) {
95-
Class<?> kafkaConsumerClass = Class.forName(kafkaConsumerClassName).asSubclass(Consumer.class);
96-
kafkaConsumer = (Consumer) kafkaConsumerClass.getDeclaredConstructor(Properties.class).newInstance(properties);
97-
} else {
98-
kafkaConsumer = new KafkaConsumer<>(properties);
99-
}
100-
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
101-
logger.error("Error initializing consumer class: " + kafkaConsumerClassName, e);
102-
logger.info("Defaulting to native KafkaConsumer class", e);
103-
kafkaConsumer = new KafkaConsumer<>(properties);
104-
}
92+
initializeKafkaConsumer();
10593
kafkaPollTimeoutMs = pscConfigurationInternal.getPscConsumerPollTimeoutMs();
10694

10795
// if using secure protocol (SSL), calculate cert expiry time
@@ -1148,12 +1136,40 @@ else if (topicUriOrPartition instanceof TopicUriPartition)
11481136
return false;
11491137
}
11501138

1139+
/**
1140+
* Initializes the Kafka consumer.
1141+
* @throws ConsumerException
1142+
*/
1143+
protected void initializeKafkaConsumer() {
1144+
String
1145+
kafkaConsumerClassName =
1146+
pscConfigurationInternal.getConfiguration()
1147+
.getString(PSC_CONSUMER_KAFKA_CONSUMER_CLASS);
1148+
try {
1149+
if (kafkaConsumerClassName != null) {
1150+
Class<?>
1151+
kafkaConsumerClass =
1152+
Class.forName(kafkaConsumerClassName).asSubclass(Consumer.class);
1153+
kafkaConsumer =
1154+
(Consumer) kafkaConsumerClass.getDeclaredConstructor(Properties.class)
1155+
.newInstance(properties);
1156+
} else {
1157+
kafkaConsumer = new KafkaConsumer<>(properties);
1158+
}
1159+
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
1160+
NoSuchMethodException | InvocationTargetException e) {
1161+
logger.error("Error initializing consumer class: " + kafkaConsumerClassName, e);
1162+
logger.info("Defaulting to native KafkaConsumer class", e);
1163+
kafkaConsumer = new KafkaConsumer<>(properties);
1164+
}
1165+
}
1166+
11511167
@Override
11521168
protected void resetBackendClient() throws ConsumerException {
11531169
super.resetBackendClient();
11541170
logger.warn("Resetting the backend Kafka consumer (potentially to retry an API if an earlier call failed).");
11551171
executeBackendCallWithRetries(() -> kafkaConsumer.close());
1156-
kafkaConsumer = new KafkaConsumer<>(properties);
1172+
initializeKafkaConsumer();
11571173
if (!currentAssignment.isEmpty())
11581174
assign(currentAssignment);
11591175
else if (!currentSubscription.isEmpty())

0 commit comments

Comments
 (0)