Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51047][SS][TESTS] Add tests to verify scan ordering for non-zero start ordinals as well as non-ascending ordinals #49747

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ class UnsafeRowDataEncoder(
writer.resetRowWriter()
rangeScanKeyFieldsWithOrdinal.zipWithIndex.foreach { case (fieldWithOrdinal, idx) =>
val field = fieldWithOrdinal._1
// We must use idx here since we are already operating on the prefix which
// already has the relevant range ordinals projected to the front.
val value = row.get(idx, field.dataType)
// Note that we cannot allocate a smaller buffer here even if the value is null
// because the effective byte array is considered variable size and needs to have
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,63 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
}
}

Seq(true, false).foreach { colFamiliesEnabled =>
test(s"rocksdb range scan - variable size non-ordering columns with non-zero start ordinal " +
s"with colFamiliesEnabled=$colFamiliesEnabled") {

tryWithProviderResource(newStoreProvider(keySchema,
RangeKeyScanStateEncoderSpec(
keySchema, Seq(1)), colFamiliesEnabled)) { provider =>

def getRandStr(): String = Random.alphanumeric.filter(_.isLetter)
.take(Random.nextInt() % 10 + 1).mkString

val store = provider.getStore(0)

// use non-default col family if column families are enabled
val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
if (colFamiliesEnabled) {
store.createColFamilyIfAbsent(cfName,
keySchema, valueSchema,
RangeKeyScanStateEncoderSpec(keySchema, Seq(1)))
}

val timerTimestamps = Seq(931, 8000, 452300, 4200, -1, 90, 1, 2, 8,
-230, -14569, -92, -7434253, 35, 6, 9, -323, 5)
timerTimestamps.foreach { ts =>
val keyRow = dataToKeyRow(getRandStr(), ts)
val valueRow = dataToValueRow(1)
store.put(keyRow, valueRow, cfName)
assert(valueRowToData(store.get(keyRow, cfName)) === 1)
}

val result = store.iterator(cfName).map { kv =>
val key = keyRowToData(kv.key)
key._2
}.toSeq
assert(result === timerTimestamps.sorted)
store.commit()

// test with a different set of power of 2 timestamps
val store1 = provider.getStore(1)
val timerTimestamps1 = Seq(-32, -64, -256, 64, 32, 1024, 4096, 0)
timerTimestamps1.foreach { ts =>
val keyRow = dataToKeyRow(getRandStr(), ts)
val valueRow = dataToValueRow(1)
store1.put(keyRow, valueRow, cfName)
assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
}

val result1 = store1.iterator(cfName).map { kv =>
val key = keyRowToData(kv.key)
key._2
}.toSeq
assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted)
store1.commit()
}
}
}

testWithColumnFamiliesAndEncodingTypes(
"rocksdb range scan - variable size non-ordering columns with " +
"double type values are supported",
Expand Down Expand Up @@ -453,6 +510,67 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
}
}

Seq(true, false).foreach { colFamiliesEnabled =>
Seq(Seq(1, 2), Seq(2, 1)).foreach { sortIndexes =>
test(s"rocksdb range scan multiple ordering columns - with non-zero start ordinal - " +
s"variable size non-ordering columns with colFamiliesEnabled=$colFamiliesEnabled " +
s"sortIndexes=${sortIndexes.mkString(",")}") {

val testSchema: StructType = StructType(
Seq(StructField("key1", StringType, false),
StructField("key2", LongType, false),
StructField("key3", IntegerType, false)))

val schemaProj = UnsafeProjection.create(Array[DataType](StringType, LongType, IntegerType))

tryWithProviderResource(newStoreProvider(testSchema,
RangeKeyScanStateEncoderSpec(testSchema, sortIndexes), colFamiliesEnabled)) { provider =>
val store = provider.getStore(0)

val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
if (colFamiliesEnabled) {
store.createColFamilyIfAbsent(cfName,
testSchema, valueSchema,
RangeKeyScanStateEncoderSpec(testSchema, sortIndexes))
}

val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 68), (90L, 2000),
(1L, 27), (1L, 394), (1L, 5), (3L, 980),
(-1L, 232), (-1L, 3455), (-6109L, 921455), (-9808344L, 1), (-1020L, 2),
(35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210), (-4344L, 2323), (-3122L, 323))
timerTimestamps.foreach { ts =>
// order by long col first and then by int col
val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](UTF8String
.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString), ts._1,
ts._2)))
val valueRow = dataToValueRow(1)
store.put(keyRow, valueRow, cfName)
assert(valueRowToData(store.get(keyRow, cfName)) === 1)
}

val result = store.iterator(cfName).map { kv =>
val keyRow = kv.key
(keyRow.getLong(1), keyRow.getInt(2))
}.toSeq

def getOrderedTs(
orderedInput: Seq[(Long, Int)],
sortIndexes: Seq[Int]): Seq[(Long, Int)] = {
sortIndexes match {
case Seq(1, 2) => orderedInput.sortBy(x => (x._1, x._2))
case Seq(2, 1) => orderedInput.sortBy(x => (x._2, x._1))
case _ => throw new IllegalArgumentException(s"Invalid sortIndexes: " +
s"${sortIndexes.mkString(",")}")
}
}

assert(result === getOrderedTs(timerTimestamps, sortIndexes))
store.commit()
}
}
}
}

testWithColumnFamiliesAndEncodingTypes(
"rocksdb range scan multiple ordering columns - variable size " +
s"non-ordering columns",
Expand Down Expand Up @@ -1065,97 +1183,113 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
assert(exception.getMessage.contains("Found long, expecting union"))
}

