Skip to content

Commit

Permalink
[SPARK-51044][SS][TESTS] Add ordering related tests for list state
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Add ordering related tests for list state

### Why are the changes needed?
Improve test coverage around relative ordering of items added to list state

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests

```
[info] Run completed in 7 seconds, 638 milliseconds.
[info] Total number of tests run: 22
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 22, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #49742 from anishshri-db/task/SPARK-51044.

Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
  • Loading branch information
anishshri-db authored and HeartSaVioR committed Feb 1, 2025
1 parent cfc3f1f commit 23704d5
Showing 1 changed file with 144 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,150 @@ class ListStateSuite extends StateVariableSuiteBase {
}
}

// verify that relative ordering of inserted elements is maintained on retrieval - long type
test("list state - value ordering - long type") {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
stringEncoder, TimeMode.None())

val testState: ListState[Long] = handle.getListState[Long]("testState", Encoders.scalaLong,
TTLConfig.NONE)
ImplicitGroupingKeyTracker.setImplicitKey("test_key")

var testSeq = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 1L, 2L, 8L,
-230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L)
testState.put(testSeq.toArray)
assert(testState.get().toSeq === testSeq)
testState.appendValue(100L)
testState.appendValue(9L)
testState.appendValue(48972L)
testSeq = testSeq ++ Seq(100L, 9L, 48972L)
assert(testState.get().toSeq === testSeq)

var appendSeq = Seq(-1L, 2942450L, 7L)
testState.appendList(appendSeq.toArray)
testSeq = testSeq ++ appendSeq
assert(testState.get().toSeq === testSeq)

testState.clear()
testState.appendValue(3451L)
testState.appendValue(24L)
testState.appendValue(-14342429L)
testSeq = Seq(3451L, 24L, -14342429L)
assert(testState.get().toSeq === testSeq)

appendSeq = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 1L, 2L, 8L,
-230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L)
testState.appendList(appendSeq.toArray)
testSeq = testSeq ++ appendSeq
assert(testState.get().toSeq === testSeq)

testState.clear()
assert(!testState.exists())
assert(testState.get().toSeq === Seq.empty[Long])
store.commit()
}
}

// verify that relative ordering of inserted elements is maintained on retrieval - string type
test("list state - value ordering - string type") {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
stringEncoder, TimeMode.None())

val testState: ListState[String] = handle.getListState[String]("testState",
Encoders.STRING, TTLConfig.NONE)
ImplicitGroupingKeyTracker.setImplicitKey("test_key")

var testSeq = Seq("test", "actual", "state", "value", "ordering", "string", "", "type")
testState.put(testSeq.toArray)
assert(testState.get().toSeq === testSeq)
testState.appendValue("hello")
testState.appendValue("world")
testState.appendValue("verification")
testSeq = testSeq ++ Seq("hello", "world", "verification")
assert(testState.get().toSeq === testSeq)

var appendSeq = Seq("test string", "stateful", "processor", "handle")
testState.appendList(appendSeq.toArray)
testSeq = testSeq ++ appendSeq
assert(testState.get().toSeq === testSeq)

testState.clear()
testState.appendValue(" validate space in front")
testState.appendValue("validate space in back ")
testState.appendValue(" validate space in both ")
testSeq = Seq(" validate space in front", "validate space in back ",
" validate space in both ")
assert(testState.get().toSeq === testSeq)

appendSeq = Seq("test", "actual", "state", "value", "ordering", "string", "", "type",
"hello", "world", "verification")
testState.appendList(appendSeq.toArray)
testSeq = testSeq ++ appendSeq
assert(testState.get().toSeq === testSeq)

testState.clear()
assert(!testState.exists())
assert(testState.get().toSeq === Seq.empty[String])
store.commit()
}
}

// verify that relative ordering of inserted elements is maintained on retrieval - case class type
test("list state - value ordering - case class type") {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
stringEncoder, TimeMode.None())

val testState: ListState[TestClass] = handle.getListState[TestClass]("testState",
Encoders.product[TestClass], TTLConfig.NONE)
ImplicitGroupingKeyTracker.setImplicitKey("test_key")

var testSeq = Seq(TestClass(931L, "test"), TestClass(8000L, "verification"),
TestClass(452300L, "state"), TestClass(4200L, "actual"), TestClass(-1L, "value"),
TestClass(90L, "ordering"), TestClass(1L, "string"))
testState.put(testSeq.toArray)
assert(testState.get().toSeq === testSeq)
testState.appendValue(TestClass(2L, "type"))
testState.appendValue(TestClass(-323L, "hello"))
testState.appendValue(TestClass(48972L, " verify with space"))
testSeq = testSeq ++ Seq(TestClass(2L, "type"), TestClass(-323L, "hello"),
TestClass(48972L, " verify with space"))

assert(testState.get().toSeq === testSeq)

var appendSeq = Seq(TestClass(-1L, "space at end "),
TestClass(2942450L, " space before and after "), TestClass(7L, "sample_string"))
testState.appendList(appendSeq.toArray)
testSeq = testSeq ++ appendSeq
assert(testState.get().toSeq === testSeq)

testState.clear()
testState.appendValue(TestClass(3451L, "values"))
testState.appendValue(TestClass(24L, "ordering"))
testState.appendValue(TestClass(-14342429L, "state"))
testSeq = Seq(TestClass(3451L, "values"), TestClass(24L, "ordering"),
TestClass(-14342429L, "state"))
assert(testState.get().toSeq === testSeq)

appendSeq = Seq(TestClass(931L, "test"), TestClass(8000L, "verification"),
TestClass(452300L, "state"), TestClass(4200L, "actual"), TestClass(-1L, "value"),
TestClass(90L, "ordering"), TestClass(1L, "string"))
testState.appendList(appendSeq.toArray)
testSeq = testSeq ++ appendSeq
assert(testState.get().toSeq === testSeq)

testState.clear()
assert(!testState.exists())
assert(testState.get().toSeq === Seq.empty[TestClass])
store.commit()
}
}

test("List state operations for multiple instance") {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
val store = provider.getStore(0)
Expand Down

0 comments on commit 23704d5

Please sign in to comment.