Skip to content

Commit

Permalink
feat(taskworker) Add metrics to task production and worker operations (
Browse files Browse the repository at this point in the history
…#80634)

Add metrics for high-level task operations, such as fetching tasks in
workers and completing tasks, as well as for failure modes.
  • Loading branch information
markstory authored Nov 14, 2024
1 parent 00ea404 commit b20411e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/sentry/taskworker/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from sentry.taskworker.retry import Retry
from sentry.taskworker.router import TaskRouter
from sentry.taskworker.task import P, R, Task
from sentry.utils import metrics
from sentry.utils.imports import import_string
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

Expand Down Expand Up @@ -92,6 +93,7 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]:
return wrapped

def send_task(self, activation: TaskActivation) -> None:
metrics.incr("taskworker.registry.send_task", tags={"namespace": activation.namespace})
# TODO(taskworker) producer callback handling
self.producer.produce(
ArroyoTopic(name=self.topic.value),
Expand Down
37 changes: 35 additions & 2 deletions src/sentry/taskworker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from sentry.taskworker.registry import taskregistry
from sentry.taskworker.service.client import TaskClient
from sentry.utils import metrics

logger = logging.getLogger("sentry.taskworker.worker")

Expand Down Expand Up @@ -89,6 +90,7 @@ def start(self) -> int:
task = self.fetch_task()

if not task:
metrics.incr("taskworker.worker.no_task.pause")
time.sleep(1)
continue

Expand All @@ -97,6 +99,10 @@ def start(self) -> int:
self._max_task_count is not None
and self._max_task_count <= self._execution_count
):
metrics.incr(
"taskworker.worker.max_task_count_reached",
tags={"count": self._execution_count},
)
logger.info("Max task execution count reached. Terminating")
return 0

Expand All @@ -110,13 +116,16 @@ def fetch_task(self) -> TaskActivation | None:
try:
activation = self.client.get_task()
except grpc.RpcError:
metrics.incr("taskworker.worker.get_task.failed")
logger.info("get_task failed. Retrying in 1 second")
return None

if not activation:
metrics.incr("taskworker.worker.get_task.not_found")
logger.info("No task fetched")
return None

metrics.incr("taskworker.worker.get_task.success")
return activation

def _known_task(self, activation: TaskActivation) -> bool:
Expand All @@ -139,6 +148,10 @@ def _known_task(self, activation: TaskActivation) -> bool:
def process_task(self, activation: TaskActivation) -> TaskActivation | None:
assert self._pool
if not self._known_task(activation):
metrics.incr(
"taskworker.worker.unknown_task",
tags={"namespace": activation.namespace, "taskname": activation.taskname},
)
self._execution_count += 1
return self.client.update_task(
task_id=activation.id,
Expand Down Expand Up @@ -193,15 +206,35 @@ def process_task(self, activation: TaskActivation) -> TaskActivation | None:
self._execution_count += 1

task_added_time = activation.received_at.ToDatetime().timestamp()
execution_duration = execution_complete_time - execution_start_time
execution_latency = execution_complete_time - task_added_time
logger.info(
"taskworker.task_execution",
extra={
"taskname": activation.taskname,
"execution_duration": execution_complete_time - execution_start_time,
"execution_latency": execution_complete_time - task_added_time,
"execution_duration": execution_duration,
"execution_latency": execution_latency,
"status": next_state,
},
)
metrics.incr(
"taskworker.worker.execute_task",
tags={
"namespace": activation.namespace,
"status": next_state,
},
)
metrics.distribution(
"taskworker.worker.execution_duration",
execution_duration,
tags={"namespace": activation.namespace},
)
metrics.distribution(
"taskworker.worker.execution_latency",
execution_latency,
tags={"namespace": activation.namespace},
)

return self.client.update_task(
task_id=activation.id,
status=next_state,
Expand Down

0 comments on commit b20411e

Please sign in to comment.