Skip to content

Commit

Permalink
Add multiqueue capabilities
Browse the repository at this point in the history
  • Loading branch information
gabriel-piles committed Sep 19, 2024
1 parent e03cf60 commit da6cc11
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 212 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM python:3.12.4-slim-bullseye

RUN apt-get update && apt-get install make
RUN apt-get update && apt-get install make git -y
RUN addgroup --system python && adduser --system --group python
RUN mkdir opt/app
RUN chown -R python:python opt/app
Expand Down
1 change: 1 addition & 0 deletions params.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"id":"multi_select_name","options":[{"id":"1","label":"1"},{"id":"2","label":"2"},{"id":"3","label":"3"}],"multi_value":true}
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
git+https://github.com/huridocs/queue-processor@26c9413ac4fd950ace4ee542d6734e6959e10ea4
fastapi==0.110.0
python-multipart==0.0.9
pydantic==2.6.3
uvicorn==0.27.1
gunicorn==21.2.0
requests==2.31.0
PyRSMQ==0.5.0
redis==5.0.2
httpx==0.27.0
starlette==0.36.3
114 changes: 19 additions & 95 deletions worker_metadata.py
Original file line number Diff line number Diff line change
@@ -1,107 +1,31 @@
from time import sleep, time

import redis
from rsmq.consumer import RedisSMQConsumer
from rsmq import RedisSMQ, cmd
from queue_processor.QueueProcessor import QueueProcessor

from app import params_path
from data.LogsMessage import LogsMessage, Severity

from data.MetadataExtractionTask import MetadataExtractionTask
from data.ResultsMessage import ResultsMessage


class QueueProcessor:
def __init__(self):
self.task_queue = RedisSMQ(
host="127.0.0.1",
port=6379,
qname="information_extraction_tasks",
)

self.results_queue = RedisSMQ(
host="127.0.0.1",
port=6379,
qname="information_extraction_results",
)

self.logs_queue = RedisSMQ(
host="127.0.0.1",
port=6379,
qname="information_extraction_logs",
)

self.logs_queue.createQueue().exceptions(False).execute()

def process(self, id, message, rc, ts):
task = MetadataExtractionTask(**message)

if task.params.options:
params_path.write_text(task.params.model_dump_json())

data_url = f"http://127.0.0.1:5056/get_suggestions/{task.tenant}/{task.params.id}"

model_results_message = ResultsMessage(
tenant=task.tenant,
task=task.task,
params=task.params,
success=True,
error_message="",
data_url=data_url,
)

self.send_logs(task)
self.results_queue.sendMessage().message(model_results_message.dict()).execute()
return True

def delete_old_messages(self):
message = self.logs_queue.receiveMessage().exceptions(False).execute()

while message and message["ts"] < time() - 2 * 60 * 60 * 24:
self.logs_queue.deleteMessage(id=message["id"]).execute()
message = self.logs_queue.receiveMessage().exceptions(False).execute()

def send_logs(self, task):
self.delete_old_messages()

log_message = LogsMessage(
tenant=task.tenant,
extraction_name=task.params.id,
severity=Severity.error,
message="Error log example from dummy services",
)

self.logs_queue.sendMessage().message(log_message.dump()).execute()

log_message = LogsMessage(
tenant=task.tenant,
extraction_name=task.params.id,
severity=Severity.info,
message="Information log example from dummy services",
)
def process(message):
task = MetadataExtractionTask(**message)

self.logs_queue.sendMessage().message(log_message.dump()).execute()
if task.params.options:
params_path.write_text(task.params.model_dump_json())

def subscribe_to_tasks_queue(self):
print("Metadata extractor queue processor started")
while True:
try:
self.task_queue.getQueueAttributes().exec_command()
self.results_queue.getQueueAttributes().exec_command()
data_url = f"http://127.0.0.1:5056/get_suggestions/{task.tenant}/{task.params.id}"

