From aa2f562fb3e4b372cc67fbab7a491af909486013 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Thu, 23 Jan 2025 14:15:03 -0800 Subject: [PATCH 01/19] Reclassify BadQueryRequestException as QUERY_VALIDATION_ERROR --- .../pinot/core/query/executor/ServerQueryExecutorV1Impl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 16cde432c170..a8fc0aa2da09 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -324,7 +324,7 @@ private InstanceResponseBlock executeInternal(ServerQueryRequest queryRequest, E // Do not log verbose error for BadQueryRequestException and QueryCancelledException. if (e instanceof BadQueryRequestException) { LOGGER.info("Caught BadQueryRequestException while processing requestId: {}, {}", requestId, e.getMessage()); - instanceResponse.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); + instanceResponse.addException(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e)); } else if (e instanceof QueryCancelledException) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Cancelled while processing requestId: {}", requestId, e); From 0114c7ac4f2502a720396504ee7c6869159c86a4 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Fri, 24 Jan 2025 09:21:06 -0800 Subject: [PATCH 02/19] Use BadQueryRequestException for column type error for sum/max/min aggregations --- .../operator/combine/BaseSingleBlockCombineOperator.java | 7 ++++++- .../query/aggregation/function/MaxAggregationFunction.java | 3 ++- .../query/aggregation/function/MinAggregationFunction.java | 3 ++- .../query/aggregation/function/SumAggregationFunction.java | 3 ++- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java index 1c421c5c2a7f..384bf5860883 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java @@ -32,6 +32,7 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.scheduler.resources.ResourceManager; import org.apache.pinot.spi.exception.EarlyTerminationException; +import org.apache.pinot.spi.exception.BadQueryRequestException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,7 +112,11 @@ protected void processSegments() { @Override protected void onProcessSegmentsException(Throwable t) { _processingException.compareAndSet(null, t); - _blockingQueue.offer(new ExceptionResultsBlock(t)); + if (t instanceof BadQueryRequestException) { + _blockingQueue.offer(new ExceptionResultsBlock(QueryException.QUERY_VALIDATION_ERROR, t)); + } else { + _blockingQueue.offer(new ExceptionResultsBlock(t)); + } } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java index edce2f842028..992a1c68984f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java @@ -31,6 +31,7 @@ import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.exception.BadQueryRequestException; public class MaxAggregationFunction extends NullableSingleInputAggregationFunction { @@ -143,7 +144,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde break; } default: - throw new IllegalStateException("Cannot compute max for non-numeric type: " + blockValSet.getValueType()); + throw new BadQueryRequestException("Cannot compute max for non-numeric type: " + blockValSet.getValueType()); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinAggregationFunction.java index 64daf87c1cfe..5e365563fd55 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinAggregationFunction.java @@ -31,6 +31,7 @@ import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.exception.BadQueryRequestException; public class MinAggregationFunction extends NullableSingleInputAggregationFunction { @@ -143,7 +144,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde break; } default: - throw new IllegalStateException("Cannot compute min for non-numeric type: " + blockValSet.getValueType()); + throw new BadQueryRequestException("Cannot compute min for non-numeric type: " + blockValSet.getValueType()); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumAggregationFunction.java index ee29c540762d..9984ef6897cc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumAggregationFunction.java @@ -31,6 +31,7 @@ import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.exception.BadQueryRequestException; public class SumAggregationFunction extends NullableSingleInputAggregationFunction { @@ -139,7 +140,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde break; } default: - throw new IllegalStateException("Cannot compute sum for non-numeric type: " + blockValSet.getValueType()); + throw new BadQueryRequestException("Cannot compute sum for non-numeric type: " + blockValSet.getValueType()); } updateAggregationResultHolder(aggregationResultHolder, sum); } From 89de6146ea98e0397e724921526d02c124082de5 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Fri, 24 Jan 2025 11:25:09 -0800 Subject: [PATCH 03/19] Update testQueryExceptions to test correct QueryException code --- .../integration/tests/BaseClusterIntegrationTestSet.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java index b6799a8864f7..2fe416299fac 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java @@ -518,7 +518,10 @@ public void testQueryExceptions() ? QueryException.QUERY_PLANNING_ERROR_CODE : QueryException.QUERY_EXECUTION_ERROR_CODE); testQueryException("SELECT COUNT(*) FROM mytable where ArrTime = 'potato'", - QueryException.QUERY_EXECUTION_ERROR_CODE); + QueryException.QUERY_VALIDATION_ERROR_CODE); + + testQueryException("SELECT MAX(CarrierDelay) FROM mytable where ArrTime > 5", + QueryException.QUERY_VALIDATION_ERROR_CODE); } private void testQueryException(String query, int errorCode) From 1d8ac33b1bc562b4964fa5580e6b59dcf792afdb Mon Sep 17 00:00:00 2001 From: Tony Song Date: Fri, 24 Jan 2025 13:10:36 -0800 Subject: [PATCH 04/19] Import style cleanup --- .../core/operator/combine/BaseSingleBlockCombineOperator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java index 384bf5860883..623647fcacb0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java @@ -31,8 +31,8 @@ import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.scheduler.resources.ResourceManager; -import org.apache.pinot.spi.exception.EarlyTerminationException; import org.apache.pinot.spi.exception.BadQueryRequestException; +import org.apache.pinot.spi.exception.EarlyTerminationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From d1257f2db112d89d1ff81a852e5af88f8c73fe45 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Fri, 24 Jan 2025 15:35:37 -0800 Subject: [PATCH 05/19] Update query exception type in OfflineClusterIntegrationTest --- .../integration/tests/OfflineClusterIntegrationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 3322c6bcd24e..225d1df33f9a 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -1034,7 +1034,7 @@ public void testBase64Func(boolean useMultiStageQueryEngine) testQueryError(sqlQuery, QueryException.QUERY_PLANNING_ERROR_CODE); } else { response = postQuery(sqlQuery); - assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError")); + assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryValidationError")); } // invalid argument @@ -1043,7 +1043,7 @@ public void testBase64Func(boolean useMultiStageQueryEngine) testQueryError(sqlQuery, QueryException.QUERY_PLANNING_ERROR_CODE); } else { response = postQuery(sqlQuery); - assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError")); + assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryValidationError")); } // invalid argument From 781733de235a2bd6270c4398caf598aab3a27cb5 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Fri, 24 Jan 2025 17:47:08 -0800 Subject: [PATCH 06/19] Throws BadQueryRequestException in reducer if QUERY_VALIDATION_ERROR_CODE found --- .../MultiStageBrokerRequestHandler.java | 12 ++++++++++-- .../query/service/dispatch/QueryDispatcher.java | 9 ++++++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 60d1a366a104..82320b8f2d89 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -48,6 +48,7 @@ import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerQueryPhase; import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.BrokerResponseNativeV2; import org.apache.pinot.common.response.broker.ResultTable; @@ -73,6 +74,7 @@ import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.auth.TableAuthorizationResult; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.exception.BadQueryRequestException; import org.apache.pinot.spi.exception.DatabaseConflictException; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.trace.Tracing; @@ -269,11 +271,17 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE); return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR); } catch (Throwable t) { + ProcessingException queryException = QueryException.QUERY_EXECUTION_ERROR; + if (t instanceof BadQueryRequestException) { + // provide more specific error code if available + queryException = QueryException.QUERY_VALIDATION_ERROR; + } + String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t); LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage); - requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE); + requestContext.setErrorCode(queryException.getErrorCode()); return new BrokerResponseNative( - QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, consolidatedMessage)); + QueryException.getException(queryException, consolidatedMessage)); } finally { Tracing.getThreadAccountant().clear(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 253f800d5d04..4cfaf05e6c9a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -46,6 +46,7 @@ import org.apache.calcite.runtime.PairList; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.datablock.DataBlock; +import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.proto.Plan; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse; @@ -76,6 +77,7 @@ import org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchClient; import org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchObserver; import org.apache.pinot.spi.accounting.ThreadExecutionContext; +import org.apache.pinot.spi.exception.BadQueryRequestException; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants; @@ -459,7 +461,12 @@ public static QueryResult runReducer(long requestId, } // TODO: Improve the error handling, e.g. return partial response if (block.isErrorBlock()) { - throw new RuntimeException("Received error query execution result block: " + block.getExceptions()); + Map queryExceptions = block.getExceptions(); + if (queryExceptions.containsKey(QueryException.QUERY_VALIDATION_ERROR_CODE)) { + throw new BadQueryRequestException("Received error query execution result block: " + queryExceptions); + } + + throw new RuntimeException("Received error query execution result block: " + queryExceptions); } assert block.isSuccessfulEndOfStreamBlock(); MultiStageQueryStats queryStats = block.getQueryStats(); From bf47aac71575a7d2ea0215f20ddcf50a82800e6e Mon Sep 17 00:00:00 2001 From: Tony Song Date: Fri, 24 Jan 2025 17:49:53 -0800 Subject: [PATCH 07/19] Throw BadQueryRequestException for IllegalArgumentException in BaseCombineOperator --- .../core/operator/combine/BaseCombineOperator.java | 13 ++++++++++--- .../tests/BaseClusterIntegrationTestSet.java | 6 ++++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index 5bd480dcff05..da65aceb18de 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -38,6 +38,7 @@ import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider; +import org.apache.pinot.spi.exception.BadQueryRequestException; import org.apache.pinot.spi.exception.EarlyTerminationException; import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; @@ -191,10 +192,16 @@ protected static RuntimeException wrapOperatorException(Operator operator, Runti // Not all operators have associated segment, so do this at best effort. IndexSegment segment = operator.getIndexSegment(); if (segment == null) { + if (e instanceof IllegalArgumentException) { + return new BadQueryRequestException(e); + } return e; } - throw new RuntimeException( - "Caught exception while doing operator: " + operator.getClass() + " on segment: " + segment.getSegmentName(), - e); + + String errorMessage = "Caught exception while doing operator: " + operator.getClass() + " on segment: " + segment.getSegmentName(); + if (e instanceof IllegalArgumentException) { + throw new BadQueryRequestException(errorMessage, e); + } + throw new RuntimeException(errorMessage, e); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java index 2fe416299fac..45ef2d54ef27 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java @@ -515,12 +515,14 @@ public void testQueryExceptions() testQueryException("SELECT POTATO(ArrTime) FROM mytable", useMultiStageQueryEngine() - ? QueryException.QUERY_PLANNING_ERROR_CODE : QueryException.QUERY_EXECUTION_ERROR_CODE); + ? QueryException.QUERY_PLANNING_ERROR_CODE : QueryException.QUERY_VALIDATION_ERROR_CODE); + // ArrTime expects a numeric type testQueryException("SELECT COUNT(*) FROM mytable where ArrTime = 'potato'", QueryException.QUERY_VALIDATION_ERROR_CODE); - testQueryException("SELECT MAX(CarrierDelay) FROM mytable where ArrTime > 5", + // Cannot use numeric aggregate function for string column + testQueryException("SELECT MAX(OriginState) FROM mytable where ArrTime > 5", QueryException.QUERY_VALIDATION_ERROR_CODE); } From ec7cec129ed8f8be6662af9dec0768173569fa72 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Fri, 24 Jan 2025 18:12:53 -0800 Subject: [PATCH 08/19] Linter fix --- .../pinot/core/operator/combine/BaseCombineOperator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index da65aceb18de..b7427a9e27c6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -198,7 +198,8 @@ protected static RuntimeException wrapOperatorException(Operator operator, Runti return e; } - String errorMessage = "Caught exception while doing operator: " + operator.getClass() + " on segment: " + segment.getSegmentName(); + String errorMessage = "Caught exception while doing operator: " + operator.getClass() + + " on segment: " + segment.getSegmentName(); if (e instanceof IllegalArgumentException) { throw new BadQueryRequestException(errorMessage, e); } From e0e1b0957e84d84b9ffed2db20360bed6cb83ac6 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Mon, 27 Jan 2025 11:08:57 -0800 Subject: [PATCH 09/19] Patch all implementations of BaseCombineOperator --- .../core/operator/combine/GroupByCombineOperator.java | 5 +++++ .../operator/streaming/BaseStreamingCombineOperator.java | 7 ++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java index 633fb7d5e6f8..fd5a4d374cb8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.data.table.IndexedTable; import org.apache.pinot.core.data.table.IntermediateRecord; @@ -41,6 +42,7 @@ import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.scheduler.resources.ResourceManager; import org.apache.pinot.core.util.GroupByUtils; +import org.apache.pinot.spi.exception.BadQueryRequestException; import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -203,6 +205,9 @@ public BaseResultsBlock mergeResults() Throwable processingException = _processingException.get(); if (processingException != null) { + if (processingException instanceof BadQueryRequestException) { + return new ExceptionResultsBlock(QueryException.QUERY_VALIDATION_ERROR, processingException); + } return new ExceptionResultsBlock(processingException); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java index 5bed65ec5869..5a496f7cc43f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/BaseStreamingCombineOperator.java @@ -36,6 +36,7 @@ import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.spi.exception.BadQueryRequestException; import org.apache.pinot.spi.exception.EarlyTerminationException; import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; @@ -171,7 +172,11 @@ protected void onProcessSegmentsException(Throwable t) { _processingException.compareAndSet(null, t); // Clear the blocking queue and add the exception results block to terminate the main thread _blockingQueue.clear(); - _blockingQueue.offer(new ExceptionResultsBlock(t)); + if (t instanceof BadQueryRequestException) { + _blockingQueue.offer(new ExceptionResultsBlock(QueryException.QUERY_VALIDATION_ERROR, t)); + } else { + _blockingQueue.offer(new ExceptionResultsBlock(t)); + } } @Override From 713c197c428eb12fae6f26afe46a113f8a86b84f Mon Sep 17 00:00:00 2001 From: Tony Song Date: Mon, 27 Jan 2025 13:28:10 -0800 Subject: [PATCH 10/19] Increment metrics when QUERY_EXECUTION/VALIDATION_ERROR caught in MultiStageBrokerRequestHandler --- .../broker/requesthandler/MultiStageBrokerRequestHandler.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 82320b8f2d89..c6057df4e468 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -271,6 +271,9 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE); return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR); } catch (Throwable t) { + for (String table : tableNames) { + _brokerMetrics.addMeteredTableValue(table, BrokerMeter.BROKER_RESPONSES_WITH_PROCESSING_EXCEPTIONS, 1); + } ProcessingException queryException = QueryException.QUERY_EXECUTION_ERROR; if (t instanceof BadQueryRequestException) { // provide more specific error code if available From d51a37e184ea56177f106728f73abce3c4169974 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Mon, 27 Jan 2025 13:34:36 -0800 Subject: [PATCH 11/19] Increment BrokerMeter.QUERY_VALIDATION_EXCEPTIONS metric instead --- .../broker/requesthandler/MultiStageBrokerRequestHandler.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index c6057df4e468..fe7d0e79334e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -271,13 +271,11 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE); return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR); } catch (Throwable t) { - for (String table : tableNames) { - _brokerMetrics.addMeteredTableValue(table, BrokerMeter.BROKER_RESPONSES_WITH_PROCESSING_EXCEPTIONS, 1); - } ProcessingException queryException = QueryException.QUERY_EXECUTION_ERROR; if (t instanceof BadQueryRequestException) { // provide more specific error code if available queryException = QueryException.QUERY_VALIDATION_ERROR; + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); } String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t); From beb88e042c0cf52467a9ac07fa35458697bdacb4 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Tue, 28 Jan 2025 15:47:34 -0800 Subject: [PATCH 12/19] Throw QUERY_VALIDATION_ERROR if its error code found when processing blocks --- .../requesthandler/MultiStageBrokerRequestHandler.java | 3 +-- .../pinot/query/service/dispatch/QueryDispatcher.java | 8 +++++--- .../query/runtime/queries/QueryRunnerAccountingTest.java | 9 +++++---- .../pinot/query/runtime/queries/QueryRunnerTest.java | 3 ++- .../pinot/query/runtime/queries/QueryRunnerTestBase.java | 3 ++- 5 files changed, 15 insertions(+), 11 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index fe7d0e79334e..656de0cd896c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -74,7 +74,6 @@ import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.auth.TableAuthorizationResult; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.exception.BadQueryRequestException; import org.apache.pinot.spi.exception.DatabaseConflictException; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.trace.Tracing; @@ -272,7 +271,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR); } catch (Throwable t) { ProcessingException queryException = QueryException.QUERY_EXECUTION_ERROR; - if (t instanceof BadQueryRequestException) { + if (t.equals(QueryException.QUERY_VALIDATION_ERROR)) { // provide more specific error code if available queryException = QueryException.QUERY_VALIDATION_ERROR; _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 4cfaf05e6c9a..606583a42df9 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -50,6 +50,7 @@ import org.apache.pinot.common.proto.Plan; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse; +import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; @@ -77,7 +78,6 @@ import org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchClient; import org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchObserver; import org.apache.pinot.spi.accounting.ThreadExecutionContext; -import org.apache.pinot.spi.exception.BadQueryRequestException; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants; @@ -92,6 +92,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.common.exception.QueryException.QUERY_VALIDATION_ERROR; + /** * {@code QueryDispatcher} dispatch a query to different workers. @@ -398,7 +400,7 @@ public static QueryResult runReducer(long requestId, DispatchableSubPlan subPlan, long timeoutMs, Map queryOptions, - MailboxService mailboxService) { + MailboxService mailboxService) throws ProcessingException { long startTimeMs = System.currentTimeMillis(); long deadlineMs = startTimeMs + timeoutMs; @@ -463,7 +465,7 @@ public static QueryResult runReducer(long requestId, if (block.isErrorBlock()) { Map queryExceptions = block.getExceptions(); if (queryExceptions.containsKey(QueryException.QUERY_VALIDATION_ERROR_CODE)) { - throw new BadQueryRequestException("Received error query execution result block: " + queryExceptions); + throw QUERY_VALIDATION_ERROR; } throw new RuntimeException("Received error query execution result block: " + queryExceptions); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java index 3aa6556b1b20..ee385ea791ec 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory; import org.apache.pinot.query.QueryEnvironmentTestBase; @@ -91,7 +92,7 @@ public void tearDown() { } @Test - void testWithDefaultThreadAccountant() { + void testWithDefaultThreadAccountant() throws ProcessingException { Tracing.DefaultThreadResourceUsageAccountant accountant = new Tracing.DefaultThreadResourceUsageAccountant(); try (MockedStatic tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) { tracing.when(Tracing::getThreadAccountant).thenReturn(accountant); @@ -106,7 +107,7 @@ void testWithDefaultThreadAccountant() { } @Test - void testWithPerQueryAccountantFactory() { + void testWithPerQueryAccountantFactory() throws ProcessingException { HashMap configs = getAccountingConfig(); ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true); @@ -127,7 +128,7 @@ void testWithPerQueryAccountantFactory() { } @Test - void testDisableSamplingForMSE() { + void testDisableSamplingForMSE() throws ProcessingException { HashMap configs = getAccountingConfig(); configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE, false); @@ -161,7 +162,7 @@ public boolean isAnchorThreadInterrupted() { } @Test(expectedExceptions = EarlyTerminationException.class) - void testInterrupt() { + void testInterrupt() throws ProcessingException { HashMap configs = getAccountingConfig(); ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java index b70b39a8f5da..9d970b05e1f8 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.query.QueryEnvironmentTestBase; import org.apache.pinot.query.QueryServerEnclosure; @@ -166,7 +167,7 @@ public void tearDown() { * Test compares with expected row count only. */ @Test(dataProvider = "testDataWithSqlToFinalRowCount") - public void testSqlWithFinalRowCountChecker(String sql, int expectedRows) { + public void testSqlWithFinalRowCountChecker(String sql, int expectedRows) throws ProcessingException { ResultTable resultTable = queryRunner(sql, false).getResultTable(); Assert.assertEquals(resultTable.getRows().size(), expectedRows); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java index 8fb1a6c75852..ebe2bff9e946 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java @@ -46,6 +46,7 @@ import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; @@ -110,7 +111,7 @@ protected QueryEnvironment.QueryPlannerResult planQuery(String sql) { * Dispatch query to each pinot-server. The logic should mimic QueryDispatcher.submit() but does not actually make * ser/de dispatches. */ - protected QueryDispatcher.QueryResult queryRunner(String sql, boolean trace) { + protected QueryDispatcher.QueryResult queryRunner(String sql, boolean trace) throws ProcessingException { long requestId = REQUEST_ID_GEN.getAndIncrement(); SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sql); QueryEnvironment.QueryPlannerResult queryPlannerResult = _queryEnvironment.planQuery(sql, sqlNodeAndOptions, From 520bf695d66c7ecd93256c402f5f136575fe9f41 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Tue, 28 Jan 2025 16:15:20 -0800 Subject: [PATCH 13/19] Simplify error throw logic in BaseCombineOperator --- .../core/operator/combine/BaseCombineOperator.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index b7427a9e27c6..7acdb507a49d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -191,15 +191,12 @@ protected static RuntimeException wrapOperatorException(Operator operator, Runti // Otherwise, try to get the segment name to help locate the segment when debugging query errors. // Not all operators have associated segment, so do this at best effort. IndexSegment segment = operator.getIndexSegment(); - if (segment == null) { - if (e instanceof IllegalArgumentException) { - return new BadQueryRequestException(e); - } - return e; + String errorMessage = null; + if (segment != null) { + errorMessage = "Caught exception while doing operator: " + operator.getClass() + + " on segment: " + segment.getSegmentName(); } - String errorMessage = "Caught exception while doing operator: " + operator.getClass() - + " on segment: " + segment.getSegmentName(); if (e instanceof IllegalArgumentException) { throw new BadQueryRequestException(errorMessage, e); } From c3397d852c7956d39b7ec1defc7969306ce14461 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Tue, 28 Jan 2025 16:30:14 -0800 Subject: [PATCH 14/19] Increment QUERY_VALIDATION_EXCEPTIONS broker metric in single stage broker --- .../BaseSingleStageBrokerRequestHandler.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 65a2d0a79ef7..9bfb665fb7f7 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -75,6 +75,7 @@ import org.apache.pinot.common.response.BrokerResponse; import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.response.broker.QueryProcessingException; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; @@ -845,6 +846,12 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis(); brokerResponse.setTimeUsedMs(totalTimeMs); augmentStatistics(requestContext, brokerResponse); + // include both broker side exceptions and server side exceptions + List brokerExceptions = brokerResponse.getExceptions(); + brokerExceptions.stream() + .filter(exception -> exception.getErrorCode() == QueryException.QUERY_VALIDATION_ERROR_CODE) + .findFirst() + .ifPresent(exception -> _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1)); if (QueryOptionsUtils.shouldDropResults(pinotQuery.getQueryOptions())) { brokerResponse.setResultTable(null); } From 1bce9ffea503badd4e9169937e2d495af6092d0d Mon Sep 17 00:00:00 2001 From: Tony Song Date: Tue, 28 Jan 2025 16:34:56 -0800 Subject: [PATCH 15/19] Fix static import style --- .../apache/pinot/query/service/dispatch/QueryDispatcher.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 606583a42df9..61631cf51cb3 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -92,8 +92,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pinot.common.exception.QueryException.QUERY_VALIDATION_ERROR; - /** * {@code QueryDispatcher} dispatch a query to different workers. @@ -465,7 +463,7 @@ public static QueryResult runReducer(long requestId, if (block.isErrorBlock()) { Map queryExceptions = block.getExceptions(); if (queryExceptions.containsKey(QueryException.QUERY_VALIDATION_ERROR_CODE)) { - throw QUERY_VALIDATION_ERROR; + throw QueryException.QUERY_VALIDATION_ERROR; } throw new RuntimeException("Received error query execution result block: " + queryExceptions); From 4f13fb0a79940aa993a1904b673ca2cd7f871745 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Tue, 28 Jan 2025 16:48:48 -0800 Subject: [PATCH 16/19] Handle both IllegalArgumentException and BadQueryRequestException in BaseCombineOperator --- .../apache/pinot/core/operator/combine/BaseCombineOperator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index 7acdb507a49d..fc4435c86915 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -197,7 +197,7 @@ protected static RuntimeException wrapOperatorException(Operator operator, Runti + " on segment: " + segment.getSegmentName(); } - if (e instanceof IllegalArgumentException) { + if (e instanceof IllegalArgumentException || e instanceof BadQueryRequestException) { throw new BadQueryRequestException(errorMessage, e); } throw new RuntimeException(errorMessage, e); From d2e85cb6c440b27148e93a350f6100cd8cabc7cc Mon Sep 17 00:00:00 2001 From: Tony Song Date: Tue, 28 Jan 2025 23:43:28 -0800 Subject: [PATCH 17/19] Refactor using QueryInfoException --- .../MultiStageBrokerRequestHandler.java | 4 ++- .../common/exception/QueryException.java | 1 + .../common/exception/QueryInfoException.java | 32 +++++++++++++++++++ .../service/dispatch/QueryDispatcher.java | 11 ++++--- .../queries/QueryRunnerAccountingTest.java | 9 +++--- .../runtime/queries/QueryRunnerTest.java | 5 ++- .../runtime/queries/QueryRunnerTestBase.java | 3 +- 7 files changed, 50 insertions(+), 15 deletions(-) create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/exception/QueryInfoException.java diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 656de0cd896c..843eeeb64801 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -45,6 +45,7 @@ import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.exception.QueryInfoException; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerQueryPhase; import org.apache.pinot.common.response.BrokerResponse; @@ -271,7 +272,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR); } catch (Throwable t) { ProcessingException queryException = QueryException.QUERY_EXECUTION_ERROR; - if (t.equals(QueryException.QUERY_VALIDATION_ERROR)) { + if (t instanceof QueryInfoException + && ((QueryInfoException) t).getProcessingException().equals(QueryException.QUERY_VALIDATION_ERROR)) { // provide more specific error code if available queryException = QueryException.QUERY_VALIDATION_ERROR; _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java index f6e563f62c55..867288931b4a 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java @@ -26,6 +26,7 @@ // TODO: Clean up ProcessingException (thrift) because we don't send it through the wire +// TODO: Rename this class to QueryExceptionUtil because it doesn't extend Exception public class QueryException { private QueryException() { } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryInfoException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryInfoException.java new file mode 100644 index 000000000000..74d9d4ae35f0 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryInfoException.java @@ -0,0 +1,32 @@ +package org.apache.pinot.common.exception; + +import org.apache.pinot.common.response.ProcessingException; + + +/** + * Exception to contain info about QueryException errors. + * Throwable version of {@link org.apache.pinot.common.response.broker.QueryProcessingException} + */ +public class QueryInfoException extends RuntimeException { + private ProcessingException _processingException; + + public QueryInfoException(String message) { + super(message); + } + + public QueryInfoException(String message, Throwable cause) { + super(message, cause); + } + + public QueryInfoException(Throwable cause) { + super(cause); + } + + public ProcessingException getProcessingException() { + return _processingException; + } + + public void setProcessingException(ProcessingException processingException) { + _processingException = processingException; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 61631cf51cb3..4477cccedc13 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -47,10 +47,10 @@ import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.exception.QueryInfoException; import org.apache.pinot.common.proto.Plan; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse; -import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; @@ -398,7 +398,7 @@ public static QueryResult runReducer(long requestId, DispatchableSubPlan subPlan, long timeoutMs, Map queryOptions, - MailboxService mailboxService) throws ProcessingException { + MailboxService mailboxService) { long startTimeMs = System.currentTimeMillis(); long deadlineMs = startTimeMs + timeoutMs; @@ -462,11 +462,14 @@ public static QueryResult runReducer(long requestId, // TODO: Improve the error handling, e.g. return partial response if (block.isErrorBlock()) { Map queryExceptions = block.getExceptions(); + String errorMessage = "Received error query execution result block: " + queryExceptions; if (queryExceptions.containsKey(QueryException.QUERY_VALIDATION_ERROR_CODE)) { - throw QueryException.QUERY_VALIDATION_ERROR; + QueryInfoException queryInfoException = new QueryInfoException(errorMessage); + queryInfoException.setProcessingException(QueryException.QUERY_VALIDATION_ERROR); + throw queryInfoException; } - throw new RuntimeException("Received error query execution result block: " + queryExceptions); + throw new RuntimeException(errorMessage); } assert block.isSuccessfulEndOfStreamBlock(); MultiStageQueryStats queryStats = block.getQueryStats(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java index ee385ea791ec..3aa6556b1b20 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerAccountingTest.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.Map; import org.apache.pinot.common.metrics.ServerMetrics; -import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory; import org.apache.pinot.query.QueryEnvironmentTestBase; @@ -92,7 +91,7 @@ public void tearDown() { } @Test - void testWithDefaultThreadAccountant() throws ProcessingException { + void testWithDefaultThreadAccountant() { Tracing.DefaultThreadResourceUsageAccountant accountant = new Tracing.DefaultThreadResourceUsageAccountant(); try (MockedStatic tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) { tracing.when(Tracing::getThreadAccountant).thenReturn(accountant); @@ -107,7 +106,7 @@ void testWithDefaultThreadAccountant() throws ProcessingException { } @Test - void testWithPerQueryAccountantFactory() throws ProcessingException { + void testWithPerQueryAccountantFactory() { HashMap configs = getAccountingConfig(); ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true); @@ -128,7 +127,7 @@ void testWithPerQueryAccountantFactory() throws ProcessingException { } @Test - void testDisableSamplingForMSE() throws ProcessingException { + void testDisableSamplingForMSE() { HashMap configs = getAccountingConfig(); configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE, false); @@ -162,7 +161,7 @@ public boolean isAnchorThreadInterrupted() { } @Test(expectedExceptions = EarlyTerminationException.class) - void testInterrupt() throws ProcessingException { + void testInterrupt() { HashMap configs = getAccountingConfig(); ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java index 9d970b05e1f8..8947ec7a8dd5 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.query.QueryEnvironmentTestBase; import org.apache.pinot.query.QueryServerEnclosure; @@ -167,7 +166,7 @@ public void tearDown() { * Test compares with expected row count only. */ @Test(dataProvider = "testDataWithSqlToFinalRowCount") - public void testSqlWithFinalRowCountChecker(String sql, int expectedRows) throws ProcessingException { + public void testSqlWithFinalRowCountChecker(String sql, int expectedRows) { ResultTable resultTable = queryRunner(sql, false).getResultTable(); Assert.assertEquals(resultTable.getRows().size(), expectedRows); } @@ -207,7 +206,7 @@ public void testSqlWithExceptionMsgChecker(String sql, String expectedError) { Assert.assertTrue( exceptionMessage.startsWith("Received error query execution result block: ") || exceptionMessage.startsWith( "Error occurred during stage submission") || exceptionMessage.equals(expectedError), - "Exception message didn't start with proper heading: " + exceptionMessage); + "Exception message didn't start with proper heading: " + expectedError); Assert.assertTrue(exceptionMessage.contains(expectedError), "Exception should contain: " + expectedError + ", but found: " + exceptionMessage); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java index ebe2bff9e946..8fb1a6c75852 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java @@ -46,7 +46,6 @@ import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; import org.apache.pinot.common.exception.QueryException; -import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; @@ -111,7 +110,7 @@ protected QueryEnvironment.QueryPlannerResult planQuery(String sql) { * Dispatch query to each pinot-server. The logic should mimic QueryDispatcher.submit() but does not actually make * ser/de dispatches. */ - protected QueryDispatcher.QueryResult queryRunner(String sql, boolean trace) throws ProcessingException { + protected QueryDispatcher.QueryResult queryRunner(String sql, boolean trace) { long requestId = REQUEST_ID_GEN.getAndIncrement(); SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sql); QueryEnvironment.QueryPlannerResult queryPlannerResult = _queryEnvironment.planQuery(sql, sqlNodeAndOptions, From b8a42e84607a693b22f73ef3a8bbdc97e7916f65 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Tue, 28 Jan 2025 23:56:19 -0800 Subject: [PATCH 18/19] Missing license header for QueryInfoException --- .../common/exception/QueryInfoException.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryInfoException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryInfoException.java index 74d9d4ae35f0..8c971a0c655d 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryInfoException.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryInfoException.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.pinot.common.exception; import org.apache.pinot.common.response.ProcessingException; From c3b0b48a27ed9412b330e9f1c636fdd0e46dc402 Mon Sep 17 00:00:00 2001 From: Tony Song Date: Wed, 29 Jan 2025 00:04:14 -0800 Subject: [PATCH 19/19] Revert QueryRunnerTest --- .../org/apache/pinot/query/runtime/queries/QueryRunnerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java index 8947ec7a8dd5..b70b39a8f5da 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java @@ -206,7 +206,7 @@ public void testSqlWithExceptionMsgChecker(String sql, String expectedError) { Assert.assertTrue( exceptionMessage.startsWith("Received error query execution result block: ") || exceptionMessage.startsWith( "Error occurred during stage submission") || exceptionMessage.equals(expectedError), - "Exception message didn't start with proper heading: " + expectedError); + "Exception message didn't start with proper heading: " + exceptionMessage); Assert.assertTrue(exceptionMessage.contains(expectedError), "Exception should contain: " + expectedError + ", but found: " + exceptionMessage); }