Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Map BadQueryRequestException to QueryException.QUERY_VALIDATION_ERROR #14917

Merged
merged 19 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
aa2f562
Reclassify BadQueryRequestException as QUERY_VALIDATION_ERROR
real-mj-song Jan 23, 2025
0114c7a
Use BadQueryRequestException for column type error for sum/max/min ag…
real-mj-song Jan 24, 2025
89de614
Update testQueryExceptions to test correct QueryException code
real-mj-song Jan 24, 2025
1d8ac33
Import style cleanup
real-mj-song Jan 24, 2025
d1257f2
Update query exception type in OfflineClusterIntegrationTest
real-mj-song Jan 24, 2025
781733d
Throws BadQueryRequestException in reducer if QUERY_VALIDATION_ERROR_…
real-mj-song Jan 25, 2025
bf47aac
Throw BadQueryRequestException for IllegalArgumentException in BaseCo…
real-mj-song Jan 25, 2025
ec7cec1
Linter fix
real-mj-song Jan 25, 2025
e0e1b09
Patch all implementations of BaseCombineOperator
real-mj-song Jan 27, 2025
713c197
Increment metrics when QUERY_EXECUTION/VALIDATION_ERROR caught in Mul…
real-mj-song Jan 27, 2025
d51a37e
Increment BrokerMeter.QUERY_VALIDATION_EXCEPTIONS metric instead
real-mj-song Jan 27, 2025
beb88e0
Throw QUERY_VALIDATION_ERROR if its error code found when processing …
real-mj-song Jan 28, 2025
520bf69
Simplify error throw logic in BaseCombineOperator
real-mj-song Jan 29, 2025
c3397d8
Increment QUERY_VALIDATION_EXCEPTIONS broker metric in single stage b…
real-mj-song Jan 29, 2025
1bce9ff
Fix static import style
real-mj-song Jan 29, 2025
4f13fb0
Handle both IllegalArgumentException and BadQueryRequestException in …
real-mj-song Jan 29, 2025
d2e85cb
Refactor using QueryInfoException
real-mj-song Jan 29, 2025
b8a42e8
Missing license header for QueryInfoException
real-mj-song Jan 29, 2025
c3b0b48
Revert QueryRunnerTest
real-mj-song Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerQueryPhase;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.common.response.broker.ResultTable;
Expand All @@ -73,6 +74,7 @@
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.auth.TableAuthorizationResult;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.DatabaseConflictException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
Expand Down Expand Up @@ -269,11 +271,17 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE);
return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
} catch (Throwable t) {
ProcessingException queryException = QueryException.QUERY_EXECUTION_ERROR;
if (t instanceof BadQueryRequestException) {
// provide more specific error code if available
queryException = QueryException.QUERY_VALIDATION_ERROR;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should update broker metrics - BrokerMeter.QUERY_VALIDATION_EXCEPTIONS ?

We should also update this in SingleStageBrokerRequestHandler?

Copy link
Contributor Author

@real-mj-song real-mj-song Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For SingleStageBrokerRequestHandler, this is not needed because BadQueryRequestException thrown by ServerQueryExecutorV1Impl is stored in BrokerResponseNative object. It's directly parsed in PinotClientRequest::getPinotQueryResponse.

For this MultiStageBrokerRequestHandler, this was needed because query was handled by _queryDispatcher.submitAndReduce which may throw an exception. The Java exception needs to be caught on the broker request handler level.

Copy link
Contributor Author

@real-mj-song real-mj-song Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Diagrams for these two error reporting scenarios. Many middle steps are omitted and only relevant steps are shown 👇🏼

PinotClientRequest::processSqlQueryPost
  - BaseSingleStageBrokerRequestHandler::handleRequest
    - SingleConnectionBrokerRequestHandler::processBrokerRequest // broker metric is updated here
      - ServerQueryExecutorV1Impl::executeInternal // BadQueryRequestException is caught. InstanceResponseBlock is updated here
    - SingleConnectionBrokerRequestHandler::processBrokerRequest // No exception is caught here. Error code is embedded inside brokerResponse
PinotClientRequest::getPinotQueryResponse // 700 error code is parsed

MultiStage query

PinotClientRequest::processSqlQueryPost
  - MultiStageBrokerRequestHandler::handleRequest
    - QueryDispatcher::submitAndReduce // exception is thrown if ANY block errors. I re-throw BadQueryRequestException here
  - MultiStageBrokerRequestHandler::handleRequest // catch BadQueryRequestException. Exceptions in requestContext are added to brokerResponse via augmentStatistics
PinotClientRequest::getPinotQueryResponse // 700 error code is parsed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metric incremented now. Resolved

}

String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t);
LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage);
requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
requestContext.setErrorCode(queryException.getErrorCode());
return new BrokerResponseNative(
QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, consolidatedMessage));
QueryException.getException(queryException, consolidatedMessage));
} finally {
Tracing.getThreadAccountant().clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
Expand Down Expand Up @@ -191,10 +192,17 @@ protected static RuntimeException wrapOperatorException(Operator operator, Runti
// Not all operators have associated segment, so do this at best effort.
IndexSegment segment = operator.getIndexSegment();
if (segment == null) {
if (e instanceof IllegalArgumentException) {
return new BadQueryRequestException(e);
}
return e;
}
throw new RuntimeException(
"Caught exception while doing operator: " + operator.getClass() + " on segment: " + segment.getSegmentName(),
e);

String errorMessage = "Caught exception while doing operator: " + operator.getClass()
+ " on segment: " + segment.getSegmentName();
if (e instanceof IllegalArgumentException) {
throw new BadQueryRequestException(errorMessage, e);
}
throw new RuntimeException(errorMessage, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -111,7 +112,11 @@ protected void processSegments() {
@Override
protected void onProcessSegmentsException(Throwable t) {
_processingException.compareAndSet(null, t);
_blockingQueue.offer(new ExceptionResultsBlock(t));
if (t instanceof BadQueryRequestException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ServerQueryExecutorV1Impl, we are setting QUERY_VALIDATION_ERROR for BadQueryRequestException. Do we require the same changes here?

Either way I see this change only being made in BaseSingleBlockCombineOperator. What about GroupByCombineOperator and BaseStreamingCombineOperator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ServerQueryExecutorV1Impl, we are setting QUERY_VALIDATION_ERROR for BadQueryRequestException. Do we require the same changes here?

It does. Without this change, generic RuntimeException is thrown to QueryDispatcher.java which will bypass that check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way I see this change only being made in BaseSingleBlockCombineOperator. What about GroupByCombineOperator and BaseStreamingCombineOperator?

Yeah just noticed these implement the same interface. Will update them as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All patched.

_blockingQueue.offer(new ExceptionResultsBlock(QueryException.QUERY_VALIDATION_ERROR, t));
} else {
_blockingQueue.offer(new ExceptionResultsBlock(t));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.exception.BadQueryRequestException;


public class MaxAggregationFunction extends NullableSingleInputAggregationFunction<Double, Double> {
Expand Down Expand Up @@ -143,7 +144,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
break;
}
default:
throw new IllegalStateException("Cannot compute max for non-numeric type: " + blockValSet.getValueType());
throw new BadQueryRequestException("Cannot compute max for non-numeric type: " + blockValSet.getValueType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.exception.BadQueryRequestException;


public class MinAggregationFunction extends NullableSingleInputAggregationFunction<Double, Double> {
Expand Down Expand Up @@ -143,7 +144,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
break;
}
default:
throw new IllegalStateException("Cannot compute min for non-numeric type: " + blockValSet.getValueType());
throw new BadQueryRequestException("Cannot compute min for non-numeric type: " + blockValSet.getValueType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.exception.BadQueryRequestException;


public class SumAggregationFunction extends NullableSingleInputAggregationFunction<Double, Double> {
Expand Down Expand Up @@ -139,7 +140,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
break;
}
default:
throw new IllegalStateException("Cannot compute sum for non-numeric type: " + blockValSet.getValueType());
throw new BadQueryRequestException("Cannot compute sum for non-numeric type: " + blockValSet.getValueType());
}
updateAggregationResultHolder(aggregationResultHolder, sum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private InstanceResponseBlock executeInternal(ServerQueryRequest queryRequest, E
// Do not log verbose error for BadQueryRequestException and QueryCancelledException.
if (e instanceof BadQueryRequestException) {
LOGGER.info("Caught BadQueryRequestException while processing requestId: {}, {}", requestId, e.getMessage());
instanceResponse.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
instanceResponse.addException(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e));
} else if (e instanceof QueryCancelledException) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Cancelled while processing requestId: {}", requestId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,15 @@ public void testQueryExceptions()

testQueryException("SELECT POTATO(ArrTime) FROM mytable",
useMultiStageQueryEngine()
? QueryException.QUERY_PLANNING_ERROR_CODE : QueryException.QUERY_EXECUTION_ERROR_CODE);
? QueryException.QUERY_PLANNING_ERROR_CODE : QueryException.QUERY_VALIDATION_ERROR_CODE);

// ArrTime expects a numeric type
testQueryException("SELECT COUNT(*) FROM mytable where ArrTime = 'potato'",
QueryException.QUERY_EXECUTION_ERROR_CODE);
QueryException.QUERY_VALIDATION_ERROR_CODE);

// Cannot use numeric aggregate function for string column
testQueryException("SELECT MAX(OriginState) FROM mytable where ArrTime > 5",
QueryException.QUERY_VALIDATION_ERROR_CODE);
}

private void testQueryException(String query, int errorCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ public void testBase64Func(boolean useMultiStageQueryEngine)
testQueryError(sqlQuery, QueryException.QUERY_PLANNING_ERROR_CODE);
} else {
response = postQuery(sqlQuery);
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryValidationError"));
}

// invalid argument
Expand All @@ -1043,7 +1043,7 @@ public void testBase64Func(boolean useMultiStageQueryEngine)
testQueryError(sqlQuery, QueryException.QUERY_PLANNING_ERROR_CODE);
} else {
response = postQuery(sqlQuery);
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryValidationError"));
}

// invalid argument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.calcite.runtime.PairList;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
Expand Down Expand Up @@ -76,6 +77,7 @@
import org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchClient;
import org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchObserver;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -459,7 +461,12 @@ public static QueryResult runReducer(long requestId,
}
// TODO: Improve the error handling, e.g. return partial response
if (block.isErrorBlock()) {
throw new RuntimeException("Received error query execution result block: " + block.getExceptions());
Map<Integer, String> queryExceptions = block.getExceptions();
if (queryExceptions.containsKey(QueryException.QUERY_VALIDATION_ERROR_CODE)) {
throw new BadQueryRequestException("Received error query execution result block: " + queryExceptions);
}

throw new RuntimeException("Received error query execution result block: " + queryExceptions);
}
assert block.isSuccessfulEndOfStreamBlock();
MultiStageQueryStats queryStats = block.getQueryStats();
Expand Down
Loading