Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Signals #104

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,15 @@ assert default_task_backend.supports_get_result

This is particularly useful in combination with Django's [system check framework](https://docs.djangoproject.com/en/stable/topics/checks/).

### Signals

A few [Signals](https://docs.djangoproject.com/en/stable/topics/signals/) are provided to more easily respond to certain task events.

Whilst signals are available, they may not be the most maintainable approach.

- `django_tasks.signals.task_enqueued`: Called when a task is enqueued. The sender is the backend class. Also called with the enqueued `task_result`.
- `django_tasks.signals.task_finished`: Called when a task finishes (`COMPLETE` or `FAILED`). The sender is the backend class. Also called with the finished `task_result`.

## Contributing

See [CONTRIBUTING.md](./CONTRIBUTING.md) for information on how to contribute.
2 changes: 2 additions & 0 deletions django_tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
DEFAULT_TASK_BACKEND_ALIAS,
ResultStatus,
Task,
TaskResult,
task,
)

Expand All @@ -28,6 +29,7 @@
"task",
"ResultStatus",
"Task",
"TaskResult",
]


Expand Down
9 changes: 7 additions & 2 deletions django_tasks/backends/database/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from django_tasks.backends.base import BaseTaskBackend
from django_tasks.exceptions import ResultDoesNotExist
from django_tasks.signals import task_enqueued
from django_tasks.task import Task
from django_tasks.task import TaskResult as BaseTaskResult
from django_tasks.utils import json_normalize
Expand Down Expand Up @@ -52,10 +53,14 @@ def enqueue(

db_result = self._task_to_db_task(task, args, kwargs)

def save_result() -> None:
db_result.save()
task_enqueued.send(type(self), task_result=db_result.task_result)

if self._get_enqueue_on_commit_for_task(task):
transaction.on_commit(db_result.save)
transaction.on_commit(save_result)
else:
db_result.save()
save_result()

return db_result.task_result

Expand Down
43 changes: 23 additions & 20 deletions django_tasks/backends/database/management/commands/db_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from types import FrameType
from typing import List, Optional

from django.core.exceptions import SuspiciousOperation
from django.core.management.base import BaseCommand
from django.db import connections
from django.db.utils import OperationalError
Expand All @@ -17,8 +18,10 @@
from django_tasks.backends.database.models import DBTaskResult
from django_tasks.backends.database.utils import exclusive_transaction
from django_tasks.exceptions import InvalidTaskBackendError
from django_tasks.task import DEFAULT_QUEUE_NAME, ResultStatus
from django_tasks.signals import task_finished
from django_tasks.task import DEFAULT_QUEUE_NAME

package_logger = logging.getLogger("django_tasks")
logger = logging.getLogger("django_tasks.backends.database.db_worker")


Expand Down Expand Up @@ -124,28 +127,28 @@ def run_task(self, db_task_result: DBTaskResult) -> None:
"Task id=%s path=%s state=%s",
db_task_result.id,
db_task_result.task_path,
ResultStatus.RUNNING,
task_result.status,
)
return_value = task.call(*task_result.args, **task_result.kwargs)

# Setting the return and success value inside the error handling,
# So errors setting it (eg JSON encode) can still be recorded
db_task_result.set_complete(return_value)
logger.info(
"Task id=%s path=%s state=%s",
db_task_result.id,
db_task_result.task_path,
ResultStatus.COMPLETE,
task_finished.send(
sender=type(task.get_backend()), task_result=db_task_result.task_result
)
except BaseException as e:
# Use `.exception` to integrate with error monitoring tools (eg Sentry)
logger.exception(
"Task id=%s path=%s state=%s",
db_task_result.id,
db_task_result.task_path,
ResultStatus.FAILED,
)
db_task_result.set_failed(e)
try:
sender = type(db_task_result.task.get_backend())
task_result = db_task_result.task_result
except (ModuleNotFoundError, SuspiciousOperation):
logger.exception("Task id=%s failed unexpectedly", db_task_result.id)
else:
task_finished.send(
sender=sender,
task_result=task_result,
)

# If the user tried to terminate, let them
if isinstance(e, KeyboardInterrupt):
Expand Down Expand Up @@ -205,18 +208,18 @@ def add_arguments(self, parser: ArgumentParser) -> None:

def configure_logging(self, verbosity: int) -> None:
if verbosity == 0:
logger.setLevel(logging.CRITICAL)
package_logger.setLevel(logging.CRITICAL)
elif verbosity == 1:
logger.setLevel(logging.WARNING)
package_logger.setLevel(logging.WARNING)
elif verbosity == 2:
logger.setLevel(logging.INFO)
package_logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.DEBUG)
package_logger.setLevel(logging.DEBUG)

# If no handler is configured, the logs won't show,
# regardless of the set level.
if not logger.hasHandlers():
logger.addHandler(logging.StreamHandler(self.stdout))
if not package_logger.hasHandlers():
package_logger.addHandler(logging.StreamHandler(self.stdout))

