Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDB-12425: A few reproduction tests and a preliminary patch, WIP #1529

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Differentiate between initial build on user index creation and next full rebuilds

Added some ugly testing to IndexAvailabilityTest to confirm queries with the two build statuses
ekaterinadimitrova2 committed Mar 15, 2025
commit 5ed5bafe2d4a63569980668f928a8472cdd3ed2e
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/ReadCommand.java
Original file line number Diff line number Diff line change
@@ -1097,7 +1097,7 @@ private Index.QueryPlan buildIndexQueryPlan(IndexMetadata index, TableMetadata m
Index.Status status = getIndexStatus(FBUtilities.getBroadcastAddressAndPort(),
metadata.keyspace,
indexName);
if (status != Index.Status.FULL_REBUILD_STARTED)
if (status != Index.Status.INITIALIZED)
availableIndexes.add(plannedIndex);
else
{
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/index/Index.java
Original file line number Diff line number Diff line change
@@ -1041,6 +1041,7 @@ default boolean isTopK()
enum Status
{
UNKNOWN,
INITIALIZED,
FULL_REBUILD_STARTED,
BUILD_FAILED,
BUILD_SUCCEEDED,
31 changes: 15 additions & 16 deletions src/java/org/apache/cassandra/index/SecondaryIndexManager.java
Original file line number Diff line number Diff line change
@@ -290,7 +290,7 @@ private synchronized Future<?> createIndex(IndexMetadata indexDef, boolean isNew
if (writableIndexes.put(index.getIndexMetadata().name, index) == null)
logger.info("Index [{}] registered and writable.", index.getIndexMetadata().name);

markIndexesBuilding(ImmutableSet.of(index), true, isNewCF);
markIndexesBuilding(ImmutableSet.of(index), true, isNewCF, true);

return buildIndex(index);
}
@@ -636,7 +636,7 @@ private Future<?> buildIndexesAsync(Collection<SSTableReader> sstables, Set<Inde

// Mark all indexes as building: this step must happen first, because if any index can't be marked, the whole
// process needs to abort
markIndexesBuilding(indexes, isFullRebuild, false);
markIndexesBuilding(indexes, isFullRebuild, false, false);

// Build indexes in a try/catch, so that any index not marked as either built or failed will be marked as failed:
final Set<Index> builtIndexes = Sets.newConcurrentHashSet();
@@ -775,16 +775,20 @@ private String getIndexNames(Set<Index> indexes)
* the SecondaryIndexManager instance, it means all invocations for all different indexes will go through the same
* lock, but this is fine as the work done while holding such lock is trivial.
* <p>
* isCreateIndex is used to differentiate whether a full rebuild is invoked when user is creating a new index or
* full rebuild is started by failed scrub or nodetool rebuild command. It matters as we want to fall back to ALLOW FILTERING
* (for queries with ALLOW FILTERING) only in the case a user creates a new index.
* {@link #markIndexBuilt(Index, boolean)} or {@link #markIndexFailed(Index, boolean)} should be always called after
* the rebuilding has finished, so that the index build state can be correctly managed and the index rebuilt.
*
* @param indexes the index to be marked as building
* @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise
* @param isNewCF {@code true} if this method is invoked when initializing a new table/columnfamily (i.e. loading a CF at startup),
* {@code false} for all other cases (i.e. newly added index)
* @param isCreateIndex {@code true} if this method is invoked when creating a new index, {@code false} otherwise
*/
@VisibleForTesting
public synchronized void markIndexesBuilding(Set<Index> indexes, boolean isFullRebuild, boolean isNewCF)
public synchronized void markIndexesBuilding(Set<Index> indexes, boolean isFullRebuild, boolean isNewCF, boolean isCreateIndex)
{
String keyspaceName = baseCfs.keyspace.getName();

@@ -809,7 +813,10 @@ public synchronized void markIndexesBuilding(Set<Index> indexes, boolean isFullR
if (isFullRebuild)
{
needsFullRebuild.remove(indexName);
makeIndexNonQueryable(index, Index.Status.FULL_REBUILD_STARTED);
if (!isCreateIndex)
makeIndexNonQueryable(index, Index.Status.FULL_REBUILD_STARTED);
else if (!isNewCF)
makeIndexNonQueryable(index, Index.Status.INITIALIZED);
}

if (counter.getAndIncrement() == 0 && DatabaseDescriptor.isDaemonInitialized() && !isNewCF)
@@ -819,7 +826,7 @@ public synchronized void markIndexesBuilding(Set<Index> indexes, boolean isFullR

/**
* Marks the specified index as built if there are no in progress index builds and the index is not failed.
* {@link #markIndexesBuilding(Set, boolean, boolean)} should always be invoked before this method.
* {@link #markIndexesBuilding(Set, boolean, boolean, boolean)} should always be invoked before this method.
*
* @param index the index to be marked as built
* @param isFullRebuild {@code true} if this method is invoked as a full index rebuild, {@code false} otherwise
@@ -845,7 +852,7 @@ public synchronized void markIndexBuilt(Index index, boolean isFullRebuild)

/**
* Marks the specified index as failed.
* {@link #markIndexesBuilding(Set, boolean, boolean)} should always be invoked before this method.
* {@link #markIndexesBuilding(Set, boolean, boolean, boolean)} should always be invoked before this method.
*
* @param index the index to be marked as built
* @param isInitialBuild {@code true} if the index failed during its initial build, {@code false} otherwise
@@ -1853,23 +1860,15 @@ public static <E extends Endpoints<E>> E filterForQuery(E liveEndpoints,

assert liveEndpoints.endpoints().containsAll(badNodes);

boolean considerAllowFiltering;
if (initial == badNodes.size())
considerAllowFiltering = false;
else
{
considerAllowFiltering = true;
if (badNodes.size() < initial && !badNodes.isEmpty())
throw new InvalidRequestException(REQUIRES_HIGHER_MESSAGING_VERSION);
}
boolean considerAllowFiltering = badNodes.isEmpty();

E queryableEndpoints = liveEndpoints.filter(replica -> {
for (Index index : indexQueryPlan.getIndexes())
{
Index.Status status = getIndexStatus(replica.endpoint(), keyspace.getName(), index.getIndexMetadata().name);

// if the status of the index is building and there is allow filtering - that is ok too
if (considerAllowFiltering && status == Index.Status.FULL_REBUILD_STARTED && allowFiltering)
if (considerAllowFiltering && status == Index.Status.INITIALIZED && allowFiltering)
continue;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have to think about it more thoroughly, but this looks like a good place to place the client warnings that are currently thrown on the replica side. We might have a warning message per index-building replica, so clients can know what nodes are still initializing their indexes and are going to use filtering.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added:

// if the status of the index is building and there is allow filtering - that is ok too
if (considerAllowFiltering && status == Index.Status.INITIAL_BUILD_STARTED && !index.isQueryable(status) && allowFiltering)
{
ClientWarn.instance.warn(String.format("Query fell back to ALLOW FILTERING because index %s is still building on endpoint %s",
index.getIndexMetadata().name,
replica.endpoint()));
continue;
}

which led to multiple warnings for the same node in tests.

I decided to just bring on single node C* and try single query on index build:

cqlsh:k> CREATE CUSTOM INDEX ON t(k) USING 'StorageAttachedIndex';
cqlsh:k> SELECT * FROM t WHERE k=200 ALLOW FILTERING;

 pk | i | j | k | vec
----+---+---+---+-----

(0 rows)

Warnings :
Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoint localhost/127.0.0.1:7000

I have to dig into this tomorrow.... no more energy today

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect that's because of virtual nodes, with 16 tokens per node. Rather than throwing the client warning immediately, the endpoints can be collected in a set:

Set<InetAddressAndPort> filteringEndpoints = new HashSet<>();

and then throw a single warning after the loop with the unique addresses. For example:

Query fell back to ALLOW FILTERING because index t_k_idx is still building on endpoints 192.168.0.1:7000, 192.168.0.2:7000

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering whether "fell back to ALLOW FILTERING" will be clear enough for users, considering that they have just written ALLOW FILTERING in the query and, strictly speaking, ALLOW FILTERING is a permission to filter and not the action of filtering. Perhaps the message would be a bit clearer this way:

The query won't use the indexes a, b and c on endpoints 192.168.0.1:7000, 192.168.0.2:7000 because the indexes are still building on those nodes. 

Feel free to ignore if you don't agree; I'm just giving ideas.


if (!index.isQueryable(status))
Original file line number Diff line number Diff line change
@@ -52,76 +52,167 @@
import static org.awaitility.Awaitility.await;

/**
* Distributed tests for ANN options.
* Distributed tests for ALLOW FILTERING during index build.
*/
public class AllowFilteringDuringIndexBuildDistributedTest extends TestBaseImpl
{
private static final String INDEX_NOT_AVAILABLE_MESSAGE = "^Operation failed - received 0 responses" +
" and 2 failures: INDEX_NOT_AVAILABLE from .+" +
" INDEX_NOT_AVAILABLE from .+$";
private static final int NUM_REPLICAS = 2;
private static final int RF = 2;

/**
* Test that ALLOW FILTERING during index builds is accepted in clusters with all nodes in DS 11.
*/
@Test
public void testAllowFilteringDuringIndexBuildWithAllDS11() throws Throwable
public void testAllowFilteringDuringInitialIndexBuildWithAllDS11() throws Throwable
{
CassandraRelevantProperties.DS_CURRENT_MESSAGING_VERSION.setInt(MessagingService.VERSION_DS_11);

try (Cluster cluster = init(Cluster.build(NUM_REPLICAS)
.withConfig(config -> config.with(GOSSIP).with(NETWORK))
.start(), RF))
.withConfig(config -> config.with(GOSSIP).with(NETWORK))
.start(), RF))
{
testSelectWithAllowFilteringDuringIndexBuilding(cluster, null);
testSelectWithAllowFilteringDuringIndexBuilding(cluster, null, true, false);
}
}

@Test
public void testAllowFilteringDuringIndexRebuildWithAllDS11NewCF() throws Throwable
{
CassandraRelevantProperties.DS_CURRENT_MESSAGING_VERSION.setInt(MessagingService.VERSION_DS_11);

try (Cluster cluster = init(Cluster.build(NUM_REPLICAS)
.withConfig(config -> config.with(GOSSIP).with(NETWORK))
.start(), RF))
{
testSelectWithAllowFilteringDuringIndexBuilding(cluster, INDEX_NOT_AVAILABLE_MESSAGE, false, true);
}
}

@Test
public void testAllowFilteringDuringIndexRebuildWithAllDS11ExistingCF() throws Throwable
{
CassandraRelevantProperties.DS_CURRENT_MESSAGING_VERSION.setInt(MessagingService.VERSION_DS_11);

try (Cluster cluster = init(Cluster.build(NUM_REPLICAS)
.withConfig(config -> config.with(GOSSIP).with(NETWORK))
.start(), RF))
{
testSelectWithAllowFilteringDuringIndexBuilding(cluster, INDEX_NOT_AVAILABLE_MESSAGE, false, false);
}
}

/**
* Test that ALLOW FILTERING during index builds is rejected in clusters with all nodes below DS 11.
*/
@Test
public void testAllowFilteringDuringIndexBuildWithAllDS10() throws Throwable
public void testAllowFilteringDuringInitialIndexBuildWithAllDS10() throws Throwable
{
CassandraRelevantProperties.DS_CURRENT_MESSAGING_VERSION.setInt(MessagingService.VERSION_DS_10);

try (Cluster cluster = init(Cluster.build(NUM_REPLICAS)
.withConfig(config -> config.with(GOSSIP).with(NETWORK))
.start(), RF))
.withConfig(config -> config.with(GOSSIP).with(NETWORK))
.start(), RF))
{
testSelectWithAllowFilteringDuringIndexBuilding(cluster,
"^Operation failed - received 0 responses" +
" and 2 failures: INDEX_NOT_AVAILABLE from .+, " +
"INDEX_NOT_AVAILABLE from .+$");
testSelectWithAllowFilteringDuringIndexBuilding(cluster, INDEX_NOT_AVAILABLE_MESSAGE, true, false);
}
}

@Test
public void testAllowFilteringDuringIndexRebuildWithAllDS10NewCF() throws Throwable
{
CassandraRelevantProperties.DS_CURRENT_MESSAGING_VERSION.setInt(MessagingService.VERSION_DS_10);

try (Cluster cluster = init(Cluster.build(NUM_REPLICAS)
.withConfig(config -> config.with(GOSSIP).with(NETWORK))
.start(), RF))
{
testSelectWithAllowFilteringDuringIndexBuilding(cluster, INDEX_NOT_AVAILABLE_MESSAGE, false, true);
}
}

/**
* Test that ALLOW FILTERING during index builds are rejected in clusters with some nodes below DS 11.
*/
@Test
public void testAllowFIlteringDuringIndxBuildWithMixedDS10AndDS11() throws Throwable
public void testAllowFilteringDuringIndexRebuildWithAllDS10ExistingCF() throws Throwable
{
CassandraRelevantProperties.DS_CURRENT_MESSAGING_VERSION.setInt(MessagingService.VERSION_DS_10);

try (Cluster cluster = init(Cluster.build(NUM_REPLICAS)
.withConfig(config -> config.with(GOSSIP).with(NETWORK))
.start(), RF))
{
testSelectWithAllowFilteringDuringIndexBuilding(cluster, INDEX_NOT_AVAILABLE_MESSAGE, false, false);
}
}

@Test
public void testAllowFilteringDuringInitialIndexBuildWithMixedDS10AndDS11() throws Throwable
{
assert CassandraRelevantProperties.DS_CURRENT_MESSAGING_VERSION.getInt() >= MessagingService.VERSION_DS_11;

try (Cluster cluster = init(Cluster.build(NUM_REPLICAS)
.withInstanceInitializer(BB::install)
.withConfig(config -> config.with(GOSSIP).with(NETWORK).with(NATIVE_PROTOCOL))
.start(), RF))
{
testSelectWithAllowFilteringDuringIndexBuilding(cluster, INDEX_NOT_AVAILABLE_MESSAGE, true, false);
}
}

@Test
public void testAllowFilteringDuringIndexRebuildWithMixedDS10AndDS11NewCF() throws Throwable
{
assert CassandraRelevantProperties.DS_CURRENT_MESSAGING_VERSION.getInt() >= MessagingService.VERSION_DS_11;

try (Cluster cluster = init(Cluster.build(NUM_REPLICAS)
.withInstanceInitializer(AllowFilteringDuringIndexBuildDistributedTest.BB::install)
.withConfig(config -> config.with(GOSSIP).with(NETWORK).with(NATIVE_PROTOCOL))
.start(), RF))
.withInstanceInitializer(BB::install)
.withConfig(config -> config.with(GOSSIP).with(NETWORK).with(NATIVE_PROTOCOL))
.start(), RF))
{
testSelectWithAllowFilteringDuringIndexBuilding(cluster, SecondaryIndexManager.REQUIRES_HIGHER_MESSAGING_VERSION);
testSelectWithAllowFilteringDuringIndexBuilding(cluster, INDEX_NOT_AVAILABLE_MESSAGE, false, true);
}
}

private static void testSelectWithAllowFilteringDuringIndexBuilding(Cluster cluster, String expectedErrorMessage)
@Test
public void testAllowFilteringDuringIndexRebuildWithMixedDS10AndDS11ExistingCF() throws Throwable
{
assert CassandraRelevantProperties.DS_CURRENT_MESSAGING_VERSION.getInt() >= MessagingService.VERSION_DS_11;

try (Cluster cluster = init(Cluster.build(NUM_REPLICAS)
.withInstanceInitializer(BB::install)
.withConfig(config -> config.with(GOSSIP).with(NETWORK).with(NATIVE_PROTOCOL))
.start(), RF))
{
testSelectWithAllowFilteringDuringIndexBuilding(cluster, INDEX_NOT_AVAILABLE_MESSAGE, false, false);
}
}

@Test(expected = IllegalArgumentException.class)
public void testInitialBuildWithNewCFShouldFail() throws Throwable
{
try (Cluster cluster = init(Cluster.build(NUM_REPLICAS)
.withConfig(config -> config.with(GOSSIP).with(NETWORK))
.start(), RF))
{
testSelectWithAllowFilteringDuringIndexBuilding(cluster, null, true, true);
}
}

private static void testSelectWithAllowFilteringDuringIndexBuilding(Cluster cluster,
String expectedErrorMessage,
boolean isInitialBuild,
boolean isNewCF)
{
if (isInitialBuild && isNewCF) {
throw new IllegalArgumentException("Initial build cannot happen with a new CF");
}

cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int PRIMARY KEY, n int, v vector<float, 2>)"));
cluster.schemaChange(withKeyspace("CREATE CUSTOM INDEX ON %s.t(n) USING 'StorageAttachedIndex'"));

Index.Status expectedStatus = isInitialBuild ? Index.Status.INITIALIZED : Index.Status.FULL_REBUILD_STARTED;

for (int i = 1; i <= cluster.size(); i++)
markIndexBuilding(cluster.get(i), KEYSPACE, "t", "t_n_idx");
markIndexBuilding(cluster.get(i), KEYSPACE, "t", "t_n_idx", isInitialBuild, isNewCF);

for (int i = 1; i <= cluster.size(); i++)
for (int j = 1; j <= cluster.size(); j++)
waitForIndexingStatus(cluster.get(i), KEYSPACE, "t_n_idx", cluster.get(j), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(i), KEYSPACE, "t_n_idx", cluster.get(j), expectedStatus);

String select = withKeyspace("SELECT * FROM %s.t WHERE n = 1 ALLOW FILTERING");

@@ -132,7 +223,7 @@ private static void testSelectWithAllowFilteringDuringIndexBuilding(Cluster clus
coordinator.execute(select, ConsistencyLevel.ONE);
else
Assertions.assertThatThrownBy(() -> coordinator.execute(select, ConsistencyLevel.ONE))
.hasMessageMatching(expectedErrorMessage);
.hasMessageMatching(expectedErrorMessage);
}
}

@@ -158,16 +249,21 @@ public static int currentVersion()
{
return MessagingService.VERSION_DS_10;
}
}
}

