
[SPARK-50994][CORE] Perform RDD conversion under tracked execution #49678

Closed
wants to merge 4 commits

Conversation

Contributor

@BOOTMGR BOOTMGR commented Jan 26, 2025

What changes were proposed in this pull request?

  • A new lazy variable materializedRdd is introduced which actually holds the RDD after it is created (by executing the plan).
  • Dataset#rdd is wrapped within withNewRDDExecutionId, which takes care of important setup tasks, such as updating Spark properties in SparkContext's thread-locals, before executing the SparkPlan to fetch data.
  • Dataset#rdd now acts like other RDD operations such as reduce or foreachPartition: it operates on materializedRdd under a new execution id (initialising it if not done yet). A rough sketch of this shape follows this list.
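
As a rough, hedged sketch of the shape described above (hypothetical names: TrackedRddHolder, buildPlanRdd, and withTrackedExecution stand in for Dataset, the plan execution, and withNewRDDExecutionId; this is not the actual Spark source), the RDD produced by executing the plan sits behind a lazy val, and every RDD entry point goes through a wrapper that copies the relevant properties into the SparkContext thread-locals before touching it:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for Dataset: holds the plan-produced RDD lazily and
// exposes it only through a tracked execution wrapper.
class TrackedRddHolder[T](sc: SparkContext, buildPlanRdd: () => RDD[T]) {

  // Built at most once, the first time an RDD-based operation is invoked.
  private lazy val materializedRdd: RDD[T] = buildPlanRdd()

  // Stand-in for withNewRDDExecutionId: push properties into the SparkContext
  // thread-locals so they reach the TaskContext of the job that materializes
  // the RDD, run the body, then clean up.
  private def withTrackedExecution[U](localProps: Map[String, String])(body: => U): U = {
    localProps.foreach { case (k, v) => sc.setLocalProperty(k, v) }
    try body finally localProps.keys.foreach(k => sc.setLocalProperty(k, null))
  }

  // .rdd becomes just another consumer of materializedRdd, like reduce or
  // foreachPartition.
  def rdd(localProps: Map[String, String]): RDD[T] =
    withTrackedExecution(localProps) { materializedRdd }
}
```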

Why are the changes needed?

When a Dataset is converted into an RDD, it executes the SparkPlan without any execution context. This leads to:

  1. No tracking is available on the Spark UI for the stages needed to build the RDD.
  2. Spark properties that are local to the thread may not be set in the RDD execution context, so they are not sent with the TaskContext, even though some operations, such as reading Parquet files, depend on them (e.g. case sensitivity). A small illustration of this plumbing follows below.
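
As a small, self-contained illustration of the property plumbing involved (plain Spark APIs and a made-up property key, not code from this PR): a value set via setLocalProperty on the thread that submits a job is visible through TaskContext in that job's tasks, which is exactly the propagation that is missing when the plan is executed outside a tracked execution.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object LocalPropertyDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("local-props").getOrCreate()
    val sc = spark.sparkContext

    // Thread-local property set on the submitting (driver) thread...
    sc.setLocalProperty("demo.case.sensitive", "true")

    // ...is shipped with the job and readable from each task's TaskContext.
    val seen = sc.parallelize(1 to 4, 2)
      .map(_ => TaskContext.get().getLocalProperty("demo.case.sensitive"))
      .distinct()
      .collect()

    println(seen.mkString(", ")) // prints: true
    spark.stop()
  }
}
```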

Test scenario:

test("SPARK-50994: RDD conversion is performed with execution context") {
    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
        withTempDir(dir => {
          val dummyDF = Seq((1, 1.0), (2, 2.0), (3, 3.0), (1, 1.0)).toDF("a", "A")
          dummyDF.write.format("parquet").mode("overwrite").save(dir.getCanonicalPath)

          val df = spark.read.parquet(dir.getCanonicalPath)
          val encoder = ExpressionEncoder(df.schema)
          val deduplicated = df.dropDuplicates(Array("a"))
          val df2 = deduplicated.flatMap(row => Seq(row))(encoder).rdd

          val output = spark.createDataFrame(df2, df.schema)
          checkAnswer(output, Seq(Row(1, 1.0), Row(2, 2.0), Row(3, 3.0)))
        })
      }
    }
  }

In the above scenario,

  • The call to .rdd triggers execution, which performs a shuffle after reading the Parquet files
  • However, while reading the Parquet files, spark.sql.caseSensitive is not set (even though it is passed during session creation); the parquet-mr reader reads this flag from SQLConf
  • This leads to an unexpected, wrong result from dropDuplicates, as it may drop duplicates by either a or A; the expectation is to drop duplicates by column a only
  • This behaviour does not apply to the vectorized Parquet reader, because it reads the case-sensitivity flag from the Hadoop configuration; hence it is disabled in the test

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing test cases, plus a new test case added for this specific scenario

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Jan 26, 2025
@BOOTMGR BOOTMGR changed the title Perform RDD conversion under tracked execution SPARK-50994: Perform RDD conversion under tracked execution Jan 26, 2025
@BOOTMGR BOOTMGR changed the title SPARK-50994: Perform RDD conversion under tracked execution [SPARK-50994][SQL] Perform RDD conversion under tracked execution Jan 26, 2025
Correct, because `checkAnswer` in the test case calls `rdd.count()`, which is now a tracked operation, so the Spark event listener is invoked for it
@BOOTMGR BOOTMGR changed the title [SPARK-50994][SQL] Perform RDD conversion under tracked execution [SPARK-50994][SQL][WIP] Perform RDD conversion under tracked execution Jan 26, 2025
Contributor Author

BOOTMGR commented Jan 26, 2025

Marking WIP, this would require some more work around event listeners and observable due to exposure of RDD stages.

`materializedRdd` is the actual holder, initialized on demand by operations like `.rdd`, `foreachPartition`, etc.
@BOOTMGR BOOTMGR changed the title [SPARK-50994][SQL][WIP] Perform RDD conversion under tracked execution [SPARK-50994][SQL] Perform RDD conversion under tracked execution Jan 27, 2025
Contributor Author

BOOTMGR commented Jan 27, 2025

Ready for review

@BOOTMGR BOOTMGR changed the title [SPARK-50994][SQL] Perform RDD conversion under tracked execution [SPARK-50994][CORE] Perform RDD conversion under tracked execution Feb 5, 2025
Contributor Author

BOOTMGR commented Feb 5, 2025

@dongjoon-hyun / @HyukjinKwon seeking your attention.

@@ -2721,6 +2721,25 @@ class DataFrameSuite extends QueryTest
parameters = Map("name" -> ".whatever")
)
}

test("SPARK-50994: RDD conversion is performed with execution context") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
Contributor

This is an internal conf and ideally shouldn't be set by users. Do we have other examples like JSON/CSV scan with some configs?

Contributor Author

I haven't seen any additional failures linked to this behaviour. The only instance we encountered was in production, where legacy requirements forced us to work with a case-sensitive schema.

Contributor

This fix is also included in #48325, shall we take some tests from it?

Contributor Author

Sure thing. The test cases in #48325 are more meaningful.

@bersprockets do you mind if I paste your test case here?
@cloud-fan I will still keep the test case added here. Let me know if you think otherwise.

Contributor

@BOOTMGR paste away!

Contributor Author

@cloud-fan I took a close look at #48325 and I see that it takes a stab at a bigger problem: SQLConf is not propagated when the actual execution of the RDD happens (when the iterator is consumed), because that is triggered on demand by the user. This PR only ensures that the RDD gets the correct SQLConf when it is computed, not during iterator traversal.

I followed the conversation there, and I agree with you that all SQLConf accesses should happen during RDD computation (by storing the configs locally) rather than when the iterator is consumed; a minimal sketch of that pattern is below. I also agree with @bersprockets's view that fixing it everywhere would be troublesome and there is no guarantee for future additions. I believe that change needs some bigger considerations, like how we see interoperability between Dataset and RDD. I am ready to volunteer there.
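
A minimal sketch of that pattern, under the assumption that the method runs on the driver under a tracked execution (hypothetical helper, not a proposal for the actual operators): the thread-local SQLConf is read eagerly while the RDD is built, and the captured value travels with the closure, so nothing depends on the thread-local being set when the iterator is finally consumed on executors.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.internal.SQLConf

object ConfCapture {
  // Assumed to be called on the driver, where the thread-local SQLConf is set.
  def buildLoweredRdd(input: RDD[String]): RDD[String] = {
    // Read the conf eagerly, at RDD-construction time.
    val caseSensitive = SQLConf.get.caseSensitiveAnalysis

    input.mapPartitions { iter =>
      // The captured Boolean is serialized with the closure; nothing here
      // touches SQLConf.get when the iterator runs on an executor.
      iter.map(s => if (caseSensitive) s else s.toLowerCase)
    }
  }
}
```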

However, I feel this change should ship independently because:

  1. We need the correct configs set when the RDD computation happens. This is needed regardless of [SPARK-47193][SQL] Ensure SQL conf is propagated to executors when actions are called on RDD returned by Dataset#rdd #48325, which can come later.
  2. We need tracking on the Spark UI for stages submitted during RDD computation. For example, Snowflake's official Spark connector internally converts a DataFrame to an RDD to serialise it into CSV format; because of this issue, none of the dependent stages are shown on the Spark UI.

Let me know what you think.

Contributor

I'm +1 to ship this fix first as it's straightforward. I was only asking to take some tests from #48325 so that we don't need to set an internal non-user-facing conf to reproduce the bug.

Contributor Author

Understood. I could reproduce the same issue with spark.sql.legacy.timeParserPolicy, but that is also an internal conf. The other scenarios mentioned there are caused by the iterator issue discussed above.
Please let me know if I can look into anything in particular.

Contributor

I see, let's merge it as it is.

cloud-fan
cloud-fan approved these changes Feb 18, 2025
@cloud-fan
Contributor

thanks, merging to master/4.0!

@cloud-fan cloud-fan closed this in 07e6a06 Feb 27, 2025
cloud-fan pushed a commit that referenced this pull request Feb 27, 2025
Closes #49678 from BOOTMGR/SPARK-50994.

Authored-by: BOOTMGR <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 07e6a06)
Signed-off-by: Wenchen Fan <[email protected]>
Pajaraja pushed a commit to Pajaraja/spark that referenced this pull request Mar 6, 2025
@@ -2704,7 +2704,7 @@ class SQLQuerySuite extends SQLQuerySuiteBase with DisableAdaptiveExecutionSuite
checkAnswer(sql(s"SELECT id FROM $targetTable"),
Row(1) :: Row(2) :: Row(3) :: Nil)
spark.sparkContext.listenerBus.waitUntilEmpty()
assert(commands.size == 3)
assert(commands.size == 4)
Contributor

@LuciferYang LuciferYang Mar 17, 2025


@BOOTMGR After this change, this test has shown a tendency to become flaky. I noticed its failure in the Maven daily test, but it seemed stable before (or maybe I just didn't encounter the issue before). Could you investigate this problem?

also cc @cloud-fan

- SPARK-25271: Hive ctas commands should use data source if it is convertible *** FAILED ***
  List(org.apache.spark.sql.execution.SparkPlanInfo@cab1821f, org.apache.spark.sql.execution.SparkPlanInfo@4cf80e6, org.apache.spark.sql.execution.SparkPlanInfo@39acc973, org.apache.spark.sql.execution.SparkPlanInfo@fcface5, org.apache.spark.sql.execution.SparkPlanInfo@8316aebc) had size 5 instead of expected size 4 (SQLQuerySuite.scala:2707)


Contributor

were you able to reproduce it locally?

Contributor Author

I ran this test multiple times locally but it never failed. I also triggered the test case execution with some debug logs on CI twice, but it did not fail there either.

This change adds one extra execution stage (which was not tracked earlier) due to the RDD mapping needed by the ColumnarToRow transition. I will check if that code path has any dynamic behaviour, but most likely it should not, since all the parameters and data are always the same.

It could also be some other change impacting execution, so I'll do some more runs today to find which extra node is getting added.

Contributor

@LuciferYang LuciferYang Mar 18, 2025


I haven't found a way to reproduce it locally yet. If it's difficult to reproduce, we can set it aside for now and investigate it later when there's an easier way to reproduce it.

kazemaksOG pushed a commit to kazemaksOG/spark-custom-scheduler that referenced this pull request Mar 27, 2025