Skip to content

Commit

Permalink
Proof of concept retaining endpoint concurrency limits on refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
carterkozak committed Nov 13, 2024
1 parent 52aeec6 commit 47bda92
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ final class ConcurrencyLimitedChannel implements LimitedChannel {
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class,
ConcurrencyLimitedChannel::createHostSpecificState);

private static final ChannelState.Key<CautiousIncreaseAggressiveDecreaseConcurrencyLimiter>
ENDPOINT_SPECIFIC_STATE_KEY = new ChannelState.Key<>(
CautiousIncreaseAggressiveDecreaseConcurrencyLimiter.class,
ConcurrencyLimitedChannel::createEndpointSpecificState);

private final NeverThrowChannel delegate;
private final CautiousIncreaseAggressiveDecreaseConcurrencyLimiter limiter;
private final String channelNameForLogging;
Expand All @@ -61,10 +66,11 @@ static LimitedChannel createForHost(Config cf, Channel channel, int uriIndex, Ch
* Creates a concurrency limited channel for per-endpoint limiting.
* Metrics are not reported by this component per-endpoint, only by the per-endpoint queue.
*/
static LimitedChannel createForEndpoint(Channel channel, String channelName, int uriIndex, Endpoint endpoint) {
static LimitedChannel createForEndpoint(
Channel channel, String channelName, int uriIndex, Endpoint endpoint, ChannelState endpointChannelState) {
return new ConcurrencyLimitedChannel(
channel,
new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.ENDPOINT_LEVEL),
endpointChannelState.getState(ENDPOINT_SPECIFIC_STATE_KEY),
new EndpointConcurrencyLimitedChannelInstrumentation(channelName, uriIndex, endpoint));
}

Expand All @@ -81,6 +87,10 @@ static CautiousIncreaseAggressiveDecreaseConcurrencyLimiter createHostSpecificSt
return new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.HOST_LEVEL);
}

static CautiousIncreaseAggressiveDecreaseConcurrencyLimiter createEndpointSpecificState() {
return new CautiousIncreaseAggressiveDecreaseConcurrencyLimiter(Behavior.ENDPOINT_LEVEL);
}

@Override
public Optional<ListenableFuture<Response>> maybeExecute(
Endpoint endpoint, Request request, LimitEnforcement limitEnforcement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.palantir.dialogue.core;

import com.codahale.metrics.Meter;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Ticker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -252,12 +254,17 @@ private static ImmutableList<LimitedChannel> createHostChannels(
LimitedChannel limitedChannel;
if (cf.isConcurrencyLimitingEnabled()) {
Channel unlimited = channel;
EndpointChannelState endpointChannelState = channelState.getState(EndpointChannelState.KEY);
channel = new ChannelToEndpointChannel(endpoint -> {
if (endpoint.tags().contains("dialogue-disable-endpoint-concurrency-limiting")) {
return unlimited;
}
LimitedChannel limited = ConcurrencyLimitedChannel.createForEndpoint(
unlimited, cf.channelName(), uriIndexForInstrumentation, endpoint);
unlimited,
cf.channelName(),
uriIndexForInstrumentation,
endpoint,
endpointChannelState.get(endpoint));
return QueuedChannel.create(cf, endpoint, limited);
});
limitedChannel = ConcurrencyLimitedChannel.createForHost(
Expand All @@ -271,6 +278,33 @@ private static ImmutableList<LimitedChannel> createHostChannels(
return perUriChannels.build();
}

/**
* {@link ChannelState} provider for per-endpoint channels like the endpoint concurrency limiter.
* This object is held in the per-host state, and can be used to look up a {@link ChannelState}
* scoped to an individual {@link Endpoint}.
* {@link Endpoint} state is held in a weak-keyed cache, equivalent to the one used in
* {@link ChannelToEndpointChannel}.
* {@link Endpoint} objects are usually enums, which will never be garbage collected, however it's possible
* that callers may build an endpoint instances on a per-call basis, so the weak-keyed map is defensive
* against short-lived endpoints.
* We don't use the same map because the {@link ChannelToEndpointChannel} retains full channel state which
* may or may not be designed to be reused across reloads, and we aim to be more precise with state that is
* kept across uri changes.
*/
private record EndpointChannelState(LoadingCache<Endpoint, ChannelState> cache) {
private static final ChannelState.Key<EndpointChannelState> KEY =
new ChannelState.Key<>(EndpointChannelState.class, EndpointChannelState::create);

ChannelState get(Endpoint endpoint) {
return cache.get(endpoint);
}

private static EndpointChannelState create() {
return new EndpointChannelState(
Caffeine.newBuilder().weakKeys().maximumSize(10_000).build(_key -> new ChannelState()));
}
}

private static EndpointChannelFactory createEndpointChannelFactory(Channel multiHostQueuedChannel, Config cf) {
Channel queuedChannel = new QueueOverrideChannel(multiHostQueuedChannel);
return endpoint -> {
Expand Down

0 comments on commit 47bda92

Please sign in to comment.