diff --git a/lib/esse/cli/event_listener.rb b/lib/esse/cli/event_listener.rb index acdddf7..9603d84 100644 --- a/lib/esse/cli/event_listener.rb +++ b/lib/esse/cli/event_listener.rb @@ -102,6 +102,32 @@ def elasticsearch_reindex(event) to: colorize(event[:request].dig(:body, :dest, :index), :bold), runtime: formatted_runtime(event[:runtime]) end + + def elasticsearch_task(event) + running_time_in_nanos = event[:response].dig('task', 'running_time_in_nanos') + runtime = running_time_in_nanos ? "#{running_time_in_nanos / 1_000_000} ms" : 'unknown' + + case event[:response]['completed'] + when true + print_message '[%s] Task %s successfuly completed. %s', + task_id: colorize(event[:request][:id], :bold), + runtime: formatted_runtime(event[:runtime]), + total_runtime: colorize("Runtime: #{runtime}", :bold, :green) + when false + description = event[:response].dig('task', 'description') + print_message '[%s] Task %s still running: %s. %s', + task_id: colorize(event[:request][:id], :bold), + description: description, + runtime: formatted_runtime(event[:runtime]), + total_runtime: colorize("Runtime: #{runtime}", :bold, :yellow) + end + end + + def elasticsearch_cancel_task(event) + print_message '[%s] Task %s successfuly canceled', + task_id: colorize(event[:request][:id], :bold), + runtime: formatted_runtime(event[:runtime]) + end end end end diff --git a/lib/esse/events.rb b/lib/esse/events.rb index 08f5cfc..dfff63e 100644 --- a/lib/esse/events.rb +++ b/lib/esse/events.rb @@ -59,5 +59,8 @@ module Events register_event 'elasticsearch.reindex' register_event 'elasticsearch.update_by_query' register_event 'elasticsearch.delete_by_query' + register_event 'elasticsearch.tasks' + register_event 'elasticsearch.task' + register_event 'elasticsearch.cancel_task' end end diff --git a/lib/esse/index/indices.rb b/lib/esse/index/indices.rb index 3e34278..736f6c1 100644 --- a/lib/esse/index/indices.rb +++ b/lib/esse/index/indices.rb @@ -109,8 +109,13 @@ def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, * task_id = resp['task'] task = nil - while (task = cluster.api.task(id: task_id))['completed'] == false - sleep poll_interval + begin + while (task = cluster.api.task(id: task_id))['completed'] == false + sleep poll_interval + end + rescue Interrupt => e + cluster.api.cancel_task(id: task_id) + raise e end task end diff --git a/lib/esse/transport/cluster.rb b/lib/esse/transport/cluster.rb index 435f023..365295b 100644 --- a/lib/esse/transport/cluster.rb +++ b/lib/esse/transport/cluster.rb @@ -40,16 +40,24 @@ def health(**options) # # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html def tasks(**options) - # coerce_exception { client.perform_request('GET', '/_tasks', options).body } - coerce_exception { client.tasks.list(**options) } + Esse::Events.instrument('elasticsearch.tasks') do |payload| + payload[:request] = options + payload[:response] = coerce_exception { client.tasks.list(**options) } + end end def task(id:, **options) - coerce_exception { client.tasks.get(task_id: id, **options) } + Esse::Events.instrument('elasticsearch.task') do |payload| + payload[:request] = { id: id }.merge(options) + payload[:response] = coerce_exception { client.tasks.get(task_id: id, **options) } + end end def cancel_task(id:, **options) - coerce_exception { client.tasks.cancel(task_id: id, **options) } + Esse::Events.instrument('elasticsearch.cancel_task') do |payload| + payload[:request] = { id: id }.merge(options) + payload[:response] = coerce_exception { client.tasks.cancel(task_id: id, **options) } + end end end