diff --git a/CHANGELOG.md b/CHANGELOG.md
index cd397942..199da260 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+
+
+## [16.3.4] - 2021-02-16
+### Fixed
+* Issue where replication location could not be generated. See [#212](https://github.com/HotelsDotCom/circus-train/issues/212).
+
 ## [16.3.3] - 2020-12-10
 ### Fixed
 * Issue where rename table operation would be incorrect if tables are in different databases.
diff --git a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java
index 05ded78f..3e74a9e6 100644
--- a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java
+++ b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (C) 2016-2020 Expedia, Inc.
+ * Copyright (C) 2016-2021 Expedia, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -35,7 +35,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -175,7 +177,8 @@ public void updateMetadata(
     List<Partition> partitionsToAlter = new ArrayList<>(sourcePartitions.size());
     List<ColumnStatistics> statisticsToSet = new ArrayList<>(sourcePartitions.size());
     for (Partition sourcePartition : sourcePartitions) {
-      Path replicaPartitionLocation = locationManager.getPartitionLocation(sourcePartition);
+      Path replicaPartitionLocation = createReplicaPartitionLocation(sourceTableAndStatistics, sourcePartition,
+          locationManager);
       LOG.debug("Generated replica partition path: {}", replicaPartitionLocation);

       Partition replicaPartition = tableFactory
@@ -268,6 +271,28 @@ public void updateMetadata(
     }
   }

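+  /**
+   * Returns the replica partition location from the location manager when it can be derived from
+   * the source partition folder; otherwise falls back to generating one from the partition base
+   * location plus the partition name built by Warehouse.makePartName.
+   */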
+  private Path createReplicaPartitionLocation(
+      TableAndStatistics sourceTableAndStatistics,
+      Partition sourcePartition,
+      ReplicaLocationManager locationManager) {
+    try {
+      return locationManager.getPartitionLocation(sourcePartition);
+    } catch (CircusTrainException e) {
+      try {
+        String partitionName = Warehouse
+            .makePartName(sourceTableAndStatistics.getTable().getPartitionKeys(), sourcePartition.getValues());
+        Path replicationPath = new Path(locationManager.getPartitionBaseLocation(), partitionName);
+        LOG
+            .warn(
+                "Couldn't get partition location from folder will generate one instead and use: '{}', original error: {}",
+                replicationPath, e.getMessage());
+        return replicationPath;
+      } catch (MetaException e1) {
+        throw new CircusTrainException(e1);
+      }
+    }
+  }
+
   private List<Partition> getOldPartitions(
       PartitionsAndStatistics sourcePartitionsAndStatistics,
       String replicaDatabaseName,
diff --git a/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/CircusTrainHdfsHdfsIntegrationTest.java b/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/CircusTrainHdfsHdfsIntegrationTest.java
index c86a1e41..3212631e 100644
--- a/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/CircusTrainHdfsHdfsIntegrationTest.java
+++ b/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/CircusTrainHdfsHdfsIntegrationTest.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (C) 2016-2020 Expedia, Inc.
+ * Copyright (C) 2016-2021 Expedia, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
 import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.DATABASE;
 import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.EVOLUTION_COLUMN;
 import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.PARTITIONED_TABLE;
+import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.PART_00000;
 import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.SOURCE_ENCODED_TABLE;
 import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.SOURCE_MANAGED_PARTITIONED_TABLE;
 import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.SOURCE_MANAGED_UNPARTITIONED_TABLE;
@@ -259,6 +260,66 @@ public void checkAssertion() throws Exception {
     runner.run(config.getAbsolutePath());
   }

+  @Test
+  public void partitionedTableDifferentBasePaths() throws Exception {
+    // base path .../base1/
+    Table hiveTable = TestUtils
+        .createPartitionedTable(sourceCatalog.client(), DATABASE, PARTITIONED_TABLE,
+            new URI(sourceWarehouseUri + "/base1/"));
+
+    // base path .../base1/
+    URI partitionEurope = URI.create(sourceWarehouseUri + "/base1/continent=Europe");
+    URI partitionUk = URI.create(partitionEurope + "/country=UK");
+    File dataFileUk = new File(partitionUk.getPath(), PART_00000);
+    FileUtils.writeStringToFile(dataFileUk, "1\tadam\tlondon\n2\tsusan\tglasgow\n");
+
+    // base path .../base2/
+    URI partitionAsia = URI.create(sourceWarehouseUri + "/base2/continent=Asia");
+    URI partitionChina = URI.create(partitionAsia + "/country=China");
+    File dataFileChina = new File(partitionChina.getPath(), PART_00000);
+    FileUtils.writeStringToFile(dataFileChina, "1\tchun\tbeijing\n2\tlee\tshanghai\n");
+    LOG
+        .info(">>>> Partitions added: {}",
+            sourceCatalog
+                .client()
+                .add_partitions(Arrays
+                    .asList(newTablePartition(hiveTable, Arrays.asList("Europe", "UK"), partitionUk),
+                        newTablePartition(hiveTable, Arrays.asList("Asia", "China"), partitionChina))));
+    LOG.info(">>>> Table {} ", sourceCatalog.client().getTable(DATABASE, PARTITIONED_TABLE));

+    exit.expectSystemExitWithStatus(0);
+    File config = dataFolder.getFile("partitioned-single-table-no-housekeeping.yml");
+    CircusTrainRunner runner = CircusTrainRunner
+        .builder(DATABASE, sourceWarehouseUri, replicaWarehouseUri, housekeepingDbLocation)
+        .sourceMetaStore(sourceCatalog.getThriftConnectionUri(), sourceCatalog.connectionURL(),
+            sourceCatalog.driverClassName())
+        .replicaMetaStore(replicaCatalog.getThriftConnectionUri())
+        .build();
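+    // The source partitions live under two different base paths (base1 and base2), but every
+    // replica partition should be generated under the replica table's single base location.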
"ct_database/ct_table_p/ctp.*?/continent=Asia/country=China")); + assertTrue(listPartitions + .get(1) + .getSd() + .getLocation() + .matches(warehouseBase + "ct_database/ct_table_p/ctp.*?/continent=Europe/country=UK")); + } + }); + runner.run(config.getAbsolutePath()); + } + @Test public void partitionedTableNoHousekeepingWithTableReplicationParameters() throws Exception { helper.createPartitionedTable(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE)); @@ -1427,21 +1488,18 @@ public void partitionedTableColumnAdditionInStruct() throws Exception { structData.put("name", "adam"); structData.put("city", "blackpool"); - Table replicaTable = replicaHelper.createParquetPartitionedTable( - toUri(replicaWarehouseUri, DATABASE, PARTITIONED_TABLE), - DATABASE, - PARTITIONED_TABLE, - schema, - EVOLUTION_COLUMN, - structData, - 1); + Table replicaTable = replicaHelper + .createParquetPartitionedTable(toUri(replicaWarehouseUri, DATABASE, PARTITIONED_TABLE), DATABASE, + PARTITIONED_TABLE, schema, EVOLUTION_COLUMN, structData, 1); LOG.info(">>>> Table {} ", replicaCatalog.client().getTable(DATABASE, PARTITIONED_TABLE)); replicaTable.getParameters().put("com.hotels.bdp.circustrain.replication.event", "event_id"); replicaCatalog.client().alter_table(DATABASE, PARTITIONED_TABLE, replicaTable); // Create the source table with an additional column in the struct. - helper.createData(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), schema, "1", 1, EVOLUTION_COLUMN, structData); + helper + .createData(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), schema, "1", 1, EVOLUTION_COLUMN, + structData); Schema schemaV2 = SchemaBuilder .builder("name.space") @@ -1450,7 +1508,7 @@ public void partitionedTableColumnAdditionInStruct() throws Exception { .requiredInt("id") .name(EVOLUTION_COLUMN) .type() - .record( EVOLUTION_COLUMN + "_struct") + .record(EVOLUTION_COLUMN + "_struct") .fields() .requiredString("name") .requiredString("city") @@ -1464,21 +1522,14 @@ public void partitionedTableColumnAdditionInStruct() throws Exception { structData.put("city", "blackpool"); structData.put("dob", "22/09/1992"); - Table table = helper.createParquetPartitionedTable( - toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), - DATABASE, - PARTITIONED_TABLE, - schemaV2, - EVOLUTION_COLUMN, - structData, - 2); + Table table = helper + .createParquetPartitionedTable(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), DATABASE, + PARTITIONED_TABLE, schemaV2, EVOLUTION_COLUMN, structData, 2); LOG.info(">>>> Table {} ", sourceCatalog.client().getTable(DATABASE, PARTITIONED_TABLE)); // Create the source partition with the original struct. 
     URI partition = URI.create(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE) + "/hour=" + 1);
-    sourceCatalog.client().add_partitions(Arrays.asList(
-        newTablePartition(table, Arrays.asList("1"), partition)
-    ));
+    sourceCatalog.client().add_partitions(Arrays.asList(newTablePartition(table, Arrays.asList("1"), partition)));

     CircusTrainRunner runner = CircusTrainRunner
         .builder(DATABASE, sourceWarehouseUri, replicaWarehouseUri, housekeepingDbLocation)
@@ -1496,7 +1547,8 @@ public void checkAssertion() throws Exception {
         Table sourceTable = sourceCatalog.client().getTable(DATABASE, PARTITIONED_TABLE);
         List<FieldSchema> cols = sourceTable.getSd().getCols();
         assertThat(cols.get(0), is(new FieldSchema("id", "int", "")));
-        assertThat(cols.get(1), is(new FieldSchema(EVOLUTION_COLUMN, "struct<name:string,city:string,dob:string>", "")));
+        assertThat(cols.get(1),
+            is(new FieldSchema(EVOLUTION_COLUMN, "struct<name:string,city:string,dob:string>", "")));
         PartitionIterator partitionIterator = new PartitionIterator(sourceCatalog.client(), sourceTable, (short) 1000);
         List<Partition> partitions = new ArrayList<>();
         while (partitionIterator.hasNext()) {
@@ -1510,7 +1562,8 @@ public void checkAssertion() throws Exception {
         Table replicaTable = replicaCatalog.client().getTable(DATABASE, PARTITIONED_TABLE);
         cols = replicaTable.getSd().getCols();
         assertThat(cols.get(0), is(new FieldSchema("id", "int", "")));
-        assertThat(cols.get(1), is(new FieldSchema(EVOLUTION_COLUMN, "struct<name:string,city:string,dob:string>", "")));
+        assertThat(cols.get(1),
+            is(new FieldSchema(EVOLUTION_COLUMN, "struct<name:string,city:string,dob:string>", "")));
         partitionIterator = new PartitionIterator(replicaCatalog.client(), replicaTable, (short) 1000);
         partitions = new ArrayList<>();
         while (partitionIterator.hasNext()) {
diff --git a/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/IntegrationTestHelper.java b/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/IntegrationTestHelper.java
index f551c214..c469eb76 100644
--- a/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/IntegrationTestHelper.java
+++ b/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/IntegrationTestHelper.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (C) 2016-2020 Expedia, Inc.
+ * Copyright (C) 2016-2021 Expedia, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -70,8 +70,7 @@ public class IntegrationTestHelper {
   }

   void createPartitionedTable(URI sourceTableUri) throws Exception {
-    Table hiveTable = TestUtils
-        .createPartitionedTable(metaStoreClient, DATABASE, PARTITIONED_TABLE, sourceTableUri);
+    Table hiveTable = TestUtils.createPartitionedTable(metaStoreClient, DATABASE, PARTITIONED_TABLE, sourceTableUri);

     URI partitionEurope = URI.create(sourceTableUri + "/continent=Europe");
     URI partitionUk = URI.create(partitionEurope + "/country=UK");
@@ -91,38 +90,34 @@ void createPartitionedTable(URI sourceTableUri) throws Exception {
   }

   Table createParquetPartitionedTable(
-    URI tableUri,
-    String database,
-    String table,
-    Schema schema,
-    String fieldName,
-    Object fieldData,
-    int version) throws Exception {
+      URI tableUri,
+      String database,
+      String table,
+      Schema schema,
+      String fieldName,
+      Object fieldData,
+      int version)
+    throws Exception {
     List<FieldSchema> columns = new ArrayList<>();
     AvroObjectInspectorGenerator schemaInspector = new AvroObjectInspectorGenerator(schema);
     for (int i = 0; i < schemaInspector.getColumnNames().size(); i++) {
-      columns.add(new FieldSchema(
-          schemaInspector.getColumnNames().get(i), schemaInspector.getColumnTypes().get(i).toString(), ""
-      ));
+      columns
+          .add(new FieldSchema(schemaInspector.getColumnNames().get(i),
+              schemaInspector.getColumnTypes().get(i).toString(), ""));
     }

     List<FieldSchema> partitionKeys = Arrays.asList(new FieldSchema("hour", "string", ""));
     Table parquetTable = TestUtils
-        .createPartitionedTable(metaStoreClient, database, table, tableUri, columns, partitionKeys,
-            "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", MapredParquetInputFormat.class.getName(),
-            MapredParquetOutputFormat.class.getName());
+        .createPartitionedTable(metaStoreClient, database, table, tableUri, columns, partitionKeys,
+            "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", MapredParquetInputFormat.class.getName(),
+            MapredParquetOutputFormat.class.getName());
     URI partition = createData(tableUri, schema, Integer.toString(version), version, fieldName, fieldData);
-    metaStoreClient.add_partitions(Arrays.asList(newTablePartition(parquetTable,
-        Arrays.asList(Integer.toString(version)), partition)));
+    metaStoreClient
+        .add_partitions(
+            Arrays.asList(newTablePartition(parquetTable, Arrays.asList(Integer.toString(version)), partition)));
     return metaStoreClient.getTable(database, table);
   }

-  URI createData(
-      URI tableUri,
-      Schema schema,
-      String hour,
-      int id,
-      String fieldName,
-      Object data) throws IOException {
+  URI createData(URI tableUri, Schema schema, String hour, int id, String fieldName, Object data) throws IOException {
     GenericData.Record record = new GenericData.Record(schema);
     record.put("id", id);
@@ -144,10 +139,8 @@ URI createData(
     parentFolder.mkdirs();
     File partitionFile = new File(parentFolder, "parquet0000");
     Path filePath = new Path(partitionFile.toURI());
-    ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(filePath)
-        .withSchema(schema)
-        .withConf(new Configuration())
-        .build();
+    ParquetWriter<GenericData.Record> writer = AvroParquetWriter
+        .<GenericData.Record>builder(filePath).withSchema(schema).withConf(new Configuration()).build();

     try {
       writer.write(record);
@@ -201,8 +194,7 @@ void createManagedPartitionedTable(URI sourceTableUri) throws Exception {
   }

   void createPartitionedView() throws Exception {
-    Table view = TestUtils
-        .createPartitionedView(metaStoreClient, DATABASE, SOURCE_PARTITIONED_VIEW, PARTITIONED_TABLE);
+    Table view = TestUtils.createPartitionedView(metaStoreClient, DATABASE, SOURCE_PARTITIONED_VIEW, PARTITIONED_TABLE);
     metaStoreClient
         .add_partitions(Arrays
             .asList(newViewPartition(view, Arrays.asList("Europe", "UK")),
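
Not part of the patch: a minimal sketch of the location the new fallback generates, using the
Hive metastore utility Warehouse.makePartName that the fix calls. The base location and the
"ctp-..." event folder below are made-up values, chosen only to match the ctp.*? pattern asserted
in the integration test.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.metastore.Warehouse;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.MetaException;

    public class FallbackLocationSketch {

      public static void main(String[] args) throws MetaException {
        // Partition keys as declared on the test table: (continent string, country string).
        List<FieldSchema> partitionKeys = Arrays
            .asList(new FieldSchema("continent", "string", ""), new FieldSchema("country", "string", ""));

        // Warehouse.makePartName renders the standard Hive folder layout for one partition's
        // values: "continent=Asia/country=China".
        String partitionName = Warehouse.makePartName(partitionKeys, Arrays.asList("Asia", "China"));

        // The fallback resolves that name against the replica's partition base location, so a
        // source partition stored outside the table's base path still gets a well-formed replica
        // location, e.g.:
        // hdfs://replica/warehouse/ct_database/ct_table_p/ctp-20210216t120000/continent=Asia/country=China
        Path base = new Path("hdfs://replica/warehouse/ct_database/ct_table_p/ctp-20210216t120000");
        System.out.println(new Path(base, partitionName));
      }
    }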