WIP Initial tiered storage integration attempt #50

Draft · wants to merge 14 commits into base: 4.0
pom.xml (2 changes: 1 addition & 1 deletion)

@@ -167,7 +167,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-gpg-plugin</artifactId>
-                <version>3.2.7</version>
+                <version>3.2.4</version>
                 <executions>
                     <execution>
                         <id>sign-artifacts</id>
psc-integration-test/pom.xml (20 changes: 20 additions & 0 deletions)

@@ -15,6 +15,7 @@
     <properties>
         <kafka.version>3.4.0</kafka.version>
         <memq.version>0.2.21</memq.version>
+        <ts-consumer.version>0.0.2</ts-consumer.version>
     </properties>

     <dependencies>
@@ -52,6 +53,25 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <!-- <dependency>-->
+        <!--     <groupId>com.pinterest.kafka.tieredstorage</groupId>-->
+        <!--     <artifactId>ts-consumer</artifactId>-->
+        <!--     <version>${ts-consumer.version}</version>-->
+        <!--     <exclusions>-->
+        <!--         <exclusion>-->
+        <!--             <groupId>log4j</groupId>-->
+        <!--             <artifactId>log4j</artifactId>-->
+        <!--         </exclusion>-->
+        <!--         <exclusion>-->
+        <!--             <groupId>org.slf4j</groupId>-->
+        <!--             <artifactId>slf4j-log4j12</artifactId>-->
+        <!--         </exclusion>-->
+        <!--         <exclusion>-->
+        <!--             <groupId>org.slf4j</groupId>-->
+        <!--             <artifactId>log4j-over-slf4j</artifactId>-->
+        <!--         </exclusion>-->
+        <!--     </exclusions>-->
+        <!-- </dependency>-->
         <dependency>
             <groupId>com.pinterest.psc</groupId>
             <artifactId>psc-logging</artifactId>
psc/pom.xml (24 changes: 22 additions & 2 deletions)

@@ -16,6 +16,7 @@
     <properties>
         <kafka.version>3.4.0</kafka.version>
         <memq.version>0.2.21</memq.version>
+        <ts-consumer.version>0.0.2</ts-consumer.version>
     </properties>

     <dependencies>
@@ -34,6 +35,25 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <!-- <dependency>-->
+        <!--     <groupId>com.pinterest.kafka.tieredstorage</groupId>-->
+        <!--     <artifactId>ts-consumer</artifactId>-->
+        <!--     <version>${ts-consumer.version}</version>-->
+        <!--     <exclusions>-->
+        <!--         <exclusion>-->
+        <!--             <groupId>log4j</groupId>-->
+        <!--             <artifactId>log4j</artifactId>-->
+        <!--         </exclusion>-->
+        <!--         <exclusion>-->
+        <!--             <groupId>org.slf4j</groupId>-->
+        <!--             <artifactId>slf4j-log4j12</artifactId>-->
+        <!--         </exclusion>-->
+        <!--         <exclusion>-->
+        <!--             <groupId>org.slf4j</groupId>-->
+        <!--             <artifactId>log4j-over-slf4j</artifactId>-->
+        <!--         </exclusion>-->
+        <!--     </exclusions>-->
+        <!-- </dependency>-->
         <dependency>
             <groupId>org.reflections</groupId>
             <artifactId>reflections</artifactId>
@@ -97,12 +117,12 @@
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>sdk-core</artifactId>
-            <version>2.13.52</version>
+            <version>2.17.273</version>
         </dependency>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>regions</artifactId>
-            <version>2.13.52</version>
+            <version>2.30.31</version>
         </dependency>
         <dependency>
             <groupId>org.junit.jupiter</groupId>
PscKafkaConsumer.java (com.pinterest.psc.consumer.kafka)

@@ -1,6 +1,7 @@
 package com.pinterest.psc.consumer.kafka;

 import com.google.common.annotations.VisibleForTesting;
