diff --git a/saq/queue/postgres.py b/saq/queue/postgres.py index 20e4656..78a3301 100644 --- a/saq/queue/postgres.py +++ b/saq/queue/postgres.py @@ -43,7 +43,6 @@ "Missing dependencies for Postgres. Install them with `pip install saq[postgres]`." ) from e -SWEEP_LOCK_KEY = 0 ENQUEUE_CHANNEL = "saq:enqueue" JOBS_TABLE = "saq_jobs" STATS_TABLE = "saq_stats" @@ -256,10 +255,11 @@ async def sweep(self, lock: int = 60, abort: float = 5.0) -> list[str]: # Attempt to get the sweep lock and hold on to it async with self._get_connection() as conn, conn.cursor() as cursor, conn.transaction(): await cursor.execute( - SQL("SELECT pg_try_advisory_lock({key1}, {key2})").format( - key1=self.saq_lock_keyspace, - key2=SWEEP_LOCK_KEY, - ) + SQL("SELECT pg_try_advisory_lock(%(key1)s, hashtext(%(queue)s))"), + { + "key1": self.saq_lock_keyspace, + "queue": self.name, + }, ) result = await cursor.fetchone() if result and not result[0]: @@ -272,38 +272,67 @@ async def sweep(self, lock: int = 60, abort: float = 5.0) -> list[str]: SQL( dedent( """ - -- Delete expired jobs - DELETE FROM {jobs_table} - WHERE status IN ('aborted', 'complete', 'failed') - AND {now} >= expire_at; - - -- Delete expired stats - DELETE FROM {stats_table} - WHERE {now} >= expire_at; - - -- Fetch active and aborting jobs without advisory locks - WITH locks AS ( - SELECT objid - FROM pg_locks - WHERE locktype = 'advisory' - AND classid = {job_lock_keyspace} - AND objsubid = 2 -- key is int pair, not single bigint - ) - SELECT key, job, objid - FROM {jobs_table} LEFT OUTER JOIN locks ON lock_key = objid - WHERE status IN ('active', 'aborting'); - """ + -- Delete expired jobs + DELETE FROM {jobs_table} + WHERE queue = %(queue)s + AND status IN ('aborted', 'complete', 'failed') + AND %(now)s >= expire_at; + """ ) ).format( jobs_table=self.jobs_table, stats_table=self.stats_table, - now=math.ceil(seconds(now())), - job_lock_keyspace=self.job_lock_keyspace, - ) + ), + { + "queue": self.name, + "now": math.ceil(seconds(now())), + }, + ) + + await cursor.execute( + SQL( + dedent( + """ + -- Delete expired stats + DELETE FROM {stats_table} + WHERE %(now)s >= expire_at; + """ + ) + ).format( + jobs_table=self.jobs_table, + stats_table=self.stats_table, + ), + { + "now": math.ceil(seconds(now())), + }, + ) + + await cursor.execute( + SQL( + dedent( + """ + -- Fetch active and aborting jobs without advisory locks + WITH locks AS ( + SELECT objid + FROM pg_locks + WHERE locktype = 'advisory' + AND classid = %(job_lock_keyspace)s + AND objsubid = 2 -- key is int pair, not single bigint + ) + SELECT key, job, objid + FROM {jobs_table} LEFT OUTER JOIN locks ON lock_key = objid + WHERE queue = %(queue)s + AND status IN ('active', 'aborting'); + """ + ) + ).format( + jobs_table=self.jobs_table, + ), + { + "queue": self.name, + "job_lock_keyspace": self.job_lock_keyspace, + }, ) - # Move cursor past result sets for the first two delete statements - cursor.nextset() - cursor.nextset() results = await cursor.fetchall() for key, job_bytes, objid in results: job = self.deserialize(job_bytes)