def handle(
self,
Expand Down
16 changes: 11 additions & 5 deletions django_tasks/backends/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing_extensions import ParamSpec

from django_tasks.exceptions import ResultDoesNotExist
from django_tasks.signals import task_enqueued
from django_tasks.task import ResultStatus, Task, TaskResult
from django_tasks.utils import json_normalize

Expand All @@ -27,6 +28,11 @@ def __init__(self, options: dict) -> None:

self.results = []

def _store_result(self, result: TaskResult) -> None:
object.__setattr__(result, "enqueued_at", timezone.now())
self.results.append(result)
task_enqueued.send(type(self), task_result=result)

def enqueue(
self, task: Task[P, T], args: P.args, kwargs: P.kwargs
) -> TaskResult[T]:
Expand All @@ -36,7 +42,7 @@ def enqueue(
task=task,
id=str(uuid4()),
status=ResultStatus.NEW,
enqueued_at=timezone.now(),
enqueued_at=None,
started_at=None,
finished_at=None,
args=json_normalize(args),
Expand All @@ -45,12 +51,12 @@ def enqueue(
)

if self._get_enqueue_on_commit_for_task(task) is not False:
# Copy the task to prevent mutation issues
transaction.on_commit(partial(self.results.append, deepcopy(result)))
transaction.on_commit(partial(self._store_result, result))
else:
self.results.append(deepcopy(result))
self._store_result(result)

return result
# Copy the task to prevent mutation issues
return deepcopy(result)

# We don't set `supports_get_result` as the results are scoped to the current thread
def get_result(self, result_id: str) -> TaskResult:
Expand Down
23 changes: 12 additions & 11 deletions django_tasks/backends/immediate.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from django.utils import timezone
from typing_extensions import ParamSpec

from django_tasks.signals import task_enqueued, task_finished
from django_tasks.task import ResultStatus, Task, TaskResult
from django_tasks.utils import exception_to_dict, json_normalize

Expand All @@ -28,6 +29,9 @@ def _execute_task(self, task_result: TaskResult) -> None:
"""
Execute the task for the given `TaskResult`, mutating it with the outcome
"""
object.__setattr__(task_result, "enqueued_at", timezone.now())
task_enqueued.send(type(self), task_result=task_result)

task = task_result.task

calling_task_func = (
Expand All @@ -44,28 +48,25 @@ def _execute_task(self, task_result: TaskResult) -> None:
),
)
except BaseException as e:
# If the user tried to terminate, let them
if isinstance(e, KeyboardInterrupt):
raise

object.__setattr__(task_result, "finished_at", timezone.now())
try:
object.__setattr__(task_result, "_exception_data", exception_to_dict(e))
except Exception:
logger.exception("Task id=%s unable to save exception", task_result.id)

# Use `.exception` to integrate with error monitoring tools (eg Sentry)
logger.exception(
"Task id=%s path=%s state=%s",
task_result.id,
task.module_path,
ResultStatus.FAILED,
)
object.__setattr__(task_result, "status", ResultStatus.FAILED)

# If the user tried to terminate, let them
if isinstance(e, KeyboardInterrupt):
raise
task_finished.send(type(self), task_result=task_result)
else:
object.__setattr__(task_result, "finished_at", timezone.now())
object.__setattr__(task_result, "status", ResultStatus.COMPLETE)

task_finished.send(type(self), task_result=task_result)

def enqueue(
self, task: Task[P, T], args: P.args, kwargs: P.kwargs
) -> TaskResult[T]:
Expand All @@ -75,7 +76,7 @@ def enqueue(
task=task,
id=str(uuid4()),
status=ResultStatus.NEW,
enqueued_at=timezone.now(),
enqueued_at=None,
started_at=None,
finished_at=None,
args=json_normalize(args),
Expand Down
39 changes: 39 additions & 0 deletions django_tasks/signal_handlers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import logging
from typing import Type

from asgiref.local import Local
from django.core.signals import setting_changed
from django.dispatch import receiver

from django_tasks import BaseTaskBackend, ResultStatus, TaskResult

from .signals import task_enqueued, task_finished

logger = logging.getLogger("django_tasks")


@receiver(setting_changed)
def clear_tasks_handlers(*, setting: str, **kwargs: dict) -> None:
Expand All @@ -13,3 +22,33 @@ def clear_tasks_handlers(*, setting: str, **kwargs: dict) -> None:

tasks._settings = tasks.settings = tasks.configure_settings(None) # type:ignore[attr-defined]
tasks._connections = Local() # type:ignore[attr-defined]


@receiver(task_enqueued)
def log_task_enqueued(
sender: Type[BaseTaskBackend], task_result: TaskResult, **kwargs: dict
) -> None:
logger.debug(
"Task id=%s path=%s enqueued backend=%s",
task_result.id,
task_result.task.module_path,
task_result.backend,
)


@receiver(task_finished)
def log_task_finished(
sender: Type[BaseTaskBackend], task_result: TaskResult, **kwargs: dict
) -> None:
if task_result.status == ResultStatus.FAILED:
# Use `.exception` to integrate with error monitoring tools (eg Sentry)
log_method = logger.exception
else:
log_method = logger.info

log_method(
"Task id=%s path=%s state=%s",
task_result.id,
task_result.task.module_path,
task_result.status,
)
4 changes: 4 additions & 0 deletions django_tasks/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from django.dispatch import Signal

task_enqueued = Signal()
task_finished = Signal()
Loading
Loading