From 6004f69f1327e9f1436d9fe28e57e543af101308 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Fri, 4 Oct 2024 17:36:05 -0300 Subject: [PATCH] feat: add events to the tasks related actions --- lib/esse/cli/event_listener.rb | 24 ++++++++++++++++++++++++ lib/esse/events.rb | 3 +++ lib/esse/index/indices.rb | 9 +++++++-- lib/esse/transport/cluster.rb | 16 ++++++++++++---- 4 files changed, 46 insertions(+), 6 deletions(-) diff --git a/lib/esse/cli/event_listener.rb b/lib/esse/cli/event_listener.rb index acdddf7..f83ce3f 100644 --- a/lib/esse/cli/event_listener.rb +++ b/lib/esse/cli/event_listener.rb @@ -102,6 +102,30 @@ def elasticsearch_reindex(event) to: colorize(event[:request].dig(:body, :dest, :index), :bold), runtime: formatted_runtime(event[:runtime]) end + + def elasticsearch_task(event) + running_time_in_nanos = event[:response].dig('task', 'running_time_in_nanos') + runtime = running_time_in_nanos ? "#{running_time_in_nanos / 1_000_000} ms" : 'unknown' + + case event[:response]['completed'] + when true + print_message '[%s] Task %s successfuly completed', + task_id: colorize(event[:request][:id], :bold), + runtime: (runtime || formatted_runtime(event[:runtime])) + when false + description = event[:response].dig('task', 'description') + print_message '[%s] Task %s still running: %s', + task_id: colorize(event[:request][:id], :bold), + description: description, + runtime: (runtime || formatted_runtime(event[:runtime])) + end + end + + def elasticsearch_cancel_task(event) + print_message '[%s] Task %s successfuly canceled', + task_id: colorize(event[:request][:id], :bold), + runtime: formatted_runtime(event[:runtime]) + end end end end diff --git a/lib/esse/events.rb b/lib/esse/events.rb index 08f5cfc..dfff63e 100644 --- a/lib/esse/events.rb +++ b/lib/esse/events.rb @@ -59,5 +59,8 @@ module Events register_event 'elasticsearch.reindex' register_event 'elasticsearch.update_by_query' register_event 'elasticsearch.delete_by_query' + register_event 'elasticsearch.tasks' + register_event 'elasticsearch.task' + register_event 'elasticsearch.cancel_task' end end diff --git a/lib/esse/index/indices.rb b/lib/esse/index/indices.rb index 3e34278..736f6c1 100644 --- a/lib/esse/index/indices.rb +++ b/lib/esse/index/indices.rb @@ -109,8 +109,13 @@ def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, * task_id = resp['task'] task = nil - while (task = cluster.api.task(id: task_id))['completed'] == false - sleep poll_interval + begin + while (task = cluster.api.task(id: task_id))['completed'] == false + sleep poll_interval + end + rescue Interrupt => e + cluster.api.cancel_task(id: task_id) + raise e end task end diff --git a/lib/esse/transport/cluster.rb b/lib/esse/transport/cluster.rb index 435f023..365295b 100644 --- a/lib/esse/transport/cluster.rb +++ b/lib/esse/transport/cluster.rb @@ -40,16 +40,24 @@ def health(**options) # # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html def tasks(**options) - # coerce_exception { client.perform_request('GET', '/_tasks', options).body } - coerce_exception { client.tasks.list(**options) } + Esse::Events.instrument('elasticsearch.tasks') do |payload| + payload[:request] = options + payload[:response] = coerce_exception { client.tasks.list(**options) } + end end def task(id:, **options) - coerce_exception { client.tasks.get(task_id: id, **options) } + Esse::Events.instrument('elasticsearch.task') do |payload| + payload[:request] = { id: id }.merge(options) + payload[:response] = coerce_exception { client.tasks.get(task_id: id, **options) } + end end def cancel_task(id:, **options) - coerce_exception { client.tasks.cancel(task_id: id, **options) } + Esse::Events.instrument('elasticsearch.cancel_task') do |payload| + payload[:request] = { id: id }.merge(options) + payload[:response] = coerce_exception { client.tasks.cancel(task_id: id, **options) } + end end end