Skip to content

Commit

Permalink
Merge branch 'master' into feature/exception-serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin Samuel committed Jul 12, 2024
2 parents 3c62c5e + 7685bd7 commit 30b7711
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 18 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def calculate_meaning_of_life() -> int:

The task decorator accepts a few arguments to customize the task:

- `priority`: The priority of the task (larger numbers are higher priority)
- `priority`: The priority of the task (between -100 and 100. Larger numbers are higher priority. 0 by default)
- `queue_name`: Whether to run the task on a specific queue
- `backend`: Name of the backend for this task to use (as defined in `TASKS`)

Expand Down
12 changes: 9 additions & 3 deletions django_tasks/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing_extensions import ParamSpec

from django_tasks.exceptions import InvalidTaskError
from django_tasks.task import Task, TaskResult
from django_tasks.task import MAX_PRIORITY, MIN_PRIORITY, Task, TaskResult
from django_tasks.utils import is_global_function

T = TypeVar("T")
Expand Down Expand Up @@ -45,8 +45,14 @@ def validate_task(self, task: Task) -> None:
if not self.supports_async_task and iscoroutinefunction(task.func):
raise InvalidTaskError("Backend does not support async tasks")

if task.priority < 0:
raise InvalidTaskError("priority must be zero or greater")
if (
task.priority < MIN_PRIORITY
or task.priority > MAX_PRIORITY
or int(task.priority) != task.priority
):
raise InvalidTaskError(
f"priority must be a whole number between {MIN_PRIORITY} and {MAX_PRIORITY}"
)

if not self.supports_defer and task.run_after is not None:
raise InvalidTaskError("Backend does not support run_after")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 4.2.13 on 2024-07-10 15:48

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("django_tasks_database", "0004_dbtaskresult_started_at"),
]

operations = [
migrations.AlterField(
model_name="dbtaskresult",
name="priority",
field=models.IntegerField(default=0),
),
migrations.AddConstraint(
model_name="dbtaskresult",
constraint=models.CheckConstraint(
check=models.Q(("priority__range", (-100, 100))), name="priority_range"
),
),
]
24 changes: 22 additions & 2 deletions django_tasks/backends/database/models.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
import logging
import uuid
from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar

from django.core.exceptions import SuspiciousOperation
from django.db import models
from django.db.models import F
from django.db.models import F, Q
from django.db.models.constraints import CheckConstraint
from django.utils import timezone
from django.utils.module_loading import import_string
from typing_extensions import ParamSpec

from django_tasks.task import DEFAULT_QUEUE_NAME, ResultStatus, Task
from django_tasks.utils import exception_to_dict, retry
from django_tasks.task import (
DEFAULT_PRIORITY,
DEFAULT_QUEUE_NAME,
MAX_PRIORITY,
MIN_PRIORITY,
ResultStatus,
Task,
)
from django_tasks.utils import exception_to_dict, retry

logger = logging.getLogger("django_tasks.backends.database")

T = TypeVar("T")
P = ParamSpec("P")
Expand Down Expand Up @@ -69,7 +82,7 @@ class DBTaskResult(GenericBase[P, T], models.Model):

args_kwargs = models.JSONField()

priority = models.PositiveSmallIntegerField(default=0)
priority = models.IntegerField(default=DEFAULT_PRIORITY)

task_path = models.TextField()

Expand All @@ -86,6 +99,12 @@ class Meta:
ordering = [F("priority").desc(), F("run_after").desc(nulls_last=True)]
verbose_name = "Task Result"
verbose_name_plural = "Task Results"
constraints = [
CheckConstraint(
check=Q(priority__range=(MIN_PRIORITY, MAX_PRIORITY)),
name="priority_range",
)
]

@property
def task(self) -> Task[P, T]:
Expand Down Expand Up @@ -147,5 +166,6 @@ def set_failed(self, exc: BaseException) -> None:
try:
self.result = exception_to_dict(exc)
except Exception:
logger.exception("Task id=%s unable to save exception", self.id)
self.result = None
self.save(update_fields=["status", "finished_at", "result"])
8 changes: 7 additions & 1 deletion django_tasks/backends/immediate.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from inspect import iscoroutinefunction
from typing import TypeVar
from uuid import uuid4
Expand All @@ -11,6 +12,9 @@

from .base import BaseTaskBackend

logger = logging.getLogger(__name__)


T = TypeVar("T")
P = ParamSpec("P")