redis_smq_consumer = RedisSMQConsumer(
qname="information_extraction_tasks",
processor=self.process,
host="127.0.0.1",
port=6379,
)
redis_smq_consumer.run()
except redis.exceptions.ConnectionError:
sleep(20)
except cmd.exceptions.QueueDoesNotExist:
self.task_queue.createQueue().exceptions(False).execute()
self.results_queue.createQueue().exceptions(False).execute()
model_results_message = ResultsMessage(
tenant=task.tenant,
task=task.task,
params=task.params,
success=True,
error_message="",
data_url=data_url,
)

return model_results_message.model_dump()

if __name__ == "__main__":
queue_processor = QueueProcessor()
queue_processor.subscribe_to_tasks_queue()
queues_names = ["information_extraction", "development_information_extraction"]
queue_processor = QueueProcessor("127.0.0.1", 6379, queues_names)
queue_processor.start(process)
77 changes: 19 additions & 58 deletions worker_paragraphs.py
Original file line number Diff line number Diff line change
@@ -1,66 +1,27 @@
from time import sleep

import redis
from rsmq.consumer import RedisSMQConsumer
from rsmq import RedisSMQ, cmd
from queue_processor.QueueProcessor import QueueProcessor

from data.ExtractionMessage import ExtractionMessage
from data.Task import Task

def process(message):
task = Task(**message)
print(task.model_dump())
service_url = f"http://127.0.0.1:5051"
results_url = f"{service_url}/get_paragraphs/{task.tenant}/{task.params.filename}"
file_results_url = f"{service_url}/get_xml/{task.tenant}/{task.params.filename}"
extraction_message = ExtractionMessage(
tenant=task.tenant,
task=task.task,
params=task.params,
success=True,
data_url=results_url,
file_url=file_results_url,
)

class QueueProcessor:
    """Dummy paragraph-extraction worker driven by Redis-backed RSMQ queues.

    Consumes tasks from the ``segmentation_tasks`` queue and, for each one,
    publishes an ``ExtractionMessage`` to the ``segmentation_results`` queue.
    """

    def __init__(self):
        """Create the RSMQ handles for the task queue and the results queue."""
        self.task_queue = RedisSMQ(
            host="127.0.0.1",
            port=6379,
            qname="segmentation_tasks",
        )

        self.results_queue = RedisSMQ(
            host="127.0.0.1",
            port=6379,
            qname="segmentation_results",
        )

    def process(self, id, message, rc, ts):
        """Handle one task message and publish the matching result message.

        Args:
            id: Supplied by the queue consumer; not used here.
            message: Task payload, unpacked as keyword arguments into ``Task``.
            rc: Supplied by the queue consumer; not used here.
            ts: Supplied by the queue consumer; not used here.

        Returns:
            Always ``True``.
            NOTE(review): presumably this tells the consumer the message was
            handled -- confirm against the RSMQ consumer documentation.
        """
        task = Task(**message)
        # model_dump() is the Pydantic v2 spelling of the deprecated dict().
        print(task.model_dump())
        service_url = "http://127.0.0.1:5051"
        results_url = f"{service_url}/get_paragraphs/{task.tenant}/{task.params.filename}"
        file_results_url = f"{service_url}/get_xml/{task.tenant}/{task.params.filename}"
        extraction_message = ExtractionMessage(
            tenant=task.tenant,
            task=task.task,
            params=task.params,
            success=True,
            data_url=results_url,
            file_url=file_results_url,
        )

        self.results_queue.sendMessage(delay=5).message(extraction_message.model_dump()).execute()
        return True

    def subscribe_to_tasks_queue(self):
        """Consume the task queue forever, recovering from known failures.

        Each iteration first queries the attributes of both queues, then runs
        a consumer on ``segmentation_tasks`` with ``self.process`` as handler.
        On a Redis connection error it waits 20 seconds before retrying; when
        a queue does not exist it creates both queues and retries.
        """
        print("Paragraphs queue processor started")
        while True:
            try:
                # Query both queues before consuming; a missing queue is
                # handled by the QueueDoesNotExist branch below.
                self.task_queue.getQueueAttributes().exec_command()
                self.results_queue.getQueueAttributes().exec_command()

                redis_smq_consumer = RedisSMQConsumer(
                    qname="segmentation_tasks",
                    processor=self.process,
                    host="127.0.0.1",
                    port=6379,
                )
                redis_smq_consumer.run()
            except redis.exceptions.ConnectionError:
                sleep(20)
            except cmd.exceptions.QueueDoesNotExist:
                self.task_queue.createQueue().exceptions(False).execute()
                self.results_queue.createQueue().exceptions(False).execute()
