Skip to content

Commit

Permalink
add support for checkpoint path
Browse files Browse the repository at this point in the history
  • Loading branch information
toluaina committed Sep 27, 2021
1 parent 4631d59 commit 486e942
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 12 deletions.
2 changes: 2 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
# POLL_TIMEOUT=0.1
# replication slot cleanup interval (in secs)
# REPLICATION_SLOT_CLEANUP_INTERVAL=180
# checkpoint file path
# CHECKPOINT_PATH=./

# Elasticsearch
# ELASTICSEARCH_SCHEME=http
Expand Down
2 changes: 1 addition & 1 deletion pgsync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

__author__ = "Tolu Aina"
__email__ = "[email protected]"
__version__ = "2.1.3"
__version__ = "2.1.4"
20 changes: 11 additions & 9 deletions pgsync/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ def _get_foreign_keys(model_a, model_b):
return foreign_keys


def get_foreign_keys(node_a, node_b):
def get_foreign_keys(node_a: Node, node_b: Node) -> dict:
"""Return dict of single foreign key with multiple columns.
e.g:
Expand All @@ -939,7 +939,7 @@ def get_foreign_keys(node_a, node_b):
column_1, column_2, column_N are of type ForeignKeyContraint
"""
foreign_keys = {}
foreign_keys: dict = {}
# if either offers a foreign_key via relationship, use it!
if (
node_a.relationship.foreign_key.parent
Expand Down Expand Up @@ -1013,7 +1013,7 @@ def pg_engine(
return sa.create_engine(url, echo=echo, connect_args=connect_args)


def pg_execute(engine, query, values=None, options=None):
def pg_execute(engine, query, values=None, options=None) -> None:
options: dict = options or {"isolation_level": "AUTOCOMMIT"}
conn = engine.connect()
try:
Expand All @@ -1026,37 +1026,39 @@ def pg_execute(engine, query, values=None, options=None):
raise


def create_schema(engine, schema):
def create_schema(engine, schema) -> None:
"""Create database schema."""
if schema != SCHEMA:
engine.execute(sa.schema.CreateSchema(schema))


def create_database(database: str, echo: bool = False):
def create_database(database: str, echo: bool = False) -> None:
"""Create a database."""
logger.debug(f"Creating database: {database}")
engine = pg_engine(database="postgres", echo=echo)
pg_execute(engine, f'CREATE DATABASE "{database}"')
logger.debug(f"Created database: {database}")


def drop_database(database: str, echo: bool = False):
def drop_database(database: str, echo: bool = False) -> None:
"""Drop a database."""
logger.debug(f"Dropping database: {database}")
engine = pg_engine(database="postgres", echo=echo)
pg_execute(engine, f'DROP DATABASE IF EXISTS "{database}"')
logger.debug(f"Dropped database: {database}")


def create_extension(database: str, extension: str, echo: bool = False):
def create_extension(
database: str, extension: str, echo: bool = False
) -> None:
"""Create a database extension."""
logger.debug(f"Creating extension: {extension}")
engine = pg_engine(database=database, echo=echo)
pg_execute(engine, f'CREATE EXTENSION IF NOT EXISTS "{extension}"')
logger.debug(f"Created extension: {extension}")


def drop_extension(database: str, extension: str, echo: bool = False):
def drop_extension(database: str, extension: str, echo: bool = False) -> None:
"""Drop a database extension."""
logger.debug(f"Dropping extension: {extension}")
engine = pg_engine(database=database, echo=echo)
Expand All @@ -1066,7 +1068,7 @@ def drop_extension(database: str, extension: str, echo: bool = False):

def compiled_query(
query: str, label: Optional[str] = None, literal_binds: bool = False
):
) -> None:
"""Compile an SQLAlchemy query with an optional label."""
query: str = str(
query.compile(
Expand Down
1 change: 1 addition & 0 deletions pgsync/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"REPLICATION_SLOT_CLEANUP_INTERVAL",
default=180.0,
)
CHECKPOINT_PATH = env.str("CHECKPOINT_PATH", default="./")

# Elasticsearch:
ELASTICSEARCH_SCHEME = env.str("ELASTICSEARCH_SCHEME", default="http")
Expand Down
12 changes: 11 additions & 1 deletion pgsync/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from .querybuilder import QueryBuilder
from .redisqueue import RedisQueue
from .settings import (
CHECKPOINT_PATH,
POLL_TIMEOUT,
REDIS_POLL_INTERVAL,
REPLICATION_SLOT_CLEANUP_INTERVAL,
Expand Down Expand Up @@ -80,7 +81,9 @@ def __init__(
self._checkpoint: int = None
self._plugins: Plugins = None
self._truncate: bool = False
self._checkpoint_file: str = f".{self.__name}"
self._checkpoint_file: str = os.path.join(
CHECKPOINT_PATH, f".{self.__name}"
)
self.redis: RedisQueue = RedisQueue(self.__name)
self.tree: Tree = Tree(self)
self._last_truncate_timestamp: datetime = datetime.now()
Expand Down Expand Up @@ -159,6 +162,13 @@ def validate(self, repl_slots: Optional[bool] = True) -> None:
f'Make sure you have run the "bootstrap" command.'
)

# ensure the checkpoint filepath is valid
if not os.path.exists(CHECKPOINT_PATH):
raise RuntimeError(
f'Ensure the checkpoint path exists "{CHECKPOINT_PATH}" '
f"and is readable ."
)

root: Node = self.tree.build(self.nodes)
root.display()
for node in traverse_breadth_first(root):
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 2.1.3
current_version = 2.1.4
commit = True
tag = True

Expand Down

0 comments on commit 486e942

Please sign in to comment.