Skip to content

Commit

Permalink
Polling client
Browse files Browse the repository at this point in the history
  • Loading branch information
lietu committed Jul 9, 2016
1 parent 29f6d54 commit 38e59ca
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 10 deletions.
49 changes: 44 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
# PuSu Engine client for Python

## Simple example
Client for using the PuSu Engine server from Python. Probably much better
suitable for sending than receiving, but both should work.

```

## Threaded example

If you want immediate delivery and are ok with your callbacks getting called
from another thread, you can use the threaded client.

```python
from pypusu.threaded import PuSuClient
from time import sleep


if __name__ == "__main__":
c = PuSuClient("ws://127.0.0.1:55000")
Expand All @@ -20,19 +29,49 @@ if __name__ == "__main__":
c.subscribe("channel.1", listener)
c.publish("channel.1", {"foo": "bar"})

from time import sleep
sleep(30)

print(count)
```

## Polling example

If your callbacks getting called from another thread is an issue, you can use
the polling client. Internally it still uses threads, but it will not deliver
the messages to your callbacks until you call `poll()`.

```python
from pypusu.polling import PuSuClient
from time import sleep


if __name__ == "__main__":
c = PuSuClient("ws://127.0.0.1:55000")

count = 0

def listener(msg):
global count
count += 1

c.authorize("foo")
c.subscribe("channel.1", listener)
c.publish("channel.1", {"foo": "bar"})

for i in range(0, 30):
sleep(1)
c.poll()

print(count)
```


## Dependencies

Not quite sure how to deal with dependencies yet, so for now you'll have to
install them separately.

For the threaded client you'll need the following in your `requirements.txt`:
Both threaded and polling client need the following in your `requirements.txt`:

```
ws4py==0.3.5
Expand Down
43 changes: 43 additions & 0 deletions polling_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from pypusu.polling import PuSuClient
from time import sleep, time

if __name__ == "__main__":
print("Connecting")
c = PuSuClient("ws://127.0.0.1:55000")

count = 0

def listener(msg):
global count
count += 1

print("Authorizing")
c.authorize("foo")
print("Subscribing")
c.subscribe("channel.1", listener)

print("Waiting")

target = 500
start = time()
for i in range(1, target + 1):
c.publish("channel.1", {"foo": "bar"})
end = time()
elapsed = end - start

print("Sent {} messages in {:.3f}s, {:.2f}msg/s".format(
target,
elapsed,
target / elapsed
))

sleep(1)
print("So far got {} messages, polling...".format(count))
c.poll()
print("After poll got {} messages, waiting for more...".format(count))

for i in range(0, 60):
sleep(1)
c.poll()

print("Got {} messages".format(count))
65 changes: 65 additions & 0 deletions pypusu/polling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import json
from .threaded import PuSuClient as ThreadedPuSuClient

try:
# Py2
from Queue import Queue, Empty
except ImportError:
# Py3
from queue import Queue, Empty

DEBUG = False


class PuSuClient(ThreadedPuSuClient):
"""
PuSu Engine client using the ws4py builtin client. Variant where reading
messages takes polling, in case you want to avoid your code being called
from another thread. Just make sure you poll often enough or the queue
will use up your RAM.
.. code-block:: python
from pypusy.polling import PuSuClient
client = PuSuClient("ws://127.0.0.1:55000")
def listener(msg):
print(msg)
client.subscribe("my-channel", listener)
client.publish("some-channel", "data")
# Will call any subscribers if there were messages to deliver
client.poll()
"""

def __init__(self, *args):
super(PuSuClient, self).__init__(*args)
self._incoming_messages = Queue()

def poll(self):
"""
Check for any new messages for us
"""
count = 0
while True:
try:
item = self._incoming_messages.get_nowait()
count += 1
self._on_receive(item)
except Empty:
break
if DEBUG:
print("<< {} messages delivered from queue".format(count))

def _received_message(self, data):
if DEBUG:
print("<- {}".format(data))

msg = json.loads(str(data))
# Only publish messages should be paused, the rest is required for
# normal function
if msg["type"] == PuSuClient.TYPE_PUBLISH:
self._incoming_messages.put(msg)
else:
self._on_receive(msg)
2 changes: 1 addition & 1 deletion pypusu/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class PuSuClient(_PuSuBaseClient):
but doesn't depend on anything else.
.. code-block:: python
from pypusy import PuSuClient
from pypusy.threaded import PuSuClient
client = PuSuClient("ws://127.0.0.1:55000")
Expand Down
24 changes: 20 additions & 4 deletions threaded_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from pypusu.threaded import PuSuClient
from time import sleep, time

if __name__ == "__main__":
print("Connecting")
c = PuSuClient("ws://127.0.0.1:55000")

count = 0
Expand All @@ -11,12 +13,26 @@ def listener(msg):
count += 1


print("Authorizing")
c.authorize("foo")
print("Subscribing")
c.subscribe("channel.1", listener)
c.publish("channel.1", {"foo": "bar"})

from time import sleep
print("Waiting")

sleep(30)
target = 500
start = time()
for i in range(1, target + 1):
c.publish("channel.1", {"foo": "bar"})
end = time()
elapsed = end - start

print(count)
print("Sent {} messages in {:.3f}s, {:.2f}msg/s".format(
target,
elapsed,
target / elapsed
))

sleep(60)

print("Got {} messages".format(count))

0 comments on commit 38e59ca

Please sign in to comment.