fivetran-destination: Use the provided Docker image (#24835)
This PR is centered around testing the Fivetran Destination. It does a
few individual things:

1. Uses the Fivetran-provided Docker image of the "destination tester"
instead of building our own from our fork of the Fivetran SDK.
2. Breaks up the existing "test_writes" into separate tests, since the
testing behavior we were trying to do isn't yet supported (see
[Slack](https://materializeinc.slack.com/archives/C060KAR4802/p1706548430680759)).
3. Updates our connector so we no longer splice the "_fivetran_synced"
and "_fivetran_deleted" columns into various requests, since the tester
now provides these.
4. Runs our Fivetran destination tests in CI.

There are a couple of outstanding issues related to testing the Fivetran
Destination. I have pinged Fivetran about both, and they are currently in progress:

1. aarch64-based Docker image. Currently, if you run these tests locally
on a Mac, it will run the x86 image in emulation mode.
[Slack](https://materializeinc.slack.com/archives/C060KAR4802/p1706548430680759)
2. The "truncate" option in their testing framework is currently broken.
[Slack](https://materializeinc.slack.com/archives/C060KAR4802/p1706651239233319).

Even with these issues, I wanted to get this change up in case I switch
to other work soon.

### Motivation

Fixes https://github.com/MaterializeInc/materialize/issues/24446

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly
considered.
- [ ] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [x] This PR includes the following [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note):
  - N/a
ParkMyCar authored Jan 31, 2024
1 parent 2efe775 commit 9491d6d
Showing 22 changed files with 164 additions and 150 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
@@ -3,4 +3,4 @@
 	url = https://github.com/MaterializeInc/sqllogictest.git
 [submodule "fivetran-sdk"]
 	path = misc/fivetran-sdk
-	url = https://github.com/MaterializeInc/fivetran_sdk.git
+	url = https://github.com/fivetran/fivetran_sdk.git
37 changes: 26 additions & 11 deletions ci/test/pipeline.template.yml
@@ -386,17 +386,32 @@ steps:
     agents:
       queue: linux-aarch64-small
 
-  - id: ssh-connection
-    label: SSH connection tests
-    depends_on: build-aarch64
-    timeout_in_minutes: 40
-    inputs: [test/ssh-connection]
-    artifact_paths: junit_*.xml
-    plugins:
-      - ./ci/plugins/mzcompose:
-          composition: ssh-connection
-    agents:
-      queue: linux-aarch64
+  - group: "Connection tests"
+    key: connection-tests
+    steps:
+      - id: ssh-connection
+        label: SSH connection tests
+        depends_on: build-aarch64
+        timeout_in_minutes: 40
+        inputs: [test/ssh-connection]
+        artifact_paths: junit_*.xml
+        plugins:
+          - ./ci/plugins/mzcompose:
+              composition: ssh-connection
+        agents:
+          queue: linux-aarch64
+
+      - id: fivetran-destination-tests
+        label: Fivetran Destination tests
+        depends_on: build-aarch64
+        timeout_in_minutes: 10
+        inputs: [test/fivetran-destination]
+        artifact_paths: junit_*.xml
+        plugins:
+          - ./ci/plugins/mzcompose:
+              composition: fivetran-destination
+        agents:
+          queue: linux-aarch64-small
 
   - group: "Kafka tests"
     key: kafka-tests
2 changes: 1 addition & 1 deletion misc/fivetran-sdk
Submodule fivetran-sdk updated 36 files
+0 −7 .gitignore
+5 −1 README.md
+73 −25 development-guide.md
+3 −3 examples/connector/java/README.md
+2 −2 examples/destination/java/README.md
+0 −8 tools/.bazelrc
+0 −1 tools/.bazelversion
+0 −53 tools/BUILD
+0 −9 tools/Dockerfile.connector_tester
+0 −9 tools/Dockerfile.destination_tester
+0 −152 tools/README.md
+0 −20 tools/VerifyJRE17Spec.java
+0 −199 tools/WORKSPACE
+0 −28 tools/client/BUILD
+0 −150 tools/client/src/client/connector/SdkConnectorClient.java
+0 −204 tools/client/src/client/destination/SdkWriterClient.java
+33 −0 tools/connector-tester/README.md
+129 −0 tools/destination-tester/README.md
+0 −2,480 tools/maven_install.json
+0 −20 tools/scripts/build_sdk_connector_tester_image.sh
+0 −20 tools/scripts/build_sdk_destination_tester_image.sh
+0 −57 tools/testers/BUILD
+0 −489 tools/testers/src/testers/SdkConnectorTester.java
+0 −523 tools/testers/src/testers/SdkDestinationTester.java
+0 −23 tools/testers/src/testers/util/InstantFormattedSerializer.java
+0 −281 tools/testers/src/testers/util/MockConnectorOutput.java
+0 −367 tools/testers/src/testers/util/MockWarehouse.java
+0 −37 tools/testers/src/testers/util/SchemaTable.java
+0 −188 tools/testers/src/testers/util/SdkConverters.java
+0 −82 tools/testers/src/testers/util/SdkCrypto.java
+0 −73 tools/testers/test/testers/ClasspathRunner.java
+0 −7 tools/testers/test/testers/RunAllSpecs.java
+0 −84 tools/testers/test/testers/util/MockConnectorOutputIntegrationSpec.java
+0 −236 tools/testers/test/testers/util/MockConnectorOutputSpec.java
+0 −429 tools/testers/test/testers/util/MockWarehouseSpec.java
+0 −28 tools/testers/test/testers/util/SdkConvertersSpec.java
1 change: 0 additions & 1 deletion misc/images/fivetran-destination-tester/.gitignore

This file was deleted.

22 changes: 0 additions & 22 deletions misc/images/fivetran-destination-tester/Dockerfile

This file was deleted.

19 changes: 0 additions & 19 deletions misc/images/fivetran-destination-tester/mzbuild.yml

This file was deleted.

@@ -9,6 +9,8 @@
 
 from materialize.mzcompose.service import Service
 
+FIVETRAN_TESTER_VERSION = "024.0125.001"
+
 
 class FivetranDestinationTester(Service):
     def __init__(
@@ -26,7 +28,7 @@ def __init__(
         super().__init__(
             name="fivetran-destination-tester",
             config={
-                "mzbuild": "fivetran-destination-tester",
+                "image": f"it5t/fivetran-sdk-destination-tester:{FIVETRAN_TESTER_VERSION}",
                 "command": command,
                 "environment": environment,
                 "volumes": volumes_extra,
6 changes: 0 additions & 6 deletions src/fivetran-destination/src/destination/ddl.rs
@@ -145,12 +145,6 @@ async fn create_table(request: CreateTableRequest) -> Result<(), anyhow::Error>
         }
     }
 
-    // TODO(benesch): should the SDK be providing these in the request?
-    defs.extend([
-        "_fivetran_deleted boolean".into(),
-        "_fivetran_synced timestamptz".into(),
-    ]);
-
     // TODO(benesch): support primary keys.
     #[allow(clippy::overly_complex_bool_expr)]
     if !primary_key_columns.is_empty() && false {
20 changes: 3 additions & 17 deletions src/fivetran-destination/src/destination/dml.rs
@@ -27,8 +27,8 @@ use crate::crypto::AsyncAesDecrypter;
 use crate::destination::{config, ddl};
 use crate::fivetran_sdk::write_batch_request::FileParams;
 use crate::fivetran_sdk::{
-    Column, Compression, DataType, Encryption, Table, TruncateRequest, TruncateResponse,
-    WriteBatchRequest, WriteBatchResponse,
+    Compression, Encryption, Table, TruncateRequest, TruncateResponse, WriteBatchRequest,
+    WriteBatchResponse,
 };
 
 pub async fn handle_truncate_request(
@@ -101,28 +101,14 @@ async fn truncate_table(request: TruncateRequest) -> Result<(), anyhow::Error> {
 }
 
 async fn write_batch(request: WriteBatchRequest) -> Result<(), anyhow::Error> {
-    let Some(mut table) = request.table else {
+    let Some(table) = request.table else {
         bail!("internal error: WriteBatchRequest missing \"table\" field");
     };
 
     if !table.columns.iter().any(|c| c.primary_key) {
         bail!("table has no primary key columns");
     }
 
-    // TODO(benesch): should the SDK be providing these in the request?
-    table.columns.push(Column {
-        name: "_fivetran_deleted".into(),
-        r#type: DataType::Boolean.into(),
-        primary_key: false,
-        decimal: None,
-    });
-    table.columns.push(Column {
-        name: "_fivetran_synced".into(),
-        r#type: DataType::UtcDatetime.into(),
-        primary_key: false,
-        decimal: None,
-    });
-
     let Some(FileParams::Csv(csv_file_params)) = request.file_params else {
         bail!("internal error: WriteBatchRequest missing \"file_params\" field");
     };
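The two Rust hunks above stop appending `_fivetran_deleted` and `_fivetran_synced` to the table, because the tester now sends them in the request. The following Python sketch models the resulting validation path. It is not a translation of the connector: the `Column` and `Table` dataclasses and the `validate_write_batch_table` name are hypothetical stand-ins for the generated SDK types.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Column:  # hypothetical stand-in for the SDK's Column message
    name: str
    type: str
    primary_key: bool = False


@dataclass
class Table:  # hypothetical stand-in for the SDK's Table message
    name: str
    columns: List[Column] = field(default_factory=list)


def validate_write_batch_table(table: Optional[Table]) -> Table:
    """Mirror the checks kept by the diff; the table is used exactly as received."""
    if table is None:
        raise ValueError('internal error: WriteBatchRequest missing "table" field')
    if not any(c.primary_key for c in table.columns):
        raise ValueError("table has no primary key columns")
    # No system columns are appended here: the request is expected to carry them.
    return table
```

Since the function no longer mutates its input, the `mut` binding in the Rust code becomes unnecessary, which matches the `let Some(table)` change.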
80 changes: 57 additions & 23 deletions test/fivetran-destination/mzcompose.py
@@ -78,6 +78,11 @@
     ),
 ]
 
+# Tests that are currently broken because the Fivetran Tester seems to do the wrong thing.
+#
+# 'test-truncate': https://materializeinc.slack.com/archives/C060KAR4802/p1706651239233319
+BROKEN_TESTS = ["test-truncate"]
+
 
 def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
     parser.add_argument("filter", nargs="?")
@@ -90,6 +95,9 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
         if args.filter and args.filter not in path.name:
             print(f"Test case {path.name!r} does not match filter; skipping...")
             continue
+        if path.name in BROKEN_TESTS:
+            print(f"Test case {path.name!r} is currently broken; skipping...")
+            continue
         with c.test_case(path.name):
             _run_test_case(c, path)
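
The selection logic in this hunk applies the optional name filter first and the broken-test list second. A small sketch of the same ordering, pulled out into a pure function so it can be checked without a `Composition`; `select_test_cases` is a hypothetical name, not something defined in `mzcompose.py`:

```python
from typing import List, Optional

# Taken from the diff above.
BROKEN_TESTS = ["test-truncate"]


def select_test_cases(names: List[str], name_filter: Optional[str] = None) -> List[str]:
    """Return the test case names that would actually run, in input order."""
    selected = []
    for name in names:
        # A substring filter, as in `args.filter not in path.name`.
        if name_filter and name_filter not in name:
            continue
        # Known-broken cases are skipped even when the filter matches them.
        if name in BROKEN_TESTS:
            continue
        selected.append(name)
    return selected
```

One consequence of this ordering is that filtering for `truncate` runs nothing at all until `test-truncate` is removed from `BROKEN_TESTS`.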

@@ -107,27 +115,53 @@ def _run_test_case(c: Composition, path: Path):
             assert False, f"unexpected test file: {test_file}"
 
 
+# Run the Fivetran Destination Tester with a single file.
 def _run_destination_tester(c: Composition, test_file: Path):
-    data_dir = ROOT / "data"
-    for data_file in data_dir.iterdir():
-        if data_file.name in ("configuration.json", ".gitignore"):
-            continue
-        data_file.unlink()
-    shutil.copy(test_file, data_dir)
-
-    expected_failure = None
-    last_line = test_file.read_text().splitlines()[-1]
-    if last_line.startswith("// FAIL: "):
-        expected_failure = last_line.removeprefix("// FAIL: ")
-    if expected_failure:
-        ret = c.run("fivetran-destination-tester", check=False, capture=True)
-        print("stdout:")
-        print(ret.stdout)
-        assert (
-            ret.returncode != 0
-        ), f"destination tester did not fail with expected message {expected_failure!r}"
-        assert (
-            expected_failure in ret.stdout
-        ), f"destination tester did not fail with expected message {expected_failure!r}"
-    else:
-        c.run("fivetran-destination-tester")
+    # The Fivetran Destination tester operates on an entire directory at a time. We run
+    # individual test cases by copying everything into a single "data" directory which
+    # automatically gets cleaned up at the start and end of every run.
+    with DataDirGuard(ROOT / "data") as data_dir:
+        test_file = ROOT / test_file
+        shutil.copy(test_file, data_dir.path())
+
+        last_line = test_file.read_text().splitlines()[-1]
+        if last_line.startswith("// FAIL: "):
+            expected_failure = last_line.removeprefix("// FAIL: ")
+        else:
+            expected_failure = None
+
+        if expected_failure:
+            ret = c.run("fivetran-destination-tester", check=False, capture=True)
+            print("stdout:")
+            print(ret.stdout)
+            assert (
+                ret.returncode != 0
+            ), f"destination tester did not fail with expected message {expected_failure!r}"
+            assert (
+                expected_failure in ret.stdout
+            ), f"destination tester did not fail with expected message {expected_failure!r}"
+        else:
+            c.run("fivetran-destination-tester")
+
+
+# Type that implements the Context Protocol that makes it easy to automatically clean up our data
+# directory before and after every test run.
+class DataDirGuard:
+    def __init__(self, dir: Path):
+        self._dir = dir
+
+    def clean(self):
+        for file in self._dir.iterdir():
+            if file.name in ("configuration.json", ".gitignore"):
+                continue
+            file.unlink()
+
+    def path(self) -> Path:
+        return self._dir
+
+    def __enter__(self):
+        self.clean()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.clean()
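
To see what the guard buys over the old inline loop, the sketch below exercises a copy of `DataDirGuard` against a temporary directory, together with the trailing `// FAIL: ` marker parsing. It is a standalone illustration under stated assumptions: `expected_failure` and `demo` are hypothetical helper names, and the temporary directory replaces the real `ROOT / "data"`.

```python
import tempfile
from pathlib import Path
from typing import Optional

# Files that must survive cleanup (taken from the diff above).
KEEP = ("configuration.json", ".gitignore")


class DataDirGuard:
    """Context manager that empties a directory on entry and again on exit."""

    def __init__(self, dir: Path):
        self._dir = dir

    def clean(self) -> None:
        for file in self._dir.iterdir():
            if file.name in KEEP:
                continue
            file.unlink()

    def path(self) -> Path:
        return self._dir

    def __enter__(self) -> "DataDirGuard":
        self.clean()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Runs even if the body raises, so stale inputs never leak into the next case.
        self.clean()


def expected_failure(test_file: Path) -> Optional[str]:
    """Return the message after a trailing '// FAIL: ' marker, or None."""
    last_line = test_file.read_text().splitlines()[-1]
    if last_line.startswith("// FAIL: "):
        return last_line[len("// FAIL: "):]
    return None


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        data = Path(tmp)
        (data / "configuration.json").write_text("{}")
        (data / "stale.json").write_text("{}")  # left over from an earlier run
        with DataDirGuard(data) as guard:
            on_entry = sorted(p.name for p in guard.path().iterdir())
            case = guard.path() / "01-setup.json"
            case.write_text('{"ops": []}\n// FAIL: boom')
            failure = expected_failure(case)
        on_exit = sorted(p.name for p in data.iterdir())
    return on_entry, failure, on_exit
```

As in the code in the diff, an empty test file would raise `IndexError` at `splitlines()[-1]`; the sketch keeps that behavior rather than guessing at the intended handling.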
1 change: 1 addition & 0 deletions test/fivetran-destination/test-truncate/00-README
@@ -0,0 +1 @@
+Verifies that truncate works correctly.
34 changes: 34 additions & 0 deletions test/fivetran-destination/test-truncate/01-setup.json
@@ -0,0 +1,34 @@
+{
+  "create_table": {
+    "test_truncate": {
+      "columns": {
+        "k1": "INT",
+        "k2": "STRING",
+        "v1": "DECIMAL",
+        "v2": "JSON"
+      },
+      "primary_key": ["k1", "k2"]
+    }
+  },
+  "ops": [
+    {
+      "upsert": {
+        "test_truncate": [
+          {"k1": "1", "k2": "a", "v1": "12.78", "v2": "{\"x\": \"y\"}"},
+          {"k1": "1", "k2": "b", "v1": "91.28", "v2": "{}"},
+          {"k1": "2", "k2": "a", "v1": "34.21", "v2": "null"}
+        ]
+      }
+    },
+    {
+      "update": {
+        "test_truncate": [
+          {"k1": "2", "k2": "a", "v2": "{\"x\": \"z\"}"}
+        ]
+      }
+    },
+    {
+      "truncate": ["test_truncate"]
+    }
+  ]
+}
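
Splitting `test_writes` into per-operation directories means each setup file has to be renamed consistently in `create_table` and in every op. The sketch below is a hypothetical consistency check for such files. The format rules it encodes are inferred only from the file above (row ops map a table name to rows, `truncate` lists table names) and are not taken from the tester's specification; `check_setup` is an illustrative name.

```python
from typing import Dict, List


def check_setup(doc: Dict) -> List[str]:
    """Return a list of problems found in a setup document; empty if consistent."""
    problems = []
    tables = doc.get("create_table", {})
    for op in doc.get("ops", []):
        for kind, payload in op.items():
            # "truncate" carries a list of table names; other ops map table -> rows.
            if kind == "truncate":
                rows_by_table = {name: [] for name in payload}
            else:
                rows_by_table = payload
            for table, rows in rows_by_table.items():
                if table not in tables:
                    problems.append(f"{kind}: unknown table {table!r}")
                    continue
                primary_key = tables[table].get("primary_key", [])
                for row in rows:
                    missing = [k for k in primary_key if k not in row]
                    if missing:
                        problems.append(
                            f"{kind}: row in {table!r} is missing key(s) {missing}"
                        )
    return problems
```

A check like this would flag a setup file where only some occurrences of `test_writes` were renamed.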
@@ -7,7 +7,7 @@
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0.
 
-> SELECT k1, k2, v1, v2, _fivetran_deleted FROM test.tester.test_writes
+> SELECT k1, k2, v1, v2, _fivetran_deleted FROM test.tester.test_truncate
 k1 k2 v1 v2 _fivetran_deleted
 -------------------------------------------------
 1 a 12.78 "{\"x\":\"y\"}" true
1 change: 1 addition & 0 deletions test/fivetran-destination/test-update/00-README
@@ -0,0 +1 @@
+Verifies that update works correctly.
31 changes: 31 additions & 0 deletions test/fivetran-destination/test-update/01-setup.json
@@ -0,0 +1,31 @@
+{
+  "create_table": {
+    "test_update": {
+      "columns": {
+        "k1": "INT",
+        "k2": "STRING",
+        "v1": "DECIMAL",
+        "v2": "JSON"
+      },
+      "primary_key": ["k1", "k2"]
+    }
+  },
+  "ops": [
+    {
+      "upsert": {
+        "test_update": [
+          {"k1": "1", "k2": "a", "v1": "12.78", "v2": "{\"x\": \"y\"}"},
+          {"k1": "1", "k2": "b", "v1": "91.28", "v2": "{}"},
+          {"k1": "2", "k2": "a", "v1": "34.21", "v2": "null"}
+        ]
+      }
+    },
+    {
+      "update": {
+        "test_update": [
+          {"k1": "2", "k2": "a", "v2": "{\"x\": \"z\"}"}
+        ]
+      }
+    }
+  ]
+}
@@ -7,7 +7,7 @@
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0.
 
-> SELECT k1, k2, v1, v2, _fivetran_deleted FROM test.tester.test_writes
+> SELECT k1, k2, v1, v2, _fivetran_deleted FROM test.tester.test_update
 k1 k2 v1 v2 _fivetran_deleted
 -------------------------------------------------
 1 a 12.78 "{\"x\":\"y\"}" false
1 change: 1 addition & 0 deletions test/fivetran-destination/test-upsert/00-README
@@ -0,0 +1 @@
+Verifies that upsert works correctly.
@@ -1,6 +1,6 @@
 {
   "create_table": {
-    "test_writes": {
+    "test_upsert": {
       "columns": {
         "k1": "INT",
         "k2": "STRING",
@@ -13,7 +13,7 @@
   "ops": [
     {
       "upsert": {
-        "test_writes": [
+        "test_upsert": [
           {"k1": "1", "k2": "a", "v1": "12.78", "v2": "{\"x\": \"y\"}"},
           {"k1": "1", "k2": "b", "v1": "91.28", "v2": "{}"},
           {"k1": "2", "k2": "a", "v1": "34.21", "v2": "null"}
@@ -7,7 +7,7 @@
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0.
 
-> SELECT k1, k2, v1, v2, _fivetran_deleted FROM test.tester.test_writes
+> SELECT k1, k2, v1, v2, _fivetran_deleted FROM test.tester.test_upsert
 k1 k2 v1 v2 _fivetran_deleted
 -------------------------------------------------
 1 a 12.78 "{\"x\":\"y\"}" false
1 change: 0 additions & 1 deletion test/fivetran-destination/test-writes/00-README

This file was deleted.