private static void markIndexBuilding(IInvokableInstance node, String keyspace, String table, String indexName)
private static void markIndexBuilding(IInvokableInstance node,
String keyspace,
String table,
String indexName,
boolean isInitialBuild,
boolean isNewCF)
{
node.runOnInstance(() -> {
SecondaryIndexManager sim = Schema.instance.getKeyspaceInstance(keyspace)
.getColumnFamilyStore(table)
.indexManager;
.getColumnFamilyStore(table)
.indexManager;
Index index = sim.getIndexByName(indexName);
sim.markIndexesBuilding(Collections.singleton(index), true, false);
sim.markIndexesBuilding(Collections.singleton(index), true, isNewCF, isInitialBuild);
});
}

Original file line number Diff line number Diff line change
@@ -123,7 +123,21 @@ public void verifyIndexStatusPropagation() throws Exception
assertIndexingStatus(cluster);

// mark ks2 index2 as indexing on node1
markIndexBuilding(cluster.get(1), ks2, cf1, index2);
markIndexBuilding(cluster.get(1), ks2, cf1, index2, true);
// on node2, it observes that node1 ks2.index2 is not queryable
waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(1), Index.Status.INITIALIZED);
// other indexes or keyspaces should not be affected
assertIndexingStatus(cluster);

