Skip to content

Commit

Permalink
Introduce dynamic path for HTTP Call
Browse files Browse the repository at this point in the history
Let's say that we want to call an endpoint using a different value on the path for each record, with this implementation we can easily do that.
Using a placeholder we can take data from the record a mutate the path dynamically.

Example

We have a list of producs:
```
[
    {
        "productId": 1
    },
    {
        "productId": 2
    },
    {
        "productId": 3
    },
    {
        "productId": 4
    }
]
```

We can receive the info about each product by calling '/products/<id of the product>'

To do that we use this config:
```
    config:
      api: pruduct-api
      path: /products/${value.productId}
      topic: error
```

If the value is not there, the call will fail, and the record will be routed to the error topic.

Signed-off-by: Stefano Guerrini <[email protected]>
  • Loading branch information
Stefano Guerrini committed Aug 25, 2022
1 parent c9ba644 commit 1262275
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 1 deletion.
24 changes: 24 additions & 0 deletions docs/features/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,27 @@ The response can also be handled in different ways based on the status code.
| RECORD_UPDATE | Replace record field with the HTTP Response body | jmesPath | Field to replace with the HTTP Response |
| RECORD_TERMINATE | Don't forward record to the next step | | |
| RECORD_ROUTE | Route record to specific topics | topics | List of topics to route the record to |

## Configure dynamic path

It's possible to configure the path dynamically using placeholders.
The value correspondent to the placeholder need to be available inside the record.

Example:
```yaml
config:
api: my-dummy-api
path: /my/path/${value.myDynamic.field}
topic: error
```

So record value needs to look like this:
```json
{
"myDynamic": {
"field": 1234
}
}
```

If the value is not there, the call will fail, and the record will be routed to the error topic.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@
import dev.vox.platform.kahpp.configuration.conditional.Conditional;
import dev.vox.platform.kahpp.configuration.http.client.ApiClient;
import dev.vox.platform.kahpp.streams.KaHPPRecord;
import io.burt.jmespath.jackson.JacksonRuntime;
import io.vavr.control.Either;
import io.vavr.control.Try;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractHttpCall implements HttpCall, Conditional {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractHttpCall.class);

@NotBlank private final transient String name;

/** todo: Custom validator that exposes the missing api reference */
Expand Down Expand Up @@ -77,6 +84,21 @@ public Either<Throwable, RecordAction> call(KaHPPRecord record) {
.toEither();
}

@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
@Override
public Either<Throwable, RecordAction> call(KaHPPRecord record, JacksonRuntime jacksonRuntime) {
Matcher m = java.util.regex.Pattern.compile("\\$\\{(.*?)\\}").matcher(path);
while (m.find()) {
String value = jacksonRuntime.compile(m.group(1)).search(record.build()).textValue();
if (value == null) {
LOGGER.warn("No value found inside the record for this placeholder {}", m.group(0));
return Either.left(new RuntimeException("Placeholder value not found"));
}
path = path.replace(m.group(0), value);
}
return call(record);
}

@Override
public boolean shouldForwardRecordOnError() {
return Boolean.parseBoolean(forwardRecordOnError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@
import dev.vox.platform.kahpp.configuration.RecordAction;
import dev.vox.platform.kahpp.configuration.Step;
import dev.vox.platform.kahpp.streams.KaHPPRecord;
import io.burt.jmespath.jackson.JacksonRuntime;
import io.vavr.control.Either;

public interface HttpCall extends Step {
String RESPONSE_HANDLER_CONFIG = "responseHandler";

Either<Throwable, RecordAction> call(KaHPPRecord record);

default Either<Throwable, RecordAction> call(KaHPPRecord record, JacksonRuntime jacksonRuntime) {
return call(record);
}

boolean shouldForwardRecordOnError();
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public HttpCallStepProcessor(
public void process(KaHPPRecord sourceRecord) {
final Timer.Sample sample = Timer.start(meterRegistry);

Either<Throwable, RecordAction> recordAction = step().call(sourceRecord);
Either<Throwable, RecordAction> recordAction = step().call(sourceRecord, jacksonRuntime);
RecordAction action = null;

if (recordAction.isLeft()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import dev.vox.platform.kahpp.configuration.http.client.ApiClient;
import dev.vox.platform.kahpp.integration.KaHPPMockServer;
import dev.vox.platform.kahpp.streams.KaHPPRecord;
import io.burt.jmespath.jackson.JacksonRuntime;
import io.vavr.control.Either;
import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -33,6 +34,8 @@ class SimpleHttpCallTest {
private transient HttpCall httpCall;
private ApiClient apiClient;

private final JacksonRuntime jacksonRuntime = new JacksonRuntime();

@BeforeAll
static void setupMockServer() {
KaHPPMockServer.initServer();
Expand Down Expand Up @@ -135,4 +138,63 @@ private JsonNode getJsonNodeFromFile(String file) throws IOException {
Paths.get("src/test/resources/Fixtures/collection/collection_6/" + file + ".json"));
return new ObjectMapper().readTree(keyString);
}

@Test
void shouldReplacePathWithValue() throws IOException {

httpCall =
new SimpleHttpCall(
"http_test",
Map.of(
"api",
"suchNiceApi",
"path",
HTTP_CALL_PATH + "/${value.payload.id}",
"apiClient",
apiClient,
"responseHandler",
ResponseHandlerRecordUpdate.RECORD_VALUE_REPLACE));

JsonNode key = getJsonNodeFromFile(FIXTURE_KEY);
JsonNode value = getJsonNodeFromFile(FIXTURE_VALUE);

KaHPPMockServer.mockHttpResponse(
HTTP_CALL_PATH + "/" + value.get("payload").get("id").textValue(),
value.toString(),
200,
"{}");

Either<Throwable, RecordAction> afterCall =
httpCall.call(KaHPPRecord.build(key, value, 1584352842123L), jacksonRuntime);

assertTrue(afterCall.isRight());
assertThat(afterCall.get()).isExactlyInstanceOf(TransformRecord.class);
assertThat(((TransformRecord) afterCall.get()).getDataSource().toString()).isEqualTo("{}");
}

@Test
void shouldEndCallIfPlaceholderValueNotExists() throws IOException {

httpCall =
new SimpleHttpCall(
"http_test_with_placeholder",
Map.of(
"api",
"suchNiceApi",
"path",
HTTP_CALL_PATH + "/${value.notExistsThisField}",
"apiClient",
apiClient,
"responseHandler",
ResponseHandlerRecordUpdate.RECORD_VALUE_REPLACE));

JsonNode key = getJsonNodeFromFile(FIXTURE_KEY);
JsonNode value = getJsonNodeFromFile(FIXTURE_VALUE);

Either<Throwable, RecordAction> afterCall =
httpCall.call(KaHPPRecord.build(key, value, 1584352842123L), jacksonRuntime);

assertTrue(afterCall.isLeft());
assertThat(afterCall.getLeft().getClass().getSimpleName()).isEqualTo("RuntimeException");
}
}

0 comments on commit 1262275

Please sign in to comment.