Issue 173 struct evolution (#174)
* Initial commit with flaky test

* Updates to test

* Updates to test

* Adding column in between two columns

* Some changes to use Parquet

* Test now failing successfully

* Creating AlterTableService and operations

* Fixing integration test

* Fixing integration test

* Fixing unit tests and copyright

* Some fixes

* Adding DropTableService which removes custom table parameters before dropping

* Adding changelog and other bits

* Added catch for non-existent table

* Fixing some bits, renaming, adding external key to table when dropping

* Undoing unnecessary changes

* Fixes

* Fixing pom

* Fixing pom and test names

* Minor change

* Fixing tests
Max Jacobs authored Mar 16, 2020
1 parent 61c91b3 commit f6f5389
Showing 17 changed files with 1,108 additions and 25 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,9 @@
* Updated S3S3Copier to have a configurable max number of threads to pass to TransferManager.
* Fix AssumeRoleCredentialProvider not auto-renewing credentials on expiration.

### Fixed
* Fixed issue where replication breaks if struct columns have changed. See [#173](https://github.com/HotelsDotCom/circus-train/issues/173).

## [16.0.0] - 2020-02-26
### Changed
* Minimum supported Java version is now 8 (was 7).
Replica.java
@@ -60,6 +60,7 @@
import com.hotels.bdp.circustrain.core.PartitionsAndStatistics;
import com.hotels.bdp.circustrain.core.TableAndStatistics;
import com.hotels.bdp.circustrain.core.event.EventUtils;
import com.hotels.bdp.circustrain.core.replica.hive.AlterTableService;
import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;
import com.hotels.hcommon.hive.metastore.exception.MetaStoreClientException;
import com.hotels.hcommon.hive.metastore.util.LocationUtils;
@@ -73,6 +74,7 @@ public class Replica extends HiveEndpoint {
private final ReplicaCatalogListener replicaCatalogListener;
private final ReplicationMode replicationMode;
private final TableReplication tableReplication;
private final AlterTableService alterTableService;
private int partitionBatchSize = 1000;

/**
@@ -85,13 +87,15 @@ public class Replica extends HiveEndpoint {
ReplicaTableFactory replicaTableFactory,
HousekeepingListener housekeepingListener,
ReplicaCatalogListener replicaCatalogListener,
TableReplication tableReplication) {
TableReplication tableReplication,
AlterTableService alterTableService) {
super(replicaCatalog.getName(), replicaHiveConf, replicaMetaStoreClientSupplier);
this.replicaCatalogListener = replicaCatalogListener;
tableFactory = replicaTableFactory;
this.housekeepingListener = housekeepingListener;
replicationMode = tableReplication.getReplicationMode();
this.tableReplication = tableReplication;
this.alterTableService = alterTableService;
}

/**
@@ -106,6 +110,7 @@ public class Replica extends HiveEndpoint {
HousekeepingListener housekeepingListener,
ReplicaCatalogListener replicaCatalogListener,
TableReplication tableReplication,
AlterTableService alterTableService,
int partitionBatchSize) {
super(replicaCatalog.getName(), replicaHiveConf, replicaMetaStoreClientSupplier);
this.replicaCatalogListener = replicaCatalogListener;
@@ -114,6 +119,7 @@ public class Replica extends HiveEndpoint {
replicationMode = tableReplication.getReplicationMode();
this.tableReplication = tableReplication;
this.partitionBatchSize = partitionBatchSize;
this.alterTableService = alterTableService;
}

public void updateMetadata(
@@ -310,7 +316,7 @@ private Optional<Table> updateTableMetadata(
makeSureCanReplicate(oldReplicaTable.get(), replicaTable.getTable());
LOG.debug("Existing replica table found, altering.");
try {
client.alter_table(replicaDatabaseName, replicaTableName, replicaTable.getTable());
alterTableService.alterTable(client, oldReplicaTable.get(), replicaTable.getTable());
updateTableColumnStatistics(client, replicaTable);
} catch (TException e) {
throw new MetaStoreClientException(
ReplicaFactory.java
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +29,11 @@
import com.hotels.bdp.circustrain.api.event.ReplicaCatalogListener;
import com.hotels.bdp.circustrain.api.listener.HousekeepingListener;
import com.hotels.bdp.circustrain.core.HiveEndpointFactory;
import com.hotels.bdp.circustrain.core.replica.hive.AlterTableService;
import com.hotels.bdp.circustrain.core.replica.hive.CopyPartitionsOperation;
import com.hotels.bdp.circustrain.core.replica.hive.DropTableService;
import com.hotels.bdp.circustrain.core.replica.hive.RenameTableOperation;
import com.hotels.bdp.circustrain.core.transformation.TableParametersTransformation;
import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

@Profile({ Modules.REPLICATION })
@@ -49,19 +54,22 @@ public ReplicaFactory(
Supplier<CloseableMetaStoreClient> replicaMetaStoreClientSupplier,
HousekeepingListener housekeepingListener,
ReplicaCatalogListener replicaCatalogListener,
ReplicaTableFactoryProvider replicaTableFactoryPicker) {
ReplicaTableFactoryProvider replicaTableFactoryProvider) {
this.replicaCatalog = replicaCatalog;
this.replicaHiveConf = replicaHiveConf;
this.replicaMetaStoreClientSupplier = replicaMetaStoreClientSupplier;
this.housekeepingListener = housekeepingListener;
this.replicaCatalogListener = replicaCatalogListener;
this.replicaTableFactoryPicker = replicaTableFactoryPicker;
this.replicaTableFactoryPicker = replicaTableFactoryProvider;
}

@Override
public Replica newInstance(TableReplication tableReplication) {
ReplicaTableFactory replicaTableFactory = replicaTableFactoryPicker.newInstance(tableReplication);
DropTableService dropTableService = new DropTableService();
AlterTableService alterTableService = new AlterTableService(dropTableService, new CopyPartitionsOperation(),
new RenameTableOperation(dropTableService));
return new Replica(replicaCatalog, replicaHiveConf, replicaMetaStoreClientSupplier, replicaTableFactory,
housekeepingListener, replicaCatalogListener, tableReplication);
housekeepingListener, replicaCatalogListener, tableReplication, alterTableService);
}
}
AlterTableService.java
@@ -0,0 +1,83 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hotels.bdp.circustrain.core.replica.hive;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

public class AlterTableService {

private static final Logger LOG = LoggerFactory.getLogger(AlterTableService.class);

private DropTableService dropTableService;
private CopyPartitionsOperation copyPartitionsOperation;
private RenameTableOperation renameTableOperation;

public AlterTableService(
DropTableService dropTableService,
CopyPartitionsOperation copyPartitionsOperation,
RenameTableOperation renameTableOperation) {
this.dropTableService = dropTableService;
this.copyPartitionsOperation = copyPartitionsOperation;
this.renameTableOperation = renameTableOperation;
}

public void alterTable(CloseableMetaStoreClient client, Table oldTable, Table newTable) throws TException {
List<FieldSchema> oldColumns = oldTable.getSd().getCols();
List<FieldSchema> newColumns = newTable.getSd().getCols();
if (hasAnyChangedColumns(oldColumns, newColumns)) {
LOG
.info("Found columns that have changed type, attempting to recreate target table with the new columns."
+ " Old columns: {}, new columns: {}", oldColumns, newColumns);
Table tempTable = new Table(newTable);
String tempName = newTable.getTableName() + "_temp";
tempTable.setTableName(tempName);
try {
client.createTable(tempTable);
copyPartitionsOperation.execute(client, newTable, tempTable);
renameTableOperation.execute(client, tempTable, newTable);
} finally {
dropTableService.removeTableParamsAndDrop(client, tempTable.getDbName(), tempName);
}
} else {
client.alter_table(newTable.getDbName(), newTable.getTableName(), newTable);
}
}

private boolean hasAnyChangedColumns(List<FieldSchema> oldColumns, List<FieldSchema> newColumns) {
Map<String, FieldSchema> oldColumnsMap = oldColumns.stream()
.collect(Collectors.toMap(FieldSchema::getName, Function.identity()));
for (FieldSchema newColumn : newColumns) {
if (oldColumnsMap.containsKey(newColumn.getName())) {
FieldSchema oldColumn = oldColumnsMap.get(newColumn.getName());
if (!oldColumn.getType().equals(newColumn.getType())) {
return true;
}
}
}
return false;
}
}
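
Below is a minimal usage sketch of the code path above; it is not part of this commit. The table and column names are hypothetical, and the metastore client is assumed to be obtained elsewhere (e.g. from the replica client supplier). When a struct column's type differs between the old and new replica table, the service recreates the table under a `_temp` name, copies the partitions across, renames the temp table over the original, and drops whatever is left under the temp name.

```java
// Hypothetical sketch, not part of this commit. Placed in the same package so the
// new services resolve without extra imports; table and column names are made up.
package com.hotels.bdp.circustrain.core.replica.hive;

import java.util.Arrays;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;

import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

public class AlterTableServiceSketch {

  public static void replicateEvolvedStruct(CloseableMetaStoreClient client, Table oldReplicaTable)
      throws TException {
    // Same table, but the struct column has gained a nested field; a plain
    // alter_table on the replica is the case that used to break (issue #173).
    Table newTable = new Table(oldReplicaTable); // Thrift deep copy
    newTable.getSd().setCols(Arrays.asList(
        new FieldSchema("id", "bigint", null),
        new FieldSchema("details", "struct<name:string,age:int>", null))); // was struct<name:string>

    DropTableService dropTableService = new DropTableService();
    AlterTableService alterTableService = new AlterTableService(
        dropTableService, new CopyPartitionsOperation(), new RenameTableOperation(dropTableService));

    // hasAnyChangedColumns spots the changed struct type, so the service creates a
    // "<table>_temp" table with the new schema, copies the existing partitions into it,
    // renames it over the original, and finally drops anything left under the temp name.
    alterTableService.alterTable(client, oldReplicaTable, newTable);
  }
}
```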
CopyPartitionsOperation.java
@@ -0,0 +1,76 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hotels.bdp.circustrain.core.replica.hive;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;
import com.hotels.hcommon.hive.metastore.iterator.PartitionIterator;

public class CopyPartitionsOperation {

private static final Logger LOG = LoggerFactory.getLogger(CopyPartitionsOperation.class);
private static final short DEFAULT_BATCH_SIZE = 1000;

private short partitionBatchSize;

public CopyPartitionsOperation() {
this(DEFAULT_BATCH_SIZE);
}

@VisibleForTesting
CopyPartitionsOperation(short partitionBatchSize) {
this.partitionBatchSize = partitionBatchSize;
}

/**
* Copies partitions from oldTable to newTable; the copied partitions are modified to take the schema of newTable.
*/
public void execute(CloseableMetaStoreClient client, Table oldTable, Table newTable) throws TException {
int count = 0;
String databaseName = newTable.getDbName();
String tableName = newTable.getTableName();
PartitionIterator partitionIterator = new PartitionIterator(client, oldTable, partitionBatchSize);
while (partitionIterator.hasNext()) {
List<Partition> batch = new ArrayList<>();
for (int i = 0; i < partitionBatchSize && partitionIterator.hasNext(); i++) {
Partition partition = partitionIterator.next();
count++;
Partition copy = new Partition(partition);
copy.setDbName(databaseName);
copy.setTableName(tableName);
StorageDescriptor sd = new StorageDescriptor(partition.getSd());
sd.setCols(newTable.getSd().getCols());
copy.setSd(sd);
batch.add(copy);
}
LOG.info("Copying batch of size {} to {}.{}", batch.size(), databaseName, tableName);
client.add_partitions(batch);
}
LOG.info("Copied {} partitions to {}.{}", count, databaseName, tableName);
}

}
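
A short hypothetical sketch of driving the operation directly (database and table names are made up; the client is assumed to be supplied by the caller): every partition of the source table is re-registered against the target table with the target's column list, in batches of up to 1000.

```java
// Hypothetical usage sketch, not part of this commit.
package com.hotels.bdp.circustrain.core.replica.hive;

import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;

import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

public class CopyPartitionsSketch {

  public static void copyAll(CloseableMetaStoreClient client) throws TException {
    Table source = client.getTable("replica_db", "orders");      // existing table with the old schema
    Table target = client.getTable("replica_db", "orders_temp"); // freshly created table with the new schema
    // Each partition is copied with its storage descriptor columns rewritten to the
    // target table's columns, then registered via add_partitions in batches of 1000.
    new CopyPartitionsOperation().execute(client, source, target);
  }
}
```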
DropTableService.java
@@ -0,0 +1,67 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hotels.bdp.circustrain.core.replica.hive;

import java.util.Collections;
import java.util.Map;

import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

public class DropTableService {

private static final Logger LOG = LoggerFactory.getLogger(DropTableService.class);
private static final String EXTERNAL_KEY = "EXTERNAL";
private static final String IS_EXTERNAL = "TRUE";

/**
* Removes all parameters from a table, retaining only EXTERNAL=TRUE for external tables, before dropping the table.
*/
public void removeTableParamsAndDrop(
CloseableMetaStoreClient client,
String databaseName,
String tableName) throws TException {
Table table;
try {
table = client.getTable(databaseName, tableName);
} catch (NoSuchObjectException e) {
return;
}
Map<String, String> tableParameters = table.getParameters();
if (tableParameters != null && !tableParameters.isEmpty()) {
if (isExternal(tableParameters)) {
table.setParameters(Collections.singletonMap(EXTERNAL_KEY, IS_EXTERNAL));
} else {
table.setParameters(Collections.emptyMap());
}
client.alter_table(databaseName, tableName, table);
}
LOG
.info("Dropping table '{}.{}'.", table.getDbName(), table.getTableName());
client.dropTable(table.getDbName(), table.getTableName(), false, true);
}

private boolean isExternal(Map<String, String> tableParameters) {
CaseInsensitiveMap caseInsensitiveParams = new CaseInsensitiveMap(tableParameters);
return IS_EXTERNAL.equalsIgnoreCase((String) caseInsensitiveParams.get(EXTERNAL_KEY));
}
}
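
A small hypothetical sketch of the drop path (names are made up): the table's parameters are cleared first, keeping only EXTERNAL=TRUE on external tables, and the table is then dropped without deleting its data; a table that no longer exists is silently ignored.

```java
// Hypothetical usage sketch, not part of this commit.
package com.hotels.bdp.circustrain.core.replica.hive;

import org.apache.thrift.TException;

import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

public class DropTableSketch {

  public static void dropLeftoverTempTable(CloseableMetaStoreClient client) throws TException {
    // Clears the table's custom parameters (keeping only EXTERNAL=TRUE on external
    // tables), then drops the table with deleteData=false so the underlying files
    // stay in place. A missing table is ignored because NoSuchObjectException is caught.
    new DropTableService().removeTableParamsAndDrop(client, "replica_db", "orders_temp");
  }
}
```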
