-
-
Notifications
You must be signed in to change notification settings - Fork 5
feat(snapshot) Provide a formatter configuration when taking a snapshot. #28
Conversation
@@ -129,7 +129,29 @@ def snapshot(ctx, snapshot_config): | |||
"type": "object", | |||
"properties": { | |||
"table": {"type": "string"}, | |||
"columns": {"type": "array", "items": {"type": "string"}}, | |||
"columns": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See test_config.py for an example of how how the config now looks like.
from cdc.snapshots.destinations import SnapshotDestination | ||
from cdc.utils.logging import LoggerAdapter | ||
from cdc.utils.registry import Configuration | ||
|
||
logger = LoggerAdapter(logging.getLogger(__name__)) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From here to line 78 (except for line 68) the code did not change, it was only formatted when I saved it.
raise ValueError(f"Unknown formatter type {type(column.formatter)}") | ||
|
||
|
||
# We could add more mappings if needed, which is unlikely. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The additional complexity in mapping a config format to the postgres format, is that the config is supposed to be DB agnostic (as this system has a concept of source, which is the source DB, and multiple backend implementations one of which is postgres). This is the postgres implementation.
Right now there is one implementation only though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you foresee a lot of different formats for postgres? Why not have a lookup on a specific format, "%Y-%m-%d%H:%M:%S": "YYYY-MM-DDHH24:MI:SS"
. Why do the regex substitution part by part?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you may be right. It is very unlikely we would change it. Though, having a config that allows for a regex like system just on paper, would be quite confusing. Since the regex system already works, I would not restrict it, this is less confusing than the other option. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This works fine so let's roll with it. It's not a blocker for me.
raise ValueError(f"Unknown formatter type {type(column.formatter)}") | ||
|
||
|
||
# We could add more mappings if needed, which is unlikely. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you foresee a lot of different formats for postgres? Why not have a lookup on a specific format, "%Y-%m-%d%H:%M:%S": "YYYY-MM-DDHH24:MI:SS"
. Why do the regex substitution part by part?
I changed the way to format the date. Instead of using I can take a snapshot of 8M rows in 1 minute without format and with this formatter, while it takes 90 seconds with |
|
||
class FormatterConfig(ABC): | ||
""" | ||
Parent class to all the the formatter configs. | ||
""" | ||
|
||
@abstractmethod | ||
def to_dict(self) -> Mapping[str, str]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems a little odd to not have this as a Mapping[str, Any]
to line up with the other to_dict
functions. I understand that you can be more specific with this function than with the other ones. I'm not saying you should change it, but something to think about.
|
||
|
||
def format_datetime(col_name: str, formatter: DateTimeFormatterConfig) -> sql.SQL: | ||
return sql.SQL("DATE_TRUNC({precision} ,{column})::timestamp AS {alias}").format( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return sql.SQL("DATE_TRUNC({precision} ,{column})::timestamp AS {alias}").format( | |
return sql.SQL("DATE_TRUNC({precision}, {column})::timestamp AS {alias}").format( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very much a nit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right. this is weird. Will fix.
def safe_dump_default(value: Any) -> Any: | ||
if isinstance(value, Enum): | ||
return value.value | ||
else: | ||
raise TypeError | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this handled by the from_dict()
methods now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I forgot to remove this. Thanks for noticing.
Based on #27
When we take a snapshot of a postgres table with cdc (to import it into clickhouse) dates are formatted as
2019-06-16 06:21:39+00
. The process of transforming these dates into clickhouse dates (2019-06-16 06:21:39
) is by far the bottleneck when ingesting the snapshot into Clickhouse. This bottleneck is specifically impactful on tables like groupedmessages_local that are huge.We can largely avoid this overhead by just formatting the snapshot in a way that is good for clickhouse. Just piping the CSV into clickhouse is one order of magnitude faster.
At the same time I would not like to tightly couple the cdc producer with clickhouse.
So this PR adds more structure to the snapshot configuration so that we can configure a format for each column we query from postgres. This puts the coupling in the configuration we pass to cdc each time we extract a snapshot, instead of being in the cdc code itself.
THe formatting logic is applied in the postgres query.