From 50ceb4c6675eff77555072320162d035ffbf69ba Mon Sep 17 00:00:00 2001 From: Stefano Guerrini Date: Thu, 25 Aug 2022 14:30:01 +0200 Subject: [PATCH 1/3] Refactor MockServer methods with path as parameter Let's make this call more flexible, allowing different paths. Signed-off-by: Stefano Guerrini --- .../platform/kahpp/integration/KaHPPMockServer.java | 12 +++++------- .../kahpp/integration/http/HttpConditionalTest.java | 2 +- .../http/HttpForwardAndNotProduceErrorTest.java | 5 +++-- .../http/HttpForwardAndProduceErrorTest.java | 5 +++-- .../kahpp/integration/http/HttpMetricsTest.java | 13 +++++++------ .../http/HttpNotForwardAndNotProduceErrorTest.java | 5 +++-- .../http/HttpNotForwardAndProduceErrorTest.java | 6 ++++-- .../integration/http/HttpResponseHandlerTest.java | 2 +- .../http/ResponseHandlerRecordRouteTest.java | 4 ++-- .../unit/configuration/http/SimpleHttpCallTest.java | 9 +++++---- 10 files changed, 34 insertions(+), 29 deletions(-) diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/KaHPPMockServer.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/KaHPPMockServer.java index 4f0dc07..9c47196 100644 --- a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/KaHPPMockServer.java +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/KaHPPMockServer.java @@ -40,11 +40,12 @@ public static void closeServer() { mockServer.close(); } - public static void mockHttpResponse(String body, int statusCode) { - mockHttpResponse(body, statusCode, null); + public static void mockHttpResponse(String path, String body, int statusCode) { + mockHttpResponse(path, body, statusCode, null); } - public static void mockHttpResponse(String body, int statusCode, String responseBody) { + public static void mockHttpResponse( + String path, String body, int statusCode, String responseBody) { int requestTimes = REQUEST_WITHOUT_RETRIES; if (statusCode >= HTTP_STATUS_CODE_SERVER_ERROR) { requestTimes = REQUEST_WITH_RETRIES; @@ -52,10 +53,7 @@ public static void mockHttpResponse(String body, int statusCode, String response new MockServerClient("localhost", mockServer.getLocalPort()) .when( - HttpRequest.request() - .withMethod("POST") - .withPath("/enrich") - .withBody(new JsonBody(body)), + HttpRequest.request().withMethod("POST").withPath(path).withBody(new JsonBody(body)), Times.exactly(requestTimes)) .respond(HttpResponse.response().withStatusCode(statusCode).withBody(responseBody)); } diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpConditionalTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpConditionalTest.java index 10e0991..68c84e0 100644 --- a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpConditionalTest.java +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpConditionalTest.java @@ -36,7 +36,7 @@ static void stopMockServer() { void shouldExecuteHttpCall() { Fixture fixture = loadFixture("conditional", "true"); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, "{}"); + KaHPPMockServer.mockHttpResponse("/enrich", fixture.getValue(), 200, "{}"); sendFixture(TOPIC_SOURCE, fixture); ConsumerRecord recordSink = diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpForwardAndNotProduceErrorTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpForwardAndNotProduceErrorTest.java index 0a49817..5472370 100644 --- a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpForwardAndNotProduceErrorTest.java +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpForwardAndNotProduceErrorTest.java @@ -37,7 +37,8 @@ static void stopMockServer() { @Test void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessingException { Fixture fixture = loadFixture("collection", "collection_6"); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}"); + KaHPPMockServer.mockHttpResponse( + "/enrich", fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}"); sendFixture(TOPIC_SOURCE, fixture); ConsumerRecord recordSink = @@ -55,7 +56,7 @@ void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessin @Test void erroredHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessingException { Fixture fixture = loadFixture("collection", "collection_6"); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 500); + KaHPPMockServer.mockHttpResponse("/enrich", fixture.getValue(), 500); sendFixture(TOPIC_SOURCE, fixture); ConsumerRecord recordSink = diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpForwardAndProduceErrorTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpForwardAndProduceErrorTest.java index 85b8f24..64bd0c9 100644 --- a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpForwardAndProduceErrorTest.java +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpForwardAndProduceErrorTest.java @@ -38,7 +38,8 @@ static void stopMockServer() { @Test void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessingException { Fixture fixture = loadFixture("collection", "collection_6"); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}"); + KaHPPMockServer.mockHttpResponse( + "/enrich", fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}"); sendFixture(TOPIC_SOURCE, fixture); ConsumerRecord recordSink = @@ -57,7 +58,7 @@ void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessin @SuppressWarnings("PMD.JUnitTestsShouldIncludeAssert") void erroredHttpCallShouldProduceRecordOnSinkAndErrorTopic() throws JsonProcessingException { Fixture fixture = loadFixture("collection", "collection_6"); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 500); + KaHPPMockServer.mockHttpResponse("/enrich", fixture.getValue(), 500); sendFixture(TOPIC_SOURCE, fixture); ConsumerRecord recordOnError = diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpMetricsTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpMetricsTest.java index f381446..6f89e99 100644 --- a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpMetricsTest.java +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpMetricsTest.java @@ -18,6 +18,7 @@ class HttpMetricsTest extends AbstractKaHPPTest { private static final String RESPONSE_BODY_VALUE = "{}"; + public static final String HTTP_CALL_PATH = "/enrich"; @Autowired private transient MeterRegistry meterRegistry; @@ -35,17 +36,17 @@ static void stopMockServer() { void httpMetricsWithSuccessfulTagAreCreated() { Fixture fixture = loadFixture("collection", "collection_6"); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, RESPONSE_BODY_VALUE); + KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, fixture.getValue(), 200, RESPONSE_BODY_VALUE); sendFixture(TOPIC_SOURCE, fixture); KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK); assertStepMetricsCount(1.0, true); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 500); + KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, fixture.getValue(), 500); sendFixture(TOPIC_SOURCE, fixture); KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK); assertStepMetricsCount(1.0, false); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, RESPONSE_BODY_VALUE); + KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, fixture.getValue(), 200, RESPONSE_BODY_VALUE); sendFixture(TOPIC_SOURCE, fixture); KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK); assertStepMetricsCount(2.0, true); @@ -56,19 +57,19 @@ void httpMetricsWithSuccessfulTagAreCreated() { void httpDurationMetricsWithSuccessfulTagAreCreated() { Fixture fixture = loadFixture("collection", "collection_6"); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, RESPONSE_BODY_VALUE); + KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, fixture.getValue(), 200, RESPONSE_BODY_VALUE); sendFixture(TOPIC_SOURCE, fixture); KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK); assertTimeMetric(true, 1L); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 429, RESPONSE_BODY_VALUE); + KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, fixture.getValue(), 429, RESPONSE_BODY_VALUE); sendFixture(TOPIC_SOURCE, fixture); KafkaTestUtils.getSingleRecord(errorTopicConsumer, TOPIC_ERROR); assertTimeMetric(false, 1L); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, RESPONSE_BODY_VALUE); + KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, fixture.getValue(), 200, RESPONSE_BODY_VALUE); sendFixture(TOPIC_SOURCE, fixture); KafkaTestUtils.getSingleRecord(sinkTopicConsumer, TOPIC_SINK); diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpNotForwardAndNotProduceErrorTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpNotForwardAndNotProduceErrorTest.java index 8ed8546..ac3f691 100644 --- a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpNotForwardAndNotProduceErrorTest.java +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpNotForwardAndNotProduceErrorTest.java @@ -37,7 +37,8 @@ static void stopMockServer() { @Test void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessingException { Fixture fixture = loadFixture("collection", "collection_6"); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}"); + KaHPPMockServer.mockHttpResponse( + "/enrich", fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}"); sendFixture(TOPIC_SOURCE, fixture); ConsumerRecord recordSink = @@ -55,7 +56,7 @@ void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessin @Test void erroredHttpCallShouldNotProduceRecord() { Fixture fixture = loadFixture("collection", "collection_6"); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 500); + KaHPPMockServer.mockHttpResponse("/enrich", fixture.getValue(), 500); sendFixture(TOPIC_SOURCE, fixture); assertThatThrownBy( diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpNotForwardAndProduceErrorTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpNotForwardAndProduceErrorTest.java index 12548fc..cb16806 100644 --- a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpNotForwardAndProduceErrorTest.java +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpNotForwardAndProduceErrorTest.java @@ -38,7 +38,8 @@ static void stopMockServer() { @Test void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessingException { Fixture fixture = loadFixture("collection", "collection_6"); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}"); + KaHPPMockServer.mockHttpResponse( + "/enrich", fixture.getValue(), 200, "{\"new_value\":\"beautiful\"}"); sendFixture(TOPIC_SOURCE, fixture); ConsumerRecord recordSink = @@ -56,7 +57,7 @@ void successfulHttpCallShouldProduceRecordOnlyOnSinkTopic() throws JsonProcessin @Test void erroredHttpCallShouldProduceRecordOnlyOnErrorTopic() throws JsonProcessingException { Fixture fixture = loadFixture("collection", "collection_6"); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 500); + KaHPPMockServer.mockHttpResponse("/enrich", fixture.getValue(), 500); sendFixture(TOPIC_SOURCE, fixture); ConsumerRecord recordOnError = @@ -76,6 +77,7 @@ void erroredHttpCallShouldProduceRecordOnlyOnErrorTopic() throws JsonProcessingE void htmlHttpResponseShouldGoToErrorTopic() { Fixture fixture = loadFixture("collection", "collection_6"); KaHPPMockServer.mockHttpResponse( + "/enrich", fixture.getValue(), 200, "\n" diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpResponseHandlerTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpResponseHandlerTest.java index b9c1580..3e9a186 100644 --- a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpResponseHandlerTest.java +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/integration/http/HttpResponseHandlerTest.java @@ -38,7 +38,7 @@ static void stopMockServer() { void shouldExecuteHttpCall() { Fixture fixture = loadFixture("conditional", "true"); - KaHPPMockServer.mockHttpResponse(fixture.getValue(), 200, "{}"); + KaHPPMockServer.mockHttpResponse("/enrich", fixture.getValue(), 200, "{}"); sendFixture(TOPIC_SOURCE, fixture); ConsumerRecord recordSink = diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/ResponseHandlerRecordRouteTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/ResponseHandlerRecordRouteTest.java index d9a554e..c4f6623 100644 --- a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/ResponseHandlerRecordRouteTest.java +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/ResponseHandlerRecordRouteTest.java @@ -73,7 +73,7 @@ void shouldRouteToTopicIfTheStatusCodeIsInRange() { JsonNode key = objectMapper.createObjectNode().put("key", 1); JsonNode value = objectMapper.createObjectNode().put("value", "foo"); - KaHPPMockServer.mockHttpResponse(value.toString(), 410, "{\"foo\":\"bar\"}"); + KaHPPMockServer.mockHttpResponse("/enrich", value.toString(), 410, "{\"foo\":\"bar\"}"); Either afterCall = httpCall.call(KaHPPRecord.build(key, value, 1584352842123L)); @@ -92,7 +92,7 @@ void shouldNotRouteToTopicIfTheStatusCodeIsNotInRange() { JsonNode key = objectMapper.createObjectNode().put("key", 1); JsonNode value = objectMapper.createObjectNode().put("value", "foo"); - KaHPPMockServer.mockHttpResponse(value.toString(), 200, "{\"foo\":\"bar\"}"); + KaHPPMockServer.mockHttpResponse("/enrich", value.toString(), 200, "{\"foo\":\"bar\"}"); Either afterCall = httpCall.call(KaHPPRecord.build(key, value, 1584352842123L)); diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/SimpleHttpCallTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/SimpleHttpCallTest.java index 22d1cba..4b17e38 100644 --- a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/SimpleHttpCallTest.java +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/SimpleHttpCallTest.java @@ -25,6 +25,7 @@ class SimpleHttpCallTest { + public static final String HTTP_CALL_PATH = "/enrich"; private static final String FIXTURE_KEY = "key"; private static final String FIXTURE_VALUE = "value"; @@ -75,7 +76,7 @@ void happyPathCallReturning200AsHttpResponse() throws IOException { JsonNode key = getJsonNodeFromFile(FIXTURE_KEY); JsonNode value = getJsonNodeFromFile(FIXTURE_VALUE); - KaHPPMockServer.mockHttpResponse(value.toString(), 200, "{}"); + KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, value.toString(), 200, "{}"); Either afterCall = httpCall.call(KaHPPRecord.build(key, value, 1584352842123L)); @@ -90,7 +91,7 @@ void test200WithInvalidJsonResponse() throws IOException { JsonNode key = getJsonNodeFromFile(FIXTURE_KEY); JsonNode value = getJsonNodeFromFile(FIXTURE_VALUE); - KaHPPMockServer.mockHttpResponse(value.toString(), 200, "{'bad': 'json'}"); + KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, value.toString(), 200, "{'bad': 'json'}"); Either transformRecord = httpCall.call(KaHPPRecord.build(key, value, 1584352842123L)); @@ -104,7 +105,7 @@ void test500StatusCodeResponses() throws IOException { JsonNode key = getJsonNodeFromFile(FIXTURE_KEY); JsonNode value = getJsonNodeFromFile(FIXTURE_VALUE); - KaHPPMockServer.mockHttpResponse(value.toString(), 500); + KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, value.toString(), 500); Either transformRecord = httpCall.call(KaHPPRecord.build(key, value, 1584352842123L)); @@ -117,7 +118,7 @@ void test400StatusCodeResponses() throws IOException { JsonNode key = getJsonNodeFromFile(FIXTURE_KEY); JsonNode value = getJsonNodeFromFile(FIXTURE_VALUE); - KaHPPMockServer.mockHttpResponse(value.toString(), 400); + KaHPPMockServer.mockHttpResponse(HTTP_CALL_PATH, value.toString(), 400); Either transformRecord = httpCall.call(KaHPPRecord.build(key, value, 1584352842123L)); From c9ba644be36ddc9a60c3d8c4ed1c4a7029c9f336 Mon Sep 17 00:00:00 2001 From: Stefano Guerrini Date: Thu, 25 Aug 2022 14:38:08 +0200 Subject: [PATCH 2/3] Refactor test moving ApiClient on the class scope To have the possibiliy to use different http calls using the same client. Signed-off-by: Stefano Guerrini --- .../kahpp/unit/configuration/http/SimpleHttpCallTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/SimpleHttpCallTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/SimpleHttpCallTest.java index 4b17e38..69193cf 100644 --- a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/SimpleHttpCallTest.java +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/SimpleHttpCallTest.java @@ -11,6 +11,7 @@ import dev.vox.platform.kahpp.configuration.http.HttpClient; import dev.vox.platform.kahpp.configuration.http.ResponseHandlerRecordUpdate; import dev.vox.platform.kahpp.configuration.http.SimpleHttpCall; +import dev.vox.platform.kahpp.configuration.http.client.ApiClient; import dev.vox.platform.kahpp.integration.KaHPPMockServer; import dev.vox.platform.kahpp.streams.KaHPPRecord; import io.vavr.control.Either; @@ -30,6 +31,7 @@ class SimpleHttpCallTest { private static final String FIXTURE_VALUE = "value"; private transient HttpCall httpCall; + private ApiClient apiClient; @BeforeAll static void setupMockServer() { @@ -52,6 +54,7 @@ void init() { new HttpClient( String.format("http://localhost:%s/", KaHPPMockServer.getLocalPort()), options); + apiClient = httpClient.buildApiClient(); httpCall = new SimpleHttpCall( "http_test", @@ -61,7 +64,7 @@ void init() { "path", "enrich", "apiClient", - httpClient.buildApiClient(), + apiClient, "responseHandler", ResponseHandlerRecordUpdate.RECORD_VALUE_REPLACE)); } From 1262275bed6a284ab72ca577e0e7c499ea5e111b Mon Sep 17 00:00:00 2001 From: Stefano Guerrini Date: Thu, 25 Aug 2022 16:29:21 +0200 Subject: [PATCH 3/3] Introduce dynamic path for HTTP Call Let's say that we want to call an endpoint using a different value on the path for each record, with this implementation we can easily do that. Using a placeholder we can take data from the record a mutate the path dynamically. Example We have a list of producs: ``` [ { "productId": 1 }, { "productId": 2 }, { "productId": 3 }, { "productId": 4 } ] ``` We can receive the info about each product by calling '/products/' To do that we use this config: ``` config: api: pruduct-api path: /products/${value.productId} topic: error ``` If the value is not there, the call will fail, and the record will be routed to the error topic. Signed-off-by: Stefano Guerrini --- docs/features/http.md | 24 +++++++ .../configuration/http/AbstractHttpCall.java | 22 +++++++ .../kahpp/configuration/http/HttpCall.java | 5 ++ .../http/HttpCallStepProcessor.java | 2 +- .../http/SimpleHttpCallTest.java | 62 +++++++++++++++++++ 5 files changed, 114 insertions(+), 1 deletion(-) diff --git a/docs/features/http.md b/docs/features/http.md index d3df68f..d44bc06 100644 --- a/docs/features/http.md +++ b/docs/features/http.md @@ -100,3 +100,27 @@ The response can also be handled in different ways based on the status code. | RECORD_UPDATE | Replace record field with the HTTP Response body | jmesPath | Field to replace with the HTTP Response | | RECORD_TERMINATE | Don't forward record to the next step | | | | RECORD_ROUTE | Route record to specific topics | topics | List of topics to route the record to | + +## Configure dynamic path + +It's possible to configure the path dynamically using placeholders. +The value correspondent to the placeholder need to be available inside the record. + +Example: +```yaml + config: + api: my-dummy-api + path: /my/path/${value.myDynamic.field} + topic: error +``` + +So record value needs to look like this: +```json +{ + "myDynamic": { + "field": 1234 + } +} +``` + +If the value is not there, the call will fail, and the record will be routed to the error topic. diff --git a/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/configuration/http/AbstractHttpCall.java b/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/configuration/http/AbstractHttpCall.java index 40661c1..aa40d8b 100644 --- a/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/configuration/http/AbstractHttpCall.java +++ b/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/configuration/http/AbstractHttpCall.java @@ -5,15 +5,22 @@ import dev.vox.platform.kahpp.configuration.conditional.Conditional; import dev.vox.platform.kahpp.configuration.http.client.ApiClient; import dev.vox.platform.kahpp.streams.KaHPPRecord; +import io.burt.jmespath.jackson.JacksonRuntime; import io.vavr.control.Either; import io.vavr.control.Try; import java.util.Locale; import java.util.Map; +import java.util.regex.Matcher; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; import javax.validation.constraints.Pattern; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AbstractHttpCall implements HttpCall, Conditional { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractHttpCall.class); + @NotBlank private final transient String name; /** todo: Custom validator that exposes the missing api reference */ @@ -77,6 +84,21 @@ public Either call(KaHPPRecord record) { .toEither(); } + @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") + @Override + public Either call(KaHPPRecord record, JacksonRuntime jacksonRuntime) { + Matcher m = java.util.regex.Pattern.compile("\\$\\{(.*?)\\}").matcher(path); + while (m.find()) { + String value = jacksonRuntime.compile(m.group(1)).search(record.build()).textValue(); + if (value == null) { + LOGGER.warn("No value found inside the record for this placeholder {}", m.group(0)); + return Either.left(new RuntimeException("Placeholder value not found")); + } + path = path.replace(m.group(0), value); + } + return call(record); + } + @Override public boolean shouldForwardRecordOnError() { return Boolean.parseBoolean(forwardRecordOnError); diff --git a/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/configuration/http/HttpCall.java b/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/configuration/http/HttpCall.java index eaf9bcd..ef958e7 100644 --- a/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/configuration/http/HttpCall.java +++ b/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/configuration/http/HttpCall.java @@ -3,6 +3,7 @@ import dev.vox.platform.kahpp.configuration.RecordAction; import dev.vox.platform.kahpp.configuration.Step; import dev.vox.platform.kahpp.streams.KaHPPRecord; +import io.burt.jmespath.jackson.JacksonRuntime; import io.vavr.control.Either; public interface HttpCall extends Step { @@ -10,5 +11,9 @@ public interface HttpCall extends Step { Either call(KaHPPRecord record); + default Either call(KaHPPRecord record, JacksonRuntime jacksonRuntime) { + return call(record); + } + boolean shouldForwardRecordOnError(); } diff --git a/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/configuration/http/HttpCallStepProcessor.java b/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/configuration/http/HttpCallStepProcessor.java index b440e56..3f6f1f5 100644 --- a/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/configuration/http/HttpCallStepProcessor.java +++ b/kahpp-spring-autoconfigure/src/main/java/dev/vox/platform/kahpp/configuration/http/HttpCallStepProcessor.java @@ -120,7 +120,7 @@ public HttpCallStepProcessor( public void process(KaHPPRecord sourceRecord) { final Timer.Sample sample = Timer.start(meterRegistry); - Either recordAction = step().call(sourceRecord); + Either recordAction = step().call(sourceRecord, jacksonRuntime); RecordAction action = null; if (recordAction.isLeft()) { diff --git a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/SimpleHttpCallTest.java b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/SimpleHttpCallTest.java index 69193cf..0832d6d 100644 --- a/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/SimpleHttpCallTest.java +++ b/kahpp-spring-autoconfigure/src/test/java/dev/vox/platform/kahpp/unit/configuration/http/SimpleHttpCallTest.java @@ -14,6 +14,7 @@ import dev.vox.platform.kahpp.configuration.http.client.ApiClient; import dev.vox.platform.kahpp.integration.KaHPPMockServer; import dev.vox.platform.kahpp.streams.KaHPPRecord; +import io.burt.jmespath.jackson.JacksonRuntime; import io.vavr.control.Either; import java.io.IOException; import java.nio.file.Files; @@ -33,6 +34,8 @@ class SimpleHttpCallTest { private transient HttpCall httpCall; private ApiClient apiClient; + private final JacksonRuntime jacksonRuntime = new JacksonRuntime(); + @BeforeAll static void setupMockServer() { KaHPPMockServer.initServer(); @@ -135,4 +138,63 @@ private JsonNode getJsonNodeFromFile(String file) throws IOException { Paths.get("src/test/resources/Fixtures/collection/collection_6/" + file + ".json")); return new ObjectMapper().readTree(keyString); } + + @Test + void shouldReplacePathWithValue() throws IOException { + + httpCall = + new SimpleHttpCall( + "http_test", + Map.of( + "api", + "suchNiceApi", + "path", + HTTP_CALL_PATH + "/${value.payload.id}", + "apiClient", + apiClient, + "responseHandler", + ResponseHandlerRecordUpdate.RECORD_VALUE_REPLACE)); + + JsonNode key = getJsonNodeFromFile(FIXTURE_KEY); + JsonNode value = getJsonNodeFromFile(FIXTURE_VALUE); + + KaHPPMockServer.mockHttpResponse( + HTTP_CALL_PATH + "/" + value.get("payload").get("id").textValue(), + value.toString(), + 200, + "{}"); + + Either afterCall = + httpCall.call(KaHPPRecord.build(key, value, 1584352842123L), jacksonRuntime); + + assertTrue(afterCall.isRight()); + assertThat(afterCall.get()).isExactlyInstanceOf(TransformRecord.class); + assertThat(((TransformRecord) afterCall.get()).getDataSource().toString()).isEqualTo("{}"); + } + + @Test + void shouldEndCallIfPlaceholderValueNotExists() throws IOException { + + httpCall = + new SimpleHttpCall( + "http_test_with_placeholder", + Map.of( + "api", + "suchNiceApi", + "path", + HTTP_CALL_PATH + "/${value.notExistsThisField}", + "apiClient", + apiClient, + "responseHandler", + ResponseHandlerRecordUpdate.RECORD_VALUE_REPLACE)); + + JsonNode key = getJsonNodeFromFile(FIXTURE_KEY); + JsonNode value = getJsonNodeFromFile(FIXTURE_VALUE); + + Either afterCall = + httpCall.call(KaHPPRecord.build(key, value, 1584352842123L), jacksonRuntime); + + assertTrue(afterCall.isLeft()); + assertThat(afterCall.getLeft().getClass().getSimpleName()).isEqualTo("RuntimeException"); + } }