Skip to content

Commit

Permalink
fixed import error and support auth plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
toluaina committed Sep 27, 2021
1 parent 486e942 commit f71fb2e
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pgsync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

__author__ = "Tolu Aina"
__email__ = "[email protected]"
__version__ = "2.1.4"
__version__ = "2.1.5"
1 change: 1 addition & 0 deletions pgsync/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
UPDATE,
)
from .exc import ForeignKeyError, LogicalSlotParseError, TableNotFoundError
from .node import Node
from .settings import PG_SSLMODE, PG_SSLROOTCERT, QUERY_CHUNK_SIZE
from .trigger import CREATE_TRIGGER_TEMPLATE
from .utils import get_postgres_url
Expand Down
27 changes: 19 additions & 8 deletions pgsync/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from importlib import import_module
from inspect import getmembers, isclass
from pkgutil import iter_modules
from typing import List, Optional
from typing import Optional

logger = logging.getLogger(__name__)

Expand All @@ -14,21 +14,21 @@ class Plugin(ABC):
"""Plugin base class."""

@abstractmethod
def transform(self, doc, **kwargs):
def transform(self, doc: list, **kwargs) -> dict:
"""Must be implemented by all derived classes."""
pass


class Plugins(object):
def __init__(self, package: str, names: Optional[List] = None):
def __init__(self, package: str, names: Optional[list] = None):
self.package: str = package
self.names: Optional[List] = names or []
self.names: Optional[list] = names or []
self.reload()

def reload(self) -> None:
"""Reload the plugins from the available list."""
self.plugins: List = []
self._paths: List = []
self.plugins: list = []
self._paths: list = []
logger.debug(f"Reloading plugins from package: {self.package}")
self.walk(self.package)

Expand All @@ -53,7 +53,7 @@ def walk(self, package: str) -> None:
)
self.plugins.append(klass())

paths: List = []
paths: list = []
if isinstance(plugins.__path__, str):
paths.append(plugins.__path__)
else:
Expand All @@ -72,7 +72,7 @@ def walk(self, package: str) -> None:
]:
self.walk(f"{package}.{pkg}")

def transform(self, docs: List):
def transform(self, docs: list) -> dict:
"""Apply all plugins to each doc."""
for doc in docs:
for plugin in self.plugins:
Expand All @@ -83,3 +83,14 @@ def transform(self, docs: List):
_index=doc["_index"],
)
yield doc

def auth(self, key: str) -> Optional[str]:
"""Get an auth value from a key."""
for plugin in self.plugins:
if hasattr(plugin, "auth"):
logger.debug(f"Plugin: {plugin.name}")
try:
return plugin.auth(key)
except Exception as e:
logger.exception(f"Error calling auth: {e}")
return None
17 changes: 14 additions & 3 deletions pgsync/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from urllib.parse import quote_plus

from .exc import SchemaError
from .plugin import Plugins
from .settings import (
ELASTICSEARCH_HOST,
ELASTICSEARCH_PASSWORD,
Expand Down Expand Up @@ -112,11 +113,17 @@ def get_elasticsearch_url(
"""
Return the URL to connect to Elasticsearch.
"""
plugins: Plugins = Plugins("plugins", ["Auth"])

scheme: str = scheme or ELASTICSEARCH_SCHEME
host: str = host or ELASTICSEARCH_HOST
port: str = port or ELASTICSEARCH_PORT
user: str = user or ELASTICSEARCH_USER
password: str = password or ELASTICSEARCH_PASSWORD
password: str = (
plugins.auth("ELASTICSEARCH_PASSWORD")
or password
or ELASTICSEARCH_PASSWORD
)
if user:
return f"{scheme}://{user}:{quote_plus(password)}@{host}:{port}"
logger.debug("Connecting to Elasticsearch without authentication.")
Expand All @@ -133,9 +140,11 @@ def get_postgres_url(
"""
Return the URL to connect to Postgres.
"""
plugins: Plugins = Plugins("plugins", ["Auth"])

user: str = user or PG_USER
host: str = host or PG_HOST
password: str = password or PG_PASSWORD
password: str = plugins.auth("PG_PASSWORD") or password or PG_PASSWORD
port: str = port or PG_PORT
if not password:
logger.debug("Connecting to Postgres without password.")
Expand All @@ -155,8 +164,10 @@ def get_redis_url(
"""
Return the URL to connect to Redis.
"""
plugins: Plugins = Plugins("plugins", ["Auth"])

host: str = host or REDIS_HOST
password: str = password or REDIS_AUTH
password: str = plugins.auth("REDIS_AUTH") or password or REDIS_AUTH
port = port or REDIS_PORT
db: str = db or REDIS_DB
scheme: str = scheme or REDIS_SCHEME
Expand Down
26 changes: 23 additions & 3 deletions plugins/sample.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,32 @@
from typing import Optional

from pgsync import plugin


class Auth(plugin.Plugin):
"""Example auth plugin."""

name = "Auth"

def transform(self, doc: list, **kwargs) -> dict:
pass

def auth(self, key: str) -> Optional[str]:
"""Sample auth."""
if key == "PG_PASSWORD":
return "abcd"
if key == "ELASTICSEARCH_PASSWORD":
return "ijkl"
if key == "REDIS_AUTH":
return None


class VillainPlugin(plugin.Plugin):
"""Example Villain plugin."""

name = "Villain"

def transform(self, doc, **kwargs):
def transform(self, doc: list, **kwargs) -> dict:
"""Demonstrates how to modify a document."""
doc_id = kwargs["_id"]
doc_index = kwargs["_index"]
Expand All @@ -27,7 +47,7 @@ class HeroPlugin(plugin.Plugin):

name = "Hero"

def transform(self, doc, **kwargs):
def transform(self, doc: list, **kwargs) -> dict:
"""Demonstrates how to modify a document."""
doc_id = kwargs["_id"]
doc_index = kwargs["_index"]
Expand All @@ -48,7 +68,7 @@ class GeometryPlugin(plugin.Plugin):

name = "Geometry"

def transform(self, doc, **kwargs):
def transform(self, doc: list, **kwargs) -> dict:
"""Demonstrates how to modify a document."""
doc_index = kwargs["_index"]

Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 2.1.4
current_version = 2.1.5
commit = True
tag = True

Expand Down

0 comments on commit f71fb2e

Please sign in to comment.