Add Hive catalog with S3 location integration tests.
yuqi1129 committed Sep 21, 2024
1 parent c7f571c commit 78ec1b7
Showing 7 changed files with 394 additions and 103 deletions.
3 changes: 2 additions & 1 deletion build.gradle.kts
@@ -168,12 +168,13 @@ allprojects {
param.environment("PROJECT_VERSION", project.version)

// Gravitino CI Docker image
param.environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "apache/gravitino-ci:hive-0.1.13")
param.environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "apache/gravitino-ci:hive-0.1.14")
param.environment("GRAVITINO_CI_KERBEROS_HIVE_DOCKER_IMAGE", "apache/gravitino-ci:kerberos-hive-0.1.5")
param.environment("GRAVITINO_CI_DORIS_DOCKER_IMAGE", "apache/gravitino-ci:doris-0.1.5")
param.environment("GRAVITINO_CI_TRINO_DOCKER_IMAGE", "apache/gravitino-ci:trino-0.1.6")
param.environment("GRAVITINO_CI_RANGER_DOCKER_IMAGE", "apache/gravitino-ci:ranger-0.1.1")
param.environment("GRAVITINO_CI_KAFKA_DOCKER_IMAGE", "apache/kafka:3.7.0")
param.environment("GRAVITINO_CI_LOCALSTACK_DOCKER_IMAGE", "localstack/localstack:latest")

val dockerRunning = project.rootProject.extra["dockerRunning"] as? Boolean ?: false
val macDockerConnector = project.rootProject.extra["macDockerConnector"] as? Boolean ?: false
2 changes: 2 additions & 0 deletions catalogs/catalog-hive/build.gradle.kts
@@ -148,6 +148,8 @@ dependencies {
testImplementation(libs.slf4j.api)
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.mysql)
+testImplementation(libs.testcontainers.localstack)
+testImplementation(libs.hadoop2.s3)

testRuntimeOnly(libs.junit.jupiter.engine)
}
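The two new test dependencies pull in the Testcontainers LocalStack module and Hadoop's S3A connector. As a rough sketch of what the LocalStack module alone provides (this commit wires the container up through its own GravitinoLocalStackContainer instead, so this snippet is illustrative only):

import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;

public class LocalStackSketch {
  public static void main(String[] args) {
    try (LocalStackContainer localstack =
        new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest"))
            .withServices(LocalStackContainer.Service.S3)) {
      localstack.start();
      // The module computes the mapped S3 endpoint and default test credentials for you.
      System.out.println(localstack.getEndpointOverride(LocalStackContainer.Service.S3));
      System.out.println(localstack.getAccessKey() + " / " + localstack.getSecretKey());
    }
  }
}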

Large diffs are not rendered by default.

164 changes: 164 additions & 0 deletions catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveS3IT.java
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog.hive.integration.test;

import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer;
import org.apache.gravitino.integration.test.container.HiveContainer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import org.testcontainers.shaded.org.awaitility.Awaitility;

public class CatalogHiveS3IT extends CatalogHiveIT {

private static final Logger LOGGER = LoggerFactory.getLogger(CatalogHiveS3IT.class);

private static final String S3_BUCKET_NAME = "my-test-bucket";

private GravitinoLocalStackContainer gravitinoLocalStackContainer;

// Environment variable names passed through to the Hive container.
private static final String S3_ACCESS_KEY = "S3_ACCESS_KEY";
private static final String S3_SECRET_KEY = "S3_SECRET_KEY";
private static final String S3_ENDPOINT = "S3_ENDPOINT";

private String s3Endpoint;
private String accessKey;
private String secretKey;

@Override
protected void startNecessaryContainer() {
containerSuite.startLocalStackContainer();
gravitinoLocalStackContainer = containerSuite.getLocalStackContainer();

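// Wait for LocalStack to become ready by retrying an awslocal call until it exits successfully.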
Awaitility.await()
.atMost(60, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.until(
() -> {
try {
Container.ExecResult result =
gravitinoLocalStackContainer.executeInContainer(
"awslocal", "iam", "create-user", "--user-name", "anonymous");
return result.getExitCode() == 0;
} catch (Exception e) {
LOGGER.info("LocalStack is not ready yet for: ", e);
return false;
}
});

gravitinoLocalStackContainer.executeInContainer(
"awslocal", "s3", "mb", "s3://" + S3_BUCKET_NAME);

Container.ExecResult result =
gravitinoLocalStackContainer.executeInContainer(
"awslocal", "iam", "create-access-key", "--user-name", "anonymous");

// Parse the access key and secret key from the `create-access-key` output; the fixed
// offsets assume the default JSON layout, with the 20-character AccessKeyId on the
// fourth line and the 40-character SecretAccessKey on the sixth.
String[] lines = result.getStdout().split("\n");
accessKey = lines[3].split(":")[1].trim().substring(1, 21);
secretKey = lines[5].split(":")[1].trim().substring(1, 41);

LOGGER.info("Access key: {}", accessKey);
LOGGER.info("Secret key: {}", secretKey);

s3Endpoint =
String.format(
"http://%s:%d",
gravitinoLocalStackContainer.getContainerIpAddress(), GravitinoLocalStackContainer.PORT);

gravitinoLocalStackContainer.executeInContainer(
"awslocal",
"s3api",
"put-bucket-acl",
"--bucket",
S3_BUCKET_NAME,
"--acl",
"public-read-write");

Map<String, String> hiveContainerEnv =
ImmutableMap.of(
S3_ACCESS_KEY,
accessKey,
S3_SECRET_KEY,
secretKey,
S3_ENDPOINT,
s3Endpoint,
HiveContainer.HIVE_RUNTIME_VERSION,
HiveContainer.HIVE3);

containerSuite.startHiveContainer(hiveContainerEnv);
}

@Override
protected void initFileSystem() throws IOException {
// Use the S3A file system; path-style access over plain HTTP is required because
// LocalStack does not serve virtual-hosted-style HTTPS buckets.
Configuration conf = new Configuration();
conf.set("fs.s3a.access.key", accessKey);
conf.set("fs.s3a.secret.key", secretKey);
conf.set("fs.s3a.endpoint", s3Endpoint);
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
conf.set(
"fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
conf.set("fs.s3a.path.style.access", "true");
conf.set("fs.s3a.connection.ssl.enabled", "false");
fileSystem = FileSystem.get(URI.create("s3a://" + S3_BUCKET_NAME), conf);
}
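// Illustrative sanity check (not part of this commit): the S3A client should now be
// able to list the bucket root, e.g.
// fileSystem.listStatus(new org.apache.hadoop.fs.Path("s3a://" + S3_BUCKET_NAME + "/"));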

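// Mirror the S3A settings into Spark via spark.hadoop.* so Hive table writes also go to LocalStack.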
@Override
protected void initSparkSession() {
sparkSession =
SparkSession.builder()
.master("local[1]")
.appName("Hive Catalog integration test")
.config("hive.metastore.uris", HIVE_METASTORE_URIS)
.config(
"spark.sql.warehouse.dir",
String.format(
"hdfs://%s:%d/user/hive/warehouse",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HDFS_DEFAULTFS_PORT))
.config("spark.hadoop.fs.s3a.access.key", accessKey)
.config("spark.hadoop.fs.s3a.secret.key", secretKey)
.config("spark.hadoop.fs.s3a.endpoint", getS3Endpoint)
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.config(
"spark.hadoop.fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
.config("spark.sql.storeAssignmentPolicy", "LEGACY")
.config("mapreduce.input.fileinputformat.input.dir.recursive", "true")
.enableHiveSupport()
.getOrCreate();
}

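// Point each schema at a unique S3 location so tables land in the LocalStack bucket
// and concurrent runs do not collide.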
@Override
protected Map<String, String> createSchemaProperties() {
Map<String, String> properties = super.createSchemaProperties();
properties.put("location", "s3a://" + S3_BUCKET_NAME + "/test-" + System.currentTimeMillis());
return properties;
}
}
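The fixed-offset credential parsing in startNecessaryContainer() is brittle if LocalStack ever reformats its output. A more defensive sketch, assuming a JSON library such as Jackson is available on the test classpath (this helper is hypothetical and not part of the commit):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

final class AccessKeyParser {
  // Hypothetical helper: parse the JSON that `awslocal iam create-access-key`
  // prints instead of relying on fixed line and column offsets.
  static String[] parse(String stdout) throws IOException {
    JsonNode key = new ObjectMapper().readTree(stdout).path("AccessKey");
    return new String[] {
      key.path("AccessKeyId").asText(), key.path("SecretAccessKey").asText()
    };
  }
}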
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
@@ -141,6 +141,7 @@ hadoop2-hdfs = { group = "org.apache.hadoop", name = "hadoop-hdfs", version.ref = "hadoop2" }
hadoop2-hdfs-client = { group = "org.apache.hadoop", name = "hadoop-hdfs-client", version.ref = "hadoop2" }
hadoop2-common = { group = "org.apache.hadoop", name = "hadoop-common", version.ref = "hadoop2"}
hadoop2-mapreduce-client-core = { group = "org.apache.hadoop", name = "hadoop-mapreduce-client-core", version.ref = "hadoop2"}
+hadoop2-s3 = { group = "org.apache.hadoop", name = "hadoop-aws", version.ref = "hadoop2"}
hadoop3-hdfs = { group = "org.apache.hadoop", name = "hadoop-hdfs", version.ref = "hadoop3" }
hadoop3-common = { group = "org.apache.hadoop", name = "hadoop-common", version.ref = "hadoop3"}
hadoop3-client = { group = "org.apache.hadoop", name = "hadoop-client", version.ref = "hadoop3"}
@@ -178,6 +179,7 @@ testcontainers = { group = "org.testcontainers", name = "testcontainers", version.ref = "testcontainers" }
testcontainers-mysql = { group = "org.testcontainers", name = "mysql", version.ref = "testcontainers" }
testcontainers-postgresql = { group = "org.testcontainers", name = "postgresql", version.ref = "testcontainers" }
testcontainers-junit-jupiter = { group = "org.testcontainers", name = "junit-jupiter", version.ref = "testcontainers" }
+testcontainers-localstack = { group = "org.testcontainers", name = "localstack", version.ref = "testcontainers" }
trino-jdbc = { group = "io.trino", name = "trino-jdbc", version.ref = "trino" }
jwt-api = { group = "io.jsonwebtoken", name = "jjwt-api", version.ref = "jwt"}
jwt-impl = { group = "io.jsonwebtoken", name = "jjwt-impl", version.ref = "jwt"}
39 changes: 34 additions & 5 deletions integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/ContainerSuite.java
@@ -71,6 +71,7 @@ public class ContainerSuite implements Closeable {
private static volatile Map<PGImageName, PostgreSQLContainer> pgContainerMap =
new EnumMap<>(PGImageName.class);

+private static volatile GravitinoLocalStackContainer gravitinoLocalStackContainer;
protected static final CloseableGroup closer = CloseableGroup.create();

private static void initIfNecessary() {
@@ -112,7 +113,11 @@ public Network getNetwork() {
return network;
}

-public void startHiveContainer() {
+public void startHiveContainer(Map<String, String> env) {
+ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+builder.putAll(env);
+builder.put("HADOOP_USER_NAME", "anonymous");
+
if (hiveContainer == null) {
synchronized (ContainerSuite.class) {
if (hiveContainer == null) {
@@ -121,10 +126,7 @@ public void startHiveContainer() {
HiveContainer.Builder hiveBuilder =
HiveContainer.builder()
.withHostName("gravitino-ci-hive")
-.withEnvVars(
-ImmutableMap.<String, String>builder()
-.put("HADOOP_USER_NAME", "anonymous")
-.build())
+.withEnvVars(builder.build())
.withNetwork(network);
HiveContainer container = closer.register(hiveBuilder.build());
container.start();
@@ -134,6 +136,10 @@ public void startHiveContainer() {
}
}

+public void startHiveContainer() {
+startHiveContainer(ImmutableMap.of());
+}
+
/**
* To start and enable Ranger plugin in Hive container, <br>
* you can specify environment variables: <br>
@@ -361,6 +367,29 @@ public void startKafkaContainer() {
}
}

+public void startLocalStackContainer() {
+if (gravitinoLocalStackContainer == null) {
+synchronized (ContainerSuite.class) {
+if (gravitinoLocalStackContainer == null) {
+GravitinoLocalStackContainer.Builder builder =
+GravitinoLocalStackContainer.builder().withNetwork(network);
+GravitinoLocalStackContainer container = closer.register(builder.build());
+try {
+container.start();
+} catch (Exception e) {
+LOG.error("Failed to start LocalStack container", e);
+throw new RuntimeException("Failed to start LocalStack container", e);
+}
+gravitinoLocalStackContainer = container;
+}
+}
+}
+}
+
+public GravitinoLocalStackContainer getLocalStackContainer() {
+return gravitinoLocalStackContainer;
+}
+
public KafkaContainer getKafkaContainer() {
return kafkaContainer;
}
69 changes: 69 additions & 0 deletions integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/GravitinoLocalStackContainer.java
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.integration.test.container;

import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.testcontainers.containers.Network;

public class GravitinoLocalStackContainer extends BaseContainer {

public static final String DEFAULT_IMAGE = System.getenv("GRAVITINO_CI_LOCALSTACK_DOCKER_IMAGE");
public static final String HOST_NAME = "gravitino-ci-localstack";
public static final int PORT = 4566;

public GravitinoLocalStackContainer(
String image,
String hostName,
Set<Integer> ports,
Map<String, String> extraHosts,
Map<String, String> filesToMount,
Map<String, String> envVars,
Optional<Network> network) {
super(image, hostName, ports, extraHosts, filesToMount, envVars, network);
}

public static Builder builder() {
return new Builder();
}

@Override
protected boolean checkContainerStatus(int retryLimit) {
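// LocalStack readiness is verified by the caller (CatalogHiveS3IT polls awslocal),
// so report the container as ready immediately.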
return true;
}

public static class Builder
extends BaseContainer.Builder<
GravitinoLocalStackContainer.Builder, GravitinoLocalStackContainer> {
public Builder() {
super();
this.image = DEFAULT_IMAGE;
this.hostName = HOST_NAME;
this.exposePorts = ImmutableSet.of(PORT);
}

@Override
public GravitinoLocalStackContainer build() {
return new GravitinoLocalStackContainer(
image, hostName, exposePorts, extraHosts, filesToMount, envVars, network);
}
}
}
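For reference, a minimal usage sketch mirroring what CatalogHiveS3IT does above (ContainerSuite.getInstance() is assumed to be the suite's usual singleton accessor; the bucket name is illustrative):

import org.apache.gravitino.integration.test.container.ContainerSuite;
import org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer;
import org.testcontainers.containers.Container;

public class LocalStackUsageSketch {
  public static void main(String[] args) throws Exception {
    ContainerSuite suite = ContainerSuite.getInstance(); // assumption: singleton accessor
    suite.startLocalStackContainer();
    GravitinoLocalStackContainer localstack = suite.getLocalStackContainer();
    // Create a bucket through awslocal, exactly as the S3 integration test does.
    Container.ExecResult result =
        localstack.executeInContainer("awslocal", "s3", "mb", "s3://demo-bucket");
    System.out.println(result.getStdout());
  }
}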
