From 42743e9b973ac3da88bc183a325ace16d2699dc4 Mon Sep 17 00:00:00 2001 From: Anish Shrigondekar Date: Thu, 30 Jan 2025 21:38:18 -0800 Subject: [PATCH 1/2] [SPARK-51047] Add tests to verify scan ordering for non-zero start ordinals as well as non-ascending ordinals --- .../streaming/state/RocksDBStateEncoder.scala | 2 + .../state/RocksDBStateStoreSuite.scala | 286 +++++++++++++----- 2 files changed, 213 insertions(+), 75 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala index dd98477784546..08633999bbc9c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala @@ -516,6 +516,8 @@ class UnsafeRowDataEncoder( writer.resetRowWriter() rangeScanKeyFieldsWithOrdinal.zipWithIndex.foreach { case (fieldWithOrdinal, idx) => val field = fieldWithOrdinal._1 + // We must use idx here since we are already operating on the prefix which + // already has the relevant range ordinals projected to the front. 
val value = row.get(idx, field.dataType) // Note that we cannot allocate a smaller buffer here even if the value is null // because the effective byte array is considered variable size and needs to have diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala index 1057783c90eaa..b33fbc7ac39f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala @@ -343,6 +343,63 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid } } + Seq(true, false).foreach { colFamiliesEnabled => + test(s"rocksdb range scan - variable size non-ordering columns with non-zero start ordinal " + + s"with colFamiliesEnabled=$colFamiliesEnabled") { + + tryWithProviderResource(newStoreProvider(keySchema, + RangeKeyScanStateEncoderSpec( + keySchema, Seq(1)), colFamiliesEnabled)) { provider => + val store = provider.getStore(0) + + // use non-default col family if column families are enabled + val cfName = if (colFamiliesEnabled) "testColFamily" else "default" + if (colFamiliesEnabled) { + store.createColFamilyIfAbsent(cfName, + keySchema, valueSchema, + RangeKeyScanStateEncoderSpec(keySchema, Seq(1))) + } + + val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 1L, 2L, 8L, + -230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L) + timerTimestamps.foreach { ts => + val keyRow = dataToKeyRow(Random.alphanumeric.filter(_.isLetter) + .take(Random.nextInt() % 10 + 1).mkString, + ts.asInstanceOf[Int]) + val valueRow = dataToValueRow(1) + store.put(keyRow, valueRow, cfName) + assert(valueRowToData(store.get(keyRow, cfName)) === 1) + } + + val result = store.iterator(cfName).map { kv => + val key = keyRowToData(kv.key) + key._2 + }.toSeq + 
assert(result === timerTimestamps.sorted) + store.commit() + + // test with a different set of power of 2 timestamps + val store1 = provider.getStore(1) + val timerTimestamps1 = Seq(-32L, -64L, -256L, 64L, 32L, 1024L, 4096L, 0L) + timerTimestamps1.foreach { ts => + val keyRow = dataToKeyRow(Random.alphanumeric.filter(_.isLetter) + .take(Random.nextInt() % 10 + 1).mkString, + ts.asInstanceOf[Int]) + val valueRow = dataToValueRow(1) + store1.put(keyRow, valueRow, cfName) + assert(valueRowToData(store1.get(keyRow, cfName)) === 1) + } + + val result1 = store1.iterator(cfName).map { kv => + val key = keyRowToData(kv.key) + key._2 + }.toSeq + assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted) + store1.commit() + } + } + } + testWithColumnFamiliesAndEncodingTypes( "rocksdb range scan - variable size non-ordering columns with " + "double type values are supported", @@ -453,6 +510,68 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid } } + Seq(true, false).foreach { colFamiliesEnabled => + Seq(Seq(1, 2), Seq(2, 1)).foreach { sortIndexes => + test(s"rocksdb range scan multiple ordering columns - with non-zero start ordinal - " + + s"variable size non-ordering columns with colFamiliesEnabled=$colFamiliesEnabled " + + s"sortIndexes=${sortIndexes.mkString(",")}") { + + val testSchema: StructType = StructType( + Seq(StructField("key1", StringType, false), + StructField("key2", LongType, false), + StructField("key3", IntegerType, false))) + + val schemaProj = UnsafeProjection.create(Array[DataType](StringType, LongType, IntegerType)) + + tryWithProviderResource(newStoreProvider(testSchema, + RangeKeyScanStateEncoderSpec(testSchema, sortIndexes), colFamiliesEnabled)) { provider => + val store = provider.getStore(0) + + val cfName = if (colFamiliesEnabled) "testColFamily" else "default" + if (colFamiliesEnabled) { + store.createColFamilyIfAbsent(cfName, + testSchema, valueSchema, + RangeKeyScanStateEncoderSpec(testSchema, sortIndexes)) 
+ } + val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 68), (90L, 2000), + (1L, 27), (1L, 394), (1L, 5), (3L, 980), + (-1L, 232), (-1L, 3455), (-6109L, 921455), (-9808344L, 1), (-1020L, 2), + (35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210), (-4344L, 2323), (-3122L, 323)) + timerTimestamps.foreach { ts => + // ts = (long col, int col); which column sorts first depends on sortIndexes + val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](UTF8String + .fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString), ts._1, + ts._2))) + val valueRow = dataToValueRow(1) + store.put(keyRow, valueRow, cfName) + assert(valueRowToData(store.get(keyRow, cfName)) === 1) + } + val result = store.iterator(cfName).map { kv => + val keyRow = kv.key + val key = (keyRow.getString(0), keyRow.getLong(1), keyRow.getInt(2)) + (key._2, key._3) + }.toSeq + def getOrderedTs( + orderedInput: Seq[(Long, Int)], + sortIndexes: Seq[Int]): Seq[(Long, Int)] = { + sortIndexes match { + case Seq(1, 2) => orderedInput.sortBy(x => (x._1, x._2)) + case Seq(2, 1) => orderedInput.sortBy(x => (x._2, x._1)) + case _ => throw new IllegalArgumentException(s"Invalid sortIndexes: " + + s"${sortIndexes.mkString(",")}") + } + } + assert(result === getOrderedTs(timerTimestamps, sortIndexes)) + store.commit() + } + } + } + } + testWithColumnFamiliesAndEncodingTypes( "rocksdb range scan multiple ordering columns - variable size " + s"non-ordering columns", @@ -1065,97 +1184,114 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid assert(exception.getMessage.contains("Found long, expecting union")) } - testWithColumnFamiliesAndEncodingTypes( - "rocksdb range scan multiple non-contiguous ordering columns", - TestWithBothChangelogCheckpointingEnabledAndDisabled ) { colFamiliesEnabled => - val testSchema: StructType = StructType( - Seq( - StructField("ordering1", LongType, false), - StructField("key2", StringType, false), - StructField("ordering2", IntegerType, 
false), - StructField("string2", StringType, false), - StructField("ordering3", DoubleType, false) + Seq(Seq(0, 1, 2), Seq(0, 2, 1), Seq(2, 1, 0), Seq(2, 0, 1)).foreach { sortIndexes => + testWithColumnFamiliesAndEncodingTypes( + s"rocksdb range scan multiple non-contiguous ordering columns " + + s"and sortIndexes=${sortIndexes.mkString(",")}", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + val testSchema: StructType = StructType( + Seq( + StructField("ordering1", LongType, false), + StructField("key2", StringType, false), + StructField("ordering2", IntegerType, false), + StructField("string2", StringType, false), + StructField("ordering3", DoubleType, false) + ) ) - ) - val testSchemaProj = UnsafeProjection.create(Array[DataType]( + val testSchemaProj = UnsafeProjection.create(Array[DataType]( immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): _*)) - val rangeScanOrdinals = Seq(0, 2, 4) - - tryWithProviderResource( - newStoreProvider( - testSchema, - RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals), - colFamiliesEnabled - ) - ) { provider => - val store = provider.getStore(0) + // Multiply by 2 to get the actual ordinals in the row + val rangeScanOrdinals = sortIndexes.map(_ * 2) - val cfName = if (colFamiliesEnabled) "testColFamily" else "default" - if (colFamiliesEnabled) { - store.createColFamilyIfAbsent( - cfName, + tryWithProviderResource( + newStoreProvider( testSchema, - valueSchema, - RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals) + RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals), + colFamiliesEnabled ) - } + ) { provider => + val store = provider.getStore(0) - val orderedInput = Seq( - // Make sure that the first column takes precedence, even if the - // later columns are greater - (-2L, 0, 99.0), - (-1L, 0, 98.0), - (0L, 0, 97.0), - (2L, 0, 96.0), - // Make sure that the second column takes precedence, when the first - // column is all the same - (3L, -2, -1.0), - 
(3L, -1, -2.0), - (3L, 0, -3.0), - (3L, 2, -4.0), - // Finally, make sure that the third column takes precedence, when the - // first two ordering columns are the same. - (4L, -1, -127.0), - (4L, -1, 0.0), - (4L, -1, 64.0), - (4L, -1, 127.0) - ) - val scrambledInput = Random.shuffle(orderedInput) - - scrambledInput.foreach { record => - val keyRow = testSchemaProj.apply( - new GenericInternalRow( - Array[Any]( - record._1, - UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString), - record._2, - UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString), - record._3 - ) + val cfName = if (colFamiliesEnabled) "testColFamily" else "default" + if (colFamiliesEnabled) { + store.createColFamilyIfAbsent( + cfName, + testSchema, + valueSchema, + RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals) ) + } + + val orderedInput = Seq( + // Make sure that the first column takes precedence, even if the + // later columns are greater + (-2L, 0, 99.0), + (-1L, 0, 98.0), + (0L, 0, 97.0), + (2L, 0, 96.0), + // Make sure that the second column takes precedence, when the first + // column is all the same + (3L, -2, -1.0), + (3L, -1, -2.0), + (3L, 0, -3.0), + (3L, 2, -4.0), + // Finally, make sure that the third column takes precedence, when the + // first two ordering columns are the same. 
+ (4L, -1, -127.0), + (4L, -1, 0.0), + (4L, -1, 64.0), + (4L, -1, 127.0) ) + val scrambledInput = Random.shuffle(orderedInput) + + scrambledInput.foreach { record => + val keyRow = testSchemaProj.apply( + new GenericInternalRow( + Array[Any]( + record._1, + UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString), + record._2, + UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString), + record._3 + ) + ) + ) - // The value is just a "dummy" value of 1 - val valueRow = dataToValueRow(1) - store.put(keyRow, valueRow, cfName) - assert(valueRowToData(store.get(keyRow, cfName)) === 1) - } + // The value is just a "dummy" value of 1 + val valueRow = dataToValueRow(1) + store.put(keyRow, valueRow, cfName) + assert(valueRowToData(store.get(keyRow, cfName)) === 1) + } - val result = store - .iterator(cfName) - .map { kv => - val keyRow = kv.key - val key = (keyRow.getLong(0), keyRow.getInt(2), keyRow.getDouble(4)) - (key._1, key._2, key._3) + val result = store + .iterator(cfName) + .map { kv => + val keyRow = kv.key + val key = (keyRow.getLong(0), keyRow.getInt(2), keyRow.getDouble(4)) + (key._1, key._2, key._3) + } + .toSeq + + def getOrderedInput( + orderedInput: Seq[(Long, Int, Double)], + sortIndexes: Seq[Int]): Seq[(Long, Int, Double)] = { + sortIndexes match { + case Seq(0, 1, 2) => orderedInput.sortBy(x => (x._1, x._2, x._3)) + case Seq(0, 2, 1) => orderedInput.sortBy(x => (x._1, x._3, x._2)) + case Seq(2, 1, 0) => orderedInput.sortBy(x => (x._3, x._2, x._1)) + case Seq(2, 0, 1) => orderedInput.sortBy(x => (x._3, x._1, x._2)) + case _ => throw new IllegalArgumentException(s"Invalid sortIndexes: " + + s"${sortIndexes.mkString(",")}") + } } - .toSeq - assert(result === orderedInput) + assert(result === getOrderedInput(orderedInput, sortIndexes)) + store.commit() + } } } - testWithColumnFamiliesAndEncodingTypes( "rocksdb range scan multiple ordering columns - variable size " + s"non-ordering columns with null values in 
first ordering column", From b9206c1103097f50a2588af2c0ef39d7827ab26b Mon Sep 17 00:00:00 2001 From: Anish Shrigondekar Date: Fri, 31 Jan 2025 19:22:42 -0800 Subject: [PATCH 2/2] Address Jungtaek's comments --- .../state/RocksDBStateStoreSuite.scala | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala index b33fbc7ac39f1..4ac771a5b0baa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala @@ -350,6 +350,10 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid tryWithProviderResource(newStoreProvider(keySchema, RangeKeyScanStateEncoderSpec( keySchema, Seq(1)), colFamiliesEnabled)) { provider => + + def getRandStr(): String = Random.alphanumeric.filter(_.isLetter) + .take(Random.nextInt() % 10 + 1).mkString + val store = provider.getStore(0) // use non-default col family if column families are enabled @@ -360,12 +364,10 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid RangeKeyScanStateEncoderSpec(keySchema, Seq(1))) } - val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 1L, 2L, 8L, - -230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L) + val timerTimestamps = Seq(931, 8000, 452300, 4200, -1, 90, 1, 2, 8, + -230, -14569, -92, -7434253, 35, 6, 9, -323, 5) timerTimestamps.foreach { ts => - val keyRow = dataToKeyRow(Random.alphanumeric.filter(_.isLetter) - .take(Random.nextInt() % 10 + 1).mkString, - ts.asInstanceOf[Int]) + val keyRow = dataToKeyRow(getRandStr(), ts) val valueRow = dataToValueRow(1) store.put(keyRow, valueRow, cfName) assert(valueRowToData(store.get(keyRow, cfName)) === 1) @@ 
-380,11 +382,9 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid // test with a different set of power of 2 timestamps val store1 = provider.getStore(1) - val timerTimestamps1 = Seq(-32L, -64L, -256L, 64L, 32L, 1024L, 4096L, 0L) + val timerTimestamps1 = Seq(-32, -64, -256, 64, 32, 1024, 4096, 0) timerTimestamps1.foreach { ts => - val keyRow = dataToKeyRow(Random.alphanumeric.filter(_.isLetter) - .take(Random.nextInt() % 10 + 1).mkString, - ts.asInstanceOf[Int]) + val keyRow = dataToKeyRow(getRandStr(), ts) val valueRow = dataToValueRow(1) store1.put(keyRow, valueRow, cfName) assert(valueRowToData(store1.get(keyRow, cfName)) === 1) @@ -550,8 +550,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid val result = store.iterator(cfName).map { kv => val keyRow = kv.key - val key = (keyRow.getString(0), keyRow.getLong(1), keyRow.getInt(2)) - (key._2, key._3) + (keyRow.getLong(1), keyRow.getInt(2)) }.toSeq def getOrderedTs( @@ -1268,8 +1267,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid .iterator(cfName) .map { kv => val keyRow = kv.key - val key = (keyRow.getLong(0), keyRow.getInt(2), keyRow.getDouble(4)) - (key._1, key._2, key._3) + (keyRow.getLong(0), keyRow.getInt(2), keyRow.getDouble(4)) } .toSeq