// mark ks2 index2 as queryable on node1
markIndexQueryable(cluster.get(1), ks2, cf1, index2);
// on node2, it observes that node1 ks2.index2 is queryable
waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(1), Index.Status.BUILD_SUCCEEDED);
// other indexes or keyspaces should not be affected
assertIndexingStatus(cluster);

// mark ks2 index2 as indexing on node1
markIndexBuilding(cluster.get(1), ks2, cf1, index2, false);
// on node2, it observes that node1 ks2.index2 is not queryable
waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(1), Index.Status.FULL_REBUILD_STARTED);
// other indexes or keyspaces should not be affected
@@ -239,7 +253,18 @@ private void shouldSkipNonQueryableNode(int nodes, List<Integer>... nonQueryable
}

@Test
public void testAllowFilteringDuringIndexBuildsOn3NodeCluster() throws Exception
public void testAllowFilteringDuringIndexInitialBuildOn3NodeCluster() throws Exception
{
testAllowFilteringDuringIndexBuildsOn3NodeCluster(true, Index.Status.INITIALIZED);
}

@Test
public void testAllowFilteringDuringIndexFullRebuildOn3NodeCluster() throws Exception
{
testAllowFilteringDuringIndexBuildsOn3NodeCluster(false, Index.Status.FULL_REBUILD_STARTED);
}

