
Commit 56e9343

Support running coroutines in new asyncio worker
This commit adds a new type of worker, in addition to the existing threaded
worker, that can run tasks defined as asyncio coroutines. This makes Spinach
compatible with the whole asyncio ecosystem while providing better concurrency
for tasks that are heavily IO bound.

The implementation swaps the queue used between the engine and the workers
with one that works with both sync and async code. Then the existing `Workers`
class is split into two main parts: the threaded workers (using the sync part
of the queue) and the asyncio workers (using the async part of the queue).

There are two main shortcomings to this implementation:

1. Scheduling jobs is still blocking, so when creating a job from an asyncio
   task, care must be taken to wrap the call in `asyncio.to_thread` to prevent
   the event loop from blocking.
2. The compatibility with 3rd-party integrations defined in the contrib
   package is not guaranteed when running asyncio tasks.
1 parent 232f4c6 commit 56e9343

13 files changed: +492 −128 lines
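A minimal sketch of shortcoming 1, assuming an `Engine` named `spin` and a
task `compute` defined elsewhere:

    import asyncio

    async def handler():
        # spin.schedule() talks to Redis over a blocking socket, so run it
        # in a thread to keep the event loop responsive.
        await asyncio.to_thread(spin.schedule, compute, 2, 4)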

README.rst

+2-1
@@ -14,14 +14,15 @@ Redis task queue for Python 3 heavily inspired by Celery and RQ.
 Distinctive features:

+- Threaded and asyncio workers
 - At-least-once or at-most-once delivery per task
 - Periodic tasks without an additional process
 - Concurrency limits on queued jobs
 - Scheduling of tasks in batch
 - Integrations with `Flask, Django, Logging, Sentry and Datadog
   <https://spinach.readthedocs.io/en/stable/user/integrations.html>`_
 - Embeddable workers for easier testing
-- Python 3, threaded, explicit... see `design choices
+- See `design choices
   <https://spinach.readthedocs.io/en/stable/user/design.html>`_ for more
   details

doc/index.rst

+3-2
@@ -7,15 +7,15 @@ Spinach is a Redis task queue for Python 3 heavily inspired by Celery and RQ.
 Distinctive features:

+- Threaded and asyncio workers
 - At-least-once or at-most-once delivery per task
 - Periodic tasks without an additional process
 - Concurrency limits on queued jobs
 - Scheduling of tasks in batch
 - Embeddable workers for easier testing
 - Integrations with :ref:`Flask, Django, Logging, Sentry and Datadog
   <integrations>`
-- Python 3, threaded, explicit... see :ref:`design choices <design>` for more
-  details
+- See :ref:`design choices <design>` for more details

 Installation::

@@ -53,6 +53,7 @@ Getting started with spinach:
   user/jobs
   user/engine
   user/queues
+  user/asyncio
   user/integrations
   user/signals
   user/production

doc/user/asyncio.rst

+51
@@ -0,0 +1,51 @@
.. _asyncio:


Asyncio
=======

Spinach allows defining and running tasks as asyncio coroutines. In this mode
the worker is a single thread that runs all tasks asynchronously. This allows
for greater concurrency as well as compatibility with the asyncio ecosystem.

Creating async tasks
--------------------

To define an asynchronous task, just prefix its definition with the ``async``
keyword::

    @spin.task(name='compute')
    async def compute(a, b):
        await asyncio.sleep(1)
        print('Computed {} + {} = {}'.format(a, b, a + b))

To run the workers in asynchronous mode, pass the ``AsyncioWorkers`` class to
``start_workers``::

    from spinach import AsyncioWorkers

    spin.start_workers(number=256, workers_class=AsyncioWorkers)

When using the asyncio workers, the ``number`` argument can be set quite high
because each worker is just a coroutine, consuming a negligible amount of
resources.

Scheduling jobs
---------------

Because internally only workers are asyncio aware, jobs are still sent to
Redis using a blocking socket. This means that to schedule jobs from
asynchronous code, care must be taken to send jobs from outside the event
loop. This can be achieved using `asyncio.to_thread
<https://docs.python.org/3/library/asyncio-task.html#asyncio.to_thread>`_::

    await asyncio.to_thread(spin.schedule, compute, 2, 4)

Code scheduling a lot of jobs should use :ref:`batches <batch>` to improve
performance.

Example
-------

.. literalinclude:: ../../examples/asyncio_workers.py


.. note:: If an application defines both sync and async tasks, each kind of
          task should go in its own :ref:`queue <queues>` so that sync tasks
          are picked by threaded workers and async tasks by asyncio workers.

.. note:: Not all contrib :ref:`integrations <integrations>` may work with
          asynchronous workers.
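A sketch of the queue-separation note above, assuming the existing `queue`
argument of the task decorator and of `start_workers`; the queue names are
made up:

    from spinach import Engine, MemoryBroker, AsyncioWorkers

    spin = Engine(MemoryBroker())

    @spin.task(name='sync_task', queue='sync-queue')
    def sync_task():
        print('ran in a thread')

    @spin.task(name='async_task', queue='async-queue')
    async def async_task():
        print('ran in the event loop')

    # In one process, threaded workers consume the sync queue:
    #     spin.start_workers(queue='sync-queue')
    # In another, asyncio workers consume the async queue:
    #     spin.start_workers(queue='async-queue', workers_class=AsyncioWorkers)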

doc/user/design.rst

+12-23
@@ -11,33 +11,22 @@ be summed up as: explicit is better than implicit. Spinach makes sure that it
 does not provide any convenient feature that can backfire in more complex
 usages.

-Threaded workers
-----------------
+Threaded & asynchronous workers
+-------------------------------

-Spinach workers are threaded while other task queues like Celery or RQ rely on
-processes by default.
+Spinach workers are either threaded or asynchronous while other task queues
+like Celery or RQ rely on processes by default.

-Threaded workers work best with IO bound tasks: tasks that make requests to
-other services, query a database or read files. If your task are CPU bound,
-meaning that you do heavy computations in Python, a process based worker will
-be more efficient.
+Threaded and asynchronous workers work best with IO bound tasks: tasks that
+make requests to other services, query a database or read files. If your tasks
+are CPU bound, meaning that you do heavy computations in Python, a process
+based worker will be more efficient.

 Tasks in a typical web application are more often than not IO bound. The choice
-of threads as unit of concurrency is a sensible one.
-
-Threads also have the advantage of being lighter than processes, a system can
-handle more threads than processes before resources get exhausted.
-
-Thread safety
-~~~~~~~~~~~~~
-
-As Spinach workers are threads, care must be taken to make sure that the
-application is thread-safe. The good news is that your application is probably
-already thread-safe: web frameworks are often run threaded as well, so they
-take care of most of the heavy work for you.
+of threads or coroutines as unit of concurrency is a sensible one.

-You can read an article I wrote for an `introduction to thread-safety
-<https://lemanchet.fr/articles/learning-python-3-threading-module.html>`_.
+Threads and coroutines also have the advantage of being lighter than processes,
+a system can handle more threads than processes before resources get exhausted.

 Fork
 ~~~~
@@ -191,4 +180,4 @@ the ability to process jobs.
 Because worker processes can die unexpectedly (power loss, OOM killed, extended
 network outage...), Spinach tries to detect dead workers and reschedule
-the jobs that were running on them if the jobs are safe to be retried.
+the jobs that were running on them if the jobs are safe to be retried.

doc/user/production.rst

+4-4
@@ -8,9 +8,9 @@ Advices to read before deploying an application using Spinach to production.
 Spinach
 -------

-Since Spinach relies heavily on threads the user's code MUST be thread-safe.
-This is usually quite easy to achieve on a traditional web application because
-frameworks like Flask or Django make that obvious.
+Since by default Spinach executes jobs in separate threads, the user's code
+must be thread-safe. This is usually quite easy to achieve on a traditional web
+application because frameworks like Flask or Django make that straightforward.

 Tasks should not store state in the process between invocations. Instead all
 state must be stored in an external system, like a database or a cache. This
@@ -70,7 +70,7 @@ Spinach:
 - Task `args` and `kwargs` are JSON serializable and small in size
 - Jobs are sent in :class:`Batch` to the broker when multiple jobs are to be
   scheduled at once
-- The user's code is thread-safe
+- The user's code is thread-safe when using the default threaded workers
 - Tasks do not store state in the process between invocations
 - Logging is configured and exceptions are sent to Sentry, see
   :doc:`integrations`

doc/user/tasks.rst

+2
@@ -173,6 +173,8 @@ directly on the :class:`Engine` using::
 .. autoclass:: spinach.task.Tasks
    :members:

+.. _batch:
+
 Batch
 -----

examples/asyncio_workers.py

+30
@@ -0,0 +1,30 @@
import aiohttp
from spinach import Engine, MemoryBroker, Batch, AsyncioWorkers

spin = Engine(MemoryBroker())


@spin.task(name='get_pokemon_name')
async def get_pokemon_name(pokemon_id: int):
    """Call an HTTP API to retrieve a pokemon name by its ID."""
    url = f'https://pokeapi.co/api/v2/pokemon/{pokemon_id}'
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            pokemon = await response.json()

    print(f'Pokemon #{pokemon_id} is {pokemon["name"]}')


# Schedule a batch of 150 tasks to retrieve the name of the
# first 150 pokemons.
batch = Batch()
for pokemon_id in range(1, 151):
    batch.schedule(get_pokemon_name, pokemon_id)
spin.schedule_batch(batch)

# Start the asyncio workers and process the tasks
spin.start_workers(
    number=256,
    workers_class=AsyncioWorkers,
    stop_when_queue_empty=True
)

spinach/__init__.py

+1
@@ -3,5 +3,6 @@
 from .const import VERSION
 from .engine import Engine
 from .task import Tasks, Batch, RetryException, AbortException
+from .worker import ThreadWorkers, AsyncioWorkers

 __version__ = VERSION

spinach/engine.py

+13-8
@@ -1,13 +1,14 @@
 from datetime import datetime, timezone
 from logging import getLogger
 import threading
+from typing import Type

 from .task import Tasks, Batch, Schedulable
 from .utils import run_forever, handle_sigterm
 from .job import Job, JobStatus, advance_job_status
 from .brokers.base import Broker
 from .const import DEFAULT_QUEUE, DEFAULT_NAMESPACE, DEFAULT_WORKER_NUMBER
-from .worker import Workers
+from .worker import BaseWorkers, ThreadWorkers
 from . import exc
@@ -185,17 +186,21 @@ def _arbiter_func(self, stop_when_queue_empty=False):
         logger.debug('Arbiter terminated')

-    def start_workers(self, number: int=DEFAULT_WORKER_NUMBER,
-                      queue=DEFAULT_QUEUE, block=True,
-                      stop_when_queue_empty=False):
+    def start_workers(self, number: int = DEFAULT_WORKER_NUMBER,
+                      queue: str = DEFAULT_QUEUE, block: bool = True,
+                      stop_when_queue_empty=False,
+                      workers_class: Type[BaseWorkers] = ThreadWorkers):
         """Start the worker threads.

-        :arg number: number of worker threads to launch
-        :arg queue: name of the queue to consume, see :doc:`queues`
+        :arg number: number of workers to launch, each running job uses one
+            worker.
+        :arg queue: name of the queue to consume, see :doc:`queues`.
         :arg block: whether to block the calling thread until a signal arrives
-            and workers get terminated
+            and workers get terminated.
         :arg stop_when_queue_empty: automatically stop the workers when the
             queue is empty. Useful mostly for one-off scripts and testing.
+        :arg workers_class: class to change the behavior of workers,
+            defaults to threaded workers.
         """
         if self._arbiter or self._workers:
             raise RuntimeError('Workers are already running')
@@ -213,7 +218,7 @@ def start_workers(self, number: int=DEFAULT_WORKER_NUMBER,
         self._broker.start()

         # Start workers
-        self._workers = Workers(
+        self._workers = workers_class(
             num_workers=number,
             namespace=self.namespace,
         )
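A short usage sketch of the new `workers_class` parameter, mirroring the
documentation above; the broker choice is just for illustration:

    from spinach import Engine, MemoryBroker, AsyncioWorkers

    spin = Engine(MemoryBroker())
    # workers_class defaults to ThreadWorkers, preserving the old behavior;
    # passing AsyncioWorkers opts into the coroutine-based workers.
    spin.start_workers(number=256, workers_class=AsyncioWorkers,
                       stop_when_queue_empty=True)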

spinach/queuey.py

+115
@@ -0,0 +1,115 @@
import asyncio
from collections import deque
from concurrent.futures import Future
import threading
from typing import Tuple, Optional, Any, Deque


class Queuey:
    """Hybrid queue for interfacing sync and async(io) code.

    It is largely inspired by a talk by David Beazley on the subject:
    https://www.youtube.com/watch?v=x1ndXuw7S0s

    One big difference with a normal queue is that even with a maxsize
    set to a fixed number, this queue can still end up taking an
    infinite amount of memory since pending get/put operations are kept
    as futures.

    It is an alternative to the 3rd-party Janus library which had
    shortcomings when used in Spinach:
    - Janus queues have to be created in an asyncio coroutine, turning
      the creation of the queue in the Workers class into a strange dance.
    - It was not obvious to me how to implement showing the queue as full
      if there are unfinished tasks.
    - It adds a few dependencies only needed by a fraction of users, and a
      ton of code for something that should be simple.
    """

    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        self._mutex = threading.Lock()
        self._items: Deque[Any] = deque()
        self._getters: Deque[Future] = deque()
        self._putters: Deque[Tuple[Any, Future]] = deque()
        self._unfinished_tasks = 0

    def _get_noblock(self) -> Tuple[Optional[Any], Optional[Future]]:
        with self._mutex:
            if self._items:
                if self._putters:
                    # About to remove one item from the queue which means
                    # that a new spot will be available. Since there are
                    # putters waiting, wake up one and take its item.
                    item, put_fut = self._putters.popleft()
                    self._items.append(item)
                    put_fut.set_result(True)
                return self._items.popleft(), None

            else:
                fut = Future()
                self._getters.append(fut)
                return None, fut

    def _put_noblock(self, item: Any) -> Optional[Future]:
        with self._mutex:
            if len(self._items) < self.maxsize:
                self._items.append(item)
                self._unfinished_tasks += 1
                if self._getters:
                    self._getters.popleft().set_result(self._items.popleft())
            else:
                fut = Future()
                self._putters.append((item, fut))
                return fut

    def get_sync(self) -> Any:
        item, fut = self._get_noblock()
        if fut:
            item = fut.result()
        return item

    def put_sync(self, item: Any) -> None:
        fut = self._put_noblock(item)
        if fut is None:
            return

        fut.result()

    async def get_async(self) -> Any:
        item, fut = self._get_noblock()
        if fut:
            item = await asyncio.wait_for(asyncio.wrap_future(fut), None)
        return item

    async def put_async(self, item: Any) -> None:
        fut = self._put_noblock(item)
        if fut is None:
            return

        await asyncio.wait_for(asyncio.wrap_future(fut), None)

    def task_done(self) -> None:
        """Indicate that a formerly enqueued task is complete.

        Raises a ValueError if called more times than there were items
        placed in the queue.
        """
        with self._mutex:
            unfinished = self._unfinished_tasks - 1
            if unfinished < 0:
                raise ValueError('task_done() called too many times')

            self._unfinished_tasks = unfinished

    def empty(self) -> bool:
        with self._mutex:
            return self._unfinished_tasks == 0

    def full(self) -> bool:
        with self._mutex:
            return self.maxsize <= self._unfinished_tasks

    def available_slots(self) -> int:
        with self._mutex:
            return self.maxsize - self._unfinished_tasks
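A minimal usage sketch of `Queuey` (not part of the commit): a plain thread
produces with the sync API while a coroutine consumes with the async API.

    import asyncio
    import threading

    from spinach.queuey import Queuey

    q = Queuey(maxsize=8)

    def producer():
        # Runs in a regular thread; put_sync() blocks while the queue is full.
        for i in range(5):
            q.put_sync(i)

    async def consumer():
        # Runs in the event loop; get_async() awaits until an item arrives.
        for _ in range(5):
            print('got', await q.get_async())

    threading.Thread(target=producer).start()
    asyncio.run(consumer())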
