diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index b842c4e7..c352948c 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 2.5.0
+current_version = 2.6.0
 parse = (?P<major>\d+)
 	\.(?P<minor>\d+)
 	\.(?P<patch>\d+)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ed42c7a7..8d311c4d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+# tap-mssql 2.6.0 2024-09-02
+* Resolved an issue with LOG_BASED tables containing special characters by
+  escaping object names with double quotes. Issue #74
+* Added more flexibility for database connections to support newer
+  SQL Server instances such as PDW via the optional conn_properties config. Issue #28
+* Added the ability to dump TDS logs via the new enable_tds_logging config.
+
 # tap-mssql 2.5.0 2024-08-26
 * removing tox-poetry-install - Not well supported.
diff --git a/README.md b/README.md
index 67d5f9da..2ec4d4de 100755
--- a/README.md
+++ b/README.md
@@ -134,6 +134,37 @@ The characterset for the database / source system. The default is `utf8`, howeve
 }
 ```
+Optional:
+
+The `"conn_properties"` setting allows fine-tuning of database session settings via SQL SET statements that are sent to the database instance when the connection is established. It can be a string or another iterable of strings.
+
+The default values used when this [setting](https://pymssql.readthedocs.io/en/stable/ref/_mssql.html) is not defined are:
+
+```json
+"SET ARITHABORT ON; SET CONCAT_NULL_YIELDS_NULL ON; SET ANSI_NULLS ON; SET ANSI_NULL_DFLT_ON ON; SET ANSI_PADDING ON; SET ANSI_WARNINGS ON; SET ANSI_NULL_DFLT_ON ON; SET CURSOR_CLOSE_ON_COMMIT ON; SET QUOTED_IDENTIFIER ON; SET TEXTSIZE 2147483647;"
+```
+
+Example: override the session properties supplied by pymssql by default, because one of the default settings (CURSOR_CLOSE_ON_COMMIT) is not available on PDW:
+
+
+```json
+{
+    "conn_properties": "SET ARITHABORT ON; SET CONCAT_NULL_YIELDS_NULL ON; SET ANSI_NULLS ON; SET ANSI_NULL_DFLT_ON ON; SET ANSI_PADDING ON; SET ANSI_WARNINGS ON; SET ANSI_NULL_DFLT_ON ON; SET QUOTED_IDENTIFIER ON; SET TEXTSIZE 2147483647;"
+}
+```
+
+Optional:
+
+The `"enable_tds_logging"` setting, when enabled, dumps the underlying TDS driver logs. This is useful for diagnosing connection issues with SQL Server databases. WARNING: this
+produces a large amount of output and may log sensitive data, so it should only be used in development
+environments.
+
+```json
+{
+    "enable_tds_logging": true
+}
+```
+
 These are the same basic configuration properties used by the mssql command-line client (`mssql`).
diff --git a/poetry.lock b/poetry.lock
index 477883c2..785653fe 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
 
 [[package]]
 name = "attrs"
@@ -96,13 +96,13 @@ files = [
 [[package]]
 name = "certifi"
-version = "2024.7.4"
+version = "2024.8.30"
 description = "Python package for providing Mozilla's CA Bundle."
optional = false python-versions = ">=3.6" files = [ - {file = "certifi-2024.7.4-py3-none-any.whl", hash = "sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90"}, - {file = "certifi-2024.7.4.tar.gz", hash = "sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b"}, + {file = "certifi-2024.8.30-py3-none-any.whl", hash = "sha256:922820b53db7a7257ffbda3f597266d435245903d80737e34f8a45ff3e3230d8"}, + {file = "certifi-2024.8.30.tar.gz", hash = "sha256:bec941d2aa8195e248a60b31ff9f0558284cf01a52591ceda73ea9afffd69fd9"}, ] [[package]] @@ -1454,18 +1454,22 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess [[package]] name = "zipp" -version = "3.20.0" +version = "3.20.1" description = "Backport of pathlib-compatible object wrapper for zip files" optional = false python-versions = ">=3.8" files = [ - {file = "zipp-3.20.0-py3-none-any.whl", hash = "sha256:58da6168be89f0be59beb194da1250516fdaa062ccebd30127ac65d30045e10d"}, - {file = "zipp-3.20.0.tar.gz", hash = "sha256:0145e43d89664cfe1a2e533adc75adafed82fe2da404b4bbb6b026c0157bdb31"}, + {file = "zipp-3.20.1-py3-none-any.whl", hash = "sha256:9960cd8967c8f85a56f920d5d507274e74f9ff813a0ab8889a5b5be2daf44064"}, + {file = "zipp-3.20.1.tar.gz", hash = "sha256:c22b14cc4763c5a5b04134207736c107db42e9d3ef2d9779d465f5f1bcba572b"}, ] [package.extras] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"] +cover = ["pytest-cov"] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] -test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy", "pytest-ruff (>=0.2.1)"] +enabler = ["pytest-enabler (>=2.2)"] +test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-ignore-flaky"] +type = ["pytest-mypy"] [metadata] lock-version = "2.0" diff --git a/pyproject.toml b/pyproject.toml index c72ae02f..282e24f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "tap-mssql" -version = "2.5.0" +version = "2.6.0" description = "A pipelinewise compatible tap for connecting Microsoft SQL Server" authors = ["Rob Winters "] license = "GNU Affero" diff --git a/tap_mssql/__init__.py b/tap_mssql/__init__.py index a98cc34b..c972c437 100644 --- a/tap_mssql/__init__.py +++ b/tap_mssql/__init__.py @@ -1,795 +1,805 @@ -#!/usr/bin/env python3 - -import collections -import copy -import itertools -import logging - -import singer -import singer.metrics as metrics -import singer.schema -from singer import metadata, utils -from singer.catalog import Catalog, CatalogEntry -from singer.schema import Schema - -import tap_mssql.sync_strategies.common as common -import tap_mssql.sync_strategies.full_table as full_table -import tap_mssql.sync_strategies.incremental as incremental -import tap_mssql.sync_strategies.log_based as log_based -from tap_mssql.connection import MSSQLConnection, connect_with_backoff, ResultIterator - -ARRAYSIZE = 1 - -Column = collections.namedtuple( - "Column", - [ - "table_schema", - "table_name", - "column_name", - "data_type", - "character_maximum_length", - "numeric_precision", - "numeric_scale", - "is_primary_key", - ], -) - -REQUIRED_CONFIG_KEYS = ["host", "database"] - -LOGGER = singer.get_logger() -logger = 
logging.getLogger(__name__) -logger.setLevel(logging.INFO) - -STRING_TYPES = set( - [ - "char", - "enum", - "longtext", - "mediumtext", - "text", - "varchar", - "uniqueidentifier", - "nvarchar", - "nchar", - ] -) - -BYTES_FOR_INTEGER_TYPE = { - "tinyint": 1, - "smallint": 2, - "mediumint": 3, - "int": 4, - "bigint": 8, -} - -FLOAT_TYPES = set(["float", "double", "real"]) - -DECIMAL_TYPES = set(["decimal", "number", "money", "smallmoney", "numeric"]) - -DATETIME_TYPES = set(["datetime2", "datetime", "datetimeoffset", "timestamp", "smalldatetime"]) - -DATE_TYPES = set(["date"]) - -TIME_TYPES = set(["time"]) - -VARIANT_TYPES = set(["json"]) - - -def default_date_format(): - return False - -def default_singer_decimal(): - """ - singer_decimal can be enabled in the the config, which will use singer.decimal as a format and string as the type - use this for large/precise numbers - """ - return False - -def schema_for_column(c, config): - """Returns the Schema object for the given Column.""" - data_type = c.data_type.lower() - - inclusion = "available" - - use_date_data_type_format = config.get('use_date_datatype') or default_date_format() - use_singer_decimal = config.get('use_singer_decimal') or default_singer_decimal() - - if c.is_primary_key == 1: - inclusion = "automatic" - - result = Schema(inclusion=inclusion) - - if data_type == "bit": - result.type = ["null", "boolean"] - - elif data_type in BYTES_FOR_INTEGER_TYPE: - result.type = ["null", "integer"] - bits = BYTES_FOR_INTEGER_TYPE[data_type] * 8 - if data_type == 'tinyint': - result.minimum = 0 - result.maximum = 255 - else: - result.minimum = 0 - 2 ** (bits - 1) - result.maximum = 2 ** (bits - 1) - 1 - - elif data_type in FLOAT_TYPES: - if use_singer_decimal: - result.type = ["null","string"] - result.format = "singer.decimal" - else: - result.type = ["null", "number"] - result.multipleOf = 10 ** (0 - (c.numeric_scale or 17)) - - elif data_type in DECIMAL_TYPES: - if use_singer_decimal: - result.type = ["null","number","string"] - result.format = "singer.decimal" - result.additionalProperties = {"scale_precision": f"({c.character_maximum_length or c.numeric_precision},{c.numeric_scale})"} - else: - result.type = ["null", "number"] - result.multipleOf = 10 ** (0 - c.numeric_scale) - - elif data_type in STRING_TYPES: - result.type = ["null", "string"] - # When length is -1 it is a long column type - # https://docs.microsoft.com/en-us/sql/relational-databases/system-information-schema-views/columns-transact-sql?view=sql-server-ver15 - # -1 is not valid JSON schema - # https://json-schema.org/understanding-json-schema/reference/string.html#length - if c.character_maximum_length != -1: - result.maxLength = c.character_maximum_length - - elif data_type in DATETIME_TYPES: - result.additionalProperties = {"sql_data_type": data_type} - result.type = ["null", "string"] - result.format = "date-time" - - elif data_type in DATE_TYPES: - if use_date_data_type_format: - result.type = ["null", "string"] - result.format = "date" - else: - result.type = ["null", "string"] - result.format = "date-time" - - elif data_type in TIME_TYPES: - if use_date_data_type_format: - result.type = ["null", "string"] - result.format = "time" - else: - result.type = ["null", "string"] - result.format = "date-time" - - elif data_type in VARIANT_TYPES: - result.type = ["null", "object"] - - else: - result = Schema( - None, - inclusion="unsupported", - description="Unsupported column type", - ) - return result - - -def create_column_metadata(cols, config): - mdata = {} - 
mdata = metadata.write(mdata, (), "selected-by-default", False) - for c in cols: - schema = schema_for_column(c, config) - mdata = metadata.write( - mdata, - ("properties", c.column_name), - "selected-by-default", - schema.inclusion != "unsupported", - ) - mdata = metadata.write( - mdata, ("properties", c.column_name), "sql-datatype", c.data_type.lower() - ) - - return metadata.to_list(mdata) - - -def discover_catalog(mssql_conn, config): - """Returns a Catalog describing the structure of the database.""" - LOGGER.info("Preparing Catalog") - mssql_conn = MSSQLConnection(config) - filter_dbs_config = config.get("filter_dbs") - - if filter_dbs_config: - filter_dbs_clause = ",".join(["'{}'".format(db) for db in filter_dbs_config.split(",")]) - - table_schema_clause = "WHERE c.TABLE_SCHEMA IN ({})".format(filter_dbs_clause) - else: - table_schema_clause = """ - WHERE c.TABLE_SCHEMA NOT IN ( - 'information_schema', - 'INFORMATION_SCHEMA', - 'performance_schema', - 'sys' - )""" - - with connect_with_backoff(mssql_conn) as open_conn: - cur = open_conn.cursor() - LOGGER.info("Fetching tables") - cur.execute( - """SELECT TABLE_SCHEMA, - TABLE_NAME, - TABLE_TYPE - FROM INFORMATION_SCHEMA.TABLES c - {} - """.format( - table_schema_clause - ) - ) - table_info = {} - - for (db, table, table_type) in cur.fetchall(): - if db not in table_info: - table_info[db] = {} - - table_info[db][table] = {"row_count": None, "is_view": table_type == "VIEW"} - LOGGER.info("Tables fetched, fetching columns") - cur.execute( - """with constraint_columns as ( - select c.TABLE_SCHEMA - , c.TABLE_NAME - , c.COLUMN_NAME - - from INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE c - - join INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc - on tc.TABLE_SCHEMA = c.TABLE_SCHEMA - and tc.TABLE_NAME = c.TABLE_NAME - and tc.CONSTRAINT_NAME = c.CONSTRAINT_NAME - and tc.CONSTRAINT_TYPE in ('PRIMARY KEY', 'UNIQUE')) - SELECT c.TABLE_SCHEMA, - c.TABLE_NAME, - c.COLUMN_NAME, - DATA_TYPE, - CHARACTER_MAXIMUM_LENGTH, - NUMERIC_PRECISION, - NUMERIC_SCALE, - case when cc.COLUMN_NAME is null then 0 else 1 end - FROM INFORMATION_SCHEMA.COLUMNS c - - left join constraint_columns cc - on cc.TABLE_NAME = c.TABLE_NAME - and cc.TABLE_SCHEMA = c.TABLE_SCHEMA - and cc.COLUMN_NAME = c.COLUMN_NAME - - {} - ORDER BY c.TABLE_SCHEMA, c.TABLE_NAME, c.ORDINAL_POSITION - """.format( - table_schema_clause - ) - ) - columns = [] - LOGGER.info(f"{ARRAYSIZE=}") - - for rec in ResultIterator(cur, ARRAYSIZE): - columns.append(Column(*rec)) - - LOGGER.info("Columns Fetched") - entries = [] - for (k, cols) in itertools.groupby(columns, lambda c: (c.table_schema, c.table_name)): - cols = list(cols) - (table_schema, table_name) = k - schema = Schema( - type="object", - properties={c.column_name: schema_for_column(c, config) for c in cols}, - ) - md = create_column_metadata(cols, config) - md_map = metadata.to_map(md) - - md_map = metadata.write(md_map, (), "database-name", table_schema) - - is_view = table_info[table_schema][table_name]["is_view"] - - if table_schema in table_info and table_name in table_info[table_schema]: - row_count = table_info[table_schema][table_name].get("row_count") - - if row_count is not None: - md_map = metadata.write(md_map, (), "row-count", row_count) - - md_map = metadata.write(md_map, (), "is-view", is_view) - - key_properties = [c.column_name for c in cols if c.is_primary_key == 1] - - md_map = metadata.write(md_map, (), "table-key-properties", key_properties) - - entry = CatalogEntry( - table=table_name, - stream=table_name, - 
metadata=metadata.to_list(md_map), - tap_stream_id=common.generate_tap_stream_id(table_schema, table_name), - schema=schema, - ) - - entries.append(entry) - LOGGER.info("Catalog ready") - return Catalog(entries) - - -def do_discover(mssql_conn, config): - discover_catalog(mssql_conn, config).dump() - - -def desired_columns(selected : list, table_schema): - """Return the set of column names we need to include in the SELECT. - - selected - set of column names marked as selected in the input catalog - table_schema - the most recently discovered Schema for the table - """ - all_columns = [column for column in table_schema.properties.keys()] - - available = [ - column for column, column_schema - in table_schema.properties.items() - if column_schema.inclusion == 'available' - ] - - automatic = [ - column for column, column_schema - in table_schema.properties.items() - if column_schema.inclusion == 'automatic' - ] - - unsupported = [ - column for column, column_schema - in table_schema.properties.items() - if column_schema.inclusion == 'unsupported' - ] - - unknown = [ - (column,column_schema.inclusion) - for column, column_schema - in table_schema.properties.items() - if column_schema.inclusion not in ['available', 'automatic', 'unsupported'] - ] - - if unknown: - raise Exception(f"Unknown inclusions: {unknown}") - - selected_but_unsupported = [c for c in selected if c in unsupported] - if selected_but_unsupported: - LOGGER.warning( - "Columns %s were selected but are not supported. Skipping them.", - selected_but_unsupported, - ) - - selected_but_nonexistent = [c for c in selected if c not in all_columns] - if selected_but_nonexistent: - LOGGER.warning("Columns %s were selected but do not exist.", selected_but_nonexistent) - - not_selected_but_automatic = [c for c in automatic if c not in selected] - if not_selected_but_automatic: - LOGGER.warning( - "Columns %s are primary keys but were not selected. Adding them.", - not_selected_but_automatic, - ) - - desired = [c for c in all_columns if (c in available and c in selected) or c in automatic] - - return list(dict.fromkeys(desired)) - - -def is_valid_currently_syncing_stream(selected_stream, state): - stream_metadata = metadata.to_map(selected_stream.metadata) - replication_method = stream_metadata.get((), {}).get("replication-method") - - if replication_method != "LOG_BASED": - return True - - if replication_method == "LOG_BASED" and cdc_stream_requires_historical(selected_stream, state): - return True - - return False - - -def cdc_stream_requires_historical(catalog_entry, state): - - current_lsn = singer.get_bookmark(state, catalog_entry.tap_stream_id, "lsn") - - max_lsn_values = singer.get_bookmark(state, catalog_entry.tap_stream_id, "max_lsn_values") - - last_lsn_fetched = singer.get_bookmark(state, catalog_entry.tap_stream_id, "last_lsn_fetched") - - if (current_lsn) and (not max_lsn_values and not last_lsn_fetched): - return False - - return True - - -def resolve_catalog(discovered_catalog, streams_to_sync): - result = Catalog(streams=[]) - - # Iterate over the streams in the input catalog and match each one up - # with the same stream in the discovered catalog. 
- for catalog_entry in streams_to_sync: - catalog_metadata = metadata.to_map(catalog_entry.metadata) - replication_key = catalog_metadata.get((), {}).get("replication-key") - - discovered_table = discovered_catalog.get_stream(catalog_entry.tap_stream_id) - database_name = common.get_database_name(catalog_entry) - - if not discovered_table: - LOGGER.warning( - "Database %s table %s was selected but does not exist", - database_name, - catalog_entry.table, - ) - continue - - selected = [ - k - for k in discovered_table.schema.properties.keys() - if common.property_is_selected(catalog_entry, k) or k == replication_key - ] - - # These are the columns we need to select - columns = desired_columns(selected, discovered_table.schema) - result.streams.append( - CatalogEntry( - tap_stream_id=catalog_entry.tap_stream_id, - metadata=catalog_entry.metadata, - stream=catalog_entry.tap_stream_id, - table=catalog_entry.table, - schema=Schema( - type="object", - properties={col: discovered_table.schema.properties[col] for col in columns}, - ), - ) - ) - - return result - - -def get_non_cdc_streams(mssql_conn, catalog, config, state): - """Method to discover all connections which will not use CDC - - Returns the Catalog of data we're going to sync for all SELECT-based streams - (i.e. INCREMENTAL, FULL_TABLE, and LOG_BASED that require a historical sync). - LOG_BASED streams that require a historical sync are inferred from lack - of any state. - - Using the Catalog provided from the input file, this function will return a - Catalog representing exactly which tables and columns that will be emitted - by SELECT-based syncs. This is achieved by comparing the input Catalog to a - freshly discovered Catalog to determine the resulting Catalog. - - The resulting Catalog will include the following any streams marked as - "selected" that currently exist in the database. Columns marked as "selected" - and those labled "automatic" (e.g. primary keys and replication keys) will be - included. Streams will be prioritized in the following order: - 1. currently_syncing if it is SELECT-based - 2. any streams that do not have state - 3. 
any streams that do not have a replication method of LOG_BASED - """ - mssql_conn = MSSQLConnection(config) - discovered = discover_catalog(mssql_conn, config) - - # Filter catalog to include only selected streams - selected_streams = list(filter(lambda s: common.stream_is_selected(s), catalog.streams)) - streams_with_state = [] - streams_without_state = [] - - for stream in selected_streams: - stream_metadata = metadata.to_map(stream.metadata) - - for k, v in stream_metadata.get((), {}).items(): - LOGGER.info(f"{k}: {v}") - replication_method = stream_metadata.get((), {}).get("replication-method") - stream_state = state.get("bookmarks", {}).get(stream.tap_stream_id) - - if not stream_state: - if replication_method == "LOG_BASED": - LOGGER.info( - "LOG_BASED stream %s requires full historical sync", stream.tap_stream_id - ) - - streams_without_state.append(stream) - elif ( - stream_state - and replication_method == "LOG_BASED" - and cdc_stream_requires_historical(stream, state) - ): - is_view = common.get_is_view(stream) - - if is_view: - raise Exception( - "Unable to replicate stream({}) with cdc because it is a view.".format( - stream.stream - ) - ) - - LOGGER.info("LOG_BASED stream %s will resume its historical sync", stream.tap_stream_id) - - streams_with_state.append(stream) - elif stream_state and replication_method != "LOG_BASED": - streams_with_state.append(stream) - - # If the state says we were in the middle of processing a stream, skip - # to that stream. Then process streams without prior state and finally - # move onto streams with state (i.e. have been synced in the past) - currently_syncing = singer.get_currently_syncing(state) - - # prioritize streams that have not been processed - ordered_streams = streams_without_state + streams_with_state - - if currently_syncing: - currently_syncing_stream = list( - filter( - lambda s: s.tap_stream_id == currently_syncing - and is_valid_currently_syncing_stream(s, state), - ordered_streams, - ) - ) - - non_currently_syncing_streams = list( - filter(lambda s: s.tap_stream_id != currently_syncing, ordered_streams) - ) - - streams_to_sync = currently_syncing_stream + non_currently_syncing_streams - else: - # prioritize streams that have not been processed - streams_to_sync = ordered_streams - - return resolve_catalog(discovered, streams_to_sync) - - -def get_cdc_streams(mssql_conn, catalog, config, state): - discovered = discover_catalog(mssql_conn, config) - - selected_streams = list(filter(lambda s: common.stream_is_selected(s), catalog.streams)) - cdc_streams = [] - - for stream in selected_streams: - stream_metadata = metadata.to_map(stream.metadata) - replication_method = stream_metadata.get((), {}).get("replication-method") - - if replication_method == "LOG_BASED" and not cdc_stream_requires_historical(stream, state): - cdc_streams.append(stream) - - return resolve_catalog(discovered, cdc_streams) - - -def write_schema_message(catalog_entry, bookmark_properties=[]): - key_properties = common.get_key_properties(catalog_entry) - - singer.write_message( - singer.SchemaMessage( - stream=catalog_entry.stream, - schema=catalog_entry.schema.to_dict(), - key_properties=key_properties, - bookmark_properties=bookmark_properties, - ) - ) - - -def do_sync_incremental(mssql_conn, config, catalog_entry, state, columns): - mssql_conn = MSSQLConnection(config) - md_map = metadata.to_map(catalog_entry.metadata) - replication_key = md_map.get((), {}).get("replication-key") - write_schema_message(catalog_entry=catalog_entry, 
bookmark_properties=[replication_key]) - LOGGER.info("Schema written") - incremental.sync_table(mssql_conn, config, catalog_entry, state, columns) - - singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) - - -def do_sync_historical_log(mssql_conn, config, catalog_entry, state, columns): - mssql_conn = MSSQLConnection(config) - - # Add additional keys to the schema - log_based.add_synthetic_keys_to_schema(catalog_entry) - - write_schema_message(catalog_entry) - - stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state) - - # full_table.sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version) - log_based.sync_historic_table(mssql_conn, config, catalog_entry, state, columns, stream_version) - - # Prefer initial_full_table_complete going forward - singer.clear_bookmark(state, catalog_entry.tap_stream_id, "version") - - state = singer.write_bookmark( - state, catalog_entry.tap_stream_id, "initial_full_table_complete", True - ) - - singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) - - -def do_sync_full_table(mssql_conn, config, catalog_entry, state, columns): - mssql_conn = MSSQLConnection(config) - - write_schema_message(catalog_entry) - - stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state) - - full_table.sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version) - - # Prefer initial_full_table_complete going forward - singer.clear_bookmark(state, catalog_entry.tap_stream_id, "version") - - state = singer.write_bookmark( - state, catalog_entry.tap_stream_id, "initial_full_table_complete", True - ) - - singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) - - -def do_sync_log_based(mssql_conn, config, catalog_entry, state, columns): - mssql_conn = MSSQLConnection(config) - md_map = metadata.to_map(catalog_entry.metadata) - stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state) - replication_key = md_map.get((), {}).get("replication-key") - # Add additional keys to the schema - log_based.add_synthetic_keys_to_schema(catalog_entry) - - write_schema_message(catalog_entry=catalog_entry, bookmark_properties=[replication_key]) - LOGGER.info("Schema written") - stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state) - log_based.sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version) - - singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) - - -def sync_non_cdc_streams(mssql_conn, non_cdc_catalog, config, state): - mssql_conn = MSSQLConnection(config) - - for catalog_entry in non_cdc_catalog.streams: - columns = list(catalog_entry.schema.properties.keys()) - - if not columns: - LOGGER.warning( - "There are no columns selected for stream %s, skipping it.", catalog_entry.stream - ) - continue - - state = singer.set_currently_syncing(state, catalog_entry.tap_stream_id) - - # Emit a state message to indicate that we've started this stream - singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) - - md_map = metadata.to_map(catalog_entry.metadata) - replication_method = md_map.get((), {}).get("replication-method") - replication_key = md_map.get((), {}).get("replication-key") - primary_keys = common.get_key_properties(catalog_entry) - start_lsn = md_map.get((), {}).get("lsn") - LOGGER.info(f"Table {catalog_entry.table} proposes {replication_method} sync") - if not replication_method and config.get("default_replication_method"): - replication_method = 
config.get("default_replication_method") - LOGGER.info( - f"Table {catalog_entry.table} reverting to DEFAULT {replication_method} sync" - ) - - if replication_method == "INCREMENTAL" and not replication_key: - LOGGER.info( - f"No replication key for {catalog_entry.table}, using full table replication" - ) - replication_method = "FULL_TABLE" - # Check for INCREMENTAL load without primary keys removed - # INCREMENTAL loads can be performed without primary keys as long as there is a replication key - if replication_method == "LOG_BASED" and not start_lsn: - LOGGER.info(f"No initial load for {catalog_entry.table}, using full table replication") - else: - LOGGER.info(f"Table {catalog_entry.table} will use {replication_method} sync") - - database_name = common.get_database_name(catalog_entry) - - with metrics.job_timer("sync_table") as timer: - timer.tags["database"] = database_name - timer.tags["table"] = catalog_entry.table - - if replication_method == "INCREMENTAL": - LOGGER.info(f"syncing {catalog_entry.table} incrementally") - do_sync_incremental(mssql_conn, config, catalog_entry, state, columns) - elif replication_method == "FULL_TABLE": - LOGGER.info(f"syncing {catalog_entry.table} full table") - do_sync_full_table(mssql_conn, config, catalog_entry, state, columns) - elif replication_method == "LOG_BASED": - LOGGER.info(f"syncing {catalog_entry.table} cdc tables") - do_sync_historical_log(mssql_conn, config, catalog_entry, state, columns) - else: - raise Exception( - "only INCREMENTAL, LOG_BASED and FULL_TABLE replication methods are supported" - ) - - state = singer.set_currently_syncing(state, None) - singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) - - -def sync_cdc_streams(mssql_conn, cdc_catalog, config, state): - mssql_conn = MSSQLConnection(config) - - if cdc_catalog.streams: - for catalog_entry in cdc_catalog.streams: - columns = list(catalog_entry.schema.properties.keys()) - if not columns: - LOGGER.warning( - "There are no columns selected for stream %s, skipping it.", - catalog_entry.stream, - ) - continue - - state = singer.set_currently_syncing(state, catalog_entry.tap_stream_id) - - # Emit a state message to indicate that we've started this stream - singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) - - md_map = metadata.to_map(catalog_entry.metadata) - replication_method = md_map.get((), {}).get("replication-method") - LOGGER.info(f"Table {catalog_entry.table} proposes {replication_method} sync") - LOGGER.info(f"Table {catalog_entry.table} will use {replication_method} sync") - - database_name = common.get_database_name(catalog_entry) - - with metrics.job_timer("table_cdc_sync") as timer: - timer.tags["database"] = database_name - timer.tags["table"] = catalog_entry.table - - if replication_method == "LOG_BASED": - LOGGER.info(f"syncing {catalog_entry.table} cdc tables") - do_sync_log_based(mssql_conn, config, catalog_entry, state, columns) - else: - raise Exception("only LOG_BASED methods are supported for CDC") - - state = singer.set_currently_syncing(state, None) - singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) - - -def do_sync(mssql_conn, config, catalog, state): - LOGGER.info("Beginning sync") - non_cdc_catalog = get_non_cdc_streams(mssql_conn, catalog, config, state) - cdc_catalog = get_cdc_streams(mssql_conn, catalog, config, state) - - for entry in non_cdc_catalog.streams: - LOGGER.info(f"Need to sync {entry.table}") - sync_non_cdc_streams(mssql_conn, non_cdc_catalog, config, state) - 
sync_cdc_streams(mssql_conn, cdc_catalog, config, state) - - -def log_server_params(mssql_conn): - with connect_with_backoff(mssql_conn) as open_conn: - try: - with open_conn.cursor() as cur: - cur.execute("""SELECT @@VERSION as version, @@lock_timeout as lock_wait_timeout""") - row = cur.fetchone() - LOGGER.info( - "Server Parameters: " + "version: %s, " + "lock_timeout: %s, ", - *row, - ) - except: - LOGGER.warning("Encountered error checking server params.") - - -def main_impl(): - - global ARRAYSIZE - args = utils.parse_args(REQUIRED_CONFIG_KEYS) - mssql_conn = MSSQLConnection(args.config) - log_server_params(mssql_conn) - - ARRAYSIZE = args.config.get('cursor_array_size',1) - common.ARRAYSIZE = ARRAYSIZE - - if args.discover: - do_discover(mssql_conn, args.config) - elif args.catalog: - state = args.state or {} - do_sync(mssql_conn, args.config, args.catalog, state) - elif args.properties: - catalog = Catalog.from_dict(args.properties) - state = args.state or {} - do_sync(mssql_conn, args.config, catalog, state) - else: - LOGGER.info("No properties were selected") - - -def main(): - try: - main_impl() - except Exception as exc: - LOGGER.critical(exc) - raise exc +#!/usr/bin/env python3 + +import collections +import copy +import itertools +import logging + +import singer +import singer.metrics as metrics +import singer.schema +from singer import metadata, utils +from singer.catalog import Catalog, CatalogEntry +from singer.schema import Schema + +import tap_mssql.sync_strategies.common as common +import tap_mssql.sync_strategies.full_table as full_table +import tap_mssql.sync_strategies.incremental as incremental +import tap_mssql.sync_strategies.log_based as log_based +from tap_mssql.connection import MSSQLConnection, connect_with_backoff, ResultIterator + +ARRAYSIZE = 1 + +Column = collections.namedtuple( + "Column", + [ + "table_schema", + "table_name", + "column_name", + "data_type", + "character_maximum_length", + "numeric_precision", + "numeric_scale", + "is_primary_key", + ], +) + +REQUIRED_CONFIG_KEYS = ["host", "database"] + +LOGGER = singer.get_logger() +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +STRING_TYPES = set( + [ + "char", + "enum", + "longtext", + "mediumtext", + "text", + "varchar", + "uniqueidentifier", + "nvarchar", + "nchar", + ] +) + +BYTES_FOR_INTEGER_TYPE = { + "tinyint": 1, + "smallint": 2, + "mediumint": 3, + "int": 4, + "bigint": 8, +} + +FLOAT_TYPES = set(["float", "double", "real"]) + +DECIMAL_TYPES = set(["decimal", "number", "money", "smallmoney", "numeric"]) + +DATETIME_TYPES = set(["datetime2", "datetime", "datetimeoffset", "timestamp", "smalldatetime"]) + +DATE_TYPES = set(["date"]) + +TIME_TYPES = set(["time"]) + +VARIANT_TYPES = set(["json"]) + + +def default_date_format(): + return False + +def default_singer_decimal(): + """ + singer_decimal can be enabled in the the config, which will use singer.decimal as a format and string as the type + use this for large/precise numbers + """ + return False + +def schema_for_column(c, config): + """Returns the Schema object for the given Column.""" + data_type = c.data_type.lower() + + inclusion = "available" + + use_date_data_type_format = config.get('use_date_datatype') or default_date_format() + use_singer_decimal = config.get('use_singer_decimal') or default_singer_decimal() + + if c.is_primary_key == 1: + inclusion = "automatic" + + result = Schema(inclusion=inclusion) + + if data_type == "bit": + result.type = ["null", "boolean"] + + elif data_type in BYTES_FOR_INTEGER_TYPE: 
+ result.type = ["null", "integer"] + bits = BYTES_FOR_INTEGER_TYPE[data_type] * 8 + if data_type == 'tinyint': + result.minimum = 0 + result.maximum = 255 + else: + result.minimum = 0 - 2 ** (bits - 1) + result.maximum = 2 ** (bits - 1) - 1 + + elif data_type in FLOAT_TYPES: + if use_singer_decimal: + result.type = ["null","string"] + result.format = "singer.decimal" + else: + result.type = ["null", "number"] + result.multipleOf = 10 ** (0 - (c.numeric_scale or 17)) + + elif data_type in DECIMAL_TYPES: + if use_singer_decimal: + result.type = ["null","number","string"] + result.format = "singer.decimal" + result.additionalProperties = {"scale_precision": f"({c.character_maximum_length or c.numeric_precision},{c.numeric_scale})"} + else: + result.type = ["null", "number"] + result.multipleOf = 10 ** (0 - c.numeric_scale) + + elif data_type in STRING_TYPES: + result.type = ["null", "string"] + # When length is -1 it is a long column type + # https://docs.microsoft.com/en-us/sql/relational-databases/system-information-schema-views/columns-transact-sql?view=sql-server-ver15 + # -1 is not valid JSON schema + # https://json-schema.org/understanding-json-schema/reference/string.html#length + if c.character_maximum_length != -1: + result.maxLength = c.character_maximum_length + + elif data_type in DATETIME_TYPES: + result.additionalProperties = {"sql_data_type": data_type} + result.type = ["null", "string"] + result.format = "date-time" + + elif data_type in DATE_TYPES: + if use_date_data_type_format: + result.type = ["null", "string"] + result.format = "date" + else: + result.type = ["null", "string"] + result.format = "date-time" + + elif data_type in TIME_TYPES: + if use_date_data_type_format: + result.type = ["null", "string"] + result.format = "time" + else: + result.type = ["null", "string"] + result.format = "date-time" + + elif data_type in VARIANT_TYPES: + result.type = ["null", "object"] + + else: + result = Schema( + None, + inclusion="unsupported", + description="Unsupported column type", + ) + return result + + +def create_column_metadata(cols, config): + mdata = {} + mdata = metadata.write(mdata, (), "selected-by-default", False) + for c in cols: + schema = schema_for_column(c, config) + mdata = metadata.write( + mdata, + ("properties", c.column_name), + "selected-by-default", + schema.inclusion != "unsupported", + ) + mdata = metadata.write( + mdata, ("properties", c.column_name), "sql-datatype", c.data_type.lower() + ) + + return metadata.to_list(mdata) + + +def discover_catalog(mssql_conn, config): + """Returns a Catalog describing the structure of the database.""" + LOGGER.info("Preparing Catalog") + mssql_conn = MSSQLConnection(config) + filter_dbs_config = config.get("filter_dbs") + + if filter_dbs_config: + filter_dbs_clause = ",".join(["'{}'".format(db) for db in filter_dbs_config.split(",")]) + + table_schema_clause = "WHERE c.TABLE_SCHEMA IN ({})".format(filter_dbs_clause) + else: + table_schema_clause = """ + WHERE c.TABLE_SCHEMA NOT IN ( + 'information_schema', + 'INFORMATION_SCHEMA', + 'performance_schema', + 'sys' + )""" + + with connect_with_backoff(mssql_conn) as open_conn: + cur = open_conn.cursor() + LOGGER.info("Fetching tables") + cur.execute( + """SELECT TABLE_SCHEMA, + TABLE_NAME, + TABLE_TYPE + FROM INFORMATION_SCHEMA.TABLES c + {} + """.format( + table_schema_clause + ) + ) + table_info = {} + + for (db, table, table_type) in cur.fetchall(): + if db not in table_info: + table_info[db] = {} + + table_info[db][table] = {"row_count": None, "is_view": table_type 
== "VIEW"} + LOGGER.info("Tables fetched, fetching columns") + cur.execute( + """with constraint_columns as ( + select c.TABLE_SCHEMA + , c.TABLE_NAME + , c.COLUMN_NAME + + from INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE c + + join INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc + on tc.TABLE_SCHEMA = c.TABLE_SCHEMA + and tc.TABLE_NAME = c.TABLE_NAME + and tc.CONSTRAINT_NAME = c.CONSTRAINT_NAME + and tc.CONSTRAINT_TYPE in ('PRIMARY KEY', 'UNIQUE')) + SELECT c.TABLE_SCHEMA, + c.TABLE_NAME, + c.COLUMN_NAME, + DATA_TYPE, + CHARACTER_MAXIMUM_LENGTH, + NUMERIC_PRECISION, + NUMERIC_SCALE, + case when cc.COLUMN_NAME is null then 0 else 1 end + FROM INFORMATION_SCHEMA.COLUMNS c + + left join constraint_columns cc + on cc.TABLE_NAME = c.TABLE_NAME + and cc.TABLE_SCHEMA = c.TABLE_SCHEMA + and cc.COLUMN_NAME = c.COLUMN_NAME + + {} + ORDER BY c.TABLE_SCHEMA, c.TABLE_NAME, c.ORDINAL_POSITION + """.format( + table_schema_clause + ) + ) + columns = [] + LOGGER.info(f"{ARRAYSIZE=}") + + for rec in ResultIterator(cur, ARRAYSIZE): + columns.append(Column(*rec)) + + LOGGER.info("Columns Fetched") + entries = [] + for (k, cols) in itertools.groupby(columns, lambda c: (c.table_schema, c.table_name)): + cols = list(cols) + (table_schema, table_name) = k + schema = Schema( + type="object", + properties={c.column_name: schema_for_column(c, config) for c in cols}, + ) + md = create_column_metadata(cols, config) + md_map = metadata.to_map(md) + + md_map = metadata.write(md_map, (), "database-name", table_schema) + + is_view = table_info[table_schema][table_name]["is_view"] + + if table_schema in table_info and table_name in table_info[table_schema]: + row_count = table_info[table_schema][table_name].get("row_count") + + if row_count is not None: + md_map = metadata.write(md_map, (), "row-count", row_count) + + md_map = metadata.write(md_map, (), "is-view", is_view) + + key_properties = [c.column_name for c in cols if c.is_primary_key == 1] + + md_map = metadata.write(md_map, (), "table-key-properties", key_properties) + + entry = CatalogEntry( + table=table_name, + stream=table_name, + metadata=metadata.to_list(md_map), + tap_stream_id=common.generate_tap_stream_id(table_schema, table_name), + schema=schema, + ) + + entries.append(entry) + LOGGER.info("Catalog ready") + return Catalog(entries) + + +def do_discover(mssql_conn, config): + discover_catalog(mssql_conn, config).dump() + + +def desired_columns(selected : list, table_schema): + """Return the set of column names we need to include in the SELECT. 
+ + selected - set of column names marked as selected in the input catalog + table_schema - the most recently discovered Schema for the table + """ + all_columns = [column for column in table_schema.properties.keys()] + + available = [ + column for column, column_schema + in table_schema.properties.items() + if column_schema.inclusion == 'available' + ] + + automatic = [ + column for column, column_schema + in table_schema.properties.items() + if column_schema.inclusion == 'automatic' + ] + + unsupported = [ + column for column, column_schema + in table_schema.properties.items() + if column_schema.inclusion == 'unsupported' + ] + + unknown = [ + (column,column_schema.inclusion) + for column, column_schema + in table_schema.properties.items() + if column_schema.inclusion not in ['available', 'automatic', 'unsupported'] + ] + + if unknown: + raise Exception(f"Unknown inclusions: {unknown}") + + selected_but_unsupported = [c for c in selected if c in unsupported] + if selected_but_unsupported: + LOGGER.warning( + "Columns %s were selected but are not supported. Skipping them.", + selected_but_unsupported, + ) + + selected_but_nonexistent = [c for c in selected if c not in all_columns] + if selected_but_nonexistent: + LOGGER.warning("Columns %s were selected but do not exist.", selected_but_nonexistent) + + not_selected_but_automatic = [c for c in automatic if c not in selected] + if not_selected_but_automatic: + LOGGER.warning( + "Columns %s are primary keys but were not selected. Adding them.", + not_selected_but_automatic, + ) + + desired = [c for c in all_columns if (c in available and c in selected) or c in automatic] + + return list(dict.fromkeys(desired)) + + +def is_valid_currently_syncing_stream(selected_stream, state): + stream_metadata = metadata.to_map(selected_stream.metadata) + replication_method = stream_metadata.get((), {}).get("replication-method") + + if replication_method != "LOG_BASED": + return True + + if replication_method == "LOG_BASED" and cdc_stream_requires_historical(selected_stream, state): + return True + + return False + + +def cdc_stream_requires_historical(catalog_entry, state): + + current_lsn = singer.get_bookmark(state, catalog_entry.tap_stream_id, "lsn") + + max_lsn_values = singer.get_bookmark(state, catalog_entry.tap_stream_id, "max_lsn_values") + + last_lsn_fetched = singer.get_bookmark(state, catalog_entry.tap_stream_id, "last_lsn_fetched") + + if (current_lsn) and (not max_lsn_values and not last_lsn_fetched): + return False + + return True + + +def resolve_catalog(discovered_catalog, streams_to_sync): + result = Catalog(streams=[]) + + # Iterate over the streams in the input catalog and match each one up + # with the same stream in the discovered catalog. 
+ for catalog_entry in streams_to_sync: + catalog_metadata = metadata.to_map(catalog_entry.metadata) + replication_key = catalog_metadata.get((), {}).get("replication-key") + + discovered_table = discovered_catalog.get_stream(catalog_entry.tap_stream_id) + database_name = common.get_database_name(catalog_entry) + + if not discovered_table: + LOGGER.warning( + "Database %s table %s was selected but does not exist", + database_name, + catalog_entry.table, + ) + continue + + selected = [ + k + for k in discovered_table.schema.properties.keys() + if common.property_is_selected(catalog_entry, k) or k == replication_key + ] + + # These are the columns we need to select + columns = desired_columns(selected, discovered_table.schema) + result.streams.append( + CatalogEntry( + tap_stream_id=catalog_entry.tap_stream_id, + metadata=catalog_entry.metadata, + stream=catalog_entry.tap_stream_id, + table=catalog_entry.table, + schema=Schema( + type="object", + properties={col: discovered_table.schema.properties[col] for col in columns}, + ), + ) + ) + + return result + + +def get_non_cdc_streams(mssql_conn, catalog, config, state): + """Method to discover all connections which will not use CDC + + Returns the Catalog of data we're going to sync for all SELECT-based streams + (i.e. INCREMENTAL, FULL_TABLE, and LOG_BASED that require a historical sync). + LOG_BASED streams that require a historical sync are inferred from lack + of any state. + + Using the Catalog provided from the input file, this function will return a + Catalog representing exactly which tables and columns that will be emitted + by SELECT-based syncs. This is achieved by comparing the input Catalog to a + freshly discovered Catalog to determine the resulting Catalog. + + The resulting Catalog will include the following any streams marked as + "selected" that currently exist in the database. Columns marked as "selected" + and those labled "automatic" (e.g. primary keys and replication keys) will be + included. Streams will be prioritized in the following order: + 1. currently_syncing if it is SELECT-based + 2. any streams that do not have state + 3. 
any streams that do not have a replication method of LOG_BASED + """ + mssql_conn = MSSQLConnection(config) + discovered = discover_catalog(mssql_conn, config) + + # Filter catalog to include only selected streams + selected_streams = list(filter(lambda s: common.stream_is_selected(s), catalog.streams)) + streams_with_state = [] + streams_without_state = [] + + for stream in selected_streams: + stream_metadata = metadata.to_map(stream.metadata) + + for k, v in stream_metadata.get((), {}).items(): + LOGGER.info(f"{k}: {v}") + replication_method = stream_metadata.get((), {}).get("replication-method") + stream_state = state.get("bookmarks", {}).get(stream.tap_stream_id) + + if not stream_state: + if replication_method == "LOG_BASED": + LOGGER.info( + "LOG_BASED stream %s requires full historical sync", stream.tap_stream_id + ) + + streams_without_state.append(stream) + elif ( + stream_state + and replication_method == "LOG_BASED" + and cdc_stream_requires_historical(stream, state) + ): + is_view = common.get_is_view(stream) + + if is_view: + raise Exception( + "Unable to replicate stream({}) with cdc because it is a view.".format( + stream.stream + ) + ) + + LOGGER.info("LOG_BASED stream %s will resume its historical sync", stream.tap_stream_id) + + streams_with_state.append(stream) + elif stream_state and replication_method != "LOG_BASED": + streams_with_state.append(stream) + + # If the state says we were in the middle of processing a stream, skip + # to that stream. Then process streams without prior state and finally + # move onto streams with state (i.e. have been synced in the past) + currently_syncing = singer.get_currently_syncing(state) + + # prioritize streams that have not been processed + ordered_streams = streams_without_state + streams_with_state + + if currently_syncing: + currently_syncing_stream = list( + filter( + lambda s: s.tap_stream_id == currently_syncing + and is_valid_currently_syncing_stream(s, state), + ordered_streams, + ) + ) + + non_currently_syncing_streams = list( + filter(lambda s: s.tap_stream_id != currently_syncing, ordered_streams) + ) + + streams_to_sync = currently_syncing_stream + non_currently_syncing_streams + else: + # prioritize streams that have not been processed + streams_to_sync = ordered_streams + + return resolve_catalog(discovered, streams_to_sync) + + +def get_cdc_streams(mssql_conn, catalog, config, state): + discovered = discover_catalog(mssql_conn, config) + + selected_streams = list(filter(lambda s: common.stream_is_selected(s), catalog.streams)) + cdc_streams = [] + + for stream in selected_streams: + stream_metadata = metadata.to_map(stream.metadata) + replication_method = stream_metadata.get((), {}).get("replication-method") + + if replication_method == "LOG_BASED" and not cdc_stream_requires_historical(stream, state): + cdc_streams.append(stream) + + return resolve_catalog(discovered, cdc_streams) + + +def write_schema_message(catalog_entry, bookmark_properties=[]): + key_properties = common.get_key_properties(catalog_entry) + + singer.write_message( + singer.SchemaMessage( + stream=catalog_entry.stream, + schema=catalog_entry.schema.to_dict(), + key_properties=key_properties, + bookmark_properties=bookmark_properties, + ) + ) + + +def do_sync_incremental(mssql_conn, config, catalog_entry, state, columns): + mssql_conn = MSSQLConnection(config) + md_map = metadata.to_map(catalog_entry.metadata) + replication_key = md_map.get((), {}).get("replication-key") + write_schema_message(catalog_entry=catalog_entry, 
bookmark_properties=[replication_key]) + LOGGER.info("Schema written") + incremental.sync_table(mssql_conn, config, catalog_entry, state, columns) + + singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) + + +def do_sync_historical_log(mssql_conn, config, catalog_entry, state, columns): + mssql_conn = MSSQLConnection(config) + + # Add additional keys to the schema + log_based.add_synthetic_keys_to_schema(catalog_entry) + + write_schema_message(catalog_entry) + + stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state) + + # full_table.sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version) + log_based.sync_historic_table(mssql_conn, config, catalog_entry, state, columns, stream_version) + + # Prefer initial_full_table_complete going forward + singer.clear_bookmark(state, catalog_entry.tap_stream_id, "version") + + state = singer.write_bookmark( + state, catalog_entry.tap_stream_id, "initial_full_table_complete", True + ) + + singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) + + +def do_sync_full_table(mssql_conn, config, catalog_entry, state, columns): + mssql_conn = MSSQLConnection(config) + + write_schema_message(catalog_entry) + + stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state) + + full_table.sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version) + + # Prefer initial_full_table_complete going forward + singer.clear_bookmark(state, catalog_entry.tap_stream_id, "version") + + state = singer.write_bookmark( + state, catalog_entry.tap_stream_id, "initial_full_table_complete", True + ) + + singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) + + +def do_sync_log_based(mssql_conn, config, catalog_entry, state, columns): + mssql_conn = MSSQLConnection(config) + md_map = metadata.to_map(catalog_entry.metadata) + stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state) + replication_key = md_map.get((), {}).get("replication-key") + # Add additional keys to the schema + log_based.add_synthetic_keys_to_schema(catalog_entry) + + write_schema_message(catalog_entry=catalog_entry, bookmark_properties=[replication_key]) + LOGGER.info("Schema written") + stream_version = common.get_stream_version(catalog_entry.tap_stream_id, state) + log_based.sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version) + + singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) + + +def sync_non_cdc_streams(mssql_conn, non_cdc_catalog, config, state): + mssql_conn = MSSQLConnection(config) + + for catalog_entry in non_cdc_catalog.streams: + columns = list(catalog_entry.schema.properties.keys()) + + if not columns: + LOGGER.warning( + "There are no columns selected for stream %s, skipping it.", catalog_entry.stream + ) + continue + + state = singer.set_currently_syncing(state, catalog_entry.tap_stream_id) + + # Emit a state message to indicate that we've started this stream + singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) + + md_map = metadata.to_map(catalog_entry.metadata) + replication_method = md_map.get((), {}).get("replication-method") + replication_key = md_map.get((), {}).get("replication-key") + primary_keys = common.get_key_properties(catalog_entry) + start_lsn = md_map.get((), {}).get("lsn") + LOGGER.info(f"Table {catalog_entry.table} proposes {replication_method} sync") + if not replication_method and config.get("default_replication_method"): + replication_method = 
config.get("default_replication_method") + LOGGER.info( + f"Table {catalog_entry.table} reverting to DEFAULT {replication_method} sync" + ) + + if replication_method == "INCREMENTAL" and not replication_key: + LOGGER.info( + f"No replication key for {catalog_entry.table}, using full table replication" + ) + replication_method = "FULL_TABLE" + # Check for INCREMENTAL load without primary keys removed + # INCREMENTAL loads can be performed without primary keys as long as there is a replication key + if replication_method == "LOG_BASED" and not start_lsn: + LOGGER.info(f"No initial load for {catalog_entry.table}, using full table replication") + else: + LOGGER.info(f"Table {catalog_entry.table} will use {replication_method} sync") + + database_name = common.get_database_name(catalog_entry) + + with metrics.job_timer("sync_table") as timer: + timer.tags["database"] = database_name + timer.tags["table"] = catalog_entry.table + + if replication_method == "INCREMENTAL": + LOGGER.info(f"syncing {catalog_entry.table} incrementally") + do_sync_incremental(mssql_conn, config, catalog_entry, state, columns) + elif replication_method == "FULL_TABLE": + LOGGER.info(f"syncing {catalog_entry.table} full table") + do_sync_full_table(mssql_conn, config, catalog_entry, state, columns) + elif replication_method == "LOG_BASED": + LOGGER.info(f"syncing {catalog_entry.table} cdc tables") + do_sync_historical_log(mssql_conn, config, catalog_entry, state, columns) + else: + raise Exception( + "only INCREMENTAL, LOG_BASED and FULL_TABLE replication methods are supported" + ) + + state = singer.set_currently_syncing(state, None) + singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) + + +def sync_cdc_streams(mssql_conn, cdc_catalog, config, state): + mssql_conn = MSSQLConnection(config) + + if cdc_catalog.streams: + for catalog_entry in cdc_catalog.streams: + columns = list(catalog_entry.schema.properties.keys()) + if not columns: + LOGGER.warning( + "There are no columns selected for stream %s, skipping it.", + catalog_entry.stream, + ) + continue + + state = singer.set_currently_syncing(state, catalog_entry.tap_stream_id) + + # Emit a state message to indicate that we've started this stream + singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) + + md_map = metadata.to_map(catalog_entry.metadata) + replication_method = md_map.get((), {}).get("replication-method") + LOGGER.info(f"Table {catalog_entry.table} proposes {replication_method} sync") + LOGGER.info(f"Table {catalog_entry.table} will use {replication_method} sync") + + database_name = common.get_database_name(catalog_entry) + + with metrics.job_timer("table_cdc_sync") as timer: + timer.tags["database"] = database_name + timer.tags["table"] = catalog_entry.table + + if replication_method == "LOG_BASED": + LOGGER.info(f"syncing {catalog_entry.table} cdc tables") + do_sync_log_based(mssql_conn, config, catalog_entry, state, columns) + else: + raise Exception("only LOG_BASED methods are supported for CDC") + + state = singer.set_currently_syncing(state, None) + singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) + + +def do_sync(mssql_conn, config, catalog, state): + LOGGER.info("Beginning sync") + non_cdc_catalog = get_non_cdc_streams(mssql_conn, catalog, config, state) + cdc_catalog = get_cdc_streams(mssql_conn, catalog, config, state) + + for entry in non_cdc_catalog.streams: + LOGGER.info(f"Need to sync {entry.table}") + sync_non_cdc_streams(mssql_conn, non_cdc_catalog, config, state) + 
sync_cdc_streams(mssql_conn, cdc_catalog, config, state) + + +def log_server_params(mssql_conn): + with connect_with_backoff(mssql_conn) as open_conn: + try: + with open_conn.cursor() as cur: + cur.execute("""SELECT @@VERSION as version, @@lock_timeout as lock_wait_timeout""") + row = cur.fetchone() + LOGGER.info( + "Server Parameters: " + "version: %s, " + "lock_timeout: %s, ", + *row, + ) + except: + # Try without the @@lock_timeout as it is not available in all platforms. + try: + with open_conn.cursor() as cur: + cur.execute("""SELECT @@VERSION as version, -1 as lock_wait_timeout""") + row = cur.fetchone() + LOGGER.info( + "Server Parameters: " + "version: %s, " + "lock_timeout: %s, ", + *row, + ) + except: + LOGGER.warning("Encountered error checking server params.") + + +def main_impl(): + + global ARRAYSIZE + args = utils.parse_args(REQUIRED_CONFIG_KEYS) + mssql_conn = MSSQLConnection(args.config) + log_server_params(mssql_conn) + + ARRAYSIZE = args.config.get('cursor_array_size',1) + common.ARRAYSIZE = ARRAYSIZE + + if args.discover: + do_discover(mssql_conn, args.config) + elif args.catalog: + state = args.state or {} + do_sync(mssql_conn, args.config, args.catalog, state) + elif args.properties: + catalog = Catalog.from_dict(args.properties) + state = args.state or {} + do_sync(mssql_conn, args.config, catalog, state) + else: + LOGGER.info("No properties were selected") + + +def main(): + try: + main_impl() + except Exception as exc: + LOGGER.critical(exc) + raise exc diff --git a/tap_mssql/connection.py b/tap_mssql/connection.py index 449dd27f..54ca78c4 100755 --- a/tap_mssql/connection.py +++ b/tap_mssql/connection.py @@ -1,64 +1,71 @@ -#!/usr/bin/env python3 - -import backoff -import pymssql -import singer - -LOGGER = singer.get_logger() - - -@backoff.on_exception(backoff.expo, pymssql.Error, max_tries=5, factor=2) -def connect_with_backoff(connection): - warnings = [] - with connection.cursor(): - if warnings: - LOGGER.info( - ( - "Encountered non-fatal errors when configuring session that could " - "impact performance:" - ) - ) - for w in warnings: - LOGGER.warning(w) - - return connection - - -class MSSQLConnection(pymssql.Connection): - def __init__(self, config): - args = { - "user": config.get("user"), - "password": config.get("password"), - "server": config["host"], - "database": config["database"], - "charset": config.get("characterset", "utf8"), - "port": config.get("port", "1433"), - "tds_version": config.get("tds_version", "7.3"), - } - conn = pymssql._mssql.connect(**args) - super().__init__(conn, False, True) - - def __enter__(self): - return self - - def __exit__(self, *exc_info): - del exc_info - self.close() - - -def make_connection_wrapper(config): - class ConnectionWrapper(MSSQLConnection): - def __init__(self, *args, **kwargs): - super().__init__(config) - - connect_with_backoff(self) - - return ConnectionWrapper - -def ResultIterator(cursor, arraysize=1): - while True: - results = cursor.fetchmany(arraysize) - if not results: - break - for result in results: - yield result \ No newline at end of file +#!/usr/bin/env python3 + +import backoff +import pymssql +import singer +from os import environ + +LOGGER = singer.get_logger() + + +@backoff.on_exception(backoff.expo, pymssql.Error, max_tries=5, factor=2) +def connect_with_backoff(connection): + warnings = [] + with connection.cursor(): + if warnings: + LOGGER.info( + ( + "Encountered non-fatal errors when configuring session that could " + "impact performance:" + ) + ) + for w in warnings: + 
LOGGER.warning(w) + + return connection + + +class MSSQLConnection(pymssql.Connection): + def __init__(self, config): + args = { + "user": config.get("user"), + "password": config.get("password"), + "server": config["host"], + "database": config["database"], + "charset": config.get("characterset", "utf8"), + "port": config.get("port", "1433"), + "tds_version": config.get("tds_version", "7.3"), + } + # Add additional conn_properties for specific version settings + if config.get("conn_properties"): + args["conn_properties"] = config.get("conn_properties") + # Optional ability to dump TDS logs + if config.get("enable_tds_logging"): + environ['TDSDUMP'] = 'stderr' + conn = pymssql._mssql.connect(**args) + super().__init__(conn, False, True) + + def __enter__(self): + return self + + def __exit__(self, *exc_info): + del exc_info + self.close() + + +def make_connection_wrapper(config): + class ConnectionWrapper(MSSQLConnection): + def __init__(self, *args, **kwargs): + super().__init__(config) + + connect_with_backoff(self) + + return ConnectionWrapper + +def ResultIterator(cursor, arraysize=1): + while True: + results = cursor.fetchmany(arraysize) + if not results: + break + for result in results: + yield result diff --git a/tap_mssql/sync_strategies/log_based.py b/tap_mssql/sync_strategies/log_based.py index 3359283d..e553e28d 100644 --- a/tap_mssql/sync_strategies/log_based.py +++ b/tap_mssql/sync_strategies/log_based.py @@ -1,396 +1,406 @@ -#!/usr/bin/env python3 -# pylint: disable=duplicate-code,too-many-locals,simplifiable-if-expression - -import copy - -import singer -from singer.schema import Schema - -import tap_mssql.sync_strategies.common as common -from tap_mssql.connection import MSSQLConnection, connect_with_backoff - -LOGGER = singer.get_logger() - - -def py_bin_to_mssql(binary_value): - return "CONVERT(BINARY(10),'0x" + binary_value + "',1)" - - -def verify_change_data_capture_table(connection, schema_name, table_name): - cur = connection.cursor() - cur.execute( - """select s.name as schema_name, t.name as table_name, t.is_tracked_by_cdc, t.object_id - from sys.tables t - join sys.schemas s on (s.schema_id = t.schema_id) - and t.name = '{}' - and s.name = '{}'""".format( - table_name, schema_name - ) - ) - row = cur.fetchone() - - return row[2] - - -def verify_change_data_capture_databases(connection): - cur = connection.cursor() - cur.execute( - """SELECT name, is_cdc_enabled - FROM sys.databases WHERE database_id = DB_ID()""" - ) - row = cur.fetchone() - - LOGGER.info( - "CDC Databases enable : Database %s, Enabled %s", - *row, - ) - return row - - -def verify_read_isolation_databases(connection): - cur = connection.cursor() - cur.execute( - """SELECT DB_NAME(database_id), - is_read_committed_snapshot_on, - snapshot_isolation_state_desc - FROM sys.databases - WHERE database_id = DB_ID();""" - ) - row = cur.fetchone() - - if row[1] is False and row[2] == "OFF": - LOGGER.warning( - ( - "CDC Databases may result in dirty reads. Consider enabling Read Committed" - " or Snapshot isolation: Database %s, Is Read Committed Snapshot is %s," - " Snapshot Isolation is %s" - ), - *row, - ) - return row - - -def get_lsn_available_range(connection, capture_instance_name): - cur = connection.cursor() - query = """SELECT sys.fn_cdc_get_min_lsn ( '{}' ) lsn_from - , sys.fn_cdc_get_max_lsn () lsn_to - ; - """.format( - capture_instance_name - ) - cur.execute(query) - row = cur.fetchone() - - if row[0] is None: # Test that the lsn_from is not NULL i.e. 
there is change data to process - LOGGER.info("No data available to process in CDC table %s", capture_instance_name) - else: - LOGGER.info( - "Data available in cdc table %s from lsn %s", capture_instance_name, row[0].hex() - ) - - return row - - -def get_to_lsn(connection): - cur = connection.cursor() - query = """select sys.fn_cdc_get_max_lsn () """ - - cur.execute(query) - row = cur.fetchone() - - LOGGER.info( - "Max LSN ID : %s", - row[0].hex(), - ) - return row - - -def add_synthetic_keys_to_schema(catalog_entry): - catalog_entry.schema.properties["_sdc_operation_type"] = Schema( - description="Source operation I=Insert, D=Delete, U=Update", - type=["null", "string"], - format="string", - ) - catalog_entry.schema.properties["_sdc_lsn_commit_timestamp"] = Schema( - description="Source system commit timestamp", type=["null", "string"], format="date-time" - ) - catalog_entry.schema.properties["_sdc_lsn_deleted_at"] = Schema( - description="Source system delete timestamp", type=["null", "string"], format="date-time" - ) - catalog_entry.schema.properties["_sdc_lsn_value"] = Schema( - description="Source system log sequence number (LSN)", - type=["null", "string"], - format="string", - ) - catalog_entry.schema.properties["_sdc_lsn_seq_value"] = Schema( - description="Source sequence number within the system log sequence number (LSN)", - type=["null", "string"], - format="string", - ) - catalog_entry.schema.properties["_sdc_lsn_operation"] = Schema( - description=( - "The operation that took place (1=Delete, 2=Insert, 3=Update (Before Image)," - "4=Update (After Image) )" - ), - type=["null", "integer"], - format="integer", - ) - - return catalog_entry - - -def generate_bookmark_keys(catalog_entry): - - # TO_DO: - # 1. check the use of the top three values above and the parameter value, seem to not be required. - # 2. check the base_bookmark_keys required - base_bookmark_keys = { - "last_lsn_fetched", - "max_lsn_values", - "lsn", - "version", - "initial_full_table_complete", - } - - bookmark_keys = base_bookmark_keys - - return bookmark_keys - - -def sync_historic_table(mssql_conn, config, catalog_entry, state, columns, stream_version): - mssql_conn = MSSQLConnection(config) - common.whitelist_bookmark_keys( - generate_bookmark_keys(catalog_entry), catalog_entry.tap_stream_id, state - ) - - # Add additional keys to the columns - extended_columns = columns + [ - "_sdc_operation_type", - "_sdc_lsn_commit_timestamp", - "_sdc_lsn_deleted_at", - "_sdc_lsn_value", - "_sdc_lsn_seq_value", - "_sdc_lsn_operation", - ] - - bookmark = state.get("bookmarks", {}).get(catalog_entry.tap_stream_id, {}) - version_exists = True if "version" in bookmark else False - - initial_full_table_complete = singer.get_bookmark( - state, catalog_entry.tap_stream_id, "initial_full_table_complete" - ) - - state_version = singer.get_bookmark(state, catalog_entry.tap_stream_id, "version") - - activate_version_message = singer.ActivateVersionMessage( - stream=catalog_entry.stream, version=stream_version - ) - - # For the initial replication, emit an ACTIVATE_VERSION message - # at the beginning so the records show up right away. 
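For LOG_BASED streams, only a fixed set of bookmark keys is kept in state; anything else is cleared before the sync starts. Below is a hedged sketch of that whitelisting, mirroring the intent of common.whitelist_bookmark_keys with a hypothetical stream id and state contents.

```python
# Hedged sketch of bookmark-key whitelisting for LOG_BASED streams: keys
# outside the whitelist are dropped from the stream's bookmark before syncing.
# The stream id and state values below are hypothetical.

BOOKMARK_KEYS = {
    "last_lsn_fetched",
    "max_lsn_values",
    "lsn",
    "version",
    "initial_full_table_complete",
}

state = {
    "bookmarks": {
        "dbo-orders": {
            "lsn": "0000002a000001f00003",
            "version": 1,
            "replication_key_value": "2024-01-01",   # not whitelisted for LOG_BASED
        }
    }
}

bookmark = state["bookmarks"]["dbo-orders"]
for key in list(bookmark):
    if key not in BOOKMARK_KEYS:
        del bookmark[key]

print(state)   # the replication_key_value entry has been cleared
```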
- if not initial_full_table_complete and not (version_exists and state_version is None): - singer.write_message(activate_version_message) - - with connect_with_backoff(mssql_conn) as open_conn: - with open_conn.cursor() as cur: - - escaped_columns = map(lambda c: common.prepare_columns_sql(catalog_entry, c), columns) - table_name = catalog_entry.table - schema_name = common.get_database_name(catalog_entry) - - if not verify_change_data_capture_table(mssql_conn, schema_name, table_name): - raise Exception( - ( - "Error {}.{}: does not have change data capture enabled. Call EXEC" - " sys.sp_cdc_enable_table with relevant parameters to enable CDC." - ).format(schema_name, table_name) - ) - - verify_read_isolation_databases(mssql_conn) - - # Store the current database lsn number, will use this to store at the end of the initial load. - # Note: Recommend no transactions loaded when the initial loads are performed. - # Have captured the to_lsn before the initial load sync in-case records are added during the sync. - lsn_to = str(get_to_lsn(mssql_conn)[0].hex()) - - select_sql = """ - SELECT {} - ,'I' _sdc_operation_type - , cast('1900-01-01' as datetime) _sdc_lsn_commit_timestamp - , null _sdc_lsn_deleted_at - , '00000000000000000000' _sdc_lsn_value - , '00000000000000000000' _sdc_lsn_seq_value - , 2 as _sdc_lsn_operation - FROM {}.{} - ;""".format( - ",".join(escaped_columns), schema_name, table_name - ) - params = {} - - common.sync_query( - cur, - catalog_entry, - state, - select_sql, - extended_columns, - stream_version, - params, - config, - ) - state = singer.write_bookmark(state, catalog_entry.tap_stream_id, "lsn", lsn_to) - - # store the state of the table lsn's after the initial load ready for the next CDC run - singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) - - # clear max pk value and last pk fetched upon successful sync - singer.clear_bookmark(state, catalog_entry.tap_stream_id, "max_pk_values") - singer.clear_bookmark(state, catalog_entry.tap_stream_id, "last_pk_fetched") - - singer.write_message(activate_version_message) - - -def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version): - mssql_conn = MSSQLConnection(config) - common.whitelist_bookmark_keys( - generate_bookmark_keys(catalog_entry), catalog_entry.tap_stream_id, state - ) - - # Add additional keys to the columns - extended_columns = columns + [ - "_sdc_operation_type", - "_sdc_lsn_commit_timestamp", - "_sdc_lsn_deleted_at", - "_sdc_lsn_value", - "_sdc_lsn_seq_value", - "_sdc_lsn_operation", - ] - - bookmark = state.get("bookmarks", {}).get(catalog_entry.tap_stream_id, {}) - version_exists = True if "version" in bookmark else False - - initial_full_table_complete = singer.get_bookmark( - state, catalog_entry.tap_stream_id, "initial_full_table_complete" - ) - - state_version = singer.get_bookmark(state, catalog_entry.tap_stream_id, "version") - - activate_version_message = singer.ActivateVersionMessage( - stream=catalog_entry.stream, version=stream_version - ) - - # For the initial replication, emit an ACTIVATE_VERSION message - # at the beginning so the records show up right away. 
- if not initial_full_table_complete and not (version_exists and state_version is None): - singer.write_message(activate_version_message) - - with connect_with_backoff(mssql_conn) as open_conn: - with open_conn.cursor() as cur: - - state_last_lsn = singer.get_bookmark(state, catalog_entry.tap_stream_id, "lsn") - - escaped_columns = map(lambda c: common.prepare_columns_sql(catalog_entry, c), columns) - table_name = catalog_entry.table - schema_name = common.get_database_name(catalog_entry) - schema_table = schema_name + "_" + table_name - - if not verify_change_data_capture_table(mssql_conn, schema_name, table_name): - raise Exception( - ( - "Error {}.{}: does not have change data capture enabled. " - "Call EXEC sys.sp_cdc_enable_table with relevant parameters to enable CDC." - ).format(schema_name, table_name) - ) - - lsn_range = get_lsn_available_range(mssql_conn, schema_table) - - if lsn_range[0] is not None: # Test to see if there are any change records to process - lsn_from = str(lsn_range[0].hex()) - lsn_to = str(lsn_range[1].hex()) - - if lsn_from <= state_last_lsn: - LOGGER.info( - ( - "The last lsn processed as per the state file %s, minimum available lsn" - " for extract table %s, and the maximum lsn is %s." - ), - state_last_lsn, - lsn_from, - lsn_to, - ) - if lsn_to == state_last_lsn: - LOGGER.info( - ( - "The last lsn processed as per the state file is equal to the max" - " lsn available - no changes expected - state lsn will not be incremented" - ), - ) - from_lsn_expression = "{}".format(py_bin_to_mssql(state_last_lsn)) - else: - from_lsn_expression = ( - ( - "sys.fn_cdc_increment_lsn({})" - ).format(py_bin_to_mssql(state_last_lsn)) - ) - else: - raise Exception( - ( - "Error {}.{}: CDC changes have expired, the minimum lsn is {}, the last" - " processed lsn is {}. Recommend a full load as there may be missing data." - ).format(schema_name, table_name, lsn_from, state_last_lsn) - ) - - select_sql = """DECLARE @from_lsn binary (10), @to_lsn binary (10) - - SET @from_lsn = {} - SET @to_lsn = {} - - SELECT {} - ,case __$operation - when 2 then 'I' - when 4 then 'U' - when 1 then 'D' - end _sdc_operation_type - , sys.fn_cdc_map_lsn_to_time(__$start_lsn) _sdc_lsn_commit_timestamp - , case __$operation - when 1 then sys.fn_cdc_map_lsn_to_time(__$start_lsn) - else null - end _sdc_lsn_deleted_at - , __$start_lsn _sdc_lsn_value - , __$seqval _sdc_lsn_seq_value - , __$operation _sdc_lsn_operation - FROM cdc.fn_cdc_get_all_changes_{}(@from_lsn, @to_lsn, 'all') - ORDER BY __$start_lsn, __$seqval, __$operation - ;""".format( - from_lsn_expression, - py_bin_to_mssql(lsn_to), - ",".join(escaped_columns), - schema_table, - ) - - params = {} - - common.sync_query( - cur, - catalog_entry, - state, - select_sql, - extended_columns, - stream_version, - params, - config, - ) - - else: - # Store the current database lsn number, need to store the latest lsn checkpoint because the - # CDC logs expire after a point in time. Therefore if there are no records read, then refresh - # the max lsn_to to the latest LSN in the database. 
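Both the old implementation above and its replacement later in this diff read changes through the table's CDC capture instance, whose default name in SQL Server is the schema and table name joined by an underscore. A small illustration of that naming convention, with hypothetical object names:

```python
# Default CDC capture-instance naming: "<schema>_<table>", which is also the
# suffix of the generated cdc.fn_cdc_get_all_changes_* function. The schema
# and table names here are hypothetical.

schema_name = "dbo"
table_name = "orders"

capture_instance = schema_name + "_" + table_name
cdc_function = "cdc.fn_cdc_get_all_changes_" + capture_instance

print(capture_instance)   # dbo_orders
print(cdc_function)       # cdc.fn_cdc_get_all_changes_dbo_orders
```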
- lsn_to = str(get_to_lsn(mssql_conn)[0].hex()) - - state = singer.write_bookmark(state, catalog_entry.tap_stream_id, "lsn", lsn_to) - singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) - - # clear max lsn value and last lsn fetched upon successful sync - singer.clear_bookmark(state, catalog_entry.tap_stream_id, "max_lsn_values") - singer.clear_bookmark(state, catalog_entry.tap_stream_id, "last_lsn_fetched") - - singer.write_message(activate_version_message) +#!/usr/bin/env python3 +# pylint: disable=duplicate-code,too-many-locals,simplifiable-if-expression + +import copy + +import singer +from singer.schema import Schema + +import tap_mssql.sync_strategies.common as common +from tap_mssql.connection import MSSQLConnection, connect_with_backoff + +LOGGER = singer.get_logger() + + +def py_bin_to_mssql(binary_value): + return "CONVERT(BINARY(10),'0x" + binary_value + "',1)" + + +def verify_change_data_capture_table(connection, schema_name, table_name): + cur = connection.cursor() + cur.execute( + """select s.name as schema_name, t.name as table_name, t.is_tracked_by_cdc, t.object_id + from sys.tables t + join sys.schemas s on (s.schema_id = t.schema_id) + and t.name = '{}' + and s.name = '{}'""".format( + table_name.replace('"',''), schema_name.replace('"','') + ) + ) + row = cur.fetchone() + + if row: + return row[2] + else: + return False + + +def verify_change_data_capture_databases(connection): + cur = connection.cursor() + cur.execute( + """SELECT name, is_cdc_enabled + FROM sys.databases WHERE database_id = DB_ID()""" + ) + row = cur.fetchone() + + if row: + LOGGER.info( + "CDC Databases enable : Database %s, Enabled %s", + *row, + ) + return row + else: + return False + + +def verify_read_isolation_databases(connection): + cur = connection.cursor() + cur.execute( + """SELECT DB_NAME(database_id), + is_read_committed_snapshot_on, + snapshot_isolation_state_desc + FROM sys.databases + WHERE database_id = DB_ID();""" + ) + row = cur.fetchone() + + if row[1] is False and row[2] == "OFF": + LOGGER.warning( + ( + "CDC Databases may result in dirty reads. Consider enabling Read Committed" + " or Snapshot isolation: Database %s, Is Read Committed Snapshot is %s," + " Snapshot Isolation is %s" + ), + *row, + ) + return row + + +def get_lsn_available_range(connection, capture_instance_name): + cur = connection.cursor() + query = """SELECT sys.fn_cdc_get_min_lsn ( '{}' ) lsn_from + , sys.fn_cdc_get_max_lsn () lsn_to + ; + """.format( + capture_instance_name + ) + cur.execute(query) + row = cur.fetchone() + + if row[0] is None: # Test that the lsn_from is not NULL i.e. 
there is change data to process + LOGGER.info("No data available to process in CDC table %s", capture_instance_name) + else: + LOGGER.info( + "Data available in cdc table %s from lsn %s", capture_instance_name, row[0].hex() + ) + + return row + + +def get_to_lsn(connection): + cur = connection.cursor() + query = """select sys.fn_cdc_get_max_lsn () """ + + cur.execute(query) + row = cur.fetchone() + + LOGGER.info( + "Max LSN ID : %s", + row[0].hex(), + ) + return row + + +def add_synthetic_keys_to_schema(catalog_entry): + catalog_entry.schema.properties["_sdc_operation_type"] = Schema( + description="Source operation I=Insert, D=Delete, U=Update", + type=["null", "string"], + format="string", + ) + catalog_entry.schema.properties["_sdc_lsn_commit_timestamp"] = Schema( + description="Source system commit timestamp", type=["null", "string"], format="date-time" + ) + catalog_entry.schema.properties["_sdc_lsn_deleted_at"] = Schema( + description="Source system delete timestamp", type=["null", "string"], format="date-time" + ) + catalog_entry.schema.properties["_sdc_lsn_value"] = Schema( + description="Source system log sequence number (LSN)", + type=["null", "string"], + format="string", + ) + catalog_entry.schema.properties["_sdc_lsn_seq_value"] = Schema( + description="Source sequence number within the system log sequence number (LSN)", + type=["null", "string"], + format="string", + ) + catalog_entry.schema.properties["_sdc_lsn_operation"] = Schema( + description=( + "The operation that took place (1=Delete, 2=Insert, 3=Update (Before Image)," + "4=Update (After Image) )" + ), + type=["null", "integer"], + format="integer", + ) + + return catalog_entry + + +def generate_bookmark_keys(catalog_entry): + + # TO_DO: + # 1. check the use of the top three values above and the parameter value, seem to not be required. + # 2. check the base_bookmark_keys required + base_bookmark_keys = { + "last_lsn_fetched", + "max_lsn_values", + "lsn", + "version", + "initial_full_table_complete", + } + + bookmark_keys = base_bookmark_keys + + return bookmark_keys + + +def sync_historic_table(mssql_conn, config, catalog_entry, state, columns, stream_version): + mssql_conn = MSSQLConnection(config) + common.whitelist_bookmark_keys( + generate_bookmark_keys(catalog_entry), catalog_entry.tap_stream_id, state + ) + + # Add additional keys to the columns + extended_columns = columns + [ + "_sdc_operation_type", + "_sdc_lsn_commit_timestamp", + "_sdc_lsn_deleted_at", + "_sdc_lsn_value", + "_sdc_lsn_seq_value", + "_sdc_lsn_operation", + ] + + bookmark = state.get("bookmarks", {}).get(catalog_entry.tap_stream_id, {}) + version_exists = True if "version" in bookmark else False + + initial_full_table_complete = singer.get_bookmark( + state, catalog_entry.tap_stream_id, "initial_full_table_complete" + ) + + state_version = singer.get_bookmark(state, catalog_entry.tap_stream_id, "version") + + activate_version_message = singer.ActivateVersionMessage( + stream=catalog_entry.stream, version=stream_version + ) + + # For the initial replication, emit an ACTIVATE_VERSION message + # at the beginning so the records show up right away. 
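The guard that follows decides whether an ACTIVATE_VERSION message is emitted before the historical load so the records show up right away. A minimal sketch of that condition, with plain booleans standing in for the singer bookmarks read above:

```python
# Minimal sketch of the early ACTIVATE_VERSION guard; the arguments stand in
# for the "initial_full_table_complete" and "version" bookmarks read above.

def emit_activate_version_early(initial_full_table_complete, version_exists, state_version):
    # Emit up front only while the initial historical load is still incomplete,
    # and skip the case where a "version" bookmark key exists but holds no value.
    return (not initial_full_table_complete
            and not (version_exists and state_version is None))


print(emit_activate_version_early(False, False, None))       # True  (brand-new stream)
print(emit_activate_version_early(False, True, None))        # False (version key present but unset)
print(emit_activate_version_early(True, True, 1693000000))   # False (historical load already complete)
```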
+ if not initial_full_table_complete and not (version_exists and state_version is None): + singer.write_message(activate_version_message) + + with connect_with_backoff(mssql_conn) as open_conn: + with open_conn.cursor() as cur: + + escaped_columns = map(lambda c: common.prepare_columns_sql(catalog_entry, c), columns) + escaped_table_name = common.escape(catalog_entry.table) + escaped_schema_name = common.escape(common.get_database_name(catalog_entry)) + + if not verify_change_data_capture_table(mssql_conn, escaped_schema_name, escaped_table_name): + raise Exception( + ( + "Error {}.{}: does not have change data capture enabled. Call EXEC" + " sys.sp_cdc_enable_table with relevant parameters to enable CDC." + ).format(escaped_schema_name, escaped_table_name) + ) + + verify_read_isolation_databases(mssql_conn) + + # Store the current database lsn number, will use this to store at the end of the initial load. + # Note: Recommend no transactions loaded when the initial loads are performed. + # Have captured the to_lsn before the initial load sync in-case records are added during the sync. + lsn_to = str(get_to_lsn(mssql_conn)[0].hex()) + + select_sql = """ + SELECT {} + ,'I' _sdc_operation_type + , cast('1900-01-01' as datetime) _sdc_lsn_commit_timestamp + , null _sdc_lsn_deleted_at + , '00000000000000000000' _sdc_lsn_value + , '00000000000000000000' _sdc_lsn_seq_value + , 2 as _sdc_lsn_operation + FROM {}.{} + ;""".format( + ",".join(escaped_columns), escaped_schema_name, escaped_table_name + ) + params = {} + + common.sync_query( + cur, + catalog_entry, + state, + select_sql, + extended_columns, + stream_version, + params, + config, + ) + state = singer.write_bookmark(state, catalog_entry.tap_stream_id, "lsn", lsn_to) + + # store the state of the table lsn's after the initial load ready for the next CDC run + singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) + + # clear max pk value and last pk fetched upon successful sync + singer.clear_bookmark(state, catalog_entry.tap_stream_id, "max_pk_values") + singer.clear_bookmark(state, catalog_entry.tap_stream_id, "last_pk_fetched") + + singer.write_message(activate_version_message) + + +def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version): + mssql_conn = MSSQLConnection(config) + common.whitelist_bookmark_keys( + generate_bookmark_keys(catalog_entry), catalog_entry.tap_stream_id, state + ) + + # Add additional keys to the columns + extended_columns = columns + [ + "_sdc_operation_type", + "_sdc_lsn_commit_timestamp", + "_sdc_lsn_deleted_at", + "_sdc_lsn_value", + "_sdc_lsn_seq_value", + "_sdc_lsn_operation", + ] + + bookmark = state.get("bookmarks", {}).get(catalog_entry.tap_stream_id, {}) + version_exists = True if "version" in bookmark else False + + initial_full_table_complete = singer.get_bookmark( + state, catalog_entry.tap_stream_id, "initial_full_table_complete" + ) + + state_version = singer.get_bookmark(state, catalog_entry.tap_stream_id, "version") + + activate_version_message = singer.ActivateVersionMessage( + stream=catalog_entry.stream, version=stream_version + ) + + # For the initial replication, emit an ACTIVATE_VERSION message + # at the beginning so the records show up right away. 
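The historical load above emits every existing row once and stamps fixed placeholder values into the synthetic _sdc_* columns, so the initial load and later CDC changes share one schema. The record below is illustrative: the id and name columns and the timestamp formatting are assumptions, while the _sdc_* names and placeholder values mirror the initial-load SELECT.

```python
# Hypothetical record emitted by the historical (initial) load. Only the
# _sdc_* keys and their placeholder values come from the SQL above; the
# business columns and timestamp formatting are assumed for illustration.

historical_record = {
    "id": 42,                                         # hypothetical source column
    "name": "example",                                # hypothetical source column
    "_sdc_operation_type": "I",                       # every row treated as an insert
    "_sdc_lsn_commit_timestamp": "1900-01-01T00:00:00",
    "_sdc_lsn_deleted_at": None,
    "_sdc_lsn_value": "00000000000000000000",         # no real LSN yet
    "_sdc_lsn_seq_value": "00000000000000000000",
    "_sdc_lsn_operation": 2,                          # 2 = insert
}

print([key for key in historical_record if key.startswith("_sdc_")])
```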
+ if not initial_full_table_complete and not (version_exists and state_version is None): + singer.write_message(activate_version_message) + + with connect_with_backoff(mssql_conn) as open_conn: + with open_conn.cursor() as cur: + + state_last_lsn = singer.get_bookmark(state, catalog_entry.tap_stream_id, "lsn") + + escaped_columns = map(lambda c: common.prepare_columns_sql(catalog_entry, c), columns) + table_name = catalog_entry.table + schema_name = common.get_database_name(catalog_entry) + cdc_table = schema_name + "_" + table_name + escape_table_name = common.escape(catalog_entry.table) + escaped_schema_name = common.escape(schema_name) + escaped_cdc_table = common.escape(cdc_table) + escaped_cdc_function = common.escape("fn_cdc_get_all_changes_" + cdc_table) + + if not verify_change_data_capture_table(mssql_conn, escaped_schema_name, escape_table_name): + raise Exception( + ( + "Error {}.{}: does not have change data capture enabled. " + "Call EXEC sys.sp_cdc_enable_table with relevant parameters to enable CDC." + ).format(escaped_schema_name, escape_table_name) + ) + + lsn_range = get_lsn_available_range(mssql_conn, escaped_cdc_table) + + if lsn_range[0] is not None: # Test to see if there are any change records to process + lsn_from = str(lsn_range[0].hex()) + lsn_to = str(lsn_range[1].hex()) + + if lsn_from <= state_last_lsn: + LOGGER.info( + ( + "The last lsn processed as per the state file %s, minimum available lsn" + " for extract table %s, and the maximum lsn is %s." + ), + state_last_lsn, + lsn_from, + lsn_to, + ) + if lsn_to == state_last_lsn: + LOGGER.info( + ( + "The last lsn processed as per the state file is equal to the max" + " lsn available - no changes expected - state lsn will not be incremented" + ), + ) + from_lsn_expression = "{}".format(py_bin_to_mssql(state_last_lsn)) + else: + from_lsn_expression = ( + ( + "sys.fn_cdc_increment_lsn({})" + ).format(py_bin_to_mssql(state_last_lsn)) + ) + else: + raise Exception( + ( + "Error {}.{}: CDC changes have expired, the minimum lsn is {}, the last" + " processed lsn is {}. Recommend a full load as there may be missing data." + ).format(escaped_schema_name, escape_table_name, lsn_from, state_last_lsn) + ) + + select_sql = """DECLARE @from_lsn binary (10), @to_lsn binary (10) + + SET @from_lsn = {} + SET @to_lsn = {} + + SELECT {} + ,case __$operation + when 2 then 'I' + when 4 then 'U' + when 1 then 'D' + end _sdc_operation_type + , sys.fn_cdc_map_lsn_to_time(__$start_lsn) _sdc_lsn_commit_timestamp + , case __$operation + when 1 then sys.fn_cdc_map_lsn_to_time(__$start_lsn) + else null + end _sdc_lsn_deleted_at + , __$start_lsn _sdc_lsn_value + , __$seqval _sdc_lsn_seq_value + , __$operation _sdc_lsn_operation + FROM cdc.{}(@from_lsn, @to_lsn, 'all') + ORDER BY __$start_lsn, __$seqval, __$operation + ;""".format( + from_lsn_expression, + py_bin_to_mssql(lsn_to), + ",".join(escaped_columns), + escaped_cdc_function, + ) + + params = {} + + common.sync_query( + cur, + catalog_entry, + state, + select_sql, + extended_columns, + stream_version, + params, + config, + ) + + else: + # Store the current database lsn number, need to store the latest lsn checkpoint because the + # CDC logs expire after a point in time. Therefore if there are no records read, then refresh + # the max lsn_to to the latest LSN in the database. 
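Before the no-change branch that follows refreshes the bookmark, the LSN window logic above can be restated compactly: resume from the increment of the last processed LSN when new changes exist, re-read from the bookmark itself when the state is already at the maximum LSN, and fail when the bookmark has fallen behind the minimum retained LSN. The sketch below uses plain hex strings in place of cursor results; the LSN values are made up.

```python
# Condensed restatement of how the CDC extraction window above is chosen.
# LSNs are lowercase hex strings, as stored in the tap's state bookmarks.

def py_bin_to_mssql(hex_lsn):
    # Same shape as the module-level helper: render a hex LSN as a BINARY(10) literal.
    return "CONVERT(BINARY(10),'0x" + hex_lsn + "',1)"


def choose_from_lsn_expression(state_last_lsn, lsn_from, lsn_to):
    if lsn_from <= state_last_lsn:
        if lsn_to == state_last_lsn:
            # Already caught up: read from the bookmark itself, expecting no new rows.
            return py_bin_to_mssql(state_last_lsn)
        # Resume just after the last processed LSN.
        return "sys.fn_cdc_increment_lsn({})".format(py_bin_to_mssql(state_last_lsn))
    raise Exception("CDC changes have expired; a full reload may be needed.")


# Hypothetical LSN values for illustration.
print(choose_from_lsn_expression(
    state_last_lsn="0000002a000001f00003",
    lsn_from="000000100000000a0001",
    lsn_to="0000002b0000004a0001",
))
# sys.fn_cdc_increment_lsn(CONVERT(BINARY(10),'0x0000002a000001f00003',1))
```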
+ lsn_to = str(get_to_lsn(mssql_conn)[0].hex()) + + state = singer.write_bookmark(state, catalog_entry.tap_stream_id, "lsn", lsn_to) + singer.write_message(singer.StateMessage(value=copy.deepcopy(state))) + + # clear max lsn value and last lsn fetched upon successful sync + singer.clear_bookmark(state, catalog_entry.tap_stream_id, "max_lsn_values") + singer.clear_bookmark(state, catalog_entry.tap_stream_id, "last_lsn_fetched") + + singer.write_message(activate_version_message)
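The escaping introduced above is what allows LOG_BASED streams to handle object names containing special characters (issue #74): identifiers are wrapped in double quotes via common.escape before being interpolated into SQL, and the quotes are stripped again for catalog lookups such as sys.tables, which store bare object names. A hedged sketch of that convention, assuming an escape() that simply double-quotes an identifier (the tap's own helper lives in tap_mssql.sync_strategies.common):

```python
# Hedged sketch of double-quote escaping for object names with special
# characters. escape() here is a stand-in assumption; the tap uses
# common.escape / common.prepare_columns_sql for the real work.

def escape(identifier):
    if '"' in identifier:
        raise ValueError("Double quotes in identifiers are not supported: " + identifier)
    return '"' + identifier + '"'


schema_name = "dbo"
table_name = "Order Details$2024"    # hypothetical table with special characters

select_sql = "SELECT * FROM {}.{}".format(escape(schema_name), escape(table_name))
print(select_sql)                    # SELECT * FROM "dbo"."Order Details$2024"

# Catalog views such as sys.tables hold the bare name, so the quotes are
# stripped before metadata lookups (as verify_change_data_capture_table does).
print(escape(table_name).replace('"', ''))   # Order Details$2024
```

Double-quoted identifiers rely on the session keeping QUOTED_IDENTIFIER ON.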