Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 173 struct evolution #174

Merged
merged 22 commits into from
Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import com.hotels.bdp.circustrain.core.PartitionsAndStatistics;
import com.hotels.bdp.circustrain.core.TableAndStatistics;
import com.hotels.bdp.circustrain.core.event.EventUtils;
import com.hotels.bdp.circustrain.core.replica.hive.AlterTableService;
import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;
import com.hotels.hcommon.hive.metastore.exception.MetaStoreClientException;
import com.hotels.hcommon.hive.metastore.util.LocationUtils;
Expand All @@ -73,6 +74,7 @@ public class Replica extends HiveEndpoint {
private final ReplicaCatalogListener replicaCatalogListener;
private final ReplicationMode replicationMode;
private final TableReplication tableReplication;
private final AlterTableService alterTableService;
private int partitionBatchSize = 1000;

/**
Expand All @@ -85,13 +87,15 @@ public class Replica extends HiveEndpoint {
ReplicaTableFactory replicaTableFactory,
HousekeepingListener housekeepingListener,
ReplicaCatalogListener replicaCatalogListener,
TableReplication tableReplication) {
TableReplication tableReplication,
AlterTableService alterTableService) {
super(replicaCatalog.getName(), replicaHiveConf, replicaMetaStoreClientSupplier);
this.replicaCatalogListener = replicaCatalogListener;
tableFactory = replicaTableFactory;
this.housekeepingListener = housekeepingListener;
replicationMode = tableReplication.getReplicationMode();
this.tableReplication = tableReplication;
this.alterTableService = alterTableService;
}

/**
Expand All @@ -106,6 +110,7 @@ public class Replica extends HiveEndpoint {
HousekeepingListener housekeepingListener,
ReplicaCatalogListener replicaCatalogListener,
TableReplication tableReplication,
AlterTableService alterTableService,
int partitionBatchSize) {
super(replicaCatalog.getName(), replicaHiveConf, replicaMetaStoreClientSupplier);
this.replicaCatalogListener = replicaCatalogListener;
Expand All @@ -114,6 +119,7 @@ public class Replica extends HiveEndpoint {
replicationMode = tableReplication.getReplicationMode();
this.tableReplication = tableReplication;
this.partitionBatchSize = partitionBatchSize;
this.alterTableService = alterTableService;
}

public void updateMetadata(
Expand Down Expand Up @@ -310,7 +316,7 @@ private Optional<Table> updateTableMetadata(
makeSureCanReplicate(oldReplicaTable.get(), replicaTable.getTable());
LOG.debug("Existing replica table found, altering.");
try {
client.alter_table(replicaDatabaseName, replicaTableName, replicaTable.getTable());
alterTableService.alterTable(client, oldReplicaTable.get(), replicaTable.getTable());
updateTableColumnStatistics(client, replicaTable);
} catch (TException e) {
throw new MetaStoreClientException(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2016-2019 Expedia, Inc.
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,9 @@
import com.hotels.bdp.circustrain.api.event.ReplicaCatalogListener;
import com.hotels.bdp.circustrain.api.listener.HousekeepingListener;
import com.hotels.bdp.circustrain.core.HiveEndpointFactory;
import com.hotels.bdp.circustrain.core.replica.hive.AlterTableService;
import com.hotels.bdp.circustrain.core.replica.hive.CopyPartitionsOperation;
import com.hotels.bdp.circustrain.core.replica.hive.RenameTableOperation;
import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

@Profile({ Modules.REPLICATION })
Expand Down Expand Up @@ -61,7 +64,9 @@ public ReplicaFactory(
@Override
public Replica newInstance(TableReplication tableReplication) {
ReplicaTableFactory replicaTableFactory = replicaTableFactoryPicker.newInstance(tableReplication);
AlterTableService alterTableService = new AlterTableService(new CopyPartitionsOperation(),
new RenameTableOperation());
return new Replica(replicaCatalog, replicaHiveConf, replicaMetaStoreClientSupplier, replicaTableFactory,
housekeepingListener, replicaCatalogListener, tableReplication);
housekeepingListener, replicaCatalogListener, tableReplication, alterTableService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hotels.bdp.circustrain.core.replica.hive;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hotels.bdp.circustrain.core.replica.Replica;
import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

public class AlterTableService {

private static final Logger LOG = LoggerFactory.getLogger(Replica.class);

private CopyPartitionsOperation copyPartitionsOperation;
private RenameTableOperation renameTableOperation;

public AlterTableService(
CopyPartitionsOperation copyPartitionsOperation,
RenameTableOperation renameTableOperation) {
this.copyPartitionsOperation = copyPartitionsOperation;
this.renameTableOperation = renameTableOperation;
}

public void alterTable(CloseableMetaStoreClient client, Table oldTable, Table newTable) throws TException {
List<FieldSchema> oldColumns = oldTable.getSd().getCols();
List<FieldSchema> newColumns = newTable.getSd().getCols();
if (hasAnyChangedColumns(oldColumns, newColumns)) {
LOG
.info("Found columns that have changed type, attempting to recreate target table with the new columns."
+ "Old columns: {}, new columns: {}", oldColumns, newColumns);
Table tempTable = new Table(newTable);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this copy all the table properties?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does indeed. We can either (1) scrub all the table params here, or (2) I've created a DropTableService which removes custom table params before calling the drop table command. If thats overly complicated I can remove the new service and just do (1) instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK so if we removed the params here then we don't have to worry about the potential of Beekeeper picking up the drop of the temp table etc.? I think I prefer that approach unless some of the params are useful to have on the temp table? I don't think any of the intermediate operations use them in any way so probably not? And to be clear, the original table params will be kept on the final end result table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay cool, I'm happy to do this, I'll remove the DropTableService, and just remove the params as part of this service.

The intermediate ops dont use the params, but i think beekeeper may pick up on the alter partition events (though will ignore them as the location doesnt change). And yes, the original params will be on the final table - i'll add some tests for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried this out, and actually it doesn't seem possible. We can't do an alter table on the table as it still has the old schema, so it fails with the same error. So the DropTableService looks better as we do the alter/drop once everything has been updated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let's go with that for now and keep an eye on it when this is in use.

String tempName = newTable.getTableName() + "_temp";
tempTable.setTableName(tempName);
try {
client.createTable(tempTable);
copyPartitionsOperation.execute(client, newTable, tempTable);
renameTableOperation.execute(client, tempTable, newTable);
} finally {
client.dropTable(newTable.getDbName(), tempName, false, true);
}
} else {
client.alter_table(newTable.getDbName(), newTable.getTableName(), newTable);
}
}

private boolean hasAnyChangedColumns(List<FieldSchema> oldColumns, List<FieldSchema> newColumns) {
Map<String, FieldSchema> oldColumnsMap = oldColumns.stream()
.collect(Collectors.toMap(FieldSchema::getName, Function.identity()));
for (FieldSchema newColumn : newColumns) {
if (oldColumnsMap.containsKey(newColumn.getName())) {
FieldSchema oldColumn = oldColumnsMap.get(newColumn.getName());
if (!oldColumn.getType().equals(newColumn.getType())) {
return true;
}
}
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hotels.bdp.circustrain.core.replica.hive;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;
import com.hotels.hcommon.hive.metastore.iterator.PartitionIterator;

/**
 * Copies all partitions of one table onto another table in batches, rewriting each partition's
 * database/table name and storage-descriptor columns to match the destination table's schema.
 */
public class CopyPartitionsOperation {

  private static final Logger LOG = LoggerFactory.getLogger(CopyPartitionsOperation.class);
  private static final short DEFAULT_BATCH_SIZE = 1000;

  private short partitionBatchSize;

  public CopyPartitionsOperation() {
    this(DEFAULT_BATCH_SIZE);
  }

  @VisibleForTesting
  CopyPartitionsOperation(short partitionBatchSize) {
    this.partitionBatchSize = partitionBatchSize;
  }

  /**
   * Copies partitions from oldTable to newTable, partitions copied are modified to take the schema of newTable
   */
  public void execute(CloseableMetaStoreClient client, Table oldTable, Table newTable) throws TException {
    String targetDatabase = newTable.getDbName();
    String targetTable = newTable.getTableName();
    PartitionIterator sourcePartitions = new PartitionIterator(client, oldTable, partitionBatchSize);
    int copied = 0;
    while (sourcePartitions.hasNext()) {
      List<Partition> currentBatch = new ArrayList<>();
      int batchFill = 0;
      while (batchFill < partitionBatchSize && sourcePartitions.hasNext()) {
        Partition source = sourcePartitions.next();
        // Retarget the partition at the destination table and swap in its column schema.
        Partition retargeted = new Partition(source);
        retargeted.setDbName(targetDatabase);
        retargeted.setTableName(targetTable);
        StorageDescriptor descriptor = new StorageDescriptor(source.getSd());
        descriptor.setCols(newTable.getSd().getCols());
        retargeted.setSd(descriptor);
        currentBatch.add(retargeted);
        batchFill++;
        copied++;
      }
      LOG.info("Copying batch of size {} to {}.{}", currentBatch.size(), targetDatabase, targetTable);
      client.add_partitions(currentBatch);
    }
    LOG.info("Copied {} partitions to {}.{}", copied, targetDatabase, targetTable);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright (C) 2016-2020 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hotels.bdp.circustrain.core.replica.hive;

import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

public class RenameTableOperation {

private final static Logger LOG = LoggerFactory.getLogger(RenameTableOperation.class);

/**
* <p>
* NOTE: assumes both `from` and `to` exist
* </p>
* Renames tables 'from' table into 'to' table, at the end of the operation 'from' will be gone and 'to' will be
* renamed.
*/
public void execute(CloseableMetaStoreClient client, Table from, Table to) throws TException {
LOG
.info("Renaming table {}.{} to {}.{}", from.getDbName(), from.getTableName(), to.getDbName(),
to.getTableName());
from = client.getTable(from.getDbName(), from.getTableName());
to = client.getTable(to.getDbName(), to.getTableName());
String fromTableName = from.getTableName();
String toTableName = to.getTableName();
String toTableNameTemp = toTableName + "_original";
try {
from.setTableName(toTableName);
to.setTableName(toTableNameTemp);
client.alter_table(to.getDbName(), toTableName, to);
client.alter_table(from.getDbName(), fromTableName, from);
} finally {
client.dropTable(to.getDbName(), toTableNameTemp, false, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to double check beekeeper props and we don't accidentally delete folders when we drop this table.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import com.hotels.bdp.circustrain.api.metadata.TableTransformation;
import com.hotels.bdp.circustrain.core.PartitionsAndStatistics;
import com.hotels.bdp.circustrain.core.TableAndStatistics;
import com.hotels.bdp.circustrain.core.replica.hive.AlterTableService;
import com.hotels.hcommon.hive.metastore.client.api.CloseableMetaStoreClient;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -118,6 +119,7 @@ public class ReplicaTest {
private @Captor ArgumentCaptor<SetPartitionsStatsRequest> setStatsRequestCaptor;
private @Mock HousekeepingListener houseKeepingListener;
private @Mock ReplicaCatalogListener replicaCatalogListener;
private @Mock AlterTableService alterTableService;

private final ReplicaTableFactory tableFactory = new ReplicaTableFactory(SOURCE_META_STORE_URIS,
TableTransformation.IDENTITY, PartitionTransformation.IDENTITY, ColumnStatisticsTransformation.IDENTITY);
Expand Down Expand Up @@ -178,14 +180,14 @@ private void convertExistingReplicaTableToView() {

private Replica newReplica(TableReplication tableReplication) {
return new Replica(replicaCatalog, hiveConf, metaStoreClientSupplier, tableFactory, houseKeepingListener,
replicaCatalogListener, tableReplication, TEST_PARTITION_BATCH_SIZE);
replicaCatalogListener, tableReplication, alterTableService, TEST_PARTITION_BATCH_SIZE);
}

@Test
public void alteringExistingUnpartitionedReplicaTableSucceeds() throws TException, IOException {
existingReplicaTable.getParameters().put(REPLICATION_EVENT.parameterName(), "previousEventId");
replica.updateMetadata(EVENT_ID, tableAndStatistics, DB_NAME, TABLE_NAME, mockReplicaLocationManager);
verify(mockMetaStoreClient).alter_table(eq(DB_NAME), eq(TABLE_NAME), any(Table.class));
verify(alterTableService).alterTable(eq(mockMetaStoreClient), eq(existingReplicaTable), any(Table.class));
verify(mockMetaStoreClient).updateTableColumnStatistics(columnStatistics);
verify(mockReplicaLocationManager, never()).addCleanUpLocation(anyString(), any(Path.class));
}
Expand All @@ -195,7 +197,7 @@ public void alteringExistingUnpartitionedReplicaTableWithNoStatsSucceeds() throw
tableAndStatistics = new TableAndStatistics(sourceTable, null);
existingReplicaTable.getParameters().put(REPLICATION_EVENT.parameterName(), "previousEventId");
replica.updateMetadata(EVENT_ID, tableAndStatistics, DB_NAME, TABLE_NAME, mockReplicaLocationManager);
verify(mockMetaStoreClient).alter_table(eq(DB_NAME), eq(TABLE_NAME), any(Table.class));
verify(alterTableService).alterTable(eq(mockMetaStoreClient), eq(existingReplicaTable), any(Table.class));
verify(mockMetaStoreClient, never()).updateTableColumnStatistics(any(ColumnStatistics.class));
verify(mockReplicaLocationManager, never()).addCleanUpLocation(anyString(), any(Path.class));
}
Expand All @@ -206,7 +208,7 @@ public void alteringExistingUnpartitionedReplicaViewSucceeds() throws TException
convertExistingReplicaTableToView();
existingReplicaTable.getParameters().put(REPLICATION_EVENT.parameterName(), "previousEventId");
replica.updateMetadata(EVENT_ID, tableAndStatistics, DB_NAME, TABLE_NAME, mockReplicaLocationManager);
verify(mockMetaStoreClient).alter_table(eq(DB_NAME), eq(TABLE_NAME), any(Table.class));
verify(alterTableService).alterTable(eq(mockMetaStoreClient), eq(existingReplicaTable), any(Table.class));
verify(mockReplicaLocationManager, never()).addCleanUpLocation(anyString(), any(Path.class));
}

Expand Down Expand Up @@ -332,7 +334,7 @@ public void alteringExistingPartitionedReplicaTableSucceeds() throws TException,
new PartitionsAndStatistics(sourceTable.getPartitionKeys(), Collections.<Partition>emptyList(),
Collections.<String, List<ColumnStatisticsObj>>emptyMap()),
DB_NAME, TABLE_NAME, mockReplicaLocationManager);
verify(mockMetaStoreClient).alter_table(eq(DB_NAME), eq(TABLE_NAME), any(Table.class));
verify(alterTableService).alterTable(eq(mockMetaStoreClient), eq(existingReplicaTable), any(Table.class));
verify(mockMetaStoreClient).updateTableColumnStatistics(columnStatistics);
verify(mockReplicaLocationManager, never()).addCleanUpLocation(anyString(), any(Path.class));
}
Expand All @@ -350,7 +352,7 @@ public void alteringExistingPartitionedReplicaViewSucceeds() throws TException,
new PartitionsAndStatistics(sourceTable.getPartitionKeys(), Collections.<Partition>emptyList(),
Collections.<String, List<ColumnStatisticsObj>>emptyMap()),
DB_NAME, TABLE_NAME, mockReplicaLocationManager);
verify(mockMetaStoreClient).alter_table(eq(DB_NAME), eq(TABLE_NAME), any(Table.class));
verify(alterTableService).alterTable(eq(mockMetaStoreClient), eq(existingReplicaTable), any(Table.class));
verify(mockReplicaLocationManager, never()).addCleanUpLocation(anyString(), any(Path.class));
}

Expand Down Expand Up @@ -464,7 +466,7 @@ private void alterExistingPartitionedReplicaTableWithNewPartitionsInBatches(
.updateMetadata(EVENT_ID, tableAndStatistics, partitionsAndStatistics, DB_NAME, TABLE_NAME,
mockReplicaLocationManager);

verify(mockMetaStoreClient).alter_table(eq(DB_NAME), eq(TABLE_NAME), any(Table.class));
verify(alterTableService).alterTable(eq(mockMetaStoreClient), eq(existingReplicaTable), any(Table.class));
verify(mockMetaStoreClient).updateTableColumnStatistics(columnStatistics);
verify(mockReplicaLocationManager, times(numTestAlterPartitions)).addCleanUpLocation(anyString(), any(Path.class));
verify(mockMetaStoreClient, times(numAlterBatches)).alter_partitions(eq(DB_NAME), eq(TABLE_NAME), alterPartitionCaptor.capture());
Expand Down Expand Up @@ -597,7 +599,7 @@ public void alteringExistingPartitionedReplicaViewWithPartitionsSucceeds() throw
.updateMetadata(EVENT_ID, tableAndStatistics, partitionsAndStatistics, DB_NAME, TABLE_NAME,
mockReplicaLocationManager);

verify(mockMetaStoreClient).alter_table(eq(DB_NAME), eq(TABLE_NAME), any(Table.class));
verify(alterTableService).alterTable(eq(mockMetaStoreClient), eq(existingReplicaTable), any(Table.class));
verify(mockMetaStoreClient).updateTableColumnStatistics(columnStatistics);
verify(mockMetaStoreClient).alter_partitions(eq(DB_NAME), eq(TABLE_NAME), alterPartitionCaptor.capture());
verify(mockMetaStoreClient).add_partitions(addPartitionCaptor.capture());
Expand Down Expand Up @@ -640,7 +642,7 @@ public int compare(ColumnStatistics o1, ColumnStatistics o2) {
public void updateMetadataCalledWithoutPartitionsDoesNotCleanUpLocations() throws TException, IOException {
existingReplicaTable.getParameters().put(REPLICATION_EVENT.parameterName(), "previousEventId");
replica.updateMetadata(EVENT_ID, tableAndStatistics, DB_NAME, TABLE_NAME, mockReplicaLocationManager);
verify(mockMetaStoreClient).alter_table(eq(DB_NAME), eq(TABLE_NAME), any(Table.class));
verify(alterTableService).alterTable(eq(mockMetaStoreClient), eq(existingReplicaTable), any(Table.class));
verify(mockMetaStoreClient).updateTableColumnStatistics(columnStatistics);
verify(mockReplicaLocationManager, never()).addCleanUpLocation(anyString(), any(Path.class));
}
Expand Down
Loading