Skip to content

Commit 36759f3

Browse files
zustonkaijchen
authored and committed
[BUG] Potential memory leak when encountering an unhealthy disk (#370)
### What changes were proposed in this pull request? Fix a potential memory leak when encountering an unhealthy disk. ### Why are the changes needed? When a disk is unhealthy and a pending flush event exceeds the timeout of pendingDataShuffleFlushEvent, the event is dropped and its memory is released. But in the current codebase, the data reference is not released, which causes a memory leak. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No need.
1 parent 753f7b3 commit 36759f3

File tree

1 file changed

+12
-8
lines changed

1 file changed

+12
-8
lines changed

server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java

+12-8
Original file line numberDiff line numberDiff line change
@@ -208,12 +208,8 @@ private void flushToFile(ShuffleDataFlushEvent event) {
208208
// just log the error, don't throw the exception and stop the flush thread
209209
LOG.error("Exception happened when process flush shuffle data for " + event, e);
210210
} finally {
211-
ShuffleBuffer shuffleBuffer = event.getShuffleBuffer();
212-
if (shuffleBuffer != null) {
213-
shuffleBuffer.clearInFlushBuffer(event.getEventId());
214-
}
211+
cleanupFlushEventData(event);
215212
if (shuffleServer != null) {
216-
shuffleServer.getShuffleBufferManager().releaseMemory(event.getSize(), true, false);
217213
long duration = System.currentTimeMillis() - start;
218214
if (writeSuccess) {
219215
LOG.debug("Flush to file success in " + duration + " ms and release " + event.getSize() + " bytes");
@@ -310,14 +306,22 @@ void processPendingEvents() throws Exception {
310306
addPendingEventsInternal(event);
311307
}
312308

313-
private void dropPendingEvent(PendingShuffleFlushEvent event) {
314-
ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
309+
private void cleanupFlushEventData(ShuffleDataFlushEvent event) {
310+
ShuffleBuffer shuffleBuffer = event.getShuffleBuffer();
311+
if (shuffleBuffer != null) {
312+
shuffleBuffer.clearInFlushBuffer(event.getEventId());
313+
}
315314
if (shuffleServer != null) {
316315
shuffleServer.getShuffleBufferManager().releaseMemory(
317-
event.getEvent().getSize(), true, false);
316+
event.getSize(), true, false);
318317
}
319318
}
320319

320+
private void dropPendingEvent(PendingShuffleFlushEvent event) {
321+
ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
322+
cleanupFlushEventData(event.getEvent());
323+
}
324+
321325
@VisibleForTesting
322326
void addPendingEvents(ShuffleDataFlushEvent event) {
323327
addPendingEventsInternal(new PendingShuffleFlushEvent(event));

0 commit comments

Comments
 (0)