1
1
package no .nav .vedtak .felles .integrasjon .kafka ;
2
2
3
+ import java .util .Optional ;
4
+
3
5
import org .apache .kafka .clients .producer .KafkaProducer ;
4
6
import org .apache .kafka .clients .producer .Producer ;
5
7
import org .apache .kafka .clients .producer .ProducerRecord ;
6
8
import org .apache .kafka .clients .producer .RecordMetadata ;
9
+ import org .apache .kafka .common .header .internals .RecordHeader ;
7
10
8
11
import no .nav .vedtak .exception .IntegrasjonException ;
9
12
@@ -21,16 +24,30 @@ public String getTopicName() {
21
24
return topicName ;
22
25
}
23
26
27
+ public record KafkaHeader (String key , byte [] value ) {}
28
+
24
29
public RecordMetadata send (String key , String message ) {
25
30
if (topicName == null ) {
26
31
throw kafkaPubliseringException ("null" , new IllegalArgumentException ());
27
32
}
28
- return send (key , message , this .topicName );
33
+ return send (null , key , message , topicName );
34
+ }
35
+
36
+ public RecordMetadata send (KafkaHeader header , String key , String message ) {
37
+ if (topicName == null ) {
38
+ throw kafkaPubliseringException ("null" , new IllegalArgumentException ());
39
+ }
40
+ return send (header , key , message , this .topicName );
29
41
}
30
42
31
43
public RecordMetadata send (String key , String message , String topic ) {
44
+ return send (null , key , message , topic );
45
+ }
46
+
47
+ public RecordMetadata send (KafkaHeader header , String key , String message , String topic ) {
32
48
try {
33
49
var record = new ProducerRecord <>(topic , key , message );
50
+ Optional .ofNullable (header ).ifPresent (h -> record .headers ().add (new RecordHeader (h .key (), h .value ())));
34
51
return producer .send (record ).get ();
35
52
} catch (Exception e ) {
36
53
if (e instanceof InterruptedException ) {
0 commit comments