Skip to content

Commit

Permalink
Found a circular reference in the code (#2457)
Browse files Browse the repository at this point in the history
  • Loading branch information
ravenac95 authored Nov 6, 2024
1 parent 92a70c5 commit b8ac68e
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 4 deletions.
6 changes: 5 additions & 1 deletion warehouse/metrics_tools/factory/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import textwrap
from metrics_tools.runner import MetricsRunner
from metrics_tools.transformer.tables import ExecutionContextTableTransform
from metrics_tools.utils.logging import add_metrics_tools_to_sqlmesh_logging
import pandas as pd
from dataclasses import dataclass, field

Expand All @@ -33,7 +34,7 @@
GeneratedModel,
GeneratedPythonModel,
)
from metrics_tools.factory.macros import (
from metrics_tools.macros import (
metrics_end,
metrics_entity_type_col,
metrics_name,
Expand Down Expand Up @@ -564,6 +565,9 @@ def generated_model_additional_macros(
def timeseries_metrics(
**raw_options: t.Unpack[TimeseriesMetricsOptions],
):
add_metrics_tools_to_sqlmesh_logging()

logger.debug("loading timeseries metrics")
calling_file = inspect.stack()[1].filename
timeseries_metrics = TimeseriesMetrics.from_raw_options(**raw_options)
return timeseries_metrics.generate_models(calling_file)
Expand Down
3 changes: 3 additions & 0 deletions warehouse/metrics_tools/macros/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# ruff: noqa: F403

from .macros import *
File renamed without changes.
31 changes: 28 additions & 3 deletions warehouse/metrics_tools/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import arrow
import logging
from metrics_tools.utils.glot import str_or_expressions
from sqlmesh import EngineAdapter
from sqlmesh.core.context import ExecutionContext
from sqlmesh.core.config import DuckDBConnectionConfig
from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter
from sqlmesh.core.macros import RuntimeStage

from metrics_tools.definition import PeerMetricDependencyRef
from metrics_tools.intermediate import run_macro_evaluator
from metrics_tools.factory.macros import (
from metrics_tools.macros import (
metrics_end,
metrics_sample_date,
metrics_start,
Expand Down Expand Up @@ -78,6 +79,13 @@ def _connection_factory(self) -> t.Callable:
return lambda: self._existing_connection


class FakeEngineAdapter(EngineAdapter):
"""This needs some work"""

def __init__(self, dialect: str):
self.dialect = dialect


class MetricsRunner:
@classmethod
def create_duckdb_execution_context(
Expand All @@ -94,6 +102,17 @@ def connection_factory():
context = ExecutionContext(engine_adapter, {})
return cls(context, str_or_expressions(query), ref, locals)

@classmethod
def from_engine_adapter(
cls,
engine_adapter: EngineAdapter,
query: str | t.List[exp.Expression],
ref: PeerMetricDependencyRef,
locals: t.Optional[t.Dict[str, t.Any]],
):
context = ExecutionContext(engine_adapter, {})
return cls(context, str_or_expressions(query), ref, locals)

@classmethod
def from_sqlmesh_context(
cls,
Expand Down Expand Up @@ -132,9 +151,8 @@ def run_rolling(self, start: datetime, end: datetime):
df: pd.DataFrame = pd.DataFrame()
logger.debug(f"run_rolling called with start={start} and end={end}")
count = 0
for day in arrow.Arrow.range("day", arrow.get(start), arrow.get(end)):
for rendered_query in self.render_rolling_queries(start, end):
count += 1
rendered_query = self.render_query(day.datetime, day.datetime)
logger.debug(
f"executing rolling window: {rendered_query}",
extra={"query": rendered_query},
Expand Down Expand Up @@ -181,6 +199,13 @@ def render_query(self, start: datetime, end: datetime) -> str:
)
return "\n".join(rendered_parts)

def render_rolling_queries(self, start: datetime, end: datetime) -> t.Iterator[str]:
# Given a rolling input render all the rolling queries
logger.debug(f"render_rolling_rolling called with start={start} and end={end}")
for day in arrow.Arrow.range("day", arrow.get(start), arrow.get(end)):
rendered_query = self.render_query(day.datetime, day.datetime)
yield rendered_query

def commit(self, start: datetime, end: datetime, destination: str):
"""Like run but commits the result to the database"""
try:
Expand Down
22 changes: 22 additions & 0 deletions warehouse/metrics_tools/utils/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import logging
import os

connected_to_sqlmesh_logs = False


def add_metrics_tools_to_sqlmesh_logging():
"""sqlmesh won't automatically add metrics_tools logging. This will enable
logs from any of the metrics tools utilities. If sqlmesh is the runner"""
import __main__

global connected_to_sqlmesh_logs

class MetricsToolsFilter(logging.Filter):
def filter(self, record):
return record.name == "metrics_tools"

app_name = os.path.basename(__main__.__file__)
if app_name == "sqlmesh" and not connected_to_sqlmesh_logs:
app_logger = logging.getLogger(app_name)
app_logger.addFilter(MetricsToolsFilter())
connected_to_sqlmesh_logs = True

0 comments on commit b8ac68e

Please sign in to comment.