Skip to content

Commit

Permalink
[SPARK-50808][CORE] Fix issue in writeAll with mixed types not gettin…
Browse files Browse the repository at this point in the history
…g written properly

### What changes were proposed in this pull request?

Fix a bug with LevelDB/RocksDB's batched write method (`writeAll`) not using the correct list to serialize values.
Luckily, existing use of this api is for the same class - which avoids this bug in practice.
This PR fixes the issue to ensure the api contract works as expected, and avoids issues in future.

### Why are the changes needed?
Fix existing bug.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New test introduced. Test fails without proposed changes.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #49479 from mridulm/mridulm/fix-kvstore-WriteAll-multiple-types.

Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 98d9968)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
  • Loading branch information
Mridul Muralidharan committed Jan 18, 2025
1 parent cfa6f30 commit 3bcd973
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void writeAll(List<?> values) throws Exception {

// Deserialize outside synchronized block
List<byte[]> list = new ArrayList<>(entry.getValue().size());
for (Object value : values) {
for (Object value : entry.getValue()) {
list.add(serializer.serialize(value));
}
serializedValueIter = list.iterator();
Expand All @@ -191,6 +191,7 @@ public void writeAll(List<?> values) throws Exception {

try (WriteBatch batch = db().createWriteBatch()) {
while (valueIter.hasNext()) {
assert serializedValueIter.hasNext();
updateBatch(batch, valueIter.next(), serializedValueIter.next(), klass,
naturalIndex, indices);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void writeAll(List<?> values) throws Exception {

// Deserialize outside synchronized block
List<byte[]> list = new ArrayList<>(entry.getValue().size());
for (Object value : values) {
for (Object value : entry.getValue()) {
list.add(serializer.serialize(value));
}
serializedValueIter = list.iterator();
Expand All @@ -223,6 +223,7 @@ public void writeAll(List<?> values) throws Exception {

try (WriteBatch writeBatch = new WriteBatch()) {
while (valueIter.hasNext()) {
assert serializedValueIter.hasNext();
updateBatch(writeBatch, valueIter.next(), serializedValueIter.next(), klass,
naturalIndex, indices);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.File;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -422,6 +423,37 @@ public void testResourceCleaner() throws Exception {
}
}

@Test
public void testMultipleTypesWriteAll() throws Exception {

List<CustomType1> type1List = Arrays.asList(
createCustomType1(1),
createCustomType1(2),
createCustomType1(3),
createCustomType1(4)
);

List<CustomType2> type2List = Arrays.asList(
createCustomType2(10),
createCustomType2(11),
createCustomType2(12),
createCustomType2(13)
);

List fullList = new ArrayList();
fullList.addAll(type1List);
fullList.addAll(type2List);

db.writeAll(fullList);
for (CustomType1 value : type1List) {
assertEquals(value, db.read(value.getClass(), value.key));
}
for (CustomType2 value : type2List) {
assertEquals(value, db.read(value.getClass(), value.key));
}
}


private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
Expand All @@ -432,6 +464,14 @@ private CustomType1 createCustomType1(int i) {
return t;
}

private CustomType2 createCustomType2(int i) {
CustomType2 t = new CustomType2();
t.key = "key" + i;
t.id = "id" + i;
t.parentId = "parent_id" + (i / 2);
return t;
}

private int countKeys(Class<?> type) throws Exception {
byte[] prefix = db.getTypeInfo(type).keyPrefix();
int count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.File;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -420,6 +421,36 @@ public void testResourceCleaner() throws Exception {
}
}

@Test
public void testMultipleTypesWriteAll() throws Exception {

List<CustomType1> type1List = Arrays.asList(
createCustomType1(1),
createCustomType1(2),
createCustomType1(3),
createCustomType1(4)
);

List<CustomType2> type2List = Arrays.asList(
createCustomType2(10),
createCustomType2(11),
createCustomType2(12),
createCustomType2(13)
);

List fullList = new ArrayList();
fullList.addAll(type1List);
fullList.addAll(type2List);

db.writeAll(fullList);
for (CustomType1 value : type1List) {
assertEquals(value, db.read(value.getClass(), value.key));
}
for (CustomType2 value : type2List) {
assertEquals(value, db.read(value.getClass(), value.key));
}
}

private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
Expand All @@ -430,6 +461,14 @@ private CustomType1 createCustomType1(int i) {
return t;
}

private CustomType2 createCustomType2(int i) {
CustomType2 t = new CustomType2();
t.key = "key" + i;
t.id = "id" + i;
t.parentId = "parent_id" + (i / 2);
return t;
}

private int countKeys(Class<?> type) throws Exception {
byte[] prefix = db.getTypeInfo(type).keyPrefix();
int count = 0;
Expand Down

0 comments on commit 3bcd973

Please sign in to comment.