diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java index ac49d7d6f694..435787270158 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java @@ -778,6 +778,17 @@ public boolean hasIndxBasedOrdering() return nonPrimaryKeyRestrictions.restrictions().stream().anyMatch(SingleRestriction::isIndexBasedOrdering); } + public boolean hasIndxBasedBoundedAnn() + { + for (SingleRestriction restriction : nonPrimaryKeyRestrictions.restrictions()) + { + if (restriction.isBoundedAnn()) + return true; + } + + return false; + } + public void throwRequiresAllowFilteringError(TableMetadata table) { if (hasIndxBasedOrdering()) @@ -994,7 +1005,7 @@ private boolean hasUnrestrictedClusteringColumns() return table.clusteringColumns().size() != clusteringColumnsRestrictions.size(); } - public RowFilter getRowFilter(IndexRegistry indexManager, QueryOptions options, QueryState queryState, SelectOptions selectOptions) + public RowFilter getRowFilter(IndexRegistry indexManager, QueryOptions options, QueryState queryState, SelectOptions selectOptions, boolean allowFiltering) { boolean hasAnnOptions = selectOptions.hasANNOptions(); @@ -1007,7 +1018,14 @@ public RowFilter getRowFilter(IndexRegistry indexManager, QueryOptions options, } ANNOptions annOptions = selectOptions.parseANNOptions(); - RowFilter rowFilter = RowFilter.builder(indexManager) + + // Ordering on non-clustering column requires each restricted column to be indexed except for + // fully-specified partition keys. ANN queries do not currently work correctly when filtering is required, so we + // fail even though ALLOW FILTERING was passed. 
+ if (allowFiltering && (hasIndxBasedOrdering() || hasIndxBasedBoundedAnn())) + allowFiltering = false; + + RowFilter rowFilter = RowFilter.builder(indexManager, allowFiltering) .buildFromRestrictions(this, table, options, queryState, annOptions); if (hasAnnOptions && !rowFilter.hasANN()) diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index ace05ee3545f..f3e254de68e4 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -1008,7 +1008,7 @@ private NavigableSet> getRequestedRows(QueryOptions options, Query public RowFilter getRowFilter(QueryOptions options, QueryState state) throws InvalidRequestException { IndexRegistry indexRegistry = IndexRegistry.obtain(table); - return restrictions.getRowFilter(indexRegistry, options, state, selectOptions); + return restrictions.getRowFilter(indexRegistry, options, state, selectOptions, parameters.allowFiltering); } private ResultSet process(PartitionIterator partitions, diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 7fa00f0436e5..0910d0f76c24 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -18,12 +18,7 @@ package org.apache.cassandra.db; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; @@ -64,7 +59,6 @@ import org.apache.cassandra.guardrails.Guardrails; import org.apache.cassandra.guardrails.Threshold; import org.apache.cassandra.index.Index; -import org.apache.cassandra.index.IndexRegistry; import 
org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputPlus; @@ -90,13 +84,14 @@ import static com.google.common.collect.Iterables.any; import static com.google.common.collect.Iterables.filter; import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP; +import static org.apache.cassandra.index.SecondaryIndexManager.getIndexStatus; import static org.apache.cassandra.utils.MonotonicClock.approxTime; /** * General interface for storage-engine read commands (common to both range and * single partition commands). *
<p>
- * This contains all the informations needed to do a local read. + * This contains all the information needed to do a local read. */ public abstract class ReadCommand extends AbstractReadQuery { @@ -216,7 +211,7 @@ public int digestVersion() * this allows us to use the command as a carrier of the digest version even if we only call * setIsDigestQuery on some copy of it. * - * @param digestVersion the version for the digest is this command is used for digest query.. + * @param digestVersion the version for the digest if this command is used for digest query. * @return this read command. */ public ReadCommand setDigestVersion(int digestVersion) @@ -405,9 +400,8 @@ public UnfilteredPartitionIterator executeLocally(ReadExecutionController execut ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata()); Index.Searcher searcher = null; - if (indexQueryPlan != null) + if (indexQueryPlan != null && cfs.indexManager.searcherFor(indexQueryPlan, rowFilter().allowFiltering)) { - cfs.indexManager.checkQueryability(indexQueryPlan); searcher = indexSearcher(); Index index = indexQueryPlan.getFirst(); Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.keyspace, cfs.metadata.name, index.getIndexMetadata().name); @@ -530,9 +524,9 @@ public ReadExecutionController executionController(boolean trackRepairedStatus) } /** - * Allow to post-process the result of the query after it has been reconciled on the coordinator + * Allow to post-process the result of the query after it has been reconciled on the coordinator, * but before it is passed to the CQL layer to return the ResultSet. - * + *
<p>
* See CASSANDRA-8717 for why this exists. */ public PartitionIterator postReconciliationProcessing(PartitionIterator result) @@ -740,7 +734,7 @@ public Message createMessage(boolean trackRepairedData) // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it // can save us some bandwith, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which - // are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive). + // are to some extent an artefact of compaction lagging behind and hence counting them is somewhat unintuitive). protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs, ReadExecutionController controller) @@ -768,7 +762,7 @@ protected LongPredicate getPurgeEvaluator() * Note that in general the returned string will not be exactly the original user string, first * because there isn't always a single syntax for a given query, but also because we don't have * all the information needed (we know the non-PK columns queried but not the PK ones as internally - * we query them all). So this shouldn't be relied too strongly, but this should be good enough for + * we query them all). So this shouldn't be relied on too strongly, but this should be good enough for * debugging purpose which is what this is for. */ public String toCQLString() @@ -832,7 +826,7 @@ InputCollector iteratorsForRange(ColumnFamilyStore. * input for a query. Separates them according to repaired status and of repaired * status is being tracked, handles the merge and wrapping in a digest generator of * the repaired iterators. - * + *
<p>
* Intentionally not AutoCloseable so we don't mistakenly use this in ARM blocks * as this prematurely closes the underlying iterators */ @@ -1040,7 +1034,7 @@ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException int flags = in.readByte(); boolean isDigest = isDigest(flags); boolean acceptsTransient = acceptsTransient(flags); - // Shouldn't happen or it's a user error (see comment above) but + // Shouldn't happen, or it's a user error (see comment above) but // better complain loudly than doing the wrong thing. if (isForThrift(flags)) throw new IllegalStateException("Received a command with the thrift flag set. " @@ -1072,17 +1066,62 @@ public ReadCommand deserialize(DataInputPlus in, int version) throws IOException if (hasIndex) { IndexMetadata index = deserializeIndexMetadata(in, version, metadata); - if (index != null) - { - Index.Group indexGroup = Keyspace.openAndGetStore(metadata).indexManager.getIndexGroup(index); - if (indexGroup != null) - indexQueryPlan = indexGroup.queryPlanFor(rowFilter); - } + indexQueryPlan = buildIndexQueryPlan(index, metadata, rowFilter); } return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, indexQueryPlan); } + /** + * Builds a query plan for the specified index, considering index availability and status. + * + * @param index The index metadata to build the plan for + * @param metadata The table metadata that contains the index + * @param rowFilter The row filter to be applied in the query + * @return A query plan that uses available indexes, or null if: + * - the provided index is null + * - no index group is found for the index + * - no query plan could be created for the row filter + * - all indexes are in INITIAL_BUILD_STARTED state and not queryable + *
<p>
+ * If some indexes are in INITIAL_BUILD_STARTED state but others are available, + * returns a new query plan using only the available indexes. + */ + private static Index.QueryPlan buildIndexQueryPlan(IndexMetadata index, TableMetadata metadata, RowFilter rowFilter) + { + if (index == null) + return null; + + Index.Group indexGroup = Keyspace.openAndGetStore(metadata).indexManager.getIndexGroup(index); + if (indexGroup == null) + return null; + + Index.QueryPlan queryPlan = indexGroup.queryPlanFor(rowFilter); + if (queryPlan == null) + return null; + + Set<Index> allIndexes = queryPlan.getIndexes(); + Set<Index> availableIndexes = new HashSet<>(); + + for (Index plannedIndex : allIndexes) + { + String indexName = plannedIndex.getIndexMetadata().name; + Index.Status status = getIndexStatus(FBUtilities.getBroadcastAddressAndPort(), + metadata.keyspace, + indexName); + + if (status != Index.Status.INITIAL_BUILD_STARTED && plannedIndex.isQueryable(status)) + availableIndexes.add(plannedIndex); + } + + if (availableIndexes.isEmpty()) + return null; + + return availableIndexes.size() < allIndexes.size() + ?
indexGroup.queryPlanForIndices(rowFilter, availableIndexes) + : queryPlan; + } + private @Nullable IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, TableMetadata metadata) throws IOException { try diff --git a/src/java/org/apache/cassandra/db/filter/ANNOptions.java b/src/java/org/apache/cassandra/db/filter/ANNOptions.java index 68bf14417f09..37db98547b2a 100644 --- a/src/java/org/apache/cassandra/db/filter/ANNOptions.java +++ b/src/java/org/apache/cassandra/db/filter/ANNOptions.java @@ -37,6 +37,9 @@ */ public class ANNOptions { + public static final String REQUIRES_HIGHER_MESSAGING_VERSION = + "ANN options are not supported in clusters below DS 11."; + public static final String RERANK_K_OPTION_NAME = "rerank_k"; public static final ANNOptions NONE = new ANNOptions(null); @@ -81,7 +84,7 @@ public void validate(QueryState state, String keyspace, int limit) if (MessagingService.current_version < MessagingService.VERSION_DS_11) badNodes.add(FBUtilities.getBroadcastAddressAndPort()); if (!badNodes.isEmpty()) - throw new InvalidRequestException("ANN options are not supported in clusters below DS 11."); + throw new InvalidRequestException(REQUIRES_HIGHER_MESSAGING_VERSION); } /** diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index 4d35566fa41f..e330e987c3ad 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -79,13 +79,44 @@ public class RowFilter private static final Logger logger = LoggerFactory.getLogger(RowFilter.class); public static final Serializer serializer = new Serializer(); - public static final RowFilter NONE = new RowFilter(FilterElement.NONE); + public static final RowFilter NONE = new RowFilter(FilterElement.NONE, false); private final FilterElement root; - protected RowFilter(FilterElement root) + /** + * Indicates whether ALLOW FILTERING was specified in the CQL query and 
whether the respective type of query allows + * the query to be executed when using ALLOW FILTERING. + *
<p>
+ * By default, CQL only allows queries that can be executed efficiently by: + * - Using the primary key + * - Using a secondary index + * - Scanning all data when no WHERE clause is present + *
<p>
+ * When a query requires filtering data that cannot be handled efficiently (e.g., filtering on a non-indexed column), + * CQL will reject it unless ALLOW FILTERING is specified. This is because such queries may need to scan large amounts + * of data to return a small result set, leading to unpredictable performance. + *
<p>
+ * For example, given a table: + * CREATE TABLE users (username text PRIMARY KEY, birth_year int, country text); + * CREATE INDEX ON users(birth_year); + *
<p>
+ * These queries are allowed by default: + * - SELECT * FROM users WHERE username = 'joe' // Uses primary key + * - SELECT * FROM users WHERE birth_year = 1981 // Uses secondary index + * - SELECT * FROM users // Full scan is explicit + *
<p>
+ * This query requires ALLOW FILTERING: + * - SELECT * FROM users WHERE birth_year = 1981 AND country = 'FR' ALLOW FILTERING + *
<p>
+ * When true, this field indicates the query is allowed to perform potentially expensive filtering operations + * that may scan large portions of the table to satisfy the query conditions. + */ + public final boolean allowFiltering; + + protected RowFilter(FilterElement root, boolean allowFiltering) { this.root = root; + this.allowFiltering = allowFiltering; } public FilterElement root() @@ -294,7 +325,7 @@ public RowFilter without(Expression expression) if (root.size() == 1) return RowFilter.NONE; - return new RowFilter(root.filter(e -> !e.equals(expression))); + return new RowFilter(root.filter(e -> !e.equals(expression)), allowFiltering); } public RowFilter withoutExpressions() @@ -307,12 +338,12 @@ public RowFilter withoutExpressions() */ public RowFilter withoutDisjunctions() { - return new RowFilter(root.withoutDisjunctions()); + return new RowFilter(root.withoutDisjunctions(), allowFiltering); } public RowFilter restrict(Predicate filter) { - return new RowFilter(root.filter(filter)); + return new RowFilter(root.filter(filter), allowFiltering); } public boolean isEmpty() @@ -328,12 +359,12 @@ public String toString() public static Builder builder() { - return new Builder(null); + return new Builder(null, false); } - public static Builder builder(IndexRegistry indexRegistry) + public static Builder builder(IndexRegistry indexRegistry, boolean allowFiltering) { - return new Builder(indexRegistry); + return new Builder(indexRegistry, allowFiltering); } public static class Builder @@ -341,15 +372,17 @@ public static class Builder private FilterElement.Builder current = new FilterElement.Builder(false); private final IndexRegistry indexRegistry; + private final boolean allowFiltering; - public Builder(IndexRegistry indexRegistry) + public Builder(IndexRegistry indexRegistry, boolean allowFiltering) { this.indexRegistry = indexRegistry; + this.allowFiltering = allowFiltering; } public RowFilter build() { - return new
RowFilter(current.build(), allowFiltering); } public RowFilter buildFromRestrictions(StatementRestrictions restrictions, @@ -363,7 +396,7 @@ public RowFilter buildFromRestrictions(StatementRestrictions restrictions, if (Guardrails.queryFilters.enabled(queryState)) Guardrails.queryFilters.guard(root.numFilteredValues(), "Select query", false, queryState); - return new RowFilter(root); + return new RowFilter(root, allowFiltering); } private FilterElement doBuild(StatementRestrictions restrictions, @@ -420,7 +453,7 @@ public void addAllAsConjunction(Consumer addToRowFilterDelegate) { // If we're in disjunction mode, we must not pass the current builder to addToRowFilter. // We create a new conjunction sub-builder instead and add all expressions there. - var builder = new Builder(indexRegistry); + var builder = new Builder(indexRegistry, allowFiltering); addToRowFilterDelegate.accept(builder); if (builder.current.expressions.size() == 1 && builder.current.children.isEmpty()) @@ -1840,20 +1873,37 @@ public static class Serializer public void serialize(RowFilter filter, DataOutputPlus out, int version) throws IOException { out.writeBoolean(false); // Old "is for thrift" boolean + FilterElement.serializer.serialize(filter.root, out, version); + + // CNDB-12425 - allowFiltering is only serialized in DS 11 and above + if (version >= MessagingService.VERSION_DS_11) + out.writeBoolean(filter.allowFiltering); } public RowFilter deserialize(DataInputPlus in, int version, TableMetadata metadata) throws IOException { - in.readBoolean(); // Unused + // Skip unused "isForThrift" boolean from legacy versions + in.readBoolean(); + FilterElement operation = FilterElement.serializer.deserialize(in, version, metadata); - return new RowFilter(operation); + + // CNDB-12425 - allowFiltering was added in DS 11 + boolean allowFiltering = version >= MessagingService.VERSION_DS_11 && in.readBoolean(); + + return new RowFilter(operation, allowFiltering); } public long serializedSize(RowFilter 
filter, int version) { - return 1 // unused boolean - + FilterElement.serializer.serializedSize(filter.root, version); + long size = 1; // unused boolean + size += FilterElement.serializer.serializedSize(filter.root, version); + + // CNDB-12425 - We allow filtering during index build in DS 11 and above + if (version >= MessagingService.VERSION_DS_11) + size += TypeSizes.BOOL_SIZE; // for allowFiltering + + return size; } } } diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java index beb8280e674a..628abbfd7610 100644 --- a/src/java/org/apache/cassandra/index/Index.java +++ b/src/java/org/apache/cassandra/index/Index.java @@ -807,6 +807,17 @@ Indexer indexerFor(Predicate indexSelector, @Nullable QueryPlan queryPlanFor(RowFilter rowFilter); + /** + * Returns a new {@link QueryPlan} for the specified {@link RowFilter} and set of {@link Index}, or {@code null} + * if none of the indexes in this group supports the expression in the row filter. + * + * @param rowFilter a row filter + * @param indexes a set of indexes + * @return a new query plan for the specified {@link RowFilter} and {@link Set}, {@code null} otherwise + */ + @Nullable + QueryPlan queryPlanForIndices(RowFilter rowFilter, Set indexes); + /** * Get flush observer to observe partition/cell events generated by flushing SSTable (memtable flush or compaction). 
* @@ -1030,6 +1041,7 @@ default boolean isTopK() enum Status { UNKNOWN, + INITIAL_BUILD_STARTED, FULL_REBUILD_STARTED, BUILD_FAILED, BUILD_SUCCEEDED, diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java b/src/java/org/apache/cassandra/index/IndexRegistry.java index 990bb8afb83f..13af357e35a5 100644 --- a/src/java/org/apache/cassandra/index/IndexRegistry.java +++ b/src/java/org/apache/cassandra/index/IndexRegistry.java @@ -21,7 +21,6 @@ package org.apache.cassandra.index; import java.util.Collection; -import java.util.HashSet; import java.util.Collections; import java.util.Optional; import java.util.Set; @@ -239,6 +238,13 @@ public Index.QueryPlan queryPlanFor(RowFilter rowFilter) return null; } + @Nullable + @Override + public Index.QueryPlan queryPlanForIndices(RowFilter rowFilter, Set indexes) + { + return null; + } + @Nullable @Override public SSTableFlushObserver getFlushObserver(Descriptor descriptor, LifecycleNewTracker tracker, TableMetadata tableMetadata, long keyCount) diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index d4c75ed2a4f1..7c18bc5dcddc 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -63,6 +63,8 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; +import org.apache.cassandra.locator.TokenMetadataProvider; +import org.apache.cassandra.utils.NoSpamLogger; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,6 +120,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessagingService; import 
org.apache.cassandra.notifications.INotification; import org.apache.cassandra.notifications.INotificationConsumer; import org.apache.cassandra.notifications.SSTableAddedNotification; @@ -125,6 +128,7 @@ import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.schema.Indexes; +import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.pager.SinglePartitionPager; import org.apache.cassandra.tracing.Tracing; @@ -172,7 +176,7 @@ *

* Finally, this class provides a clear and safe lifecycle to manage index builds, either full rebuilds via * {@link this#rebuildIndexesBlocking(Set)} or builds of new sstables - * added via {@link org.apache.cassandra.notifications.SSTableAddedNotification}s, guaranteeing + * added via {@link SSTableAddedNotification}s, guaranteeing * the following: *