Skip to content

Commit

Permalink
parallel-benchmark: Flesh out sqlite support further
Browse files Browse the repository at this point in the history
  • Loading branch information
def- committed Oct 16, 2024
1 parent c9f854a commit 44b83a2
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 18 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ services.log
test/scalability/results/**/*.csv
test/scalability/results/**/*.png
misc/wasm/target
parallel-benchmark.db

# This is un-orthodox, but adding it to the repo would "tie" it to each user's
# local version of nixpkgs. This way, we can all use the flake and have a
Expand Down
2 changes: 1 addition & 1 deletion ci/plugins/mzcompose/hooks/pre-exit
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ timeout 300 buildkite-agent artifact upload "$artifacts_str" || true
bin/ci-builder run stable bin/ci-annotate-errors --test-cmd="$(cat test_cmd)" --test-desc="$(cat test_desc)" "${artifacts[@]}" > ci-annotate-errors.log || CI_ANNOTATE_ERRORS_RESULT=$?
buildkite-agent artifact upload "ci-annotate-errors.log"

if [ ! -s services.log ] && [ "$BUILDKITE_LABEL" != "Maelstrom coverage of persist" ] && [ "$BUILDKITE_LABEL" != "Long single-node Maelstrom coverage of persist" ] && [ "$BUILDKITE_LABEL" != "Maelstrom coverage of txn-wal" ] && [ "$BUILDKITE_LABEL" != "Mz E2E Test" ] && [ "$BUILDKITE_LABEL" != "Output consistency (version for DFR)" ] && [ "$BUILDKITE_LABEL" != "Output consistency (version for CTF)" ] && [ "$BUILDKITE_LABEL" != "QA Canary Environment Base Load" ] && [ "$BUILDKITE_LABEL" != "Parallel Benchmark against QA Canary Environment" ]; then
if [ ! -s services.log ] && [ "$BUILDKITE_LABEL" != "Maelstrom coverage of persist" ] && [ "$BUILDKITE_LABEL" != "Long single-node Maelstrom coverage of persist" ] && [ "$BUILDKITE_LABEL" != "Maelstrom coverage of txn-wal" ] && [ "$BUILDKITE_LABEL" != "Mz E2E Test" ] && [ "$BUILDKITE_LABEL" != "Output consistency (version for DFR)" ] && [ "$BUILDKITE_LABEL" != "Output consistency (version for CTF)" ] && [ "$BUILDKITE_LABEL" != "QA Canary Environment Base Load" ] && [ "$BUILDKITE_LABEL" != "Parallel Benchmark against QA Canary Environment" ] && [ "$BUILDKITE_LABEL" != "Parallel Benchmark against QA Benchmarking Staging Environment" ]; then
echo "+++ services.log is empty, failing"
exit 1
fi
Expand Down
2 changes: 1 addition & 1 deletion misc/python/materialize/parallel_benchmark/framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def add(self, action: str, measurement: Measurement) -> None:
(
self.scenario,
action,
measurement.duration,
measurement.duration * 1000,
measurement.timestamp,
),
)
Expand Down
116 changes: 100 additions & 16 deletions test/parallel-benchmark/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,23 +130,107 @@ def __init__(self, action: str, m: MeasurementsStore, start_time: float):
elif isinstance(m, SQLiteStore):
cursor = m.conn.cursor()
cursor.execute(
f"SELECT count(*), count(*) / (max(timestamp) - {start_time}), max(duration) * 1000, min(duration) * 1000, avg(duration) * 1000 FROM measurements WHERE scenario = ? AND action = ?",
(m.scenario, action),
"""
WITH RankedDurations AS (
SELECT
duration,
ROW_NUMBER() OVER (ORDER BY duration ASC) AS row_num,
COUNT(*) OVER () AS total_rows
FROM measurements
WHERE scenario = ? AND action = ?
),
Percentiles AS (
SELECT
MAX(CASE WHEN row_num <= total_rows * 0.50 THEN duration END) AS p50,
MAX(CASE WHEN row_num <= total_rows * 0.95 THEN duration END) AS p95,
MAX(CASE WHEN row_num <= total_rows * 0.99 THEN duration END) AS p99,
MAX(CASE WHEN row_num <= total_rows * 0.999 THEN duration END) AS p99_9,
MAX(CASE WHEN row_num <= total_rows * 0.9999 THEN duration END) AS p99_99,
MAX(CASE WHEN row_num <= total_rows * 0.99999 THEN duration END) AS p99_999,
MAX(CASE WHEN row_num <= total_rows * 0.999999 THEN duration END) AS p99_9999,
MAX(CASE WHEN row_num <= total_rows * 0.9999999 THEN duration END) AS p99_99999,
MAX(CASE WHEN row_num <= total_rows * 0.99999999 THEN duration END) AS p99_999999
FROM RankedDurations
),
Regression AS (
SELECT
COUNT(*) AS n,
SUM(timestamp * duration) AS sum_xy,
SUM(timestamp) AS sum_x,
SUM(duration) AS sum_y,
SUM(timestamp * timestamp) AS sum_xx
FROM measurements
WHERE scenario = ? AND action = ?
),
Stats AS (
SELECT
avg(duration) AS avg_duration,
COUNT(*) AS count_durations
FROM measurements
WHERE scenario = ? AND action = ?
),
VarianceCalc AS (
SELECT
SUM((duration - (SELECT avg_duration FROM Stats)) * (duration - (SELECT avg_duration FROM Stats))) AS variance
FROM measurements
WHERE scenario = ? AND action = ?
)
SELECT
count(*),
count(*) / (max(timestamp) - ?),
max(duration),
min(duration),
avg(duration),
(sqrt(variance / count_durations)),
p50,
p95,
p99,
p99_9,
p99_99,
p99_999,
p99_9999,
p99_99999,
p99_999999,
(r.n * r.sum_xy - r.sum_x * r.sum_y) / (r.n * r.sum_xx - r.sum_x * r.sum_x)
FROM measurements
JOIN Percentiles ON true
JOIN Regression r ON true
JOIN Stats ON true
JOIN VarianceCalc ON true
WHERE scenario = ? AND action = ?
""",
(
m.scenario,
action,
m.scenario,
action,
m.scenario,
action,
m.scenario,
action,
start_time,
m.scenario,
action,
),
)
self.queries, self.qps, self.max, self.min, self.avg = cursor.fetchone()
# TODO: Rest
self.median = 0.0
self.p50 = 0.0
self.p95 = 0.0
self.p99 = 0.0
self.p99_9 = 0.0
self.p99_99 = 0.0
self.p99_999 = 0.0
self.p99_9999 = 0.0
self.p99_99999 = 0.0
self.p99_999999 = 0.0
self.std = 0.0
self.slope = 0.0
(
self.queries,
self.qps,
self.max,
self.min,
self.avg,
self.std,
self.p50,
self.p95,
self.p99,
self.p99_9,
self.p99_99,
self.p99_999,
self.p99_9999,
self.p99_99999,
self.p99_999999,
self.slope,
) = cursor.fetchone()
else:
raise ValueError(
f"Unknown measurements store (for action {action}): {type(m)}"
Expand Down

0 comments on commit 44b83a2

Please sign in to comment.