Found a circular reference in the code #2457

Merged: 1 commit, merged on Nov 6, 2024
6 changes: 5 additions & 1 deletion warehouse/metrics_tools/factory/factory.py
@@ -8,6 +8,7 @@
import textwrap
from metrics_tools.runner import MetricsRunner
from metrics_tools.transformer.tables import ExecutionContextTableTransform
from metrics_tools.utils.logging import add_metrics_tools_to_sqlmesh_logging
import pandas as pd
from dataclasses import dataclass, field

@@ -33,7 +34,7 @@
GeneratedModel,
GeneratedPythonModel,
)
-from metrics_tools.factory.macros import (
+from metrics_tools.macros import (
metrics_end,
metrics_entity_type_col,
metrics_name,
@@ -564,6 +565,9 @@ def generated_model_additional_macros(
def timeseries_metrics(
**raw_options: t.Unpack[TimeseriesMetricsOptions],
):
add_metrics_tools_to_sqlmesh_logging()

logger.debug("loading timeseries metrics")
calling_file = inspect.stack()[1].filename
timeseries_metrics = TimeseriesMetrics.from_raw_options(**raw_options)
return timeseries_metrics.generate_models(calling_file)
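The call site above relies on inspect.stack()[1].filename so that timeseries_metrics() can discover which model file invoked it. A minimal, standalone sketch of that caller-discovery pattern (the function name below is made up; only the inspect usage mirrors factory.py):

import inspect


def calling_filename() -> str:
    # Frame 0 is this function's own frame; frame 1 belongs to the caller,
    # which is how timeseries_metrics() learns the path of the model file
    # that invoked it.
    return inspect.stack()[1].filename


if __name__ == "__main__":
    # Run as a script, the caller is this file, so its own path is printed.
    print(calling_filename())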
3 changes: 3 additions & 0 deletions warehouse/metrics_tools/macros/__init__.py
@@ -0,0 +1,3 @@
# ruff: noqa: F403

from .macros import *
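This three-line __init__.py is what lets both factory.py and runner.py import the macros from metrics_tools.macros instead of metrics_tools.factory.macros (presumably the circular reference the PR title refers to), while the definitions stay in macros/macros.py. A runnable sketch of the same star re-export pattern, using a throwaway package with made-up names rather than the real metrics_tools layout:

import sys
import tempfile
from pathlib import Path

# Build a tiny package on disk: tiny_macros/__init__.py star-imports from
# tiny_macros/macros.py, mirroring metrics_tools/macros/__init__.py.
pkg_root = Path(tempfile.mkdtemp())
pkg = pkg_root / "tiny_macros"
pkg.mkdir()
(pkg / "macros.py").write_text("def metrics_demo():\n    return 'rendered'\n")
(pkg / "__init__.py").write_text("from .macros import *\n")

sys.path.insert(0, str(pkg_root))
from tiny_macros import metrics_demo  # resolved through the star re-export

print(metrics_demo())  # -> rendered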
31 changes: 28 additions & 3 deletions warehouse/metrics_tools/runner.py
@@ -4,14 +4,15 @@
import arrow
import logging
from metrics_tools.utils.glot import str_or_expressions
from sqlmesh import EngineAdapter
from sqlmesh.core.context import ExecutionContext
from sqlmesh.core.config import DuckDBConnectionConfig
from sqlmesh.core.engine_adapter.duckdb import DuckDBEngineAdapter
from sqlmesh.core.macros import RuntimeStage

from metrics_tools.definition import PeerMetricDependencyRef
from metrics_tools.intermediate import run_macro_evaluator
-from metrics_tools.factory.macros import (
+from metrics_tools.macros import (
metrics_end,
metrics_sample_date,
metrics_start,
@@ -78,6 +79,13 @@ def _connection_factory(self) -> t.Callable:
return lambda: self._existing_connection


class FakeEngineAdapter(EngineAdapter):
"""This needs some work"""

def __init__(self, dialect: str):
self.dialect = dialect


class MetricsRunner:
@classmethod
def create_duckdb_execution_context(
@@ -94,6 +102,17 @@ def connection_factory():
context = ExecutionContext(engine_adapter, {})
return cls(context, str_or_expressions(query), ref, locals)

@classmethod
def from_engine_adapter(
cls,
engine_adapter: EngineAdapter,
query: str | t.List[exp.Expression],
ref: PeerMetricDependencyRef,
locals: t.Optional[t.Dict[str, t.Any]],
):
context = ExecutionContext(engine_adapter, {})
return cls(context, str_or_expressions(query), ref, locals)

@classmethod
def from_sqlmesh_context(
cls,
@@ -132,9 +151,8 @@ def run_rolling(self, start: datetime, end: datetime):
df: pd.DataFrame = pd.DataFrame()
logger.debug(f"run_rolling called with start={start} and end={end}")
count = 0
-for day in arrow.Arrow.range("day", arrow.get(start), arrow.get(end)):
+for rendered_query in self.render_rolling_queries(start, end):
count += 1
-rendered_query = self.render_query(day.datetime, day.datetime)
logger.debug(
f"executing rolling window: {rendered_query}",
extra={"query": rendered_query},
@@ -181,6 +199,13 @@ def render_query(self, start: datetime, end: datetime) -> str:
)
return "\n".join(rendered_parts)

def render_rolling_queries(self, start: datetime, end: datetime) -> t.Iterator[str]:
# Given a rolling window, render one query per day from start to end
logger.debug(f"render_rolling_queries called with start={start} and end={end}")
for day in arrow.Arrow.range("day", arrow.get(start), arrow.get(end)):
rendered_query = self.render_query(day.datetime, day.datetime)
yield rendered_query

def commit(self, start: datetime, end: datetime, destination: str):
"""Like run but commits the result to the database"""
try:
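The run_rolling change above swaps the inline per-day loop for the new render_rolling_queries generator, which yields one rendered query per day between start and end (inclusive). A standalone sketch of that pattern, where render_one stands in for MetricsRunner.render_query and only the arrow package is assumed:

from datetime import datetime
import typing as t

import arrow


def render_one(day: datetime) -> str:
    # Stand-in for MetricsRunner.render_query(day, day), which renders the
    # configured metric query for a single sample date.
    return f"SELECT * FROM metrics WHERE sample_date = DATE '{day.date()}'"


def render_rolling_queries(start: datetime, end: datetime) -> t.Iterator[str]:
    # One rendered query per day in [start, end], mirroring the generator
    # added to MetricsRunner.
    for day in arrow.Arrow.range("day", arrow.get(start), arrow.get(end)):
        yield render_one(day.datetime)


for query in render_rolling_queries(datetime(2024, 11, 1), datetime(2024, 11, 3)):
    print(query)  # three queries: Nov 1, Nov 2, Nov 3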
22 changes: 22 additions & 0 deletions warehouse/metrics_tools/utils/logging.py
@@ -0,0 +1,22 @@
import logging
import os

connected_to_sqlmesh_logs = False


def add_metrics_tools_to_sqlmesh_logging():
"""sqlmesh won't automatically add metrics_tools logging. This will enable
logs from any of the metrics tools utilities. If sqlmesh is the runner"""
import __main__

global connected_to_sqlmesh_logs

class MetricsToolsFilter(logging.Filter):
def filter(self, record):
return record.name == "metrics_tools"

app_name = os.path.basename(__main__.__file__)
if app_name == "sqlmesh" and not connected_to_sqlmesh_logs:
app_logger = logging.getLogger(app_name)
app_logger.addFilter(MetricsToolsFilter())
connected_to_sqlmesh_logs = True
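A quick, self-contained check of the filter predicate used above. It attaches MetricsToolsFilter to a handler rather than to the sqlmesh logger, so it only demonstrates the exact record.name comparison, not the PR's wiring; note that the exact match admits records from the "metrics_tools" logger itself but not from child loggers such as "metrics_tools.runner":

import logging


class MetricsToolsFilter(logging.Filter):
    def filter(self, record):
        return record.name == "metrics_tools"


handler = logging.StreamHandler()
handler.addFilter(MetricsToolsFilter())

root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.DEBUG)

logging.getLogger("metrics_tools").debug("kept: record.name matches exactly")
logging.getLogger("metrics_tools.runner").debug("dropped: exact match excludes child loggers")
logging.getLogger("sqlmesh").debug("dropped: different logger name")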