diff --git a/requirements-base.txt b/requirements-base.txt index 00da4693e8417f..369a7d6a8ff253 100644 --- a/requirements-base.txt +++ b/requirements-base.txt @@ -66,7 +66,7 @@ rfc3339-validator>=0.1.2 rfc3986-validator>=0.1.1 # [end] jsonschema format validators sentry-arroyo>=2.16.5 -sentry-kafka-schemas>=0.1.116 +sentry-kafka-schemas>=0.1.118 sentry-ophio==1.0.0 sentry-protos>=0.1.34 sentry-redis-tools>=0.1.7 diff --git a/requirements-dev-frozen.txt b/requirements-dev-frozen.txt index 70a30221fc706c..083e207c18090b 100644 --- a/requirements-dev-frozen.txt +++ b/requirements-dev-frozen.txt @@ -184,7 +184,7 @@ sentry-cli==2.16.0 sentry-devenv==1.13.0 sentry-forked-django-stubs==5.1.1.post1 sentry-forked-djangorestframework-stubs==3.15.1.post2 -sentry-kafka-schemas==0.1.116 +sentry-kafka-schemas==0.1.118 sentry-ophio==1.0.0 sentry-protos==0.1.34 sentry-redis-tools==0.1.7 diff --git a/requirements-frozen.txt b/requirements-frozen.txt index e2ec8ec13fea69..dbacad8a31fe63 100644 --- a/requirements-frozen.txt +++ b/requirements-frozen.txt @@ -125,7 +125,7 @@ rpds-py==0.20.0 rsa==4.8 s3transfer==0.10.0 sentry-arroyo==2.16.5 -sentry-kafka-schemas==0.1.116 +sentry-kafka-schemas==0.1.118 sentry-ophio==1.0.0 sentry-protos==0.1.34 sentry-redis-tools==0.1.7 diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 7c2c55d7b8947c..39926ffd6b6bd4 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -2893,6 +2893,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]: "ingest-monitors": "default", "monitors-clock-tick": "default", "monitors-clock-tasks": "default", + "monitors-incident-occurrences": "default", "uptime-configs": "default", "uptime-results": "default", "uptime-configs": "default", diff --git a/src/sentry/conf/types/kafka_definition.py b/src/sentry/conf/types/kafka_definition.py index f2d5f6a0d40986..f486e4c3f3469f 100644 --- a/src/sentry/conf/types/kafka_definition.py +++ b/src/sentry/conf/types/kafka_definition.py @@ -47,6 +47,7 @@ class Topic(Enum): INGEST_MONITORS = "ingest-monitors" MONITORS_CLOCK_TICK = "monitors-clock-tick" MONITORS_CLOCK_TASKS = "monitors-clock-tasks" + MONITORS_INCIDENT_OCCURRENCES = "monitors-incident-occurrences" UPTIME_CONFIG = "uptime-configs" UPTIME_RESULTS = "uptime-results" UPTIME_CONFIGS = "uptime-configs" diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 66608fcf848ed5..02c31010a58a91 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -256,6 +256,10 @@ def ingest_transactions_options() -> list[click.Option]: "topic": Topic.MONITORS_CLOCK_TASKS, "strategy_factory": "sentry.monitors.consumers.clock_tasks_consumer.MonitorClockTasksStrategyFactory", }, + "monitors-incident-occurrences": { + "topic": Topic.MONITORS_INCIDENT_OCCURRENCES, + "strategy_factory": "sentry.monitors.consumers.incident_occurrences_consumer.MonitorIncidentOccurenceStrategyFactory", + }, "uptime-results": { "topic": Topic.UPTIME_RESULTS, "strategy_factory": "sentry.uptime.consumers.results_consumer.UptimeResultsStrategyFactory", diff --git a/src/sentry/monitors/consumers/incident_occurrences_consumer.py b/src/sentry/monitors/consumers/incident_occurrences_consumer.py new file mode 100644 index 00000000000000..3a8799e9ae5589 --- /dev/null +++ b/src/sentry/monitors/consumers/incident_occurrences_consumer.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import logging +from collections.abc import Mapping + +from arroyo.backends.kafka.consumer import KafkaPayload +from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory +from arroyo.processing.strategies.commit import CommitOffsets +from arroyo.processing.strategies.run_task import RunTask +from arroyo.types import BrokerValue, Commit, FilteredPayload, Message, Partition +from sentry_kafka_schemas.codecs import Codec +from sentry_kafka_schemas.schema_types.monitors_incident_occurrences_v1 import IncidentOccurrence + +from sentry.conf.types.kafka_definition import Topic, get_topic_codec + +logger = logging.getLogger(__name__) + +MONITORS_INCIDENT_OCCURRENCES: Codec[IncidentOccurrence] = get_topic_codec( + Topic.MONITORS_INCIDENT_OCCURRENCES +) + + +def process_incident_occurrence(message: Message[KafkaPayload | FilteredPayload]): + assert not isinstance(message.payload, FilteredPayload) + assert isinstance(message.value, BrokerValue) + + # wrapper: IncidentOccurrence = MONITORS_INCIDENT_OCCURRENCES.decode(message.payload.value) + # TODO(epurkhiser): Do something with issue occurrence + + +class MonitorIncidentOccurenceStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): + def __init__(self) -> None: + pass + + def create_with_partitions( + self, + commit: Commit, + partitions: Mapping[Partition, int], + ) -> ProcessingStrategy[KafkaPayload]: + return RunTask( + function=process_incident_occurrence, + next_step=CommitOffsets(commit), + ) diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index 2425dcded08c9b..ce55cc8a3e7902 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -338,6 +338,7 @@ def devserver( kafka_consumers.add("monitors-clock-tick") kafka_consumers.add("monitors-clock-tasks") + kafka_consumers.add("monitors-incident-occurrences") if settings.SENTRY_USE_PROFILING: kafka_consumers.add("ingest-profiles") diff --git a/tests/sentry/consumers/test_run.py b/tests/sentry/consumers/test_run.py index 99674e33f02e36..9b48e119cf382f 100644 --- a/tests/sentry/consumers/test_run.py +++ b/tests/sentry/consumers/test_run.py @@ -38,6 +38,7 @@ def test_dlq(consumer_def) -> None: "ingest-monitors", "monitors-clock-tick", "monitors-clock-tasks", + "monitors-incident-occurrences", "uptime-results", "metrics-last-seen-updater", "generic-metrics-last-seen-updater", diff --git a/tests/sentry/monitors/consumers/test_incident_occurrence_consumer.py b/tests/sentry/monitors/consumers/test_incident_occurrence_consumer.py new file mode 100644 index 00000000000000..d6f279440920ba --- /dev/null +++ b/tests/sentry/monitors/consumers/test_incident_occurrence_consumer.py @@ -0,0 +1,53 @@ +from datetime import datetime +from unittest import mock + +from arroyo.backends.kafka import KafkaPayload +from arroyo.processing.strategies import ProcessingStrategy +from arroyo.types import BrokerValue, Message, Partition, Topic +from django.utils import timezone +from sentry_kafka_schemas.schema_types.monitors_incident_occurrences_v1 import IncidentOccurrence + +from sentry.monitors.consumers.incident_occurrences_consumer import ( + MONITORS_INCIDENT_OCCURRENCES, + MonitorIncidentOccurenceStrategyFactory, +) + +partition = Partition(Topic("test"), 0) + + +def create_consumer() -> ProcessingStrategy[KafkaPayload]: + factory = MonitorIncidentOccurenceStrategyFactory() + commit = mock.Mock() + return factory.create_with_partitions(commit, {partition: 0}) + + +def sned_incident_occurrence( + consumer: ProcessingStrategy[KafkaPayload], + ts: datetime, + incident_occurrence: IncidentOccurrence, +): + value = BrokerValue( + KafkaPayload(b"fake-key", MONITORS_INCIDENT_OCCURRENCES.encode(incident_occurrence), []), + partition, + 1, + ts, + ) + consumer.submit(Message(value)) + + +def test_simple(): + # XXX(epurkhiser): Doesn't really test anything yet + ts = timezone.now().replace(second=0, microsecond=0) + + consumer = create_consumer() + sned_incident_occurrence( + consumer, + ts, + { + "clock_tick_ts": 1617895645, + "received_ts": 1617895650, + "failed_checkin_id": 123456, + "incident_id": 987654, + "previous_checkin_ids": [111222, 333444, 55666], + }, + )