Skip to content

Commit

Permalink
Clean up query executor configs (#15095)
Browse files Browse the repository at this point in the history
* Clean up query executor configs
  • Loading branch information
Jackie-Jiang authored Feb 21, 2025
1 parent 98dd54d commit 83437a2
Show file tree
Hide file tree
Showing 23 changed files with 338 additions and 331 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT);
boolean defaultUseSpool = _config.getProperty(CommonConstants.Broker.CONFIG_OF_SPOOLS,
CommonConstants.Broker.DEFAULT_OF_SPOOLS);
boolean defaultEnableGroupTrim = _config.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_GROUP_TRIM,
CommonConstants.Broker.DEFAULT_BROKER_ENABLE_GROUP_TRIM);
boolean defaultEnableGroupTrim = _config.getProperty(CommonConstants.Broker.CONFIG_OF_MSE_ENABLE_GROUP_TRIM,
CommonConstants.Broker.DEFAULT_MSE_ENABLE_GROUP_TRIM);

queryEnvironment = new QueryEnvironment(QueryEnvironment.configBuilder()
.database(database)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,6 @@ public static Integer getMaxExecutionThreads(Map<String, String> queryOptions) {
return checkedParseIntPositive(QueryOptionKey.MAX_EXECUTION_THREADS, maxExecutionThreadsString);
}

@Nullable
public static Integer getGroupTrimSize(Map<String, String> queryOptions) {
String groupTrimSize = queryOptions.get(QueryOptionKey.GROUP_TRIM_SIZE);
// NOTE: Non-positive value means turning off the intermediate level trim
return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_SIZE, groupTrimSize);
}

@Nullable
public static Integer getMinSegmentGroupTrimSize(Map<String, String> queryOptions) {
String minSegmentGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE);
Expand All @@ -241,6 +234,13 @@ public static Integer getMinBrokerGroupTrimSize(Map<String, String> queryOptions
return uncheckedParseInt(QueryOptionKey.MIN_BROKER_GROUP_TRIM_SIZE, minBrokerGroupTrimSizeString);
}

@Nullable
public static Integer getMSEMinGroupTrimSize(Map<String, String> queryOptions) {
String mseMinGroupTrimSizeString = queryOptions.get(QueryOptionKey.MSE_MIN_GROUP_TRIM_SIZE);
// NOTE: Non-positive value means turning off the intermediate stage trim
return uncheckedParseInt(QueryOptionKey.MSE_MIN_GROUP_TRIM_SIZE, mseMinGroupTrimSizeString);
}

