Skip to content

Commit

Permalink
feat(crons): Record stats for volume history at clock tick
Browse files Browse the repository at this point in the history
This adds a function `_evaluate_tick_decision` which looks back at the
last MONITOR_VOLUME_RETENTION days worth of history and compares the
minute we just ticked past to that data.

We record 3 metrics from this comparison

- z_value: This is measured as the number of standard deviations the
  observed value falls from the mean

- pct_deviation: This is the percentage we've deviated from the mean

- count: This is the number of historic data points we're considering

The z_value and pct_deviation will be most helpful in making our
decision as to whether we've entered an "incident" state or not.
  • Loading branch information
evanpurkhiser committed Oct 23, 2024
1 parent de8b3c0 commit 630ee8b
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 1 deletion.
105 changes: 105 additions & 0 deletions src/sentry/monitors/clock_dispatch.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
from __future__ import annotations

import logging
import statistics
from collections import Counter
from collections.abc import Sequence
from datetime import datetime, timedelta, timezone
from typing import List

from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from django.conf import settings
from sentry_kafka_schemas.codecs import Codec
from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick

from sentry import options
from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.utils import metrics, redis
from sentry.utils.arroyo_producer import SingletonProducer
Expand All @@ -27,6 +30,18 @@
# This key is used to record historical date about the volume of check-ins.
MONITOR_VOLUME_HISTORY = "sentry.monitors.volume_history:{}"

# When fetching historic volume data to make a decision whether we have lost
# data this value will determine how many historic volume data-points we fetch
# of the window of the MONITOR_VOLUME_RETENTION. It is important to consider
# the expected uniformity of the volume for different steps.
#
# For example, since we tend to have much larger volume of check-ins
# on-the-hour it may not make sense to look at each minute as a data point.
# This is why this is set to 1 day, since this should closely match the
# harmonics of how check-ins are sent (people like to check-in on the hour, but
# there are probably more check-ins at midnight, than at 3pm).
MONITOR_VOLUME_DECISION_STEP = timedelta(days=1)

# We record 30 days worth of historical data for each minute of check-ins.
MONITOR_VOLUME_RETENTION = timedelta(days=30)

Expand Down Expand Up @@ -85,6 +100,94 @@ def _make_reference_ts(ts: datetime):
return int(ts.replace(second=0, microsecond=0).timestamp())


def _evaluate_tick_decision(tick: datetime):
    """
    When the clock is ticking, we may decide this tick is invalid and should
    result in unknown misses and marking all in-progress check-ins as having an
    unknown result.

    We do this by looking at the historic volume of check-ins for the
    particular minute boundary we just crossed.

    XXX(epurkhiser): This is currently in development and no decision is made
    to mark unknowns, instead we are only recording metrics for each clock tick
    """
    if not options.get("crons.tick_volume_anomaly_detection"):
        return

    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

    # The clock has just ticked to the next minute. Look at the previous
    # minute's volume compared against the historic volume for that same
    # minute over the retention window.
    past_ts = tick - timedelta(minutes=1)
    start_ts = past_ts - MONITOR_VOLUME_RETENTION

    # Generate previous timestamps to fetch. The first past_ts timestamp is
    # also included in this query
    historic_timestamps: list[datetime] = [past_ts]
    historic_ts = past_ts

    while historic_ts > start_ts:
        historic_ts = historic_ts - MONITOR_VOLUME_DECISION_STEP
        historic_timestamps.append(historic_ts)

    # Bulk fetch volume counts
    volumes = redis_client.mget(
        MONITOR_VOLUME_HISTORY.format(_make_reference_ts(ts)) for ts in historic_timestamps
    )

    past_minute_volume = _int_or_none(volumes.pop(0))
    historic_volume: list[int] = [int(v) for v in volumes if v is not None]

    # Can't make any decisions if we didn't have data for the past minute
    if past_minute_volume is None:
        return

    # Can't make any decisions without historic data. statistics.stdev raises
    # StatisticsError for fewer than two data points, so require at least two.
    if len(historic_volume) < 2:
        return

    # Record some statistics about the past_minute_volume volume in comparison
    # to the historic_volume data

    historic_mean = statistics.mean(historic_volume)
    historic_stdev = statistics.stdev(historic_volume)

    # A zero standard deviation (perfectly uniform history, including the
    # all-zeros case) makes the z-score undefined; a zero mean makes the
    # percentage deviation undefined. Skip recording rather than dividing by
    # zero.
    if historic_stdev == 0 or historic_mean == 0:
        return

    historic_stdev_pct = (historic_stdev / historic_mean) * 100

    # Calculate the z-score of our past minutes volume in comparison to the
    # historic volume data. The z-score is measured in terms of standard
    # deviations from the mean
    z_score = (past_minute_volume - historic_mean) / historic_stdev

    # Percentage deviation from the mean for our past minutes volume
    pct_deviation = (abs(past_minute_volume - historic_mean) / historic_mean) * 100

    metrics.gauge(
        "monitors.task.clock_tick.historic_volume_stdev_pct",
        historic_stdev_pct,
        sample_rate=1.0,
    )
    metrics.gauge("monitors.task.volume_history.count", len(historic_volume), sample_rate=1.0)
    metrics.gauge("monitors.task.volume_history.z_score", z_score, sample_rate=1.0)
    metrics.gauge("monitors.task.volume_history.pct_deviation", pct_deviation, sample_rate=1.0)

    # XXX(epurkhiser): We're not actually making any decisions with this data
    # just yet.
    logger.info(
        "monitors.clock_dispatch.volume_history",
        extra={
            "reference_datetime": str(tick),
            "evaluation_minute": past_ts.strftime("%H:%M"),
            "history_count": len(historic_volume),
            "z_score": z_score,
            "pct_deviation": pct_deviation,
            "historic_mean": historic_mean,
            "historic_stdev": historic_stdev,
        },
    )


