Various sqlmesh and clickhouse updates for concurrency #2174

Merged · 9 commits · Sep 19, 2024
2 changes: 1 addition & 1 deletion .sqlfluff
@@ -4,7 +4,7 @@ templater = dbt
 runaway_limit = 10
 max_line_length = 80
 indent_unit = space
-exclude_rules = AL09
+exclude_rules = AL09, RF02
 
 [sqlfluff:indentation]
 tab_space_size = 4
1,209 changes: 624 additions & 585 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -54,7 +54,7 @@ sqlalchemy = "^2.0.25"
 textual = "^0.52.1"
 redis = "^5.0.7"
 githubkit = "^0.11.6"
-sqlmesh = { git = "https://github.com/opensource-observer/sqlmesh.git", extras = [
+sqlmesh = { git = "https://github.com/opensource-observer/sqlmesh.git", rev = "test-clickhouse-engine-updates", extras = [
     "gcppostgres",
 ] }
 dagster-duckdb = "^0.24.0"
18 changes: 17 additions & 1 deletion warehouse/metrics_mesh/config.py
@@ -14,6 +14,16 @@
 
 dotenv.load_dotenv()
 
+
+def pool_manager_factory(config: ClickhouseConnectionConfig):
+    from clickhouse_connect.driver import httputil
+
+    return httputil.get_pool_manager(
+        num_pools=config.concurrent_tasks,
+        max_size=config.concurrent_tasks,
+    )
+
+
 config = Config(
     model_defaults=ModelDefaultsConfig(dialect="clickhouse", start="2024-08-01"),
     gateways={
@@ -32,8 +42,14 @@
                 password=os.environ.get("SQLMESH_CLICKHOUSE_PASSWORD", ""),
                 port=int(os.environ.get("SQLMESH_CLICKHOUSE_PORT", "443")),
                 concurrent_tasks=int(
-                    os.environ.get("SQLMESH_CLICKHOUSE_CONCURRENT_TASKS", "8")
+                    os.environ.get("SQLMESH_CLICKHOUSE_CONCURRENT_TASKS", "16")
                 ),
+                send_receive_timeout=1800,
+                connection_settings={"allow_nondeterministic_mutations": 1},
+                connection_pool_options={
+                    "maxsize": 24,
+                    "retries": 3,
+                },
             ),
             state_connection=GCPPostgresConnectionConfig(
                 instance_connection_string=os.environ.get(
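
Note: the new pool_manager_factory follows clickhouse-connect's documented pattern of sharing one urllib3 pool manager across clients, so concurrent SQLMesh tasks reuse HTTP connections instead of exhausting the default pool. A minimal standalone sketch of that pattern (host, credentials, and sizing here are placeholders for illustration, not the PR's actual wiring):

    import clickhouse_connect
    from clickhouse_connect.driver import httputil

    # One pool manager shared by every client; size it to at least the
    # number of concurrent tasks so queries never wait on a connection.
    pool_mgr = httputil.get_pool_manager(num_pools=16, maxsize=24)

    client = clickhouse_connect.get_client(
        host="localhost",           # placeholder
        username="default",         # placeholder
        pool_mgr=pool_mgr,          # reuse the shared pool
        send_receive_timeout=1800,  # match the long-mutation timeout above
    )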
8 changes: 4 additions & 4 deletions warehouse/metrics_mesh/macros/time_aggregation_bucket.py
@@ -1,11 +1,11 @@
+from sqlglot import expressions as exp
 from sqlmesh import macro
 from sqlmesh.core.macros import MacroEvaluator
-from sqlglot import expressions as exp
 
 
 @macro()
 def time_aggregation_bucket(
-    evaluator: MacroEvaluator, timeExp: exp.Expression, rollup: str
+    evaluator: MacroEvaluator, time_exp: exp.Expression, rollup: str
 ):
     from sqlmesh.core.dialect import parse_one
 
@@ -25,7 +25,7 @@ def time_aggregation_bucket(
                 unit=exp.Var(this=rollup_to_interval[rollup]),
             ),
             exp.Cast(
-                this=timeExp,
+                this=time_exp,
                 to=exp.DataType(this=exp.DataType.Type.DATE, nested=False),
             ),
         ],
@@ -38,6 +38,6 @@ def time_aggregation_bucket(
     return exp.Anonymous(
         this=rollup_to_clickhouse_function[rollup],
         expressions=[
-            timeExp,
+            time_exp,
        ],
    )
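
For context, this macro builds plain sqlglot nodes, so each branch can be sanity-checked by rendering it directly. A quick sketch (the toStartOfMonth mapping is assumed for illustration; the real rollup_to_clickhouse_function dict lives outside this hunk):

    from sqlglot import expressions as exp

    # Assumed mapping for illustration only.
    expr = exp.Anonymous(
        this="toStartOfMonth",
        expressions=[exp.column("bucket_day")],
    )
    print(expr.sql(dialect="clickhouse"))  # toStartOfMonth(bucket_day)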
28 changes: 28 additions & 0 deletions warehouse/metrics_mesh/macros/to_unix_timestamp.py
@@ -0,0 +1,28 @@
+from sqlglot import expressions as exp
+from sqlmesh import macro
+from sqlmesh.core.macros import MacroEvaluator
+
+
+@macro()
+def str_to_unix_timestamp(
+    evaluator: MacroEvaluator,
+    time_exp: exp.Expression,
+):
+    from sqlmesh.core.dialect import parse_one
+
+    if evaluator.runtime_stage in ["loading", "creating"]:
+        return parse_one("1::Uint32", dialect="clickhouse")
+
+    if evaluator.engine_adapter.dialect == "duckdb":
+        return exp.TimeToUnix(
+            this=exp.StrToTime(
+                this=time_exp,
+                format=exp.Array(
+                    expressions=[exp.Literal(this="%Y-%m-%d", is_string=True)]
+                ),
+            )
+        )
+    return exp.Anonymous(
+        this="toUnixTimestamp",
+        expressions=[time_exp],
+    )
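
A hedged sketch of what the two runtime branches render to, useful when verifying the macro against both engines (the output comments are approximately what sqlglot produces; treat them as illustrative):

    from sqlglot import expressions as exp

    ds = exp.column("ds")

    # DuckDB branch: parse the date string, then convert to a unix timestamp.
    duckdb_expr = exp.TimeToUnix(
        this=exp.StrToTime(
            this=ds.copy(),
            format=exp.Array(
                expressions=[exp.Literal(this="%Y-%m-%d", is_string=True)]
            ),
        )
    )
    print(duckdb_expr.sql(dialect="duckdb"))
    # EPOCH(STRPTIME(ds, ['%Y-%m-%d']))

    # ClickHouse branch: a bare function call.
    ch_expr = exp.Anonymous(this="toUnixTimestamp", expressions=[ds.copy()])
    print(ch_expr.sql(dialect="clickhouse"))
    # toUnixTimestamp(ds)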
19 changes: 17 additions & 2 deletions warehouse/metrics_mesh/models/events_daily_to_artifact.sql
@@ -2,11 +2,12 @@ MODEL (
   name metrics.events_daily_to_artifact,
   kind INCREMENTAL_BY_TIME_RANGE (
     time_column bucket_day,
-    batch_size 30
+    batch_size 90
   ),
   start '2015-01-01',
   cron '@daily',
   dialect 'clickhouse',
+  partitioned_by (event_type, DATE_TRUNC('MONTH', bucket_day)),
   grain (
     bucket_day,
     event_type,
@@ -20,8 +21,20 @@ MODEL (
     event_type String,
     from_artifact_id String,
     to_artifact_id String,
+    _sign Int8,
+    _version UInt32,
     amount Float64
-  )
+  ),
+  physical_properties (
+    ORDER_BY = (
+      event_type,
+      event_source,
+      from_artifact_id,
+      to_artifact_id,
+      bucket_day
+    ),
+  ),
+  storage_format "VersionedCollapsingMergeTree(_sign, _version)",
 );
 WITH events AS (
   SELECT DISTINCT from_artifact_id,
@@ -38,6 +51,8 @@ SELECT from_artifact_id,
   event_source,
   event_type,
   DATE_TRUNC('DAY', time::DATE) AS bucket_day,
+  1 as _sign,
+  @str_to_unix_timestamp(@execution_ds) as _version,
   SUM(amount) AS amount
 FROM events
 GROUP BY from_artifact_id,
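
With VersionedCollapsingMergeTree, each row carries a _sign (+1 insert, -1 cancel) and a _version; ClickHouse collapses matching pairs during background merges, which is what lets overlapping backfills replace prior data instead of duplicating it. Until a merge happens, reads should weight aggregates by _sign. A sketch of that read pattern (connection details are placeholders; this query is not part of the PR):

    import clickhouse_connect

    client = clickhouse_connect.get_client(host="localhost")  # placeholder

    # Multiply by _sign so uncollapsed +1/-1 pairs cancel out of the sum.
    result = client.query(
        """
        SELECT bucket_day, event_type, sum(amount * _sign) AS amount
        FROM metrics.events_daily_to_artifact
        GROUP BY bucket_day, event_type
        """
    )
    for bucket_day, event_type, amount in result.result_rows:
        print(bucket_day, event_type, amount)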
2 changes: 2 additions & 0 deletions warehouse/metrics_tools/lib/factories/factory.py
@@ -125,6 +125,7 @@ def generate_models_from_query(
             "name": ModelKindName.INCREMENTAL_BY_TIME_RANGE,
             "time_column": "metrics_sample_date",
             "batch_size": 1,
+            "batch_concurrency": 1,
         },
         dialect="clickhouse",
         columns=columns,
@@ -349,3 +350,4 @@ def timeseries_metrics(
     # raise Exception("no queries generated from the evaluated queries")
     # top_level_select = top_level_select.with_(f"all_{cte_suffix}", union_cte)
     # return top_level_select
+    # return top_level_select
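
Setting batch_concurrency to 1 tells SQLMesh to run this model's incremental batches serially even when the gateway allows many concurrent tasks, so parallelism is spent across models rather than within one. A minimal sketch of a standalone model using the same kind settings (the model name and query are hypothetical, and this assumes sqlmesh's create_sql_model accepts the kind mapping the same way the factory above does):

    from sqlglot import parse_one
    from sqlmesh.core.model import create_sql_model
    from sqlmesh.core.model.kind import ModelKindName

    # Hypothetical model; mirrors the kind dict passed by the factory above.
    model = create_sql_model(
        "metrics.example_metric",                      # hypothetical name
        parse_one("SELECT 1 AS metrics_sample_date"),  # hypothetical query
        kind={
            "name": ModelKindName.INCREMENTAL_BY_TIME_RANGE,
            "time_column": "metrics_sample_date",
            "batch_size": 1,         # one interval per evaluation
            "batch_concurrency": 1,  # only one batch in flight at a time
        },
        dialect="clickhouse",
    )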