Skip to content

Commit

Permalink
Minor add code improvements (#62)
Browse files Browse the repository at this point in the history
* Minor improve result handling

Minor improve typing

Minor add exception code

* Minor improve result handling

* Minor improve result handling
  • Loading branch information
ismailsimsek authored Mar 3, 2025
1 parent b4f666f commit 2a3ad6c
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 62 deletions.
90 changes: 44 additions & 46 deletions opendbt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import os
import sys
from pathlib import Path
from typing import List, Callable, Optional

# IMPORTANT! this will import the overrides, and activates the patches
# IMPORTANT! `opendbt.dbt` import needs to happen before any `dbt` import
from opendbt.dbt import *
from opendbt.logger import OpenDbtLogger
from opendbt.utils import Utils

class OpenDbtCli:

def __init__(self, project_dir: Path, profiles_dir: Path = None, callbacks: list = None):
class OpenDbtCli:
def __init__(self, project_dir: Path, profiles_dir: Optional[Path] = None, callbacks: Optional[List[Callable]] = None):
self.project_dir: Path = Path(get_nearest_project_dir(project_dir.as_posix()))
self.profiles_dir: Path = profiles_dir
self._project: PartialProject = None
self._user_callbacks = callbacks if callbacks else []
self._project_callbacks = None
self.profiles_dir: Optional[Path] = profiles_dir
self._project: Optional[PartialProject] = None
self._user_callbacks: List[Callable] = callbacks if callbacks else []
self._project_callbacks: List[Callable] = []

@property
def project(self) -> PartialProject:
Expand All @@ -37,40 +37,39 @@ def project_vars(self) -> dict:
This method only retrieves global project variables specified within the `dbt_project.yml` file.
Variables passed via command-line arguments are not included in the returned dictionary.
"""
if 'vars' in self.project_dict:
return self.project_dict['vars']
return self.project_dict.get('vars', {})

return {}

@property
def project_callbacks(self):
def project_callbacks(self) -> List[Callable]:
if not self._project_callbacks:
self._project_callbacks = self._user_callbacks
if 'dbt_callbacks' in self.project_vars:
for _callback_module in str(self.project_vars['dbt_callbacks']).split(','):
_project_callback = Utils.import_module_attribute_by_name(_callback_module)
self._project_callbacks.append(_project_callback)
self._project_callbacks = list(self._user_callbacks)
dbt_callbacks_str = self.project_vars.get('dbt_callbacks', "")
dbt_callbacks_list = [c for c in dbt_callbacks_str.split(',') if c.strip()]
for callback_module_name in dbt_callbacks_list:
callback_func = Utils.import_module_attribute_by_name(callback_module_name.strip())
self._project_callbacks.append(callback_func)

return self._project_callbacks

def invoke(self, args: list, callbacks: list = None) -> dbtRunnerResult:
def invoke(self, args: List[str], callbacks: Optional[List[Callable]] = None) -> dbtRunnerResult:
"""
Run dbt with the given arguments.
:param args: The arguments to pass to dbt.
:param callbacks:
:return: The result of the dbt run.
"""
run_callbacks = self.project_callbacks + callbacks if callbacks else self.project_callbacks
run_args = args if args else []
run_callbacks = self.project_callbacks + (callbacks if callbacks else self.project_callbacks)
run_args = args or []
if "--project-dir" not in run_args:
run_args += ["--project-dir", self.project_dir.as_posix()]
if "--profiles-dir" not in run_args and self.profiles_dir:
run_args += ["--profiles-dir", self.profiles_dir.as_posix()]
return self.run(args=run_args, callbacks=run_callbacks)

@staticmethod
def run(args: list, callbacks: list = None) -> dbtRunnerResult:
def run(args: List[str], callbacks: Optional[List[Callable]] = None) -> dbtRunnerResult:
"""
Run dbt with the given arguments.
Expand All @@ -80,40 +79,40 @@ def run(args: list, callbacks: list = None) -> dbtRunnerResult:
"""
callbacks = callbacks if callbacks else []
# https://docs.getdbt.com/reference/programmatic-invocations
dbtcr = DbtCliRunner(callbacks=callbacks)
result: dbtRunnerResult = dbtcr.invoke(args)
runner = DbtCliRunner(callbacks=callbacks)
result: dbtRunnerResult = runner.invoke(args)

if result.success:
return result

if result.exception:
raise result.exception

# take error message and raise it as exception
for _result in result.result:
_result: RunResult
_result_messages = ""
if _result.status == 'error':
_result_messages += f"{_result_messages}\n"
if _result_messages:
raise DbtRuntimeError(msg=_result.message)
err_messages = [res.message for res in result.result if isinstance(res, RunResult) and res.status == 'error']

if err_messages:
raise DbtRuntimeError(msg="\n".join(err_messages))

raise DbtRuntimeError(msg=f"DBT execution failed!")

def manifest(self, partial_parse=True, no_write_manifest=True) -> Manifest:
def manifest(self, partial_parse: bool = True, no_write_manifest: bool = True) -> Manifest:
args = ["parse"]
if partial_parse:
args += ["--partial-parse"]
args.append("--partial-parse")
if no_write_manifest:
args += ["--no-write-json"]
args.append("--no-write-json")

result = self.invoke(args=args)
if not result.success:
raise Exception(f"DBT execution failed. result:{result}")
if isinstance(result.result, Manifest):
return result.result

raise Exception(f"DBT execution did not return Manifest object. returned:{type(result.result)}")

def generate_docs(self, args: list = None):
_args = ["docs", "generate"] + args if args else []
def generate_docs(self, args: Optional[List[str]] = None):
_args = ["docs", "generate"] + (args if args else [])
self.invoke(args=_args)


Expand All @@ -124,13 +123,13 @@ class OpenDbtProject(OpenDbtLogger):

DEFAULT_TARGET = 'dev' # development

def __init__(self, project_dir: Path, target: str = None, profiles_dir: Path = None, args: list = None):
def __init__(self, project_dir: Path, target: Optional[str] = None, profiles_dir: Optional[Path] = None, args: Optional[List[str]] = None, callbacks: Optional[List[Callable]] = None):
super().__init__()
self.project_dir: Path = project_dir
self.profiles_dir: Path = profiles_dir
self.profiles_dir: Optional[Path] = profiles_dir
self.target: str = target if target else self.DEFAULT_TARGET
self.args = args if args else []
self.cli: OpenDbtCli = OpenDbtCli(project_dir=self.project_dir, profiles_dir=self.profiles_dir)
self.args: List[str] = args if args else []
self.cli: OpenDbtCli = OpenDbtCli(project_dir=self.project_dir, profiles_dir=self.profiles_dir, callbacks=callbacks)

@property
def project(self) -> PartialProject:
Expand All @@ -144,14 +143,13 @@ def project_dict(self) -> dict:
def project_vars(self) -> dict:
return self.cli.project_vars

def run(self, command: str = "build", target: str = None, args: list = None, use_subprocess: bool = False,
write_json: bool = False) -> dbtRunnerResult:

def run(self, command: str = "build", target: Optional[str] = None, args: Optional[List[str]] = None, use_subprocess: bool = False,
write_json: bool = False) -> Optional[dbtRunnerResult]:
run_args = args if args else []
run_args += ["--target", target if target else self.target]
run_args += ["--project-dir", self.project_dir.as_posix()]
run_args.extend(["--target", target if target else self.target])
run_args.extend(["--project-dir", self.project_dir.as_posix()])
if self.profiles_dir:
run_args += ["--profiles-dir", self.profiles_dir.as_posix()]
run_args.extend(["--profiles-dir", self.profiles_dir.as_posix()])
run_args = [command] + run_args + self.args
if write_json:
run_args.remove("--no-write-json")
Expand All @@ -169,8 +167,8 @@ def run(self, command: str = "build", target: str = None, args: list = None, use
self.log.info(f"Running `dbt {' '.join(run_args)}`")
return self.cli.invoke(args=run_args)

def manifest(self, partial_parse=True, no_write_manifest=True) -> Manifest:
def manifest(self, partial_parse: bool = True, no_write_manifest: bool = True) -> Manifest:
return self.cli.manifest(partial_parse=partial_parse, no_write_manifest=no_write_manifest)

def generate_docs(self, args: list = None):
def generate_docs(self, args: Optional[List[str]] = None):
return self.cli.generate_docs(args=args)
1 change: 1 addition & 0 deletions opendbt/dbt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from opendbt.runtime_patcher import RuntimePatcher

try:
# IMPORTANT! `opendbt.dbt` import needs to happen before any `dbt` import
dbt_version = Version(version.get_installed_version().to_version_string(skip_matcher=True))
if Version("1.6.0") <= dbt_version < Version("1.8.0"):
from opendbt.dbt.v17.adapters.factory import OpenDbtAdapterContainer
Expand Down
40 changes: 24 additions & 16 deletions opendbt/dbt/shared/adapters/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,30 @@
class OpenDbtBaseAdapter(BaseAdapter):

def _execute_python_model(self, model_name: str, compiled_code: str, **kwargs):
with tempfile.NamedTemporaryFile(suffix=f'.py', delete=True) as model_file:
model_file.write(compiled_code.lstrip().encode('utf-8'))
model_file.flush()
print(f"Created temp py file {model_file.name}")
# Load the module spec
spec = importlib.util.spec_from_file_location(model_name, model_file.name)
# Create a module object
module = importlib.util.module_from_spec(spec)
# Load the module
sys.modules[model_name] = module
spec.loader.exec_module(module)
dbt_obj = module.dbtObj(None)
# Access and call `model` function of the model!
# IMPORTANT: here we are passing down duckdb session from the adapter to the model
module.model(dbt=dbt_obj, **kwargs)
model_file.close()
try:
with tempfile.NamedTemporaryFile(suffix=f'.py', delete=True) as model_file:
try:
model_file.write(compiled_code.lstrip().encode('utf-8'))
model_file.flush()
print(f"Created temp py file {model_file.name}")
# Load the module spec
spec = importlib.util.spec_from_file_location(model_name, model_file.name)
# Create a module object
module = importlib.util.module_from_spec(spec)
# Load the module
sys.modules[model_name] = module
spec.loader.exec_module(module)
dbt_obj = module.dbtObj(None)
# Access and call `model` function of the model!
# IMPORTANT: here we are passing down duckdb session from the adapter to the model
module.model(dbt=dbt_obj, **kwargs)
except Exception as e:
raise Exception(
f"Failed to load or execute python model:{model_name} from file {model_file.as_posix()}") from e
finally:
model_file.close()
except Exception as e:
raise Exception(f"Failed to create temp py file for model:{model_name}") from e

@available
def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
Expand Down

0 comments on commit 2a3ad6c

Please sign in to comment.