+
 import com.pinterest.psc.common.BaseTopicUri;
 import com.pinterest.psc.common.MessageId;
 import com.pinterest.psc.common.PscCommon;
@@ -29,6 +30,8 @@
 import com.pinterest.psc.metrics.PscMetrics;
 import com.pinterest.psc.metrics.kafka.KafkaMetricsHandler;
 import com.pinterest.psc.metrics.kafka.KafkaUtils;
+
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -40,6 +43,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;

+import java.lang.reflect.InvocationTargetException;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
@@ -56,7 +60,8 @@

 public class PscKafkaConsumer<K, V> extends PscBackendConsumer<K, V> {
     private static final PscLogger logger = PscLogger.getLogger(PscKafkaConsumer.class);
-    private KafkaConsumer<byte[], byte[]> kafkaConsumer;
+    private static final String PSC_CONSUMER_KAFKA_CONSUMER_CLASS = "psc.consumer.kafka.consumer.class";
+    private Consumer<byte[], byte[]> kafkaConsumer;
     private final Set<TopicUri> currentSubscription = new HashSet<>();
     private final Set<TopicUriPartition> currentAssignment = new HashSet<>();
     private long kafkaPollTimeoutMs;
@@ -84,8 +89,7 @@ public void initializeBackend(ServiceDiscoveryConfig discoveryConfig, TopicUri t
             ConsumerConfig.CLIENT_ID_CONFIG,
             pscConfigurationInternal.getPscConsumerClientId() + "-" + UUID.randomUUID()
         );
-
-        kafkaConsumer = new KafkaConsumer<>(properties);
+        initializeKafkaConsumer();
         kafkaPollTimeoutMs = pscConfigurationInternal.getPscConsumerPollTimeoutMs();

         // if using secure protocol (SSL), calculate cert expiry time
@@ -1132,12 +1136,41 @@ else if (topicUriOrPartition instanceof TopicUriPartition)
         return false;
     }

+    /**
+     * Initializes the backend Kafka consumer. If a custom consumer class is configured
+     * via the psc.consumer.kafka.consumer.class property, that class is instantiated
+     * reflectively; otherwise the native KafkaConsumer is used.
+     */
+    protected void initializeKafkaConsumer() {
+        String kafkaConsumerClassName =
+                pscConfigurationInternal.getConfiguration().getString(PSC_CONSUMER_KAFKA_CONSUMER_CLASS);
+        try {
+            logger.info("Initializing Kafka consumer with class: " + kafkaConsumerClassName);
+            if (kafkaConsumerClassName != null) {
+                Class<?> kafkaConsumerClass = Class.forName(kafkaConsumerClassName).asSubclass(Consumer.class);
+                kafkaConsumer = (Consumer<byte[], byte[]>) kafkaConsumerClass
+                        .getDeclaredConstructor(Properties.class)
+                        .newInstance(properties);
+            } else {
+                logger.info("No custom Kafka consumer class specified, defaulting to native KafkaConsumer class");
+                kafkaConsumer = new KafkaConsumer<>(properties);
+            }
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
+                NoSuchMethodException | InvocationTargetException e) {
+            logger.error("Error initializing consumer class: " + kafkaConsumerClassName, e);
+            logger.info("Defaulting to native KafkaConsumer class");
+            kafkaConsumer = new KafkaConsumer<>(properties);
+        }
+    }

     @Override
     protected void resetBackendClient() throws ConsumerException {
         super.resetBackendClient();
         logger.warn("Resetting the backend Kafka consumer (potentially to retry an API if an earlier call failed).");
         executeBackendCallWithRetries(() -> kafkaConsumer.close());
-        kafkaConsumer = new KafkaConsumer<>(properties);
+        initializeKafkaConsumer();
         if (!currentAssignment.isEmpty())
             assign(currentAssignment);
         else if (!currentSubscription.isEmpty())