public void testAllowFilteringDuringIndexBuildsOn3NodeCluster(boolean isCreateIndex, Index.Status buildStatus) throws Exception
{
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is long and ugly, but covers all cases. I will refactor it soon. It wasn't a priority for now

try (Cluster cluster = init(Cluster.build(3)
.withConfig(config -> config.with(GOSSIP)
@@ -324,24 +349,36 @@ public void testAllowFilteringDuringIndexBuildsOn3NodeCluster() throws Exception
cluster.schemaChange(String.format(CREATE_INDEX, index1, ks2, table, "v1"));
waitForIndexQueryable(cluster, ks2);
cluster.forEach(node -> expectedNodeIndexQueryability.put(NodeIndex.create(ks2, index1, node), Index.Status.BUILD_SUCCEEDED));
markIndexBuilding(cluster.get(1), ks2, table, index1);
waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(1), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(3), ks2, index1, cluster.get(1), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(1), ks2, index1, cluster.get(1), Index.Status.FULL_REBUILD_STARTED);
markIndexBuilding(cluster.get(2), ks2, table, index1);
waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(2), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(3), ks2, index1, cluster.get(2), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(1), ks2, index1, cluster.get(2), Index.Status.FULL_REBUILD_STARTED);
markIndexBuilding(cluster.get(3), ks2, table, index1);
waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(3), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(3), ks2, index1, cluster.get(3), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(1), ks2, index1, cluster.get(3), Index.Status.FULL_REBUILD_STARTED);
markIndexBuilding(cluster.get(1), ks2, table, index1, isCreateIndex);
waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(1), buildStatus);
waitForIndexingStatus(cluster.get(3), ks2, index1, cluster.get(1), buildStatus);
waitForIndexingStatus(cluster.get(1), ks2, index1, cluster.get(1), buildStatus);
markIndexBuilding(cluster.get(2), ks2, table, index1, isCreateIndex);
waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(2), buildStatus);
waitForIndexingStatus(cluster.get(3), ks2, index1, cluster.get(2), buildStatus);
waitForIndexingStatus(cluster.get(1), ks2, index1, cluster.get(2), buildStatus);
markIndexBuilding(cluster.get(3), ks2, table, index1, isCreateIndex);
waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(3), buildStatus);
waitForIndexingStatus(cluster.get(3), ks2, index1, cluster.get(3), buildStatus);
waitForIndexingStatus(cluster.get(1), ks2, index1, cluster.get(3), buildStatus);
assertIndexingStatus(cluster, ks2, index1);

