Skip to content

Commit

Permalink
Merge pull request #19 from GetFeedback/feature/add-throttle-step
Browse files Browse the repository at this point in the history
Implement a throttle step
  • Loading branch information
johanderuijter authored Mar 21, 2022
2 parents 91ff47f + fb5a83b commit 3671c31
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 4 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ Here is an example of what KaHPP is capable of doing:
### Filters
- `FilterField`: permits to filter records by specific field (key,value,timestamp) using `jmespath`.

### Throttle
- `Throttle`: Limit the output of a KaHPP instance by applying a rate limit using `recordsPerSecond`.

### JMESPath custom functions
We can make our filters more powerful using `JMESPath` functions.
- `now`: permits addition or subtraction from now time, for different units.
Expand Down Expand Up @@ -71,4 +74,3 @@ Then in the steps we may decide how to use it and how to handle the response.
## Contributing to KaHPP

Be sure to setup `gradle.properties` with required information documented in `gradle.properties.dist`.

4 changes: 1 addition & 3 deletions kahpp-spring-autoconfigure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ dependencies{
implementation ("org.bouncycastle:bcpkix-jdk15on:1.70") {
because "due to ClassNotFoundException after bump mockserver-netty-5.12.0"
}
implementation("com.google.guava:guava:31.1-jre") {
because "due to ClassNotFoundException after update mockserver-netty to 5.11.2"
}
implementation "com.google.guava:guava:31.1-jre"
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"
implementation "org.testcontainers:testcontainers:1.16.3"
implementation "org.assertj:assertj-core:3.22.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package dev.vox.platform.kahpp.configuration.throttle;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuavaRateLimiter implements RateLimiter {
private static final Logger LOGGER = LoggerFactory.getLogger(GuavaRateLimiter.class);
private final transient com.google.common.util.concurrent.RateLimiter rateLimiter;

public GuavaRateLimiter(int recordsPerSecond) {
this.rateLimiter = com.google.common.util.concurrent.RateLimiter.create(recordsPerSecond);
}

@Override
public void acquire() {
LOGGER.debug("Attempting to acquire permit");

double delay = rateLimiter.acquire();
if (delay > 0) {
LOGGER.debug(
"Waited {} seconds due to defined rate limit of `{}`", delay, rateLimiter.getRate());
}

LOGGER.debug("Acquired permit");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package dev.vox.platform.kahpp.configuration.throttle;

public interface RateLimiter {
void acquire();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package dev.vox.platform.kahpp.configuration.throttle;

import dev.vox.platform.kahpp.configuration.Step;
import java.util.Map;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.Positive;

public class Throttle implements Step {
@NotBlank private final transient String name;
@Positive private final transient int recordsPerSecond;

public Throttle(String name, Map<String, ?> config) {
this.name = name;
this.recordsPerSecond = (int) config.get("recordsPerSecond");
}

@Override
public String getName() {
return name;
}

public int getRecordsPerSecond() {
return recordsPerSecond;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package dev.vox.platform.kahpp.configuration.throttle;

import com.fasterxml.jackson.databind.JsonNode;
import dev.vox.platform.kahpp.processor.StepProcessor;
import dev.vox.platform.kahpp.processor.StepProcessorSupplier;
import dev.vox.platform.kahpp.step.ChildStep;
import dev.vox.platform.kahpp.streams.KaHPPRecord;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ThrottleStepToKStream extends StepProcessorSupplier<Throttle> {
private final transient MeterRegistry meterRegistry;

@Autowired
public ThrottleStepToKStream(MeterRegistry meterRegistry) {
super(Throttle.class);
this.meterRegistry = meterRegistry;
}

@Override
public ProcessorSupplier<JsonNode, JsonNode> supplier(Throttle step, ChildStep child) {
return () -> new ThrottleStepProcessor(step, child);
}

private class ThrottleStepProcessor extends StepProcessor<Throttle> {

private final transient RateLimiter rateLimiter;

public ThrottleStepProcessor(Throttle step, ChildStep child) {
super(step, child, meterRegistry);
this.rateLimiter = new GuavaRateLimiter(step().getRecordsPerSecond());
}

@Override
public void process(KaHPPRecord record) {
rateLimiter.acquire();

forwardToNextStep(record);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package dev.vox.platform.kahpp.integration.throttle;

import static org.assertj.core.api.Assertions.assertThat;

import dev.vox.platform.kahpp.configuration.Step;
import dev.vox.platform.kahpp.configuration.throttle.Throttle;
import dev.vox.platform.kahpp.configuration.topic.ProduceToTopic;
import dev.vox.platform.kahpp.integration.AbstractKaHPPTest;
import dev.vox.platform.kahpp.integration.KafkaStreamsTest;
import dev.vox.platform.kahpp.step.StepConfiguration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.test.utils.KafkaTestUtils;

@SpringBootTest(classes = ThrottleTest.KStreamsTest.class)
class ThrottleTest extends AbstractKaHPPTest {
@Test
void recordIsNotMutated() {
sendFixture(TOPIC_SOURCE, "collection", "simple_record");

ConsumerRecord<String, String> record =
KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK);

assertThat(record.key()).isEqualTo("{\"name\":\"simple_record\"}");
assertThat(record.value()).isEqualTo("{\"foo\":\"bar\"}");
}

@Test
void recordsAreThrottled() {
// The first record includes startup time which influences the rate limiter. Ignoring it for
// this test.
sendFixture(TOPIC_SOURCE, "collection", "simple_record");
KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK);

long start = Instant.now().toEpochMilli();
sendFixture(TOPIC_SOURCE, "collection", "simple_record");
KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK);
sendFixture(TOPIC_SOURCE, "collection", "simple_record");
KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK);
long end = Instant.now().toEpochMilli();

assertThat(end - start).isGreaterThanOrEqualTo(1000);
}

@Configuration
public static class KStreamsTest extends KafkaStreamsTest {

@Override
protected List<StepConfiguration<? extends Step>> getSteps() {
final StepConfiguration<Throttle> throttleStep =
new StepConfiguration<>(Throttle.class, "throttle", Map.of("recordsPerSecond", 1));

final StepConfiguration<ProduceToTopic> produceToTopicStep =
new StepConfiguration<>(
ProduceToTopic.class, "produceRecordToSinkTopic", Map.of("topic", "sink"));

return List.of(throttleStep, produceToTopicStep);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package dev.vox.platform.kahpp.unit.configuration.throttle;

import static org.assertj.core.api.Assertions.assertThat;

import dev.vox.platform.kahpp.configuration.throttle.GuavaRateLimiter;
import java.time.Instant;
import org.junit.jupiter.api.Test;

class GuavaRateLimiterTest {
@Test
public void acquireIsRateLimited() {
GuavaRateLimiter rateLimiter = new GuavaRateLimiter(10);

long start = Instant.now().toEpochMilli();
for (int i = 0; i < 10; i++) {
rateLimiter.acquire();

long checkpoint = Instant.now().toEpochMilli();
assertThat(checkpoint - start).isLessThan(1000);
}

rateLimiter.acquire();

long end = Instant.now().toEpochMilli();
assertThat(end - start).isGreaterThanOrEqualTo(1000);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package dev.vox.platform.kahpp.unit.configuration.throttle;

import static org.assertj.core.api.Assertions.assertThat;

import dev.vox.platform.kahpp.configuration.throttle.Throttle;
import dev.vox.platform.kahpp.unit.ConstraintViolationTestAbstract;
import java.util.Map;
import java.util.Set;
import javax.validation.ConstraintViolation;
import org.junit.jupiter.api.Test;

class ThrottleTest extends ConstraintViolationTestAbstract {
@Test
public void canBeConstructed() {
Throttle step = new Throttle("test", Map.ofEntries(Map.entry("recordsPerSecond", 10)));

Set<ConstraintViolation<Throttle>> violations = validator.validate(step);
assertThat(violations).hasSize(0);
}

@Test
public void containsRateLimiterConfig() {
Throttle step = new Throttle("test", Map.ofEntries(Map.entry("recordsPerSecond", 10)));

assertThat(step.getRecordsPerSecond()).isEqualTo(10);
}
}

0 comments on commit 3671c31

Please sign in to comment.