-
-
Notifications
You must be signed in to change notification settings - Fork 5
fix(snapshot) Changes the type provided to the producer to fix the snapshot process. #27
Conversation
@@ -118,7 +120,7 @@ def snapshot(ctx, snapshot_config): | |||
{ | |||
"type": "object", | |||
"properties": { | |||
#TODO: make product more restrictive once we have a better idea on how to use it | |||
# TODO: make product more restrictive once we have a better idea on how to use it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clearly my code was not formatted on save last time I made a change here.....
|
||
class ControlMessage(ABC): | ||
@abstractmethod | ||
def to_dict(self) -> Mapping[str, Any]: | ||
raise NotImplementedError | ||
|
||
def to_stream(self) -> StreamMessage: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code was in snapshot_control.py In the write_msg method.
@@ -9,12 +9,13 @@ | |||
from cdc.snapshots.sources import SnapshotSource | |||
from cdc.snapshots.snapshot_control import SnapshotControl | |||
from cdc.snapshots.snapshot_types import SnapshotId, TableConfig | |||
from cdc.streams import Producer as StreamProducer | |||
from cdc.streams.producer import Producer as StreamProducer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same: I just changed an import and everything got reformatted.
@@ -59,6 +62,9 @@ class ChangeMessage(ReplicationEvent): | |||
|
|||
table: str | |||
|
|||
def to_stream(self) -> StreamMessage: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code was in the kafka.py backend.
@@ -1,80 +0,0 @@ | |||
import jsonschema # type: ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just moved this to streams/producer.py to avoid a circular dependency when introducing StreamMessage which needs to be available to the Producer
and to the ProducerBackend
@@ -0,0 +1,81 @@ | |||
from typing import Callable, NamedTuple, Optional |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved here from cdc/streams/init.py
from cdc.snapshots.snapshot_control import SnapshotControl | ||
|
||
|
||
class DummyProducer(StreamProducer): | ||
class DummyProducerBackend(ProducerBackend): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was the issue with the test. By mocking the producer we would miss the issue, so at least this mocks the producer backend. We should instead go further down and mock only the kafka producer.
from cdc.types import Payload | ||
|
||
|
||
class StreamMessage(NamedTuple): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious: was there a reason you used NamedTuple here instead of a dataclass? I'm pretty new to the CDC code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did it because there is no need to mutate anything on this, nor to have inheritance, so I did not need a dataclass. There is no strong argument against a dataclass to me here either. Tuples tend to instantiate marginally faster and not to have a dict. Since this is done for every message it may be useful, though I would not bet on that to be visible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me
This seemingly complex change is actually a bug fix.
Currently the snapshot process just does not work. Months ago we (ok I broke it) made changes to the Producer to embed a header that contains the table name in Kafka messages produced by the cdc producer. In order to do that I made the stream/Producer take a ReplicationMessage object instead of a bytes payload.
This was ok, for the normal cdc operation. But it broke the snapshot as the snapshot controller uses the consumer to send a simple bytes payload to Kafka. The test passed because it was mocking the logic too far from Kafka.
To fix that I introduced a
StreamMessage
to replaceReplicationMessage
in the stream/producer interface.to_stream
method that produces the StreamMessage above