executeOnAllCoordinators(cluster,
"SELECT pk FROM " + ks2 + '.' + table + " WHERE v1=0 ALLOW FILTERING",
ConsistencyLevel.LOCAL_QUORUM,
3);
if (isCreateIndex)
{
executeOnAllCoordinators(cluster,
"SELECT pk FROM " + ks2 + '.' + table + " WHERE v1=0 ALLOW FILTERING",
ConsistencyLevel.LOCAL_QUORUM,
3);
}
else
{
assertThatThrownBy(() -> executeOnAllCoordinators(cluster,
"SELECT pk FROM " + ks2 + '.' + table + " WHERE v1=0 ALLOW FILTERING",
ConsistencyLevel.LOCAL_QUORUM,
3)).hasMessageMatching("^Operation failed - received 0 responses" +
" and 2 failures: INDEX_NOT_AVAILABLE from .+" +
" INDEX_NOT_AVAILABLE from .+$");
}

markIndexQueryable(cluster.get(1), ks2, table, index1);
waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(1), Index.Status.BUILD_SUCCEEDED);
@@ -353,6 +390,12 @@ public void testAllowFilteringDuringIndexBuildsOn3NodeCluster() throws Exception
waitForIndexingStatus(cluster.get(1), ks2, index1, cluster.get(3), Index.Status.BUILD_SUCCEEDED);
waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(3), Index.Status.BUILD_SUCCEEDED);
assertIndexingStatus(cluster, ks2, index1);

