Commit aa25cfa

[#1086] [Doc] Simplify the Gluten code and add the doc (#1322)
* gluten integrate for branch-0.8
* spotless check
* add WriteBufferManagerTest test
* todo
* remove addPartition method, add some docs
1 parent cf25897 commit aa25cfa

File tree

2 files changed: +17 -3

README.md (+10)
````diff
@@ -258,6 +258,16 @@ After apply the patch and rebuild spark, add following configuration in spark co
 spark.dynamicAllocation.enabled true
 ```
 
+### Support Spark Columnar Shuffle with Gluten
+To support Spark columnar shuffle with Uniffle, use the Gluten client;
+refer to the [Gluten Project](https://github.com/oap-project/gluten).
+
+Update the Spark conf to enable the integration of Uniffle with Gluten:
+```
+spark.plugins io.glutenproject.GlutenPlugin
+spark.shuffle.manager org.apache.spark.shuffle.gluten.uniffle.GlutenRssShuffleManager
+```
+
 ### Deploy MapReduce Client
 
 1. Add client jar to the classpath of each NodeManager, e.g., <HADOOP>/share/hadoop/mapreduce/
````
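If Spark is configured in code rather than through spark-defaults.conf, the same two properties can be applied on a SparkConf before building the session. This is a minimal sketch mirroring the README snippet above; the class and app name are illustrative, not part of the commit:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class GlutenUniffleConfExample {
  public static void main(String[] args) {
    // Mirror the two spark-defaults.conf entries from the README snippet above.
    SparkConf conf = new SparkConf()
        .setAppName("gluten-uniffle-demo") // illustrative name
        .set("spark.plugins", "io.glutenproject.GlutenPlugin")
        .set(
            "spark.shuffle.manager",
            "org.apache.spark.shuffle.gluten.uniffle.GlutenRssShuffleManager");

    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    // ... run columnar queries; shuffle traffic is handled by Uniffle ...
    spark.stop();
  }
}
```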

client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java (+7 -3)
```diff
@@ -86,14 +86,14 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   private final Map<Integer, List<ShuffleServerInfo>> partitionToServers;
   private final Set<ShuffleServerInfo> shuffleServersForData;
   private final long[] partitionLengths;
-  private final boolean isMemoryShuffleEnabled;
   private final Function<String, Boolean> taskFailureCallback;
   private final Set<Long> blockIds = Sets.newConcurrentHashSet();
 
   /** used by columnar rss shuffle writer implementation */
   protected final long taskAttemptId;
 
   protected final ShuffleWriteMetrics shuffleWriteMetrics;
+  protected final boolean isMemoryShuffleEnabled;
 
   private final BlockingQueue<Object> finishEventQueue = new LinkedBlockingQueue<>();
 
@@ -213,7 +213,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     }
   }
 
-  private void writeImpl(Iterator<Product2<K, V>> records) {
+  protected void writeImpl(Iterator<Product2<K, V>> records) throws IOException {
     List<ShuffleBlockInfo> shuffleBlockInfos;
     boolean isCombine = shuffleDependency.mapSideCombine();
     Function1<V, C> createCombiner = null;
@@ -243,7 +243,7 @@ private void writeImpl(Iterator<Product2<K, V>> records) {
       processShuffleBlockInfos(shuffleBlockInfos);
     }
     long checkStartTs = System.currentTimeMillis();
-    checkBlockSendResult(blockIds);
+    internalCheckBlockSendResult();
     long commitStartTs = System.currentTimeMillis();
     long checkDuration = commitStartTs - checkStartTs;
     if (!isMemoryShuffleEnabled) {
@@ -309,6 +309,10 @@ protected List<CompletableFuture<Long>> postBlockEvent(
     return futures;
   }
 
+  protected void internalCheckBlockSendResult() {
+    checkBlockSendResult(blockIds);
+  }
+
   @VisibleForTesting
   protected void checkBlockSendResult(Set<Long> blockIds) {
     boolean interrupted = false;
```
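Taken together, these Java changes apply a template-method refactor: writeImpl and isMemoryShuffleEnabled become protected, and the new protected internalCheckBlockSendResult() wraps the private blockIds bookkeeping, so a columnar (e.g. Gluten) writer can subclass RssShuffleWriter without duplicating the row-based flow. Below is a self-contained sketch of that pattern; the class names and method bodies are illustrative, not the actual Uniffle or Gluten code:

```java
import java.util.Arrays;
import java.util.Iterator;

// Self-contained illustration of the extension points this commit opens up:
// a base writer keeps the shared flow and exposes protected hooks
// (writeImpl / internalCheckBlockSendResult) that a columnar subclass can
// override. All names here are illustrative, not Uniffle's own classes.
public class WriterHookDemo {

  abstract static class BaseWriter {
    // Shared entry point, analogous to RssShuffleWriter.write(...).
    final void write(Iterator<String> records) {
      writeImpl(records);             // was private; now protected and overridable
      internalCheckBlockSendResult(); // new protected hook around the send check
    }

    protected void writeImpl(Iterator<String> records) {
      records.forEachRemaining(r -> System.out.println("row-based write: " + r));
    }

    protected void internalCheckBlockSendResult() {
      System.out.println("checking block ids tracked by the base writer");
    }
  }

  static class ColumnarWriter extends BaseWriter {
    @Override
    protected void writeImpl(Iterator<String> records) {
      // A Gluten-style writer would serialize whole columnar batches here
      // instead of iterating row by row.
      System.out.println("columnar write of a whole batch");
    }

    @Override
    protected void internalCheckBlockSendResult() {
      // A columnar writer can track its own block ids, which is why the hook
      // exists instead of the parent's private blockIds set being exposed.
      System.out.println("checking block ids tracked by the columnar writer");
    }
  }

  public static void main(String[] args) {
    new ColumnarWriter().write(Arrays.asList("a", "b").iterator());
  }
}
```

The hook is the key design choice: the subclass supplies its own completion check, so the parent's private blockIds set never needs to be widened beyond the single protected method.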
