Skip to content

Commit

Permalink
Release 2.2.1 (#45)
Browse files Browse the repository at this point in the history
* Fix prints for the HA stream's client: 1st implementation

* Using subscription id instead of full path

* Fixed reference before assignment

* Fixed param in listener and updated changelog

---------

Co-authored-by: Carlos Blazquez <[email protected]>
  • Loading branch information
AlanKev117 and carlos-blazquez authored Jul 3, 2024
1 parent 0129f71 commit 57d883e
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 12 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,8 @@ the subscription *and* consuming it. Instead they will just consume an already e

2.2.0 / 2024-05-21
==================
- [added] - Support for highly-available streams with a new method under the `Listener` class: `listen_async_ha`
- [added] - Support for highly-available streams with a new method under the `Listener` class: `listen_async_ha`

2.2.1 / 2024-07-01
==================
- [fixed] - Fixed the `subscription_id` parameter of the listener callback so it changes if the listener switches regions (for HA streams only)
6 changes: 3 additions & 3 deletions dnaStreaming/demo/show_ha_stream_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@

listener = Listener()
quiet_demo = os.getenv('QUIET_DEMO', "false") == "true"
max_secs = 5
max_secs = 1000
print("\n[ACTIVITY] Receiving messages (ASYNC) for {} seconds...\n[0]".format(max_secs), end='')


def callback(message, subscription_id):
callback.counter += 1
if not quiet_demo:
if message['action'] != 'del':
print('[INFO] [MSG] [{}]: AN: {}, TITLE: {}'.format(callback.counter, message['an'], message['title']))
print('[INFO] [SUBSCRIPTION]: {} [MSG] [{}]: AN: {}, TITLE: {}'.format(subscription_id, callback.counter, message['an'], message['title']))
else:
print('[INFO] [MSG] [{}]: AN: {}, *** DELETE ***'.format(callback.counter, message['an']))
print('[INFO] [SUBSCRIPTION]: {} [MSG] [{}]: AN: {}, *** DELETE ***'.format(subscription_id, callback.counter, message['an']))
else:
if callback.counter % 10 == 0:
print('[{}]'.format(callback.counter), end='')
Expand Down
9 changes: 5 additions & 4 deletions dnaStreaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,20 @@ def ack_message_and_callback(message):
'Listeners for subscriptions have been configured, set and await message arrival.')
return subscription

def listen_async_ha(self, on_message_callback, subscription_id=""):
def ack_message_and_callback(message):
def listen_async_ha(self, on_message_callback):
def ack_message_and_callback(message, subscription_path):
pubsub_msg = json.loads(message.data)
logger.info("Received news message with ID: {}".format(
pubsub_msg['data'][0]['id']))
news_msg = pubsub_msg['data'][0]['attributes']
on_message_callback(news_msg, subscription_id)
short_subscription_id = subscription_path.split("/")[-1]
on_message_callback(news_msg, short_subscription_id)
message.ack()

main_pubsub_client = pubsub_service.get_client(self.config, MAIN_REGION)
backup_pubsub_client = pubsub_service.get_client(self.config, BACKUP_REGION)

subscription_id = subscription_id or self.config.subscription()
subscription_id = self.config.subscription()

if not subscription_id:
raise Exception(
Expand Down
12 changes: 9 additions & 3 deletions dnaStreaming/services/availability_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,14 @@ def ha_listen(api_host, user_key, subscription_id, stop_event, main_subscription

current_region = MAIN_REGION

def wrapped_callback(subscription_path):
def inner_callback(message):
callback(message, subscription_path)

return inner_callback

streaming_pull_future = main_pubsub_client.subscribe(
main_subscription_path, callback=callback)
main_subscription_path, callback=wrapped_callback(main_subscription_path))

while not stop_event.is_set():

Expand All @@ -65,10 +71,10 @@ def ha_listen(api_host, user_key, subscription_id, stop_event, main_subscription

if active_region == MAIN_REGION:
streaming_pull_future = main_pubsub_client.subscribe(
main_subscription_path, callback=callback)
main_subscription_path, callback=wrapped_callback(main_subscription_path))
else: # active_region == BACKUP_REGION
streaming_pull_future = backup_pubsub_client.subscribe(
backup_subscription_path, callback=callback)
backup_subscription_path, callback=wrapped_callback(backup_subscription_path))

logger.warning(f"Started listener in region {active_region}")

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
with open(path.join(this_directory, 'README.rst'), encoding='utf-8') as f:
long_description = f.read()

VERSION = "2.2.0"
VERSION = "2.2.1"
RELEASE_TAG = f"release-{VERSION}"

setup(
Expand Down

0 comments on commit 57d883e

Please sign in to comment.