Escaping objects for log based replication, adding conn_properties and enable_tds_logging #79

Merged
merged 10 commits into from
Sep 4, 2024

Conversation

s7clarke10
Collaborator

@s7clarke10 s7clarke10 commented Aug 31, 2024

This PR resolves two outstanding issues and adds a diagnostic option.

  1. Table and schema names were not being escaped for log-based replication, i.e. enclosed in double quotes when sent to MS SQL Server.

    If a table or schema name used lowercase or special characters, the name was not recognised and the sync failed.
    These changes should resolve issue #74, "Table Select is not being taken correctly".

  2. Different SQL Server platforms support different database settings and features. This PR adds conn_properties to override the default settings. This resolves issue #28, "Add support for APS/PDW/Azure Synapse Dedicated SQL Pool".

  3. Adds the ability to dump TDS logs via enable_tds_logging. When set, the tap dumps the underlying TDS driver logs, which is useful for diagnosing connection issues with SQL Server databases. WARNING: this dumps a lot of information and may log sensitive data, so it should only be used in development environments.
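
As a rough illustration of item 2, the sketch below shows one way a tap could forward an optional conn_properties value to its database driver. The function name build_connection_args and the keyword names (server, port, user, password, database) are assumptions made for this example; they mirror the arguments a driver such as pymssql accepts, but the tap's actual wiring is not shown in this PR description.

```python
def build_connection_args(config):
    """Translate a tap config dict into driver keyword arguments.

    Hypothetical helper: the keyword names are assumed, not taken from the tap.
    """
    args = {
        "server": config["host"],
        "port": str(config.get("port", "1433")),
        "user": config["user"],
        "password": config["password"],
        "database": config["database"],
    }
    # Only override the driver's default session settings when the user
    # explicitly supplies conn_properties; otherwise leave the defaults alone.
    if "conn_properties" in config:
        args["conn_properties"] = config["conn_properties"]
    return args
```

With this shape, a platform that rejects the driver's default session statements can be given a replacement value in the config, while existing configs behave as before because the key is simply absent.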

@s7clarke10 s7clarke10 self-assigned this Aug 31, 2024
@s7clarke10 s7clarke10 requested a review from mjsqu August 31, 2024 07:19
@s7clarke10
Collaborator Author

Awaiting verification that this PR works before review and merging.

@HaydenNess

HaydenNess commented Sep 1, 2024

Hi Steve, I appreciate your quick efforts there.

I've just set up a test environment at home to try it out.

I've only hit the following problem. When verifying CDC being enabled, the table/schema names should not be escaped with "".

When the following is executed in log_based.py sync_table (and presumably sync_historic_table):

            escaped_table_name = common.escape(catalog_entry.table)
            escaped_schema_name = common.escape(common.get_database_name(catalog_entry))

            if not verify_change_data_capture_table(mssql_conn, escaped_schema_name, escaped_table_name):

The resulting query looks like:

select s.name as schema_name, t.name as table_name, t.is_tracked_by_cdc, t.object_id
                   from sys.tables t
                   join sys.schemas s on (s.schema_id = t.schema_id)
                   and  t.name = '"TestPlus+"'
                   and  s.name = '"dbo"'

which fails because no rows are returned. When I replace " with nothing, everything else works as expected.

It is interesting to note that SQL Server tends to use [] delimiters rather than "". Exploring the idea quickly, I can create tables with names such as te[esst]]tes, 'te'st', test'test"test, and so on. Only [] delimiters seem able to cover the full range. Not a high priority, of course, given that such names are a bit silly.

In the case of a table name like 'mytable' in the above query, we need to escape the single quotes by doubling them, e.g.

and  t.name = '''mytable'''
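
The distinction above (identifier delimiters for object names, doubled single quotes for string literals) can be sketched in Python as follows. The helper names quote_identifier, quote_literal, and cdc_check_sql are hypothetical and are not the tap's own functions; binding the names as query parameters would avoid hand-built literals altogether.

```python
def quote_identifier(name):
    """Bracket-delimit a SQL Server identifier, doubling any closing bracket."""
    return "[" + name.replace("]", "]]") + "]"


def quote_literal(value):
    """Build a single-quoted string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def cdc_check_sql(schema, table):
    """Build the CDC verification query using the raw (unescaped) names.

    sys.tables and sys.schemas store names without delimiters, so the
    comparison values must be string literals, not quoted identifiers.
    """
    return (
        "select s.name as schema_name, t.name as table_name, "
        "t.is_tracked_by_cdc, t.object_id "
        "from sys.tables t "
        "join sys.schemas s on (s.schema_id = t.schema_id) "
        "where t.name = " + quote_literal(table) + " "
        "and s.name = " + quote_literal(schema)
    )
```

For example, quote_literal("'mytable'") yields '''mytable''', matching the predicate shown above, while quote_identifier is what a FROM clause would use for the same table.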

@s7clarke10
Collaborator Author

s7clarke10 commented Sep 1, 2024 via email

@s7clarke10 s7clarke10 changed the title Escaping table and schema names for log based replication Escaping objects for log based replication and adding conn_properties Sep 2, 2024
@cwegener

cwegener commented Sep 2, 2024

The new conn_properties option works great for me when connecting to Azure Synapse SQL Data Warehouse Dedicated Pool.

@SenneVanstraelen

Hello, for me the problem is not fixed. However, I did notice that the catalog is captured correctly. It seems to be going wrong when Meltano tries to access the catalog and search for the stream. I'm testing with the [] now.

@s7clarke10
Collaborator Author

s7clarke10 commented Sep 2, 2024 via email

@s7clarke10
Collaborator Author

s7clarke10 commented Sep 2, 2024 via email

@HaydenNess

I tried a few runs last night, using log-based sync, with both the initial sync and the incremental syncs, and had no problems using tables with a '+' in their names. Of course, it can still be broken with more archaic table names, but that is a concern for another time.

Thank you very much for the fix, it alleviates some patchwork I have to make. Unfortunately my target database also doesn't support '+', and CyberDuck seems to have issues with '+' as well, so there's more for me to do.

@s7clarke10
Collaborator Author

Here is a SQL script which I have used to simulate the issue creating a countryName$Store table.

-- create new test database
use master;

create database test_database;

use test_database;

-- Enable CDC on the test_database.

EXEC sys.sp_cdc_enable_db;  

-- =========  
-- CDC Queries to check what is happening  
-- =========  

-- Check for CDC errors
select *
from sys.dm_cdc_errors;

-- Stop the CDC Capture Job
exec sys.sp_cdc_stop_job;

-- Start the CDC Capture Job
EXEC sys.sp_cdc_start_job;

-- Check that CDC is enabled on the database
SELECT name, is_cdc_enabled
FROM sys.databases WHERE database_id = DB_ID();

-- Check tables which have CDC enabled on them
select s.name as schema_name, t.name as table_name, t.is_tracked_by_cdc, t.object_id
from sys.tables t
join sys.schemas s on (s.schema_id = t.schema_id)
where t.is_tracked_by_cdc = 1
;


-- create test table and insert data

drop table if exists test_database.dbo."countryName$Store";
create table test_database.dbo."countryName$Store"
(
test_col varchar(255) CONSTRAINT PK_CDC PRIMARY KEY CLUSTERED
)
;

insert into test_database.dbo."countryName$Store"
(test_col)
values 
('1 - Record One')
;

insert into test_database.dbo."countryName$Store"
(test_col)
values 
('2 - Record Two')
;

-- Enable CDC on this table.

EXEC sys.sp_cdc_enable_table  
@source_schema = N'dbo',
@source_name   = N'countryName$Store',  
@role_name     = NULL,
@supports_net_changes = 1  
;

-- Complete Initial Load via the tap then add
-- the following record for a CDC run.

insert into test_database.dbo."countryName$Store"
(test_col)
values 
('3 - Record Three')
;

@s7clarke10
Collaborator Author

s7clarke10 commented Sep 3, 2024

Here are the results: log-based replication against a SQL Server 2019 Docker image appears to work as expected.

This is the command I used to start a SQL Server Docker Image with CDC working in it 😊.

docker run -e "ACCEPT_EULA=Y" -e "MSSQL_SA_PASSWORD=testDatabase1" -e "MSSQL_PID=Developer" -e "MSSQL_AGENT_ENABLED=true" -p 1433:1433 -d mcr.microsoft.com/mssql/server:2019-latest

This is a tap-mssql config file to connect to this database.

{
    "host": "127.0.0.1",
    "database": "test_database",
    "port": "1433",
    "user": "sa",
    "password": "testDatabase1"
  }

The properties.json file produced by poetry run tap-mssql -c config.json --discover > properties.json needed the selected and replication-method entries added to each stream's metadata, like so.

      "stream": "countryName$Store",
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "selected-by-default": false,
            "database-name": "dbo",
            "is-view": false,
            "table-key-properties": [
              "test_col"
            ],
            "selected": true,
            "replication-method": "LOG_BASED"
          }
        },

Here is my test log from my various runs.

vscode ➜ /workspaces/python-dnd/pipelinewise-tap-mssql (feature/escape_log_based_tables) $ poetry run tap-mssql -c config.json --discover > properties.json
time=2024-09-03 06:21:21 name=singer level=INFO message=Server Parameters: version: Microsoft SQL Server 2019 (RTM-CU28) (KB5039747) - 15.0.4385.2 (X64) 
        Jul 25 2024 21:32:40 
        Copyright (C) 2019 Microsoft Corporation
        Developer Edition (64-bit) on Linux (Ubuntu 20.04.6 LTS) <X64>, lock_timeout: -1, 
time=2024-09-03 06:21:21 name=singer level=INFO message=Preparing Catalog
time=2024-09-03 06:21:21 name=singer level=INFO message=Fetching tables
time=2024-09-03 06:21:21 name=singer level=INFO message=Tables fetched, fetching columns
time=2024-09-03 06:21:21 name=singer level=INFO message=ARRAYSIZE=1
time=2024-09-03 06:21:21 name=singer level=INFO message=Columns Fetched
time=2024-09-03 06:21:21 name=singer level=INFO message=Catalog ready
vscode ➜ /workspaces/python-dnd/pipelinewise-tap-mssql (feature/escape_log_based_tables) $ poetry run tap-mssql -c config.json -p properties.json
time=2024-09-03 06:22:59 name=singer level=INFO message=Server Parameters: version: Microsoft SQL Server 2019 (RTM-CU28) (KB5039747) - 15.0.4385.2 (X64) 
        Jul 25 2024 21:32:40 
        Copyright (C) 2019 Microsoft Corporation
        Developer Edition (64-bit) on Linux (Ubuntu 20.04.6 LTS) <X64>, lock_timeout: -1, 
time=2024-09-03 06:22:59 name=singer level=INFO message=Beginning sync
time=2024-09-03 06:22:59 name=singer level=INFO message=Preparing Catalog
time=2024-09-03 06:22:59 name=singer level=INFO message=Fetching tables
time=2024-09-03 06:22:59 name=singer level=INFO message=Tables fetched, fetching columns
time=2024-09-03 06:22:59 name=singer level=INFO message=ARRAYSIZE=1
time=2024-09-03 06:22:59 name=singer level=INFO message=Columns Fetched
time=2024-09-03 06:22:59 name=singer level=INFO message=Catalog ready
time=2024-09-03 06:22:59 name=singer level=INFO message=selected-by-default: False
time=2024-09-03 06:22:59 name=singer level=INFO message=database-name: dbo
time=2024-09-03 06:22:59 name=singer level=INFO message=is-view: False
time=2024-09-03 06:22:59 name=singer level=INFO message=table-key-properties: ['test_col']
time=2024-09-03 06:22:59 name=singer level=INFO message=selected: True
time=2024-09-03 06:22:59 name=singer level=INFO message=replication-method: LOG_BASED
time=2024-09-03 06:22:59 name=singer level=INFO message=LOG_BASED stream dbo-countryName$Store requires full historical sync
time=2024-09-03 06:22:59 name=singer level=INFO message=Preparing Catalog
time=2024-09-03 06:22:59 name=singer level=INFO message=Fetching tables
time=2024-09-03 06:22:59 name=singer level=INFO message=Tables fetched, fetching columns
time=2024-09-03 06:22:59 name=singer level=INFO message=ARRAYSIZE=1
time=2024-09-03 06:22:59 name=singer level=INFO message=Columns Fetched
time=2024-09-03 06:22:59 name=singer level=INFO message=Catalog ready
time=2024-09-03 06:22:59 name=singer level=INFO message=Need to sync countryName$Store
{"type":"STATE","value":{"currently_syncing":"dbo-countryName$Store"}}
time=2024-09-03 06:22:59 name=singer level=INFO message=Table countryName$Store proposes LOG_BASED sync
time=2024-09-03 06:22:59 name=singer level=INFO message=No initial load for countryName$Store, using full table replication
time=2024-09-03 06:22:59 name=singer level=INFO message=syncing countryName$Store cdc tables
{"type":"SCHEMA","stream":"dbo-countryName$Store","schema":{"properties":{"test_col":{"inclusion":"automatic","maxLength":255,"type":["null","string"]},"_sdc_operation_type":{"description":"Source operation I=Insert, D=Delete, U=Update","format":"string","type":["null","string"]},"_sdc_lsn_commit_timestamp":{"description":"Source system commit timestamp","format":"date-time","type":["null","string"]},"_sdc_lsn_deleted_at":{"description":"Source system delete timestamp","format":"date-time","type":["null","string"]},"_sdc_lsn_value":{"description":"Source system log sequence number (LSN)","format":"string","type":["null","string"]},"_sdc_lsn_seq_value":{"description":"Source sequence number within the system log sequence number (LSN)","format":"string","type":["null","string"]},"_sdc_lsn_operation":{"description":"The operation that took place (1=Delete, 2=Insert, 3=Update (Before Image),4=Update (After Image) )","format":"integer","type":["null","integer"]}},"type":"object"},"key_properties":["test_col"]}
{"type":"ACTIVATE_VERSION","stream":"dbo-countryName$Store","version":1725344579952}
time=2024-09-03 06:22:59 name=singer level=WARNING message=CDC Databases may result in dirty reads. Consider enabling Read Committed or Snapshot isolation: Database test_database, Is Read Committed Snapshot is False, Snapshot Isolation is OFF
time=2024-09-03 06:22:59 name=singer level=INFO message=Max LSN ID : 00000025000005a80003
time=2024-09-03 06:22:59 name=singer level=INFO message=ARRAYSIZE=1
{"type":"RECORD","stream":"dbo-countryName$Store","record":{"test_col":"1 - Record One","_sdc_operation_type":"I","_sdc_lsn_commit_timestamp":"1900-01-01T00:00:00+00:00","_sdc_lsn_deleted_at":null,"_sdc_lsn_value":"00000000000000000000","_sdc_lsn_seq_value":"00000000000000000000","_sdc_lsn_operation":2},"version":1725344579952,"time_extracted":"2024-09-03T06:22:59.977713Z"}
{"type":"RECORD","stream":"dbo-countryName$Store","record":{"test_col":"2 - Record Two","_sdc_operation_type":"I","_sdc_lsn_commit_timestamp":"1900-01-01T00:00:00+00:00","_sdc_lsn_deleted_at":null,"_sdc_lsn_value":"00000000000000000000","_sdc_lsn_seq_value":"00000000000000000000","_sdc_lsn_operation":2},"version":1725344579952,"time_extracted":"2024-09-03T06:22:59.977713Z"}
time=2024-09-03 06:22:59 name=singer level=INFO message=METRIC: b'{"type":"counter","metric":"record_count","value":2,"tags":{"database":"dbo","table":"countryName$Store"}}'
{"type":"STATE","value":{"currently_syncing":"dbo-countryName$Store"}}
{"type":"STATE","value":{"currently_syncing":"dbo-countryName$Store","bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000005a80003"}}}}
{"type":"ACTIVATE_VERSION","stream":"dbo-countryName$Store","version":1725344579952}
{"type":"STATE","value":{"currently_syncing":"dbo-countryName$Store","bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000005a80003","initial_full_table_complete":true}}}}
time=2024-09-03 06:22:59 name=singer level=INFO message=METRIC: b'{"type":"timer","metric":"job_duration","value":0.03532218933105469,"tags":{"job_type":"sync_table","database":"dbo","table":"countryName$Store","status":"succeeded"}}'
{"type":"STATE","value":{"currently_syncing":null,"bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000005a80003","initial_full_table_complete":true}}}}
vscode ➜ /workspaces/python-dnd/pipelinewise-tap-mssql (feature/escape_log_based_tables) $ vi state.json
vscode ➜ /workspaces/python-dnd/pipelinewise-tap-mssql (feature/escape_log_based_tables) $ poetry run tap-mssql -c config.json -p properties.json -s state.json
time=2024-09-03 06:24:48 name=singer level=INFO message=Server Parameters: version: Microsoft SQL Server 2019 (RTM-CU28) (KB5039747) - 15.0.4385.2 (X64) 
        Jul 25 2024 21:32:40 
        Copyright (C) 2019 Microsoft Corporation
        Developer Edition (64-bit) on Linux (Ubuntu 20.04.6 LTS) <X64>, lock_timeout: -1, 
time=2024-09-03 06:24:48 name=singer level=INFO message=Beginning sync
time=2024-09-03 06:24:48 name=singer level=INFO message=Preparing Catalog
time=2024-09-03 06:24:48 name=singer level=INFO message=Fetching tables
time=2024-09-03 06:24:48 name=singer level=INFO message=Tables fetched, fetching columns
time=2024-09-03 06:24:48 name=singer level=INFO message=ARRAYSIZE=1
time=2024-09-03 06:24:48 name=singer level=INFO message=Columns Fetched
time=2024-09-03 06:24:48 name=singer level=INFO message=Catalog ready
time=2024-09-03 06:24:48 name=singer level=INFO message=selected-by-default: False
time=2024-09-03 06:24:48 name=singer level=INFO message=database-name: dbo
time=2024-09-03 06:24:48 name=singer level=INFO message=is-view: False
time=2024-09-03 06:24:48 name=singer level=INFO message=table-key-properties: ['test_col']
time=2024-09-03 06:24:48 name=singer level=INFO message=selected: True
time=2024-09-03 06:24:48 name=singer level=INFO message=replication-method: LOG_BASED
time=2024-09-03 06:24:48 name=singer level=INFO message=Preparing Catalog
time=2024-09-03 06:24:48 name=singer level=INFO message=Fetching tables
time=2024-09-03 06:24:48 name=singer level=INFO message=Tables fetched, fetching columns
time=2024-09-03 06:24:48 name=singer level=INFO message=ARRAYSIZE=1
time=2024-09-03 06:24:48 name=singer level=INFO message=Columns Fetched
time=2024-09-03 06:24:48 name=singer level=INFO message=Catalog ready
{"type":"STATE","value":{"currently_syncing":null,"bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000005a80003","initial_full_table_complete":true}}}}
{"type":"STATE","value":{"currently_syncing":"dbo-countryName$Store","bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000005a80003","initial_full_table_complete":true}}}}
time=2024-09-03 06:24:48 name=singer level=INFO message=Table countryName$Store proposes LOG_BASED sync
time=2024-09-03 06:24:48 name=singer level=INFO message=Table countryName$Store will use LOG_BASED sync
time=2024-09-03 06:24:48 name=singer level=INFO message=syncing countryName$Store cdc tables
{"type":"SCHEMA","stream":"dbo-countryName$Store","schema":{"properties":{"test_col":{"inclusion":"automatic","maxLength":255,"type":["null","string"]},"_sdc_operation_type":{"description":"Source operation I=Insert, D=Delete, U=Update","format":"string","type":["null","string"]},"_sdc_lsn_commit_timestamp":{"description":"Source system commit timestamp","format":"date-time","type":["null","string"]},"_sdc_lsn_deleted_at":{"description":"Source system delete timestamp","format":"date-time","type":["null","string"]},"_sdc_lsn_value":{"description":"Source system log sequence number (LSN)","format":"string","type":["null","string"]},"_sdc_lsn_seq_value":{"description":"Source sequence number within the system log sequence number (LSN)","format":"string","type":["null","string"]},"_sdc_lsn_operation":{"description":"The operation that took place (1=Delete, 2=Insert, 3=Update (Before Image),4=Update (After Image) )","format":"integer","type":["null","integer"]}},"type":"object"},"key_properties":["test_col"],"bookmark_properties":[null]}
time=2024-09-03 06:24:48 name=singer level=INFO message=Schema written
time=2024-09-03 06:24:48 name=singer level=INFO message=Data available in cdc table "dbo_countryName$Store" from lsn 00000000000000000000
time=2024-09-03 06:24:48 name=singer level=INFO message=The last lsn processed as per the state file 00000025000005a80003, minimum available lsn for extract table 00000000000000000000, and the maximum lsn is 00000025000005a80003.
time=2024-09-03 06:24:48 name=singer level=INFO message=The last lsn processed as per the state file is equal to the max lsn available - no changes expected - state lsn will not be incremented
time=2024-09-03 06:24:48 name=singer level=INFO message=ARRAYSIZE=1
time=2024-09-03 06:24:48 name=singer level=INFO message=METRIC: b'{"type":"counter","metric":"record_count","value":0,"tags":{"database":"dbo","table":"countryName$Store"}}'
{"type":"STATE","value":{"currently_syncing":"dbo-countryName$Store","bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000005a80003","initial_full_table_complete":true}}}}
{"type":"STATE","value":{"currently_syncing":"dbo-countryName$Store","bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000005a80003","initial_full_table_complete":true}}}}
{"type":"ACTIVATE_VERSION","stream":"dbo-countryName$Store","version":1725344688209}
{"type":"STATE","value":{"currently_syncing":"dbo-countryName$Store","bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000005a80003","initial_full_table_complete":true}}}}
time=2024-09-03 06:24:48 name=singer level=INFO message=METRIC: b'{"type":"timer","metric":"job_duration","value":0.03827333450317383,"tags":{"job_type":"table_cdc_sync","database":"dbo","table":"countryName$Store","status":"succeeded"}}'
{"type":"STATE","value":{"currently_syncing":null,"bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000005a80003","initial_full_table_complete":true}}}}
vscode ➜ /workspaces/python-dnd/pipelinewise-tap-mssql (feature/escape_log_based_tables) $ poetry run tap-mssql -c config.json -p properties.json -s state.json
time=2024-09-03 06:26:02 name=singer level=INFO message=Server Parameters: version: Microsoft SQL Server 2019 (RTM-CU28) (KB5039747) - 15.0.4385.2 (X64) 
        Jul 25 2024 21:32:40 
        Copyright (C) 2019 Microsoft Corporation
        Developer Edition (64-bit) on Linux (Ubuntu 20.04.6 LTS) <X64>, lock_timeout: -1, 
time=2024-09-03 06:26:02 name=singer level=INFO message=Beginning sync
time=2024-09-03 06:26:02 name=singer level=INFO message=Preparing Catalog
time=2024-09-03 06:26:02 name=singer level=INFO message=Fetching tables
time=2024-09-03 06:26:02 name=singer level=INFO message=Tables fetched, fetching columns
time=2024-09-03 06:26:02 name=singer level=INFO message=ARRAYSIZE=1
time=2024-09-03 06:26:02 name=singer level=INFO message=Columns Fetched
time=2024-09-03 06:26:02 name=singer level=INFO message=Catalog ready
time=2024-09-03 06:26:02 name=singer level=INFO message=selected-by-default: False
time=2024-09-03 06:26:02 name=singer level=INFO message=database-name: dbo
time=2024-09-03 06:26:02 name=singer level=INFO message=is-view: False
time=2024-09-03 06:26:02 name=singer level=INFO message=table-key-properties: ['test_col']
time=2024-09-03 06:26:02 name=singer level=INFO message=selected: True
time=2024-09-03 06:26:02 name=singer level=INFO message=replication-method: LOG_BASED
time=2024-09-03 06:26:02 name=singer level=INFO message=Preparing Catalog
time=2024-09-03 06:26:02 name=singer level=INFO message=Fetching tables
time=2024-09-03 06:26:02 name=singer level=INFO message=Tables fetched, fetching columns
time=2024-09-03 06:26:02 name=singer level=INFO message=ARRAYSIZE=1
time=2024-09-03 06:26:02 name=singer level=INFO message=Columns Fetched
time=2024-09-03 06:26:02 name=singer level=INFO message=Catalog ready
{"type":"STATE","value":{"currently_syncing":null,"bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000005a80003","initial_full_table_complete":true}}}}
{"type":"STATE","value":{"currently_syncing":"dbo-countryName$Store","bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000005a80003","initial_full_table_complete":true}}}}
time=2024-09-03 06:26:02 name=singer level=INFO message=Table countryName$Store proposes LOG_BASED sync
time=2024-09-03 06:26:02 name=singer level=INFO message=Table countryName$Store will use LOG_BASED sync
time=2024-09-03 06:26:02 name=singer level=INFO message=syncing countryName$Store cdc tables
{"type":"SCHEMA","stream":"dbo-countryName$Store","schema":{"properties":{"test_col":{"inclusion":"automatic","maxLength":255,"type":["null","string"]},"_sdc_operation_type":{"description":"Source operation I=Insert, D=Delete, U=Update","format":"string","type":["null","string"]},"_sdc_lsn_commit_timestamp":{"description":"Source system commit timestamp","format":"date-time","type":["null","string"]},"_sdc_lsn_deleted_at":{"description":"Source system delete timestamp","format":"date-time","type":["null","string"]},"_sdc_lsn_value":{"description":"Source system log sequence number (LSN)","format":"string","type":["null","string"]},"_sdc_lsn_seq_value":{"description":"Source sequence number within the system log sequence number (LSN)","format":"string","type":["null","string"]},"_sdc_lsn_operation":{"description":"The operation that took place (1=Delete, 2=Insert, 3=Update (Before Image),4=Update (After Image) )","format":"integer","type":["null","integer"]}},"type":"object"},"key_properties":["test_col"],"bookmark_properties":[null]}
time=2024-09-03 06:26:02 name=singer level=INFO message=Schema written
time=2024-09-03 06:26:02 name=singer level=INFO message=Data available in cdc table "dbo_countryName$Store" from lsn 00000000000000000000
time=2024-09-03 06:26:02 name=singer level=INFO message=The last lsn processed as per the state file 00000025000005a80003, minimum available lsn for extract table 00000000000000000000, and the maximum lsn is 00000025000007f80003.
time=2024-09-03 06:26:02 name=singer level=INFO message=ARRAYSIZE=1
{"type":"RECORD","stream":"dbo-countryName$Store","record":{"test_col":"3 - Record Three","_sdc_operation_type":"I","_sdc_lsn_commit_timestamp":"2024-09-03T06:25:53.270000+00:00","_sdc_lsn_deleted_at":null,"_sdc_lsn_value":"00000025000007f80003","_sdc_lsn_seq_value":"00000025000007f80002","_sdc_lsn_operation":2},"version":1725344762576,"time_extracted":"2024-09-03T06:26:02.584353Z"}
time=2024-09-03 06:26:02 name=singer level=INFO message=METRIC: b'{"type":"counter","metric":"record_count","value":1,"tags":{"database":"dbo","table":"countryName$Store"}}'
{"type":"STATE","value":{"currently_syncing":"dbo-countryName$Store","bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000005a80003","initial_full_table_complete":true}}}}
{"type":"STATE","value":{"currently_syncing":"dbo-countryName$Store","bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000007f80003","initial_full_table_complete":true}}}}
{"type":"ACTIVATE_VERSION","stream":"dbo-countryName$Store","version":1725344762576}
{"type":"STATE","value":{"currently_syncing":"dbo-countryName$Store","bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000007f80003","initial_full_table_complete":true}}}}
time=2024-09-03 06:26:02 name=singer level=INFO message=METRIC: b'{"type":"timer","metric":"job_duration","value":0.22086167335510254,"tags":{"job_type":"table_cdc_sync","database":"dbo","table":"countryName$Store","status":"succeeded"}}'
{"type":"STATE","value":{"currently_syncing":null,"bookmarks":{"dbo-countryName$Store":{"lsn":"00000025000007f80003","initial_full_table_complete":true}}}}
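
The state.json used for the second and third runs above was created by hand. As a minimal sketch, assuming the tap's output has been captured to a list of lines and that the state file should hold the "value" object of the final STATE message, it could be derived like this (last_state is a hypothetical helper):

```python
import json


def last_state(output_lines):
    """Return the value of the last STATE message in captured tap output."""
    state = None
    for line in output_lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # skip log lines such as "time=... level=INFO ..."
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything that is not a complete JSON message
        if isinstance(message, dict) and message.get("type") == "STATE":
            state = message.get("value")
    return state
```

Writing the returned dict to state.json with json.dump gives a file that can be passed back to the tap with -s state.json.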

@s7clarke10
Collaborator Author

I have also tested a FULL_TABLE run. So at this stage I cannot reproduce the error that you are experiencing, @SenneVanstraelen.

@SenneVanstraelen

Hello Steve

Thanks for the comprehensive test. On a recent version of SQL server it works fine indeed.

However, we are connecting with a SQL Server 2005, which only takes TDS version 7.0 or 4.2.
There is also a space between the 'CompanyName CountryCode$Store'.

Further testing shows that the character is only removed when the letter after the '$' is a capital letter.
When looking into the catalog file, the stream is properly named, however when trying to select from this stream, we get the error that 'CompanyName CountryCode$tore' does not exist, so the 'S' is removed from the script.

@s7clarke10
Copy link
Collaborator Author

Hi @SenneVanstraelen,

I have also tried with the table you have suggested with the space i.e. 'CompanyName CountryCode$Store'. That does seem to replicate okay.

According to the FreeTDS documentation TDS version 7.2 is supported for SQL Server 2005. Might be worth changing this parameter to 7.2.

I have tried adjusting the config against my SQL Server instance, setting the TDS version to 7.0 and 7.2, and couldn't replicate the issue. It would be interesting to see whether you get an improvement by adjusting the TDS version like so:

{
    "host": "127.0.0.1",
    "database": "test_database",
    "port": "1433",
    "user": "sa",
    "password": "testDatabase1",
    "tds_version": "7.2"
  }

Also, I assume you tried to rebuild the catalog. My properties.json looked like this for the new table.

      "stream": "CompanyName CountryCode$Store",
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "selected-by-default": false,
            "database-name": "dbo",
            "is-view": false,
            "table-key-properties": [
              "test_col"
            ],
            "selected": true,
            "replication-method": "LOG_BASED"
          }
        },

At this stage, if nothing else works, I am going to have to put this down to a unique SQL Server 2005 / TDS issue, as I cannot replicate it. I do wonder if this is actually a Meltano issue, with the $ in the select table in the meltano.yml somehow being interpreted incorrectly? Have you tried just installing the tap and running through something like my run log without Meltano?

@SenneVanstraelen

Hello

I have also seen that in the documentation. However, the server does not accept the connection request when using 7.2. It complains about a wrong EOF. In 7.0 and 4.2 it works fine however (I have the same error in my Database client).

The catalog does indeed run fine, the name is correct there. I have not tried to run the tap without Meltano. It might be specific to the integration within Meltano, but using the $ in select for other databases (more recent versions of TDS), works fine.

Thank you for the development and testing. If we cannot find a solution, we will have to find another means to pull the data from the SQL Server 2005 database.

@s7clarke10
Collaborator Author

Okay, it seems to work okay in Meltano too.

mkdir meltano
cd meltano
pip install meltano
meltano init
cd test-database/
meltano add extractor tap-mssql
# Update pip url in the meltano.yml file like in the example below
# Re-install the meltano extractor tap-mssql with the PR branch
meltano install extractor tap-mssql --clean
meltano invoke tap-mssql

Here is my .env file

TAP_MSSQL_HOST="127.0.0.1"
TAP_MSSQL_DATABASE="test_database"
TAP_MSSQL_PORT=1433
TAP_MSSQL_USER="sa"
TAP_MSSQL_PASSWORD="testDatabase1"
TAP_MSSQL_TDS_VERSION="7.0"
TAP_MSSQL__SELECT='["dbo-CompanyName CountryCode$Store"]'

Running a meltano invoke tap-mssql works just fine.

My simple meltano.yml file looked like this.

version: 1
default_environment: dev
project_id: 06cbbacb-5167-484b-aa52-b24c7792eb06
environments:
- name: dev
- name: staging
- name: prod
plugins:
  extractors:
  - name: tap-mssql
    variant: wintersrd
    pip_url: git+https://github.com/wintersrd/pipelinewise-tap-mssql@feature/escape_log_based_tables

So at this stage I think I will have to proceed with the PR as I can't find an issue. I'll put this down to a SQL Server 2005 / FreeTDS issue. A work-around is using a Linked Server in a supported version of SQL Server to ingest this table - not ideal however.

@s7clarke10 s7clarke10 changed the title Escaping objects for log based replication and adding conn_properties Escaping objects for log based replication, adding conn_properties and enable_tds_logging Sep 3, 2024
Comment on lines +313 to +314
lsn_range = get_lsn_available_range(mssql_conn, escaped_cdc_table)

Collaborator

This is good, I can see why escaped table, e.g. "Thishasa$dollar" is used in the get lsn query, but that needs to be 'un-escaped' when built as part of the CDC function name

Comment on lines 350 to 378

select_sql = """DECLARE @from_lsn binary (10), @to_lsn binary (10)

SET @from_lsn = {}
SET @to_lsn = {}

SELECT {}
,case __$operation
when 2 then 'I'
when 4 then 'U'
when 1 then 'D'
end _sdc_operation_type
, sys.fn_cdc_map_lsn_to_time(__$start_lsn) _sdc_lsn_commit_timestamp
, case __$operation
when 1 then sys.fn_cdc_map_lsn_to_time(__$start_lsn)
else null
end _sdc_lsn_deleted_at
, __$start_lsn _sdc_lsn_value
, __$seqval _sdc_lsn_seq_value
, __$operation _sdc_lsn_operation
FROM cdc."fn_cdc_get_all_changes_{}"(@from_lsn, @to_lsn, 'all')
ORDER BY __$start_lsn, __$seqval, __$operation
;""".format(
from_lsn_expression,
py_bin_to_mssql(lsn_to),
",".join(escaped_columns),
cdc_table,
)

Collaborator

Providing a consistent approach for escaping SQL Server objects might be preferable - my suggestion here adds a variable containing the cdc function name which is then escaped, then passed into the SQL.

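Suggested change (the original block is shown first, followed by the proposed replacement, which starts at the escaped_cdc_function line)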
select_sql = """DECLARE @from_lsn binary (10), @to_lsn binary (10)
SET @from_lsn = {}
SET @to_lsn = {}
SELECT {}
,case __$operation
when 2 then 'I'
when 4 then 'U'
when 1 then 'D'
end _sdc_operation_type
, sys.fn_cdc_map_lsn_to_time(__$start_lsn) _sdc_lsn_commit_timestamp
, case __$operation
when 1 then sys.fn_cdc_map_lsn_to_time(__$start_lsn)
else null
end _sdc_lsn_deleted_at
, __$start_lsn _sdc_lsn_value
, __$seqval _sdc_lsn_seq_value
, __$operation _sdc_lsn_operation
FROM cdc."fn_cdc_get_all_changes_{}"(@from_lsn, @to_lsn, 'all')
ORDER BY __$start_lsn, __$seqval, __$operation
;""".format(
from_lsn_expression,
py_bin_to_mssql(lsn_to),
",".join(escaped_columns),
cdc_table,
)
escaped_cdc_function = common.escape("fn_cdc_get_all_changes_" + cdc_table)
select_sql = """DECLARE @from_lsn binary (10), @to_lsn binary (10)
SET @from_lsn = {}
SET @to_lsn = {}
SELECT {}
,case __$operation
when 2 then 'I'
when 4 then 'U'
when 1 then 'D'
end _sdc_operation_type
, sys.fn_cdc_map_lsn_to_time(__$start_lsn) _sdc_lsn_commit_timestamp
, case __$operation
when 1 then sys.fn_cdc_map_lsn_to_time(__$start_lsn)
else null
end _sdc_lsn_deleted_at
, __$start_lsn _sdc_lsn_value
, __$seqval _sdc_lsn_seq_value
, __$operation _sdc_lsn_operation
FROM cdc.{}(@from_lsn, @to_lsn, 'all')
ORDER BY __$start_lsn, __$seqval, __$operation
;""".format(
from_lsn_expression,
py_bin_to_mssql(lsn_to),
",".join(escaped_columns),
escaped_cdc_function,
)

@mjsqu
Collaborator

mjsqu commented Sep 3, 2024

Happy with this, just a suggestion around the CDC function, which could be passed through common.escape in line with other objects that are being escaped
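
To illustrate the review suggestion, the sketch below builds the CDC function name from the unescaped capture instance and escapes the result once. The escape function here is a hypothetical stand-in that wraps a name in double quotes and doubles any embedded double quote; the tap's real common.escape may behave differently.

```python
def escape(name):
    """Hypothetical stand-in for common.escape: double-quote an identifier."""
    return '"' + name.replace('"', '""') + '"'


def cdc_function_name(capture_instance):
    """Return the escaped name of the CDC all-changes function.

    The prefix is joined to the raw capture instance first, so the
    delimiters wrap the complete function name exactly once.
    """
    return escape("fn_cdc_get_all_changes_" + capture_instance)


def naive_function_name(capture_instance):
    """Counter-example: escaping before concatenation leaves stray quotes."""
    return "fn_cdc_get_all_changes_" + escape(capture_instance)
```

Escaping the table name first and concatenating afterwards would leave the quotes in the middle of the function name, which is why the name is assembled from the unescaped value.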

@s7clarke10
Collaborator Author

Thank you for that review @mjsqu. I have implemented this suggestion.

@s7clarke10 s7clarke10 merged commit d1c4540 into master Sep 4, 2024
2 checks passed
Successfully merging this pull request may close these issues.

Add support for APS/PDW/Azure Synapse Dedicated SQL Pool
5 participants