Skip to content

Commit

Permalink
Report service logs to the Redis queue
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-piles committed May 24, 2024
1 parent 243dd09 commit 0852efc
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 2 deletions.
22 changes: 22 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
FROM python:3.12.3-alpine

RUN apt-get update && apt-get -y -q --no-install-recommends install libgomp1
RUN apt-get -y install git
RUN mkdir -p /app/src /app/docker_volume

RUN addgroup --system python && adduser --system --group python
RUN chown -R python:python /app
USER python

ENV VIRTUAL_ENV=/app/venv
RUN python -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

COPY requirements.txt requirements.txt
RUN pip install --upgrade pip
RUN pip --default-timeout=1000 install -r requirements.txt

WORKDIR /app
COPY ./src ./src

ENV PYTHONPATH "${PYTHONPATH}:/app/src"
19 changes: 19 additions & 0 deletions data/LogsMessage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import json
from enum import Enum

from pydantic import BaseModel


class Severity(str, Enum):
error = "error"
info = "info"


class LogsMessage(BaseModel):
tenant: str
extraction_name: str
severity: Severity
message: str

def dump(self):
return json.loads(self.model_dump_json())
39 changes: 37 additions & 2 deletions worker_metadata.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import json
from time import sleep
from time import sleep, time

import redis
from rsmq.consumer import RedisSMQConsumer
from rsmq import RedisSMQ, cmd

from app import params_path
from data.LogsMessage import LogsMessage, Severity
from data.MetadataExtractionTask import MetadataExtractionTask
from data.ResultsMessage import ResultsMessage

Expand All @@ -24,6 +24,12 @@ def __init__(self):
qname="information_extraction_results",
)

self.logs_queue = RedisSMQ(
host="127.0.0.1",
port=6379,
qname="information_extraction_logs",
)

def process(self, id, message, rc, ts):
task = MetadataExtractionTask(**message)

Expand All @@ -41,9 +47,38 @@ def process(self, id, message, rc, ts):
data_url=data_url,
)

self.send_logs(task)
self.results_queue.sendMessage().message(model_results_message.dict()).execute()
return True

def delete_old_messages(self):
message = self.logs_queue.receiveMessage().exceptions(False).execute()

while message and message["ts"] < time() - 2 * 60 * 60 * 24:
self.logs_queue.deleteMessage(id=message["id"]).execute()
message = self.logs_queue.receiveMessage().exceptions(False).execute()

def send_logs(self, task):
self.delete_old_messages()

log_message = LogsMessage(
tenant=task.tenant,
extraction_name=task.params.id,
severity=Severity.error,
message="Error log example from dummy services",
)

self.logs_queue.sendMessage().message(log_message.dump()).execute()

log_message = LogsMessage(
tenant=task.tenant,
extraction_name=task.params.id,
severity=Severity.info,
message="Information log example from dummy services",
)

self.logs_queue.sendMessage().message(log_message.dump()).execute()

def subscribe_to_tasks_queue(self):
while True:
try:
Expand Down

0 comments on commit 0852efc

Please sign in to comment.