Skip to content

Commit

Permalink
Return BaseTable in TrinoCatalog.loadTable
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Feb 7, 2025
1 parent 92ef79b commit 0d00c6a
Show file tree
Hide file tree
Showing 12 changed files with 31 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ public ConnectorTableHandle getTableHandle(

BaseTable table;
try {
table = (BaseTable) catalog.loadTable(session, new SchemaTableName(tableName.getSchemaName(), tableName.getTableName()));
table = catalog.loadTable(session, new SchemaTableName(tableName.getSchemaName(), tableName.getTableName()));
}
catch (TableNotFoundException e) {
return null;
Expand Down Expand Up @@ -883,7 +883,7 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
IcebergTableHandle tableHandle = checkValidTableHandle(table);
// This method does not calculate column metadata for the projected columns
checkArgument(tableHandle.getProjectedColumns().isEmpty(), "Unexpected projected columns");
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
BaseTable icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
List<ColumnMetadata> columns = getColumnMetadatas(SchemaParser.fromJson(tableHandle.getTableSchemaJson()), typeManager);
return new ConnectorTableMetadata(tableHandle.getSchemaTableName(), columns, getIcebergTableProperties(icebergTable), getTableComment(icebergTable));
}
Expand Down Expand Up @@ -1858,11 +1858,11 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
IcebergTableHandle table)
{
IcebergOptimizeHandle optimizeHandle = (IcebergOptimizeHandle) executeHandle.procedureHandle();
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
BaseTable icebergTable = catalog.loadTable(session, table.getSchemaTableName());

validateNotModifyingOldSnapshot(table, icebergTable);

int tableFormatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
int tableFormatVersion = icebergTable.operations().current().formatVersion();
if (tableFormatVersion > OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION) {
throw new TrinoException(NOT_SUPPORTED, format(
"%s is not supported for Iceberg table format version > %d. Table %s format version is %s.",
Expand Down Expand Up @@ -2056,7 +2056,7 @@ private void executeOptimizeManifests(ConnectorSession session, IcebergTableExec
{
checkArgument(executeHandle.procedureHandle() instanceof IcebergOptimizeManifestsHandle, "Unexpected procedure handle %s", executeHandle.procedureHandle());

BaseTable icebergTable = (BaseTable) catalog.loadTable(session, executeHandle.schemaTableName());
BaseTable icebergTable = catalog.loadTable(session, executeHandle.schemaTableName());
List<ManifestFile> manifests = icebergTable.currentSnapshot().allManifests(icebergTable.io());
if (manifests.isEmpty()) {
return;
Expand Down Expand Up @@ -2100,7 +2100,7 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
{
IcebergExpireSnapshotsHandle expireSnapshotsHandle = (IcebergExpireSnapshotsHandle) executeHandle.procedureHandle();

Table table = catalog.loadTable(session, executeHandle.schemaTableName());
BaseTable table = catalog.loadTable(session, executeHandle.schemaTableName());
Duration retention = requireNonNull(expireSnapshotsHandle.retentionThreshold(), "retention is null");
validateTableExecuteParameters(
table,
Expand Down Expand Up @@ -2142,15 +2142,15 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
}

private static void validateTableExecuteParameters(
Table table,
BaseTable table,
SchemaTableName schemaTableName,
String procedureName,
Duration retentionThreshold,
Duration minRetention,
String minRetentionParameterName,
String sessionMinRetentionParameterName)
{
int tableFormatVersion = ((BaseTable) table).operations().current().formatVersion();
int tableFormatVersion = table.operations().current().formatVersion();
if (tableFormatVersion > CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION) {
// It is not known if future version won't bring any new kind of metadata or data files
// because of the way procedures are implemented it is safer to fail here than to potentially remove
Expand Down Expand Up @@ -2182,7 +2182,7 @@ public void executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecu
{
IcebergRemoveOrphanFilesHandle removeOrphanFilesHandle = (IcebergRemoveOrphanFilesHandle) executeHandle.procedureHandle();

Table table = catalog.loadTable(session, executeHandle.schemaTableName());
BaseTable table = catalog.loadTable(session, executeHandle.schemaTableName());
Duration retention = requireNonNull(removeOrphanFilesHandle.retentionThreshold(), "retention is null");
validateTableExecuteParameters(
table,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public final class IcebergUtil

private IcebergUtil() {}

public static Table loadIcebergTable(TrinoCatalog catalog, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table)
public static BaseTable loadIcebergTable(TrinoCatalog catalog, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table)
{
TableOperations operations = tableOperationsProvider.createTableOperations(
catalog,
Expand All @@ -229,7 +229,7 @@ public static Table loadIcebergTable(TrinoCatalog catalog, IcebergTableOperation
return new BaseTable(operations, quotedTableName(table), TRINO_METRICS_REPORTER);
}

public static Table getIcebergTableWithMetadata(
public static BaseTable getIcebergTableWithMetadata(
TrinoCatalog catalog,
IcebergTableOperationsProvider tableOperationsProvider,
ConnectorSession session,
Expand Down Expand Up @@ -299,7 +299,7 @@ public static List<Integer> buildPath(Map<Integer, Integer> indexParents, int fi
return ImmutableList.copyOf(path.reversed());
}

public static Map<String, Object> getIcebergTableProperties(Table icebergTable)
public static Map<String, Object> getIcebergTableProperties(BaseTable icebergTable)
{
ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder();
properties.put(FILE_FORMAT_PROPERTY, getFileFormat(icebergTable));
Expand All @@ -318,7 +318,7 @@ public static Map<String, Object> getIcebergTableProperties(Table icebergTable)
properties.put(LOCATION_PROPERTY, icebergTable.location());
}

int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
int formatVersion = icebergTable.operations().current().formatVersion();
properties.put(FORMAT_VERSION_PROPERTY, formatVersion);

if (icebergTable.properties().containsKey(COMMIT_NUM_RETRIES)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public Map<String, Object> getMaterializedViewProperties(ConnectorSession sessio
.getSchemaTableName();

try {
Table storageTable = loadTable(session, definition.getStorageTable().orElseThrow().getSchemaTableName());
BaseTable storageTable = loadTable(session, definition.getStorageTable().orElseThrow().getSchemaTableName());
return ImmutableMap.<String, Object>builder()
.putAll(getIcebergTableProperties(storageTable))
.put(STORAGE_SCHEMA, storageTableName.getSchemaName())
Expand Down Expand Up @@ -246,7 +246,7 @@ protected Transaction newCreateOrReplaceTableTransaction(
BaseTable table;
Optional<TableMetadata> metadata = Optional.empty();
try {
table = (BaseTable) loadTable(session, new SchemaTableName(schemaTableName.getSchemaName(), schemaTableName.getTableName()));
table = loadTable(session, new SchemaTableName(schemaTableName.getSchemaName(), schemaTableName.getTableName()));
metadata = Optional.of(table.operations().current());
}
catch (TableNotFoundException _) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ Transaction newCreateOrReplaceTableTransaction(
* @return Iceberg table loaded
* @throws UnknownTableTypeException if table is not of Iceberg type in the metastore
*/
Table loadTable(ConnectorSession session, SchemaTableName schemaTableName);
BaseTable loadTable(ConnectorSession session, SchemaTableName schemaTableName);

/**
* Bulk load column metadata. The returned map may contain fewer entries then asked for.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
Expand Down Expand Up @@ -598,7 +597,7 @@ private void getCommentsFromIcebergMetadata(
}

@Override
public Table loadTable(ConnectorSession session, SchemaTableName table)
public BaseTable loadTable(ConnectorSession session, SchemaTableName table)
{
if (viewCache.asMap().containsKey(table) || materializedViewCache.asMap().containsKey(table)) {
throw new TableNotFoundException(table);
Expand Down Expand Up @@ -711,7 +710,7 @@ private Optional<List<ColumnMetadata>> getCachedColumnMetadata(com.amazonaws.ser
@Override
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
BaseTable table = loadTable(session, schemaTableName);
try {
deleteTable(schemaTableName.getSchemaName(), schemaTableName.getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.Transaction;
Expand Down Expand Up @@ -421,7 +420,7 @@ public Optional<Iterator<RelationCommentMetadata>> streamRelationComments(
@Override
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
BaseTable table = loadTable(session, schemaTableName);
TableMetadata metadata = table.operations().current();

io.trino.metastore.Table metastoreTable = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
Expand Down Expand Up @@ -475,14 +474,14 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa
}

@Override
public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName)
public BaseTable loadTable(ConnectorSession session, SchemaTableName schemaTableName)
{
TableMetadata metadata;
try {
metadata = uncheckedCacheGet(
tableMetadataCache,
schemaTableName,
() -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current());
() -> loadIcebergTable(this, tableOperationsProvider, session, schemaTableName).operations().current());
}
catch (UncheckedExecutionException e) {
throwIfUnchecked(e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Namespace;
Expand Down Expand Up @@ -338,7 +337,7 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
@Override
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
BaseTable table = loadTable(session, schemaTableName);

jdbcCatalog.dropTable(toIdentifier(schemaTableName), false);
try {
Expand Down Expand Up @@ -381,14 +380,14 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa
}

@Override
public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName)
public BaseTable loadTable(ConnectorSession session, SchemaTableName schemaTableName)
{
TableMetadata metadata;
try {
metadata = uncheckedCacheGet(
tableMetadataCache,
schemaTableName,
() -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current());
() -> loadIcebergTable(this, tableOperationsProvider, session, schemaTableName).operations().current());
}
catch (UncheckedExecutionException e) {
throwIfUnchecked(e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
Expand Down Expand Up @@ -199,7 +198,7 @@ public Optional<Iterator<RelationCommentMetadata>> streamRelationComments(
}

@Override
public Table loadTable(ConnectorSession session, SchemaTableName table)
public BaseTable loadTable(ConnectorSession session, SchemaTableName table)
{
TableMetadata metadata;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public class TrinoRestCatalog
private final Cache<Namespace, Namespace> remoteNamespaceMappingCache;
private final Cache<TableIdentifier, TableIdentifier> remoteTableMappingCache;

private final Cache<SchemaTableName, Table> tableCache = EvictableCacheBuilder.newBuilder()
private final Cache<SchemaTableName, BaseTable> tableCache = EvictableCacheBuilder.newBuilder()
.maximumSize(PER_QUERY_CACHE_SIZE)
.build();

Expand Down Expand Up @@ -489,7 +489,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa
}

@Override
public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName)
public BaseTable loadTable(ConnectorSession session, SchemaTableName schemaTableName)
{
Namespace namespace = toNamespace(schemaTableName.getSchemaName());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa
}

@Override
public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName)
public BaseTable loadTable(ConnectorSession session, SchemaTableName schemaTableName)
{
TableMetadata metadata;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public static BaseTable loadTable(String tableName,
false,
new IcebergConfig().isHideMaterializedViewStorageTable(),
directExecutor());
return (BaseTable) loadIcebergTable(catalog, tableOperationsProvider, SESSION, new SchemaTableName(schemaName, tableName));
return loadIcebergTable(catalog, tableOperationsProvider, SESSION, new SchemaTableName(schemaName, tableName));
}

public static Map<String, Long> getMetadataFileAndUpdatedMillis(TrinoFileSystem trinoFileSystem, String tableLocation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected String getMetadataLocation(String tableName)
{
TrinoCatalogFactory catalogFactory = ((IcebergConnector) getQueryRunner().getCoordinator().getConnector("iceberg")).getInjector().getInstance(TrinoCatalogFactory.class);
TrinoCatalog trinoCatalog = catalogFactory.create(getSession().getIdentity().toConnectorIdentity());
BaseTable table = (BaseTable) trinoCatalog.loadTable(getSession().toConnectorSession(), new SchemaTableName(getSession().getSchema().orElseThrow(), tableName));
BaseTable table = trinoCatalog.loadTable(getSession().toConnectorSession(), new SchemaTableName(getSession().getSchema().orElseThrow(), tableName));
return table.operations().current().metadataFileLocation();
}

Expand All @@ -115,7 +115,7 @@ protected String getTableLocation(String tableName)
{
TrinoCatalogFactory catalogFactory = ((IcebergConnector) getQueryRunner().getCoordinator().getConnector("iceberg")).getInjector().getInstance(TrinoCatalogFactory.class);
TrinoCatalog trinoCatalog = catalogFactory.create(getSession().getIdentity().toConnectorIdentity());
BaseTable table = (BaseTable) trinoCatalog.loadTable(getSession().toConnectorSession(), new SchemaTableName(getSession().getSchema().orElseThrow(), tableName));
BaseTable table = trinoCatalog.loadTable(getSession().toConnectorSession(), new SchemaTableName(getSession().getSchema().orElseThrow(), tableName));
return table.operations().current().location();
}

Expand Down

0 comments on commit 0d00c6a

Please sign in to comment.