testWithColumnFamiliesAndEncodingTypes(
"rocksdb range scan multiple non-contiguous ordering columns",
TestWithBothChangelogCheckpointingEnabledAndDisabled ) { colFamiliesEnabled =>
val testSchema: StructType = StructType(
Seq(
StructField("ordering1", LongType, false),
StructField("key2", StringType, false),
StructField("ordering2", IntegerType, false),
StructField("string2", StringType, false),
StructField("ordering3", DoubleType, false)
Seq(Seq(0, 1, 2), Seq(0, 2, 1), Seq(2, 1, 0), Seq(2, 0, 1)).foreach { sortIndexes =>
testWithColumnFamiliesAndEncodingTypes(
s"rocksdb range scan multiple non-contiguous ordering columns " +
s"and sortIndexes=${sortIndexes.mkString(",")}",
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
val testSchema: StructType = StructType(
Seq(
StructField("ordering1", LongType, false),
StructField("key2", StringType, false),
StructField("ordering2", IntegerType, false),
StructField("string2", StringType, false),
StructField("ordering3", DoubleType, false)
)
)
)

val testSchemaProj = UnsafeProjection.create(Array[DataType](
val testSchemaProj = UnsafeProjection.create(Array[DataType](
immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): _*))
val rangeScanOrdinals = Seq(0, 2, 4)

tryWithProviderResource(
newStoreProvider(
testSchema,
RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
colFamiliesEnabled
)
) { provider =>
val store = provider.getStore(0)
// Multiply by 2 to get the actual ordinals in the row
val rangeScanOrdinals = sortIndexes.map(_ * 2)

val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
if (colFamiliesEnabled) {
store.createColFamilyIfAbsent(
cfName,
tryWithProviderResource(
newStoreProvider(
testSchema,
valueSchema,
RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals)
RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
colFamiliesEnabled
)
}
) { provider =>
val store = provider.getStore(0)

val orderedInput = Seq(
// Make sure that the first column takes precedence, even if the
// later columns are greater
(-2L, 0, 99.0),
(-1L, 0, 98.0),
(0L, 0, 97.0),
(2L, 0, 96.0),
// Make sure that the second column takes precedence, when the first
// column is all the same
(3L, -2, -1.0),
(3L, -1, -2.0),
(3L, 0, -3.0),
(3L, 2, -4.0),
// Finally, make sure that the third column takes precedence, when the
// first two ordering columns are the same.
(4L, -1, -127.0),
(4L, -1, 0.0),
(4L, -1, 64.0),
(4L, -1, 127.0)
)
val scrambledInput = Random.shuffle(orderedInput)

scrambledInput.foreach { record =>
val keyRow = testSchemaProj.apply(
new GenericInternalRow(
Array[Any](
record._1,
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
record._2,
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
record._3
)
val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
if (colFamiliesEnabled) {
store.createColFamilyIfAbsent(
cfName,
testSchema,
valueSchema,
RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals)
)
}

val orderedInput = Seq(
// Make sure that the first column takes precedence, even if the
// later columns are greater
(-2L, 0, 99.0),
(-1L, 0, 98.0),
(0L, 0, 97.0),
(2L, 0, 96.0),
// Make sure that the second column takes precedence, when the first
// column is all the same
(3L, -2, -1.0),
(3L, -1, -2.0),
(3L, 0, -3.0),
(3L, 2, -4.0),
// Finally, make sure that the third column takes precedence, when the
// first two ordering columns are the same.
(4L, -1, -127.0),
(4L, -1, 0.0),
(4L, -1, 64.0),
(4L, -1, 127.0)
)
val scrambledInput = Random.shuffle(orderedInput)

scrambledInput.foreach { record =>
val keyRow = testSchemaProj.apply(
new GenericInternalRow(
Array[Any](
record._1,
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
record._2,
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
record._3
)
)
)

// The value is just a "dummy" value of 1
val valueRow = dataToValueRow(1)
store.put(keyRow, valueRow, cfName)
assert(valueRowToData(store.get(keyRow, cfName)) === 1)
}
// The value is just a "dummy" value of 1
val valueRow = dataToValueRow(1)
store.put(keyRow, valueRow, cfName)
assert(valueRowToData(store.get(keyRow, cfName)) === 1)
}

val result = store
.iterator(cfName)
.map { kv =>
val keyRow = kv.key
val key = (keyRow.getLong(0), keyRow.getInt(2), keyRow.getDouble(4))
(key._1, key._2, key._3)
val result = store
.iterator(cfName)
.map { kv =>
val keyRow = kv.key
(keyRow.getLong(0), keyRow.getInt(2), keyRow.getDouble(4))
}
.toSeq

def getOrderedInput(
orderedInput: Seq[(Long, Int, Double)],
sortIndexes: Seq[Int]): Seq[(Long, Int, Double)] = {
sortIndexes match {
case Seq(0, 1, 2) => orderedInput.sortBy(x => (x._1, x._2, x._3))
case Seq(0, 2, 1) => orderedInput.sortBy(x => (x._1, x._3, x._2))
case Seq(2, 1, 0) => orderedInput.sortBy(x => (x._3, x._2, x._1))
case Seq(2, 0, 1) => orderedInput.sortBy(x => (x._3, x._1, x._2))
case _ => throw new IllegalArgumentException(s"Invalid sortIndexes: " +
s"${sortIndexes.mkString(",")}")
}
}
.toSeq

assert(result === orderedInput)
assert(result === getOrderedInput(orderedInput, sortIndexes))
store.commit()
}
}
}


testWithColumnFamiliesAndEncodingTypes(
"rocksdb range scan multiple ordering columns - variable size " +
s"non-ordering columns with null values in first ordering column",
Expand Down