Skip to content

Commit

Permalink
feat(crons): Add empty incident_occurrences_consumer (#80527)
Browse files Browse the repository at this point in the history
Part of GH-79328
  • Loading branch information
evanpurkhiser authored Nov 11, 2024
1 parent 49ce622 commit 5fc9f67
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 3 deletions.
2 changes: 1 addition & 1 deletion requirements-base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ rfc3339-validator>=0.1.2
rfc3986-validator>=0.1.1
# [end] jsonschema format validators
sentry-arroyo>=2.16.5
sentry-kafka-schemas>=0.1.116
sentry-kafka-schemas>=0.1.118
sentry-ophio==1.0.0
sentry-protos>=0.1.34
sentry-redis-tools>=0.1.7
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev-frozen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ sentry-cli==2.16.0
sentry-devenv==1.13.0
sentry-forked-django-stubs==5.1.1.post1
sentry-forked-djangorestframework-stubs==3.15.1.post2
sentry-kafka-schemas==0.1.116
sentry-kafka-schemas==0.1.118
sentry-ophio==1.0.0
sentry-protos==0.1.34
sentry-redis-tools==0.1.7
Expand Down
2 changes: 1 addition & 1 deletion requirements-frozen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ rpds-py==0.20.0
rsa==4.8
s3transfer==0.10.0
sentry-arroyo==2.16.5
sentry-kafka-schemas==0.1.116
sentry-kafka-schemas==0.1.118
sentry-ophio==1.0.0
sentry-protos==0.1.34
sentry-redis-tools==0.1.7
Expand Down
1 change: 1 addition & 0 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2893,6 +2893,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
"ingest-monitors": "default",
"monitors-clock-tick": "default",
"monitors-clock-tasks": "default",
"monitors-incident-occurrences": "default",
"uptime-configs": "default",
"uptime-results": "default",
"uptime-configs": "default",
Expand Down
1 change: 1 addition & 0 deletions src/sentry/conf/types/kafka_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Topic(Enum):
INGEST_MONITORS = "ingest-monitors"
MONITORS_CLOCK_TICK = "monitors-clock-tick"
MONITORS_CLOCK_TASKS = "monitors-clock-tasks"
MONITORS_INCIDENT_OCCURRENCES = "monitors-incident-occurrences"
UPTIME_CONFIG = "uptime-configs"
UPTIME_RESULTS = "uptime-results"
UPTIME_CONFIGS = "uptime-configs"
Expand Down
4 changes: 4 additions & 0 deletions src/sentry/consumers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ def ingest_transactions_options() -> list[click.Option]:
"topic": Topic.MONITORS_CLOCK_TASKS,
"strategy_factory": "sentry.monitors.consumers.clock_tasks_consumer.MonitorClockTasksStrategyFactory",
},
"monitors-incident-occurrences": {
"topic": Topic.MONITORS_INCIDENT_OCCURRENCES,
"strategy_factory": "sentry.monitors.consumers.incident_occurrences_consumer.MonitorIncidentOccurenceStrategyFactory",
},
"uptime-results": {
"topic": Topic.UPTIME_RESULTS,
"strategy_factory": "sentry.uptime.consumers.results_consumer.UptimeResultsStrategyFactory",
Expand Down
43 changes: 43 additions & 0 deletions src/sentry/monitors/consumers/incident_occurrences_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from __future__ import annotations

import logging
from collections.abc import Mapping

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import BrokerValue, Commit, FilteredPayload, Message, Partition
from sentry_kafka_schemas.codecs import Codec
from sentry_kafka_schemas.schema_types.monitors_incident_occurrences_v1 import IncidentOccurrence

from sentry.conf.types.kafka_definition import Topic, get_topic_codec

logger = logging.getLogger(__name__)

MONITORS_INCIDENT_OCCURRENCES: Codec[IncidentOccurrence] = get_topic_codec(
Topic.MONITORS_INCIDENT_OCCURRENCES
)


def process_incident_occurrence(message: Message[KafkaPayload | FilteredPayload]):
assert not isinstance(message.payload, FilteredPayload)
assert isinstance(message.value, BrokerValue)

# wrapper: IncidentOccurrence = MONITORS_INCIDENT_OCCURRENCES.decode(message.payload.value)
# TODO(epurkhiser): Do something with issue occurrence


class MonitorIncidentOccurenceStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
def __init__(self) -> None:
pass

def create_with_partitions(
self,
commit: Commit,
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
return RunTask(
function=process_incident_occurrence,
next_step=CommitOffsets(commit),
)
1 change: 1 addition & 0 deletions src/sentry/runner/commands/devserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ def devserver(

kafka_consumers.add("monitors-clock-tick")
kafka_consumers.add("monitors-clock-tasks")
kafka_consumers.add("monitors-incident-occurrences")

if settings.SENTRY_USE_PROFILING:
kafka_consumers.add("ingest-profiles")
Expand Down
1 change: 1 addition & 0 deletions tests/sentry/consumers/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def test_dlq(consumer_def) -> None:
"ingest-monitors",
"monitors-clock-tick",
"monitors-clock-tasks",
"monitors-incident-occurrences",
"uptime-results",
"metrics-last-seen-updater",
"generic-metrics-last-seen-updater",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from datetime import datetime
from unittest import mock

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import ProcessingStrategy
from arroyo.types import BrokerValue, Message, Partition, Topic
from django.utils import timezone
from sentry_kafka_schemas.schema_types.monitors_incident_occurrences_v1 import IncidentOccurrence

from sentry.monitors.consumers.incident_occurrences_consumer import (
MONITORS_INCIDENT_OCCURRENCES,
MonitorIncidentOccurenceStrategyFactory,
)

partition = Partition(Topic("test"), 0)


def create_consumer() -> ProcessingStrategy[KafkaPayload]:
factory = MonitorIncidentOccurenceStrategyFactory()
commit = mock.Mock()
return factory.create_with_partitions(commit, {partition: 0})


def sned_incident_occurrence(
consumer: ProcessingStrategy[KafkaPayload],
ts: datetime,
incident_occurrence: IncidentOccurrence,
):
value = BrokerValue(
KafkaPayload(b"fake-key", MONITORS_INCIDENT_OCCURRENCES.encode(incident_occurrence), []),
partition,
1,
ts,
)
consumer.submit(Message(value))


def test_simple():
# XXX(epurkhiser): Doesn't really test anything yet
ts = timezone.now().replace(second=0, microsecond=0)

consumer = create_consumer()
sned_incident_occurrence(
consumer,
ts,
{
"clock_tick_ts": 1617895645,
"received_ts": 1617895650,
"failed_checkin_id": 123456,
"incident_id": 987654,
"previous_checkin_ids": [111222, 333444, 55666],
},
)

0 comments on commit 5fc9f67

Please sign in to comment.