Commit 1415d7d: PR feedback

erindru committed Sep 20, 2024
1 parent 3110b64
Showing 5 changed files with 88 additions and 62 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -108,6 +108,7 @@ venv/
ENV/
env.bak/
venv.bak/
venv*/

# Spyder project settings
.spyderproject
23 changes: 10 additions & 13 deletions docs/integrations/engines/athena.md
@@ -30,41 +30,38 @@ Note that PyAthena uses [boto3](https://boto3.amazonaws.com/v1/documentation/api

These options are specific to SQLMesh itself and are not passed to PyAthena

| Option | Description | Type | Required |
|-------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|----------|
| `s3_warehouse_location` | Set the base path in S3 where SQLMesh will place table data. Only required if the schemas don't have default locations set or you aren't specifying the location in the model. See [S3 Locations](#s3-locations) below. | string | N |
| Option | Description | Type | Required |
|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|----------|
| `s3_warehouse_location` | Set the base path in S3 where SQLMesh will instruct Athena to place table data. Only required if you aren't specifying the location in the model itself. See [S3 Locations](#s3-locations) below. | string | N |
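
For example, a minimal gateway config might look like the following sketch. The bucket names are placeholders, and `region_name` / `s3_staging_dir` are assumed here as standard PyAthena connection parameters; only `s3_warehouse_location` is the SQLMesh-specific option from the table above:

```yaml
gateways:
  athena:
    connection:
      type: athena
      region_name: us-east-1                           # passed through to PyAthena/boto3
      s3_staging_dir: s3://my-bucket/athena-results/   # where Athena writes query results
      s3_warehouse_location: s3://my-bucket/warehouse/ # SQLMesh-specific: base path for table data
```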

## Model properties

The Athena adapter recognises the following model [physical_properties](../../concepts/models/overview.md#physical_properties):

| Name | Description | Type | Default |
|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|---------|
| `s3_base_location`| `s3://` base URI of where the snapshot tables for this model should be located. Overrides `s3_warehouse_location` if one is configured. | string | |
| `s3_base_location`| `s3://` base URI of where the snapshot tables for this model should be written. Overrides `s3_warehouse_location` if one is configured. | string | |
| `table_type` | Sets the [table_type](https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties) Athena uses when creating the table. Valid values are `hive` or `iceberg`. | string | `hive` |
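
For example, a model that opts into Iceberg and pins its own S3 prefix might look like the following sketch (the model name, unique key, and bucket are illustrative):

```sql
MODEL (
  name analytics.orders,
  kind INCREMENTAL_BY_UNIQUE_KEY (
    unique_key order_id
  ),
  physical_properties (
    s3_base_location = 's3://my-bucket/analytics/', -- overrides s3_warehouse_location for this model
    table_type = 'iceberg' -- merge-based kinds need Iceberg, since Hive tables cannot be mutated
  )
);

SELECT
  order_id,
  amount,
  updated_at
FROM raw.orders;
```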


## S3 Locations
When creating tables, Athena needs to know where in S3 the table data is located. You cannot issue a `CREATE TABLE` statement without specifying a `LOCATION` for the table data.

If the schema you're creating the table under had a `LOCATION` set when it was created, Athena places the table in this location. Otherwise, it throws an error.
In addition, unlike other engines such as Trino, Athena will not infer a table location if you set a _schema_ location via `CREATE SCHEMA <schema> LOCATION 's3://schema/location'`.

Therefore, in order for SQLMesh to issue correct `CREATE TABLE` statements to Athena, there are a few strategies you can use to ensure the Athena tables are pointed to the correct S3 locations:
Therefore, in order for SQLMesh to issue correct `CREATE TABLE` statements to Athena, you need to configure where the tables should be stored. There are two options for this:

- Manually pre-create the `sqlmesh__` physical schemas via `CREATE SCHEMA <schema> LOCATION 's3://base/location'`. Then when SQLMesh issues `CREATE TABLE` statements for tables within that schema, Athena knows where the data should go
- Set `s3_warehouse_location` in the connection config. SQLMesh will set the table `LOCATION` to be `<s3_warehouse_location>/<schema_name>/<table_name>` when it issues a `CREATE TABLE` statement
- Set `s3_base_location` in the model `physical_properties`. SQLMesh will set the table `LOCATION` to be `<s3_base_location>/<table_name>`. This takes precedence over the `s3_warehouse_location` set in the connection config or the `LOCATION` property on the target schema
- **Project-wide:** set `s3_warehouse_location` in the connection config. SQLMesh will set the table `LOCATION` to be `<s3_warehouse_location>/<schema_name>/<snapshot_table_name>` when it creates a snapshot of your model.
- **Per-model:** set `s3_base_location` in the model `physical_properties`. SQLMesh will set the table `LOCATION` to be `<s3_base_location>/<snapshot_table_name>` every time it creates a snapshot of your model. This takes precedence over any `s3_warehouse_location` set in the connection config.
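
For example (snapshot table name illustrative): with `s3_warehouse_location` set to `s3://bucket/warehouse`, a snapshot table `sqlmesh__db.db__events__1234` would be created with `LOCATION 's3://bucket/warehouse/sqlmesh__db/db__events__1234/'`; if the model instead set `s3_base_location = 's3://bucket/custom/'`, the location would be `s3://bucket/custom/db__events__1234/`.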

Note that if you opt to pre-create the schemas with a `LOCATION` already configured, you might want to look at [physical_schema_mapping](../../guides/configuration.md#physical-table-schemas) for better control of the schema names.
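
A sketch of what such a mapping might look like in the project config (the regex and schema name are hypothetical; see the linked guide for the exact semantics):

```yaml
# route physical tables for models in the "analytics" schema to a pre-created schema
physical_schema_mapping:
  '^analytics$': analytics_physical
```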

## Limitations
Athena was initially designed to read data stored in S3 and to do so without changing that data. This means that it does not have good support for mutating tables. In particular, it will not delete data from Hive tables.

Consequently, any SQLMesh model types that need to delete or merge data from existing tables will not work. In addition, [forward only changes](../../concepts/plans.md#forward-only-change) that mutate the schemas of existing tables have a high chance of failure because Athena supports very limited schema modifications on Hive tables.
Consequently, any SQLMesh model types that need to delete or merge data from existing tables will not work on Hive tables. In addition, [forward only changes](../../concepts/plans.md#forward-only-change) that mutate the schemas of existing tables have a high chance of failure because Athena supports very limited schema modifications on Hive tables.

However, Athena does support [Apache Iceberg](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html) tables which allow a full range of operations. These can be used for more complex model types such as [`INCREMENTAL_BY_UNIQUE_KEY`](../../concepts/models/model_kinds.md#incremental_by_unique_key) and [`SCD_TYPE_2`](../../concepts/models/model_kinds.md#scd-type-2).

To use an Iceberg table for a model, set `table_type='iceberg'` in the model [physical_properties](../../concepts/models/overview.md#physical_properties).

In general, Iceberg tables offer the most flexibility and you'll run into the fewest SQLMesh limitations when using them.
However, they're a newer feature of Athena so you may run into Athena limitations that aren't present in Hive tables, [particularly around supported data types](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-supported-data-types.html).
In general, Iceberg tables offer the most flexibility and you'll run into the fewest SQLMesh limitations when using them. However, we create Hive tables by default because Athena creates Hive tables by default, so Iceberg tables are opt-in rather than opt-out.
48 changes: 22 additions & 26 deletions sqlmesh/core/engine_adapter/athena.py
@@ -116,8 +116,7 @@ def _get_data_objects(
exp.select(
exp.case()
.when(
# 'awsdatacatalog' is the default catalog that is invisible for all intents and purposes
# it just happens to show up in information_schema queries
# calling code expects data objects in the default catalog to have their catalog set to None
exp.column("table_catalog", table="t").eq("awsdatacatalog"),
exp.Null(),
)
@@ -134,11 +133,7 @@ def _get_data_objects(
.as_("type"),
)
.from_(exp.to_table("information_schema.tables", alias="t"))
.where(
exp.and_(
exp.column("table_schema", table="t").eq(schema),
)
)
.where(exp.column("table_schema", table="t").eq(schema))
)
if object_names:
query = query.where(exp.column("table_name", table="t").isin(*object_names))
@@ -312,20 +307,18 @@ def _build_table_properties_exp(
# STORED AS PARQUET
properties.append(exp.FileFormatProperty(this=storage_format))

if table and (location := self._table_location(table_properties, table)):
if table and (location := self._table_location_or_raise(table_properties, table)):
properties.append(location)

if is_iceberg and expression:
# To make a CTAS expression persist as iceberg, alongside setting `table_type=iceberg`, you also need to set is_external=false
# Note that SQLGlot does the right thing with LocationProperty and writes it as `location` (Iceberg) instead of `external_location` (Hive)
# ref: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties
properties.append(exp.Property(this=exp.var("is_external"), value="false"))

for name, value in table_properties.items():
properties.append(exp.Property(this=exp.var(name), value=value))

if is_iceberg and expression:
# To make a CTAS expression persist as iceberg, alongside setting `table_type=iceberg` (which the user has already
# supplied in physical_properties and is thus set above), you also need to set:
# - is_external=false
# - table_location='s3://<path>'
# ref: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties
properties.append(exp.Property(this=exp.var("is_external"), value="false"))

if properties:
return exp.Properties(expressions=properties)

@@ -342,7 +335,7 @@ def _table_type(
"""
Use the user-specified table_properties to figure out if this is a Hive or an Iceberg table
"""
# if table_type is not defined or is not set to "iceberg", this is a Hive table
# if we can't detect any indication of Iceberg, this is a Hive table
if table_properties and (table_type := table_properties.get("table_type", None)):
if "iceberg" in table_type.sql(dialect=self.dialect).lower():
return "iceberg"
@@ -365,6 +358,16 @@ def _query_table_type(
return "hive"
return "iceberg"

def _table_location_or_raise(
self, table_properties: t.Optional[t.Dict[str, exp.Expression]], table: exp.Table
) -> exp.LocationProperty:
location = self._table_location(table_properties, table)
if not location:
raise SQLMeshError(
f"Cannot figure out location for table {table}. Please either set `s3_base_location` in `physical_properties` or set `s3_warehouse_location` in the Athena connection config"
)
return location

def _table_location(
self,
table_properties: t.Optional[t.Dict[str, exp.Expression]],
@@ -384,18 +387,11 @@ def _table_location(

elif self.s3_warehouse_location:
# If the user has set `s3_warehouse_location` in the connection config, the base URI is <s3_warehouse_location>/<catalog>/<schema>/
catalog_name = table.catalog if hasattr(table, "catalog") else None
schema_name = table.db if hasattr(table, "db") else None
base_uri = os.path.join(
self.s3_warehouse_location, catalog_name or "", schema_name or ""
)
base_uri = os.path.join(self.s3_warehouse_location, table.catalog or "", table.db or "")
else:
# Assume the user has set a default location for this schema in the metastore
return None

table_name = table.name if hasattr(table, "name") else None
full_uri = _ensure_valid_location(os.path.join(base_uri, table_name or ""))

full_uri = _ensure_valid_location(os.path.join(base_uri, table.text("this") or ""))
return exp.LocationProperty(this=exp.Literal.string(full_uri))

def _find_matching_columns(
57 changes: 36 additions & 21 deletions tests/core/engine_adapter/test_athena.py
@@ -8,6 +8,7 @@
from sqlmesh.core.engine_adapter import AthenaEngineAdapter
from sqlmesh.core.model import load_sql_based_model
from sqlmesh.core.model.definition import SqlModel
from sqlmesh.utils.errors import SQLMeshError

from tests.core.engine_adapter import to_sql_calls

@@ -31,7 +32,6 @@ def adapter(make_mocked_engine_adapter: t.Callable) -> AthenaEngineAdapter:
exp.to_table("schema.table"),
"s3://some/location/table/",
),
(None, None, exp.Table(db=exp.Identifier(this="test")), None),
# Location set to bucket
("s3://bucket", None, exp.to_table("schema.table"), "s3://bucket/schema/table/"),
("s3://bucket", {}, exp.to_table("schema.table"), "s3://bucket/schema/table/"),
@@ -73,18 +73,18 @@ def test_table_location(
expected_location: t.Optional[str],
) -> None:
adapter.s3_warehouse_location = config_s3_warehouse_location
location = adapter._table_location(table_properties, table)
final_location = None

if location and expected_location:
final_location = (
location.this.name
) # extract the unquoted location value from the LocationProperty

assert final_location == expected_location
if expected_location is None:
with pytest.raises(SQLMeshError, match=r"Cannot figure out location for table.*"):
adapter._table_location_or_raise(table_properties, table)
else:
location = adapter._table_location_or_raise(
table_properties, table
).this.name # extract the unquoted location value from the LocationProperty
assert location == expected_location

if table_properties is not None:
assert "location" not in table_properties
# this gets consumed by _table_location because we don't want it to end up in a TBLPROPERTIES clause
assert "s3_base_location" not in table_properties


def test_create_schema(adapter: AthenaEngineAdapter) -> None:
@@ -163,7 +163,7 @@ def test_create_table_iceberg(adapter: AthenaEngineAdapter) -> None:
]


def test_create_table_inferred_location(adapter: AthenaEngineAdapter) -> None:
def test_create_table_no_location(adapter: AthenaEngineAdapter) -> None:
expressions = d.parse(
"""
MODEL (
@@ -176,11 +176,12 @@ def test_create_table_inferred_location(adapter: AthenaEngineAdapter) -> None:
)
model: SqlModel = t.cast(SqlModel, load_sql_based_model(expressions))

adapter.create_table(
model.name,
columns_to_types=model.columns_to_types_or_raise,
table_properties=model.physical_properties,
)
with pytest.raises(SQLMeshError, match=r"Cannot figure out location.*"):
adapter.create_table(
model.name,
columns_to_types=model.columns_to_types_or_raise,
table_properties=model.physical_properties,
)

adapter.s3_warehouse_location = "s3://bucket/prefix"
adapter.create_table(
Expand All @@ -190,7 +191,6 @@ def test_create_table_inferred_location(adapter: AthenaEngineAdapter) -> None:
)

assert to_sql_calls(adapter) == [
"CREATE EXTERNAL TABLE IF NOT EXISTS `test_table` (`a` INT)",
"CREATE EXTERNAL TABLE IF NOT EXISTS `test_table` (`a` INT) LOCATION 's3://bucket/prefix/test_table/'",
]

@@ -220,10 +220,22 @@ def test_ctas_iceberg(adapter: AthenaEngineAdapter):
)

assert to_sql_calls(adapter) == [
'CREATE TABLE IF NOT EXISTS "foo"."bar" WITH (location=\'s3://bucket/prefix/foo/bar/\', table_type=\'iceberg\', is_external=false) AS SELECT CAST("a" AS INTEGER) AS "a" FROM (SELECT 1) AS "_subquery"'
'CREATE TABLE IF NOT EXISTS "foo"."bar" WITH (location=\'s3://bucket/prefix/foo/bar/\', is_external=false, table_type=\'iceberg\') AS SELECT CAST("a" AS INTEGER) AS "a" FROM (SELECT 1) AS "_subquery"'
]


def test_ctas_iceberg_no_specific_location(adapter: AthenaEngineAdapter):
with pytest.raises(SQLMeshError, match=r"Cannot figure out location.*"):
adapter.ctas(
table_name="foo.bar",
columns_to_types={"a": exp.DataType.build("int")},
query_or_df=parse_one("select 1", into=exp.Select),
table_properties={"table_type": exp.Literal.string("iceberg")},
)

assert to_sql_calls(adapter) == []


def test_replace_query(adapter: AthenaEngineAdapter, mocker: MockerFixture):
mocker.patch(
"sqlmesh.core.engine_adapter.athena.AthenaEngineAdapter.table_exists", return_value=True
@@ -250,15 +262,17 @@ def test_replace_query(adapter: AthenaEngineAdapter, mocker: MockerFixture):
)
adapter.cursor.execute.reset_mock()

adapter.s3_warehouse_location = "s3://foo"
adapter.replace_query(
table_name="test",
query_or_df=parse_one("select 1 as a", into=exp.Select),
columns_to_types={"a": exp.DataType.build("int")},
table_properties={},
)

# gets recreated as a Hive table because table_exists=False and nothing in the properties indicates it should be Iceberg
assert to_sql_calls(adapter) == [
'CREATE TABLE IF NOT EXISTS "test" AS SELECT CAST("a" AS INTEGER) AS "a" FROM (SELECT 1 AS "a") AS "_subquery"'
'CREATE TABLE IF NOT EXISTS "test" WITH (external_location=\'s3://foo/test/\') AS SELECT CAST("a" AS INTEGER) AS "a" FROM (SELECT 1 AS "a") AS "_subquery"'
]


@@ -288,8 +302,9 @@ def test_truncate_table(adapter: AthenaEngineAdapter):


def test_create_state_table(adapter: AthenaEngineAdapter):
adapter.s3_warehouse_location = "s3://base"
adapter.create_state_table("_snapshots", {"name": exp.DataType.build("varchar")})

assert to_sql_calls(adapter) == [
"CREATE TABLE IF NOT EXISTS `_snapshots` (`name` STRING) TBLPROPERTIES ('table_type'='iceberg')"
"CREATE TABLE IF NOT EXISTS `_snapshots` (`name` STRING) LOCATION 's3://base/_snapshots/' TBLPROPERTIES ('table_type'='iceberg')"
]