diff --git a/.github/workflows/ci_workflow.yml b/.github/workflows/ci_workflow.yml index 1c44e46..e02f7b9 100644 --- a/.github/workflows/ci_workflow.yml +++ b/.github/workflows/ci_workflow.yml @@ -36,7 +36,7 @@ jobs: pipx install poetry - name: Install dependencies run: | - poetry install + poetry install --all-extras - name: Run pytest run: | poetry run pytest --capture=no diff --git a/README.md b/README.md index a1b48c3..6fdffa7 100644 --- a/README.md +++ b/README.md @@ -102,7 +102,7 @@ tap-carbon-intensity | target-postgres --config /path/to/target-postgres-config. ```bash pipx install poetry -poetry install +poetry install --all-extras pipx install pre-commit pre-commit install ``` @@ -152,6 +152,8 @@ develop your own Singer taps and targets. ## Data Types +### Mapping + The below table shows how this tap will map between jsonschema datatypes and Postgres datatypes. | jsonschema | Postgres | @@ -202,7 +204,20 @@ The below table shows how this tap will map between jsonschema datatypes and Pos Note that while object types are mapped directly to jsonb, array types are mapped to a jsonb array. -If a column has multiple jsonschema types, the following order is using to order Postgres types, from highest priority to lowest priority. +When using [pgvector], the following type mapping applies in addition to the table above. + +| jsonschema | Postgres | +|------------------------------------------------|----------| +| array (with additional SCHEMA annotations [1]) | vector | + +[1] `"additionalProperties": {"storage": {"type": "vector", "dim": 4}}` + +### Resolution Order + +If a column has multiple jsonschema types, the best type candidate is +resolved using the following priority list, from highest to lowest +priority. + - ARRAY(JSONB) - JSONB - TEXT @@ -215,3 +230,9 @@ If a column has multiple jsonschema types, the following order is using to order - INTEGER - BOOLEAN - NOTYPE + +When using [pgvector], the `pgvector.sqlalchemy.Vector` type is added to the bottom +of the list. + + +[pgvector]: https://github.com/pgvector/pgvector diff --git a/docker-compose.yml b/docker-compose.yml index f2d453c..a187f7c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ version: "2.1" services: postgres: - image: docker.io/postgres:latest + image: ankane/pgvector:latest command: postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key -c ssl_ca_file=/var/lib/postgresql/ca.crt -c hba_file=/var/lib/postgresql/pg_hba.conf environment: POSTGRES_USER: postgres @@ -13,6 +13,7 @@ services: POSTGRES_INITDB_ARGS: --auth-host=cert # Not placed in the data directory (/var/lib/postgresql/data) because of https://gist.github.com/mrw34/c97bb03ea1054afb551886ffc8b63c3b?permalink_comment_id=2678568#gistcomment-2678568 volumes: + - ./target_postgres/tests/init.sql:/docker-entrypoint-initdb.d/init.sql - ./ssl/server.crt:/var/lib/postgresql/server.crt # Certificate verifying the server's identity to the client. - ./ssl/server.key:/var/lib/postgresql/server.key # Private key to verify the server's certificate is legitimate. - ./ssl/ca.crt:/var/lib/postgresql/ca.crt # Certificate authority to use when verifying the client's identity to the server. 
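The hunks above switch the Postgres containers to the `ankane/pgvector` image and mount `target_postgres/tests/init.sql` into `/docker-entrypoint-initdb.d/`, so the `vector` extension is created when the database volume is first initialized. A minimal sketch for confirming the extension is actually available, assuming the `postgres_no_ssl` service values from the compose file (user/password `postgres`, host port 5433, default `postgres` database):

```python
# Sanity check (not part of this change set): confirm the pgvector extension
# created by init.sql exists in the Dockerized database.
import sqlalchemy

# Connection URL is an assumption based on the postgres_no_ssl compose service.
engine = sqlalchemy.create_engine(
    "postgresql+psycopg2://postgres:postgres@localhost:5433/postgres"
)

with engine.connect() as connection:
    row = connection.execute(
        sqlalchemy.text(
            "SELECT extversion FROM pg_extension WHERE extname = 'vector'"
        )
    ).first()

if row is None:
    raise RuntimeError("pgvector is not installed; check the init.sql volume mount")
print(f"pgvector extension version: {row[0]}")

engine.dispose()
```

Note that scripts in `/docker-entrypoint-initdb.d/` only run against an empty data directory, so a container reusing an old volume will not pick up `init.sql`.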
@@ -20,9 +21,11 @@ services: ports: - "5432:5432" postgres_no_ssl: # Borrowed from https://github.com/MeltanoLabs/tap-postgres/blob/main/.github/workflows/test.yml#L13-L23 - image: docker.io/postgres:latest + image: ankane/pgvector:latest environment: POSTGRES_PASSWORD: postgres + volumes: + - ./target_postgres/tests/init.sql:/docker-entrypoint-initdb.d/init.sql ports: - 5433:5432 ssh: @@ -37,17 +40,20 @@ services: - PASSWORD_ACCESS=false - USER_NAME=melty volumes: + - ./target_postgres/tests/init.sql:/docker-entrypoint-initdb.d/init.sql - ./ssh_tunnel/ssh-server-config:/config/ssh_host_keys:ro ports: - "127.0.0.1:2223:2222" networks: - inner postgresdb: - image: postgres:13.0 + image: ankane/pgvector:latest environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres POSTGRES_DB: main + volumes: + - ./target_postgres/tests/init.sql:/docker-entrypoint-initdb.d/init.sql networks: inner: ipv4_address: 10.5.0.5 diff --git a/poetry.lock b/poetry.lock index 8e082fa..e61b03b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -774,6 +774,43 @@ files = [ {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, ] +[[package]] +name = "numpy" +version = "1.24.4" +description = "Fundamental package for array computing in Python" +optional = true +python-versions = ">=3.8" +files = [ + {file = "numpy-1.24.4-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c0bfb52d2169d58c1cdb8cc1f16989101639b34c7d3ce60ed70b19c63eba0b64"}, + {file = "numpy-1.24.4-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:ed094d4f0c177b1b8e7aa9cba7d6ceed51c0e569a5318ac0ca9a090680a6a1b1"}, + {file = "numpy-1.24.4-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:79fc682a374c4a8ed08b331bef9c5f582585d1048fa6d80bc6c35bc384eee9b4"}, + {file = "numpy-1.24.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7ffe43c74893dbf38c2b0a1f5428760a1a9c98285553c89e12d70a96a7f3a4d6"}, + {file = "numpy-1.24.4-cp310-cp310-win32.whl", hash = "sha256:4c21decb6ea94057331e111a5bed9a79d335658c27ce2adb580fb4d54f2ad9bc"}, + {file = "numpy-1.24.4-cp310-cp310-win_amd64.whl", hash = "sha256:b4bea75e47d9586d31e892a7401f76e909712a0fd510f58f5337bea9572c571e"}, + {file = "numpy-1.24.4-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:f136bab9c2cfd8da131132c2cf6cc27331dd6fae65f95f69dcd4ae3c3639c810"}, + {file = "numpy-1.24.4-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:e2926dac25b313635e4d6cf4dc4e51c8c0ebfed60b801c799ffc4c32bf3d1254"}, + {file = "numpy-1.24.4-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:222e40d0e2548690405b0b3c7b21d1169117391c2e82c378467ef9ab4c8f0da7"}, + {file = "numpy-1.24.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7215847ce88a85ce39baf9e89070cb860c98fdddacbaa6c0da3ffb31b3350bd5"}, + {file = "numpy-1.24.4-cp311-cp311-win32.whl", hash = "sha256:4979217d7de511a8d57f4b4b5b2b965f707768440c17cb70fbf254c4b225238d"}, + {file = "numpy-1.24.4-cp311-cp311-win_amd64.whl", hash = "sha256:b7b1fc9864d7d39e28f41d089bfd6353cb5f27ecd9905348c24187a768c79694"}, + {file = "numpy-1.24.4-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:1452241c290f3e2a312c137a9999cdbf63f78864d63c79039bda65ee86943f61"}, + {file = "numpy-1.24.4-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:04640dab83f7c6c85abf9cd729c5b65f1ebd0ccf9de90b270cd61935eef0197f"}, + {file = "numpy-1.24.4-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = 
"sha256:a5425b114831d1e77e4b5d812b69d11d962e104095a5b9c3b641a218abcc050e"}, + {file = "numpy-1.24.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dd80e219fd4c71fc3699fc1dadac5dcf4fd882bfc6f7ec53d30fa197b8ee22dc"}, + {file = "numpy-1.24.4-cp38-cp38-win32.whl", hash = "sha256:4602244f345453db537be5314d3983dbf5834a9701b7723ec28923e2889e0bb2"}, + {file = "numpy-1.24.4-cp38-cp38-win_amd64.whl", hash = "sha256:692f2e0f55794943c5bfff12b3f56f99af76f902fc47487bdfe97856de51a706"}, + {file = "numpy-1.24.4-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:2541312fbf09977f3b3ad449c4e5f4bb55d0dbf79226d7724211acc905049400"}, + {file = "numpy-1.24.4-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:9667575fb6d13c95f1b36aca12c5ee3356bf001b714fc354eb5465ce1609e62f"}, + {file = "numpy-1.24.4-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f3a86ed21e4f87050382c7bc96571755193c4c1392490744ac73d660e8f564a9"}, + {file = "numpy-1.24.4-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d11efb4dbecbdf22508d55e48d9c8384db795e1b7b51ea735289ff96613ff74d"}, + {file = "numpy-1.24.4-cp39-cp39-win32.whl", hash = "sha256:6620c0acd41dbcb368610bb2f4d83145674040025e5536954782467100aa8835"}, + {file = "numpy-1.24.4-cp39-cp39-win_amd64.whl", hash = "sha256:befe2bf740fd8373cf56149a5c23a0f601e82869598d41f8e188a0e9869926f8"}, + {file = "numpy-1.24.4-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:31f13e25b4e304632a4619d0e0777662c2ffea99fcae2029556b17d8ff958aef"}, + {file = "numpy-1.24.4-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:95f7ac6540e95bc440ad77f56e520da5bf877f87dca58bd095288dce8940532a"}, + {file = "numpy-1.24.4-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:e98f220aa76ca2a977fe435f5b04d7b3470c0a2e6312907b37ba6068f26787f2"}, + {file = "numpy-1.24.4.tar.gz", hash = "sha256:80f5e3a4e498641401868df4208b74581206afbee7cf7b8329daae82676d9463"}, +] + [[package]] name = "packaging" version = "23.2" @@ -787,13 +824,13 @@ files = [ [[package]] name = "paramiko" -version = "3.3.1" +version = "3.4.0" description = "SSH2 protocol library" optional = false python-versions = ">=3.6" files = [ - {file = "paramiko-3.3.1-py3-none-any.whl", hash = "sha256:b7bc5340a43de4287bbe22fe6de728aa2c22468b2a849615498dd944c2f275eb"}, - {file = "paramiko-3.3.1.tar.gz", hash = "sha256:6a3777a961ac86dbef375c5f5b8d50014a1a96d0fd7f054a43bc880134b0ff77"}, + {file = "paramiko-3.4.0-py3-none-any.whl", hash = "sha256:43f0b51115a896f9c00f59618023484cb3a14b98bbceab43394a39c6739b7ee7"}, + {file = "paramiko-3.4.0.tar.gz", hash = "sha256:aac08f26a31dc4dffd92821527d1682d99d52f9ef6851968114a8728f3c274d3"}, ] [package.dependencies] @@ -918,6 +955,19 @@ tzdata = ">=2020.1" [package.extras] test = ["time-machine (>=2.6.0)"] +[[package]] +name = "pgvector" +version = "0.2.4" +description = "pgvector support for Python" +optional = true +python-versions = ">=3.8" +files = [ + {file = "pgvector-0.2.4-py2.py3-none-any.whl", hash = "sha256:548e1f88d3c7433020c1c177feddad2f36915c262852d621f9018fcafff6870b"}, +] + +[package.dependencies] +numpy = "*" + [[package]] name = "pkgutil-resolve-name" version = "1.3.10" @@ -1003,7 +1053,6 @@ files = [ {file = "psycopg2_binary-2.9.9-cp311-cp311-win32.whl", hash = "sha256:dc4926288b2a3e9fd7b50dc6a1909a13bbdadfc67d93f3374d984e56f885579d"}, {file = "psycopg2_binary-2.9.9-cp311-cp311-win_amd64.whl", hash = "sha256:b76bedd166805480ab069612119ea636f5ab8f8771e640ae103e05a4aae3e417"}, {file = 
"psycopg2_binary-2.9.9-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:8532fd6e6e2dc57bcb3bc90b079c60de896d2128c5d9d6f24a63875a95a088cf"}, - {file = "psycopg2_binary-2.9.9-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b0605eaed3eb239e87df0d5e3c6489daae3f7388d455d0c0b4df899519c6a38d"}, {file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8f8544b092a29a6ddd72f3556a9fcf249ec412e10ad28be6a0c0d948924f2212"}, {file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2d423c8d8a3c82d08fe8af900ad5b613ce3632a1249fd6a223941d0735fce493"}, {file = "psycopg2_binary-2.9.9-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:2e5afae772c00980525f6d6ecf7cbca55676296b580c0e6abb407f15f3706996"}, @@ -1012,8 +1061,6 @@ files = [ {file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:cb16c65dcb648d0a43a2521f2f0a2300f40639f6f8c1ecbc662141e4e3e1ee07"}, {file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_ppc64le.whl", hash = "sha256:911dda9c487075abd54e644ccdf5e5c16773470a6a5d3826fda76699410066fb"}, {file = "psycopg2_binary-2.9.9-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:57fede879f08d23c85140a360c6a77709113efd1c993923c59fde17aa27599fe"}, - {file = "psycopg2_binary-2.9.9-cp312-cp312-win32.whl", hash = "sha256:64cf30263844fa208851ebb13b0732ce674d8ec6a0c86a4e160495d299ba3c93"}, - {file = "psycopg2_binary-2.9.9-cp312-cp312-win_amd64.whl", hash = "sha256:81ff62668af011f9a48787564ab7eded4e9fb17a4a6a74af5ffa6a457400d2ab"}, {file = "psycopg2_binary-2.9.9-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:2293b001e319ab0d869d660a704942c9e2cce19745262a8aba2115ef41a0a42a"}, {file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:03ef7df18daf2c4c07e2695e8cfd5ee7f748a1d54d802330985a78d2a5a6dca9"}, {file = "psycopg2_binary-2.9.9-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0a602ea5aff39bb9fac6308e9c9d82b9a35c2bf288e184a816002c9fae930b77"}, @@ -1873,7 +1920,10 @@ files = [ docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (<7.2.5)", "sphinx (>=3.5)", "sphinx-lint"] testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy (>=0.9.1)", "pytest-ruff"] +[extras] +pgvector = ["pgvector"] + [metadata] lock-version = "2.0" python-versions = "<3.13,>=3.8.1" -content-hash = "94fac1eb94f683deb254b8194a895caa8e120b5b102ddac384858cd1ea5251c4" +content-hash = "94def8eec452286849b1956daf2e1e66aa7377dcdc745adf19db230e1a71c6f7" diff --git a/pyproject.toml b/pyproject.toml index 96c7dfd..8b91b95 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ packages = [ python = "<3.13,>=3.8.1" requests = "^2.25.1" singer-sdk = ">=0.28,<0.34" +pgvector = { version="^0.2.4", optional = true } psycopg2-binary = "2.9.9" sqlalchemy = ">=2.0,<3.0" sshtunnel = "0.4.0" @@ -51,11 +52,17 @@ types-simplejson = "^3.19.0.2" types-sqlalchemy = "^1.4.53.38" types-jsonschema = "^4.19.0.3" +[tool.poetry.extras] +pgvector = ["pgvector"] + [tool.mypy] exclude = "tests" [[tool.mypy.overrides]] -module = ["sshtunnel"] +module = [ + "pgvector.sqlalchemy", + "sshtunnel", +] ignore_missing_imports = true [tool.isort] diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 
369eb46..3e38ef9 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -115,6 +115,14 @@ def prepare_table( # type: ignore[override] connection=connection, ) return table + # To make table reflection work properly with pgvector, + # the module needs to be imported beforehand. + try: + from pgvector.sqlalchemy import Vector # noqa: F401 + except ImportError: + self.logger.debug( + "Unable to handle pgvector's `Vector` type. Please install `pgvector`." + ) meta.reflect(connection, only=[table_name]) table = meta.tables[ full_table_name @@ -280,6 +288,51 @@ def pick_individual_type(jsonschema_type: dict): if "object" in jsonschema_type["type"]: return JSONB() if "array" in jsonschema_type["type"]: + # Select between different kinds of `ARRAY` data types. + # + # This currently leverages an unspecified definition for the Singer SCHEMA, + # using the `additionalProperties` attribute to convey additional type + # information, agnostic of the target database. + # + # In this case, it is about telling different kinds of `ARRAY` types apart: + # Either it is a vanilla `ARRAY`, to be stored into a `jsonb[]` type, or, + # alternatively, it can be a "vector" kind `ARRAY` of floating point + # numbers, effectively what pgvector is storing in its `VECTOR` type. + # + # Still, `type: "vector"` is only a surrogate label here, because other + # database systems may use different types for implementing the same thing, + # and need to translate accordingly. + """ + Schema override rule in `meltano.yml`: + + type: "array" + items: + type: "number" + additionalProperties: + storage: + type: "vector" + dim: 4 + + Produced schema annotation in `catalog.json`: + + {"type": "array", + "items": {"type": "number"}, + "additionalProperties": {"storage": {"type": "vector", "dim": 4}}} + """ + if ( + "additionalProperties" in jsonschema_type + and "storage" in jsonschema_type["additionalProperties"] + ): + storage_properties = jsonschema_type["additionalProperties"]["storage"] + if ( + "type" in storage_properties + and storage_properties["type"] == "vector" + ): + # On PostgreSQL/pgvector, use the corresponding type definition + # from its SQLAlchemy dialect. + from pgvector.sqlalchemy import Vector + + return Vector(storage_properties["dim"]) return ARRAY(JSONB()) if jsonschema_type.get("format") == "date-time": return TIMESTAMP() @@ -313,6 +366,13 @@ def pick_best_sql_type(sql_type_array: list): NOTYPE, ] + try: + from pgvector.sqlalchemy import Vector + + precedence_order.append(Vector) + except ImportError: + pass + for sql_type in precedence_order: for obj in sql_type_array: if isinstance(obj, sql_type): @@ -332,7 +392,7 @@ def create_empty_table( # type: ignore[override] """Create an empty target table. Args: - full_table_name: the target table name. + table_name: the target table name. schema: the JSON schema for the new table. primary_keys: list of key properties. partition_keys: list of partition keys. @@ -427,7 +487,7 @@ def _create_empty_column( # type: ignore[override] """Create a new column. Args: - full_table_name: The target table name. + table_name: The target table name. column_name: The name of the new column. sql_type: SQLAlchemy type engine to be used in creating the new column. @@ -491,7 +551,7 @@ def _adapt_column_type( # type: ignore[override] """Adapt table column type to support the new JSON schema type. Args: - full_table_name: The target table name. + table_name: The target table name. column_name: The target column name. sql_type: The new SQLAlchemy type. 
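The new `pick_individual_type` branch above tells a vanilla `ARRAY` (stored as `jsonb[]`) apart from a vector-annotated one via the `additionalProperties.storage` annotation, and `pick_best_sql_type` appends `Vector` to the end of the precedence list, so it is only selected when no higher-priority candidate matches. A standalone sketch that mirrors the selection rule (illustration only, not code from this change set):

```python
# Mirrors the array-type selection added to pick_individual_type above:
# a vector-annotated array becomes Vector(dim), anything else stays jsonb[].
from pgvector.sqlalchemy import Vector
from sqlalchemy.dialects.postgresql import ARRAY, JSONB


def resolve_array_type(jsonschema_type: dict):
    storage = jsonschema_type.get("additionalProperties", {}).get("storage", {})
    if storage.get("type") == "vector":
        return Vector(storage["dim"])
    return ARRAY(JSONB())


plain = {"type": "array", "items": {"type": "number"}}
annotated = {
    "type": "array",
    "items": {"type": "number"},
    "additionalProperties": {"storage": {"type": "vector", "dim": 4}},
}

assert isinstance(resolve_array_type(plain), ARRAY)
assert resolve_array_type(annotated).dim == 4
```

Because the annotation rides along inside the regular JSON Schema, taps that know nothing about pgvector keep emitting plain arrays and continue to land in `jsonb[]` columns unchanged.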
@@ -519,7 +579,7 @@ def _adapt_column_type( # type: ignore[override] return # Not the same type, generic type or compatible types - # calling merge_sql_types for assistnace + # calling merge_sql_types for assistance. compatible_sql_type = self.merge_sql_types([current_type, sql_type]) if str(compatible_sql_type) == str(current_type): @@ -722,7 +782,7 @@ def _get_column_type( # type: ignore[override] """Get the SQL type of the declared column. Args: - full_table_name: The name of the table. + table_name: The name of the table. column_name: The name of the column. Returns: diff --git a/target_postgres/tests/data_files/array_float_vector.singer b/target_postgres/tests/data_files/array_float_vector.singer new file mode 100644 index 0000000..9f4cd04 --- /dev/null +++ b/target_postgres/tests/data_files/array_float_vector.singer @@ -0,0 +1,5 @@ +{"type": "SCHEMA", "stream": "array_float_vector", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "number"}, "additionalProperties": {"storage": {"type": "vector", "dim": 4}}}}}} +{"type": "RECORD", "stream": "array_float_vector", "record": {"id": 1, "value": [ 1.1, 2.1, 1.1, 1.3 ]}} +{"type": "RECORD", "stream": "array_float_vector", "record": {"id": 2, "value": [ 1.0, 1.0, 1.0, 2.3 ]}} +{"type": "RECORD", "stream": "array_float_vector", "record": {"id": 3, "value": [ 2.0, 1.2, 1.0, 0.9 ]}} +{"type": "STATE", "value": {"array_float_vector": 3}} diff --git a/target_postgres/tests/init.sql b/target_postgres/tests/init.sql new file mode 100644 index 0000000..0aa0fc2 --- /dev/null +++ b/target_postgres/tests/init.sql @@ -0,0 +1 @@ +CREATE EXTENSION IF NOT EXISTS vector; diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index f676f8f..82a71b6 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -2,6 +2,7 @@ # flake8: noqa import copy import io +import typing as t from contextlib import redirect_stdout from decimal import Decimal from pathlib import Path @@ -28,6 +29,8 @@ postgres_config_ssh_tunnel, ) +METADATA_COLUMN_PREFIX = "_sdc" + # The below syntax is documented at https://docs.pytest.org/en/stable/deprecations.html#calling-fixtures-directly @pytest.fixture(scope="session", name="postgres_config") @@ -75,102 +78,114 @@ def singer_file_to_target(file_name, target) -> None: # TODO should set schemas for each tap individually so we don't collide -def remove_metadata_columns(row: dict) -> dict: - new_row = {} - for column in row.keys(): - if not column.startswith("_sdc"): - new_row[column] = row[column] - return new_row - - -def verify_data( - target: TargetPostgres, - table_name: str, - number_of_rows: int = 1, - primary_key: str | None = None, - check_data: dict | list[dict] | None = None, -): - """Checks whether the data in a table matches a provided data sample. - - Args: - target: The target to obtain a database connection from. - table_name: The schema and table name of the table to check data for. - primary_key: The primary key of the table. - number_of_rows: The expected number of rows that should be in the table. - check_data: A dictionary representing the full contents of the first row in the - table, as determined by lowest primary_key value, or else a list of - dictionaries representing every row in the table. 
- """ - engine = create_engine(target) - full_table_name = f"{target.config['default_target_schema']}.{table_name}" - with engine.connect() as connection: - if primary_key is not None and check_data is not None: - if isinstance(check_data, dict): - result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" +class AssertionHelper: + def __init__(self, target: TargetPostgres, metadata_column_prefix: str): + self.target = target + self.metadata_column_prefix = metadata_column_prefix + + def remove_metadata_columns(self, row: dict) -> dict: + new_row = {} + for column in row.keys(): + if not column.startswith(self.metadata_column_prefix): + new_row[column] = row[column] + return new_row + + def verify_data( + self, + table_name: str, + number_of_rows: int = 1, + primary_key: t.Union[str, None] = None, + check_data: t.Union[t.Dict, t.List[t.Dict], None] = None, + ): + """Checks whether the data in a table matches a provided data sample. + + Args: + target: The target to obtain a database connection from. + table_name: The schema and table name of the table to check data for. + primary_key: The primary key of the table. + number_of_rows: The expected number of rows that should be in the table. + check_data: A dictionary representing the full contents of the first row in the + table, as determined by lowest primary_key value, or else a list of + dictionaries representing every row in the table. + """ + engine = create_engine(self.target) + full_table_name = f"{self.target.config['default_target_schema']}.{table_name}" + with engine.connect() as connection: + if primary_key is not None and check_data is not None: + if isinstance(check_data, dict): + result = connection.execute( + sqlalchemy.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) ) - ) - assert result.rowcount == number_of_rows - result_dict = remove_metadata_columns(result.first()._asdict()) - assert result_dict == check_data - elif isinstance(check_data, list): - result = connection.execute( - sqlalchemy.text( - f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + assert result.rowcount == number_of_rows + result_dict = self.remove_metadata_columns(result.first()._asdict()) + assert result_dict == check_data + elif isinstance(check_data, list): + result = connection.execute( + sqlalchemy.text( + f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" + ) ) - ) - assert result.rowcount == number_of_rows - result_dict = [ - remove_metadata_columns(row._asdict()) for row in result.all() - ] - assert result_dict == check_data + assert result.rowcount == number_of_rows + result_dict = [ + self.remove_metadata_columns(row._asdict()) + for row in result.all() + ] + assert result_dict == check_data + else: + raise ValueError("Invalid check_data - not dict or list of dicts") else: - raise ValueError("Invalid check_data - not dict or list of dicts") - else: - result = connection.execute( - sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") + result = connection.execute( + sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") + ) + assert result.first()[0] == number_of_rows + engine.dispose() + + def verify_schema( + self, + table_name: str, + check_columns: dict = None, + ): + """Checks whether the schema of a database table matches the provided column definitions. + + Args: + target: The target to obtain a database connection from. + table_name: The schema and table name of the table to check data for. 
+ check_columns: A dictionary mapping column names to their definitions. Currently, + it is all about the `type` attribute which is compared. + metadata_column_prefix: The prefix string for metadata columns. Usually `_sdc`. + """ + engine = create_engine(self.target) + schema = self.target.config["default_target_schema"] + with engine.connect() as connection: + meta = sqlalchemy.MetaData() + table = sqlalchemy.Table( + table_name, meta, schema=schema, autoload_with=connection ) - assert result.first()[0] == number_of_rows - engine.dispose() - + for column in table.c: + # Ignore `_sdc` metadata columns when verifying table schema. + if column.name.startswith(self.metadata_column_prefix): + continue + try: + column_type_expected = check_columns[column.name]["type"] + except KeyError: + raise ValueError( + f"Invalid check_columns - missing definition for column: {column.name}" + ) + if not isinstance(column.type, column_type_expected): + raise TypeError( + f"Column '{column.name}' (with type '{column.type}') " + f"does not match expected type: {column_type_expected}" + ) + engine.dispose() -def verify_schema( - target: TargetPostgres, - table_name: str, - check_columns: dict = None, -): - """Checks whether the schema of a database table matches the provided column definitions. - Args: - target: The target to obtain a database connection from. - table_name: The schema and table name of the table to check data for. - check_columns: A dictionary mapping column names to their definitions. Currently, - it is all about the `type` attribute which is compared. - """ - engine = create_engine(target) - schema = target.config["default_target_schema"] - with engine.connect() as connection: - meta = sqlalchemy.MetaData() - table = sqlalchemy.Table( - table_name, meta, schema=schema, autoload_with=connection - ) - for column in table.c: - # Ignore `_sdc` columns for now. 
- if column.name.startswith("_sdc"): - continue - try: - column_type_expected = check_columns[column.name]["type"] - except KeyError: - raise ValueError( - f"Invalid check_columns - missing definition for column: {column.name}" - ) - if not isinstance(column.type, column_type_expected): - raise TypeError( - f"Column '{column.name}' (with type '{column.type}') " - f"does not match expected type: {column_type_expected}" - ) - engine.dispose() +@pytest.fixture +def helper(postgres_target) -> AssertionHelper: + return AssertionHelper( + target=postgres_target, metadata_column_prefix=METADATA_COLUMN_PREFIX + ) def test_sqlalchemy_url_config(postgres_config_no_ssl): @@ -287,11 +302,11 @@ def test_special_chars_in_attributes(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_optional_attributes(postgres_target): +def test_optional_attributes(postgres_target, helper): file_name = "optional_attributes.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "optional": "This is optional"} - verify_data(postgres_target, "test_optional_attributes", 4, "id", row) + helper.verify_data("test_optional_attributes", 4, "id", row) def test_schema_no_properties(postgres_target): @@ -311,7 +326,7 @@ def test_large_numeric_primary_key(postgres_target): # TODO test that data is correct -def test_schema_updates(postgres_target): +def test_schema_updates(postgres_target, helper): file_name = "schema_updates.singer" singer_file_to_target(file_name, postgres_target) row = { @@ -323,16 +338,16 @@ def test_schema_updates(postgres_target): "a5": None, "a6": None, } - verify_data(postgres_target, "test_schema_updates", 6, "id", row) + helper.verify_data("test_schema_updates", 6, "id", row) -def test_multiple_state_messages(postgres_target): +def test_multiple_state_messages(postgres_target, helper): file_name = "multiple_state_messages.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "metric": 100} - verify_data(postgres_target, "test_multiple_state_messages_a", 6, "id", row) + helper.verify_data("test_multiple_state_messages_a", 6, "id", row) row = {"id": 1, "metric": 110} - verify_data(postgres_target, "test_multiple_state_messages_b", 6, "id", row) + helper.verify_data("test_multiple_state_messages_b", 6, "id", row) # TODO test that data is correct @@ -349,7 +364,7 @@ def test_multiple_schema_messages(postgres_target, caplog): assert "Schema has changed for stream" not in caplog.text -def test_relational_data(postgres_target): +def test_relational_data(postgres_target, helper): file_name = "user_location_data.singer" singer_file_to_target(file_name, postgres_target) @@ -406,12 +421,12 @@ def test_relational_data(postgres_target): }, ] - verify_data(postgres_target, "test_users", 8, "id", users) - verify_data(postgres_target, "test_locations", 5, "id", locations) - verify_data(postgres_target, "test_user_in_location", 5, "id", user_in_location) + helper.verify_data("test_users", 8, "id", users) + helper.verify_data("test_locations", 5, "id", locations) + helper.verify_data("test_user_in_location", 5, "id", user_in_location) -def test_no_primary_keys(postgres_target): +def test_no_primary_keys(postgres_target, helper): """We run both of these tests twice just to ensure that no records are removed and append only works properly""" engine = create_engine(postgres_target) table_name = "test_no_pk" @@ -430,7 +445,7 @@ def test_no_primary_keys(postgres_target): file_name = f"{table_name}_append.singer" singer_file_to_target(file_name, postgres_target) - 
verify_data(postgres_target, table_name, 16) + helper.verify_data(table_name, 16) def test_no_type(postgres_target): @@ -438,20 +453,19 @@ def test_no_type(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_duplicate_records(postgres_target): +def test_duplicate_records(postgres_target, helper): file_name = "duplicate_records.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "metric": 100} - verify_data(postgres_target, "test_duplicate_records", 2, "id", row) + helper.verify_data("test_duplicate_records", 2, "id", row) -def test_array_boolean(postgres_target): +def test_array_boolean(postgres_target, helper): file_name = "array_boolean.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": [True, False]} - verify_data(postgres_target, "array_boolean", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_boolean", 3, "id", row) + helper.verify_schema( "array_boolean", check_columns={ "id": {"type": BIGINT}, @@ -460,13 +474,32 @@ def test_array_boolean(postgres_target): ) -def test_array_number(postgres_target): +def test_array_float_vector(postgres_target, helper): + pgvector_sa = pytest.importorskip("pgvector.sqlalchemy") + + file_name = "array_float_vector.singer" + singer_file_to_target(file_name, postgres_target) + row = { + "id": 1, + "value": "[1.1,2.1,1.1,1.3]", + } + helper.verify_data("array_float_vector", 3, "id", row) + + helper.verify_schema( + "array_float_vector", + check_columns={ + "id": {"type": BIGINT}, + "value": {"type": pgvector_sa.Vector}, + }, + ) + + +def test_array_number(postgres_target, helper): file_name = "array_number.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": [Decimal("42.42"), Decimal("84.84"), 23]} - verify_data(postgres_target, "array_number", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_number", 3, "id", row) + helper.verify_schema( "array_number", check_columns={ "id": {"type": BIGINT}, @@ -475,13 +508,12 @@ def test_array_number(postgres_target): ) -def test_array_string(postgres_target): +def test_array_string(postgres_target, helper): file_name = "array_string.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": ["apple", "orange", "pear"]} - verify_data(postgres_target, "array_string", 4, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_string", 4, "id", row) + helper.verify_schema( "array_string", check_columns={ "id": {"type": BIGINT}, @@ -490,13 +522,12 @@ def test_array_string(postgres_target): ) -def test_array_timestamp(postgres_target): +def test_array_timestamp(postgres_target, helper): file_name = "array_timestamp.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]} - verify_data(postgres_target, "array_timestamp", 3, "id", row) - verify_schema( - postgres_target, + helper.verify_data("array_timestamp", 3, "id", row) + helper.verify_schema( "array_timestamp", check_columns={ "id": {"type": BIGINT}, @@ -505,7 +536,7 @@ def test_array_timestamp(postgres_target): ) -def test_object_mixed(postgres_target): +def test_object_mixed(postgres_target, helper): file_name = "object_mixed.singer" singer_file_to_target(file_name, postgres_target) row = { @@ -522,9 +553,8 @@ def test_object_mixed(postgres_target): "nested_object": {"foo": "bar"}, }, } - verify_data(postgres_target, "object_mixed", 1, "id", row) - verify_schema( - postgres_target, + 
helper.verify_data("object_mixed", 1, "id", row) + helper.verify_schema( "object_mixed", check_columns={ "id": {"type": BIGINT}, @@ -533,7 +563,7 @@ def test_object_mixed(postgres_target): ) -def test_encoded_string_data(postgres_target): +def test_encoded_string_data(postgres_target, helper): """ We removed NUL characters from the original encoded_strings.singer as postgres doesn't allow them. https://www.postgresql.org/docs/current/functions-string.html#:~:text=chr(0)%20is%20disallowed%20because%20text%20data%20types%20cannot%20store%20that%20character. @@ -546,11 +576,11 @@ def test_encoded_string_data(postgres_target): file_name = "encoded_strings.singer" singer_file_to_target(file_name, postgres_target) row = {"id": 1, "info": "simple string 2837"} - verify_data(postgres_target, "test_strings", 11, "id", row) + helper.verify_data("test_strings", 11, "id", row) row = {"id": 1, "info": {"name": "simple", "value": "simple string 2837"}} - verify_data(postgres_target, "test_strings_in_objects", 11, "id", row) + helper.verify_data("test_strings_in_objects", 11, "id", row) row = {"id": 1, "strings": ["simple string", "απλή συμβολοσειρά", "简单的字串"]} - verify_data(postgres_target, "test_strings_in_arrays", 6, "id", row) + helper.verify_data("test_strings_in_arrays", 6, "id", row) def test_tap_appl(postgres_target): @@ -574,14 +604,13 @@ def test_large_int(postgres_target): singer_file_to_target(file_name, postgres_target) -def test_anyof(postgres_target): +def test_anyof(postgres_target, helper): """Test that anyOf is handled correctly""" table_name = "commits" file_name = f"{table_name}.singer" singer_file_to_target(file_name, postgres_target) - verify_schema( - postgres_target, + helper.verify_schema( table_name, check_columns={ # {"type":"string"} @@ -690,7 +719,7 @@ def test_activate_version_soft_delete(postgres_target): result = connection.execute( sqlalchemy.text( - f"SELECT * FROM {full_table_name} where _sdc_deleted_at is NOT NULL" + f"SELECT * FROM {full_table_name} where {METADATA_COLUMN_PREFIX}_deleted_at is NOT NULL" ) ) assert result.rowcount == 2 diff --git a/tox.ini b/tox.ini index 0c287e8..85c03b5 100644 --- a/tox.ini +++ b/tox.ini @@ -9,7 +9,7 @@ isolated_build = true allowlist_externals = poetry commands = - poetry install -v + poetry install --all-extras -v poetry run pytest poetry run black --check target_postgres/ poetry run flake8 target_postgres @@ -21,14 +21,14 @@ commands = # To execute, run `tox -e pytest` envlist = py37, py38, py39 commands = - poetry install -v + poetry install --all-extras -v poetry run pytest [testenv:format] # Attempt to auto-resolve lint errors before they are raised. # To execute, run `tox -e format` commands = - poetry install -v + poetry install --all-extras -v poetry run black target_postgres/ poetry run isort target_postgres @@ -36,7 +36,7 @@ commands = # Raise an error if lint and style standards are not met. # To execute, run `tox -e lint` commands = - poetry install -v + poetry install --all-extras -v poetry run black --check --diff target_postgres/ poetry run isort --check target_postgres poetry run flake8 target_postgres