From b6085837d8d6c88ced26d77b1d96ce5c61a3dd08 Mon Sep 17 00:00:00 2001 From: Stefano Guerrini Date: Fri, 27 Jan 2023 14:30:25 +0100 Subject: [PATCH 1/4] Remove unused health check library This library is not used at all, we need additional configuration and a specific heartbeat topic for each instance. Signed-off-by: Stefano Guerrini --- kahpp-spring-autoconfigure/build.gradle | 1 - 1 file changed, 1 deletion(-) diff --git a/kahpp-spring-autoconfigure/build.gradle b/kahpp-spring-autoconfigure/build.gradle index 0140a3b..45242f0 100644 --- a/kahpp-spring-autoconfigure/build.gradle +++ b/kahpp-spring-autoconfigure/build.gradle @@ -30,7 +30,6 @@ dependencies{ implementation "io.burt:jmespath-core:0.5.1" implementation "io.burt:jmespath-jackson:0.5.1" implementation "io.vavr:vavr:1.0.0-alpha-4" - implementation "com.deviceinsight.kafka:kafka-health-check:1.3.0" implementation "org.springframework.kafka:spring-kafka" implementation "org.springframework.boot:spring-boot-starter" implementation "org.springframework.boot:spring-boot-starter-validation" From a21efad464c5cb7024dba9b2bdf6575b0e43c845 Mon Sep 17 00:00:00 2001 From: Stefano Guerrini Date: Fri, 27 Jan 2023 14:36:52 +0100 Subject: [PATCH 2/4] Introduce Kafka Streams state health check It check the actual Kafka Streams state. The instance status it's considered UP when the state is RUNNING or REBALANCING, all other state will bring the status to DOWN. More info can be found here: https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/KafkaStreams.State.html Signed-off-by: Stefano Guerrini --- .../kahpp/actuator/KafkaStreamsState.java | 47 +++++++++++++ .../kahpp/actuator/KafkaStreamsStateTest.java | 66 +++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/actuator/KafkaStreamsState.java create mode 100644 kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/actuator/KafkaStreamsStateTest.java diff --git a/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/actuator/KafkaStreamsState.java b/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/actuator/KafkaStreamsState.java new file mode 100644 index 0000000..d3391b6 --- /dev/null +++ b/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/actuator/KafkaStreamsState.java @@ -0,0 +1,47 @@ +package dev.vox.platform.kahpp.actuator; + +import java.util.List; +import java.util.Objects; +import org.apache.kafka.streams.KafkaStreams; +import org.springframework.boot.actuate.health.AbstractHealthIndicator; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Status; +import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.stereotype.Component; + +@Component +public class KafkaStreamsState extends AbstractHealthIndicator { + + private final List streamsBuilders; + + public KafkaStreamsState(List streamsBuilders) { + this.streamsBuilders = List.copyOf(streamsBuilders); + } + + @Override + @SuppressWarnings("PMD.CloseResource") + protected void doHealthCheck(Health.Builder builder) { + List streamsList = + streamsBuilders.stream() + .map(StreamsBuilderFactoryBean::getKafkaStreams) + .filter(Objects::nonNull) + .toList(); + + if (streamsList.isEmpty()) { + builder.status(Status.UNKNOWN); + return; + } + + for (KafkaStreams streams : streamsList) { + if (streams.state().hasNotStarted()) { + builder.status(Status.UNKNOWN); + return; + } else if (!streams.state().isRunningOrRebalancing()) { + builder.status(Status.DOWN); + return; + } + } + + builder.status(Status.UP); + } +} diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/actuator/KafkaStreamsStateTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/actuator/KafkaStreamsStateTest.java new file mode 100644 index 0000000..59cbff6 --- /dev/null +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/actuator/KafkaStreamsStateTest.java @@ -0,0 +1,66 @@ +package dev.vox.platform.kahpp.actuator; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.kafka.streams.KafkaStreams; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Status; +import org.springframework.kafka.config.StreamsBuilderFactoryBean; + +@ExtendWith(MockitoExtension.class) +class KafkaStreamsStateTest { + + @Mock private StreamsBuilderFactoryBean streamsBuilderFactoryBean; + + @Mock private KafkaStreams kafkaStreams; + + Map kafkaStateMapping = + Map.of( + KafkaStreams.State.CREATED, Status.UNKNOWN, + KafkaStreams.State.RUNNING, Status.UP, + KafkaStreams.State.REBALANCING, Status.UP, + KafkaStreams.State.ERROR, Status.DOWN, + KafkaStreams.State.PENDING_ERROR, Status.DOWN, + KafkaStreams.State.PENDING_SHUTDOWN, Status.DOWN, + KafkaStreams.State.NOT_RUNNING, Status.DOWN); + + @BeforeEach + public void setUp() { + lenient().when(streamsBuilderFactoryBean.getKafkaStreams()).thenReturn(kafkaStreams); + } + + @Test + void checkIfStatusIsUnknownWhenStreamsIsNull() { + Health.Builder builder = new Health.Builder(); + new KafkaStreamsState(List.of()).doHealthCheck(builder); + assertThat(builder.build().getStatus()).isEqualTo(Status.UNKNOWN); + } + + @Test + @SuppressWarnings("PMD.JUnitTestsShouldIncludeAssert") + void checkAllKafkaStreamsStatuses() { + KafkaStreamsState kafkaStreamsState = new KafkaStreamsState(List.of(streamsBuilderFactoryBean)); + Health.Builder builder = new Health.Builder(); + + new KafkaStreamsState(List.of(streamsBuilderFactoryBean)); + Arrays.stream(KafkaStreams.State.values()) + .forEach( + state -> { + when(kafkaStreams.state()).thenReturn(state); + kafkaStreamsState.doHealthCheck(builder); + + assertThat(builder.build().getStatus()).as("Testing %s Kafka Stream State", state).isEqualTo(kafkaStateMapping.get(state)); + }); + } + +} From 1291fdc174f8fa3f06e1190f7478192bbc2f192e Mon Sep 17 00:00:00 2001 From: Stefano Guerrini Date: Fri, 27 Jan 2023 14:39:06 +0100 Subject: [PATCH 3/4] Introduce Kafka Streams Producer state health check It check, through JMX metrics if there is at least one producer active. The instance status it's considered DOWN when there aren't active producers. Signed-off-by: Stefano Guerrini --- .../actuator/KafkaStreamsProducerState.java | 37 ++++++++++++++++ .../KafkaStreamsProducerStateTest.java | 43 +++++++++++++++++++ .../kahpp/actuator/KafkaStreamsStateTest.java | 6 +-- 3 files changed, 83 insertions(+), 3 deletions(-) create mode 100644 kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/actuator/KafkaStreamsProducerState.java create mode 100644 kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/actuator/KafkaStreamsProducerStateTest.java diff --git a/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/actuator/KafkaStreamsProducerState.java b/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/actuator/KafkaStreamsProducerState.java new file mode 100644 index 0000000..ba0533f --- /dev/null +++ b/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/actuator/KafkaStreamsProducerState.java @@ -0,0 +1,37 @@ +package dev.vox.platform.kahpp.actuator; + +import io.micrometer.core.instrument.FunctionCounter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.search.Search; +import java.util.Optional; +import org.springframework.boot.actuate.health.AbstractHealthIndicator; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Status; +import org.springframework.stereotype.Component; + +@Component +public class KafkaStreamsProducerState extends AbstractHealthIndicator { + + private final MeterRegistry meterRegistry; + + public static final String KAFKA_PRODUCER_CONNECTION_METRIC_LABEL = + "kafka.producer.connection.count"; + + public KafkaStreamsProducerState(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + } + + @Override + protected void doHealthCheck(Health.Builder builder) { + Search connectionsSearch = meterRegistry.find(KAFKA_PRODUCER_CONNECTION_METRIC_LABEL); + Double kafkaConnections = + Optional.ofNullable(connectionsSearch.functionCounter()) + .map(FunctionCounter::count) + .orElse(0d); + if (kafkaConnections > 0) { + builder.status(Status.UP); + return; + } + builder.status(Status.DOWN); + } +} diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/actuator/KafkaStreamsProducerStateTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/actuator/KafkaStreamsProducerStateTest.java new file mode 100644 index 0000000..63bedbf --- /dev/null +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/actuator/KafkaStreamsProducerStateTest.java @@ -0,0 +1,43 @@ +package dev.vox.platform.kahpp.actuator; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.micrometer.core.instrument.*; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Status; + +@ExtendWith(MockitoExtension.class) +class KafkaStreamsProducerStateTest { + + private MeterRegistry meterRegistry; + + @BeforeEach + public void setUp() { + meterRegistry = new SimpleMeterRegistry(); + } + + @Test + void statusDownWhenProducerIsNotActive() { + meterRegistry + .more() + .counter(KafkaStreamsProducerState.KAFKA_PRODUCER_CONNECTION_METRIC_LABEL, Tags.empty(), 0); + Health.Builder builder = new Health.Builder(); + new KafkaStreamsProducerState(meterRegistry).doHealthCheck(builder); + assertThat(builder.build().getStatus()).isEqualTo(Status.DOWN); + } + + @Test + void statusUpWhenProducerIsActive() { + meterRegistry + .more() + .counter(KafkaStreamsProducerState.KAFKA_PRODUCER_CONNECTION_METRIC_LABEL, Tags.empty(), 1); + Health.Builder builder = new Health.Builder(); + new KafkaStreamsProducerState(meterRegistry).doHealthCheck(builder); + assertThat(builder.build().getStatus()).isEqualTo(Status.UP); + } +} diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/actuator/KafkaStreamsStateTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/actuator/KafkaStreamsStateTest.java index 59cbff6..a6dda12 100644 --- a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/actuator/KafkaStreamsStateTest.java +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/actuator/KafkaStreamsStateTest.java @@ -58,9 +58,9 @@ void checkAllKafkaStreamsStatuses() { state -> { when(kafkaStreams.state()).thenReturn(state); kafkaStreamsState.doHealthCheck(builder); - - assertThat(builder.build().getStatus()).as("Testing %s Kafka Stream State", state).isEqualTo(kafkaStateMapping.get(state)); + assertThat(builder.build().getStatus()) + .as("Testing %s Kafka Stream State", state) + .isEqualTo(kafkaStateMapping.get(state)); }); } - } From 083f922c490a6a9aef40a82ec9b013d0ad3bcfd8 Mon Sep 17 00:00:00 2001 From: Stefano Guerrini Date: Fri, 27 Jan 2023 14:41:34 +0100 Subject: [PATCH 4/4] Add Kafka Streams health checks on readiness and liveness groups In this way the readiness and liveness probes will consider also these two health checks. Signed-off-by: Stefano Guerrini --- .../src/main/resources/application.properties | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kahpp-spring-autoconfigure/src/main/resources/application.properties b/kahpp-spring-autoconfigure/src/main/resources/application.properties index b34a46f..2123416 100644 --- a/kahpp-spring-autoconfigure/src/main/resources/application.properties +++ b/kahpp-spring-autoconfigure/src/main/resources/application.properties @@ -14,4 +14,6 @@ management.endpoint.metrics.enabled=true management.endpoint.health.show-details=always management.metrics.export.jmx.domain=dev.vox management.endpoints.web.exposure.include=metrics,health,prometheus +management.endpoint.health.group.readiness.include=readinessState,kafkaStreamsProducerState,kafkaStreamsState +management.endpoint.health.group.liveness.include=livenessState,kafkaStreamsProducerState,kafkaStreamsState spring.config.import=${KAHPP_CONFIG_LOCATION:file:/kahpp/application.yaml}