-
I have spotted this before and have managed to resolve and am back at seeing the same message being emitted by the worker. I am running the application (FastAPI based, all source code available here). The worker is kicker off inside a container: # TaskIQ worker
worker:
container_name: ${PROJ_NAME}-worker
build:
context: .
dockerfile: Dockerfile
command: ["taskiq", "worker", "${PROJ_NAME}.broker:broker"]
env_file:
- .env.development
restart: unless-stopped
volumes:
- ./src/${PROJ_NAME}:/opt/${PROJ_NAME}
depends_on:
db:
condition: service_healthy
redis:
condition: service_healthy
rabbitmq:
condition: service_healthy The broker is defined as per the examples and uses from .config import config
# See PyPI for more information about taskiq_redis
# https://pypi.org/project/taskiq-redis/
#
# Redis is recommended as the results backend for the tasks
from taskiq_redis import RedisAsyncResultBackend
# RabbitMQ is recommended as the broker
from taskiq_aio_pika import AioPikaBroker
redis_result_backend = RedisAsyncResultBackend(
config.redis_dsn
)
broker = AioPikaBroker(
config.amqp_dsn,
result_backend=redis_result_backend
) The # TaskIQ configurartion so we can share FastAPI dependencies in tasks
@app.on_event("startup")
async def app_startup():
if not broker.is_worker_process:
await broker.startup()
# On shutdown, we need to shutdown the broker
@app.on_event("shutdown")
async def app_shutdown():
if not broker.is_worker_process:
await broker.shutdown() For the purposes of getting things working I have Extract from from sqlalchemy.ext.asyncio import AsyncSession
from taskiq import TaskiqDepends
from ...db import get_async_session
from ...broker import broker
@broker.task
async def send_account_verification_email(
session: AsyncSession = TaskiqDepends(get_async_session)
) -> None:
import logging
logging.error("Kicking off send_account_verification_email") Extract from @router.get("/verify")
async def verify_user(request: Request):
"""Verify an account
"""
await send_account_verification_email.kiq()
return {"message": "hello world"} When I kick off the events, the handler returns as expected, but the worker logs a message saying:
I know I had this working at some point and obviously am missing something that is causing it to not work again. I thought I'd reach out here to see if anyone can point my error. Appreciate you sparing your time. |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 3 replies
-
I found it. I had to specifically reference the
I have to investigate the
|
Beta Was this translation helpful? Give feedback.
I found it. I had to specifically reference the
tasks
packageI have to investigate the
-fsd
cli option to ensure auto discovery of tasks.