Skip to content

Commit

Permalink
Merge pull request #190 from GetFeedback/improve-abstract-http-call
Browse files Browse the repository at this point in the history
Improve abstract http call
  • Loading branch information
Stefano Guerrini authored Aug 25, 2022
2 parents 900123e + 1262275 commit 471e9bf
Show file tree
Hide file tree
Showing 14 changed files with 152 additions and 31 deletions.
24 changes: 24 additions & 0 deletions docs/features/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,27 @@ The response can also be handled in different ways based on the status code.
| RECORD_UPDATE | Replace record field with the HTTP Response body | jmesPath | Field to replace with the HTTP Response |
| RECORD_TERMINATE | Don't forward record to the next step | | |
| RECORD_ROUTE | Route record to specific topics | topics | List of topics to route the record to |

## Configure dynamic path

It's possible to configure the path dynamically using placeholders.
The value correspondent to the placeholder need to be available inside the record.

Example:
```yaml
config:
api: my-dummy-api
path: /my/path/${value.myDynamic.field}
topic: error
```

So record value needs to look like this:
```json
{
"myDynamic": {
"field": 1234
}
}
```

If the value is not there, the call will fail, and the record will be routed to the error topic.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@
import dev.vox.platform.kahpp.configuration.conditional.Conditional;
import dev.vox.platform.kahpp.configuration.http.client.ApiClient;
import dev.vox.platform.kahpp.streams.KaHPPRecord;
import io.burt.jmespath.jackson.JacksonRuntime;
import io.vavr.control.Either;
import io.vavr.control.Try;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractHttpCall implements HttpCall, Conditional {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractHttpCall.class);

@NotBlank private final transient String name;

/** todo: Custom validator that exposes the missing api reference */
Expand Down Expand Up @@ -77,6 +84,21 @@ public Either<Throwable, RecordAction> call(KaHPPRecord record) {
.toEither();
}

@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
@Override
public Either<Throwable, RecordAction> call(KaHPPRecord record, JacksonRuntime jacksonRuntime) {
Matcher m = java.util.regex.Pattern.compile("\\$\\{(.*?)\\}").matcher(path);
while (m.find()) {
String value = jacksonRuntime.compile(m.group(1)).search(record.build()).textValue();
if (value == null) {
LOGGER.warn("No value found inside the record for this placeholder {}", m.group(0));
return Either.left(new RuntimeException("Placeholder value not found"));
}
path = path.replace(m.group(0), value);
}
return call(record);
}

