From 181f418b385d13e514bc0ce1b83b1e8b6f698c62 Mon Sep 17 00:00:00 2001 From: Carter Kozak Date: Thu, 14 Nov 2024 20:47:54 -0500 Subject: [PATCH] fix several QueuedChannel tests --- .../dialogue/core/QueuedChannelTest.java | 373 ++++++++++++------ .../QueuedChannelTest/testQueueTracing.log | 4 +- 2 files changed, 260 insertions(+), 117 deletions(-) diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/QueuedChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/QueuedChannelTest.java index bbdf550a8..f40e7fa59 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/QueuedChannelTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/QueuedChannelTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.codahale.metrics.Timer; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -30,11 +29,17 @@ import com.palantir.dialogue.Endpoint; import com.palantir.dialogue.Request; import com.palantir.dialogue.Response; +import com.palantir.dialogue.TestEndpoint; +import com.palantir.dialogue.TestResponse; import com.palantir.dialogue.core.LimitedChannel.LimitEnforcement; +import com.palantir.dialogue.core.QueuedChannel.QueuedChannelInstrumentation; import com.palantir.tracing.TestTracing; import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; +import java.util.List; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -81,25 +86,47 @@ public void before() { mockHasCapacity(); } + private static QueuedChannel createQueue(LimitedChannel delegate) { + return createQueue(delegate, 100_000); + } + + private static QueuedChannel createQueue(LimitedChannel delegate, int maxQueueSize) { + String channelName = "my-channel"; + return new QueuedChannel( + delegate, + channelName, + "queue-type", + QueuedChannel.channelInstrumentation( + DialogueClientMetrics.of(new DefaultTaggedMetricRegistry()), channelName), + maxQueueSize); + } + @Test public void testReceivesSuccessfulResponse() throws ExecutionException, InterruptedException { + SettableFuture result = SettableFuture.create(); + LimitedChannel delegateChannel = (_endpoint, _request, _limitEnforcement) -> Optional.of(result); + QueuedChannel queued = createQueue(delegateChannel); ListenableFuture response = - queuedChannel.maybeExecute(endpoint, request).get(); + queued.maybeExecute(TestEndpoint.GET, Request.builder().build()).get(); assertThat(response.isDone()).isFalse(); - futureResponse.set(mockResponse); + Response expectedResponse = new TestResponse().code(200); + result.set(expectedResponse); assertThat(response.isDone()).isTrue(); - assertThat(response.get()).isEqualTo(mockResponse); + assertThat(response.get()).isEqualTo(expectedResponse); } @Test public void testReceivesExceptionalResponse() { + SettableFuture result = SettableFuture.create(); + LimitedChannel delegateChannel = (_endpoint, _request, _limitEnforcement) -> Optional.of(result); + QueuedChannel queued = createQueue(delegateChannel); ListenableFuture response = - queuedChannel.maybeExecute(endpoint, request).get(); + queued.maybeExecute(TestEndpoint.GET, Request.builder().build()).get(); assertThat(response.isDone()).isFalse(); - futureResponse.setException(new IllegalArgumentException()); + result.setException(new IllegalArgumentException()); assertThat(response.isDone()).isTrue(); assertThatThrownBy(response::get) @@ -109,13 +136,42 @@ public void testReceivesExceptionalResponse() { @Test public void testQueuedRequestExecutedOnNextSubmission() { - mockNoCapacity(); - queuedChannel.maybeExecute(endpoint, request); - verify(delegate, times(2)).maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS); - - mockHasCapacity(); - queuedChannel.maybeExecute(endpoint, request); - verify(delegate, times(4)).maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS); + List>> settableResponses = new CopyOnWriteArrayList<>(); + AtomicBoolean hasCapacity = new AtomicBoolean(false); + LimitedChannel delegateChannel = (_endpoint, _request, limitEnforcement) -> { + Optional> result = Optional.empty(); + if (hasCapacity.get() || !limitEnforcement.enforceLimits()) { + result = Optional.of(SettableFuture.create()); + } + settableResponses.add(result); + return result.map(item -> item); + }; + QueuedChannel queued = createQueue(delegateChannel); + + assertThat(settableResponses).isEmpty(); + + // Initial request is expected to be allowed in all cases, to allow the queue to be processed + assertThat(queued.maybeExecute(TestEndpoint.GET, Request.builder().build())) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + assertThat(settableResponses).hasSize(1); + assertThat(settableResponses.get(0)) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + + assertThat(queued.maybeExecute(TestEndpoint.GET, Request.builder().build())) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + assertThat(settableResponses).hasSize(3); + // scheduleNextTask is attempted twice + assertThat(settableResponses.get(1)).isEmpty(); + assertThat(settableResponses.get(2)).isEmpty(); + + // interactions with the delegate are no longer rejected + hasCapacity.set(true); + // submit another request which triggers processing of the queue + assertThat(queued.maybeExecute(TestEndpoint.GET, Request.builder().build())) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + assertThat(settableResponses).hasSize(5); + assertThat(settableResponses.get(3)).isPresent(); + assertThat(settableResponses.get(4)).isPresent(); } @Test @@ -147,104 +203,164 @@ public void testQueuedRequestExecutedOnNextSubmission_throws() throws ExecutionE @Test public void testQueuedRequestExecutedWhenRunningRequestCompletes() { - mockHasCapacity(); - queuedChannel.maybeExecute(endpoint, request); - verify(delegate, times(1)).maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS); - - mockNoCapacity(); - queuedChannel.maybeExecute(endpoint, request); - verify(delegate, times(3)).maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS); - futureResponse.set(mockResponse); - - verify(delegate, times(4)).maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS); + List>> settableResponses = new CopyOnWriteArrayList<>(); + AtomicBoolean hasCapacity = new AtomicBoolean(false); + LimitedChannel delegateChannel = (_endpoint, _request, limitEnforcement) -> { + Optional> result = Optional.empty(); + if (hasCapacity.get() || !limitEnforcement.enforceLimits()) { + result = Optional.of(SettableFuture.create()); + } + settableResponses.add(result); + return result.map(item -> item); + }; + QueuedChannel queued = createQueue(delegateChannel); + + assertThat(settableResponses).isEmpty(); + + // Initial request is expected to be allowed in all cases, to allow the queue to be processed + assertThat(queued.maybeExecute(TestEndpoint.GET, Request.builder().build())) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + assertThat(settableResponses).hasSize(1); + assertThat(settableResponses.get(0)) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + + assertThat(queued.maybeExecute(TestEndpoint.GET, Request.builder().build())) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + assertThat(settableResponses).hasSize(3); + // scheduleNextTask is attempted twice + assertThat(settableResponses.get(1)).isEmpty(); + assertThat(settableResponses.get(2)).isEmpty(); + + // interactions with the delegate are no longer rejected + // We complete the initial request, which triggers processing of the queue + hasCapacity.set(true); + settableResponses.get(0).get().set(new TestResponse().code(200)); + assertThat(settableResponses).hasSize(4); + assertThat(settableResponses.get(3)).isPresent(); } @Test @TestTracing(snapshot = true) public void testQueueTracing() { - // Put requests on queue - mockNoCapacity(); - queuedChannel.maybeExecute(endpoint, request); - queuedChannel.maybeExecute(endpoint, request); - verify(delegate, times(3)).maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS); - - // flush queue by completing a request - mockHasCapacity(); - queuedChannel.maybeExecute(endpoint, request); - verify(delegate, times(6)).maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS); - futureResponse.set(mockResponse); - - verify(delegate, times(6)).maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS); + testQueuedRequestExecutedWhenRunningRequestCompletes(); } @Test public void testQueueFullReturnsLimited() { - queuedChannel = new QueuedChannel( - delegate, - "my-channel", - "queue-type", - QueuedChannel.channelInstrumentation( - DialogueClientMetrics.of(new DefaultTaggedMetricRegistry()), "my-channel"), - 1); - - mockNoCapacity(); - queuedChannel.maybeExecute(endpoint, request); - - assertThat(queuedChannel.maybeExecute(endpoint, request)).isEmpty(); + List>> settableResponses = new CopyOnWriteArrayList<>(); + LimitedChannel delegateChannel = (_endpoint, _request, limitEnforcement) -> { + Optional> result = Optional.empty(); + if (!limitEnforcement.enforceLimits()) { + result = Optional.of(SettableFuture.create()); + } + settableResponses.add(result); + return result.map(item -> item); + }; + QueuedChannel queued = createQueue(delegateChannel, 1); + + assertThat(settableResponses).isEmpty(); + + // Initial request is expected to be allowed in all cases, to allow the queue to be processed + assertThat(queued.maybeExecute(TestEndpoint.GET, Request.builder().build())) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + assertThat(settableResponses).hasSize(1); + assertThat(settableResponses.get(0)) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + + // Now that we have a request in flight, we can queue one: + assertThat(queued.maybeExecute(TestEndpoint.GET, Request.builder().build())) + .isPresent(); + + // The next request exceeds the maximum queue size + assertThat(queued.maybeExecute(TestEndpoint.GET, Request.builder().build())) + .as("When the queue is full, this should return a 'limited' empty optional") + .isEmpty(); } @Test public void testQueueSizeMetric() { - DialogueClientMetrics metrics = DialogueClientMetrics.of(new DefaultTaggedMetricRegistry()); - String channelName = "my-channel"; - - queuedChannel = new QueuedChannel( - delegate, channelName, "queue-type", QueuedChannel.channelInstrumentation(metrics, channelName), 1); - - mockNoCapacity(); - queuedChannel.maybeExecute(endpoint, request); - - assertThat(queuedChannel.maybeExecute(endpoint, request)).isEmpty(); - assertThat(metrics.requestsQueued(channelName).getCount()).isOne(); - } - - @Test - public void testQueueTimeMetric_success() { - DialogueClientMetrics metrics = DialogueClientMetrics.of(new DefaultTaggedMetricRegistry()); - String channelName = "my-channel"; - - queuedChannel = new QueuedChannel( - delegate, channelName, "queue-type", QueuedChannel.channelInstrumentation(metrics, channelName), 1); - - mockNoCapacity(); - assertThat(queuedChannel.maybeExecute(endpoint, request)) - .hasValueSatisfying(future -> assertThat(future).isNotDone()); - mockHasCapacity(); - queuedChannel.schedule(); - futureResponse.set(mockResponse); - - Timer timer = metrics.requestQueuedTime(channelName); - assertThat(timer.getCount()).isOne(); - assertThat(timer.getSnapshot().getMax()).isPositive(); + List>> settableResponses = new CopyOnWriteArrayList<>(); + AtomicBoolean hasCapacity = new AtomicBoolean(false); + LimitedChannel delegateChannel = (_endpoint, _request, limitEnforcement) -> { + Optional> result = Optional.empty(); + if (hasCapacity.get() || !limitEnforcement.enforceLimits()) { + result = Optional.of(SettableFuture.create()); + } + settableResponses.add(result); + return result.map(item -> item); + }; + QueuedChannelInstrumentation instrumentation = QueuedChannel.channelInstrumentation( + DialogueClientMetrics.of(new DefaultTaggedMetricRegistry()), "channel"); + QueuedChannel queued = new QueuedChannel(delegateChannel, "channel", "queue-type", instrumentation, 100_000); + + assertThat(settableResponses).isEmpty(); + assertThat(instrumentation.requestsQueued().getCount()).isZero(); + + // Initial request is expected to be allowed in all cases, to allow the queue to be processed + assertThat(queued.maybeExecute(TestEndpoint.GET, Request.builder().build())) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + assertThat(settableResponses).hasSize(1); + assertThat(settableResponses.get(0)) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + + assertThat(instrumentation.requestsQueued().getCount()).isZero(); + + // Now that we have a request in flight, we can queue one: + assertThat(queued.maybeExecute(TestEndpoint.GET, Request.builder().build())) + .isPresent(); + + assertThat(instrumentation.requestsQueued().getCount()).isOne(); + assertThat(instrumentation.requestQueuedTime().getCount()).isZero(); + + hasCapacity.set(true); + settableResponses.get(0).get().set(new TestResponse().code(200)); + + assertThat(instrumentation.requestsQueued().getCount()).isZero(); + assertThat(instrumentation.requestQueuedTime().getCount()).isOne(); + assertThat(instrumentation.requestQueuedTime().getSnapshot().getMax()).isPositive(); } @Test public void testQueueTimeMetric_cancel() { - DialogueClientMetrics metrics = DialogueClientMetrics.of(new DefaultTaggedMetricRegistry()); - String channelName = "my-channel"; - - queuedChannel = new QueuedChannel( - delegate, channelName, "queue-type", QueuedChannel.channelInstrumentation(metrics, channelName), 1); - - mockNoCapacity(); - Optional> result = queuedChannel.maybeExecute(endpoint, request); - assertThat(result).hasValueSatisfying(future -> assertThat(future).isNotDone()); - result.get().cancel(true); - queuedChannel.schedule(); - - Timer timer = metrics.requestQueuedTime(channelName); - assertThat(timer.getCount()).isOne(); - assertThat(timer.getSnapshot().getMax()).isPositive(); + List>> settableResponses = new CopyOnWriteArrayList<>(); + LimitedChannel delegateChannel = (_endpoint, _request, limitEnforcement) -> { + Optional> result = Optional.empty(); + if (!limitEnforcement.enforceLimits()) { + result = Optional.of(SettableFuture.create()); + } + settableResponses.add(result); + return result.map(item -> item); + }; + QueuedChannelInstrumentation instrumentation = QueuedChannel.channelInstrumentation( + DialogueClientMetrics.of(new DefaultTaggedMetricRegistry()), "channel"); + QueuedChannel queued = new QueuedChannel(delegateChannel, "channel", "queue-type", instrumentation, 100_000); + + assertThat(settableResponses).isEmpty(); + assertThat(instrumentation.requestsQueued().getCount()).isZero(); + + // Initial request is expected to be allowed in all cases, to allow the queue to be processed + assertThat(queued.maybeExecute(TestEndpoint.GET, Request.builder().build())) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + assertThat(settableResponses).hasSize(1); + assertThat(settableResponses.get(0)) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + + assertThat(instrumentation.requestsQueued().getCount()).isZero(); + + // Now that we have a request in flight, we can queue one: + Optional> queuedResponse = + queued.maybeExecute(TestEndpoint.GET, Request.builder().build()); + assertThat(queuedResponse).isPresent(); + + assertThat(instrumentation.requestsQueued().getCount()).isOne(); + assertThat(instrumentation.requestQueuedTime().getCount()).isZero(); + + queuedResponse.get().cancel(false); + queued.schedule(); + + assertThat(instrumentation.requestsQueued().getCount()).isZero(); + assertThat(instrumentation.requestQueuedTime().getCount()).isOne(); + assertThat(instrumentation.requestQueuedTime().getSnapshot().getMax()).isPositive(); } @Test @@ -273,20 +389,53 @@ public void testQueuedResponseClosedOnCancel() { @Test public void testQueuedResponsePropagatesCancel() { - Request queued = Request.builder().putHeaderParams("key", "val").build(); - when(delegate.maybeExecute(endpoint, queued, DO_NOT_SKIP_LIMITS)).thenReturn(Optional.empty()); - ListenableFuture result = - queuedChannel.maybeExecute(endpoint, queued).get(); - verify(delegate, times(2)).maybeExecute(endpoint, queued, DO_NOT_SKIP_LIMITS); - - when(delegate.maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS)) - .thenReturn(Optional.of(Futures.immediateFuture(Mockito.mock(Response.class)))); - when(delegate.maybeExecute(endpoint, queued, DO_NOT_SKIP_LIMITS)).thenReturn(maybeResponse); - queuedChannel.maybeExecute(endpoint, request); - result.cancel(true); - assertThat(futureResponse).isCancelled(); - verify(delegate, times(1)).maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS); - verify(delegate, times(3)).maybeExecute(endpoint, queued, DO_NOT_SKIP_LIMITS); + List>> settableResponses = new CopyOnWriteArrayList<>(); + AtomicBoolean hasCapacity = new AtomicBoolean(false); + LimitedChannel delegateChannel = (_endpoint, _request, limitEnforcement) -> { + Optional> result = Optional.empty(); + if (hasCapacity.get() || !limitEnforcement.enforceLimits()) { + result = Optional.of(SettableFuture.create()); + } + settableResponses.add(result); + return result.map(item -> item); + }; + QueuedChannelInstrumentation instrumentation = QueuedChannel.channelInstrumentation( + DialogueClientMetrics.of(new DefaultTaggedMetricRegistry()), "channel"); + QueuedChannel queued = new QueuedChannel(delegateChannel, "channel", "queue-type", instrumentation, 100_000); + + assertThat(settableResponses).isEmpty(); + assertThat(instrumentation.requestsQueued().getCount()).isZero(); + + // Initial request is expected to be allowed in all cases, to allow the queue to be processed + assertThat(queued.maybeExecute(TestEndpoint.GET, Request.builder().build())) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + assertThat(settableResponses).hasSize(1); + assertThat(settableResponses.get(0)) + .hasValueSatisfying(item -> assertThat(item).isNotDone()); + + assertThat(instrumentation.requestsQueued().getCount()).isZero(); + + // Now that we have a request in flight, we can queue one: + Optional> queuedResponse = + queued.maybeExecute(TestEndpoint.GET, Request.builder().build()); + assertThat(queuedResponse).isPresent(); + + assertThat(instrumentation.requestsQueued().getCount()).isOne(); + assertThat(instrumentation.requestQueuedTime().getCount()).isZero(); + + // allow the queued request to be processed + hasCapacity.set(true); + queued.schedule(); + // cancel the QueuedChannel response future + queuedResponse.get().cancel(false); + // The future on the other side of the system should be canceled as well + assertThat(settableResponses).hasSize(4); + assertThat(settableResponses.get(3)) + .hasValueSatisfying(item -> assertThat(item).isCancelled()); + + assertThat(instrumentation.requestsQueued().getCount()).isZero(); + assertThat(instrumentation.requestQueuedTime().getCount()).isOne(); + assertThat(instrumentation.requestQueuedTime().getSnapshot().getMax()).isPositive(); } @Test @@ -307,12 +456,8 @@ public void testQueuedResponseAvoidsExecutingCancelled() { } private OngoingStubbing>> mockHasCapacity() { - return when(delegate.maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS)) + return Mockito.lenient() + .when(delegate.maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS)) .thenReturn(maybeResponse); } - - private OngoingStubbing>> mockNoCapacity() { - return when(delegate.maybeExecute(endpoint, request, DO_NOT_SKIP_LIMITS)) - .thenReturn(Optional.empty()); - } } diff --git a/dialogue-core/src/test/resources/tracing/QueuedChannelTest/testQueueTracing.log b/dialogue-core/src/test/resources/tracing/QueuedChannelTest/testQueueTracing.log index c49648013..b460e6e24 100644 --- a/dialogue-core/src/test/resources/tracing/QueuedChannelTest/testQueueTracing.log +++ b/dialogue-core/src/test/resources/tracing/QueuedChannelTest/testQueueTracing.log @@ -1,3 +1 @@ -{"traceId":"f82b794f6c9f66a2","parentSpanId":null,"spanId":"89eef7fe768d4d16","type":"LOCAL","operation":"Dialogue-request-enqueued","startTimeMicroSeconds":1714404335723248,"durationNanoSeconds":99958,"metadata":{"queue":"queue-type","channel":"my-channel"}} -{"traceId":"5dcaf0bcdfd12e98","parentSpanId":null,"spanId":"2f997a8b9e46f2af","type":"LOCAL","operation":"Dialogue-request-enqueued","startTimeMicroSeconds":1714404335723267,"durationNanoSeconds":110416,"metadata":{"queue":"queue-type","channel":"my-channel"}} -{"traceId":"405e0148094f702c","parentSpanId":null,"spanId":"ac9647b054367824","type":"LOCAL","operation":"Dialogue-request-enqueued","startTimeMicroSeconds":1714404335723335,"durationNanoSeconds":61583,"metadata":{"queue":"queue-type","channel":"my-channel"}} +{"traceId":"a793d5942b7a9c8d","parentSpanId":null,"spanId":"9d194d802e1bdd01","type":"LOCAL","operation":"Dialogue-request-enqueued","startTimeMicroSeconds":1731620193557566,"durationNanoSeconds":3299875,"metadata":{"queue":"queue-type","channel":"my-channel"}}