Calling tasks from a synchronous context #111
-
Hi, I've been learning more and more about this library over the past week, and I think there's something I'm misunderstanding fundamentally. For context, I have a Django project: parts of it use Django Channels (WebSocket connections) and parts use Django REST Framework (HTTP). I'd like both my synchronous code (DRF) and my asynchronous code (Channels) to interface well with the task queue, so I figured I would try out this project.

First off, I want to make sure I'm getting the basics right. I'm starting my worker with a Django management command:

```
python manage.py taskiq
```

The above command is equivalent to:

```
taskiq worker my_app.broker:broker my_app.tasks.my_task -r
```

In my case, my directory structure looks like this, where `accounts` is an unrelated Django app and `my_app` is the taskiq app:

```
├── accounts
```

I'm able to run my Django server normally:

```
Watching for file changes with StatReloader
Performing system checks...
Running broker.py in env: development
```

That's because my broker is defined like this:

```python
import os

from taskiq import InMemoryBroker
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
from dotenv import load_dotenv

load_dotenv()

print("Running broker.py in env:", os.environ.get("ENVIRONMENT"))

RESULT_BACKEND_URL = os.environ.get("RESULT_BACKEND_URL")
MESSAGE_BROKER_URL = os.environ.get("MESSAGE_BROKER_URL")
ENVIRONMENT = os.environ.get("ENVIRONMENT")

broker = ListQueueBroker(
    url=MESSAGE_BROKER_URL,
    result_backend=RedisAsyncResultBackend(
        redis_url=RESULT_BACKEND_URL,
        result_ex_time=1000,  # 1000 seconds
    ),
)

if ENVIRONMENT == "test":
    broker = InMemoryBroker()
```

I think that I'm doing it right so far, but I've included it because I could be totally off-base here. Please let me know if I am.

Now, `my_task` is a function that will need to interact with the database, so I'll need to import models in `my_task`:

```python
from accounts.models import AccountGroup
from accounts.services import AccountGroupServices


@broker.task
async def my_task(account_group_id: int) -> bool:
    account_group: AccountGroup = await AccountGroupServices().async_get_account_group(pk=account_group_id)
    ...
    print_success(f"my_task for account_group_id: {account_group_id} completed successfully.")
    return True
```

This is where I ran into my first problem. Let me preface this by saying I am totally new to Django and TaskIQ, so it's hard for me to differentiate between problems in Django's realm and problems in TaskIQ's realm. I'm including this in case it helps someone else who hits the same error:

```
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
```

I understand this is because I'm importing database models before calling `django.setup()`, so I added this:

```python
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_project.settings')

import django

django.setup()

from accounts.models import AccountGroup
```

Now I am able to call my tasks from my `views.py` in another app like this:

```python
from async_task_queue.tasks.my_task import my_task


class AccountGroupView(generics.CreateAPIView):
    """API endpoint for interacting with the AccountGroup model."""

    serializer_class = AccountGroupSerializer

    def post(self, request, *args, **kwargs):
        account_group_id = request.data.get('account_group_id')
        ...
        result = asyncio.run(my_task.kiq(account_group_id=account_group_id))
        wait_res = asyncio.run(result.wait_result(timeout=2))
        print_success(success=f"Task result: {wait_res}")
        ...
        return Response({'message': 'Success'}, status=status.HTTP_201_CREATED)
```

And it works properly. In the console running the Django server, I see the `wait_res` values:

```
SUCCESS: Task result: is_err=False log=None return_value=True execution_time=0.5232694149017334
```

In the console running the worker, I see the log output I expected, too:

```
SUCCESS: my_task for account_group_id: 2 completed successfully
```

All actions completed successfully, including changes to the database. However, if I then run the task a second time too quickly after that, I get an exception:

```
RuntimeError: Task <Task cancelling name='Task-10' coro=<AsyncTaskiqDecoratedTask.kiq() running at /home/tyler/.virtualenvs/my_project/lib/python3.11/site-packages/taskiq/decor.py:94> cb=[_run_until_complete_cb() at /usr/lib/python3.11/asyncio/base_events.py:180]> got Future <Future pending> attached to a different loop
```

It raises an exception:

```
RuntimeError: Event loop is closed
```

I feel like everything I've tried isn't working. I know that `asyncio.run()` is creating an event loop and should be closing it, but I tried this anyway:

```python
async def run_my_task(account_group_id: int) -> None:
    """Run my_task in an asynchronous context."""
    task = await my_task.kiq(account_group_id=account_group_id)
    result = await task.wait_result(timeout=2)

# In my post() handler I did things like:
# asyncio.get_event_loop().run_until_complete(run_my_task(account_group_id=account_group_id))
# I also tried:
# result = asyncio.run(run_my_task(account_group_id=account_group_id))
```

However, both of those resulted in the same problem when trying to run the task after a successful run:

```
During handling of the above exception (Task <Task cancelling name='Task-10' coro=<AsyncTaskiqDecoratedTask.kiq() running at /home/tyler/.virtualenvs/my_project/lib/python3.11/site-packages/taskiq/decor.py:94> cb=[_run_until_complete_cb() at /usr/lib/python3.11/asyncio/base_events.py:180]> got Future <Future pending> attached to a different loop), another exception occurred:

/home/tyler/.virtualenvs/my_project/lib/python3.11/site-packages/taskiq/kicker.py, line 132, in kiq
```

Can someone help me understand where I'm going wrong? Am I fundamentally misunderstanding how to use the library?

Edit: I ended up installing adrf and it works now. I still think there's probably some stuff I'm not understanding, though. I'll leave the discussion open; maybe someone out there could learn from what I've written here.
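For anyone reading later, the "attached to a different loop" error can be reproduced with plain asyncio, independent of taskiq or Django. This is a minimal sketch of the underlying problem: each `asyncio.run()` call creates and then closes its own event loop, so a loop-bound object from the first call cannot be awaited in the second.

```python
import asyncio


async def make_future():
    # Create a Future bound to the currently running loop.
    return asyncio.get_running_loop().create_future()


# First asyncio.run(): creates loop #1, returns the future, then closes loop #1.
fut = asyncio.run(make_future())


async def await_future(f):
    # Awaiting a future that belongs to another (already closed) loop.
    return await f


err = None
try:
    # Second asyncio.run(): creates a brand-new loop #2.
    asyncio.run(await_future(fut))
except RuntimeError as exc:
    err = exc

print(err)  # "... got Future <Future pending> attached to a different loop"
```

In the Django view above, the broker's connections play the role of `fut`: they get bound to the loop of the first `asyncio.run()` call and break on the second.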
-
The problem with the event loop happens because `asyncio.run()` creates a new event loop and closes it once the coroutine finishes, so objects created on the first loop end up being awaited on a different one. Instead, you can create a single global event loop in a module (e.g. `async_task_queue/my_loop.py`):

```python
import asyncio

loop = asyncio.new_event_loop()
```

Then you can run tasks using your global event loop:

```python
from async_task_queue.my_loop import loop
from async_task_queue.tasks.my_task import my_task


class AccountGroupView(generics.CreateAPIView):
    """API endpoint for interacting with the AccountGroup model."""

    serializer_class = AccountGroupSerializer

    def post(self, request, *args, **kwargs):
        account_group_id = request.data.get('account_group_id')
        ...
        result = loop.run_until_complete(my_task.kiq(account_group_id=account_group_id))
        wait_res = loop.run_until_complete(result.wait_result(timeout=2))
        print_success(success=f"Task result: {wait_res}")
        ...
        return Response({'message': 'Success'}, status=status.HTTP_201_CREATED)
```

Hope it helps.
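As a standalone illustration of this pattern (no Django or Redis involved; `fake_kiq` is a hypothetical stand-in for `my_task.kiq`), reusing one module-level loop keeps loop-bound objects valid across successive synchronous calls:

```python
import asyncio

# One module-level loop, created once and never closed between requests
# (the async_task_queue/my_loop.py idea from the answer above).
loop = asyncio.new_event_loop()


async def fake_kiq(x: int) -> int:
    # Hypothetical stand-in for my_task.kiq(...); the real call would
    # enqueue the task and return a handle for wait_result().
    await asyncio.sleep(0)
    return x * 2


# Two successive "requests" reuse the same loop, so objects bound to it
# (connections, futures) remain usable from one call to the next.
first = loop.run_until_complete(fake_kiq(2))
second = loop.run_until_complete(fake_kiq(3))
print(first, second)  # 4 6
```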
@s3rius No worries, I understand you're busy. Your answer sounds logical; that said, it seemed more reasonable in my case to just make my views asynchronous, too. That's how `adrf` solved my issue: `adrf` is just a library that adds async support to Django REST Framework.

It's probably important to note something I never mentioned in the original post: the view in my example inherited from `generics.CreateAPIView`, which is provided by `rest_framework` (Django REST Framework). DRF doesn't support async views out of the box, but it has recently suggested that people use `adrf` as an add-on to add support for async views.

The view can be made async instead by first installing adrf:
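A sketch of what that could look like, assuming adrf's async-capable `APIView` base class (swapping out `generics.CreateAPIView` from my earlier example; the task import and response shape are carried over from the original post):

```python
# pip install adrf
from adrf.views import APIView  # async-capable APIView from adrf
from rest_framework import status
from rest_framework.response import Response

from async_task_queue.tasks.my_task import my_task


class AccountGroupView(APIView):
    """Async API endpoint: the handler itself is a coroutine."""

    async def post(self, request, *args, **kwargs):
        account_group_id = request.data.get('account_group_id')
        # kiq() and wait_result() can now be awaited directly, on the
        # event loop Django is already running under ASGI.
        result = await my_task.kiq(account_group_id=account_group_id)
        wait_res = await result.wait_result(timeout=2)
        return Response({'message': 'Success'}, status=status.HTTP_201_CREATED)
```

Since the handler runs on the server's own event loop, there's no `asyncio.run()` involved and the broker's connections stay bound to a single loop.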