
Feat: Athena adapter #3154

Open · erindru wants to merge 2 commits into main from erin/athena-adapter
Conversation

erindru (Collaborator) commented Sep 19, 2024

Initial implementation of an Athena adapter. Addresses #1315

@georgesittas (Contributor) commented:

Released v25.22.0, should be able to upgrade and get the CI working now.

branches:
  only:
    - main
    #- snowflake
erindru (Collaborator, Author):

Note: I'll uncomment these immediately prior to merging. It's helpful to be able to run the Athena integration tests for this PR.

erindru force-pushed the erin/athena-adapter branch 5 times, most recently from 88e2e48 to 3110b64, on September 19, 2024 at 23:15.
# SQLMesh options
s3_warehouse_location: t.Optional[str] = None
concurrent_tasks: int = 4
register_comments: bool = False  # because Athena doesn't support comments in most cases
Member:

Should we even let users set true here then?

erindru (Collaborator, Author):

How do we force it to false and make it unsettable? Is that the register_comments: t.Literal[False] = False syntax?

Member:

Yes, exactly
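
For illustration, pinning the field with t.Literal[False] looks roughly like this in a pydantic-style config (the class name here is illustrative, not necessarily the actual SQLMesh config class):

```python
import typing as t

from pydantic import BaseModel, ValidationError


class AthenaConnectionConfig(BaseModel):
    # Literal[False] means the only accepted value is False, so users cannot enable it.
    register_comments: t.Literal[False] = False


AthenaConnectionConfig()  # fine, register_comments defaults to False
try:
    AthenaConnectionConfig(register_comments=True)
except ValidationError as e:
    print(e)  # rejected: True is not a permitted value
```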



def _ensure_valid_location(value: str) -> str:
    if not value.startswith("s3://"):
Member:

Shouldn't this be validated in the connection config instead?

erindru (Collaborator, Author):

It's also used in _table_location to ensure that the final location is valid, since that is assembled at runtime and relies on user input from physical_properties.

Member:

I see. I'd still add validation to the config to fail faster.
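
A minimal sketch of what that could look like, reusing the same check as a validator on the connection config so bad values fail at config-load time (pydantic v2 style; class and field names mirror the snippets above but are illustrative):

```python
import typing as t

from pydantic import BaseModel, field_validator


def _ensure_valid_location(value: str) -> str:
    if not value.startswith("s3://"):
        raise ValueError(f"Location '{value}' must be an s3:// URI")
    return value


class AthenaConnectionConfig(BaseModel):
    s3_warehouse_location: t.Optional[str] = None

    @field_validator("s3_warehouse_location")
    @classmethod
    def _validate_location(cls, v: t.Optional[str]) -> t.Optional[str]:
        # Fail fast when the config is parsed rather than later at runtime.
        return _ensure_valid_location(v) if v is not None else v
```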

exp.select(
    exp.case()
    .when(
        # 'awsdatacatalog' is the default catalog that is invisible for all intents and purposes
Member:

This doesn't quite explain why we set catalog to NULL if the actual value is awsdatacatalog. What does "invisible" actually mean here?

erindru (Collaborator, Author):

It's because the integration test test_get_data_objects expects that if it passes a schema like test_schema_x (as opposed to a catalog-schema combo like test_catalog.test_schema_x) to get_data_objects(), the resulting data objects should have None set on the catalog property.

I'll amend the comment
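
In other words, the query maps the implicit default catalog to NULL so schema-only lookups get catalog=None back. Roughly, as an illustrative sqlglot sketch (not the PR's exact expression):

```python
from sqlglot import exp

catalog_case = (
    exp.case()
    .when(exp.column("table_catalog", table="t").eq("awsdatacatalog"), exp.null())
    .else_(exp.column("table_catalog", table="t"))
)
print(catalog_case.sql())
# CASE WHEN t.table_catalog = 'awsdatacatalog' THEN NULL ELSE t.table_catalog END
```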

)
.from_(exp.to_table("information_schema.tables", alias="t"))
.where(
    exp.and_(
Member:

AND seems redundant here
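
For context, sqlglot's .where() already combines multiple conditions with AND, which is why the explicit exp.and_() wrapper adds nothing. A small standalone illustration (not the PR's actual query):

```python
from sqlglot import exp

query = (
    exp.select("t.table_name")
    .from_("information_schema.tables AS t")
    .where(
        exp.column("table_schema", table="t").eq("my_schema"),
        exp.column("table_type", table="t").eq("BASE TABLE"),
    )
)
print(query.sql())
# SELECT t.table_name FROM information_schema.tables AS t
# WHERE t.table_schema = 'my_schema' AND t.table_type = 'BASE TABLE'
```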


elif self.s3_warehouse_location:
    # If the user has set `s3_warehouse_location` in the connection config, the base URI is <s3_warehouse_location>/<catalog>/<schema>/
    catalog_name = table.catalog if hasattr(table, "catalog") else None
Member:

Looking at the signatures, we always expect table to be of type exp.Table. How could the catalog and db attributes be missing, given that they are property methods on that class?

erindru (Collaborator, Author):

Good catch, this didn't get updated when I tightened up the signature.

# Assume the user has set a default location for this schema in the metastore
return None

table_name = table.name if hasattr(table, "name") else None
Member:

ditto
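
To spell out why the hasattr() checks are redundant: catalog, db and name are properties on sqlglot's exp.Table, so they always exist and return an empty string when that part of the name is absent:

```python
from sqlglot import exp

table = exp.to_table("my_schema.my_table")
print(repr(table.catalog))  # '' - no catalog part in the name
print(repr(table.db))       # 'my_schema'
print(repr(table.name))     # 'my_table'
```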


is_hive = self._table_type(table_properties) == "hive"

# Filter any PARTITIONED BY properties from the main column list since they can't be specified in both places
Member:

good old hive
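
The filtering the comment describes amounts to dropping any PARTITIONED BY columns from the main column list before generating the Hive DDL. A hypothetical sketch (variable names are illustrative, not the PR's code):

```python
from sqlglot import exp

# Hypothetical inputs: the full column list and the columns declared in PARTITIONED BY.
columns_to_types = {"id": exp.DataType.build("int"), "ds": exp.DataType.build("varchar")}
partitioned_by = [exp.column("ds")]

partitioned_by_names = {col.name for col in partitioned_by}
main_columns = {k: v for k, v in columns_to_types.items() if k not in partitioned_by_names}
print(list(main_columns))  # ['id'] - 'ds' appears only in PARTITIONED BY
```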

Use the user-specified table_properties to figure out if this is a Hive or an Iceberg table
"""
# if table_type is not defined or is not set to "iceberg", this is a Hive table
if table_properties and (table_type := table_properties.get("table_type", None)):
Member:

Why are we not using storage_format for this instead?

izeigerman (Member) commented Sep 20, 2024:

Basically any value that is not iceberg should be treated as hive.

erindru (Collaborator, Author) commented Sep 20, 2024:

I thought about storage_format but decided not to use it because it describes a different concept. Both Hive and Iceberg tables support different storage formats.

For example, a Hive table can be STORED AS PARQUET or STORED AS ORC, or if you really don't like your colleagues, STORED AS TEXTFILE.

Same for Iceberg, the internal format can be set to parquet or orc or whatever the engine supports.

So storage_format=hive / storage_format=iceberg doesn't make sense because they're table formats that can encompass a particular storage format.

We don't have a top-level table_format property and I didn't want to add one just for Athena.

Member:

FWIW, when used with Spark, iceberg is provided through storage_format because the SparkSQL syntax looks like:

CREATE TABLE ... USING [iceberg|parquet|etc]

Should we be consistent?
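
Either way, the dispatch being discussed is: anything other than an explicit table_type of iceberg is treated as Hive. A minimal sketch of that logic (illustrative, not the PR's exact code):

```python
import typing as t


def _table_type(table_properties: t.Optional[t.Dict[str, t.Any]] = None) -> str:
    if table_properties and (table_type := table_properties.get("table_type", None)):
        if str(table_type).lower() == "iceberg":
            return "iceberg"
    return "hive"


print(_table_type())                           # 'hive'
print(_table_type({"table_type": "iceberg"}))  # 'iceberg'
print(_table_type({"table_type": "hive"}))     # 'hive'
```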

# To make a CTAS expression persist as iceberg, alongside setting `table_type=iceberg` (which the user has already
# supplied in physical_properties and is thus set above), you also need to set:
# - is_external=false
# - table_location='s3://<path>'
Member:

Should we ensure somehow that the location has already been set at this point?

erindru (Collaborator, Author):

Oh nice catch!

erindru (Collaborator, Author) commented Sep 20, 2024:

Actually, the location will be set already a few lines up if the user supplied it (or s3_warehouse_location was set in the config). The original idea was that if it wasn't set at all, Athena could figure out what to do.

But I've just done some tests and, unlike Trino, it looks like Athena will not automatically generate table locations for you even if the schema the table is in was created with a location set.

I created a schema using CREATE SCHEMA foo LOCATION 's3://path' and then tried to create both Hive and Iceberg tables in that schema without setting a location explicitly. Both times it failed with an error asking to set the location.

So I'll tighten this up and throw an error if SQLMesh can't figure out the table location.
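
A minimal sketch of that tightened behaviour, assuming a helper that raises when no location can be determined (the error type and message wording here are illustrative, not the PR's code):

```python
import typing as t

from sqlglot import exp


class SQLMeshError(Exception):
    """Stand-in for SQLMesh's error type, used here to keep the sketch self-contained."""


def _require_table_location(location: t.Optional[str], table: exp.Table) -> str:
    if not location:
        raise SQLMeshError(
            f"Cannot determine an S3 location for table '{table.sql()}'. "
            "Set s3_warehouse_location in the connection config or supply a location "
            "in the model's physical_properties."
        )
    return location
```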