@Nullable
public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
String groupByTrimThreshold = queryOptions.get(QueryOptionKey.GROUP_TRIM_THRESHOLD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@
import org.apache.pinot.core.query.prefetch.FetchPlannerRegistry;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
import org.apache.pinot.core.util.GroupByUtils;
import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -65,72 +65,78 @@
* The <code>InstancePlanMakerImplV2</code> class is the default implementation of {@link PlanMaker}.
*/
public class InstancePlanMakerImplV2 implements PlanMaker {
// Instance config key for maximum number of threads used to execute the query
// Set as pinot.server.query.executor.max.execution.threads
public static final String MAX_EXECUTION_THREADS_KEY = "max.execution.threads";
public static final int DEFAULT_MAX_EXECUTION_THREADS = -1;

public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = "max.init.group.holder.capacity";
public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000;
public static final String MIN_INITIAL_INDEXED_TABLE_CAPACITY_KEY = "min.init.indexed.table.capacity";
public static final int DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY = 128;
public static final String NUM_GROUPS_LIMIT_KEY = "num.groups.limit";
public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000;

// By default, group trimming in AggregateOperator is disabled
public static final int DEFAULT_GROUP_TRIM_SIZE = -1;

// Instance config key for minimum segment-level group trim size
// Set as pinot.server.query.executor.min.segment.group.trim.size
public static final String MIN_SEGMENT_GROUP_TRIM_SIZE_KEY = "min.segment.group.trim.size";
public static final int DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE = -1;
// Instance config key for minimum server-level group trim size
// Caution: Setting it to non-positive value (disable trim) or large value can give more accurate result, but can
// potentially cause memory issue
// Set as pinot.server.query.executor.min.server.group.trim.size
public static final String MIN_SERVER_GROUP_TRIM_SIZE_KEY = "min.server.group.trim.size";
public static final int DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE = GroupByUtils.DEFAULT_MIN_NUM_GROUPS;
// set as pinot.server.query.executor.groupby.trim.threshold
public static final String GROUPBY_TRIM_THRESHOLD_KEY = "groupby.trim.threshold";
public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
public static final int DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT = 1;
public static final int DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT = 10_000;

// The following fields are deprecated and will be removed after 1.4 release
// Use CommonConstants.Server.* instead
@Deprecated
public static final String MAX_EXECUTION_THREADS_KEY = Server.MAX_EXECUTION_THREADS;
@Deprecated
public static final int DEFAULT_MAX_EXECUTION_THREADS = Server.DEFAULT_QUERY_EXECUTOR_MAX_EXECUTION_THREADS;
@Deprecated
public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = Server.MAX_INITIAL_RESULT_HOLDER_CAPACITY;
@Deprecated
public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
Server.DEFAULT_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
@Deprecated
public static final String MIN_INITIAL_INDEXED_TABLE_CAPACITY_KEY = Server.MIN_INITIAL_INDEXED_TABLE_CAPACITY;
@Deprecated
public static final int DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY =
Server.DEFAULT_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
@Deprecated
public static final String NUM_GROUPS_LIMIT_KEY = Server.NUM_GROUPS_LIMIT;
@Deprecated
public static final int DEFAULT_NUM_GROUPS_LIMIT = Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT;
@Deprecated
public static final String MIN_SEGMENT_GROUP_TRIM_SIZE_KEY = Server.MIN_SEGMENT_GROUP_TRIM_SIZE;
@Deprecated
public static final int DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE =
Server.DEFAULT_QUERY_EXECUTOR_MIN_SEGMENT_GROUP_TRIM_SIZE;
@Deprecated
public static final String MIN_SERVER_GROUP_TRIM_SIZE_KEY = Server.MIN_SERVER_GROUP_TRIM_SIZE;
@Deprecated
public static final int DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE = Server.DEFAULT_QUERY_EXECUTOR_MIN_SERVER_GROUP_TRIM_SIZE;
@Deprecated
public static final String GROUPBY_TRIM_THRESHOLD_KEY = Server.GROUPBY_TRIM_THRESHOLD;
@Deprecated
public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = Server.DEFAULT_QUERY_EXECUTOR_GROUPBY_TRIM_THRESHOLD;

private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);

private final FetchPlanner _fetchPlanner = FetchPlannerRegistry.getPlanner();
private int _maxExecutionThreads = DEFAULT_MAX_EXECUTION_THREADS;
private int _maxInitialResultHolderCapacity = DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
private int _minInitialIndexedTableCapacity = DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
private int _maxExecutionThreads = Server.DEFAULT_QUERY_EXECUTOR_MAX_EXECUTION_THREADS;
private int _maxInitialResultHolderCapacity = Server.DEFAULT_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
private int _minInitialIndexedTableCapacity = Server.DEFAULT_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
// Limit on number of groups stored for each segment, beyond which no new group will be created
private int _numGroupsLimit = DEFAULT_NUM_GROUPS_LIMIT;
private int _numGroupsLimit = Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT;
// Used for SQL GROUP BY (server combine)
private int _minSegmentGroupTrimSize = DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE;
private int _minServerGroupTrimSize = DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE;
private int _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD;

public InstancePlanMakerImplV2() {
}
private int _minSegmentGroupTrimSize = Server.DEFAULT_QUERY_EXECUTOR_MIN_SEGMENT_GROUP_TRIM_SIZE;
private int _minServerGroupTrimSize = Server.DEFAULT_QUERY_EXECUTOR_MIN_SERVER_GROUP_TRIM_SIZE;
private int _groupByTrimThreshold = Server.DEFAULT_QUERY_EXECUTOR_GROUPBY_TRIM_THRESHOLD;

@Override
public void init(PinotConfiguration queryExecutorConfig) {
_maxExecutionThreads = queryExecutorConfig.getProperty(MAX_EXECUTION_THREADS_KEY, DEFAULT_MAX_EXECUTION_THREADS);
_maxInitialResultHolderCapacity = queryExecutorConfig.getProperty(MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY,
DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
_minInitialIndexedTableCapacity = queryExecutorConfig.getProperty(MIN_INITIAL_INDEXED_TABLE_CAPACITY_KEY,
DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
_numGroupsLimit = queryExecutorConfig.getProperty(NUM_GROUPS_LIMIT_KEY, DEFAULT_NUM_GROUPS_LIMIT);
_maxExecutionThreads = queryExecutorConfig.getProperty(Server.MAX_EXECUTION_THREADS,
Server.DEFAULT_QUERY_EXECUTOR_MAX_EXECUTION_THREADS);
_maxInitialResultHolderCapacity = queryExecutorConfig.getProperty(Server.MAX_INITIAL_RESULT_HOLDER_CAPACITY,
Server.DEFAULT_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
_minInitialIndexedTableCapacity = queryExecutorConfig.getProperty(Server.MIN_INITIAL_INDEXED_TABLE_CAPACITY,
Server.DEFAULT_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY);
_numGroupsLimit =
queryExecutorConfig.getProperty(Server.NUM_GROUPS_LIMIT, Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT);
Preconditions.checkState(_maxInitialResultHolderCapacity <= _numGroupsLimit,
"Invalid configuration: maxInitialResultHolderCapacity: %d must be smaller or equal to numGroupsLimit: %d",
_maxInitialResultHolderCapacity, _numGroupsLimit);
Preconditions.checkState(_minInitialIndexedTableCapacity <= _numGroupsLimit,
"Invalid configuration: minInitialIndexedTableCapacity: %d must be smaller or equal to numGroupsLimit: %d",
_minInitialIndexedTableCapacity, _numGroupsLimit);
_minSegmentGroupTrimSize =
queryExecutorConfig.getProperty(MIN_SEGMENT_GROUP_TRIM_SIZE_KEY, DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE);
_minServerGroupTrimSize =
queryExecutorConfig.getProperty(MIN_SERVER_GROUP_TRIM_SIZE_KEY, DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE);
_groupByTrimThreshold = queryExecutorConfig.getProperty(GROUPBY_TRIM_THRESHOLD_KEY, DEFAULT_GROUPBY_TRIM_THRESHOLD);
_minSegmentGroupTrimSize = queryExecutorConfig.getProperty(Server.MIN_SEGMENT_GROUP_TRIM_SIZE,
Server.DEFAULT_QUERY_EXECUTOR_MIN_SEGMENT_GROUP_TRIM_SIZE);
_minServerGroupTrimSize = queryExecutorConfig.getProperty(Server.MIN_SERVER_GROUP_TRIM_SIZE,
Server.DEFAULT_QUERY_EXECUTOR_MIN_SERVER_GROUP_TRIM_SIZE);
_groupByTrimThreshold = queryExecutorConfig.getProperty(Server.GROUPBY_TRIM_THRESHOLD,
Server.DEFAULT_QUERY_EXECUTOR_GROUPBY_TRIM_THRESHOLD);
Preconditions.checkState(_groupByTrimThreshold > 0,
"Invalid configurable: groupByTrimThreshold: %d must be positive", _groupByTrimThreshold);
LOGGER.info("Initialized plan maker with maxExecutionThreads: {}, maxInitialResultHolderCapacity: {}, "
Expand Down Expand Up @@ -277,8 +283,7 @@ private void applyQueryOptions(QueryContext queryContext) {
queryContext.setNumThreadsExtractFinalResult(DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT);
}
// Set chunkSizeExtractFinalResult
Integer chunkSizeExtractFinalResult =
QueryOptionsUtils.getChunkSizeExtractFinalResult(queryOptions);
Integer chunkSizeExtractFinalResult = QueryOptionsUtils.getChunkSizeExtractFinalResult(queryOptions);
if (chunkSizeExtractFinalResult != null) {
queryContext.setChunkSizeExtractFinalResult(chunkSizeExtractFinalResult);
} else {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@
* Config for SegmentPruner.
*/
public class SegmentPrunerConfig {
public static final String SEGMENT_PRUNER_NAMES_KEY = "class";
@Deprecated
public static final String SEGMENT_PRUNER_NAMES_KEY = Server.CLASS;

private final int _numSegmentPruners;
private final List<String> _segmentPrunerNames;
private final List<PinotConfiguration> _segmentPrunerConfigs;

public SegmentPrunerConfig(PinotConfiguration segmentPrunerConfig) {
List<String> segmentPrunerNames =
segmentPrunerConfig.getProperty(SEGMENT_PRUNER_NAMES_KEY, Server.DEFAULT_QUERY_EXECUTOR_PRUNER_CLASS);
segmentPrunerConfig.getProperty(Server.CLASS, Server.DEFAULT_QUERY_EXECUTOR_PRUNER_CLASS);
_numSegmentPruners = segmentPrunerNames.size();
_segmentPrunerNames = new ArrayList<>(_numSegmentPruners);
_segmentPrunerConfigs = new ArrayList<>(_numSegmentPruners);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
import org.apache.pinot.core.plan.Plan;
import org.apache.pinot.core.plan.maker.PlanMaker;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.config.QueryExecutorConfig;
import org.apache.pinot.core.query.config.SegmentPrunerConfig;
import org.apache.pinot.core.query.pruner.SegmentPrunerService;
import org.apache.pinot.core.query.pruner.SegmentPrunerStatistics;
import org.apache.pinot.core.query.request.ServerQueryRequest;
Expand All @@ -85,6 +85,7 @@
import org.apache.pinot.spi.exception.QueryCancelledException;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -110,18 +111,17 @@ public synchronized void init(PinotConfiguration config, InstanceDataManager ins
throws ConfigurationException {
_instanceDataManager = instanceDataManager;
_serverMetrics = serverMetrics;
QueryExecutorConfig queryExecutorConfig = new QueryExecutorConfig(config);
LOGGER.info("Trying to build SegmentPrunerService");
_segmentPrunerService = new SegmentPrunerService(queryExecutorConfig.getPrunerConfig());
String planMakerClass = queryExecutorConfig.getPlanMakerClass();
_segmentPrunerService = new SegmentPrunerService(new SegmentPrunerConfig(config.subset(Server.PRUNER)));
String planMakerClass = config.getProperty(Server.PLAN_MAKER_CLASS, Server.DEFAULT_QUERY_EXECUTOR_PLAN_MAKER_CLASS);
LOGGER.info("Trying to build PlanMaker with class: {}", planMakerClass);
try {
_planMaker = PluginManager.get().createInstance(planMakerClass);
} catch (Exception e) {
throw new RuntimeException("Caught exception while creating PlanMaker with class: " + planMakerClass);
}
_planMaker.init(config);
_defaultTimeoutMs = queryExecutorConfig.getTimeOut();
_defaultTimeoutMs = config.getProperty(Server.TIMEOUT, Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
_enablePrefetch = Boolean.parseBoolean(config.getProperty(ENABLE_PREFETCH));
LOGGER.info("Initialized query executor with defaultTimeoutMs: {}, enablePrefetch: {}", _defaultTimeoutMs,
_enablePrefetch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pinot.core.util.MemoizedClassAssociation;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.utils.CommonConstants.Server;


/**
Expand Down Expand Up @@ -108,25 +109,24 @@ public class QueryContext {
// Whether to skip reordering scan filters for the query
private boolean _skipScanFilterReorder;
// Maximum number of threads used to execute the query
private int _maxExecutionThreads = InstancePlanMakerImplV2.DEFAULT_MAX_EXECUTION_THREADS;
private int _maxExecutionThreads = Server.DEFAULT_QUERY_EXECUTOR_MAX_EXECUTION_THREADS;
// The following properties apply to group-by queries
// Maximum initial capacity of the group-by result holder
private int _maxInitialResultHolderCapacity = InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
private int _maxInitialResultHolderCapacity = Server.DEFAULT_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
// Initial capacity of the indexed table
private int _minInitialIndexedTableCapacity = InstancePlanMakerImplV2.DEFAULT_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
private int _minInitialIndexedTableCapacity = Server.DEFAULT_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY;
// Limit of number of groups stored in each segment
private int _numGroupsLimit = InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT;
private int _numGroupsLimit = Server.DEFAULT_QUERY_EXECUTOR_NUM_GROUPS_LIMIT;
// Minimum number of groups to keep per segment when trimming groups for SQL GROUP BY
private int _minSegmentGroupTrimSize = InstancePlanMakerImplV2.DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE;
private int _minSegmentGroupTrimSize = Server.DEFAULT_QUERY_EXECUTOR_MIN_SEGMENT_GROUP_TRIM_SIZE;
// Minimum number of groups to keep across segments when trimming groups for SQL GROUP BY
private int _minServerGroupTrimSize = InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE;
private int _minServerGroupTrimSize = Server.DEFAULT_QUERY_EXECUTOR_MIN_SERVER_GROUP_TRIM_SIZE;
// Trim threshold to use for server combine for SQL GROUP BY
private int _groupTrimThreshold = InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD;
private int _groupTrimThreshold = Server.DEFAULT_QUERY_EXECUTOR_GROUPBY_TRIM_THRESHOLD;
// Number of threads to use for final reduce
private int _numThreadsExtractFinalResult = InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT;
// Parallel chunk size for final reduce
private int _chunkSizeExtractFinalResult =
InstancePlanMakerImplV2.DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT;
private int _chunkSizeExtractFinalResult = InstancePlanMakerImplV2.DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT;
// Whether null handling is enabled
private boolean _nullHandlingEnabled;
// Whether server returns the final result
Expand Down
Loading

0 comments on commit 83437a2

Please sign in to comment.