This repository has been archived by the owner on May 6, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 5
fix(snapshot) Changes the type provided to the producer to fix the snapshot process. #27
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,22 @@ | ||
from abc import ABC, abstractmethod | ||
from cdc.types import Payload | ||
from cdc.streams.types import StreamMessage | ||
from dataclasses import dataclass, asdict | ||
from typing import Any, Mapping, Sequence | ||
import json # type: ignore | ||
|
||
from cdc.snapshots.snapshot_types import Xid, SnapshotId, SnapshotDescriptor | ||
|
||
from cdc.snapshots.snapshot_types import ( | ||
Xid, | ||
SnapshotId, | ||
SnapshotDescriptor | ||
) | ||
|
||
class ControlMessage(ABC): | ||
@abstractmethod | ||
def to_dict(self) -> Mapping[str, Any]: | ||
raise NotImplementedError | ||
|
||
def to_stream(self) -> StreamMessage: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code was in snapshot_control.py In the write_msg method. |
||
json_string = json.dumps(self.to_dict()) | ||
return StreamMessage(payload=Payload(json_string.encode("utf-8"))) | ||
|
||
|
||
@dataclass(frozen=True) | ||
class SnapshotInit(ControlMessage): | ||
|
@@ -25,18 +29,17 @@ def to_dict(self) -> Mapping[str, Any]: | |
"event": "snapshot-init", | ||
"tables": self.tables, | ||
"snapshot-id": self.snapshot_id, | ||
"product": self.product | ||
"product": self.product, | ||
} | ||
|
||
|
||
@dataclass(frozen=True) | ||
class SnapshotAbort(ControlMessage): | ||
snapshot_id: SnapshotId | ||
|
||
def to_dict(self) -> Mapping[str, Any]: | ||
return { | ||
"event": "snapshot-abort", | ||
"snapshot-id": self.snapshot_id, | ||
} | ||
return {"event": "snapshot-abort", "snapshot-id": self.snapshot_id} | ||
|
||
|
||
@dataclass(frozen=True) | ||
class SnapshotLoaded(ControlMessage): | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,12 +9,13 @@ | |
from cdc.snapshots.sources import SnapshotSource | ||
from cdc.snapshots.snapshot_control import SnapshotControl | ||
from cdc.snapshots.snapshot_types import SnapshotId, TableConfig | ||
from cdc.streams import Producer as StreamProducer | ||
from cdc.streams.producer import Producer as StreamProducer | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same: I just changed an import and everything got reformatted. |
||
from cdc.utils.logging import LoggerAdapter | ||
from cdc.utils.registry import Configuration | ||
|
||
logger = LoggerAdapter(logging.getLogger(__name__)) | ||
|
||
|
||
class SnapshotCoordinator(ABC): | ||
""" | ||
Coordinates the process of taking a snapshot from the source database | ||
|
@@ -26,38 +27,34 @@ class SnapshotCoordinator(ABC): | |
- communicate the details of the snapshot to all the listeners TODO | ||
""" | ||
|
||
def __init__(self, | ||
def __init__( | ||
self, | ||
source: SnapshotSource, | ||
destination: DestinationContext, | ||
control: SnapshotControl, | ||
product: str, | ||
tables: Sequence[TableConfig]) -> None: | ||
tables: Sequence[TableConfig], | ||
) -> None: | ||
self.__source = source | ||
self.__destination = destination | ||
self.__product = product | ||
self.__tables = tables | ||
self.__control = control | ||
|
||
|
||
def start_process(self) -> None: | ||
logger.debug("Starting snapshot process for product %s", self.__product) | ||
snapshot_id = uuid.uuid1() | ||
logger.info("Starting snapshot ID %s", snapshot_id) | ||
table_names = [t.table for t in self.__tables] | ||
self.__control.init_snapshot( | ||
snapshot_id=snapshot_id, | ||
tables=table_names, | ||
product=self.__product, | ||
snapshot_id=snapshot_id, tables=table_names, product=self.__product | ||
) | ||
with self.__destination.open( | ||
SnapshotId(str(snapshot_id)), | ||
self.__product) as snapshot_out: | ||
SnapshotId(str(snapshot_id)), self.__product | ||
) as snapshot_out: | ||
|
||
logger.info("Snapshot ouput: %s", snapshot_out.get_name()) | ||
snapshot_desc = self.__source.dump( | ||
snapshot_out, | ||
self.__tables, | ||
) | ||
snapshot_desc = self.__source.dump(snapshot_out, self.__tables) | ||
logger.info("Snapshot taken: %r", snapshot_desc) | ||
|
||
self.__control.wait_messages_sent() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,11 +3,11 @@ | |
from abc import ABC | ||
from dataclasses import dataclass | ||
from typing import NamedTuple, NewType | ||
|
||
from cdc.types import Payload | ||
from cdc.streams.types import StreamMessage | ||
|
||
Id = NewType("Id", int) | ||
Position = NewType("Position", int) | ||
Payload = NewType("Payload", bytes) | ||
|
||
|
||
class CdcMessage(NamedTuple): | ||
|
@@ -40,6 +40,9 @@ class ReplicationEvent(ABC): | |
# any source specific processing. | ||
payload: Payload | ||
|
||
def to_stream(self) -> StreamMessage: | ||
return StreamMessage(payload=self.payload) | ||
|
||
|
||
@dataclass(frozen=True) | ||
class BeginMessage(ReplicationEvent): | ||
|
@@ -59,6 +62,9 @@ class ChangeMessage(ReplicationEvent): | |
|
||
table: str | ||
|
||
def to_stream(self) -> StreamMessage: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code was in the kafka.py backend. |
||
return StreamMessage(payload=self.payload, metadata={"table": self.table}) | ||
|
||
|
||
@dataclass(frozen=True) | ||
class GenericMessage(ReplicationEvent): | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clearly my code was not formatted on save last time I made a change here.....