[CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version
Why is this PR needed?
To integrate CarbonData with Spark 3.1.1.

What changes were proposed in this PR?
Refactored code to support Spark 3.1.1 alongside the existing Spark 2.3 and 2.4 versions.
Changes:

1. Compile-related changes
	1. New Spark package in the MV, Streaming, and spark-integration modules.
	2. API changes to match the corresponding Spark API changes.
2. Spark has moved to the Proleptic Gregorian calendar, which requires corresponding timestamp-related changes in CarbonData.
3. Refactored the show-segments-by-select command.
4. Ignored a few Lucene test cases because of a deadlock in the Spark DAGScheduler that prevents them from running.
5. Alter rename: enabled the parser in Carbon and added a check for Carbon tables.
6. doExecuteColumnar() changes in CarbonDataSourceScan.scala.
7. Adopted the char/varchar changes from the Spark side.
8. Renamed a rule in MV.
9. Changed the CSVParser version used by the univocity parser.
10. Added new configs in SparkTestQueryExecutor to keep some behaviour the same as in 2.3 and 2.4 (see the sketch after this list).
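
For items 2 and 10, a minimal sketch of how such legacy-behaviour configs can be pinned on a Spark 3.1 session. Only spark.sql.legacy.timeParserPolicy is visible in the diffs below (in ExampleUtils.scala); the rebase-mode key is an illustrative Spark 3.x option, not necessarily one of the configs added to SparkTestQueryExecutor:

import org.apache.spark.sql.SparkSession

// Sketch only: pin Spark 3.1 back to pre-3.0 datetime behaviour.
val spark = SparkSession
  .builder()
  .master("local[2]")
  .appName("LegacyBehaviourSketch")
  // Seen in this commit: parse/format timestamps with the pre-Spark-3.0
  // parser instead of the Proleptic Gregorian based one.
  .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
  // Assumption, for illustration only: rebase ancient dates/timestamps on
  // Parquet write using the hybrid Julian+Gregorian calendar, as 2.x did.
  .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")
  .getOrCreate()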

Does this PR introduce any user interface change?
No

Is any new test case added?
No

This closes #4141
vikramahuja1001 authored and akashrn5 committed Jun 23, 2021
1 parent 18665cc commit 8ceb4fd
Showing 178 changed files with 4,548 additions and 1,650 deletions.
4 changes: 3 additions & 1 deletion LICENSE
@@ -210,4 +210,6 @@
 BSD 2-Clause
 ------------

-com.github.luben:zstd-jni
+com.github.luben:zstd-jni
+
+com.github.paul-hammant:paranamer
2 changes: 1 addition & 1 deletion examples/flink/pom.xml
@@ -30,7 +30,7 @@
   <name>Apache CarbonData :: Flink Examples</name>

   <properties>
-    <flink.version>1.1.4</flink.version>
+    <flink.version>1.12.2</flink.version>
     <dev.path>${basedir}/../../dev</dev.path>
   </properties>

18 changes: 12 additions & 6 deletions examples/spark/pom.xml
@@ -38,12 +38,6 @@
     <groupId>org.apache.carbondata</groupId>
     <artifactId>carbondata-spark_${spark.binary.version}</artifactId>
     <version>${project.version}</version>
-    <exclusions>
-      <exclusion>
-        <groupId>org.apache.hive</groupId>
-        <artifactId>hive-exec</artifactId>
-      </exclusion>
-    </exclusions>
   </dependency>
   <dependency>
     <groupId>org.apache.httpcomponents</groupId>
@@ -81,6 +75,11 @@
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${dep.jackson.version}</version>
+    </dependency>
   </dependencies>

   <build>
@@ -214,5 +213,12 @@
         <spark.binary.version>2.4</spark.binary.version>
       </properties>
     </profile>
+    <profile>
+      <id>spark-3.1</id>
+      <properties>
+        <spark.binary.version>3.1</spark.binary.version>
+        <dep.jackson.version>2.10.0</dep.jackson.version>
+      </properties>
+    </profile>
   </profiles>
 </project>
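
With the new profile in place, the Spark 3.1 build of this module would presumably be selected with Maven's standard profile switch, e.g. mvn -DskipTests -Pspark-3.1 clean package; the exact command is an assumption here, so check the project's build documentation.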
StreamingWithRowParserExample.scala
@@ -21,7 +21,7 @@ import java.io.{File, PrintWriter}
 import java.net.ServerSocket

 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.examples.util.ExampleUtils
@@ -165,7 +165,7 @@ object StreamingWithRowParserExample {
       // Write data from socket stream to carbondata file
       qry = readSocketDF.writeStream
         .format("carbondata")
-        .trigger(ProcessingTime("5 seconds"))
+        .trigger(Trigger.ProcessingTime("5 seconds"))
         .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
         .option("dbName", "default")
         .option("tableName", "stream_table_with_row_parser")
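
This example and StructuredStreamingExample below make the same substitution: org.apache.spark.sql.streaming.ProcessingTime was deprecated in Spark 2.2 and is gone in Spark 3, with Trigger.ProcessingTime as its replacement. A minimal self-contained sketch of the new call, using Spark's built-in rate source and console sink as stand-ins for the CarbonData table:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("TriggerSketch")
  .getOrCreate()

// Fire a micro-batch every 5 seconds, matching the examples above.
val query = spark.readStream
  .format("rate")        // test source emitting (timestamp, value) rows
  .load()
  .writeStream
  .format("console")     // stand-in for the "carbondata" sink
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination(15000)  // run briefly, then return
spark.stop()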
StructuredStreamingExample.scala
@@ -21,7 +21,7 @@ import java.io.{File, PrintWriter}
 import java.net.ServerSocket

 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -151,7 +151,7 @@ object StructuredStreamingExample {
       // Write data from socket stream to carbondata file
       qry = readSocketDF.writeStream
         .format("carbondata")
-        .trigger(ProcessingTime("5 seconds"))
+        .trigger(Trigger.ProcessingTime("5 seconds"))
         .option("checkpointLocation",
           CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
         .option("dbName", "default")
ExampleUtils.scala
@@ -85,6 +85,7 @@ object ExampleUtils {
     } else {
       "local[" + workThreadNum.toString() + "]"
     }
+    // TODO: Analyse the legacy configs and add test cases for non legacy ones
     val spark = SparkSession
       .builder()
       .master(masterUrl)
@@ -93,6 +94,7 @@
       .config("spark.driver.host", "localhost")
       .config("spark.sql.crossJoin.enabled", "true")
       .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
+      .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
       .enableHiveSupport()
       .getOrCreate()
     CarbonEnv.getInstance(spark)
RunExamples.scala
@@ -104,7 +104,9 @@ class RunExamples extends QueryTest with BeforeAndAfterAll {
     TableLevelCompactionOptionExample.exampleBody(spark)
   }

-  test("LuceneIndexExample") {
+  // Below test case ignored due to the Deadlock in spark code
+  // TODO: To be fixed when spark removes deadlock in opensource code.
+  ignore("LuceneIndexExample") {
     LuceneIndexExample.exampleBody(spark)
   }

9 changes: 6 additions & 3 deletions index/examples/pom.xml
@@ -81,9 +81,6 @@
   <profiles>
     <profile>
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
         <spark.binary.version>2.3</spark.binary.version>
       </properties>
@@ -94,6 +91,12 @@
         <spark.binary.version>2.4</spark.binary.version>
       </properties>
     </profile>
+    <profile>
+      <id>spark-3.1</id>
+      <properties>
+        <spark.binary.version>3.1</spark.binary.version>
+      </properties>
+    </profile>
   </profiles>

 </project>
13 changes: 6 additions & 7 deletions index/secondary-index/pom.xml
@@ -39,12 +39,6 @@
     <groupId>org.apache.carbondata</groupId>
     <artifactId>carbondata-spark_${spark.binary.version}</artifactId>
     <version>${project.version}</version>
-    <exclusions>
-      <exclusion>
-        <groupId>org.apache.hive</groupId>
-        <artifactId>hive-exec</artifactId>
-      </exclusion>
-    </exclusions>
     <scope>test</scope>
   </dependency>
   <dependency>
@@ -54,7 +48,6 @@
   <dependency>
     <groupId>org.scalatest</groupId>
     <artifactId>scalatest_${scala.binary.version}</artifactId>
-    <version>2.2.1</version>
     <scope>test</scope>
   </dependency>
   <dependency>
@@ -178,6 +171,12 @@
         <spark.binary.version>2.4</spark.binary.version>
       </properties>
     </profile>
+    <profile>
+      <id>spark-3.1</id>
+      <properties>
+        <spark.binary.version>3.1</spark.binary.version>
+      </properties>
+    </profile>
   </profiles>

 </project>