From 1305174d6f1db850d2ef152463307aa0fc97f52c Mon Sep 17 00:00:00 2001 From: patduin Date: Mon, 15 Feb 2021 16:22:15 +0100 Subject: [PATCH 1/2] Added a fix for generating the target partition path if base and partition location are not matching in the source --- CHANGELOG.md | 4 + .../bdp/circustrain/core/replica/Replica.java | 28 ++++- .../CircusTrainHdfsHdfsIntegrationTest.java | 103 ++++++++++++++---- .../integration/IntegrationTestHelper.java | 52 ++++----- 4 files changed, 132 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4171fa53..8ea3eab2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## [16.3.3] - 2021-02-16 +### Fixed +* Issue where replication location could not be generated. See [#212](https://github.com/HotelsDotCom/circus-train/issues/212). + ## [16.3.2] - 2020-10-27 ### Fixed * Issue where external AVRO schemas generated lots of copy jobs. See [#203](https://github.com/HotelsDotCom/circus-train/issues/203). diff --git a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java index 05ded78f..4d605948 100644 --- a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java +++ b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java @@ -35,7 +35,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest; import org.apache.hadoop.hive.metastore.api.Table; @@ -175,7 +177,8 @@ public void updateMetadata( List partitionsToAlter = new ArrayList<>(sourcePartitions.size()); List statisticsToSet = new ArrayList<>(sourcePartitions.size()); for (Partition sourcePartition : sourcePartitions) { - Path replicaPartitionLocation = locationManager.getPartitionLocation(sourcePartition); + Path replicaPartitionLocation = createReplicaPartitionLocation(sourceTableAndStatistics, sourcePartition, + locationManager); LOG.debug("Generated replica partition path: {}", replicaPartitionLocation); Partition replicaPartition = tableFactory @@ -268,6 +271,29 @@ public void updateMetadata( } } + private Path createReplicaPartitionLocation( + TableAndStatistics sourceTableAndStatistics, + Partition sourcePartition, + ReplicaLocationManager locationManager) { + try { + return locationManager.getPartitionLocation(sourcePartition); + } catch (CircusTrainException e) { + String subPath; + try { + subPath = Warehouse + .makePartName(sourceTableAndStatistics.getTable().getPartitionKeys(), sourcePartition.getValues()); + } catch (MetaException e1) { + throw new CircusTrainException(e1); + } + Path replicationPath = new Path(locationManager.getPartitionBaseLocation(), subPath); + LOG + .warn( + "Couldn't get partition location from folder will generate one instead and use: '{}', original error: {}", + replicationPath, e.getMessage()); + return replicationPath; + } + } + private List getOldPartitions( PartitionsAndStatistics sourcePartitionsAndStatistics, String replicaDatabaseName, diff --git a/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/CircusTrainHdfsHdfsIntegrationTest.java b/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/CircusTrainHdfsHdfsIntegrationTest.java index c86a1e41..67b18a8a 100644 --- a/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/CircusTrainHdfsHdfsIntegrationTest.java +++ b/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/CircusTrainHdfsHdfsIntegrationTest.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2020 Expedia, Inc. + * Copyright (C) 2016-2021 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.DATABASE; import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.EVOLUTION_COLUMN; import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.PARTITIONED_TABLE; +import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.PART_00000; import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.SOURCE_ENCODED_TABLE; import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.SOURCE_MANAGED_PARTITIONED_TABLE; import static com.hotels.bdp.circustrain.integration.IntegrationTestHelper.SOURCE_MANAGED_UNPARTITIONED_TABLE; @@ -259,6 +260,68 @@ public void checkAssertion() throws Exception { runner.run(config.getAbsolutePath()); } + @Test + public void partitionedTableDifferentBasePaths() throws Exception { + // base path .../base1/ + Table hiveTable = TestUtils + .createPartitionedTable(sourceCatalog.client(), DATABASE, PARTITIONED_TABLE, + new URI(sourceWarehouseUri + "/base1/")); + + // base path .../base1/ + URI partitionEurope = URI.create(sourceWarehouseUri + "/base1/continent=Europe"); + URI partitionUk = URI.create(partitionEurope + "/country=UK"); + File dataFileUk = new File(partitionUk.getPath(), PART_00000); + FileUtils.writeStringToFile(dataFileUk, "1\tadam\tlondon\n2\tsusan\tglasgow\n"); + + // base path .../base2/ + URI partitionAsia = URI.create(sourceWarehouseUri + "/base2/continent=Asia"); + URI partitionChina = URI.create(partitionAsia + "/country=China"); + File dataFileChina = new File(partitionChina.getPath(), PART_00000); + FileUtils.writeStringToFile(dataFileChina, "1\tchun\tbeijing\n2\tlee\tshanghai\n"); + LOG + .info(">>>> Partitions added: {}", + sourceCatalog + .client() + .add_partitions(Arrays + .asList(newTablePartition(hiveTable, Arrays.asList("Europe", "UK"), partitionUk), + newTablePartition(hiveTable, Arrays.asList("Asia", "China"), partitionChina)))); + LOG.info(">>>> Table {} ", sourceCatalog.client().getTable(DATABASE, PARTITIONED_TABLE)); + + exit.expectSystemExitWithStatus(0); + File config = dataFolder.getFile("partitioned-single-table-no-housekeeping.yml"); + CircusTrainRunner runner = CircusTrainRunner + .builder(DATABASE, sourceWarehouseUri, replicaWarehouseUri, housekeepingDbLocation) + .sourceMetaStore(sourceCatalog.getThriftConnectionUri(), sourceCatalog.connectionURL(), + sourceCatalog.driverClassName()) + .replicaMetaStore(replicaCatalog.getThriftConnectionUri()) + .build(); + exit.checkAssertionAfterwards(new Assertion() { + @Override + public void checkAssertion() throws Exception { + Table table = replicaCatalog.client().getTable(DATABASE, PARTITIONED_TABLE); + String warehouseBase = replicaWarehouseUri.toURI().toString(); + System.out.println(table.getSd().getLocation()); + System.out.println(warehouseBase + "ct_database/ct_table_p"); + assertTrue(table.getSd().getLocation().matches(warehouseBase + "/ct_database/ct_table_p")); + List listPartitions = replicaCatalog + .client() + .listPartitions(DATABASE, PARTITIONED_TABLE, (short) -1); + assertThat(listPartitions.size(), is(2)); + assertTrue(listPartitions + .get(0) + .getSd() + .getLocation() + .matches(warehouseBase + "ct_database/ct_table_p/ctp.*?/continent=Asia/country=China")); + assertTrue(listPartitions + .get(1) + .getSd() + .getLocation() + .matches(warehouseBase + "ct_database/ct_table_p/ctp.*?/continent=Europe/country=UK")); + } + }); + runner.run(config.getAbsolutePath()); + } + @Test public void partitionedTableNoHousekeepingWithTableReplicationParameters() throws Exception { helper.createPartitionedTable(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE)); @@ -1427,21 +1490,18 @@ public void partitionedTableColumnAdditionInStruct() throws Exception { structData.put("name", "adam"); structData.put("city", "blackpool"); - Table replicaTable = replicaHelper.createParquetPartitionedTable( - toUri(replicaWarehouseUri, DATABASE, PARTITIONED_TABLE), - DATABASE, - PARTITIONED_TABLE, - schema, - EVOLUTION_COLUMN, - structData, - 1); + Table replicaTable = replicaHelper + .createParquetPartitionedTable(toUri(replicaWarehouseUri, DATABASE, PARTITIONED_TABLE), DATABASE, + PARTITIONED_TABLE, schema, EVOLUTION_COLUMN, structData, 1); LOG.info(">>>> Table {} ", replicaCatalog.client().getTable(DATABASE, PARTITIONED_TABLE)); replicaTable.getParameters().put("com.hotels.bdp.circustrain.replication.event", "event_id"); replicaCatalog.client().alter_table(DATABASE, PARTITIONED_TABLE, replicaTable); // Create the source table with an additional column in the struct. - helper.createData(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), schema, "1", 1, EVOLUTION_COLUMN, structData); + helper + .createData(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), schema, "1", 1, EVOLUTION_COLUMN, + structData); Schema schemaV2 = SchemaBuilder .builder("name.space") @@ -1450,7 +1510,7 @@ public void partitionedTableColumnAdditionInStruct() throws Exception { .requiredInt("id") .name(EVOLUTION_COLUMN) .type() - .record( EVOLUTION_COLUMN + "_struct") + .record(EVOLUTION_COLUMN + "_struct") .fields() .requiredString("name") .requiredString("city") @@ -1464,21 +1524,14 @@ public void partitionedTableColumnAdditionInStruct() throws Exception { structData.put("city", "blackpool"); structData.put("dob", "22/09/1992"); - Table table = helper.createParquetPartitionedTable( - toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), - DATABASE, - PARTITIONED_TABLE, - schemaV2, - EVOLUTION_COLUMN, - structData, - 2); + Table table = helper + .createParquetPartitionedTable(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE), DATABASE, + PARTITIONED_TABLE, schemaV2, EVOLUTION_COLUMN, structData, 2); LOG.info(">>>> Table {} ", sourceCatalog.client().getTable(DATABASE, PARTITIONED_TABLE)); // Create the source partition with the original struct. URI partition = URI.create(toUri(sourceWarehouseUri, DATABASE, PARTITIONED_TABLE) + "/hour=" + 1); - sourceCatalog.client().add_partitions(Arrays.asList( - newTablePartition(table, Arrays.asList("1"), partition) - )); + sourceCatalog.client().add_partitions(Arrays.asList(newTablePartition(table, Arrays.asList("1"), partition))); CircusTrainRunner runner = CircusTrainRunner .builder(DATABASE, sourceWarehouseUri, replicaWarehouseUri, housekeepingDbLocation) @@ -1496,7 +1549,8 @@ public void checkAssertion() throws Exception { Table sourceTable = sourceCatalog.client().getTable(DATABASE, PARTITIONED_TABLE); List cols = sourceTable.getSd().getCols(); assertThat(cols.get(0), is(new FieldSchema("id", "int", ""))); - assertThat(cols.get(1), is(new FieldSchema(EVOLUTION_COLUMN, "struct", ""))); + assertThat(cols.get(1), + is(new FieldSchema(EVOLUTION_COLUMN, "struct", ""))); PartitionIterator partitionIterator = new PartitionIterator(sourceCatalog.client(), sourceTable, (short) 1000); List partitions = new ArrayList<>(); while (partitionIterator.hasNext()) { @@ -1510,7 +1564,8 @@ public void checkAssertion() throws Exception { Table replicaTable = replicaCatalog.client().getTable(DATABASE, PARTITIONED_TABLE); cols = replicaTable.getSd().getCols(); assertThat(cols.get(0), is(new FieldSchema("id", "int", ""))); - assertThat(cols.get(1), is(new FieldSchema(EVOLUTION_COLUMN, "struct", ""))); + assertThat(cols.get(1), + is(new FieldSchema(EVOLUTION_COLUMN, "struct", ""))); partitionIterator = new PartitionIterator(replicaCatalog.client(), replicaTable, (short) 1000); partitions = new ArrayList<>(); while (partitionIterator.hasNext()) { diff --git a/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/IntegrationTestHelper.java b/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/IntegrationTestHelper.java index f551c214..d0553904 100644 --- a/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/IntegrationTestHelper.java +++ b/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/IntegrationTestHelper.java @@ -70,8 +70,7 @@ public class IntegrationTestHelper { } void createPartitionedTable(URI sourceTableUri) throws Exception { - Table hiveTable = TestUtils - .createPartitionedTable(metaStoreClient, DATABASE, PARTITIONED_TABLE, sourceTableUri); + Table hiveTable = TestUtils.createPartitionedTable(metaStoreClient, DATABASE, PARTITIONED_TABLE, sourceTableUri); URI partitionEurope = URI.create(sourceTableUri + "/continent=Europe"); URI partitionUk = URI.create(partitionEurope + "/country=UK"); @@ -91,38 +90,34 @@ void createPartitionedTable(URI sourceTableUri) throws Exception { } Table createParquetPartitionedTable( - URI tableUri, - String database, - String table, - Schema schema, - String fieldName, - Object fieldData, - int version) throws Exception { + URI tableUri, + String database, + String table, + Schema schema, + String fieldName, + Object fieldData, + int version) + throws Exception { List columns = new ArrayList<>(); AvroObjectInspectorGenerator schemaInspector = new AvroObjectInspectorGenerator(schema); for (int i = 0; i < schemaInspector.getColumnNames().size(); i++) { - columns.add(new FieldSchema( - schemaInspector.getColumnNames().get(i), schemaInspector.getColumnTypes().get(i).toString(), "" - )); + columns + .add(new FieldSchema(schemaInspector.getColumnNames().get(i), + schemaInspector.getColumnTypes().get(i).toString(), "")); } List partitionKeys = Arrays.asList(new FieldSchema("hour", "string", "")); Table parquetTable = TestUtils - .createPartitionedTable(metaStoreClient, database, table, tableUri, columns, partitionKeys, - "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", MapredParquetInputFormat.class.getName(), - MapredParquetOutputFormat.class.getName()); + .createPartitionedTable(metaStoreClient, database, table, tableUri, columns, partitionKeys, + "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", MapredParquetInputFormat.class.getName(), + MapredParquetOutputFormat.class.getName()); URI partition = createData(tableUri, schema, Integer.toString(version), version, fieldName, fieldData); - metaStoreClient.add_partitions(Arrays.asList(newTablePartition(parquetTable, - Arrays.asList(Integer.toString(version)), partition))); + metaStoreClient + .add_partitions( + Arrays.asList(newTablePartition(parquetTable, Arrays.asList(Integer.toString(version)), partition))); return metaStoreClient.getTable(database, table); } - URI createData( - URI tableUri, - Schema schema, - String hour, - int id, - String fieldName, - Object data) throws IOException { + URI createData(URI tableUri, Schema schema, String hour, int id, String fieldName, Object data) throws IOException { GenericData.Record record = new GenericData.Record(schema); record.put("id", id); @@ -144,10 +139,8 @@ URI createData( parentFolder.mkdirs(); File partitionFile = new File(parentFolder, "parquet0000"); Path filePath = new Path(partitionFile.toURI()); - ParquetWriter writer = AvroParquetWriter.builder(filePath) - .withSchema(schema) - .withConf(new Configuration()) - .build(); + ParquetWriter writer = AvroParquetWriter.builder(filePath).withSchema(schema).withConf(new Configuration()).build(); try { writer.write(record); @@ -201,8 +194,7 @@ void createManagedPartitionedTable(URI sourceTableUri) throws Exception { } void createPartitionedView() throws Exception { - Table view = TestUtils - .createPartitionedView(metaStoreClient, DATABASE, SOURCE_PARTITIONED_VIEW, PARTITIONED_TABLE); + Table view = TestUtils.createPartitionedView(metaStoreClient, DATABASE, SOURCE_PARTITIONED_VIEW, PARTITIONED_TABLE); metaStoreClient .add_partitions(Arrays .asList(newViewPartition(view, Arrays.asList("Europe", "UK")), From 87739e8240feb5f1b2723f737ca25c6d21a8ba96 Mon Sep 17 00:00:00 2001 From: patduin Date: Mon, 15 Feb 2021 16:45:09 +0100 Subject: [PATCH 2/2] cleaned up and fixed test --- .../bdp/circustrain/core/replica/Replica.java | 17 ++++++++--------- .../CircusTrainHdfsHdfsIntegrationTest.java | 4 +--- .../integration/IntegrationTestHelper.java | 2 +- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java index 4d605948..3e74a9e6 100644 --- a/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java +++ b/circus-train-core/src/main/java/com/hotels/bdp/circustrain/core/replica/Replica.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2020 Expedia, Inc. + * Copyright (C) 2016-2021 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -278,19 +278,18 @@ private Path createReplicaPartitionLocation( try { return locationManager.getPartitionLocation(sourcePartition); } catch (CircusTrainException e) { - String subPath; try { - subPath = Warehouse + String partitionName = Warehouse .makePartName(sourceTableAndStatistics.getTable().getPartitionKeys(), sourcePartition.getValues()); + Path replicationPath = new Path(locationManager.getPartitionBaseLocation(), partitionName); + LOG + .warn( + "Couldn't get partition location from folder will generate one instead and use: '{}', original error: {}", + replicationPath, e.getMessage()); + return replicationPath; } catch (MetaException e1) { throw new CircusTrainException(e1); } - Path replicationPath = new Path(locationManager.getPartitionBaseLocation(), subPath); - LOG - .warn( - "Couldn't get partition location from folder will generate one instead and use: '{}', original error: {}", - replicationPath, e.getMessage()); - return replicationPath; } } diff --git a/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/CircusTrainHdfsHdfsIntegrationTest.java b/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/CircusTrainHdfsHdfsIntegrationTest.java index 67b18a8a..3212631e 100644 --- a/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/CircusTrainHdfsHdfsIntegrationTest.java +++ b/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/CircusTrainHdfsHdfsIntegrationTest.java @@ -300,9 +300,7 @@ public void partitionedTableDifferentBasePaths() throws Exception { public void checkAssertion() throws Exception { Table table = replicaCatalog.client().getTable(DATABASE, PARTITIONED_TABLE); String warehouseBase = replicaWarehouseUri.toURI().toString(); - System.out.println(table.getSd().getLocation()); - System.out.println(warehouseBase + "ct_database/ct_table_p"); - assertTrue(table.getSd().getLocation().matches(warehouseBase + "/ct_database/ct_table_p")); + assertTrue(table.getSd().getLocation().matches(warehouseBase + "ct_database/ct_table_p")); List listPartitions = replicaCatalog .client() .listPartitions(DATABASE, PARTITIONED_TABLE, (short) -1); diff --git a/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/IntegrationTestHelper.java b/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/IntegrationTestHelper.java index d0553904..c469eb76 100644 --- a/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/IntegrationTestHelper.java +++ b/circus-train-integration-tests/src/test/java/com/hotels/bdp/circustrain/integration/IntegrationTestHelper.java @@ -1,5 +1,5 @@ /** - * Copyright (C) 2016-2020 Expedia, Inc. + * Copyright (C) 2016-2021 Expedia, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.