[SPARK-51272][CORE]. Fix for the race condition in Scheduler causing failure in retrying all partitions in case of indeterministic shuffle keys #50033

Open · wants to merge 4 commits into master

Conversation

@ahshahid commented Feb 21, 2025

What changes were proposed in this pull request?

In the DAGScheduler and Stage, the actions for a successful task completion run under a read lock on the active Stage, while the actions for a failed task, which result in a retry of the stage's tasks, run under a write lock on that stage.
Because read/write locks are not upgradable (from read to write), a thread that owns the read lock on a Stage and, in one particular successful-completion path, invokes the code that creates a new stage attempt (which needs the write lock) would deadlock. To avoid this, a ThreadLocal is introduced in the Stage class that tracks threads which already hold the read lock, so that they do not attempt to acquire the write lock. A minimal sketch of the idea follows.
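A minimal sketch of that guard, assuming a plain ReentrantReadWriteLock; the class and method names here are illustrative, not the actual Stage code:

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Illustrative sketch: writers skip lock acquisition when the current thread
// already holds the read lock, because a ReentrantReadWriteLock cannot be
// upgraded from read to write.
class StageLockGuard {
  private val rwLock = new ReentrantReadWriteLock()
  // Per-thread count of read-lock holds, mirroring the ThreadLocal described above.
  private val readHolds = new ThreadLocal[Int] {
    override def initialValue(): Int = 0
  }

  def withReadLock[T](body: => T): T = {
    rwLock.readLock().lock()
    readHolds.set(readHolds.get() + 1)
    try body
    finally {
      readHolds.set(readHolds.get() - 1)
      rwLock.readLock().unlock()
    }
  }

  def withWriteLock[T](body: => T): T = {
    if (readHolds.get() > 0) {
      // Taking the write lock here would deadlock against our own read lock.
      body
    } else {
      rwLock.writeLock().lock()
      try body
      finally rwLock.writeLock().unlock()
    }
  }
}
```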

The logic in the DAGScheduler code is unchanged; the diff mostly arises from formatting changes caused by wrapping some pieces of code in try/finally blocks to release the locks.

In the failure case, the collection of stages to roll back and the eventual retry are done separately, in different threads, so the write lock is taken twice.
The race within that window is prevented by marking the stage in the treatAllPartitionsAsMissingForStage variable, as sketched below.
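A hedged sketch of that marking; the set-based bookkeeping and the StageView type below are illustrative stand-ins, not the exact fields in this PR:

```scala
import scala.collection.mutable

// Hypothetical, simplified view of a stage for this sketch.
case class StageView(id: Int, numPartitions: Int, missingPartitions: Seq[Int])

object FullRetryGuard {
  // Stage ids flagged, under the first write lock, for a full retry.
  private val treatAllPartitionsAsMissingForStage = mutable.Set[Int]()

  // Phase 1 (first write lock): while collecting the stages to roll back,
  // mark the stage so a concurrent task success cannot shrink the retry set.
  def markStageForFullRetry(stageId: Int): Unit = synchronized {
    treatAllPartitionsAsMissingForStage += stageId
  }

  // Phase 2 (second write lock): when the retry is finally submitted, ignore
  // the per-partition success bookkeeping if the stage was marked in phase 1.
  def partitionsToRetry(stage: StageView): Seq[Int] = synchronized {
    if (treatAllPartitionsAsMissingForStage.remove(stage.id)) {
      0 until stage.numPartitions   // retry every partition
    } else {
      stage.missingPartitions       // retry only what is still missing
    }
  }
}
```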

With the above changes, it turns out that two existing tests have incorrect assertions, in my opinion, since they do not expect all partitions to be retried.

Why are the changes needed?

There is a race condition where a successful task completion, concurrent with a task failure in an indeterminate stage, results in only some partitions being re-executed instead of all of them. This results in data loss.
The identified race condition is as follows (a stripped-down model follows the list):
a) A successful result-stage task has not yet set its entry to true in the boolean array tracking partition success/failure.
b) A concurrent failed result task, belonging to an indeterminate stage, identifies all the stages which need to, and can, be rolled back. For a result stage, it looks into the array of successful partitions. As none is marked true, the ResultStage and its dependent stages are delegated to the thread pool for retry.
c) Between the time the stages to roll back are collected and the time they are retried, the successful task sets its boolean to true.
d) The retry of the stage, as a result, misses the partition that was marked successful.
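A runnable, stripped-down model of this window (the names are mine, not Spark's); it shows the retry set computed in step (d) missing the partition that succeeded in step (c):

```scala
import java.util.concurrent.CountDownLatch

object RaceSketch extends App {
  val partitionDone = Array.fill(2)(false)   // per-partition success flags
  val rollbackDecided = new CountDownLatch(1)

  val failureHandler = new Thread(() => {
    // (b) nothing is marked successful yet, so a full rollback is decided
    assert(!partitionDone.exists(identity))
    rollbackDecided.countDown()
    Thread.sleep(10)                         // window before the retry runs
    // (d) the retry recomputes the "missing" partitions, skipping the late success
    val retried = partitionDone.indices.filterNot(i => partitionDone(i))
    println(s"retried partitions: $retried") // prints Vector(1), not both partitions
  })

  failureHandler.start()
  rollbackDecided.await()
  partitionDone(0) = true                    // (c) the success lands in the window
  failureHandler.join()
  // For an indeterminate stage, partition 0 must also be recomputed; skipping
  // it keeps output computed against stale shuffle data, i.e. data loss.
}
```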

An existing test (core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala) has incorrect assertions about the number of partitions retried for an indeterminate stage.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a unit test reproducing the race condition.
There is also a functional test which exposes the following:
a) Data loss due to the buggy Stage.isIndeterminate (SPARK-51016).
b) The race condition causing data loss, even if the PR for the above bug is applied.

Attaching two files for reproducing the functional bug, showing the race condition causing data corruption:

bugrepro.patch
BugTest.txt

The patch is needed to coax the single-VM test into reproducing the issue. It contains a lot of interception and tweaks to ensure the system can hit the data-loss situation, for example making each partition write only a shuffle file containing keys that evaluate to the same hashCode, and deleting the shuffle file at the right time. BugTest.txt is the bug test itself.
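For illustration, a key type that forces all records into the same shuffle bucket could look like the following; this is a hypothetical sketch of the hashCode tweak mentioned above, not code from the patch:

```scala
// Distinct keys that all evaluate to the same hashCode, so every record
// hashes to the same shuffle partition and ends up in the same shuffle file.
case class CollidingKey(value: Int) {
  override def hashCode(): Int = 42
}
```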
Running BugTest gives the following results:
a) If bugrepro.patch is applied to the current master and BugTest is run, it fails immediately with an assertion failure: instead of 12 rows, only 6 rows show up in the result.

b) If bugrepro.patch is applied on top of the PR for SPARK-51016, BugTest fails after one or more iterations, indicating the race condition in the DAGScheduler/Stage interaction.

c) But if the same BugTest is run on a branch containing both the fix for this bug and the PR for SPARK-51016, it passes all 100 iterations.

Was this patch authored or co-authored using generative AI tooling?

No

github-actions bot added the CORE label Feb 21, 2025
@ahshahid (author) commented:

I will try to write a unit test which demonstrates the race condition, which can be part of the check-in.

@ahshahid changed the title from "SPARK-51272. Fix for the race condition in Scheduler causing failure …" to "[SPARK-51272][CORE]. Fix for the race condition in Scheduler causing failure in retrying all partitions in case of indeterministic shuffle keys" Feb 21, 2025
@sririshindra (contributor) left a comment:

Did a very rudimentary pass; I will go through it more thoroughly. Please remove the TODOs you added to the PR.
