Issue 173 struct evolution #174

Merged (22 commits) on Mar 16, 2020

Changes from 14 commits
CHANGELOG.md (4 additions, 0 deletions)
@@ -1,3 +1,7 @@
## [16.1.0] - TBD
### Fixed
* Fixed issue where replication breaks if struct columns have changed. See [#173](https://github.com/HotelsDotCom/circus-train/issues/173).

## [16.0.0] - 2020-02-26
### Changed
* Minimum supported Java version is now 8 (was 7).
Replica.java
@@ -60,6 +60,7 @@
import com.hotels.bdp.circustrain.core.PartitionsAndStatistics;
import com.hotels.bdp.circustrain.core.TableAndStatistics;
import com.hotels.bdp.circustrain.core.event.EventUtils;
import com.hotels.bdp.circustrain.core.replica.hive.AlterTableService;
import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;
import com.hotels.hcommon.hive.metastore.exception.MetaStoreClientException;
import com.hotels.hcommon.hive.metastore.util.LocationUtils;
@@ -73,6 +74,7 @@ public class Replica extends HiveEndpoint {
private final ReplicaCatalogListener replicaCatalogListener;
private final ReplicationMode replicationMode;
private final TableReplication tableReplication;
private final AlterTableService alterTableService;
private int partitionBatchSize = 1000;

/**
@@ -85,13 +87,15 @@ public class Replica extends HiveEndpoint {
ReplicaTableFactory replicaTableFactory,
HousekeepingListener housekeepingListener,
ReplicaCatalogListener replicaCatalogListener,
TableReplication tableReplication) {
TableReplication tableReplication,
AlterTableService alterTableService) {
super(replicaCatalog.getName(), replicaHiveConf, replicaMetaStoreClientSupplier);
this.replicaCatalogListener = replicaCatalogListener;
tableFactory = replicaTableFactory;
this.housekeepingListener = housekeepingListener;
replicationMode = tableReplication.getReplicationMode();
this.tableReplication = tableReplication;
this.alterTableService = alterTableService;
}

/**
@@ -106,6 +110,7 @@ public class Replica extends HiveEndpoint {
HousekeepingListener housekeepingListener,
ReplicaCatalogListener replicaCatalogListener,
TableReplication tableReplication,
AlterTableService alterTableService,
int partitionBatchSize) {
super(replicaCatalog.getName(), replicaHiveConf, replicaMetaStoreClientSupplier);
this.replicaCatalogListener = replicaCatalogListener;
@@ -114,6 +119,7 @@ public class Replica extends HiveEndpoint {
replicationMode = tableReplication.getReplicationMode();
this.tableReplication = tableReplication;
this.partitionBatchSize = partitionBatchSize;
this.alterTableService = alterTableService;
}

public void updateMetadata(
@@ -310,7 +316,7 @@ private Optional<Table> updateTableMetadata(
makeSureCanReplicate(oldReplicaTable.get(), replicaTable.getTable());
LOG.debug("Existing replica table found, altering.");
try {
client.alter_table(replicaDatabaseName, replicaTableName, replicaTable.getTable());
alterTableService.alterTable(client, oldReplicaTable.get(), replicaTable.getTable());
updateTableColumnStatistics(client, replicaTable);
} catch (TException e) {
throw new MetaStoreClientException(
ReplicaFactory.java
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +29,11 @@
import com.hotels.bdp.circustrain.api.event.ReplicaCatalogListener;
import com.hotels.bdp.circustrain.api.listener.HousekeepingListener;
import com.hotels.bdp.circustrain.core.HiveEndpointFactory;
import com.hotels.bdp.circustrain.core.replica.hive.AlterTableService;
import com.hotels.bdp.circustrain.core.replica.hive.CopyPartitionsOperation;
import com.hotels.bdp.circustrain.core.replica.hive.DropTableService;
import com.hotels.bdp.circustrain.core.replica.hive.RenameTableOperation;
import com.hotels.bdp.circustrain.core.transformation.TableParametersTransformation;
import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

@Profile({ Modules.REPLICATION })
@@ -41,6 +46,7 @@ public class ReplicaFactory implements HiveEndpointFactory<Replica> {
private final HousekeepingListener housekeepingListener;
private final ReplicaCatalogListener replicaCatalogListener;
private final ReplicaTableFactoryProvider replicaTableFactoryPicker;
private final TableParametersTransformation tableParametersTransformation;

@Autowired
public ReplicaFactory(
@@ -49,19 +55,24 @@ public ReplicaFactory(
Supplier<CloseableMetaStoreClient> replicaMetaStoreClientSupplier,
HousekeepingListener housekeepingListener,
ReplicaCatalogListener replicaCatalogListener,
ReplicaTableFactoryProvider replicaTableFactoryPicker) {
ReplicaTableFactoryProvider replicaTableFactoryPicker,
Contributor:
Suggested change:
ReplicaTableFactoryProvider replicaTableFactoryPicker,
ReplicaTableFactoryProvider replicaTableFactoryProvider,

Contributor (Author):
done :)
TableParametersTransformation tableParametersTransformation) {
this.replicaCatalog = replicaCatalog;
this.replicaHiveConf = replicaHiveConf;
this.replicaMetaStoreClientSupplier = replicaMetaStoreClientSupplier;
this.housekeepingListener = housekeepingListener;
this.replicaCatalogListener = replicaCatalogListener;
this.replicaTableFactoryPicker = replicaTableFactoryPicker;
this.tableParametersTransformation = tableParametersTransformation;
}

@Override
public Replica newInstance(TableReplication tableReplication) {
ReplicaTableFactory replicaTableFactory = replicaTableFactoryPicker.newInstance(tableReplication);
DropTableService dropTableService = new DropTableService(tableParametersTransformation);
AlterTableService alterTableService = new AlterTableService(dropTableService, new CopyPartitionsOperation(),
new RenameTableOperation(dropTableService));
return new Replica(replicaCatalog, replicaHiveConf, replicaMetaStoreClientSupplier, replicaTableFactory,
housekeepingListener, replicaCatalogListener, tableReplication);
housekeepingListener, replicaCatalogListener, tableReplication, alterTableService);
}
}
AlterTableService.java (new file)
@@ -0,0 +1,83 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hotels.bdp.circustrain.core.replica.hive;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

public class AlterTableService {

private static final Logger LOG = LoggerFactory.getLogger(AlterTableService.class);

private DropTableService dropTableService;
private CopyPartitionsOperation copyPartitionsOperation;
private RenameTableOperation renameTableOperation;

public AlterTableService(
DropTableService dropTableService,
CopyPartitionsOperation copyPartitionsOperation,
RenameTableOperation renameTableOperation) {
this.dropTableService = dropTableService;
this.copyPartitionsOperation = copyPartitionsOperation;
this.renameTableOperation = renameTableOperation;
}

public void alterTable(CloseableMetaStoreClient client, Table oldTable, Table newTable) throws TException {
List<FieldSchema> oldColumns = oldTable.getSd().getCols();
List<FieldSchema> newColumns = newTable.getSd().getCols();
if (hasAnyChangedColumns(oldColumns, newColumns)) {
LOG
.info("Found columns that have changed type, attempting to recreate target table with the new columns."
+ "Old columns: {}, new columns: {}", oldColumns, newColumns);
Table tempTable = new Table(newTable);
Contributor:
Does this copy all the table properties?

Contributor (Author):
It does indeed. We can either (1) scrub all the table params here, or (2) I've created a DropTableService which removes custom table params before calling the drop table command. If that's overly complicated I can remove the new service and just do (1) instead.

Contributor:
OK, so if we removed the params here then we don't have to worry about the potential of Beekeeper picking up the drop of the temp table etc.? I think I prefer that approach, unless some of the params are useful to have on the temp table? I don't think any of the intermediate operations use them in any way, so probably not? And to be clear, the original table params will be kept on the final end-result table?

Contributor (Author):
Okay cool, I'm happy to do this: I'll remove the DropTableService and just remove the params as part of this service.

The intermediate ops don't use the params, but I think Beekeeper may pick up on the alter partition events (though it will ignore them as the location doesn't change). And yes, the original params will be on the final table; I'll add some tests for this.

Contributor (Author):
Tried this out, and actually it doesn't seem possible. We can't do an alter table on the table as it still has the old schema, so it fails with the same error. So the DropTableService looks better, as we do the alter/drop once everything has been updated.

Contributor:
OK, let's go with that for now and keep an eye on it when this is in use.
String tempName = newTable.getTableName() + "_temp";
tempTable.setTableName(tempName);
try {
client.createTable(tempTable);
copyPartitionsOperation.execute(client, newTable, tempTable);
renameTableOperation.execute(client, tempTable, newTable);
} finally {
dropTableService.removeCustomParamsAndDrop(client, tempTable.getDbName(), tempName);
}
} else {
client.alter_table(newTable.getDbName(), newTable.getTableName(), newTable);
}
}

private boolean hasAnyChangedColumns(List<FieldSchema> oldColumns, List<FieldSchema> newColumns) {
Map<String, FieldSchema> oldColumnsMap = oldColumns.stream()
.collect(Collectors.toMap(FieldSchema::getName, Function.identity()));
for (FieldSchema newColumn : newColumns) {
if (oldColumnsMap.containsKey(newColumn.getName())) {
FieldSchema oldColumn = oldColumnsMap.get(newColumn.getName());
if (!oldColumn.getType().equals(newColumn.getType())) {
return true;
}
}
}
return false;
}
}
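
For context on the scenario this service addresses (issue #173), below is a minimal, hypothetical usage sketch that is not part of the PR: the helper method, the 'id' and 'payload' columns, and the evolved struct type are invented for illustration, while the service wiring mirrors what ReplicaFactory does above.

// Hypothetical sketch only: shows AlterTableService taking the recreate-via-temp-table
// path when a struct column's type has evolved on the source table.
import java.util.Arrays;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;

import com.hotels.bdp.circustrain.core.replica.hive.AlterTableService;
import com.hotels.bdp.circustrain.core.replica.hive.CopyPartitionsOperation;
import com.hotels.bdp.circustrain.core.replica.hive.DropTableService;
import com.hotels.bdp.circustrain.core.replica.hive.RenameTableOperation;
import com.hotels.bdp.circustrain.core.transformation.TableParametersTransformation;
import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

public class StructEvolutionExample {

  public static void replicateEvolvedStruct(
      CloseableMetaStoreClient client,
      Table oldReplica,
      TableParametersTransformation tableParametersTransformation) throws TException {
    // New version of the replica table: the 'payload' struct has gained a 'source' field,
    // which a plain alter_table call can reject as an incompatible column type change
    // (the failure behind issue #173).
    Table newReplica = new Table(oldReplica);
    StorageDescriptor sd = new StorageDescriptor(oldReplica.getSd());
    sd.setCols(Arrays.asList(
        new FieldSchema("id", "bigint", null),
        new FieldSchema("payload", "struct<id:int,source:string>", null)));
    newReplica.setSd(sd);

    // Wire the service the same way ReplicaFactory does in this PR.
    DropTableService dropTableService = new DropTableService(tableParametersTransformation);
    AlterTableService alterTableService = new AlterTableService(dropTableService,
        new CopyPartitionsOperation(), new RenameTableOperation(dropTableService));

    // Because 'payload' changed type, the service creates a '<table>_temp' copy with the
    // new schema, copies the partitions across, swaps it in via a rename, and cleans up,
    // rather than calling alter_table directly.
    alterTableService.alterTable(client, oldReplica, newReplica);
  }
}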
CopyPartitionsOperation.java (new file)
@@ -0,0 +1,76 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hotels.bdp.circustrain.core.replica.hive;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;
import com.hotels.hcommon.hive.metastore.iterator.PartitionIterator;

public class CopyPartitionsOperation {

private static final Logger LOG = LoggerFactory.getLogger(CopyPartitionsOperation.class);
private static final short DEFAULT_BATCH_SIZE = 1000;

private short partitionBatchSize;

public CopyPartitionsOperation() {
this(DEFAULT_BATCH_SIZE);
}

@VisibleForTesting
CopyPartitionsOperation(short partitionBatchSize) {
this.partitionBatchSize = partitionBatchSize;
}

/**
* Copies partitions from oldTable to newTable; the copied partitions are modified to take the schema of newTable.
*/
public void execute(CloseableMetaStoreClient client, Table oldTable, Table newTable) throws TException {
int count = 0;
String databaseName = newTable.getDbName();
String tableName = newTable.getTableName();
PartitionIterator partitionIterator = new PartitionIterator(client, oldTable, partitionBatchSize);
while (partitionIterator.hasNext()) {
List<Partition> batch = new ArrayList<>();
for (int i = 0; i < partitionBatchSize && partitionIterator.hasNext(); i++) {
Partition partition = partitionIterator.next();
count++;
Partition copy = new Partition(partition);
copy.setDbName(databaseName);
copy.setTableName(tableName);
StorageDescriptor sd = new StorageDescriptor(partition.getSd());
sd.setCols(newTable.getSd().getCols());
copy.setSd(sd);
batch.add(copy);
}
LOG.info("Copying batch of size {} to {}.{}", batch.size(), databaseName, tableName);
client.add_partitions(batch);
}
LOG.info("Copied {} partitions to {}.{}", count, databaseName, tableName);
}

}
DropTableService.java (new file)
@@ -0,0 +1,64 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hotels.bdp.circustrain.core.replica.hive;

import java.util.Map;

import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hotels.bdp.circustrain.core.transformation.TableParametersTransformation;
import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

public class DropTableService {

private static final Logger LOG = LoggerFactory.getLogger(DropTableService.class);

private TableParametersTransformation tableParametersTransformation;

public DropTableService(TableParametersTransformation tableParametersTransformation) {
this.tableParametersTransformation = tableParametersTransformation;
}

public void removeCustomParamsAndDrop(
CloseableMetaStoreClient client,
String databaseName,
String tableName) throws TException {
Table table;
try {
table = client.getTable(databaseName, tableName);
} catch (NoSuchObjectException e) {
return;
}
Map<String, String> tableParameters = table.getParameters();
if (tableParameters != null && !tableParameters.isEmpty()) {
Map<String, String> transformationTableParameters = tableParametersTransformation.getTableParameters();
if (!transformationTableParameters.isEmpty()) {
transformationTableParameters.entrySet().forEach(parameter -> {
tableParameters.remove(parameter.getKey(), parameter.getValue());
});
table.setParameters(tableParameters);
client.alter_table(databaseName, tableName, table);
}
}
LOG
.info("Dropping table '{}.{}'.", table.getDbName(), table.getTableName());
client.dropTable(table.getDbName(), table.getTableName(), false, true);
}
}
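
One behaviour of removeCustomParamsAndDrop that is easy to miss: it uses the two-argument Map.remove(key, value), so a transformation-added parameter is only stripped while it still holds the transformation-supplied value. A small self-contained sketch of just that Java behaviour (the parameter names are invented, not real Circus Train keys):

import java.util.HashMap;
import java.util.Map;

public class RemoveCustomParamsExample {

  public static void main(String[] args) {
    // Parameters currently on the temp table (hypothetical keys and values).
    Map<String, String> tableParameters = new HashMap<>();
    tableParameters.put("transformation.param.matching", "value-set-by-transformation");
    tableParameters.put("transformation.param.overridden", "changed-by-someone-else");
    tableParameters.put("unrelated.param", "untouched");

    // Parameters the configured TableParametersTransformation adds during replication.
    Map<String, String> transformationParameters = new HashMap<>();
    transformationParameters.put("transformation.param.matching", "value-set-by-transformation");
    transformationParameters.put("transformation.param.overridden", "original-value");

    // Same call pattern as DropTableService: the two-argument Map.remove only deletes an
    // entry whose current value still equals the transformation-supplied value.
    transformationParameters.entrySet()
        .forEach(parameter -> tableParameters.remove(parameter.getKey(), parameter.getValue()));

    // 'transformation.param.matching' has been stripped; the overridden and unrelated
    // parameters survive.
    System.out.println(tableParameters);
  }
}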