diff --git a/gemfiles/Gemfile.elasticsearch-1.x.lock b/gemfiles/Gemfile.elasticsearch-1.x.lock index 5c84b72..09889cf 100644 --- a/gemfiles/Gemfile.elasticsearch-1.x.lock +++ b/gemfiles/Gemfile.elasticsearch-1.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-2.x.lock b/gemfiles/Gemfile.elasticsearch-2.x.lock index 4dd2e66..e7a4d4f 100644 --- a/gemfiles/Gemfile.elasticsearch-2.x.lock +++ b/gemfiles/Gemfile.elasticsearch-2.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-5.x.lock b/gemfiles/Gemfile.elasticsearch-5.x.lock index 4dedda8..cd8f2b7 100644 --- a/gemfiles/Gemfile.elasticsearch-5.x.lock +++ b/gemfiles/Gemfile.elasticsearch-5.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-6.x.lock b/gemfiles/Gemfile.elasticsearch-6.x.lock index f8068f9..0a07a98 100644 --- a/gemfiles/Gemfile.elasticsearch-6.x.lock +++ b/gemfiles/Gemfile.elasticsearch-6.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-7.x.lock b/gemfiles/Gemfile.elasticsearch-7.x.lock index 4afc4eb..eb0508e 100644 --- a/gemfiles/Gemfile.elasticsearch-7.x.lock +++ b/gemfiles/Gemfile.elasticsearch-7.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-8.x.lock b/gemfiles/Gemfile.elasticsearch-8.x.lock index 20bb982..b148c7a 100644 --- a/gemfiles/Gemfile.elasticsearch-8.x.lock +++ b/gemfiles/Gemfile.elasticsearch-8.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.opensearch-1.x.lock b/gemfiles/Gemfile.opensearch-1.x.lock index 2c5fbe5..70e6ea5 100644 --- a/gemfiles/Gemfile.opensearch-1.x.lock +++ b/gemfiles/Gemfile.opensearch-1.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.opensearch-2.x.lock b/gemfiles/Gemfile.opensearch-2.x.lock index e6c3827..fb75c2e 100644 --- a/gemfiles/Gemfile.opensearch-2.x.lock +++ b/gemfiles/Gemfile.opensearch-2.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/lib/esse/cli/event_listener.rb b/lib/esse/cli/event_listener.rb index acdddf7..1f2e77e 100644 --- a/lib/esse/cli/event_listener.rb +++ b/lib/esse/cli/event_listener.rb @@ -102,6 +102,32 @@ def elasticsearch_reindex(event) to: colorize(event[:request].dig(:body, :dest, :index), :bold), runtime: formatted_runtime(event[:runtime]) end + + def elasticsearch_task(event) + running_time_in_nanos = event[:response].dig('task', 'running_time_in_nanos') + runtime = running_time_in_nanos ? "#{running_time_in_nanos / 1_000_000} ms" : 'unknown' + + case event[:response]['completed'] + when true + print_message '[%s] Task %s successfuly completed. %s', + task_id: colorize(event[:request][:id], :bold), + runtime: formatted_runtime(event[:runtime]), + total_runtime: colorize("Elapsed time: #{runtime}", :bold) + when false + description = event[:response].dig('task', 'description') + print_message '[%s] Task %s still in progress: %s. %s', + task_id: colorize(event[:request][:id], :bold), + description: description, + runtime: formatted_runtime(event[:runtime]), + total_runtime: colorize("Elapsed time: #{runtime}", :bold) + end + end + + def elasticsearch_cancel_task(event) + print_message '[%s] Task %s successfuly canceled', + task_id: colorize(event[:request][:id], :bold), + runtime: formatted_runtime(event[:runtime]) + end end end end diff --git a/lib/esse/events.rb b/lib/esse/events.rb index 08f5cfc..dfff63e 100644 --- a/lib/esse/events.rb +++ b/lib/esse/events.rb @@ -59,5 +59,8 @@ module Events register_event 'elasticsearch.reindex' register_event 'elasticsearch.update_by_query' register_event 'elasticsearch.delete_by_query' + register_event 'elasticsearch.tasks' + register_event 'elasticsearch.task' + register_event 'elasticsearch.cancel_task' end end diff --git a/lib/esse/index/indices.rb b/lib/esse/index/indices.rb index 3e34278..736f6c1 100644 --- a/lib/esse/index/indices.rb +++ b/lib/esse/index/indices.rb @@ -109,8 +109,13 @@ def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, * task_id = resp['task'] task = nil - while (task = cluster.api.task(id: task_id))['completed'] == false - sleep poll_interval + begin + while (task = cluster.api.task(id: task_id))['completed'] == false + sleep poll_interval + end + rescue Interrupt => e + cluster.api.cancel_task(id: task_id) + raise e end task end diff --git a/lib/esse/transport/cluster.rb b/lib/esse/transport/cluster.rb index 435f023..365295b 100644 --- a/lib/esse/transport/cluster.rb +++ b/lib/esse/transport/cluster.rb @@ -40,16 +40,24 @@ def health(**options) # # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html def tasks(**options) - # coerce_exception { client.perform_request('GET', '/_tasks', options).body } - coerce_exception { client.tasks.list(**options) } + Esse::Events.instrument('elasticsearch.tasks') do |payload| + payload[:request] = options + payload[:response] = coerce_exception { client.tasks.list(**options) } + end end def task(id:, **options) - coerce_exception { client.tasks.get(task_id: id, **options) } + Esse::Events.instrument('elasticsearch.task') do |payload| + payload[:request] = { id: id }.merge(options) + payload[:response] = coerce_exception { client.tasks.get(task_id: id, **options) } + end end def cancel_task(id:, **options) - coerce_exception { client.tasks.cancel(task_id: id, **options) } + Esse::Events.instrument('elasticsearch.cancel_task') do |payload| + payload[:request] = { id: id }.merge(options) + payload[:response] = coerce_exception { client.tasks.cancel(task_id: id, **options) } + end end end diff --git a/lib/esse/version.rb b/lib/esse/version.rb index e5cc85b..3b509bb 100644 --- a/lib/esse/version.rb +++ b/lib/esse/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Esse - VERSION = '0.4.0.rc3' + VERSION = '0.4.0.rc4' end diff --git a/spec/esse/cli/event_listener_spec.rb b/spec/esse/cli/event_listener_spec.rb index d96b6ac..906e99e 100644 --- a/spec/esse/cli/event_listener_spec.rb +++ b/spec/esse/cli/event_listener_spec.rb @@ -252,6 +252,63 @@ end end + describe '.elasticsearch_task' do + subject do + described_class['elasticsearch.task'].call(event) + end + + let(:event_id) { 'elasticsearch.task' } + + let(:event) do + Esse::Events::Event.new(event_id, payload) + end + + let(:payload) do + { + runtime: 1.32, + request: { + id: 'task_id', + }, + response: response, + } + end + + context 'when task is completed' do + let(:response) do + { + 'completed' => true, + 'task' => { + 'running_time_in_nanos' => 1_000_000, + } + } + end + + it 'prints message' do + expect { subject }.to output(<<~MSG).to_stdout + [#{formatted_runtime(1.32)}] Task #{colorize('task_id', :bold)} successfuly completed. #{colorize('Elapsed time: 1 ms', :bold)} + MSG + end + end + + context 'when task is not completed' do + let(:response) do + { + 'completed' => false, + 'task' => { + 'running_time_in_nanos' => 1_000_000, + 'description' => 'task description', + } + } + end + + it 'prints message' do + expect { subject }.to output(<<~MSG).to_stdout + [#{formatted_runtime(1.32)}] Task #{colorize('task_id', :bold)} still in progress: task description. #{colorize('Elapsed time: 1 ms', :bold)} + MSG + end + end + end + def colorize(*args) Esse::Output.colorize(*args) end