feat(crons): Record stats for volume history at clock tick #79574

Merged
104 changes: 104 additions & 0 deletions src/sentry/monitors/clock_dispatch.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import statistics
from collections import Counter
from collections.abc import Sequence
from datetime import datetime, timedelta, timezone
@@ -11,6 +12,7 @@
from sentry_kafka_schemas.codecs import Codec
from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick

from sentry import options
from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.utils import metrics, redis
from sentry.utils.arroyo_producer import SingletonProducer
@@ -27,6 +29,18 @@
# This key is used to record historical data about the volume of check-ins.
MONITOR_VOLUME_HISTORY = "sentry.monitors.volume_history:{}"

# When fetching historic volume data to decide whether we have lost data,
# this value determines how many historic volume data points we fetch from
# the MONITOR_VOLUME_RETENTION window. It is important to consider the
# expected uniformity of the volume across different steps.
#
# For example, since we tend to have a much larger volume of check-ins
# on-the-hour, it may not make sense to look at each minute as a data point.
# This is why this is set to 1 day, since it should closely match the
# harmonics of how check-ins are sent (people like to check-in on the hour, but
# there are probably more check-ins at midnight, than at 3pm).
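#
# For a tick at 12:00, for example, this means the comparison set is the 11:59
# volume bucket from each of the previous 30 days, rather than every minute in
# the retention window.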
MONITOR_VOLUME_DECISION_STEP = timedelta(days=1)

# We record 30 days worth of historical data for each minute of check-ins.
MONITOR_VOLUME_RETENTION = timedelta(days=30)

@@ -85,6 +99,94 @@ def _make_reference_ts(ts: datetime):
return int(ts.replace(second=0, microsecond=0).timestamp())


def _evaluate_tick_decision(tick: datetime):
"""
When the clock is ticking, we may decide this tick is invalid and should
result in unknown misses and marking all in-progress check-ins as having an
unknown result.

We do this by looking at the historic volume of check-ins for the
particular minute boundary we just crossed.

XXX(epurkhiser): This is currently in development and no decision is made
to mark unknowns, instead we are only recording metrics for each clock tick
"""
if not options.get("crons.tick_volume_anomaly_detection"):
return
Comment on lines +114 to +115

Member: Nit: Any reason to not use a feature flag?

Member Author: I couldn't remember how to use it when we don't have an organization lol

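For reference, a rough sketch of the trade-off being discussed. The option name below is the one registered in this PR; the feature-flag name and the features.has call are illustrative assumptions only:

from sentry import features, options

# The option gate used in this PR can be read with no organization in scope.
enabled = options.get("crons.tick_volume_anomaly_detection")

# A feature flag, by contrast, is typically checked against an organization,
# which the clock-tick consumer does not have on hand (hypothetical flag name):
# enabled = features.has("organizations:crons-tick-volume-anomaly", organization)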

redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

# The clock has just ticked to the next minute. Look at the volume for the
# previous minute across the retention window.
past_ts = tick - timedelta(minutes=1)
start_ts = past_ts - MONITOR_VOLUME_RETENTION

# Generate previous timestamps to fetch. The first past_ts timestamp is
# also included in this query
historic_timestamps: list[datetime] = [past_ts]
historic_ts = past_ts

while historic_ts > start_ts:
historic_ts = historic_ts - MONITOR_VOLUME_DECISION_STEP
historic_timestamps.append(historic_ts)

# Bulk fetch volume counts
volumes = redis_client.mget(
MONITOR_VOLUME_HISTORY.format(_make_reference_ts(ts)) for ts in historic_timestamps
)

past_minute_volume = _int_or_none(volumes.pop(0))
historic_volume: list[int] = [int(v) for v in volumes if v is not None]

# Can't make any decisions if we didn't have data for the past minute
if past_minute_volume is None:
return

# Can't make any decisions if we don't have historic data
if len(historic_volume) == 0:
return

# Record some statistics about the past_minute_volume in comparison to the
# historic_volume data

historic_mean = statistics.mean(historic_volume)
historic_stdev = statistics.stdev(historic_volume)

historic_stdev_pct = (historic_stdev / historic_mean) * 100
ram-senth (Member) commented on Oct 23, 2024:

Curious, this metric (aka coefficient of variation) is not used in the actual logic. Is that intentional?

Member Author: Yeah, right now all this function is doing is recording metrics. I want to see what the numbers look like before making any decisions on what our thresholds are.

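As a quick illustration of the numbers being recorded here (toy values, not production data):

import statistics

historic_volume = [170, 150, 130, 210, 154]
past_minute_volume = 165

historic_mean = statistics.mean(historic_volume)                 # 162.8
historic_stdev = statistics.stdev(historic_volume)               # ~30.0
historic_stdev_pct = (historic_stdev / historic_mean) * 100      # coefficient of variation, ~18.4%
z_score = (past_minute_volume - historic_mean) / historic_stdev  # ~0.07
pct_deviation = (abs(past_minute_volume - historic_mean) / historic_mean) * 100  # ~1.35%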

# Calculate the z-score of our past minute's volume in comparison to the
# historic volume data. The z-score is measured in terms of standard
# deviations from the mean
ram-senth (Member) commented on Oct 23, 2024:

This interpretation of the z-score as measuring the number of standard deviations from the mean is applicable only for normally distributed data. I would recommend looking at the distribution of per-minute volume. If it is not normally distributed then I would recommend using a different metric. Seer uses the interquartile range for this same reason.

Member Author: Going to include it for now. I'll take a look at our existing data but I am pretty sure it's going to be relatively normally distributed.

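# XXX: As noted in review, reading the z-score as "standard deviations from
# the mean" assumes the per-minute volumes are roughly normally distributed.
# A sketch of the interquartile-range alternative suggested above (not part
# of this change) could look like:
#
#   q1, _, q3 = statistics.quantiles(historic_volume, n=4)
#   iqr = q3 - q1
#   volume_is_anomalously_low = past_minute_volume < q1 - 1.5 * iqr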
z_score = (past_minute_volume - historic_mean) / historic_stdev

# Percentage deviation from the mean for our past minutes volume
pct_deviation = (abs(past_minute_volume - historic_mean) / historic_mean) * 100

metrics.gauge(
"monitors.task.clock_tick.historic_volume_stdev_pct",
historic_stdev_pct,
sample_rate=1.0,
)
metrics.gauge("monitors.task.volume_history.count", len(historic_volume), sample_rate=1.0)
metrics.gauge("monitors.task.volume_history.z_score", z_score, sample_rate=1.0)
metrics.gauge("monitors.task.volume_history.pct_deviation", pct_deviation, sample_rate=1.0)

# XXX(epurkhiser): We're not actually making any decisions with this data
# just yet.
logger.info(
"monitors.clock_dispatch.volume_history",
extra={
"reference_datetime": str(tick),
"evaluation_minute": past_ts.strftime("%H:%M"),
"history_count": len(historic_volume),
"z_score": z_score,
"pct_deviation": pct_deviation,
"historic_mean": historic_mean,
"historic_stdev": historic_stdev,
},
)


def update_check_in_volume(ts_list: Sequence[datetime]):
"""
Increment counters for a list of check-in timestamps. Each timestamp will be
@@ -180,7 +282,9 @@ def try_monitor_clock_tick(ts: datetime, partition: int):
extra = {"reference_datetime": str(backfill_tick)}
logger.info("monitors.consumer.clock_tick_backfill", extra=extra)

_evaluate_tick_decision(backfill_tick)
_dispatch_tick(backfill_tick)
backfill_tick = backfill_tick + timedelta(minutes=1)

_evaluate_tick_decision(tick)
_dispatch_tick(tick)
7 changes: 7 additions & 0 deletions src/sentry/options/defaults.py
@@ -2016,6 +2016,13 @@
# Killswitch for monitor check-ins
register("crons.organization.disable-check-in", type=Sequence, default=[])

# Enables anomaly detection based on the volume of check-ins being processed
register(
"crons.tick_volume_anomaly_detection",
default=False,
flags=FLAG_BOOL | FLAG_AUTOMATOR_MODIFIABLE,
)

# Sets the timeout for webhooks
register(
"sentry-apps.webhook.timeout.sec",
148 changes: 147 additions & 1 deletion tests/sentry/monitors/test_clock_dispatch.py
@@ -1,4 +1,6 @@
from datetime import timedelta
import itertools
from collections.abc import Sequence
from datetime import datetime, timedelta
from unittest import mock

from arroyo import Topic
@@ -8,15 +10,20 @@
from django.utils import timezone

from sentry.monitors.clock_dispatch import (
MONITOR_VOLUME_DECISION_STEP,
MONITOR_VOLUME_HISTORY,
MONITOR_VOLUME_RETENTION,
_dispatch_tick,
_evaluate_tick_decision,
try_monitor_clock_tick,
update_check_in_volume,
)
from sentry.testutils.helpers.options import override_options
from sentry.utils import json, redis


@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
@override_options({"crons.tick_volume_anomaly_detection": False})
def test_monitor_task_trigger(dispatch_tick):
now = timezone.now().replace(second=0, microsecond=0)

@@ -45,6 +52,7 @@ def test_monitor_task_trigger(dispatch_tick):


@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
@override_options({"crons.tick_volume_anomaly_detection": False})
def test_monitor_task_trigger_partition_desync(dispatch_tick):
"""
When consumer partitions are not completely synchronized we may read
@@ -76,6 +84,7 @@


@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
@override_options({"crons.tick_volume_anomaly_detection": False})
def test_monitor_task_trigger_partition_sync(dispatch_tick):
"""
When the kafka topic has multiple partitions we want to only tick our clock
@@ -104,6 +113,7 @@


@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
@override_options({"crons.tick_volume_anomaly_detection": False})
def test_monitor_task_trigger_partition_tick_skip(dispatch_tick):
"""
In a scenario where all partitions move multiple ticks past the slowest
@@ -141,6 +151,7 @@
@override_settings(KAFKA_TOPIC_OVERRIDES={"monitors-clock-tick": "clock-tick-test-topic"})
@override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
@mock.patch("sentry.monitors.clock_dispatch._clock_tick_producer")
@override_options({"crons.tick_volume_anomaly_detection": False})
def test_dispatch_to_kafka(clock_tick_producer_mock):
now = timezone.now().replace(second=0, microsecond=0)
_dispatch_tick(now)
@@ -177,3 +188,138 @@ def make_key(offset: timedelta) -> str:
assert minute_1 == "1"
assert minute_2 is None
assert minute_3 == "1"


def fill_historic_volume(
start: datetime, length: timedelta, step: timedelta, counts: Sequence[int]
):
"""
Creates a volume history starting at `start` and going back `length`, where
buckets are spaced `step` apart.

`counts` is a list of counts, one per step. The list is cycled through; its
length must evenly divide the number of steps between the start and the end
of the window.
"""
aligned_start = start.replace(second=0, microsecond=0)

# The length of counts should be divisible into the number of steps
steps = length // step
assert steps % len(counts) == 0

counts_cycle = itertools.cycle(counts)
ts = aligned_start
end = aligned_start - length

ts_list = []
while ts >= end:
count = next(counts_cycle)
ts_list.extend([ts] * count)
ts = ts - step

update_check_in_volume(ts_list)


@mock.patch("sentry.monitors.clock_dispatch.logger")
@mock.patch("sentry.monitors.clock_dispatch.metrics")
@override_options({"crons.tick_volume_anomaly_detection": True})
def test_evaluate_tick_decision_simple(metrics, logger):
tick = timezone.now().replace(second=0, microsecond=0)

# This is the timestamp we're looking at just after the tick
past_ts = tick - timedelta(minutes=1)

# Fill historic volume data for earlier minutes.
fill_historic_volume(
start=past_ts - MONITOR_VOLUME_DECISION_STEP,
length=MONITOR_VOLUME_RETENTION,
step=MONITOR_VOLUME_DECISION_STEP,
counts=[170, 150, 130, 210, 154],
)

# Record a volume of 165 for the timestamp we are considering
update_check_in_volume([past_ts] * 165)

_evaluate_tick_decision(tick)

logger.info.assert_called_with(
"monitors.clock_dispatch.volume_history",
extra={
"reference_datetime": str(tick),
"evaluation_minute": past_ts.strftime("%H:%M"),
"history_count": 30,
"z_score": 0.08064694302168258,
"pct_deviation": 1.3513513513513442,
"historic_mean": 162.8,
"historic_stdev": 27.279397303484902,
},
)

metrics.gauge.assert_any_call(
"monitors.task.volume_history.count",
30,
sample_rate=1.0,
)
metrics.gauge.assert_any_call(
"monitors.task.volume_history.z_score",
0.08064694302168258,
sample_rate=1.0,
)
metrics.gauge.assert_any_call(
"monitors.task.volume_history.pct_deviation",
1.3513513513513442,
sample_rate=1.0,
)


@mock.patch("sentry.monitors.clock_dispatch.logger")
@mock.patch("sentry.monitors.clock_dispatch.metrics")
@override_options({"crons.tick_volume_anomaly_detection": True})
def test_evaluate_tick_decision_volume_drop(metrics, logger):
tick = timezone.now().replace(second=0, microsecond=0)

# This is the timestamp we're looking at just after the tick
past_ts = tick - timedelta(minutes=1)

# Fill historic volume data for earlier minutes.
fill_historic_volume(
start=past_ts - MONITOR_VOLUME_DECISION_STEP,
length=MONITOR_VOLUME_RETENTION,
step=MONITOR_VOLUME_DECISION_STEP,
counts=[13_000, 12_000, 12_500, 12_400, 12_600],
)

# Record a volume much lower than what we had been recording previously
update_check_in_volume([past_ts] * 6_000)

_evaluate_tick_decision(tick)

# Note that the pct_deviation and z_score are extremes
logger.info.assert_called_with(
"monitors.clock_dispatch.volume_history",
extra={
"reference_datetime": str(tick),
"evaluation_minute": past_ts.strftime("%H:%M"),
"history_count": 30,
"z_score": -19.816869917656856,
"pct_deviation": 52.0,
"historic_mean": 12500,
"historic_stdev": 328.0033641543204,
},
)

metrics.gauge.assert_any_call(
"monitors.task.volume_history.count",
30,
sample_rate=1.0,
)
metrics.gauge.assert_any_call(
"monitors.task.volume_history.z_score",
-19.816869917656856,
sample_rate=1.0,
)
metrics.gauge.assert_any_call(
"monitors.task.volume_history.pct_deviation",
52.0,
sample_rate=1.0,
)