diff --git a/README.md b/README.md index 4d268af..fb4395d 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ def calculate_meaning_of_life() -> int: The task decorator accepts a few arguments to customize the task: -- `priority`: The priority of the task (larger numbers are higher priority) +- `priority`: The priority of the task (between -100 and 100. Larger numbers are higher priority. 0 by default) - `queue_name`: Whether to run the task on a specific queue - `backend`: Name of the backend for this task to use (as defined in `TASKS`) diff --git a/django_tasks/backends/base.py b/django_tasks/backends/base.py index 5d3abfb..7088fe9 100644 --- a/django_tasks/backends/base.py +++ b/django_tasks/backends/base.py @@ -8,7 +8,7 @@ from typing_extensions import ParamSpec from django_tasks.exceptions import InvalidTaskError -from django_tasks.task import Task, TaskResult +from django_tasks.task import MAX_PRIORITY, MIN_PRIORITY, Task, TaskResult from django_tasks.utils import is_global_function T = TypeVar("T") @@ -45,8 +45,14 @@ def validate_task(self, task: Task) -> None: if not self.supports_async_task and iscoroutinefunction(task.func): raise InvalidTaskError("Backend does not support async tasks") - if task.priority < 0: - raise InvalidTaskError("priority must be zero or greater") + if ( + task.priority < MIN_PRIORITY + or task.priority > MAX_PRIORITY + or int(task.priority) != task.priority + ): + raise InvalidTaskError( + f"priority must be a whole number between {MIN_PRIORITY} and {MAX_PRIORITY}" + ) if not self.supports_defer and task.run_after is not None: raise InvalidTaskError("Backend does not support run_after") diff --git a/django_tasks/backends/database/migrations/0005_alter_dbtaskresult_priority_and_more.py b/django_tasks/backends/database/migrations/0005_alter_dbtaskresult_priority_and_more.py new file mode 100644 index 0000000..a03798c --- /dev/null +++ b/django_tasks/backends/database/migrations/0005_alter_dbtaskresult_priority_and_more.py @@ -0,0 +1,23 @@ +# Generated by Django 4.2.13 on 2024-07-10 15:48 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("django_tasks_database", "0004_dbtaskresult_started_at"), + ] + + operations = [ + migrations.AlterField( + model_name="dbtaskresult", + name="priority", + field=models.IntegerField(default=0), + ), + migrations.AddConstraint( + model_name="dbtaskresult", + constraint=models.CheckConstraint( + check=models.Q(("priority__range", (-100, 100))), name="priority_range" + ), + ), + ] diff --git a/django_tasks/backends/database/models.py b/django_tasks/backends/database/models.py index 706c8ae..4aff3da 100644 --- a/django_tasks/backends/database/models.py +++ b/django_tasks/backends/database/models.py @@ -1,15 +1,28 @@ +import logging import uuid from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar from django.core.exceptions import SuspiciousOperation from django.db import models -from django.db.models import F +from django.db.models import F, Q +from django.db.models.constraints import CheckConstraint from django.utils import timezone from django.utils.module_loading import import_string from typing_extensions import ParamSpec from django_tasks.task import DEFAULT_QUEUE_NAME, ResultStatus, Task from django_tasks.utils import exception_to_dict, retry +from django_tasks.task import ( + DEFAULT_PRIORITY, + DEFAULT_QUEUE_NAME, + MAX_PRIORITY, + MIN_PRIORITY, + ResultStatus, + Task, +) +from django_tasks.utils import exception_to_dict, retry + +logger = logging.getLogger("django_tasks.backends.database") T = TypeVar("T") P = ParamSpec("P") @@ -69,7 +82,7 @@ class DBTaskResult(GenericBase[P, T], models.Model): args_kwargs = models.JSONField() - priority = models.PositiveSmallIntegerField(default=0) + priority = models.IntegerField(default=DEFAULT_PRIORITY) task_path = models.TextField() @@ -86,6 +99,12 @@ class Meta: ordering = [F("priority").desc(), F("run_after").desc(nulls_last=True)] verbose_name = "Task Result" verbose_name_plural = "Task Results" + constraints = [ + CheckConstraint( + check=Q(priority__range=(MIN_PRIORITY, MAX_PRIORITY)), + name="priority_range", + ) + ] @property def task(self) -> Task[P, T]: @@ -147,5 +166,6 @@ def set_failed(self, exc: BaseException) -> None: try: self.result = exception_to_dict(exc) except Exception: + logger.exception("Task id=%s unable to save exception", self.id) self.result = None self.save(update_fields=["status", "finished_at", "result"]) diff --git a/django_tasks/backends/immediate.py b/django_tasks/backends/immediate.py index 80c701a..6459e8d 100644 --- a/django_tasks/backends/immediate.py +++ b/django_tasks/backends/immediate.py @@ -1,3 +1,4 @@ +import logging from inspect import iscoroutinefunction from typing import TypeVar from uuid import uuid4 @@ -11,6 +12,9 @@ from .base import BaseTaskBackend +logger = logging.getLogger(__name__) + + T = TypeVar("T") P = ParamSpec("P") @@ -29,6 +33,7 @@ def enqueue( enqueued_at = timezone.now() started_at = timezone.now() + result_id = str(uuid4()) try: result = json_normalize(calling_task_func(*args, **kwargs)) status = ResultStatus.COMPLETE @@ -36,12 +41,13 @@ def enqueue( try: result = exception_to_dict(e) except Exception: + logger.exception("Task id=%s unable to save exception", result_id) result = None status = ResultStatus.FAILED task_result = TaskResult[T]( task=task, - id=str(uuid4()), + id=result_id, status=status, enqueued_at=enqueued_at, started_at=started_at, diff --git a/django_tasks/task.py b/django_tasks/task.py index 9b4611c..4e68726 100644 --- a/django_tasks/task.py +++ b/django_tasks/task.py @@ -29,6 +29,9 @@ DEFAULT_TASK_BACKEND_ALIAS = "default" DEFAULT_QUEUE_NAME = "default" +MIN_PRIORITY = -100 +MAX_PRIORITY = 100 +DEFAULT_PRIORITY = 0 class ResultStatus(TextChoices): @@ -71,6 +74,7 @@ def name(self) -> str: def using( self, + *, priority: Optional[int] = None, queue_name: Optional[str] = None, run_after: Optional[Union[datetime, timedelta]] = None, @@ -164,7 +168,7 @@ def task(function: Callable[P, T], /) -> Task[P, T]: ... @overload def task( *, - priority: int = 0, + priority: int = DEFAULT_PRIORITY, queue_name: str = DEFAULT_QUEUE_NAME, backend: str = DEFAULT_TASK_BACKEND_ALIAS, ) -> Callable[[Callable[P, T]], Task[P, T]]: ... @@ -174,7 +178,7 @@ def task( def task( function: Optional[Callable[P, T]] = None, *, - priority: int = 0, + priority: int = DEFAULT_PRIORITY, queue_name: str = DEFAULT_QUEUE_NAME, backend: str = DEFAULT_TASK_BACKEND_ALIAS, ) -> Union[Task[P, T], Callable[[Callable[P, T]], Task[P, T]]]: diff --git a/tests/tests/test_database_backend.py b/tests/tests/test_database_backend.py index 66d723c..950a747 100644 --- a/tests/tests/test_database_backend.py +++ b/tests/tests/test_database_backend.py @@ -7,7 +7,8 @@ from django.core.exceptions import SuspiciousOperation from django.core.management import call_command, execute_from_command_line -from django.test import TestCase, TransactionTestCase, override_settings +from django.db.utils import IntegrityError +from django.test import TransactionTestCase, override_settings from django.urls import reverse from django.utils import timezone @@ -29,7 +30,7 @@ } } ) -class DatabaseBackendTestCase(TestCase): +class DatabaseBackendTestCase(TransactionTestCase): def test_using_correct_backend(self) -> None: self.assertEqual(default_task_backend, tasks["default"]) self.assertIsInstance(tasks["default"], DatabaseBackend) @@ -204,6 +205,34 @@ def test_database_backend_app_missing(self) -> None: self.assertEqual(len(errors), 1) self.assertIn("django_tasks.backends.database", errors[0].hint) + def test_priority_range_check(self) -> None: + with self.assertRaises(IntegrityError): + DBTaskResult.objects.create( + task_path="", backend_name="default", priority=-101, args_kwargs={} + ) + + with self.assertRaises(IntegrityError): + DBTaskResult.objects.create( + task_path="", backend_name="default", priority=101, args_kwargs={} + ) + + # Django accepts the float, but only stores an int + result = DBTaskResult.objects.create( + task_path="", backend_name="default", priority=3.1, args_kwargs={} + ) + result.refresh_from_db() + self.assertEqual(result.priority, 3) + + DBTaskResult.objects.create( + task_path="", backend_name="default", priority=100, args_kwargs={} + ) + DBTaskResult.objects.create( + task_path="", backend_name="default", priority=-100, args_kwargs={} + ) + DBTaskResult.objects.create( + task_path="", backend_name="default", priority=0, args_kwargs={} + ) + @override_settings( TASKS={ @@ -309,6 +338,28 @@ def test_failing_task(self) -> None: self.assertGreaterEqual(result.started_at, result.enqueued_at) # type: ignore self.assertGreaterEqual(result.finished_at, result.started_at) # type: ignore self.assertEqual(result.status, ResultStatus.FAILED) + self.assertIsInstance(result.result, ValueError) + + self.assertEqual(DBTaskResult.objects.ready().count(), 0) + + def test_complex_exception(self) -> None: + result = test_tasks.complex_exception.enqueue() + self.assertEqual(DBTaskResult.objects.ready().count(), 1) + + with self.assertNumQueries(8), self.assertLogs( + "django_tasks.backends.database", level="ERROR" + ): + self.run_worker() + + self.assertEqual(result.status, ResultStatus.NEW) + result.refresh() + self.assertIsNotNone(result.started_at) + self.assertIsNotNone(result.finished_at) + + self.assertGreaterEqual(result.started_at, result.enqueued_at) # type: ignore + self.assertGreaterEqual(result.finished_at, result.started_at) # type: ignore + self.assertEqual(result.status, ResultStatus.FAILED) + self.assertIsNone(result.result) self.assertIsInstance(result.result, ValueError) assert result.traceback # So that mypy knows the next line is allowed @@ -429,6 +480,7 @@ def test_run_after_priority(self) -> None: high_priority_result = test_tasks.noop_task.using(priority=10).enqueue() low_priority_result = test_tasks.noop_task.using(priority=2).enqueue() + lower_priority_result = test_tasks.noop_task.using(priority=-2).enqueue() self.assertEqual( [dbt.task_result for dbt in DBTaskResult.objects.all()], @@ -438,6 +490,7 @@ def test_run_after_priority(self) -> None: low_priority_result, far_future_result, future_result, + lower_priority_result, ], ) @@ -446,6 +499,7 @@ def test_run_after_priority(self) -> None: [ high_priority_result, low_priority_result, + lower_priority_result, ], ) diff --git a/tests/tests/test_immediate_backend.py b/tests/tests/test_immediate_backend.py index cb7a8e9..cc6c4e8 100644 --- a/tests/tests/test_immediate_backend.py +++ b/tests/tests/test_immediate_backend.py @@ -66,16 +66,16 @@ def test_catches_exception(self) -> None: def test_complex_exception(self) -> None: result = default_task_backend.enqueue(test_tasks.complex_exception, [], {}) + with self.assertLogs("django_tasks.backends.immediate", level="ERROR"): + result = default_task_backend.enqueue(test_tasks.complex_exception, [], {}) self.assertEqual(result.status, ResultStatus.FAILED) self.assertIsNotNone(result.started_at) self.assertIsNotNone(result.finished_at) self.assertGreaterEqual(result.started_at, result.enqueued_at) self.assertGreaterEqual(result.finished_at, result.started_at) - self.assertIsNone(result.result) self.assertIsNone(result.traceback) - self.assertIsNone(result.get_result()) self.assertEqual(result.task, test_tasks.complex_exception) self.assertEqual(result.args, []) diff --git a/tests/tests/test_tasks.py b/tests/tests/test_tasks.py index 46a84c6..0f38759 100644 --- a/tests/tests/test_tasks.py +++ b/tests/tests/test_tasks.py @@ -20,6 +20,7 @@ InvalidTaskError, ResultDoesNotExist, ) +from django_tasks.task import MAX_PRIORITY, MIN_PRIORITY from tests import tasks as test_tasks @@ -133,9 +134,26 @@ def test_naive_datetime(self) -> None: def test_invalid_priority(self) -> None: with self.assertRaisesMessage( - InvalidTaskError, "priority must be zero or greater" + InvalidTaskError, + f"priority must be a whole number between {MIN_PRIORITY} and {MAX_PRIORITY}", ): - test_tasks.noop_task.using(priority=-1) + test_tasks.noop_task.using(priority=-101) + + with self.assertRaisesMessage( + InvalidTaskError, + f"priority must be a whole number between {MIN_PRIORITY} and {MAX_PRIORITY}", + ): + test_tasks.noop_task.using(priority=101) + + with self.assertRaisesMessage( + InvalidTaskError, + f"priority must be a whole number between {MIN_PRIORITY} and {MAX_PRIORITY}", + ): + test_tasks.noop_task.using(priority=3.1) # type:ignore[arg-type] + + test_tasks.noop_task.using(priority=100) + test_tasks.noop_task.using(priority=-100) + test_tasks.noop_task.using(priority=0) def test_call_task(self) -> None: self.assertEqual(test_tasks.calculate_meaning_of_life.call(), 42) diff --git a/tests/tests/test_utils.py b/tests/tests/test_utils.py index 464b185..c3917c7 100644 --- a/tests/tests/test_utils.py +++ b/tests/tests/test_utils.py @@ -3,11 +3,13 @@ import subprocess from unittest.mock import Mock +from django.core.exceptions import ImproperlyConfigured from django.core.exceptions import ImproperlyConfigured from django.test import SimpleTestCase from django_tasks import utils from django_tasks.exceptions import InvalidTaskError +from django_tasks.exceptions import InvalidTaskError from tests import tasks as test_tasks @@ -104,9 +106,9 @@ def test_serialize_exceptions(self) -> None: self.assertEqual( set(data.keys()), {"exc_type", "exc_args", "exc_traceback"} ) - exception = utils.exception_from_dict(data) - self.assertIsInstance(exception, type(exc)) - self.assertEqual(exception.args, exc.args) + reconstructed = utils.exception_from_dict(data) + self.assertIsInstance(reconstructed, type(exc)) + self.assertEqual(reconstructed.args, exc.args) # Check that the exception traceback contains only one line, # the one with the name of the exception and the message. @@ -152,6 +154,9 @@ def test_cannot_deserialize_non_exception(self) -> None: {"exc_type": "subprocess.check_output", "exc_args": ["exit", "1"]}, {"exc_type": "True", "exc_args": []}, {"exc_type": "math.pi", "exc_args": []}, + {"exc_type": __name__, "exc_args": []}, + {"exc_type": utils.get_module_path(type(self)), "exc_args": []}, + {"exc_type": utils.get_module_path(Mock), "exc_args": []}, ]: with self.subTest(data): with self.assertRaises((TypeError, ImportError)):