feat(flagd): add http connector for In-process resolver #1299

Open · wants to merge 5 commits into main
50 changes: 44 additions & 6 deletions providers/flagd/README.md
@@ -54,6 +54,42 @@
The value is updated with every (re)connection to the sync implementation.
This can be used to enrich evaluations with such data.
If the `in-process` mode is not used, and before the provider is ready, the `getSyncMetadata` returns an empty map.

#### Http Connector
HttpConnector polls data from a specified URL at regular intervals.
It can leverage the HTTP caching mechanism via the `ETag` header: when the source responds with `304 Not Modified`,
the connector skips the update, which reduces traffic and softens the impact of rate limits. This behaviour is
enabled via the `useHttpCache` option.
One of its benefits is reduced infrastructure/DevOps work, since no additional containers are needed.
The implementation uses the Java `HttpClient`.
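
A minimal sketch of enabling the connector with the HTTP cache. The builder field names on
`HttpConnectorOptions` (`url`, `pollIntervalSeconds`, `useHttpCache`) are assumed here to mirror the getters read
by `HttpConnector`, and the URL is a placeholder:

```java
// Sketch only: HttpConnectorOptions builder field names, defaults and the URL are assumptions.
HttpConnectorOptions options = HttpConnectorOptions.builder()
    .url("https://example.com/flags.json")   // hypothetical flag payload source
    .pollIntervalSeconds(60)
    .useHttpCache(true)                      // reuse ETag / Last-Modified validators between polls
    .build();

// HttpConnector implements QueueSource, so it can be passed as a custom connector.
QueueSource connector = new HttpConnector(options);
```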

##### What happens if the HTTP source is down when the application is starting?

The connector supports optional fail-safe initialization via a cache: if the initial fetch fails because the
source is down, the initial payload is taken from the cache, so the application does not start with default
flag values until the source is back up. For this to work, the cache TTL is expected to be longer than the
source downtime you need to recover from during initialization.
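
As an illustration, such a fail-safe cache could be backed by Redis. This sketch assumes the `PayloadCache`
contract consists of `String get()` (used for fail-safe initialization) and `void put(String payload)` (used when
the payload changes); the Jedis client, key name and TTL are illustrative only:

```java
// Hypothetical Redis-backed fail-safe cache; assumes PayloadCache declares get() and put(String).
public class RedisPayloadCache implements PayloadCache {

    private static final String KEY = "flagd-flags-payload";
    private final redis.clients.jedis.JedisPool pool;

    public RedisPayloadCache(redis.clients.jedis.JedisPool pool) {
        this.pool = pool;
    }

    @Override
    public String get() {
        try (var jedis = pool.getResource()) {
            return jedis.get(KEY); // null if absent, matching the connector's null check
        }
    }

    @Override
    public void put(String payload) {
        try (var jedis = pool.getResource()) {
            jedis.setex(KEY, 600, payload); // TTL longer than the expected source downtime
        }
    }
}
```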

##### Sample flow
A sample flow can use:
- GitHub as the flag payload source.
- A Redis cache for fail-safe initialization.

The sequence below shows initialization during a GitHub downtime window; the application can still use the flag
values fetched from the cache. A wiring sketch follows the diagram.
```mermaid
sequenceDiagram
participant Provider
participant Github
participant Redis

break source downtime
Provider->>Github: initialize
Github->>Provider: failure
end
Provider->>Redis: fetch
Redis->>Provider: last payload

```
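
One way to wire this sample flow together, assuming the hypothetical `RedisPayloadCache` sketched above and the
same assumed `HttpConnectorOptions` builder fields (`url`, `useHttpCache`, `payloadCache`); additional cache
options may be required by the cache wrapper, and the GitHub raw URL is a placeholder:

```java
// Illustrative wiring only; builder field names, the URL and the Redis address are assumptions.
redis.clients.jedis.JedisPool jedisPool = new redis.clients.jedis.JedisPool("localhost", 6379);

HttpConnectorOptions options = HttpConnectorOptions.builder()
    .url("https://raw.githubusercontent.com/example-org/flags/main/flags.json") // placeholder source
    .useHttpCache(true)
    .payloadCache(new RedisPayloadCache(jedisPool)) // fail-safe cache from the sketch above
    .build();

FlagdOptions flagdOptions = FlagdOptions.builder()
    .resolverType(Config.Resolver.IN_PROCESS)
    .customConnector(new HttpConnector(options))
    .build();

FlagdProvider flagdProvider = new FlagdProvider(flagdOptions);
```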

### Offline mode (File resolver)

In-process resolvers can also work in an offline mode.
@@ -74,15 +110,17 @@
This mode is useful for local development, tests and offline applications.
#### Custom Connector

You can include a custom connector as a configuration option to customize how the in-process resolver fetches flags.
The custom connector must implement the [QueueSource interface](https://github.com/open-feature/java-sdk-contrib/blob/main/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueueSource.java).

```java
QueueSource connector = HttpConnector.builder()
    .url(testUrl)
    .build();
FlagdOptions options =
    FlagdOptions.builder()
        .resolverType(Config.Resolver.IN_PROCESS)
        .customConnector(connector)
        .build();

FlagdProvider flagdProvider = new FlagdProvider(options);
```
49 changes: 49 additions & 0 deletions providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/http/HttpCacheFetcher.java
@@ -0,0 +1,49 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync.http;

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * Fetches content from a given HTTP endpoint using caching headers to optimize network usage.
 * If cached ETag or Last-Modified values are available, they are included in the request headers
 * to potentially receive a 304 Not Modified response, reducing data transfer.
 * Updates the cached ETag and Last-Modified values upon receiving a 200 OK response.
 * It does not store the response body itself, since the payload is not needed after the first successful fetch.
 * Not thread-safe.
 */
@Slf4j
public class HttpCacheFetcher {
private String cachedETag = null;
private String cachedLastModified = null;

/**
 * Sends the given request, adding If-None-Match and If-Modified-Since headers when cached
 * validator values are available from a previous response.
 *
 * @param httpClient the HTTP client used to send the request
 * @param httpRequestBuilder the builder for constructing the HTTP request
 * @return the HTTP response received from the server
 */
@SneakyThrows
public HttpResponse<String> fetchContent(HttpClient httpClient, HttpRequest.Builder httpRequestBuilder) {
if (cachedETag != null) {
httpRequestBuilder.header("If-None-Match", cachedETag);
}
if (cachedLastModified != null) {
httpRequestBuilder.header("If-Modified-Since", cachedLastModified);
}

HttpRequest request = httpRequestBuilder.build();
HttpResponse<String> httpResponse = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

if (httpResponse.statusCode() == 200) {
if (httpResponse.headers() != null) {
cachedETag = httpResponse.headers().firstValue("ETag").orElse(null);
cachedLastModified = httpResponse.headers().firstValue("Last-Modified").orElse(null);
}
log.debug("fetched new content");
} else if (httpResponse.statusCode() == 304) {
log.debug("got 304 Not Modified");
}
return httpResponse;
}
}
184 changes: 184 additions & 0 deletions providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/http/HttpConnector.java
@@ -0,0 +1,184 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync.http;

import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
import dev.openfeature.contrib.providers.flagd.util.ConcurrentUtils;
import lombok.Builder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ProxySelector;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static java.net.http.HttpClient.Builder.NO_PROXY;

/**
 * HttpConnector is responsible for polling data from a specified URL at regular intervals.
 * Be mindful of rate limits when polling HTTP sources such as GitHub.
 * It implements the QueueSource interface to enqueue and dequeue change messages.
 * The class supports configurable parameters such as poll interval, request timeout, and proxy settings.
 * It uses a ScheduledExecutorService to schedule polling tasks and an ExecutorService for HTTP client execution.
 * The class also provides methods to initialize, retrieve the stream queue, and shut down the connector gracefully.
 * It supports optional fail-safe initialization via a cache.
 *
 * <p>See the README, "Http Connector" section.
 */
@Slf4j
public class HttpConnector implements QueueSource {

private Integer pollIntervalSeconds;
private Integer requestTimeoutSeconds;
private BlockingQueue<QueuePayload> queue;
private HttpClient client;
private ExecutorService httpClientExecutor;
private ScheduledExecutorService scheduler;
private Map<String, String> headers;
private PayloadCacheWrapper payloadCacheWrapper;
private PayloadCache payloadCache;
private HttpCacheFetcher httpCacheFetcher;

@NonNull
private String url;

@Builder
public HttpConnector(HttpConnectorOptions httpConnectorOptions) {
this.pollIntervalSeconds = httpConnectorOptions.getPollIntervalSeconds();
this.requestTimeoutSeconds = httpConnectorOptions.getRequestTimeoutSeconds();
ProxySelector proxySelector = NO_PROXY;
if (httpConnectorOptions.getProxyHost() != null && httpConnectorOptions.getProxyPort() != null) {
proxySelector = ProxySelector.of(new InetSocketAddress(httpConnectorOptions.getProxyHost(),
httpConnectorOptions.getProxyPort()));
}
this.url = httpConnectorOptions.getUrl();
this.headers = httpConnectorOptions.getHeaders();
this.httpClientExecutor = httpConnectorOptions.getHttpClientExecutor();
scheduler = Executors.newScheduledThreadPool(httpConnectorOptions.getScheduledThreadPoolSize());
this.client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(httpConnectorOptions.getConnectTimeoutSeconds()))
.proxy(proxySelector)
.executor(this.httpClientExecutor)
.build();
this.queue = new LinkedBlockingQueue<>(httpConnectorOptions.getLinkedBlockingQueueCapacity());
this.payloadCache = httpConnectorOptions.getPayloadCache();
if (payloadCache != null) {
this.payloadCacheWrapper = PayloadCacheWrapper.builder()
.payloadCache(payloadCache)
.payloadCacheOptions(httpConnectorOptions.getPayloadCacheOptions())
.build();
}
if (Boolean.TRUE.equals(httpConnectorOptions.getUseHttpCache())) {
httpCacheFetcher = new HttpCacheFetcher();
}
}

@Override
public void init() throws Exception {
log.info("init Http Connector");
}

@Override
public BlockingQueue<QueuePayload> getStreamQueue() {
boolean success = fetchAndUpdate();
if (!success) {
log.info("failed initial fetch");
if (payloadCache != null) {
updateFromCache();
}
}
Runnable pollTask = buildPollTask();
scheduler.scheduleWithFixedDelay(pollTask, pollIntervalSeconds, pollIntervalSeconds, TimeUnit.SECONDS);
return queue;
}

private void updateFromCache() {
log.info("taking initial payload from cache to avoid starting with default values");
String flagData = payloadCache.get();
if (flagData == null) {
log.debug("got null from cache");
return;
}
if (!this.queue.offer(new QueuePayload(QueuePayloadType.DATA, flagData))) {
log.warn("init: Unable to offer file content to queue: queue is full");
}
}

protected Runnable buildPollTask() {
return this::fetchAndUpdate;
}

private boolean fetchAndUpdate() {
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(URI.create(url))
.timeout(Duration.ofSeconds(requestTimeoutSeconds))
.GET();
headers.forEach(requestBuilder::header);

HttpResponse<String> response;
try {
log.debug("fetching response");
response = execute(requestBuilder);
} catch (IOException e) {
log.info("could not fetch", e);
return false;
} catch (Exception e) {
log.debug("exception", e);
return false;
}
log.debug("fetched response");
String payload = response.body();
if (!isSuccessful(response)) {
log.info("received non-successful status code: {} {}", response.statusCode(), payload);
return false;
} else if (response.statusCode() == 304) {
log.debug("got 304 Not Modified, skipping update");
return false;
}
if (payload == null) {
log.debug("payload is null");
return false;
}
log.debug("adding payload to queue");
if (!this.queue.offer(new QueuePayload(QueuePayloadType.DATA, payload))) {
log.warn("Unable to offer file content to queue: queue is full");
return false;
}
if (payloadCacheWrapper != null) {
log.debug("scheduling cache update if needed");
scheduler.execute(() ->
payloadCacheWrapper.updatePayloadIfNeeded(payload)
);
}
return true;
}

private static boolean isSuccessful(HttpResponse<String> response) {
return response.statusCode() == 200 || response.statusCode() == 304;
}

protected HttpResponse<String> execute(HttpRequest.Builder requestBuilder) throws IOException, InterruptedException {
if (httpCacheFetcher != null) {
return httpCacheFetcher.fetchContent(client, requestBuilder);
}
return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
}

@Override
public void shutdown() throws InterruptedException {
ConcurrentUtils.shutdownAndAwaitTermination(scheduler, 10);
ConcurrentUtils.shutdownAndAwaitTermination(httpClientExecutor, 10);
}
}