Skip to content

Commit

Permalink
fix(backend/postgres): allow concurrent pubs
Browse files Browse the repository at this point in the history
This fix adds an asyncio.Lock to avoid
`asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress`

fixes encode#22
  • Loading branch information
pwoolvett committed May 13, 2022
1 parent 9255c29 commit 54e70b2
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
10 changes: 7 additions & 3 deletions broadcaster/_backends/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,23 @@ def __init__(self, url: str):

async def connect(self) -> None:
self._conn = await asyncpg.connect(self._url)
self._lock = asyncio.Lock()
self._listen_queue: asyncio.Queue = asyncio.Queue()

async def disconnect(self) -> None:
await self._conn.close()

async def subscribe(self, channel: str) -> None:
await self._conn.add_listener(channel, self._listener)
async with self._lock:
await self._conn.add_listener(channel, self._listener)

async def unsubscribe(self, channel: str) -> None:
await self._conn.remove_listener(channel, self._listener)
async with self._lock:
await self._conn.remove_listener(channel, self._listener)

async def publish(self, channel: str, message: str) -> None:
await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)
async with self._lock:
await self._conn.execute("SELECT pg_notify($1, $2);", channel, message)

def _listener(self, *args: Any) -> None:
connection, pid, channel, payload = args
Expand Down
32 changes: 32 additions & 0 deletions tests/test_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,35 @@ async def test_broadcast(setup_broadcast):
event = await subscriber.get()
assert event.channel == channel
assert event.message == msg


@pytest.mark.asyncio
@pytest.mark.parametrize(["setup_broadcast"], URLS, indirect=True)
async def test_sub(setup_broadcast):
uid = uuid4()
channel1 = f"chatroom-{uid}1"
channel2 = f"chatroom-{uid}2"

to_sub = [
setup_broadcast._backend.subscribe(channel1),
setup_broadcast._backend.subscribe(channel2),
]
await asyncio.gather(*to_sub)


@pytest.mark.asyncio
@pytest.mark.parametrize(["setup_broadcast"], URLS, indirect=True)
async def test_unsub(setup_broadcast):
uid = uuid4()
channel1 = f"chatroom-{uid}1"
channel2 = f"chatroom-{uid}2"

await setup_broadcast._backend.subscribe(channel1)
await setup_broadcast._backend.subscribe(channel2)

to_unsub = [
setup_broadcast._backend.unsubscribe(channel1),
setup_broadcast._backend.unsubscribe(channel2),
]

await asyncio.gather(*to_unsub)

0 comments on commit 54e70b2

Please sign in to comment.