Expand All @@ -29,19 +33,21 @@ def enqueue(

enqueued_at = timezone.now()
started_at = timezone.now()
result_id = str(uuid4())
try:
result = json_normalize(calling_task_func(*args, **kwargs))
status = ResultStatus.COMPLETE
except Exception as e:
try:
result = exception_to_dict(e)
except Exception:
logger.exception("Task id=%s unable to save exception", result_id)
result = None
status = ResultStatus.FAILED

task_result = TaskResult[T](
task=task,
id=str(uuid4()),
id=result_id,
status=status,
enqueued_at=enqueued_at,
started_at=started_at,
Expand Down
8 changes: 6 additions & 2 deletions django_tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

DEFAULT_TASK_BACKEND_ALIAS = "default"
DEFAULT_QUEUE_NAME = "default"
MIN_PRIORITY = -100
MAX_PRIORITY = 100
DEFAULT_PRIORITY = 0


class ResultStatus(TextChoices):
Expand Down Expand Up @@ -71,6 +74,7 @@ def name(self) -> str:

def using(
self,
*,
priority: Optional[int] = None,
queue_name: Optional[str] = None,
run_after: Optional[Union[datetime, timedelta]] = None,
Expand Down Expand Up @@ -164,7 +168,7 @@ def task(function: Callable[P, T], /) -> Task[P, T]: ...
@overload
def task(
*,
priority: int = 0,
priority: int = DEFAULT_PRIORITY,
queue_name: str = DEFAULT_QUEUE_NAME,
backend: str = DEFAULT_TASK_BACKEND_ALIAS,
) -> Callable[[Callable[P, T]], Task[P, T]]: ...
Expand All @@ -174,7 +178,7 @@ def task(
def task(
function: Optional[Callable[P, T]] = None,
*,
priority: int = 0,
priority: int = DEFAULT_PRIORITY,
queue_name: str = DEFAULT_QUEUE_NAME,
backend: str = DEFAULT_TASK_BACKEND_ALIAS,
) -> Union[Task[P, T], Callable[[Callable[P, T]], Task[P, T]]]:
Expand Down
58 changes: 56 additions & 2 deletions tests/tests/test_database_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

from django.core.exceptions import SuspiciousOperation
from django.core.management import call_command, execute_from_command_line
from django.test import TestCase, TransactionTestCase, override_settings
from django.db.utils import IntegrityError
from django.test import TransactionTestCase, override_settings
from django.urls import reverse
from django.utils import timezone

Expand All @@ -29,7 +30,7 @@
}
}
)
class DatabaseBackendTestCase(TestCase):
class DatabaseBackendTestCase(TransactionTestCase):
def test_using_correct_backend(self) -> None:
self.assertEqual(default_task_backend, tasks["default"])
self.assertIsInstance(tasks["default"], DatabaseBackend)
Expand Down Expand Up @@ -204,6 +205,34 @@ def test_database_backend_app_missing(self) -> None:
self.assertEqual(len(errors), 1)
self.assertIn("django_tasks.backends.database", errors[0].hint)

def test_priority_range_check(self) -> None:
with self.assertRaises(IntegrityError):
DBTaskResult.objects.create(
task_path="", backend_name="default", priority=-101, args_kwargs={}
)

with self.assertRaises(IntegrityError):
DBTaskResult.objects.create(
task_path="", backend_name="default", priority=101, args_kwargs={}
)

# Django accepts the float, but only stores an int
result = DBTaskResult.objects.create(
task_path="", backend_name="default", priority=3.1, args_kwargs={}
)
result.refresh_from_db()
self.assertEqual(result.priority, 3)

DBTaskResult.objects.create(
task_path="", backend_name="default", priority=100, args_kwargs={}
)
DBTaskResult.objects.create(
task_path="", backend_name="default", priority=-100, args_kwargs={}
)
DBTaskResult.objects.create(
task_path="", backend_name="default", priority=0, args_kwargs={}
)


@override_settings(
TASKS={
Expand Down Expand Up @@ -309,6 +338,28 @@ def test_failing_task(self) -> None:
self.assertGreaterEqual(result.started_at, result.enqueued_at) # type: ignore
self.assertGreaterEqual(result.finished_at, result.started_at) # type: ignore
self.assertEqual(result.status, ResultStatus.FAILED)
self.assertIsInstance(result.result, ValueError)

self.assertEqual(DBTaskResult.objects.ready().count(), 0)

def test_complex_exception(self) -> None:
result = test_tasks.complex_exception.enqueue()
self.assertEqual(DBTaskResult.objects.ready().count(), 1)

with self.assertNumQueries(8), self.assertLogs(
"django_tasks.backends.database", level="ERROR"
):
self.run_worker()

self.assertEqual(result.status, ResultStatus.NEW)
result.refresh()
self.assertIsNotNone(result.started_at)
self.assertIsNotNone(result.finished_at)

self.assertGreaterEqual(result.started_at, result.enqueued_at) # type: ignore
self.assertGreaterEqual(result.finished_at, result.started_at) # type: ignore
self.assertEqual(result.status, ResultStatus.FAILED)
self.assertIsNone(result.result)

self.assertIsInstance(result.result, ValueError)
assert result.traceback # So that mypy knows the next line is allowed
Expand Down Expand Up @@ -429,6 +480,7 @@ def test_run_after_priority(self) -> None:
high_priority_result = test_tasks.noop_task.using(priority=10).enqueue()

low_priority_result = test_tasks.noop_task.using(priority=2).enqueue()
lower_priority_result = test_tasks.noop_task.using(priority=-2).enqueue()

self.assertEqual(
[dbt.task_result for dbt in DBTaskResult.objects.all()],
Expand All @@ -438,6 +490,7 @@ def test_run_after_priority(self) -> None:
low_priority_result,
far_future_result,
future_result,
lower_priority_result,
],
)

Expand All @@ -446,6 +499,7 @@ def test_run_after_priority(self) -> None:
[
high_priority_result,
low_priority_result,
lower_priority_result,
],
)

Expand Down
4 changes: 2 additions & 2 deletions tests/tests/test_immediate_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,16 @@ def test_catches_exception(self) -> None:

def test_complex_exception(self) -> None:
result = default_task_backend.enqueue(test_tasks.complex_exception, [], {})
with self.assertLogs("django_tasks.backends.immediate", level="ERROR"):
result = default_task_backend.enqueue(test_tasks.complex_exception, [], {})

self.assertEqual(result.status, ResultStatus.FAILED)
self.assertIsNotNone(result.started_at)
self.assertIsNotNone(result.finished_at)
self.assertGreaterEqual(result.started_at, result.enqueued_at)
self.assertGreaterEqual(result.finished_at, result.started_at)

self.assertIsNone(result.result)
self.assertIsNone(result.traceback)

self.assertIsNone(result.get_result())
self.assertEqual(result.task, test_tasks.complex_exception)
self.assertEqual(result.args, [])
Expand Down
22 changes: 20 additions & 2 deletions tests/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
InvalidTaskError,
ResultDoesNotExist,
)
from django_tasks.task import MAX_PRIORITY, MIN_PRIORITY
from tests import tasks as test_tasks


