|
29 | 29 | import com.pinterest.psc.metrics.PscMetrics;
|
30 | 30 | import com.pinterest.psc.metrics.kafka.KafkaMetricsHandler;
|
31 | 31 | import com.pinterest.psc.metrics.kafka.KafkaUtils;
|
| 32 | + |
| 33 | +import org.apache.kafka.clients.consumer.Consumer; |
32 | 34 | import org.apache.kafka.clients.consumer.ConsumerConfig;
|
33 | 35 | import org.apache.kafka.clients.consumer.ConsumerRecord;
|
34 | 36 | import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
40 | 42 | import org.apache.kafka.common.TopicPartition;
|
41 | 43 | import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
42 | 44 |
|
| 45 | +import java.lang.reflect.InvocationTargetException; |
43 | 46 | import java.time.Duration;
|
44 | 47 | import java.util.Collection;
|
45 | 48 | import java.util.Collections;
|
|
56 | 59 |
|
57 | 60 | public class PscKafkaConsumer<K, V> extends PscBackendConsumer<K, V> {
|
58 | 61 | private static final PscLogger logger = PscLogger.getLogger(PscKafkaConsumer.class);
|
59 |
| - private KafkaConsumer<byte[], byte[]> kafkaConsumer; |
| 62 | + private static final String PSC_CONSUMER_KAFKA_CONSUMER_CLASS = "psc.consumer.kafka.consumer.class"; |
| 63 | + private Consumer<byte[], byte[]> kafkaConsumer; |
60 | 64 | private final Set<TopicUri> currentSubscription = new HashSet<>();
|
61 | 65 | private final Set<TopicUriPartition> currentAssignment = new HashSet<>();
|
62 | 66 | private long kafkaPollTimeoutMs;
|
@@ -85,7 +89,21 @@ public void initializeBackend(ServiceDiscoveryConfig discoveryConfig, TopicUri t
|
85 | 89 | pscConfigurationInternal.getPscConsumerClientId() + "-" + UUID.randomUUID()
|
86 | 90 | );
|
87 | 91 |
|
88 |
| - kafkaConsumer = new KafkaConsumer<>(properties); |
| 92 | + String kafkaConsumerClassName = pscConfigurationInternal.getConfiguration().getString(PSC_CONSUMER_KAFKA_CONSUMER_CLASS); |
| 93 | + try { |
| 94 | + Class<?> kafkaConsumerClass = Class.forName(kafkaConsumerClassName).asSubclass( |
| 95 | + Consumer.class); |
| 96 | + kafkaConsumer = (Consumer) kafkaConsumerClass.getDeclaredConstructor(Properties.class).newInstance(properties); |
| 97 | + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | |
| 98 | + NoSuchMethodException | InvocationTargetException e) { |
| 99 | + if (e instanceof ClassNotFoundException) { |
| 100 | + logger.error("Consumer class not found: " + kafkaConsumerClassName, e); |
| 101 | + } else if (e instanceof InstantiationException) { |
| 102 | + logger.error("Could not instantiate consumer class: " + kafkaConsumerClassName, e); |
| 103 | + } |
| 104 | + logger.info("Defaulting to native KafkaConsumer class", e); |
| 105 | + kafkaConsumer = new KafkaConsumer<>(properties); |
| 106 | + } |
89 | 107 | kafkaPollTimeoutMs = pscConfigurationInternal.getPscConsumerPollTimeoutMs();
|
90 | 108 |
|
91 | 109 | // if using secure protocol (SSL), calculate cert expiry time
|
|
0 commit comments