[CARBONDATA-4065] Support MERGE INTO SQL Command
Why is this PR needed?
To support the MERGE INTO SQL command in CarbonData.
The previous Scala parser has trouble parsing the complicated MERGE INTO SQL command.

What changes were proposed in this PR?
Add an ANTLR parser, and translate the parsed MERGE INTO SQL command into the dataset merge command.

Does this PR introduce any user interface change?
Yes.
The PR introduces the MERGE INTO SQL Command.

Is any new testcase added?
Yes

This closes #4032

Co-authored-by: Zhangshunyu <[email protected]>
2 people authored and QiangCai committed Jan 11, 2021
1 parent 4d8a01f commit e019806
Showing 19 changed files with 1,997 additions and 35 deletions.
22 changes: 17 additions & 5 deletions dev/findbugs-exclude.xml
@@ -21,11 +21,11 @@
</Match>

<Match>
<Source name="~.*\.scala" />
<Source name="~.*\.scala"/>
</Match>

<Match>
<Source name="~.*Test\.java" />
<Source name="~.*Test\.java"/>
</Match>

<!-- This method creates stream but the caller methods are responsible for closing the stream -->
@@ -95,7 +95,7 @@
<Match>
<Class name="org.apache.carbondata.core.scan.aggregator.impl.BitSet"/>
<Or>
<Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
<Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
</Or>
</Match>
<Match>
@@ -126,6 +126,18 @@
<Class name="org.apache.carbondata.events.OperationContext"/>
<Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
</Match>
<Match> <Class name="~org.apache.spark.sql.secondaryindex.jobs.BlockletIndexInputFormat"/> <Field name="indexExprWrapper"/> <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/> </Match>
<Match> <Class name="~org.apache.spark.sql.secondaryindex.jobs.BlockletIndexInputFormat"/> <Field name="validSegments"/> <Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/> </Match>
<Match>
<Class name="~org.apache.spark.sql.secondaryindex.jobs.BlockletIndexInputFormat"/>
<Field name="indexExprWrapper"/>
<Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
</Match>
<Match>
<Class name="~org.apache.spark.sql.secondaryindex.jobs.BlockletIndexInputFormat"/>
<Field name="validSegments"/>
<Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED"/>
</Match>

<Match>
<Package name="org.apache.spark.sql.parser"/>
</Match>
</FindBugsFilter>
38 changes: 36 additions & 2 deletions docs/scd-and-cdc-guide.md
@@ -55,8 +55,42 @@
Below is the detailed description of the `merge` API operation.
* `whenNotMatched` clause can have only the `insertExpr` action. The new row is generated based on the specified column and corresponding expressions. Users do not need to specify all the columns in the target table. For unspecified target columns, NULL is inserted.
* `whenNotMatchedAndExistsOnlyOnTarget` clause is executed when a row does not match the source and exists only in the target. This clause can have only the delete action (a sketch of how these clauses chain together follows this list).
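
For orientation, here is a minimal sketch of how these clauses chain together on datasets. It is illustrative only: the clause and action names come from the description above, while the placeholder dataset names and the exact builder signatures are assumptions; consult [MergeTestCase](https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala) for the authoritative usage.

```
// Illustrative sketch only: targetDS/sourceDS are placeholder DataFrames over
// carbondata tables, and the exact method signatures may differ from this.
targetDS.merge(sourceDS, "target.id = source.id")
  .whenMatched()
  .updateExpr(Map("price" -> "source.price"))                      // update matched rows
  .whenNotMatched()
  .insertExpr(Map("id" -> "source.id", "price" -> "source.price")) // insert unmatched source rows
  .whenNotMatchedAndExistsOnlyOnTarget()
  .delete()                                                        // remove target-only rows
  .execute()
```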

**NOTE:** SQL syntax for merge is not yet supported.
#### MERGE SQL

The SQL below merges a set of updates, insertions, and deletions from a source table
into a target carbondata table; a concrete instance follows the grammar.

```
MERGE INTO target_table_identifier
USING source_table_identifier
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
```
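
For instance, a minimal instantiation of this grammar, issued through `SparkSession.sql` (the A/B table names assume the example tables created in DataMergeIntoExample later in this commit):

```
// A concrete instance of the grammar above: overwrite every matched row of
// target table A with the corresponding row of source table B.
// Assumes a SparkSession `spark` with the CarbonData extensions enabled.
spark.sql("MERGE INTO A USING B ON A.ID = B.ID WHEN MATCHED THEN UPDATE SET *")
```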

#### MERGE SQL Operation Semantics
Below is the detailed description of the `merge` SQL operation (a worked multi-clause example follows the list).
* `table_identifier`: a table name, optionally qualified with a database name.
* `merge_condition`: an expression with a Boolean return type that specifies how the rows from the source relation are combined with the rows of the target relation.
* `WHEN MATCHED` clauses are executed when a source row matches a target table row based on the match condition. These clauses have the following semantics.
    * WHEN MATCHED clauses can have at most one UPDATE and one DELETE action.
    * The UPDATE action in merge only updates the specified columns of the matched target row.
    * The DELETE action deletes the matched row.
    * Each WHEN MATCHED clause can have an optional condition. If the clause condition exists, the UPDATE or DELETE action is executed for a matching source-target row pair only when the clause condition is true.
    * If there are multiple WHEN MATCHED clauses, they are evaluated in the order they are specified (that is, the order of the clauses matters). All WHEN MATCHED clauses, except the last one, must have conditions.
    * If both WHEN MATCHED clauses have conditions and neither condition is true for a matching source-target row pair, the matched target row is left unchanged.
    * To update all the columns of the target carbondata table with the corresponding columns of the source dataset, use UPDATE SET *. This is equivalent to UPDATE SET col1 = source.col1 [, col2 = source.col2 ...] for all the columns of the target carbondata table, so this action assumes that the source table has the same columns as the target table; otherwise the query throws an analysis error.
    * `matched_action` can be DELETE | UPDATE SET * | UPDATE SET column1 = value1 [, column2 = value2 ...]
* `WHEN NOT MATCHED` clauses are executed when a source row does not match any target row based on the match condition. These clauses have the following semantics.
    * WHEN NOT MATCHED clauses can only have the INSERT action. The new row is generated based on the specified columns and corresponding expressions. Not all the columns in the target table need to be specified; for unspecified target columns, NULL is inserted.
    * Each WHEN NOT MATCHED clause can have an optional condition. If the clause condition is present, a source row is inserted only when that condition is true for that row. Otherwise, the source row is ignored.
    * If there are multiple WHEN NOT MATCHED clauses, they are evaluated in the order they are specified (that is, the order of the clauses matters). All WHEN NOT MATCHED clauses, except the last one, must have conditions.
    * To insert all the columns of the target carbondata table with the corresponding columns of the source dataset, use INSERT *. This is equivalent to INSERT (col1 [, col2 ...]) VALUES (source.col1 [, source.col2 ...]) for all the columns of the target carbondata table, so this action assumes that the source table has the same columns as the target table; otherwise the query throws an error.
    * `not_matched_action` can be INSERT * | INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
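
As a worked sketch of these rules (assuming the A and B example tables with columns id, price, state and their data as created in DataMergeIntoExample below), the following statement chains a conditional DELETE, an unconditional UPDATE SET *, and an INSERT *; only the last WHEN MATCHED clause may omit its condition, and the clauses are evaluated in the order written:

```
// With the example data: id 1 (B.PRICE=1) is deleted, ids 2 and 3 are updated
// from B, ids 5 and 7 are inserted, and the unmatched target row id 4 is kept.
spark.sql(
  """
    | MERGE INTO A
    | USING B
    | ON A.ID = B.ID
    | WHEN MATCHED AND B.PRICE < 2 THEN DELETE
    | WHEN MATCHED THEN UPDATE SET *
    | WHEN NOT MATCHED THEN INSERT *
  """.stripMargin)
```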

##### Example code to implement cdc/scd scenario

Please refer example class [MergeTestCase](https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala) to understand and implement scd and cdc scenarios.
Please refer to the example class [MergeTestCase](https://github.com/apache/carbondata/blob/master/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala) to understand and implement SCD and CDC scenarios using the API.
Please refer to the example class [DataMergeIntoExample](https://github.com/apache/carbondata/blob/master/examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala) to understand and implement SCD and CDC scenarios using SQL.
164 changes: 164 additions & 0 deletions examples/spark/src/main/scala/org/apache/carbondata/examples/DataMergeIntoExample.scala
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.examples

import org.apache.spark.sql.SparkSession

import org.apache.carbondata.examples.util.ExampleUtils

object DataMergeIntoExample {

  def main(args: Array[String]): Unit = {
    val spark = ExampleUtils.createSparkSession("DataMergeIntoExample")
    deleteExampleBody(spark)
    deleteWithExpressionExample(spark)
    updateExampleBody(spark)
    updateWithExpressionExample(spark)
    updateSpecificColWithExpressionExample(spark)
    updateSpecificMultiColWithExpressionExample(spark)
    insertExampleBody(spark)
    insertWithExpressionExample(spark)
    insertSpecificColWithExpressionExample(spark)
    spark.close()
  }

  def initTable(spark: SparkSession): Unit = {
    spark.sql("DROP TABLE IF EXISTS A")
    spark.sql("DROP TABLE IF EXISTS B")

    // A is the target table of the MERGE examples
    spark.sql(
      s"""
         | CREATE TABLE IF NOT EXISTS A(
         | id Int,
         | price Int,
         | state String
         | )
         | STORED AS carbondata
      """.stripMargin)

    // B is the source table of the MERGE examples
    spark.sql(
      s"""
         | CREATE TABLE IF NOT EXISTS B(
         | id Int,
         | price Int,
         | state String
         | )
         | STORED AS carbondata
      """.stripMargin)

    spark.sql(s"""INSERT INTO A VALUES (1,100,"MA")""")
    spark.sql(s"""INSERT INTO A VALUES (2,200,"NY")""")
    spark.sql(s"""INSERT INTO A VALUES (3,300,"NH")""")
    spark.sql(s"""INSERT INTO A VALUES (4,400,"FL")""")

    spark.sql(s"""INSERT INTO B VALUES (1,1,"MA (updated)")""")
    spark.sql(s"""INSERT INTO B VALUES (2,3,"NY (updated)")""")
    spark.sql(s"""INSERT INTO B VALUES (3,3,"CA (updated)")""")
    spark.sql(s"""INSERT INTO B VALUES (5,5,"TX (updated)")""")
    spark.sql(s"""INSERT INTO B VALUES (7,7,"LO (updated)")""")
  }

  def dropTables(spark: SparkSession): Unit = {
    spark.sql("DROP TABLE IF EXISTS A")
    spark.sql("DROP TABLE IF EXISTS B")
  }

  def deleteExampleBody(spark: SparkSession): Unit = {
    dropTables(spark)
    initTable(spark)
    // Deletes every row of A whose id also exists in B (ids 1, 2 and 3)
    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED THEN DELETE"
    spark.sql(sqlText)
    spark.sql(s"""SELECT * FROM A""").show()
    dropTables(spark)
  }

  def deleteWithExpressionExample(spark: SparkSession): Unit = {
    dropTables(spark)
    initTable(spark)
    // Deletes only the matched row with B.ID=2; ids 1, 3 and 4 remain in A
    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED AND B.ID=2 THEN DELETE"
    spark.sql(sqlText)
    spark.sql(s"""SELECT * FROM A""").show()
    dropTables(spark)
  }

  def updateExampleBody(spark: SparkSession): Unit = {
    dropTables(spark)
    initTable(spark)
    // Overwrites all columns of the matched rows (ids 1, 2 and 3) with B's values
    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED THEN UPDATE SET *"
    spark.sql(sqlText)
    spark.sql(s"""SELECT * FROM A""").show()
    dropTables(spark)
  }

  def updateWithExpressionExample(spark: SparkSession): Unit = {
    dropTables(spark)
    initTable(spark)
    // Updates only the matched row with A.ID=2 from B
    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED AND A.ID=2 THEN UPDATE SET *"
    spark.sql(sqlText)
    spark.sql(s"""SELECT * FROM A""").show()
    dropTables(spark)
  }

  def updateSpecificColWithExpressionExample(spark: SparkSession): Unit = {
    dropTables(spark)
    initTable(spark)
    // Updates only the state column of the matched row with A.ID=2
    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED AND A.ID=2 THEN " +
      "UPDATE SET STATE=B.STATE"
    spark.sql(sqlText)
    spark.sql(s"""SELECT * FROM A""").show()
    dropTables(spark)
  }

  def updateSpecificMultiColWithExpressionExample(spark: SparkSession): Unit = {
    dropTables(spark)
    initTable(spark)
    // Updates both state and price of the matched row with A.ID=2
    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN MATCHED AND A.ID=2 THEN " +
      "UPDATE SET A.STATE=B.STATE, A.PRICE=B.PRICE"
    spark.sql(sqlText)
    spark.sql(s"""SELECT * FROM A""").show()
    dropTables(spark)
  }

  def insertExampleBody(spark: SparkSession): Unit = {
    dropTables(spark)
    initTable(spark)
    // Inserts the unmatched source rows (ids 5 and 7) into A
    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED THEN INSERT *"
    spark.sql(sqlText)
    spark.sql(s"""SELECT * FROM A""").show()
    dropTables(spark)
  }

  def insertWithExpressionExample(spark: SparkSession): Unit = {
    dropTables(spark)
    initTable(spark)
    // Inserts only the unmatched source row with B.ID=7
    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED AND B.ID=7 THEN INSERT *"
    spark.sql(sqlText)
    spark.sql(s"""SELECT * FROM A""").show()
    dropTables(spark)
  }

  def insertSpecificColWithExpressionExample(spark: SparkSession): Unit = {
    dropTables(spark)
    initTable(spark)
    // Inserts the unmatched source row with B.ID=7, taking id and price from B
    // and a literal value for state
    val sqlText = "MERGE INTO A USING B ON A.ID=B.ID WHEN NOT MATCHED AND B.ID=7 THEN " +
      "INSERT (A.ID, A.PRICE, A.state) VALUES (B.ID, B.PRICE, 'test-string')"
    spark.sql(sqlText)
    spark.sql(s"""SELECT * FROM A""").show()
    dropTables(spark)
  }
}
21 changes: 21 additions & 0 deletions integration/spark/pom.xml
@@ -83,6 +83,11 @@

<dependencies>
<!-- carbon -->
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<version>${antlr4.version}</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-hive</artifactId>
@@ -528,6 +533,22 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>antlr4</goal>
</goals>
</execution>
</executions>
<configuration>
<visitor>true</visitor>
<sourceDirectory>../spark/src/main/antlr4</sourceDirectory>
<treatWarningsAsErrors>true</treatWarningsAsErrors>
</configuration>
</plugin>
</plugins>
</build>