@Override
public boolean shouldForwardRecordOnError() {
return Boolean.parseBoolean(forwardRecordOnError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
import dev.vox.platform.kahpp.configuration.RecordAction;
import dev.vox.platform.kahpp.configuration.Step;
import dev.vox.platform.kahpp.streams.KaHPPRecord;
import io.burt.jmespath.jackson.JacksonRuntime;
import io.vavr.control.Either;

public interface HttpCall extends Step {
String RESPONSE_HANDLER_CONFIG = "responseHandler";

Either<Throwable, RecordAction> call(KaHPPRecord record);

default Either<Throwable, RecordAction> call(KaHPPRecord record, JacksonRuntime jacksonRuntime) {
return call(record);
}

boolean shouldForwardRecordOnError();
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public HttpCallStepProcessor(
public void process(KaHPPRecord sourceRecord) {
final Timer.Sample sample = Timer.start(meterRegistry);

Either<Throwable, RecordAction> recordAction = step().call(sourceRecord);
Either<Throwable, RecordAction> recordAction = step().call(sourceRecord, jacksonRuntime);
RecordAction action = null;

if (recordAction.isLeft()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,20 @@ public static void closeServer() {
mockServer.close();
}

public static void mockHttpResponse(String body, int statusCode) {
mockHttpResponse(body, statusCode, null);
public static void mockHttpResponse(String path, String body, int statusCode) {
mockHttpResponse(path, body, statusCode, null);
}

public static void mockHttpResponse(String body, int statusCode, String responseBody) {
public static void mockHttpResponse(
String path, String body, int statusCode, String responseBody) {
int requestTimes = REQUEST_WITHOUT_RETRIES;
if (statusCode >= HTTP_STATUS_CODE_SERVER_ERROR) {
requestTimes = REQUEST_WITH_RETRIES;
}

new MockServerClient("localhost", mockServer.getLocalPort())
.when(
HttpRequest.request()
.withMethod("POST")
.withPath("/enrich")
.withBody(new JsonBody(body)),
HttpRequest.request().withMethod("POST").withPath(path).withBody(new JsonBody(body)),
Times.exactly(requestTimes))
.respond(HttpResponse.response().withStatusCode(statusCode).withBody(responseBody));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ static void stopMockServer() {
void shouldExecuteHttpCall() {
Fixture fixture = loadFixture("conditional", "true");

KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, "{}");
KaHPPMockServer.mockHttpResponse("/enrich", fixture.getValue(), 200, "{}");
sendFixture(TOPIC_SOURCE, fixture);

ConsumerRecord<String, String> recordSink =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ static void stopMockServer() {
@Test
void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessingException {
Fixture fixture = loadFixture("collection", "collection_6");
KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}");
KaHPPMockServer.mockHttpResponse(
"/enrich", fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}");
sendFixture(TOPIC_SOURCE, fixture);

ConsumerRecord<String, String> recordSink =
Expand All @@ -55,7 +56,7 @@ void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessin
@Test
void erroredHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessingException {
Fixture fixture = loadFixture("collection", "collection_6");
KaHPPMockServer.mockHttpResponse(fixture.getValue(), 500);
KaHPPMockServer.mockHttpResponse("/enrich", fixture.getValue(), 500);
sendFixture(TOPIC_SOURCE, fixture);

ConsumerRecord<String, String> recordSink =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ static void stopMockServer() {
@Test
void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessingException {
Fixture fixture = loadFixture("collection", "collection_6");
KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}");
KaHPPMockServer.mockHttpResponse(
"/enrich", fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}");
sendFixture(TOPIC_SOURCE, fixture);

ConsumerRecord<String, String> recordSink =
Expand All @@ -57,7 +58,7 @@ void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessin
@SuppressWarnings("PMD.JUnitTestsShouldIncludeAssert")
void erroredHttpCallShouldProduceRecordOnSinkAndErrorTopic() throws JsonProcessingException {
Fixture fixture = loadFixture("collection", "collection_6");
KaHPPMockServer.mockHttpResponse(fixture.getValue(), 500);
KaHPPMockServer.mockHttpResponse("/enrich", fixture.getValue(), 500);
sendFixture(TOPIC_SOURCE, fixture);

ConsumerRecord<String, String> recordOnError =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
class HttpMetricsTest extends AbstractKaHPPTest {

private static final String RESPONSE_BODY_VALUE = "{}";
public static final String HTTP_CALL_PATH = "/enrich";

@Autowired private transient MeterRegistry meterRegistry;

Expand All @@ -35,17 +36,17 @@ static void stopMockServer() {
void httpMetricsWithSuccessfulTagAreCreated() {
Fixture fixture = loadFixture("collection", "collection_6");

KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, RESPONSE_BODY_VALUE);
KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, fixture.getValue(), 200, RESPONSE_BODY_VALUE);
sendFixture(TOPIC_SOURCE, fixture);
KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK);
assertStepMetricsCount(1.0, true);

KaHPPMockServer.mockHttpResponse(fixture.getValue(), 500);
KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, fixture.getValue(), 500);
sendFixture(TOPIC_SOURCE, fixture);
KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK);
assertStepMetricsCount(1.0, false);

KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, RESPONSE_BODY_VALUE);
KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, fixture.getValue(), 200, RESPONSE_BODY_VALUE);
sendFixture(TOPIC_SOURCE, fixture);
KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK);
assertStepMetricsCount(2.0, true);
Expand All @@ -56,19 +57,19 @@ void httpMetricsWithSuccessfulTagAreCreated() {
void httpDurationMetricsWithSuccessfulTagAreCreated() {
Fixture fixture = loadFixture("collection", "collection_6");

KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, RESPONSE_BODY_VALUE);
KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, fixture.getValue(), 200, RESPONSE_BODY_VALUE);
sendFixture(TOPIC_SOURCE, fixture);
KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK);

assertTimeMetric(true, 1L);

KaHPPMockServer.mockHttpResponse(fixture.getValue(), 429, RESPONSE_BODY_VALUE);
KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, fixture.getValue(), 429, RESPONSE_BODY_VALUE);
sendFixture(TOPIC_SOURCE, fixture);
KafkaTestUtils.getSingleRecord(errorTopicConsumer, TOPIC_ERROR);

assertTimeMetric(false, 1L);

KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, RESPONSE_BODY_VALUE);
KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, fixture.getValue(), 200, RESPONSE_BODY_VALUE);
sendFixture(TOPIC_SOURCE, fixture);
KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ static void stopMockServer() {
@Test
void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessingException {
Fixture fixture = loadFixture("collection", "collection_6");
KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}");
KaHPPMockServer.mockHttpResponse(
"/enrich", fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}");
sendFixture(TOPIC_SOURCE, fixture);

ConsumerRecord<String, String> recordSink =
Expand All @@ -55,7 +56,7 @@ void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessin
@Test
void erroredHttpCallShouldNotProduceRecord() {
Fixture fixture = loadFixture("collection", "collection_6");
KaHPPMockServer.mockHttpResponse(fixture.getValue(), 500);
KaHPPMockServer.mockHttpResponse("/enrich", fixture.getValue(), 500);
sendFixture(TOPIC_SOURCE, fixture);

assertThatThrownBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ static void stopMockServer() {
@Test
void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessingException {
Fixture fixture = loadFixture("collection", "collection_6");
KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}");
KaHPPMockServer.mockHttpResponse(
"/enrich", fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}");
sendFixture(TOPIC_SOURCE, fixture);

ConsumerRecord<String, String> recordSink =
Expand All @@ -56,7 +57,7 @@ void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessin
@Test
void erroredHttpCallShouldProduceRecordOnlyOnErrorTopic() throws JsonProcessingException {
Fixture fixture = loadFixture("collection", "collection_6");
KaHPPMockServer.mockHttpResponse(fixture.getValue(), 500);
KaHPPMockServer.mockHttpResponse("/enrich", fixture.getValue(), 500);
sendFixture(TOPIC_SOURCE, fixture);

ConsumerRecord<String, String> recordOnError =
Expand All @@ -76,6 +77,7 @@ void erroredHttpCallShouldProduceRecordOnlyOnErrorTopic() throws JsonProcessingE
void htmlHttpResponseShouldGoToErrorTopic() {
Fixture fixture = loadFixture("collection", "collection_6");
KaHPPMockServer.mockHttpResponse(
"/enrich",
fixture.getValue(),
200,
"\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ static void stopMockServer() {
void shouldExecuteHttpCall() {
Fixture fixture = loadFixture("conditional", "true");

KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, "{}");
KaHPPMockServer.mockHttpResponse("/enrich", fixture.getValue(), 200, "{}");
sendFixture(TOPIC_SOURCE, fixture);

ConsumerRecord<String, String> recordSink =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void shouldRouteToTopicIfTheStatusCodeIsInRange() {
JsonNode key = objectMapper.createObjectNode().put("key", 1);
JsonNode value = objectMapper.createObjectNode().put("value", "foo");

KaHPPMockServer.mockHttpResponse(value.toString(), 410, "{\"foo\":\"bar\"}");
KaHPPMockServer.mockHttpResponse("/enrich", value.toString(), 410, "{\"foo\":\"bar\"}");

Either<Throwable, RecordAction> afterCall =
httpCall.call(KaHPPRecord.build(key, value, 1584352842123L));
Expand All @@ -92,7 +92,7 @@ void shouldNotRouteToTopicIfTheStatusCodeIsNotInRange() {
JsonNode key = objectMapper.createObjectNode().put("key", 1);
JsonNode value = objectMapper.createObjectNode().put("value", "foo");

KaHPPMockServer.mockHttpResponse(value.toString(), 200, "{\"foo\":\"bar\"}");
KaHPPMockServer.mockHttpResponse("/enrich", value.toString(), 200, "{\"foo\":\"bar\"}");

Either<Throwable, RecordAction> afterCall =
httpCall.call(KaHPPRecord.build(key, value, 1584352842123L));
Expand Down
Loading

0 comments on commit 471e9bf

Please sign in to comment.