executeOnAllCoordinators(cluster,
"SELECT pk FROM " + ks2 + '.' + table + " WHERE v1=0 ALLOW FILTERING",
ConsistencyLevel.LOCAL_QUORUM,
3);

cluster.schemaChange(String.format("CREATE CUSTOM INDEX %s ON %s.%s(vec) USING 'StorageAttachedIndex'",
vectorIndex, ks2, table));
cluster.forEach(node -> expectedNodeIndexQueryability.put(NodeIndex.create(ks2, vectorIndex, node), Index.Status.BUILD_SUCCEEDED));
@@ -376,18 +419,18 @@ public void testAllowFilteringDuringIndexBuildsOn3NodeCluster() throws Exception

// Create one more index but mark it as building
cluster.schemaChange(String.format(CREATE_INDEX, index2, ks2, table, "v2"));
markIndexBuilding(cluster.get(1), ks2, table, index2);
waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(1), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(3), ks2, index2, cluster.get(1), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(1), Index.Status.FULL_REBUILD_STARTED);
markIndexBuilding(cluster.get(2), ks2, table, index2);
waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(2), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(3), ks2, index2, cluster.get(2), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(2), Index.Status.FULL_REBUILD_STARTED);
markIndexBuilding(cluster.get(3), ks2, table, index2);
waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(3), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(3), ks2, index2, cluster.get(3), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(3), Index.Status.FULL_REBUILD_STARTED);
markIndexBuilding(cluster.get(1), ks2, table, index2, isCreateIndex);
waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(1), buildStatus);
waitForIndexingStatus(cluster.get(3), ks2, index2, cluster.get(1), buildStatus);
waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(1), buildStatus);
markIndexBuilding(cluster.get(2), ks2, table, index2, isCreateIndex);
waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(2), buildStatus);
waitForIndexingStatus(cluster.get(3), ks2, index2, cluster.get(2), buildStatus);
waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(2), buildStatus);
markIndexBuilding(cluster.get(3), ks2, table, index2, isCreateIndex);
waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(3), buildStatus);
waitForIndexingStatus(cluster.get(3), ks2, index2, cluster.get(3), buildStatus);
waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(3), buildStatus);
assertIndexingStatus(cluster);

