
Commit 4dd3e0f

Standard KafkaConsumer setup
1 parent 4c3974c commit 4dd3e0f

5 files changed: +231 −0

integrasjon/kafka-properties/pom.xml (+5)

@@ -19,6 +19,11 @@
         <artifactId>felles-konfig</artifactId>
         <version>${project.version}</version>
     </dependency>
+    <dependency>
+        <groupId>no.nav.foreldrepenger.felles</groupId>
+        <artifactId>felles-feil</artifactId>
+        <version>${project.version}</version>
+    </dependency>
     <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaConsumerManager.java (new file, +128)

@@ -0,0 +1,128 @@
package no.nav.vedtak.felles.integrasjon.kafka;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class KafkaConsumerManager<K,V> {

    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10);

    private final List<KafkaMessageHandler<K,V>> handlers;
    private final List<KafkaConsumerLoop<K,V>> consumers = new ArrayList<>();

    public KafkaConsumerManager(List<KafkaMessageHandler<K, V>> handlers) {
        this.handlers = handlers;
    }

    public void start(BiConsumer<String, Throwable> errorlogger) {
        consumers.addAll(handlers.stream().map(KafkaConsumerLoop::new).toList());
        consumers.forEach(c -> {
            var ct = new Thread(c, "KC-" + c.handler().groupId());
            ct.setUncaughtExceptionHandler((t, e) -> { errorlogger.accept(c.handler().topic(), e); stop(); });
            ct.start();
        });
        Runtime.getRuntime().addShutdownHook(new Thread(new KafkaConsumerCloser<>(consumers)));
    }

    public void stop() {
        consumers.forEach(KafkaConsumerLoop::shutdown);
        var timeout = LocalDateTime.now().plus(CLOSE_TIMEOUT).plusSeconds(1);
        while (!allStopped() && LocalDateTime.now().isBefore(timeout)) {
            try {
                Thread.sleep(Duration.ofSeconds(1));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public boolean allRunning() {
        return consumers.stream().allMatch(KafkaConsumerLoop::isRunning);
    }

    public boolean allStopped() {
        return consumers.stream().allMatch(KafkaConsumerLoop::isStopped);
    }

    public String topicNames() {
        return handlers.stream().map(KafkaMessageHandler::topic).collect(Collectors.joining(","));
    }

    private record KafkaConsumerCloser<K,V>(List<KafkaConsumerLoop<K,V>> consumers) implements Runnable {
        @Override
        public void run() {
            consumers.forEach(KafkaConsumerLoop::shutdown);
        }
    }

    public static class KafkaConsumerLoop<K,V> implements Runnable {

        private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);
        private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10);

        private enum ConsumerState { UNINITIALIZED, RUNNING, STOPPING, STOPPED }
        // Loop state is tracked as the enum constants' hashCodes in an AtomicInteger
        private static final int RUNNING = ConsumerState.RUNNING.hashCode();

        private final KafkaMessageHandler<K, V> handler;
        private KafkaConsumer<K, V> consumer;
        private final AtomicInteger running = new AtomicInteger(ConsumerState.UNINITIALIZED.hashCode());

        public KafkaConsumerLoop(KafkaMessageHandler<K,V> handler) {
            this.handler = handler;
        }

        @Override
        public void run() {
            try (var key = handler.keyDeserializer().get(); var value = handler.valueDeserializer().get()) {
                var props = KafkaProperties.forConsumerGenericValue(handler.groupId(), key, value, handler.autoOffsetReset());
                consumer = new KafkaConsumer<>(props, key, value);
                consumer.subscribe(List.of(handler.topic()));
                running.set(RUNNING);
                while (running.get() == RUNNING) {
                    var records = consumer.poll(POLL_TIMEOUT);
                    // If one wants to complicate things, both the offset commit and the DB commit can be handled in a transactional handleRecords.
                    // handleRecords would have to take everything polled (records) plus 2 callbacks that a) register what was consumed and b) commitAsync(consumed)
                    for (var record : records) {
                        handler.handleRecord(record.key(), record.value());
                    }
                }
            } catch (WakeupException e) {
                // ignore - expected during shutdown
            } finally {
                if (consumer != null) {
                    consumer.close(CLOSE_TIMEOUT);
                }
                running.set(ConsumerState.STOPPED.hashCode());
            }
        }

        public void shutdown() {
            if (running.get() == RUNNING) {
                running.set(ConsumerState.STOPPING.hashCode());
            } else {
                running.set(ConsumerState.STOPPED.hashCode());
            }
            if (consumer != null) {
                consumer.wakeup();
            }
        }

        public KafkaMessageHandler<K, V> handler() {
            return handler;
        }

        public boolean isRunning() {
            return running.get() == RUNNING;
        }

        public boolean isStopped() {
            return running.get() == ConsumerState.STOPPED.hashCode();
        }
    }
}
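
For orientation, a minimal wiring sketch (an assumption, not part of this commit) of how an application could start the manager at boot. MyTopicHandler is a hypothetical handler; a sketch of it follows the next file.

// Sketch (assumption, not in this commit): wiring the manager at application startup.
package no.nav.vedtak.felles.integrasjon.kafka.demo; // hypothetical package

import java.util.List;

import no.nav.vedtak.felles.integrasjon.kafka.KafkaConsumerManager;
import no.nav.vedtak.felles.integrasjon.kafka.KafkaMessageHandler;

public class ConsumerStartup {
    public static void main(String[] args) {
        // MyTopicHandler is a hypothetical handler, sketched after the next file
        List<KafkaMessageHandler<String, String>> handlers = List.of(new MyTopicHandler());
        var manager = new KafkaConsumerManager<>(handlers);
        // start() spawns one "KC-<groupId>" thread per handler and registers a JVM shutdown hook;
        // the callback runs if a consumer thread dies, after which stop() halts the remaining loops
        manager.start((topic, error) -> System.err.println("Consumer for " + topic + " failed: " + error));
    }
}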
integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaMessageHandler.java (new file, +35)

@@ -0,0 +1,35 @@
package no.nav.vedtak.felles.integrasjon.kafka;

import java.util.Optional;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public interface KafkaMessageHandler<K,V> {

    void handleRecord(K key, V value);

    // Configuration
    String topic();
    String groupId(); // Keep stable (otherwise the consumer re-reads the topic according to autoOffsetReset())
    default Optional<OffsetResetStrategy> autoOffsetReset() { // Implement if other than default (LATEST). Use NONE to discover low-volume topics
        return Optional.empty();
    }

    // Deserialization - must be configured for Avro. Provided as Supplier since Deserializer is Closeable
    Supplier<Deserializer<K>> keyDeserializer();
    Supplier<Deserializer<V>> valueDeserializer();

    // Implement KafkaStringMessageHandler for Json topics. The methods above are for Avro topics
    interface KafkaStringMessageHandler extends KafkaMessageHandler<String, String> {
        @Override
        default Supplier<Deserializer<String>> keyDeserializer() {
            return StringDeserializer::new;
        }

        @Override
        default Supplier<Deserializer<String>> valueDeserializer() {
            return StringDeserializer::new;
        }
    }
}
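
To illustrate the contract, a hedged sketch of a Json topic handler built on the KafkaStringMessageHandler specialization; the package, topic name, and group id are made up for illustration, not part of the commit.

// Sketch (assumption, not in this commit): a handler for a Json topic using the String specialization.
package no.nav.vedtak.felles.integrasjon.kafka.demo; // hypothetical package

import no.nav.vedtak.felles.integrasjon.kafka.KafkaMessageHandler.KafkaStringMessageHandler;

public class MyTopicHandler implements KafkaStringMessageHandler {

    @Override
    public String topic() {
        return "teamforeldrepenger.example-topic"; // hypothetical topic name
    }

    @Override
    public String groupId() {
        return "my-application"; // keep stable across deployments
    }

    @Override
    public void handleRecord(String key, String value) {
        // Parse the Json payload and act on it. With MAX_POLL_RECORDS=100 and
        // MAX_POLL_INTERVAL_MS=100000 (see KafkaProperties below), processing
        // should stay well under ~1s per record.
    }
}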

integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaProperties.java (+28)

@@ -7,12 +7,15 @@

 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;

@@ -52,6 +55,31 @@ public static Properties forProducer() {
         return props;
     }

+    // All consumers of Json messages
+    public static Properties forConsumerStringValue(String groupId) {
+        return forConsumerGenericValue(groupId, new StringDeserializer(), new StringDeserializer(), Optional.empty());
+    }
+
+    public static <K,V> Properties forConsumerGenericValue(String groupId, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Optional<OffsetResetStrategy> offsetReset) {
+        final Properties props = new Properties();
+
+        props.put(CommonClientConfigs.GROUP_ID_CONFIG, groupId);
+        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, generateClientId());
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, getAivenConfig(AivenProperty.KAFKA_BROKERS));
+        offsetReset.ifPresent(or -> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, or.toString()));
+
+        putSecurity(props);
+
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
+
+        // Polling
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); // Avoid large transactions when all records are processed within the same Tx. Default 500
+        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100000"); // Allows up to 1s per record. Default is 600 ms/record
+
+        return props;
+    }
+
     // All consumers of Json messages
     public static Properties forStreamsStringValue(String applicationId) {
         return forStreamsGenericValue(applicationId, Serdes.String());
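
As a usage note (not part of the commit), a standalone Json consumer could use the new factory method like this; the group id and topic are hypothetical.

// Sketch (assumption, not in this commit): a standalone Json consumer using the new factory.
package no.nav.vedtak.felles.integrasjon.kafka.demo; // hypothetical package

import java.time.Duration;
import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import no.nav.vedtak.felles.integrasjon.kafka.KafkaProperties;

public class ConsumerDemo {
    public static void main(String[] args) {
        var props = KafkaProperties.forConsumerStringValue("my-application"); // hypothetical group id
        // Passing deserializer instances overrides the class names in props, as KafkaConsumerLoop does
        try (var consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            consumer.subscribe(List.of("teamforeldrepenger.example-topic")); // hypothetical topic
            var records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
        }
    }
}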
integrasjon/kafka-properties/src/main/java/no/nav/vedtak/felles/integrasjon/kafka/KafkaSender.java (new file, +35)

@@ -0,0 +1,35 @@
package no.nav.vedtak.felles.integrasjon.kafka;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import no.nav.vedtak.exception.IntegrasjonException;

public class KafkaSender {

    private final Producer<String, String> producer;
    private final String topic;

    public KafkaSender(Producer<String, String> producer, String topic) {
        this.producer = producer;
        this.topic = topic;
    }

    public RecordMetadata send(String key, String message) {
        try {
            var record = new ProducerRecord<>(topic, key, message);
            return producer.send(record).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw kafkaPubliseringException(e);
        }
    }

    private IntegrasjonException kafkaPubliseringException(Exception e) {
        return new IntegrasjonException("F-KAFKA-925475", "Unexpected error when sending message to topic " + topic, e);
    }
}
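
Finally, a minimal sketch of how the sender might be wired, assuming KafkaProperties.forProducer() (visible in the hunk header above) configures String serializers; the topic name is hypothetical.

// Sketch (assumption, not in this commit): constructing and using KafkaSender.
// Assumes forProducer() sets up String key/value serializers, as its StringSerializer import suggests.
package no.nav.vedtak.felles.integrasjon.kafka.demo; // hypothetical package

import org.apache.kafka.clients.producer.KafkaProducer;

import no.nav.vedtak.felles.integrasjon.kafka.KafkaProperties;
import no.nav.vedtak.felles.integrasjon.kafka.KafkaSender;

public class SenderDemo {
    public static void main(String[] args) {
        var producer = new KafkaProducer<String, String>(KafkaProperties.forProducer());
        var sender = new KafkaSender(producer, "teamforeldrepenger.example-topic"); // hypothetical topic
        // send() blocks on the broker ack; failures surface as IntegrasjonException F-KAFKA-925475
        var md = sender.send("some-key", "{\"hello\":\"verden\"}");
        System.out.println("Delivered to partition " + md.partition() + " at offset " + md.offset());
    }
}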
