chore: Apply non-functional refactoring and fix typos #281

Status: Open — wants to merge 10 commits into base: main
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -97,6 +97,8 @@ select = [
"RET", # flake8-return
"SIM", # flake8-simplify
"TCH", # flake8-type-checking
"ERA", # eradicate
"PGH", # pygrep-hooks
"PL", # Pylint
"PERF", # Perflint
"RUF", # ruff
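
The two new rule families are worth a note: ERA (eradicate) flags commented-out code, and PGH (pygrep-hooks) flags, among other things, blanket type-ignore comments. A minimal sketch of code that would trip both, illustrative only and assuming ruff's documented codes ERA001 and PGH003:

def total(items: list[int]) -> int:
    # ERA001 would flag commented-out code such as the next line:
    # result = sum(i * 2 for i in items)
    result = sum(items)  # type: ignore  # PGH003: prefer a specific error code
    return result
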
9 changes: 6 additions & 3 deletions target_postgres/connector.py
@@ -183,10 +183,12 @@ def copy_table_structure(
raise RuntimeError("Table already exists")

columns = [column._copy() for column in from_table.columns]

if as_temp_table:
new_table = sa.Table(table_name, meta, *columns, prefixes=["TEMPORARY"])
new_table.create(bind=connection)
return new_table

new_table = sa.Table(table_name, meta, *columns)
new_table.create(bind=connection)
return new_table
@@ -200,7 +202,7 @@ def drop_table(self, table: sa.Table, connection: sa.engine.Connection):
"""Drop table data."""
table.drop(bind=connection)

def clone_table(
def clone_table( # noqa: PLR0913
self, new_table_name, table, metadata, connection, temp_table
) -> sa.Table:
"""Clone a table."""
@@ -321,6 +323,7 @@ def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911
):
return HexByteString()
individual_type = th.to_sql_type(jsonschema_type)

return TEXT() if isinstance(individual_type, VARCHAR) else individual_type

@staticmethod
@@ -412,7 +415,7 @@ def create_empty_table( # type: ignore[override] # noqa: PLR0913
new_table.create(bind=connection)
return new_table

def prepare_column(
def prepare_column( # noqa: PLR0913
self,
full_table_name: str | FullyQualifiedName,
column_name: str,
@@ -460,7 +463,7 @@ def prepare_column(
column_object=column_object,
)

def _create_empty_column( # type: ignore[override]
def _create_empty_column( # type: ignore[override] # noqa: PLR0913
self,
schema_name: str,
table_name: str,
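
The copy_table_structure hunk is a pure control-flow change: the former if/else collapses into an early return for the temporary-table branch. A self-contained sketch of the same pattern, with a simplified signature and in-memory SQLite standing in for Postgres, so this is illustrative rather than the connector's actual entry point:

import sqlalchemy as sa

def copy_structure(
    table_name: str,
    from_table: sa.Table,
    meta: sa.MetaData,
    connection: sa.engine.Connection,
    as_temp_table: bool = False,
) -> sa.Table:
    columns = [column._copy() for column in from_table.columns]

    if as_temp_table:
        # Early return: once this branch exits, the permanent-table
        # path below needs no else block.
        new_table = sa.Table(table_name, meta, *columns, prefixes=["TEMPORARY"])
        new_table.create(bind=connection)
        return new_table

    new_table = sa.Table(table_name, meta, *columns)
    new_table.create(bind=connection)
    return new_table

engine = sa.create_engine("sqlite:///:memory:")
metadata = sa.MetaData()
source = sa.Table("source", metadata, sa.Column("id", sa.Integer, primary_key=True))
with engine.begin() as conn:
    source.create(bind=conn)
    copy_structure("source_copy", source, metadata, conn)
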
8 changes: 4 additions & 4 deletions target_postgres/sinks.py
@@ -112,14 +112,14 @@ def process_batch(self, context: dict) -> None:

def generate_temp_table_name(self):
"""Uuid temp table name."""
# sa.exc.IdentifierError: Identifier
# sa.exc.IdentifierError: Identifier # noqa: ERA001
# 'temp_test_optional_attributes_388470e9_fbd0_47b7_a52f_d32a2ee3f5f6'
# exceeds maximum length of 63 characters
# Is hit if we have a long table name, there is no limit on Temporary tables
# in postgres, used a guid just in case we are using the same session
return f"{str(uuid.uuid4()).replace('-', '_')}"

def bulk_insert_records( # type: ignore[override]
def bulk_insert_records( # type: ignore[override] # noqa: PLR0913
self,
table: sa.Table,
schema: dict,
@@ -176,7 +176,7 @@ def bulk_insert_records( # type: ignore[override]
connection.execute(insert, data_to_insert)
return True

def upsert(
def upsert( # noqa: PLR0913
self,
from_table: sa.Table,
to_table: sa.Table,
@@ -293,7 +293,7 @@ def schema_name(self) -> str | None:
Returns:
The target schema name.
"""
# Look for a default_target_scheme in the configuration fle
# Look for a default_target_scheme in the configuration file
default_target_schema: str = self.config.get("default_target_schema", None)
parts = self.stream_name.split("-")

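
One side note on the temp-table helper above: SQLAlchemy raises the quoted sa.exc.IdentifierError when a derived name would exceed Postgres's 63-character identifier limit (NAMEDATALEN - 1), so the helper builds the name from a UUID alone rather than from the stream's table name. A sketch of the same idea — note the f-string in the current code is redundant, since str(uuid.uuid4()) is already a string:

import uuid

def generate_temp_table_name() -> str:
    # 32 hex digits plus 4 underscores = 36 characters,
    # safely below Postgres's 63-character identifier limit.
    return str(uuid.uuid4()).replace("-", "_")

assert len(generate_temp_table_name()) == 36
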
2 changes: 1 addition & 1 deletion target_postgres/target.py
@@ -178,7 +178,7 @@ def __init__(
th.BooleanType,
default=False,
description=(
"When activate version is sent from a tap this specefies "
"When activate version is sent from a tap this specifies "
+ "if we should delete the records that don't match, or mark "
+ "them with a date in the `_sdc_deleted_at` column. This config "
+ "option is ignored if `activate_version` is set to false."
87 changes: 38 additions & 49 deletions target_postgres/tests/test_target_postgres.py
@@ -78,11 +78,7 @@ def singer_file_to_target(file_name, target) -> None:


def remove_metadata_columns(row: dict) -> dict:
new_row = {}
for column in row.keys():
if not column.startswith("_sdc"):
new_row[column] = row[column]
return new_row
return {column: row[column] for column in row if not column.startswith("_sdc")}
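
A quick usage check of the rewritten helper, with a hypothetical row for illustration:

row = {"id": 1, "name": "a", "_sdc_extracted_at": "2024-01-01T00:00:00Z"}
assert remove_metadata_columns(row) == {"id": 1, "name": "a"}
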


def verify_data(
@@ -106,43 +102,43 @@ def verify_data(
engine = create_engine(target)
full_table_name = f"{target.config['default_target_schema']}.{table_name}"
with engine.connect() as connection:
if primary_key is not None and check_data is not None:
if isinstance(check_data, dict):
result = connection.execute(
sqlalchemy.text(
f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
)
)
assert result.rowcount == number_of_rows
result_dict = remove_metadata_columns(result.first()._asdict())
assert result_dict == check_data
elif isinstance(check_data, list):
result = connection.execute(
sqlalchemy.text(
f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
)
)
assert result.rowcount == number_of_rows
result_dict = [
remove_metadata_columns(row._asdict()) for row in result.all()
]

# bytea columns are returned as memoryview objects
# we need to convert them to bytes to allow comparison with check_data
for row in result_dict:
for col in row:
if isinstance(row[col], memoryview):
row[col] = bytes(row[col])

assert result_dict == check_data
else:
raise ValueError("Invalid check_data - not dict or list of dicts")
else:
if primary_key is None or check_data is None:
result = connection.execute(
sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}")
)
assert result.first()[0] == number_of_rows

elif isinstance(check_data, dict):
result = connection.execute(
sqlalchemy.text(
f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
)
)
assert result.rowcount == number_of_rows
result_dict = remove_metadata_columns(result.first()._asdict())
assert result_dict == check_data
elif isinstance(check_data, list):
result = connection.execute(
sqlalchemy.text(
f"SELECT * FROM {full_table_name} ORDER BY {primary_key}"
)
)
assert result.rowcount == number_of_rows
result_dict = [
remove_metadata_columns(row) for row in result.mappings().all()
]

# bytea columns are returned as memoryview objects
# we need to convert them to bytes to allow comparison with check_data
for row in result_dict:
for col in row:
if isinstance(row[col], memoryview):
row[col] = bytes(row[col])

assert result_dict == check_data
else:
raise ValueError("Invalid check_data - not dict or list of dicts")
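
Two details of the restructured verify_data: the count-only case is now a guard clause, so the dict and list branches no longer nest inside an outer if/else, and result.mappings().all() yields dict-like rows, which is why the list branch drops the per-row _asdict() calls. The memoryview-to-bytes conversion it performs is plain Python:

# bytea values arrive from the driver as memoryview objects;
# bytes() makes them comparable against the expected data.
raw = memoryview(b"\x00\x01\x02")
assert bytes(raw) == b"\x00\x01\x02"
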


def test_sqlalchemy_url_config(postgres_config_no_ssl):
"""Be sure that passing a sqlalchemy_url works
@@ -439,7 +435,7 @@ def test_encoded_string_data(postgres_target):
https://www.postgresql.org/docs/current/functions-string.html#:~:text=chr(0)%20is%20disallowed%20because%20text%20data%20types%20cannot%20store%20that%20character.
chr(0) is disallowed because text data types cannot store that character.

Note you will recieve a ValueError: A string literal cannot contain NUL (0x00) characters. Which seems like a reasonable error.
Note you will receive a ValueError: A string literal cannot contain NUL (0x00) characters. Which seems like a reasonable error.
See issue https://github.com/MeltanoLabs/target-postgres/issues/60 for more details.
"""

Expand Down Expand Up @@ -499,18 +495,11 @@ def test_anyof(postgres_target):

# Any of nullable array of strings or single string.
# {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]}
if column.name == "parent_ids":
assert isinstance(column.type, ARRAY)

# Any of nullable string.
# {"anyOf":[{"type":"string"},{"type":"null"}]}
if column.name == "commit_message":
if column.name in ["commit_message", "legacy_id"]:
assert isinstance(column.type, TEXT)

# Any of nullable string or integer.
# {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]}
if column.name == "legacy_id":
assert isinstance(column.type, TEXT)
elif column.name == "parent_ids":
assert isinstance(column.type, ARRAY)
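
Since the consolidation folds the legacy_id branch (and its schema comment) into the commit_message check, a summary of the expected resolutions may help. This is a sketch restating the asserts above; the actual mapping happens in the connector's pick_individual_type:

from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.types import TEXT

expected_types = {
    "parent_ids": ARRAY,     # anyOf: array of nullable strings, string, or null
    "commit_message": TEXT,  # anyOf: string or null
    "legacy_id": TEXT,       # anyOf: string, integer, or null
}
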


def test_new_array_column(postgres_target):
@@ -747,7 +736,7 @@ def test_activate_version_deletes_data_properly(postgres_target):
def test_reserved_keywords(postgres_target):
"""Target should work regardless of column names

Postgres has a number of resereved keywords listed here https://www.postgresql.org/docs/current/sql-keywords-appendix.html.
Postgres has a number of reserved keywords listed here https://www.postgresql.org/docs/current/sql-keywords-appendix.html.
"""
file_name = "reserved_keywords.singer"
singer_file_to_target(file_name, postgres_target)
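
Reserved words are legal column names once quoted, and SQLAlchemy quotes them automatically, which is why the target needs no special handling here. A sketch, with in-memory SQLite standing in for Postgres (the two share many reserved words):

import sqlalchemy as sa

meta = sa.MetaData()
table = sa.Table("kw_demo", meta, sa.Column("select", sa.Text))

engine = sa.create_engine("sqlite:///:memory:")
with engine.begin() as conn:
    table.create(bind=conn)
    conn.execute(table.insert().values({"select": "ok"}))
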