Skip to content

Commit

Permalink
Correcting escaping
Browse files Browse the repository at this point in the history
  • Loading branch information
s7clarke10 authored Aug 31, 2024
1 parent 2a9d708 commit fb4eef5
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions tap_mssql/sync_strategies/log_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,20 +289,22 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version
state_last_lsn = singer.get_bookmark(state, catalog_entry.tap_stream_id, "lsn")

escaped_columns = map(lambda c: common.prepare_columns_sql(catalog_entry, c), columns)
table_name = common.escape(catalog_entry.table)
table_name = catalog_entry.table
schema_name = common.get_database_name(catalog_entry)
schema_table = common.escape(schema_name + "_" + table_name)
schema_name = common.escape(schema_name)
cdc_table = schema_name + "_" + table_name
escape_table_name = common.escape(catalog_entry.table)
escaped_schema_name = common.escape(schema_name)
escaped_cdc_table = common.escape(cdc_table)

if not verify_change_data_capture_table(mssql_conn, schema_name, table_name):
if not verify_change_data_capture_table(mssql_conn, escaped_schema_name, escape_table_name):
raise Exception(
(
"Error {}.{}: does not have change data capture enabled. "
"Call EXEC sys.sp_cdc_enable_table with relevant parameters to enable CDC."
).format(schema_name, table_name)
).format(escaped_schema_name, escape_table_name)
)

lsn_range = get_lsn_available_range(mssql_conn, schema_table)
lsn_range = get_lsn_available_range(mssql_conn, escaped_cdc_table)

if lsn_range[0] is not None: # Test to see if there are any change records to process
lsn_from = str(lsn_range[0].hex())
Expand Down Expand Up @@ -337,7 +339,7 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version
(
"Error {}.{}: CDC changes have expired, the minimum lsn is {}, the last"
" processed lsn is {}. Recommend a full load as there may be missing data."
).format(schema_name, table_name, lsn_from, state_last_lsn)
).format(escaped_schema_name, escape_table_name, lsn_from, state_last_lsn)
)

select_sql = """DECLARE @from_lsn binary (10), @to_lsn binary (10)
Expand All @@ -359,13 +361,13 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version
, __$start_lsn _sdc_lsn_value
, __$seqval _sdc_lsn_seq_value
, __$operation _sdc_lsn_operation
FROM cdc.fn_cdc_get_all_changes_{}(@from_lsn, @to_lsn, 'all')
FROM cdc."fn_cdc_get_all_changes_{}"(@from_lsn, @to_lsn, 'all')
ORDER BY __$start_lsn, __$seqval, __$operation
;""".format(
from_lsn_expression,
py_bin_to_mssql(lsn_to),
",".join(escaped_columns),
schema_table,
cdc_table,
)

params = {}
Expand Down

0 comments on commit fb4eef5

Please sign in to comment.