def update_check_in_volume(ts_list: Sequence[datetime]):
"""
Increment counters for a list of check-in timestamps. Each timestamp will be
Expand Down Expand Up @@ -180,7 +283,9 @@ def try_monitor_clock_tick(ts: datetime, partition: int):
extra = {"reference_datetime": str(backfill_tick)}
logger.info("monitors.consumer.clock_tick_backfill", extra=extra)

_evaluate_tick_decision(backfill_tick)
_dispatch_tick(backfill_tick)
backfill_tick = backfill_tick + timedelta(minutes=1)

_evaluate_tick_decision(tick)
_dispatch_tick(tick)
7 changes: 7 additions & 0 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -2016,6 +2016,13 @@
# Killswitch for monitor check-ins
register("crons.organization.disable-check-in", type=Sequence, default=[])

# Enables anomaly detection based on the volume of check-ins being processed.
# Off by default; FLAG_AUTOMATOR_MODIFIABLE allows it to be toggled through
# the options automator rather than requiring a deploy.
register(
    "crons.tick_volume_anomaly_detection",
    default=False,
    flags=FLAG_BOOL | FLAG_AUTOMATOR_MODIFIABLE,
)

# Sets the timeout for webhooks
register(
"sentry-apps.webhook.timeout.sec",
Expand Down
141 changes: 140 additions & 1 deletion tests/sentry/monitors/test_clock_dispatch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from datetime import timedelta
import itertools
from datetime import datetime, timedelta
from typing import List
from unittest import mock

from arroyo import Topic
Expand All @@ -8,11 +10,15 @@
from django.utils import timezone

from sentry.monitors.clock_dispatch import (
MONITOR_VOLUME_DECISION_STEP,
MONITOR_VOLUME_HISTORY,
MONITOR_VOLUME_RETENTION,
_dispatch_tick,
_evaluate_tick_decision,
try_monitor_clock_tick,
update_check_in_volume,
)
from sentry.testutils.helpers.options import override_options
from sentry.utils import json, redis


Expand Down Expand Up @@ -177,3 +183,136 @@ def make_key(offset: timedelta) -> str:
assert minute_1 == "1"
assert minute_2 is None
assert minute_3 == "1"


def fill_historic_volume(start: datetime, length: timedelta, step: timedelta, counts: List[int]):
    """
    Creates a volume history starting at `start` and going back `length`,
    where each bucket is spaced `step` apart.

    `counts` is a list of check-in counts, one per step. The list is cycled
    through as buckets are filled; its length must evenly divide the number of
    steps covered by `length`.
    """
    # Buckets are keyed by minute, so align the start the same way the
    # production code does.
    aligned_start = start.replace(second=0, microsecond=0)

    # The length of counts should evenly divide the number of steps
    steps = length // step
    assert steps % len(counts) == 0

    counts_cycle = itertools.cycle(counts)
    ts = aligned_start
    end = aligned_start - length

    # Repeat each bucket's timestamp `count` times so update_check_in_volume
    # increments that bucket by `count`.
    ts_list = []
    while ts >= end:
        count = next(counts_cycle)
        ts_list.extend([ts] * count)
        ts = ts - step

    update_check_in_volume(ts_list)


@mock.patch("sentry.monitors.clock_dispatch.logger")
@mock.patch("sentry.monitors.clock_dispatch.metrics")
@override_options({"crons.tick_volume_anomaly_detection": True})
def test_evaluate_tick_decision_simple(metrics, logger):
    """
    A past-minute volume close to the historic mean yields a small z-score and
    percentage deviation in both the log entry and the recorded gauges.
    """
    tick = timezone.now().replace(second=0, microsecond=0)

    # This is the timestamp we're looking at just after the tick
    past_ts = tick - timedelta(minutes=1)

    # Fill historic volume data for earlier minutes.
    fill_historic_volume(
        start=past_ts - MONITOR_VOLUME_DECISION_STEP,
        length=MONITOR_VOLUME_RETENTION,
        step=MONITOR_VOLUME_DECISION_STEP,
        counts=[170, 150, 130, 210, 154],
    )

    # Record a volume of 165 for the minute we are considering, close to the
    # historic mean of 162.8
    update_check_in_volume([past_ts] * 165)

    _evaluate_tick_decision(tick)

    logger.info.assert_called_with(
        "monitors.clock_dispatch.volume_history",
        extra={
            "reference_datetime": str(tick),
            "evaluation_minute": past_ts.strftime("%H:%M"),
            "history_count": 30,
            "z_score": 0.08064694302168258,
            "pct_deviation": 1.3513513513513442,
            "historic_mean": 162.8,
            "historic_stdev": 27.279397303484902,
        },
    )

    metrics.gauge.assert_any_call(
        "monitors.task.volume_history.count",
        30,
        sample_rate=1.0,
    )
    metrics.gauge.assert_any_call(
        "monitors.task.volume_history.z_score",
        0.08064694302168258,
        sample_rate=1.0,
    )
    metrics.gauge.assert_any_call(
        "monitors.task.volume_history.pct_deviation",
        1.3513513513513442,
        sample_rate=1.0,
    )


@mock.patch("sentry.monitors.clock_dispatch.logger")
@mock.patch("sentry.monitors.clock_dispatch.metrics")
@override_options({"crons.tick_volume_anomaly_detection": True})
def test_evaluate_tick_decision_volume_drop(metrics, logger):
    """
    A past-minute volume far below the historic mean yields an extreme
    (negative) z-score and a large percentage deviation.
    """
    tick = timezone.now().replace(second=0, microsecond=0)

    # This is the timestamp we're looking at just after the tick
    past_ts = tick - timedelta(minutes=1)

    # Fill historic volume data for earlier minutes.
    fill_historic_volume(
        start=past_ts - MONITOR_VOLUME_DECISION_STEP,
        length=MONITOR_VOLUME_RETENTION,
        step=MONITOR_VOLUME_DECISION_STEP,
        counts=[13_000, 12_000, 12_500, 12_400, 12_600],
    )

    # Record a volume much lower than what we had been recording previously
    update_check_in_volume([past_ts] * 6_000)

    _evaluate_tick_decision(tick)

    # Note that the pct_deviation and z_score are extremes
    logger.info.assert_called_with(
        "monitors.clock_dispatch.volume_history",
        extra={
            "reference_datetime": str(tick),
            "evaluation_minute": past_ts.strftime("%H:%M"),
            "history_count": 30,
            "z_score": -19.816869917656856,
            "pct_deviation": 52.0,
            "historic_mean": 12500,
            "historic_stdev": 328.0033641543204,
        },
    )

    metrics.gauge.assert_any_call(
        "monitors.task.volume_history.count",
        30,
        sample_rate=1.0,
    )
    metrics.gauge.assert_any_call(
        "monitors.task.volume_history.z_score",
        -19.816869917656856,
        sample_rate=1.0,
    )
    metrics.gauge.assert_any_call(
        "monitors.task.volume_history.pct_deviation",
        52.0,
        sample_rate=1.0,
    )

0 comments on commit 630ee8b

Please sign in to comment.