return extraction_message.model_dump()


if __name__ == "__main__":
queue_processor = QueueProcessor()
queue_processor.subscribe_to_tasks_queue()
queues_names = ["segmentation", "development_segmentation"]
queue_processor = QueueProcessor("127.0.0.1", 6379, queues_names)
queue_processor.start(process)
74 changes: 18 additions & 56 deletions worker_translations.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,32 @@
from time import sleep

import redis
from rsmq.consumer import RedisSMQConsumer
from rsmq import RedisSMQ, cmd
from queue_processor.QueueProcessor import QueueProcessor

from data.Translation import Translation
from data.TranslationResponseMessage import TranslationResponseMessage
from data.TranslationTaskMessage import TranslationTaskMessage


class QueueProcessor:
def __init__(self):
self.task_queue = RedisSMQ(
host="127.0.0.1",
port=6379,
qname="translations_tasks",
)

self.results_queue = RedisSMQ(
host="127.0.0.1",
port=6379,
qname="translations_results",
)

def process(self, id, message, rc, ts):
task = TranslationTaskMessage(**message)
print(task.model_dump())

translations: list[Translation] = [self.get_translation(task, language) for language in task.languages_to]
response = TranslationResponseMessage(
**task.model_dump(),
translations=translations,
)
def process(message):
task = TranslationTaskMessage(**message)
print(task.model_dump())

self.results_queue.sendMessage(delay=5).message(response.model_dump()).execute()
return True
translations: list[Translation] = [get_translation(task, language) for language in task.languages_to]
response = TranslationResponseMessage(
**task.model_dump(),
translations=translations,
)

def subscribe_to_tasks_queue(self):
print("Translation queue processor started")
while True:
try:
self.task_queue.getQueueAttributes().exec_command()
self.results_queue.getQueueAttributes().exec_command()
return response.model_dump()

redis_smq_consumer = RedisSMQConsumer(
qname="translations_tasks",
processor=self.process,
host="127.0.0.1",
port=6379,
)
redis_smq_consumer.run()
except redis.exceptions.ConnectionError:
sleep(20)
except cmd.exceptions.QueueDoesNotExist:
self.task_queue.createQueue().exceptions(False).execute()
self.results_queue.createQueue().exceptions(False).execute()

def get_translation(translation_task_message: TranslationTaskMessage, language: str) -> Translation:
    """Build a dummy translation of the task's text for one target language.

    Args:
        translation_task_message: Task whose ``text`` is being "translated".
        language: Target language. The literal value ``"error"`` produces a
            failed translation.

    Returns:
        A failed ``Translation`` (empty text, ``error_message="service error"``)
        when ``language`` is ``"error"``; otherwise a successful one whose text
        is the original text prefixed with ``[translation for <language>]``.
    """
    if language == "error":
        return Translation(text="", language=language, success=False, error_message="service error")

    text = f"[translation for {language}] {translation_task_message.text}"
    # The non-error path is a successful translation: report success=True so
    # consumers can tell it apart from the "error" case above.
    return Translation(text=text, language=language, success=True, error_message="")


if __name__ == "__main__":
queue_processor = QueueProcessor()
queue_processor.subscribe_to_tasks_queue()
queues_names = ["translations", "development_translations"]
queue_processor = QueueProcessor("127.0.0.1", 6379, queues_names)
queue_processor.start(process)

0 comments on commit da6cc11

Please sign in to comment.