assertThatThrownBy(() ->
@@ -398,27 +441,40 @@ public void testAllowFilteringDuringIndexBuildsOn3NodeCluster() throws Exception
0))
.hasMessageContaining(StatementRestrictions.NON_CLUSTER_ORDERING_REQUIRES_ALL_RESTRICTED_NON_PARTITION_KEY_COLUMNS_INDEXED_MESSAGE);

// Verify that the query works with ALLOW FILTERING
executeOnAllCoordinators(cluster,
"SELECT pk FROM " + ks2 + '.' + table + " WHERE v1=0 AND v2=0 ALLOW FILTERING",
ConsistencyLevel.LOCAL_QUORUM,
2);
// Verify whether the query succeeds with ALLOW FILTERING, depending on the build status

// Verify actual results using a direct query
results = cluster.coordinator(1)
.execute("SELECT pk FROM " + ks2 + '.' + table + " WHERE v1=0 AND v2=0 ALLOW FILTERING",
ConsistencyLevel.LOCAL_QUORUM);
assertResultContains(results, Arrays.asList("partition1", "partition5"));
if (isCreateIndex)
{
executeOnAllCoordinators(cluster,
"SELECT pk FROM " + ks2 + '.' + table + " WHERE v1=0 AND v2=0 ALLOW FILTERING",
ConsistencyLevel.LOCAL_QUORUM,
2);

// Verify actual results using a direct query
results = cluster.coordinator(1)
.execute("SELECT pk FROM " + ks2 + '.' + table + " WHERE v1=0 AND v2=0 ALLOW FILTERING",
ConsistencyLevel.LOCAL_QUORUM);
assertResultContains(results, Arrays.asList("partition1", "partition5"));
}
else
{
assertThatThrownBy(() -> executeOnAllCoordinators(cluster,
"SELECT pk FROM " + ks2 + '.' + table + " WHERE v1=0 AND v2=0 ALLOW FILTERING",
ConsistencyLevel.LOCAL_QUORUM,
2)).hasMessageMatching("^Operation failed - received 0 responses" +
" and 2 failures: INDEX_NOT_AVAILABLE from .+" +
" INDEX_NOT_AVAILABLE from .+$");
}

