Feat: Athena adapter
erindru committed Sep 19, 2024
1 parent 25f4760 commit c5598bf
Showing 16 changed files with 1,000 additions and 18 deletions.
19 changes: 10 additions & 9 deletions .circleci/continue_config.yml
@@ -326,15 +326,16 @@ workflows:
matrix:
parameters:
engine:
- snowflake
- databricks
- redshift
- bigquery
- clickhouse-cloud
filters:
branches:
only:
- main
#- snowflake
#- databricks
#- redshift
#- bigquery
#- clickhouse-cloud
- athena
#filters:
# branches:
# only:
# - main
- trigger_private_tests:
requires:
- style_and_slow_tests
5 changes: 4 additions & 1 deletion Makefile
@@ -10,7 +10,7 @@ install-doc:
pip3 install -r ./docs/requirements.txt

install-engine-test:
pip3 install -e ".[dev,web,slack,mysql,postgres,databricks,redshift,bigquery,snowflake,trino,mssql,clickhouse]"
pip3 install -e ".[dev,web,slack,mysql,postgres,databricks,redshift,bigquery,snowflake,trino,mssql,clickhouse,athena]"

install-pre-commit:
pre-commit install
@@ -209,3 +209,6 @@ redshift-test: guard-REDSHIFT_HOST guard-REDSHIFT_USER guard-REDSHIFT_PASSWORD g

clickhouse-cloud-test: guard-CLICKHOUSE_CLOUD_HOST guard-CLICKHOUSE_CLOUD_USERNAME guard-CLICKHOUSE_CLOUD_PASSWORD engine-clickhouse-install
pytest -n auto -x -m "clickhouse_cloud" --retries 3 --junitxml=test-results/junit-clickhouse-cloud.xml

athena-test: guard-AWS_ACCESS_KEY_ID guard-AWS_SECRET_ACCESS_KEY guard-ATHENA_S3_WAREHOUSE_LOCATION engine-athena-install
pytest -n auto -x -m "athena" --retries 3 --junitxml=test-results/junit-athena.xml
1 change: 1 addition & 0 deletions docs/guides/configuration.md
@@ -483,6 +483,7 @@ Example snowflake connection configuration:

These pages describe the connection configuration options for each execution engine.

* [Athena](../integrations/engines/athena.md)
* [BigQuery](../integrations/engines/bigquery.md)
* [Databricks](../integrations/engines/databricks.md)
* [DuckDB](../integrations/engines/duckdb.md)
70 changes: 70 additions & 0 deletions docs/integrations/engines/athena.md
@@ -0,0 +1,70 @@
# Athena

## Installation

```
pip install "sqlmesh[athena]"
```

## Connection options

### PyAthena connection options

SQLMesh uses the [PyAthena](https://github.com/laughingman7743/PyAthena) DBAPI driver to connect to Athena, so the connection options below correspond to PyAthena's connection options.
Note that PyAthena uses [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) under the hood, so you can also use [boto3 environment variables](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#using-environment-variables) for configuration.

| Option | Description | Type | Required |
|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|:------:|:--------:|
| `type` | Engine type name - must be `athena` | string | Y |
| `aws_access_key_id` | The access key for your AWS user | string | N |
| `aws_secret_access_key` | The secret key for your AWS user | string | N |
| `role_arn` | The ARN of a role to assume once authenticated | string | N |
| `role_session_name` | The session name to use when assuming `role_arn` | string | N |
| `region_name` | The AWS region to use | string | N |
| `work_group` | The Athena [workgroup](https://docs.aws.amazon.com/athena/latest/ug/workgroups-manage-queries-control-costs.html) to send queries to | string | N |
| `s3_staging_dir`        | The S3 location for Athena to write query results. Only required if not using `work_group` OR the configured `work_group` doesn't have a results location set | string | N        |
| `schema_name`           | The default schema to place objects in if a schema isn't specified. Defaults to `default`                                                                      | string | N        |
| `catalog_name` | The default catalog to place schemas in. Defaults to `AwsDataCatalog` | string | N |

### SQLMesh connection options

These options are specific to SQLMesh itself and are not passed to PyAthena.

| Option | Description | Type | Required |
|-------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|----------|
| `s3_warehouse_location` | Set the base path in S3 where SQLMesh will place table data. Only required if the schemas don't have default locations set or you aren't specifying the location in the model. See [S3 Locations](#s3-locations) below. | string | N |
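
For example, a minimal gateway configuration using these options might look like the following sketch. The gateway name, region, workgroup, and S3 paths are placeholders; credentials are omitted and can be supplied via the boto3 environment variables mentioned above:

```yaml
# Illustrative config.yaml sketch; all values below are placeholders
gateways:
  athena:
    connection:
      type: athena
      region_name: us-east-1                          # AWS region to use
      work_group: primary                             # Athena workgroup to send queries to
      s3_warehouse_location: s3://my-bucket/sqlmesh/  # base path for table data

default_gateway: athena
```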

## Model properties

The Athena adapter recognises the following model [physical_properties](../../concepts/models/overview.md#physical_properties):

| Name | Description | Type | Default |
|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|---------|
| `s3_base_location`| Base `s3://` URI where the snapshot tables for this model should be located. Overrides `s3_warehouse_location` if one is configured.                                                       | string | |
| `table_type` | Sets the [table_type](https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties) Athena uses when creating the table. Valid values are `hive` or `iceberg`. | string | `hive` |
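
As an illustrative sketch, a model that stores its table as Iceberg under a custom S3 prefix could declare these properties as follows. The model name, columns, and bucket path are placeholders:

```sql
MODEL (
  name analytics.events_deduped,  -- placeholder model name
  kind INCREMENTAL_BY_UNIQUE_KEY (
    unique_key event_id
  ),
  physical_properties (
    table_type = 'iceberg',                         -- create this model's table as Iceberg
    s3_base_location = 's3://my-bucket/analytics/'  -- overrides s3_warehouse_location
  )
);

SELECT
  event_id,
  event_type,
  occurred_at
FROM analytics.raw_events;  -- placeholder upstream table
```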


## S3 Locations
When creating tables, Athena needs to know where in S3 the table data is located. You cannot issue a `CREATE TABLE` statement without specifying a `LOCATION` for the table data.

If the schema you're creating the table under was given a `LOCATION` when it was created, Athena places the table in that location. Otherwise, it throws an error.

Therefore, for SQLMesh to issue correct `CREATE TABLE` statements to Athena, there are a few strategies you can use to ensure the tables point to the correct S3 locations:

- Manually pre-create the `sqlmesh__` physical schemas via `CREATE SCHEMA <schema> LOCATION 's3://base/location'` (see the sketch after this list). Then, when SQLMesh issues `CREATE TABLE` statements for tables within that schema, Athena knows where the data should go.
- Set `s3_warehouse_location` in the connection config. SQLMesh will set the table `LOCATION` to `<s3_warehouse_location>/<schema_name>/<table_name>` when it issues a `CREATE TABLE` statement.
- Set `s3_base_location` in the model `physical_properties`. SQLMesh will set the table `LOCATION` to `<s3_base_location>/<table_name>`. This takes precedence over the `s3_warehouse_location` set in the connection config or the `LOCATION` property on the target schema.
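
For instance, the first strategy might look like the following hypothetical statement, run against Athena before planning. The schema and bucket names are placeholders:

```sql
-- Pre-create a SQLMesh physical schema with a default S3 location so that
-- tables SQLMesh creates inside it inherit this LOCATION
CREATE SCHEMA IF NOT EXISTS sqlmesh__analytics
LOCATION 's3://my-bucket/sqlmesh/analytics/';
```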

Note that if you opt to pre-create the schemas with a `LOCATION` already configured, you might want to look at [physical_schema_mapping](../../guides/configuration.md#physical-table-schemas) for better control of the schema names.

## Limitations
Athena was initially designed to read data stored in S3 and to do so without changing that data. This means that it does not have good support for mutating tables. In particular, it will not delete data from Hive tables.

Consequently, any SQLMesh model kinds that need to delete or merge data from existing tables will not work. In addition, [forward-only changes](../../concepts/plans.md#forward-only-change) that mutate the schemas of existing tables have a high chance of failure because Athena supports very limited schema modifications on Hive tables.

However, Athena does support [Apache Iceberg](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html) tables which allow a full range of operations. These can be used for more complex model types such as [`INCREMENTAL_BY_UNIQUE_KEY`](../../concepts/models/model_kinds.md#incremental_by_unique_key) and [`SCD_TYPE_2`](../../concepts/models/model_kinds.md#scd-type-2).

To use an Iceberg table for a model, set `table_type='iceberg'` in the model [physical_properties](../../concepts/models/overview.md#physical_properties).

In general, Iceberg tables offer the most flexibility, and you'll run into the fewest SQLMesh limitations when using them.
However, they're a newer feature of Athena, so you may run into Athena limitations that aren't present with Hive tables, [particularly around supported data types](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-supported-data-types.html).
1 change: 1 addition & 0 deletions docs/integrations/overview.md
@@ -11,6 +11,7 @@ SQLMesh supports integrations with the following tools:
## Execution engines
SQLMesh supports the following execution engines for running SQLMesh projects:

* [Athena](./engines/athena.md)
* [BigQuery](./engines/bigquery.md)
* [Databricks](./engines/databricks.md)
* [DuckDB](./engines/duckdb.md)
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -66,6 +66,7 @@ nav:
- integrations/dbt.md
- integrations/github.md
- Execution engines:
- integrations/engines/athena.md
- integrations/engines/bigquery.md
- integrations/engines/clickhouse.md
- integrations/engines/databricks.md
1 change: 1 addition & 0 deletions pytest.ini
@@ -20,6 +20,7 @@ markers =
spark_pyspark: test for Spark with PySpark dependency
# Engine Adapters
engine: test all engine adapters
athena: test for Athena
bigquery: test for BigQuery
clickhouse: test for Clickhouse (standalone mode)
clickhouse_cluster: test for Clickhouse (cluster mode)
1 change: 1 addition & 0 deletions setup.py
@@ -50,6 +50,7 @@
"sqlglot[rs]~=25.22.0",
],
extras_require={
"athena": ["PyAthena[Pandas]"],
"bigquery": [
"google-cloud-bigquery[pandas]",
"google-cloud-bigquery-storage",
60 changes: 60 additions & 0 deletions sqlmesh/core/config/connection.py
@@ -1467,6 +1467,66 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
        return {"compress": compress, "client_name": f"SQLMesh/{__version__}", **settings}


class AthenaConnectionConfig(ConnectionConfig):
    # PyAthena connection options
    aws_access_key_id: t.Optional[str] = None
    aws_secret_access_key: t.Optional[str] = None
    role_arn: t.Optional[str] = None
    role_session_name: t.Optional[str] = None
    region_name: t.Optional[str] = None
    work_group: t.Optional[str] = None
    s3_staging_dir: t.Optional[str] = None
    schema_name: t.Optional[str] = None
    catalog_name: t.Optional[str] = None

    # SQLMesh options
    s3_warehouse_location: t.Optional[str] = None
    concurrent_tasks: int = 4
    register_comments: bool = False  # because Athena doesn't support comments in most cases
    pre_ping: Literal[False] = False

    type_: Literal["athena"] = Field(alias="type", default="athena")

    @model_validator(mode="after")
    @model_validator_v1_args
    def _root_validator(cls, values: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
        work_group = values.get("work_group")
        s3_staging_dir = values.get("s3_staging_dir")

        if not work_group and not s3_staging_dir:
            raise ConfigError("At least one of work_group or s3_staging_dir must be set")

        return values

    @property
    def _connection_kwargs_keys(self) -> t.Set[str]:
        return {
            "aws_access_key_id",
            "aws_secret_access_key",
            "role_arn",
            "role_session_name",
            "region_name",
            "work_group",
            "s3_staging_dir",
            "schema_name",
            "catalog_name",
        }

    @property
    def _engine_adapter(self) -> t.Type[EngineAdapter]:
        return engine_adapter.AthenaEngineAdapter

    @property
    def _extra_engine_config(self) -> t.Dict[str, t.Any]:
        return {"s3_warehouse_location": self.s3_warehouse_location}

    @property
    def _connection_factory(self) -> t.Callable:
        from pyathena import connect  # type: ignore

        return connect


CONNECTION_CONFIG_TO_TYPE = {
    # Map all subclasses of ConnectionConfig to the value of their `type_` field.
    tpe.all_field_infos()["type_"].default: tpe
2 changes: 2 additions & 0 deletions sqlmesh/core/engine_adapter/__init__.py
@@ -17,6 +17,7 @@
from sqlmesh.core.engine_adapter.snowflake import SnowflakeEngineAdapter
from sqlmesh.core.engine_adapter.spark import SparkEngineAdapter
from sqlmesh.core.engine_adapter.trino import TrinoEngineAdapter
from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter

DIALECT_TO_ENGINE_ADAPTER = {
    "hive": SparkEngineAdapter,
@@ -31,6 +32,7 @@
    "mysql": MySQLEngineAdapter,
    "mssql": MSSQLEngineAdapter,
    "trino": TrinoEngineAdapter,
    "athena": AthenaEngineAdapter,
}

DIALECT_ALIASES = {
