-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
test: Add test cases for arrays and objects, and introduce verify_schema
#250
base: main
Are you sure you want to change the base?
Changes from all commits
fe92e45
c0d4a89
8b3ea4f
a9d1796
723e1fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ smoke-test | |
.meltano/** | ||
.tox/** | ||
.secrets/** | ||
.idea | ||
.vscode/** | ||
output/** | ||
.env | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{"type": "SCHEMA", "stream": "array_boolean", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "boolean"}}}}} | ||
{"type": "RECORD", "stream": "array_boolean", "record": {"id": 1, "value": [ true, false ]}} | ||
{"type": "RECORD", "stream": "array_boolean", "record": {"id": 2, "value": [ false ]}} | ||
{"type": "RECORD", "stream": "array_boolean", "record": {"id": 3, "value": [ false, true, true, false ]}} | ||
{"type": "STATE", "value": {"array_boolean": 3}} |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{"type": "SCHEMA", "stream": "array_number", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "number"}}}}} | ||
{"type": "RECORD", "stream": "array_number", "record": {"id": 1, "value": [ 42.42, 84.84, 23 ]}} | ||
{"type": "RECORD", "stream": "array_number", "record": {"id": 2, "value": [ 1.0 ]}} | ||
{"type": "RECORD", "stream": "array_number", "record": {"id": 3, "value": [ 1.11, 2.22, 3, 4, 5.55 ]}} | ||
{"type": "STATE", "value": {"array_number": 3}} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
{"type": "SCHEMA", "stream": "array_string", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array","items": {"type": "string"}}}}} | ||
{"type": "RECORD", "stream": "array_string", "record": {"id": 1, "value": [ "apple", "orange", "pear" ]}} | ||
{"type": "RECORD", "stream": "array_string", "record": {"id": 2, "value": [ "banana", "apple" ]}} | ||
{"type": "RECORD", "stream": "array_string", "record": {"id": 3, "value": [ "pear" ]}} | ||
{"type": "RECORD", "stream": "array_string", "record": {"id": 4, "value": [ "orange", "banana", "apple", "pear" ]}} | ||
{"type": "STATE", "value": {"array_string": 4}} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{"type": "SCHEMA", "stream": "array_timestamp", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "string", "format": "date-time"}}}}} | ||
{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 1, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02" ]}} | ||
{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 2, "value": [ "2023-12-13T01:15:02" ]}} | ||
{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 3, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02", "2023-12-13T01:17:02" ]}} | ||
{"type": "STATE", "value": {"array_timestamp": 3}} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
{"type": "SCHEMA", "stream": "object_mixed", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "object"}}}} | ||
{"type": "RECORD", "stream": "object_mixed", "record": {"id": 1, "value": {"string": "foo", "integer": 42, "float": 42.42, "timestamp": "2023-12-13T01:15:02", "array_boolean": [true, false], "array_float": [42.42, 84.84], "array_integer": [42, 84], "array_string": ["foo", "bar"], "nested_object": {"foo": "bar"}}}} | ||
{"type": "STATE", "value": {"object_mixed": 1}} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,8 +11,8 @@ | |
import sqlalchemy | ||
from singer_sdk.exceptions import MissingKeyPropertiesError | ||
from singer_sdk.testing import get_target_test_class, sync_end_to_end | ||
from sqlalchemy.dialects.postgresql import ARRAY | ||
from sqlalchemy.types import TEXT, TIMESTAMP | ||
from sqlalchemy.dialects.postgresql import ARRAY, JSONB | ||
from sqlalchemy.types import BIGINT, TEXT, TIMESTAMP | ||
|
||
from target_postgres.connector import PostgresConnector | ||
from target_postgres.target import TargetPostgres | ||
|
@@ -94,7 +94,7 @@ def verify_data( | |
|
||
Args: | ||
target: The target to obtain a database connection from. | ||
full_table_name: The schema and table name of the table to check data for. | ||
table_name: The schema and table name of the table to check data for. | ||
primary_key: The primary key of the table. | ||
number_of_rows: The expected number of rows that should be in the table. | ||
check_data: A dictionary representing the full contents of the first row in the | ||
|
@@ -132,6 +132,45 @@ def verify_data( | |
sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") | ||
) | ||
assert result.first()[0] == number_of_rows | ||
engine.dispose() | ||
|
||
|
||
def verify_schema( | ||
target: TargetPostgres, | ||
table_name: str, | ||
check_columns: dict = None, | ||
): | ||
"""Checks whether the schema of a database table matches the provided column definitions. | ||
|
||
Args: | ||
target: The target to obtain a database connection from. | ||
table_name: The schema and table name of the table to check data for. | ||
check_columns: A dictionary mapping column names to their definitions. Currently, | ||
it is all about the `type` attribute which is compared. | ||
""" | ||
engine = create_engine(target) | ||
schema = target.config["default_target_schema"] | ||
with engine.connect() as connection: | ||
meta = sqlalchemy.MetaData() | ||
table = sqlalchemy.Table( | ||
table_name, meta, schema=schema, autoload_with=connection | ||
) | ||
for column in table.c: | ||
# Ignore `_sdc` columns for now. | ||
if column.name.startswith("_sdc"): | ||
continue | ||
try: | ||
column_type_expected = check_columns[column.name]["type"] | ||
except KeyError: | ||
raise ValueError( | ||
f"Invalid check_columns - missing definition for column: {column.name}" | ||
) | ||
if not isinstance(column.type, column_type_expected): | ||
raise TypeError( | ||
f"Column '{column.name}' (with type '{column.type}') " | ||
f"does not match expected type: {column_type_expected}" | ||
) | ||
engine.dispose() | ||
|
||
|
||
def test_sqlalchemy_url_config(postgres_config_no_ssl): | ||
|
@@ -406,11 +445,92 @@ def test_duplicate_records(postgres_target): | |
verify_data(postgres_target, "test_duplicate_records", 2, "id", row) | ||
|
||
|
||
def test_array_data(postgres_target): | ||
file_name = "array_data.singer" | ||
def test_array_boolean(postgres_target): | ||
file_name = "array_boolean.singer" | ||
singer_file_to_target(file_name, postgres_target) | ||
row = {"id": 1, "value": [True, False]} | ||
verify_data(postgres_target, "array_boolean", 3, "id", row) | ||
verify_schema( | ||
postgres_target, | ||
"array_boolean", | ||
check_columns={ | ||
"id": {"type": BIGINT}, | ||
"value": {"type": ARRAY}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Arrays have inner data types such as ARRAY[BOOLEAN] (not sure if that's the right syntax). We should additionally check for that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see your point. Let me check how We are not looking at multiple dimensions here, or eventual nestedness, right? It would just be about »verifying the type of the first element within the array against its specification, when the array is not empty«, correct? |
||
}, | ||
) | ||
|
||
|
||
def test_array_number(postgres_target): | ||
file_name = "array_number.singer" | ||
singer_file_to_target(file_name, postgres_target) | ||
row = {"id": 1, "value": [Decimal("42.42"), Decimal("84.84"), 23]} | ||
verify_data(postgres_target, "array_number", 3, "id", row) | ||
verify_schema( | ||
postgres_target, | ||
"array_number", | ||
check_columns={ | ||
"id": {"type": BIGINT}, | ||
"value": {"type": ARRAY}, | ||
}, | ||
) | ||
|
||
|
||
def test_array_string(postgres_target): | ||
file_name = "array_string.singer" | ||
singer_file_to_target(file_name, postgres_target) | ||
row = {"id": 1, "value": ["apple", "orange", "pear"]} | ||
verify_data(postgres_target, "array_string", 4, "id", row) | ||
verify_schema( | ||
postgres_target, | ||
"array_string", | ||
check_columns={ | ||
"id": {"type": BIGINT}, | ||
"value": {"type": ARRAY}, | ||
}, | ||
) | ||
|
||
|
||
def test_array_timestamp(postgres_target): | ||
file_name = "array_timestamp.singer" | ||
singer_file_to_target(file_name, postgres_target) | ||
row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]} | ||
verify_data(postgres_target, "array_timestamp", 3, "id", row) | ||
verify_schema( | ||
postgres_target, | ||
"array_timestamp", | ||
check_columns={ | ||
"id": {"type": BIGINT}, | ||
"value": {"type": ARRAY}, | ||
}, | ||
) | ||
|
||
|
||
def test_object_mixed(postgres_target): | ||
file_name = "object_mixed.singer" | ||
singer_file_to_target(file_name, postgres_target) | ||
row = {"id": 1, "fruits": ["apple", "orange", "pear"]} | ||
verify_data(postgres_target, "test_carts", 4, "id", row) | ||
row = { | ||
"id": 1, | ||
"value": { | ||
"string": "foo", | ||
"integer": 42, | ||
"float": Decimal("42.42"), | ||
"timestamp": "2023-12-13T01:15:02", | ||
"array_boolean": [True, False], | ||
"array_float": [Decimal("42.42"), Decimal("84.84")], | ||
"array_integer": [42, 84], | ||
"array_string": ["foo", "bar"], | ||
"nested_object": {"foo": "bar"}, | ||
}, | ||
} | ||
verify_data(postgres_target, "object_mixed", 1, "id", row) | ||
verify_schema( | ||
postgres_target, | ||
"object_mixed", | ||
check_columns={ | ||
"id": {"type": BIGINT}, | ||
"value": {"type": JSONB}, | ||
}, | ||
) | ||
|
||
|
||
def test_encoded_string_data(postgres_target): | ||
|
@@ -456,41 +576,32 @@ def test_large_int(postgres_target): | |
|
||
def test_anyof(postgres_target): | ||
"""Test that anyOf is handled correctly""" | ||
engine = create_engine(postgres_target) | ||
table_name = "commits" | ||
file_name = f"{table_name}.singer" | ||
schema = postgres_target.config["default_target_schema"] | ||
singer_file_to_target(file_name, postgres_target) | ||
with engine.connect() as connection: | ||
meta = sqlalchemy.MetaData() | ||
table = sqlalchemy.Table( | ||
"commits", meta, schema=schema, autoload_with=connection | ||
) | ||
for column in table.c: | ||
# {"type":"string"} | ||
if column.name == "id": | ||
assert isinstance(column.type, TEXT) | ||
|
||
verify_schema( | ||
postgres_target, | ||
table_name, | ||
check_columns={ | ||
# {"type":"string"} | ||
"id": {"type": TEXT}, | ||
# Any of nullable date-time. | ||
# Note that postgres timestamp is equivalent to jsonschema date-time. | ||
# {"anyOf":[{"type":"string","format":"date-time"},{"type":"null"}]} | ||
if column.name in {"authored_date", "committed_date"}: | ||
assert isinstance(column.type, TIMESTAMP) | ||
|
||
"authored_date": {"type": TIMESTAMP}, | ||
"committed_date": {"type": TIMESTAMP}, | ||
# Any of nullable array of strings or single string. | ||
# {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]} | ||
if column.name == "parent_ids": | ||
assert isinstance(column.type, ARRAY) | ||
|
||
"parent_ids": {"type": ARRAY}, | ||
# Any of nullable string. | ||
# {"anyOf":[{"type":"string"},{"type":"null"}]} | ||
if column.name == "commit_message": | ||
assert isinstance(column.type, TEXT) | ||
|
||
"commit_message": {"type": TEXT}, | ||
# Any of nullable string or integer. | ||
# {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]} | ||
if column.name == "legacy_id": | ||
assert isinstance(column.type, TEXT) | ||
"legacy_id": {"type": TEXT}, | ||
}, | ||
) | ||
|
||
|
||
def test_new_array_column(postgres_target): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This spot relates to an issue we observed after adding more test cases. Other than any other related changes to the test suite, this one affects the connector implementation. Without that change, there seems to be a flaw which apparently qualifies as a resource leak, manifesting in connection pool overflow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would want to understand this fully before merging. What is the root cause of this problem? Are we confident that this is all that's required to close the leak?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Sebastian. Thanks for your review. I can only say that this spot has been found out by educated guessing and a bit more trial-and-error, so it was kind of empirically determined, including the solution.
Apologies that I can't come up with a better reasoning/rationale, but I can offer to submit another repro PR without all the other changes here, in order to better demonstrate the issue.