// mark the vector index as building, we should not be able to query it yet
markIndexBuilding(cluster.get(1), ks2, table, vectorIndex);
waitForIndexingStatus(cluster.get(2), ks2, vectorIndex, cluster.get(1), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(3), ks2, vectorIndex, cluster.get(1), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(1), ks2, vectorIndex, cluster.get(1), Index.Status.FULL_REBUILD_STARTED);
markIndexBuilding(cluster.get(2), ks2, table, vectorIndex);
waitForIndexingStatus(cluster.get(2), ks2, vectorIndex, cluster.get(2), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(3), ks2, vectorIndex, cluster.get(2), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(1), ks2, vectorIndex, cluster.get(2), Index.Status.FULL_REBUILD_STARTED);
markIndexBuilding(cluster.get(1), ks2, table, vectorIndex, isCreateIndex);
waitForIndexingStatus(cluster.get(2), ks2, vectorIndex, cluster.get(1), buildStatus);
waitForIndexingStatus(cluster.get(3), ks2, vectorIndex, cluster.get(1), buildStatus);
waitForIndexingStatus(cluster.get(1), ks2, vectorIndex, cluster.get(1), buildStatus);
markIndexBuilding(cluster.get(2), ks2, table, vectorIndex,isCreateIndex);
waitForIndexingStatus(cluster.get(2), ks2, vectorIndex, cluster.get(2), buildStatus);
waitForIndexingStatus(cluster.get(3), ks2, vectorIndex, cluster.get(2), buildStatus);
waitForIndexingStatus(cluster.get(1), ks2, vectorIndex, cluster.get(2), buildStatus);
assertIndexingStatus(cluster);

assertThatThrownBy(() ->
@@ -502,9 +558,9 @@ public void testAllowFilteringWithIndexBuildingOn1NodeCluster() throws Exception
executeOnAllCoordinators(cluster, "SELECT pk FROM ks2.cf1 WHERE v2='0' ALLOW FILTERING", ConsistencyLevel.LOCAL_QUORUM, 0);

// mark ks2 index2 as indexing on node1
markIndexBuilding(cluster.get(1), ks2, cf1, index2);
markIndexBuilding(cluster.get(1), ks2, cf1, index2, true);
// on node2, it observes that node1 ks2.index2 is not queryable
waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(1), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(1), Index.Status.INITIALIZED);
// other indexes or keyspaces should not be affected
assertIndexingStatus(cluster, ks2, index2);

@@ -564,18 +620,23 @@ private void markIndexQueryable(IInvokableInstance node, String keyspace, String
node.runOnInstance(() -> {
SecondaryIndexManager sim = Schema.instance.getKeyspaceInstance(keyspace).getColumnFamilyStore(table).indexManager;
Index index = sim.getIndexByName(indexName);
sim.makeIndexNonQueryable(index, Index.Status.BUILD_SUCCEEDED);
//sim.makeIndexQueryable(index, Index.Status.BUILD_SUCCEEDED);
sim.markIndexBuilt(index, true);
});
}

private void markIndexBuilding(IInvokableInstance node, String keyspace, String table, String indexName)
private void markIndexBuilding(IInvokableInstance node, String keyspace, String table, String indexName, boolean isCreateIndex)
{
expectedNodeIndexQueryability.put(NodeIndex.create(keyspace, indexName, node), Index.Status.FULL_REBUILD_STARTED);
if (isCreateIndex)
expectedNodeIndexQueryability.put(NodeIndex.create(keyspace, indexName, node), Index.Status.INITIALIZED);
else
expectedNodeIndexQueryability.put(NodeIndex.create(keyspace, indexName, node), Index.Status.FULL_REBUILD_STARTED);

node.runOnInstance(() -> {
SecondaryIndexManager sim = Schema.instance.getKeyspaceInstance(keyspace).getColumnFamilyStore(table).indexManager;
Index index = sim.getIndexByName(indexName);
sim.markIndexesBuilding(Collections.singleton(index), true, false);
// KATE double-check this later...
sim.markIndexesBuilding(Collections.singleton(index), true, false, isCreateIndex);
});
}

Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@
import java.util.Arrays;

import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.filter.ANNOptions;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.junit.Test;