Added a fix for generating the target partition path if base and part… #213

Closed · wants to merge 3 commits · changes from 1 commit
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## [16.3.3] - 2021-02-16
### Fixed
* Issue where replication location could not be generated. See [#212](https://github.com/HotelsDotCom/circus-train/issues/212).

## [16.3.2] - 2020-10-27
### Fixed
* Issue where external AVRO schemas generated lots of copy jobs. See [#203](https://github.com/HotelsDotCom/circus-train/issues/203).
@@ -35,7 +35,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -175,7 +177,8 @@ public void updateMetadata(
List<Partition> partitionsToAlter = new ArrayList<>(sourcePartitions.size());
List<ColumnStatistics> statisticsToSet = new ArrayList<>(sourcePartitions.size());
for (Partition sourcePartition : sourcePartitions) {
Path replicaPartitionLocation = locationManager.getPartitionLocation(sourcePartition);
Path replicaPartitionLocation = createReplicaPartitionLocation(sourceTableAndStatistics, sourcePartition,
locationManager);
LOG.debug("Generated replica partition path: {}", replicaPartitionLocation);

Partition replicaPartition = tableFactory
@@ -268,6 +271,29 @@ public void updateMetadata(
}
}

private Path createReplicaPartitionLocation(
TableAndStatistics sourceTableAndStatistics,
Partition sourcePartition,
ReplicaLocationManager locationManager) {
try {
return locationManager.getPartitionLocation(sourcePartition);
} catch (CircusTrainException e) {
Contributor Author:

This is thrown when the base location doesn't match the partition location. Strangely enough, the actual data copy doesn't have a problem with this, so this logic fixes it by generating the partition name (instead of taking the name from the source partition folder).
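For reference, a minimal sketch of what the fallback computes, assuming illustrative partition keys, values, and base location (none of these are taken from the PR):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;

public class MakePartNameSketch {
  public static void main(String[] args) throws MetaException {
    // Partition keys as declared on the table (illustrative).
    List<FieldSchema> partitionKeys = Arrays
        .asList(new FieldSchema("continent", "string", ""), new FieldSchema("country", "string", ""));
    // Values of one source partition (illustrative).
    List<String> values = Arrays.asList("Europe", "UK");

    // Hive derives the canonical sub-path from the keys and values...
    String subPath = Warehouse.makePartName(partitionKeys, values); // "continent=Europe/country=UK"

    // ...which the fix appends to the replica partition base location,
    // ignoring whatever folder name the source partition actually used.
    Path replicationPath = new Path("hdfs://replica/warehouse/ct_table_p", subPath);
    System.out.println(replicationPath);
    // hdfs://replica/warehouse/ct_table_p/continent=Europe/country=UK
  }
}
```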

Contributor Author:

Hmm, I'm actually a bit concerned this might not work correctly with how this is implemented: https://github.com/HotelsDotCom/circus-train/blob/main/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/source/HdfsSnapshotLocationManager.java#L124

I need to find out how the different copiers generate the target paths.
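To make the concern concrete, here is a rough sketch of the kind of sub-path derivation the linked line appears to do (this is an assumption about that code, not a copy of it; the class and method names below are invented), which relies on the partition living under the table's base location and would break for the base1/base2 layout exercised by the new test:

```java
import org.apache.hadoop.fs.Path;

// Hypothetical helper illustrating the suspected failure mode.
class SubPathSketch {
  static Path partitionSubPath(Path partitionLocation, Path tableBaseLocation) {
    String partition = partitionLocation.toString();
    String base = tableBaseLocation.toString();
    if (!partition.startsWith(base)) {
      // A copier that derives sub-paths this way fails (or mis-copies) for a
      // partition such as .../base2/continent=Asia when the table base is .../base1/.
      throw new IllegalStateException("Partition " + partition + " is not under table base " + base);
    }
    return new Path(partition.substring(base.length()));
  }
}
```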

String subPath;
try {
subPath = Warehouse
.makePartName(sourceTableAndStatistics.getTable().getPartitionKeys(), sourcePartition.getValues());
} catch (MetaException e1) {
throw new CircusTrainException(e1);
}
Path replicationPath = new Path(locationManager.getPartitionBaseLocation(), subPath);
LOG
.warn(
"Couldn't get partition location from folder will generate one instead and use: '{}', original error: {}",
replicationPath, e.getMessage());
return replicationPath;
}
}

private List<Partition> getOldPartitions(
PartitionsAndStatistics sourcePartitionsAndStatistics,
String replicaDatabaseName,
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
* Copyright (C) 2016-2021 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.DATABASE;
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.EVOLUTION_COLUMN;
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.PARTITIONED_TABLE;
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.PART_00000;
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.SOURCE_ENCODED_TABLE;
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.SOURCE_MANAGED_PARTITIONED_TABLE;
import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.SOURCE_MANAGED_UNPARTITIONED_TABLE;
@@ -259,6 +260,68 @@ public void checkAssertion() throws Exception {
runner.run(config.getAbsolutePath());
}

@Test
public void partitionedTableDifferentBasePaths() throws Exception {
// base path .../base1/
Table hiveTable = TestUtils
.createPartitionedTable(sourceCatalog.client(), DATABASE, PARTITIONED_TABLE,
new URI(sourceWarehouseUri + "/base1/"));

// base path .../base1/
URI partitionEurope = URI.create(sourceWarehouseUri + "/base1/continent=Europe");
URI partitionUk = URI.create(partitionEurope + "/country=UK");
File dataFileUk = new File(partitionUk.getPath(), PART_00000);
FileUtils.writeStringToFile(dataFileUk, "1\tadam\tlondon\n2\tsusan\tglasgow\n");

// base path .../base2/
URI partitionAsia = URI.create(sourceWarehouseUri + "/base2/continent=Asia");
URI partitionChina = URI.create(partitionAsia + "/country=China");
File dataFileChina = new File(partitionChina.getPath(), PART_00000);
FileUtils.writeStringToFile(dataFileChina, "1\tchun\tbeijing\n2\tlee\tshanghai\n");
LOG
.info(">>>> Partitions added: {}",
sourceCatalog
.client()
.add_partitions(Arrays
.asList(newTablePartition(hiveTable, Arrays.asList("Europe", "UK"), partitionUk),
newTablePartition(hiveTable, Arrays.asList("Asia", "China"), partitionChina))));
LOG.info(">>>> Table {} ", sourceCatalog.client().getTable(DATABASE, PARTITIONED_TABLE));

exit.expectSystemExitWithStatus(0);
File config = dataFolder.getFile("partitioned-single-table-no-housekeeping.yml");
CircusTrainRunner runner = CircusTrainRunner
.builder(DATABASE, sourceWarehouseUri, replicaWarehouseUri, housekeepingDbLocation)
.sourceMetaStore(sourceCatalog.getThriftConnectionUri(), sourceCatalog.connectionURL(),
sourceCatalog.driverClassName())
.replicaMetaStore(replicaCatalog.getThriftConnectionUri())
.build();
exit.checkAssertionAfterwards(new Assertion() {
@Override
public void checkAssertion() throws Exception {
Table table = replicaCatalog.client().getTable(DATABASE, PARTITIONED_TABLE);
String warehouseBase = replicaWarehouseUri.toURI().toString();
assertTrue(table.getSd().getLocation().matches(warehouseBase + "ct_database/ct_table_p"));
List<Partition> listPartitions = replicaCatalog
.client()
.listPartitions(DATABASE, PARTITIONED_TABLE, (short) -1);
assertThat(listPartitions.size(), is(2));
assertTrue(listPartitions
.get(0)
.getSd()
.getLocation()
.matches(warehouseBase + "ct_database/ct_table_p/ctp.*?/continent=Asia/country=China"));
assertTrue(listPartitions
.get(1)
.getSd()
.getLocation()
.matches(warehouseBase + "ct_database/ct_table_p/ctp.*?/continent=Europe/country=UK"));
}
});
runner.run(config.getAbsolutePath());
}

@Test
public void partitionedTableNoHousekeepingWithTableReplicationParameters() throws Exception {
helper.createPartitionedTable(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE));
@@ -1427,21 +1490,18 @@ public void partitionedTableColumnAdditionInStruct() throws Exception {
structData.put("name", "adam");
structData.put("city", "blackpool");

Table replicaTable = replicaHelper.createParquetPartitionedTable(
toUri(replicaWarehouseUri, DATABASE, PARTITIONED_TABLE),
DATABASE,
PARTITIONED_TABLE,
schema,
EVOLUTION_COLUMN,
structData,
1);
Table replicaTable = replicaHelper
.createParquetPartitionedTable(toUri(replicaWarehouseUri, DATABASE, PARTITIONED_TABLE), DATABASE,
PARTITIONED_TABLE, schema, EVOLUTION_COLUMN, structData, 1);
LOG.info(">>>> Table {} ", replicaCatalog.client().getTable(DATABASE, PARTITIONED_TABLE));

replicaTable.getParameters().put("com.hotels.bdp.circustrain.replication.event", "event_id");
replicaCatalog.client().alter_table(DATABASE, PARTITIONED_TABLE, replicaTable);

// Create the source table with an additional column in the struct.
helper.createData(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), schema, "1", 1, EVOLUTION_COLUMN, structData);
helper
.createData(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), schema, "1", 1, EVOLUTION_COLUMN,
structData);

Schema schemaV2 = SchemaBuilder
.builder("name.space")
@@ -1450,7 +1510,7 @@
.requiredInt("id")
.name(EVOLUTION_COLUMN)
.type()
.record( EVOLUTION_COLUMN + "_struct")
.record(EVOLUTION_COLUMN + "_struct")
.fields()
.requiredString("name")
.requiredString("city")
@@ -1464,21 +1524,14 @@
structData.put("city", "blackpool");
structData.put("dob", "22/09/1992");

Table table = helper.createParquetPartitionedTable(
toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE),
DATABASE,
PARTITIONED_TABLE,
schemaV2,
EVOLUTION_COLUMN,
structData,
2);
Table table = helper
.createParquetPartitionedTable(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), DATABASE,
PARTITIONED_TABLE, schemaV2, EVOLUTION_COLUMN, structData, 2);
LOG.info(">>>> Table {} ", sourceCatalog.client().getTable(DATABASE, PARTITIONED_TABLE));

// Create the source partition with the original struct.
URI partition = URI.create(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE) + "/hour=" + 1);
sourceCatalog.client().add_partitions(Arrays.asList(
newTablePartition(table, Arrays.asList("1"), partition)
));
sourceCatalog.client().add_partitions(Arrays.asList(newTablePartition(table, Arrays.asList("1"), partition)));

CircusTrainRunner runner = CircusTrainRunner
.builder(DATABASE, sourceWarehouseUri, replicaWarehouseUri, housekeepingDbLocation)
@@ -1496,7 +1549,8 @@ public void checkAssertion() throws Exception {
Table sourceTable = sourceCatalog.client().getTable(DATABASE, PARTITIONED_TABLE);
List<FieldSchema> cols = sourceTable.getSd().getCols();
assertThat(cols.get(0), is(new FieldSchema("id", "int", "")));
assertThat(cols.get(1), is(new FieldSchema(EVOLUTION_COLUMN, "struct<name:string,city:string,dob:string>", "")));
assertThat(cols.get(1),
is(new FieldSchema(EVOLUTION_COLUMN, "struct<name:string,city:string,dob:string>", "")));
PartitionIterator partitionIterator = new PartitionIterator(sourceCatalog.client(), sourceTable, (short) 1000);
List<Partition> partitions = new ArrayList<>();
while (partitionIterator.hasNext()) {
Expand All @@ -1510,7 +1564,8 @@ public void checkAssertion() throws Exception {
Table replicaTable = replicaCatalog.client().getTable(DATABASE, PARTITIONED_TABLE);
cols = replicaTable.getSd().getCols();
assertThat(cols.get(0), is(new FieldSchema("id", "int", "")));
assertThat(cols.get(1), is(new FieldSchema(EVOLUTION_COLUMN, "struct<name:string,city:string,dob:string>", "")));
assertThat(cols.get(1),
is(new FieldSchema(EVOLUTION_COLUMN, "struct<name:string,city:string,dob:string>", "")));
partitionIterator = new PartitionIterator(replicaCatalog.client(), replicaTable, (short) 1000);
partitions = new ArrayList<>();
while (partitionIterator.hasNext()) {
@@ -70,8 +70,7 @@ public class IntegrationTestHelper {
}

void createPartitionedTable(URI sourceTableUri) throws Exception {
Table hiveTable = TestUtils
.createPartitionedTable(metaStoreClient, DATABASE, PARTITIONED_TABLE, sourceTableUri);
Table hiveTable = TestUtils.createPartitionedTable(metaStoreClient, DATABASE, PARTITIONED_TABLE, sourceTableUri);

URI partitionEurope = URI.create(sourceTableUri + "/continent=Europe");
URI partitionUk = URI.create(partitionEurope + "/country=UK");
@@ -91,38 +90,34 @@ void createPartitionedTable(URI sourceTableUri) throws Exception {
}

Table createParquetPartitionedTable(
URI tableUri,
String database,
String table,
Schema schema,
String fieldName,
Object fieldData,
int version) throws Exception {
URI tableUri,
String database,
String table,
Schema schema,
String fieldName,
Object fieldData,
int version)
throws Exception {
List<FieldSchema> columns = new ArrayList<>();
AvroObjectInspectorGenerator schemaInspector = new AvroObjectInspectorGenerator(schema);
for (int i = 0; i < schemaInspector.getColumnNames().size(); i++) {
columns.add(new FieldSchema(
schemaInspector.getColumnNames().get(i), schemaInspector.getColumnTypes().get(i).toString(), ""
));
columns
.add(new FieldSchema(schemaInspector.getColumnNames().get(i),
schemaInspector.getColumnTypes().get(i).toString(), ""));
}
List<FieldSchema> partitionKeys = Arrays.asList(new FieldSchema("hour", "string", ""));
Table parquetTable = TestUtils
.createPartitionedTable(metaStoreClient, database, table, tableUri, columns, partitionKeys,
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", MapredParquetInputFormat.class.getName(),
MapredParquetOutputFormat.class.getName());
.createPartitionedTable(metaStoreClient, database, table, tableUri, columns, partitionKeys,
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", MapredParquetInputFormat.class.getName(),
MapredParquetOutputFormat.class.getName());
URI partition = createData(tableUri, schema, Integer.toString(version), version, fieldName, fieldData);
metaStoreClient.add_partitions(Arrays.asList(newTablePartition(parquetTable,
Arrays.asList(Integer.toString(version)), partition)));
metaStoreClient
.add_partitions(
Arrays.asList(newTablePartition(parquetTable, Arrays.asList(Integer.toString(version)), partition)));
return metaStoreClient.getTable(database, table);
}

URI createData(
URI tableUri,
Schema schema,
String hour,
int id,
String fieldName,
Object data) throws IOException {
URI createData(URI tableUri, Schema schema, String hour, int id, String fieldName, Object data) throws IOException {
GenericData.Record record = new GenericData.Record(schema);
record.put("id", id);

@@ -144,10 +139,8 @@ URI createData(
parentFolder.mkdirs();
File partitionFile = new File(parentFolder, "parquet0000");
Path filePath = new Path(partitionFile.toURI());
ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(filePath)
.withSchema(schema)
.withConf(new Configuration())
.build();
ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData
.Record>builder(filePath).withSchema(schema).withConf(new Configuration()).build();

try {
writer.write(record);
@@ -201,8 +194,7 @@ void createManagedPartitionedTable(URI sourceTableUri) throws Exception {
}

void createPartitionedView() throws Exception {
Table view = TestUtils
.createPartitionedView(metaStoreClient, DATABASE, SOURCE_PARTITIONED_VIEW, PARTITIONED_TABLE);
Table view = TestUtils.createPartitionedView(metaStoreClient, DATABASE, SOURCE_PARTITIONED_VIEW, PARTITIONED_TABLE);
metaStoreClient
.add_partitions(Arrays
.asList(newViewPartition(view, Arrays.asList("Europe", "UK")),