Commit 8ceb4fd

vikramahuja1001 authored and akashrn5 committed
[CARBONDATA-4190] Integrate Carbondata with Spark 3.1.1 version
Why is this PR needed?
To integrate Carbondata with Spark 3.1.1.

What changes were proposed in this PR?
Refactored code to support Spark 3.1.1 alongside the Spark 2.3 and 2.4 versions.

Changes:
1. Compile-related changes:
   1. New Spark package in MV, Streaming and spark-integration.
   2. API changes in line with the Spark API changes.
2. Spark has moved to the Proleptic Gregorian calendar, so matching timestamp changes are required in Carbondata as well (see the sketch below).
3. Show-segment-by-select command refactored.
4. A few Lucene test cases are ignored due to a deadlock in the Spark DAGScheduler, which prevents them from running.
5. Alter rename: parser enabled in Carbon, with a check for Carbon tables.
6. doExecuteColumnar() changes in CarbonDataSourceScan.scala.
7. char/varchar changes from the Spark side.
8. Rule name changed in MV.
9. CSVParser version changed in the univocity parser.
10. New configs added in SparkTestQueryExecutor to keep some behaviour the same as in 2.3 and 2.4.

Does this PR introduce any user interface change?
No.

Is any new test case added?
No.

This closes apache#4141
1 parent 18665cc commit 8ceb4fd
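
For context on item 2 of the change list: Spark 3.x resolves dates and timestamps on the proleptic Gregorian calendar (java.time), whereas Spark 2.x used java.util's hybrid Julian/Gregorian calendar, so pre-1582 dates map to different epoch values. A minimal standalone sketch of the discrepancy (the CalendarSketch object is hypothetical, not part of this patch):

    import java.text.SimpleDateFormat
    import java.util.TimeZone

    object CalendarSketch {
      def main(args: Array[String]): Unit = {
        // Spark 2.x semantics: java.util's hybrid Julian/Gregorian calendar,
        // which interprets dates before the 1582 cutover as Julian dates
        val fmt = new SimpleDateFormat("yyyy-MM-dd")
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
        val millis = fmt.parse("1000-01-01").getTime
        // Spark 3.x semantics: java.time renders the same instant on the
        // proleptic Gregorian calendar, where it lands five days later
        println(java.time.Instant.ofEpochMilli(millis)) // 1000-01-06T00:00:00Z
      }
    }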

File tree

178 files changed: +4548 −1650 lines


LICENSE

+3 −1

@@ -210,4 +210,6 @@
 BSD 2-Clause
 ------------

-com.github.luben:zstd-jni
+com.github.luben:zstd-jni
+
+com.github.paul-hammant:paranamer

examples/flink/pom.xml

+1 −1

@@ -30,7 +30,7 @@
   <name>Apache CarbonData :: Flink Examples</name>

   <properties>
-    <flink.version>1.1.4</flink.version>
+    <flink.version>1.12.2</flink.version>
     <dev.path>${basedir}/../../dev</dev.path>
   </properties>

examples/spark/pom.xml

+12 −6

@@ -38,12 +38,6 @@
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-spark_${spark.binary.version}</artifactId>
       <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hive</groupId>
-          <artifactId>hive-exec</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
@@ -81,6 +75,11 @@
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${dep.jackson.version}</version>
+    </dependency>
   </dependencies>

   <build>
@@ -214,5 +213,12 @@
         <spark.binary.version>2.4</spark.binary.version>
       </properties>
     </profile>
+    <profile>
+      <id>spark-3.1</id>
+      <properties>
+        <spark.binary.version>3.1</spark.binary.version>
+        <dep.jackson.version>2.10.0</dep.jackson.version>
+      </properties>
+    </profile>
   </profiles>
 </project>
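
With the profile above in place, a Spark 3.1 build of this module would presumably be selected by activating it explicitly, e.g. mvn clean install -Pspark-3.1; the dep.jackson.version property defined there pins jackson-databind to 2.10.0 for that profile only, keeping it in line with the Jackson version Spark 3.1 itself depends on.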

examples/spark/src/main/scala/org/apache/carbondata/examples/StreamingWithRowParserExample.scala

+2 −2

@@ -21,7 +21,7 @@ import java.io.{File, PrintWriter}
 import java.net.ServerSocket

 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.examples.util.ExampleUtils
@@ -165,7 +165,7 @@ object StreamingWithRowParserExample {
     // Write data from socket stream to carbondata file
     qry = readSocketDF.writeStream
       .format("carbondata")
-      .trigger(ProcessingTime("5 seconds"))
+      .trigger(Trigger.ProcessingTime("5 seconds"))
       .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath))
       .option("dbName", "default")
       .option("tableName", "stream_table_with_row_parser")

examples/spark/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala

+2 −2

@@ -21,7 +21,7 @@ import java.io.{File, PrintWriter}
 import java.net.ServerSocket

 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -151,7 +151,7 @@ object StructuredStreamingExample {
     // Write data from socket stream to carbondata file
     qry = readSocketDF.writeStream
       .format("carbondata")
-      .trigger(ProcessingTime("5 seconds"))
+      .trigger(Trigger.ProcessingTime("5 seconds"))
       .option("checkpointLocation",
         CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
       .option("dbName", "default")

examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala

+2 −0

@@ -85,6 +85,7 @@ object ExampleUtils {
     } else {
       "local[" + workThreadNum.toString() + "]"
     }
+    // TODO: Analyse the legacy configs and add test cases for non legacy ones
     val spark = SparkSession
       .builder()
       .master(masterUrl)
@@ -93,6 +94,7 @@
       .config("spark.driver.host", "localhost")
       .config("spark.sql.crossJoin.enabled", "true")
       .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
+      .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
       .enableHiveSupport()
       .getOrCreate()
     CarbonEnv.getInstance(spark)
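
The LEGACY setting added above matters because Spark 3.x parses datetime strings with java.time's DateTimeFormatter by default, while 2.3 and 2.4 used SimpleDateFormat; patterns the old parser tolerated can fail or change meaning under the new one. A sketch of pinning the old behaviour in a standalone session (LegacyTimeParserSketch is hypothetical, not from this patch):

    import org.apache.spark.sql.SparkSession

    object LegacyTimeParserSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("LegacyTimeParserSketch")
          // keep Spark 2.x (SimpleDateFormat) parsing semantics on Spark 3.x
          .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
          .getOrCreate()
        // parses with the legacy parser; under the default EXCEPTION policy,
        // Spark 3.x raises for patterns whose meaning changed between parsers
        spark.sql("SELECT to_timestamp('2021-01-01 12:00:00', 'yyyy-MM-dd HH:mm:ss')").show(false)
        spark.stop()
      }
    }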

examples/spark/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala

+3 −1

@@ -104,7 +104,9 @@ class RunExamples extends QueryTest with BeforeAndAfterAll {
     TableLevelCompactionOptionExample.exampleBody(spark)
   }

-  test("LuceneIndexExample") {
+  // Below test case ignored due to the Deadlock in spark code
+  // TODO: To be fixed when spark removes deadlock in opensource code.
+  ignore("LuceneIndexExample") {
     LuceneIndexExample.exampleBody(spark)
   }

index/examples/pom.xml

+6 −3

@@ -81,9 +81,6 @@
   <profiles>
     <profile>
       <id>spark-2.3</id>
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <properties>
         <spark.binary.version>2.3</spark.binary.version>
       </properties>
@@ -94,6 +91,12 @@
         <spark.binary.version>2.4</spark.binary.version>
       </properties>
     </profile>
+    <profile>
+      <id>spark-3.1</id>
+      <properties>
+        <spark.binary.version>3.1</spark.binary.version>
+      </properties>
+    </profile>
   </profiles>

 </project>
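
Note the dropped activeByDefault block: spark-2.3 is no longer the implicit profile for this module, so unless a default is inherited from the parent pom, builds now have to name a profile explicitly (-Pspark-2.3, -Pspark-2.4 or -Pspark-3.1).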

index/secondary-index/pom.xml

+6 −7

@@ -39,12 +39,6 @@
     <groupId>org.apache.carbondata</groupId>
     <artifactId>carbondata-spark_${spark.binary.version}</artifactId>
     <version>${project.version}</version>
-    <exclusions>
-      <exclusion>
-        <groupId>org.apache.hive</groupId>
-        <artifactId>hive-exec</artifactId>
-      </exclusion>
-    </exclusions>
     <scope>test</scope>
   </dependency>
   <dependency>
@@ -54,7 +48,6 @@
   <dependency>
     <groupId>org.scalatest</groupId>
     <artifactId>scalatest_${scala.binary.version}</artifactId>
-    <version>2.2.1</version>
     <scope>test</scope>
   </dependency>
   <dependency>
@@ -178,6 +171,12 @@
         <spark.binary.version>2.4</spark.binary.version>
       </properties>
     </profile>
+    <profile>
+      <id>spark-3.1</id>
+      <properties>
+        <spark.binary.version>3.1</spark.binary.version>
+      </properties>
+    </profile>
   </profiles>

 </project>
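
Removing the hard-coded scalatest version 2.2.1 presumably defers to the version managed in the parent pom's dependencyManagement, so every module tests against the same scalatest line.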