Expand Down Expand Up @@ -133,9 +134,26 @@ def test_naive_datetime(self) -> None:

def test_invalid_priority(self) -> None:
with self.assertRaisesMessage(
InvalidTaskError, "priority must be zero or greater"
InvalidTaskError,
f"priority must be a whole number between {MIN_PRIORITY} and {MAX_PRIORITY}",
):
test_tasks.noop_task.using(priority=-1)
test_tasks.noop_task.using(priority=-101)

with self.assertRaisesMessage(
InvalidTaskError,
f"priority must be a whole number between {MIN_PRIORITY} and {MAX_PRIORITY}",
):
test_tasks.noop_task.using(priority=101)

with self.assertRaisesMessage(
InvalidTaskError,
f"priority must be a whole number between {MIN_PRIORITY} and {MAX_PRIORITY}",
):
test_tasks.noop_task.using(priority=3.1) # type:ignore[arg-type]

test_tasks.noop_task.using(priority=100)
test_tasks.noop_task.using(priority=-100)
test_tasks.noop_task.using(priority=0)

def test_call_task(self) -> None:
self.assertEqual(test_tasks.calculate_meaning_of_life.call(), 42)
Expand Down
11 changes: 8 additions & 3 deletions tests/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import subprocess
from unittest.mock import Mock

from django.core.exceptions import ImproperlyConfigured
from django.core.exceptions import ImproperlyConfigured
from django.test import SimpleTestCase

from django_tasks import utils
from django_tasks.exceptions import InvalidTaskError
from django_tasks.exceptions import InvalidTaskError
from tests import tasks as test_tasks


Expand Down Expand Up @@ -104,9 +106,9 @@ def test_serialize_exceptions(self) -> None:
self.assertEqual(
set(data.keys()), {"exc_type", "exc_args", "exc_traceback"}
)
exception = utils.exception_from_dict(data)
self.assertIsInstance(exception, type(exc))
self.assertEqual(exception.args, exc.args)
reconstructed = utils.exception_from_dict(data)
self.assertIsInstance(reconstructed, type(exc))
self.assertEqual(reconstructed.args, exc.args)

# Check that the exception traceback contains only one line,
# the one with the name of the exception and the message.
Expand Down Expand Up @@ -152,6 +154,9 @@ def test_cannot_deserialize_non_exception(self) -> None:
{"exc_type": "subprocess.check_output", "exc_args": ["exit", "1"]},
{"exc_type": "True", "exc_args": []},
{"exc_type": "math.pi", "exc_args": []},
{"exc_type": __name__, "exc_args": []},
{"exc_type": utils.get_module_path(type(self)), "exc_args": []},
{"exc_type": utils.get_module_path(Mock), "exc_args": []},
]:
with self.subTest(data):
with self.assertRaises((TypeError, ImportError)):
Expand Down

0 comments on commit 30b7711

Please sign in to comment.