Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Use airbyte state message format + tracking. #18

Merged
merged 3 commits into from
Sep 6, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 64 additions & 4 deletions tap_airbyte/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from __future__ import annotations

from copy import deepcopy
import errno
import os
import shutil
Expand Down Expand Up @@ -570,8 +571,15 @@ def run_read(self) -> t.Iterator[subprocess.Popen]:
catalog.write(orjson.dumps(self.configured_airbyte_catalog))
if self.airbyte_state:
with open(f"{host_tmpdir}/state.json", "wb") as state:
self.logger.debug("Using state: %s", self.airbyte_state)
state.write(orjson.dumps(self.airbyte_state))
# Use the new airbyte state container if it exists.
state_dict = self.airbyte_state
if 'airbyte_state' in self.airbyte_state:
# This is airbyte state V2
state_dict = self.airbyte_state['airbyte_state']

self.logger.debug("Using state: %s", state_dict)
state.write(orjson.dumps(state_dict))

runtime_conf_dir = host_tmpdir if self.is_native() else self.airbyte_mount_dir
proc = subprocess.Popen(
self.to_command(
Expand Down Expand Up @@ -789,8 +797,55 @@ def sync_all(self) -> None:
):
self._process_log_message(airbyte_message)
elif airbyte_message["type"] == AirbyteMessage.STATE:
state_message = airbyte_message["state"]
# See: https://docs.airbyte.com/understanding-airbyte/database-data-catalog
# for how this state should be handled.
state_message = deepcopy(airbyte_message["state"])
state_type = state_message["type"]

if "airbyte_state" not in self.airbyte_state:
self.airbyte_state["airbyte_state"] = []

# The airbyte_state_v2 here should adhere to the link above.
existing_airbyte_state_v2: list[dict] = deepcopy(self.airbyte_state["airbyte_state"])
if state_type == "STREAM":
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
stream_descriptor = state_message["stream"]["stream_descriptor"]
stream_state = state_message["stream"]["stream_state"]

# Update the state for this stream descriptor or add it to the list.
found = False
for existing_state in existing_airbyte_state_v2:
if existing_state["type"] == "STREAM" and existing_state["stream"]["stream_descriptor"] == stream_descriptor:
existing_state["stream"]["stream_state"] = stream_state
found = True
break
if not found:
existing_airbyte_state_v2.append({
"type": "STREAM",
"stream": state_message["stream"]
})
elif state_type == "GLOBAL":
# Update the global state.
found = False
for existing_state in existing_airbyte_state_v2:
if existing_state["type"] == "GLOBAL":
existing_state["global"] = state_message["global"]
found = True
break
if not found:
existing_airbyte_state_v2.append({
"type": "GLOBAL",
"global": state_message["global"]
})
elif state_type == "LEGACY":
# One record per connector.
existing_airbyte_state_v2.clear()
existing_airbyte_state_v2.append(
{
"type": "LEGACY",
"legacy": state_message["legacy"]
}
)

if "data" in state_message:
unpacked_state = state_message["data"]
elif state_type == "STREAM":
Expand All @@ -799,7 +854,12 @@ def sync_all(self) -> None:
unpacked_state = state_message["global"]
elif state_type == "LEGACY":
unpacked_state = state_message["legacy"]
self.airbyte_state = unpacked_state

# Keep the legacy state behavior, but append the new state under a new key.
# Deepcopy here since existing_airbyte_state_v2 can refernce the same object.
JichaoS marked this conversation as resolved.
Show resolved Hide resolved
self.airbyte_state = deepcopy(unpacked_state)
self.airbyte_state['airbyte_state'] = existing_airbyte_state_v2

with STDOUT_LOCK:
singer.write_message(singer.StateMessage(self.airbyte_state))
else:
Expand Down
Loading