Skip to content

Commit

Permalink
Merge branch 'master' into issue-00100
Browse files Browse the repository at this point in the history
  • Loading branch information
JaeHyuckSa authored Sep 13, 2024
2 parents 46f9a13 + c26bfc5 commit d958238
Show file tree
Hide file tree
Showing 22 changed files with 515 additions and 105 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ jobs:
run: just lint
- name: Run tests
run: just test
- name: Run fast tests
run: just test-fast

test-postgres:
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ cover/
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
*.sqlite3
*.sqlite3-journal

# Flask stuff:
instance/
Expand Down
6 changes: 6 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,9 @@ just test-sqlite
# To run all of the above:
just test-dbs
```

Due to the database worker process's tests, the test suite cannot use an in-memory database, which means tests run quite slowly locally. If you're not modifying the worker and want your tests to run quicker, run:

```sh
just test-fast
```
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,15 @@ assert default_task_backend.supports_get_result

This is particularly useful in combination with Django's [system check framework](https://docs.djangoproject.com/en/stable/topics/checks/).

### Signals

A few [Signals](https://docs.djangoproject.com/en/stable/topics/signals/) are provided to more easily respond to certain task events.

Whilst signals are available, they may not be the most maintainable approach.

- `django_tasks.signals.task_enqueued`: Sent when a task is enqueued. The sender is the backend class; the enqueued `task_result` is passed as a keyword argument.
- `django_tasks.signals.task_finished`: Sent when a task finishes (`COMPLETE` or `FAILED`). The sender is the backend class; the finished `task_result` is passed as a keyword argument.

## Contributing

See [CONTRIBUTING.md](./CONTRIBUTING.md) for information on how to contribute.
2 changes: 2 additions & 0 deletions django_tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
DEFAULT_TASK_BACKEND_ALIAS,
ResultStatus,
Task,
TaskResult,
task,
)

Expand All @@ -28,6 +29,7 @@
"task",
"ResultStatus",
"Task",
"TaskResult",
]


Expand Down
9 changes: 7 additions & 2 deletions django_tasks/backends/database/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from django_tasks.backends.base import BaseTaskBackend
from django_tasks.exceptions import ResultDoesNotExist
from django_tasks.signals import task_enqueued
from django_tasks.task import Task
from django_tasks.task import TaskResult as BaseTaskResult
from django_tasks.utils import json_normalize
Expand Down Expand Up @@ -52,10 +53,14 @@ def enqueue(

db_result = self._task_to_db_task(task, args, kwargs)

def save_result() -> None:
db_result.save()
task_enqueued.send(type(self), task_result=db_result.task_result)

if self._get_enqueue_on_commit_for_task(task):
transaction.on_commit(db_result.save)
transaction.on_commit(save_result)
else:
db_result.save()
save_result()

return db_result.task_result

Expand Down
70 changes: 42 additions & 28 deletions django_tasks/backends/database/management/commands/db_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from types import FrameType
from typing import List, Optional

from django.core.exceptions import SuspiciousOperation
from django.core.management.base import BaseCommand
from django.db import connections
from django.db.utils import OperationalError
Expand All @@ -17,28 +18,37 @@
from django_tasks.backends.database.models import DBTaskResult
from django_tasks.backends.database.utils import exclusive_transaction
from django_tasks.exceptions import InvalidTaskBackendError
from django_tasks.task import DEFAULT_QUEUE_NAME, ResultStatus
from django_tasks.signals import task_finished
from django_tasks.task import DEFAULT_QUEUE_NAME

package_logger = logging.getLogger("django_tasks")
logger = logging.getLogger("django_tasks.backends.database.db_worker")


class Worker:
def __init__(
self, *, queue_names: List[str], interval: float, batch: bool, backend_name: str
self,
*,
queue_names: List[str],
interval: float,
batch: bool,
backend_name: str,
startup_delay: bool,
):
self.queue_names = queue_names
self.process_all_queues = "*" in queue_names
self.interval = interval
self.batch = batch
self.backend_name = backend_name
self.startup_delay = startup_delay

self.running = True
self.running_task = False

def shutdown(self, signum: int, frame: Optional[FrameType]) -> None:
if not self.running:
logger.warning(
"Received %s - shutting down immediately.", signal.strsignal(signum)
"Received %s - terminating current task.", signal.strsignal(signum)
)
sys.exit(1)

Expand All @@ -64,7 +74,7 @@ def start(self) -> None:

logger.info("Starting worker for queues=%s", ",".join(self.queue_names))

if self.interval:
if self.startup_delay and self.interval:
# Add a random small delay before starting the loop to avoid a thundering herd
time.sleep(random.random())

Expand All @@ -84,7 +94,7 @@ def start(self) -> None:
except OperationalError as e:
# Ignore locked databases and keep trying.
# It should unlock eventually.
if "database is locked" in e.args[0]:
if "is locked" in e.args[0]:
task_result = None
else:
raise
Expand Down Expand Up @@ -124,32 +134,28 @@ def run_task(self, db_task_result: DBTaskResult) -> None:
"Task id=%s path=%s state=%s",
db_task_result.id,
db_task_result.task_path,
ResultStatus.RUNNING,
task_result.status,
)
return_value = task.call(*task_result.args, **task_result.kwargs)

# Setting the return and success value inside the error handling,
# So errors setting it (eg JSON encode) can still be recorded
db_task_result.set_complete(return_value)
logger.info(
"Task id=%s path=%s state=%s",
db_task_result.id,
db_task_result.task_path,
ResultStatus.COMPLETE,
task_finished.send(
sender=type(task.get_backend()), task_result=db_task_result.task_result
)
except BaseException as e:
# Use `.exception` to integrate with error monitoring tools (eg Sentry)
logger.exception(
"Task id=%s path=%s state=%s",
db_task_result.id,
db_task_result.task_path,
ResultStatus.FAILED,
)
db_task_result.set_failed(e)

# If the user tried to terminate, let them
if isinstance(e, KeyboardInterrupt):
raise
try:
sender = type(db_task_result.task.get_backend())
task_result = db_task_result.task_result
except (ModuleNotFoundError, SuspiciousOperation):
logger.exception("Task id=%s failed unexpectedly", db_task_result.id)
else:
task_finished.send(
sender=sender,
task_result=task_result,
)


def valid_backend_name(val: str) -> str:
Expand Down Expand Up @@ -202,21 +208,27 @@ def add_arguments(self, parser: ArgumentParser) -> None:
dest="backend_name",
help="The backend to operate on (default: %(default)r)",
)
parser.add_argument(
"--no-startup-delay",
action="store_false",
dest="startup_delay",
help="Don't add a small delay at startup.",
)

def configure_logging(self, verbosity: int) -> None:
if verbosity == 0:
logger.setLevel(logging.CRITICAL)
package_logger.setLevel(logging.CRITICAL)
elif verbosity == 1:
logger.setLevel(logging.WARNING)
package_logger.setLevel(logging.WARNING)
elif verbosity == 2:
logger.setLevel(logging.INFO)
package_logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.DEBUG)
package_logger.setLevel(logging.DEBUG)

# If no handler is configured, the logs won't show,
# regardless of the set level.
if not logger.hasHandlers():
logger.addHandler(logging.StreamHandler(self.stdout))
if not package_logger.hasHandlers():
package_logger.addHandler(logging.StreamHandler(self.stdout))

def handle(
self,
Expand All @@ -226,6 +238,7 @@ def handle(
interval: float,
batch: bool,
backend_name: str,
startup_delay: bool,
**options: dict,
) -> None:
self.configure_logging(verbosity)
Expand All @@ -235,6 +248,7 @@ def handle(
interval=interval,
batch=batch,
backend_name=backend_name,
startup_delay=startup_delay,
)

worker.start()
Expand Down
16 changes: 11 additions & 5 deletions django_tasks/backends/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing_extensions import ParamSpec

from django_tasks.exceptions import ResultDoesNotExist
from django_tasks.signals import task_enqueued
from django_tasks.task import ResultStatus, Task, TaskResult
from django_tasks.utils import json_normalize

Expand All @@ -27,6 +28,11 @@ def __init__(self, options: dict) -> None:

self.results = []

def _store_result(self, result: TaskResult) -> None:
    """Record *result* as enqueued and notify listeners.

    Stamps the result's ``enqueued_at`` with the current time, appends it
    to this backend's in-memory ``results`` list, and fires the
    ``task_enqueued`` signal with the backend class as the sender.
    """
    # TaskResult appears to be immutable (likely a frozen dataclass),
    # so the timestamp is set via object.__setattr__ — TODO confirm.
    object.__setattr__(result, "enqueued_at", timezone.now())
    self.results.append(result)
    # Sender is the backend class, matching the documented signal contract.
    task_enqueued.send(type(self), task_result=result)

def enqueue(
self, task: Task[P, T], args: P.args, kwargs: P.kwargs
) -> TaskResult[T]:
Expand All @@ -36,7 +42,7 @@ def enqueue(
task=task,
id=str(uuid4()),
status=ResultStatus.NEW,
enqueued_at=timezone.now(),
enqueued_at=None,
started_at=None,
finished_at=None,
args=json_normalize(args),
Expand All @@ -45,12 +51,12 @@ def enqueue(
)

if self._get_enqueue_on_commit_for_task(task) is not False:
# Copy the task to prevent mutation issues
transaction.on_commit(partial(self.results.append, deepcopy(result)))
transaction.on_commit(partial(self._store_result, result))
else:
self.results.append(deepcopy(result))
self._store_result(result)

return result
# Copy the task to prevent mutation issues
return deepcopy(result)

# We don't set `supports_get_result` as the results are scoped to the current thread
def get_result(self, result_id: str) -> TaskResult:
Expand Down
23 changes: 12 additions & 11 deletions django_tasks/backends/immediate.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from django.utils import timezone
from typing_extensions import ParamSpec

from django_tasks.signals import task_enqueued, task_finished
from django_tasks.task import ResultStatus, Task, TaskResult
from django_tasks.utils import exception_to_dict, json_normalize

Expand All @@ -28,6 +29,9 @@ def _execute_task(self, task_result: TaskResult) -> None:
"""
Execute the task for the given `TaskResult`, mutating it with the outcome
"""
object.__setattr__(task_result, "enqueued_at", timezone.now())
task_enqueued.send(type(self), task_result=task_result)

task = task_result.task

calling_task_func = (
Expand All @@ -44,28 +48,25 @@ def _execute_task(self, task_result: TaskResult) -> None:
),
)
except BaseException as e:
# If the user tried to terminate, let them
if isinstance(e, KeyboardInterrupt):
raise

object.__setattr__(task_result, "finished_at", timezone.now())
try:
object.__setattr__(task_result, "_exception_data", exception_to_dict(e))
except Exception:
logger.exception("Task id=%s unable to save exception", task_result.id)

# Use `.exception` to integrate with error monitoring tools (eg Sentry)
logger.exception(
"Task id=%s path=%s state=%s",
task_result.id,
task.module_path,
ResultStatus.FAILED,
)
object.__setattr__(task_result, "status", ResultStatus.FAILED)

# If the user tried to terminate, let them
if isinstance(e, KeyboardInterrupt):
raise
task_finished.send(type(self), task_result=task_result)
else:
object.__setattr__(task_result, "finished_at", timezone.now())
object.__setattr__(task_result, "status", ResultStatus.COMPLETE)

task_finished.send(type(self), task_result=task_result)

def enqueue(
self, task: Task[P, T], args: P.args, kwargs: P.kwargs
) -> TaskResult[T]:
Expand All @@ -75,7 +76,7 @@ def enqueue(
task=task,
id=str(uuid4()),
status=ResultStatus.NEW,
enqueued_at=timezone.now(),
enqueued_at=None,
started_at=None,
finished_at=None,
args=json_normalize(args),
Expand Down
Loading

0 comments on commit d958238

Please sign in to comment.