From 5252413c7f9e503d4c2ed566a3dd937b43e38c66 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Fri, 4 Oct 2024 17:36:05 -0300 Subject: [PATCH 1/3] feat: add events to the tasks related actions --- gemfiles/Gemfile.elasticsearch-1.x.lock | 2 +- gemfiles/Gemfile.elasticsearch-2.x.lock | 2 +- gemfiles/Gemfile.elasticsearch-5.x.lock | 2 +- gemfiles/Gemfile.elasticsearch-6.x.lock | 2 +- gemfiles/Gemfile.elasticsearch-7.x.lock | 2 +- gemfiles/Gemfile.elasticsearch-8.x.lock | 2 +- gemfiles/Gemfile.opensearch-1.x.lock | 2 +- gemfiles/Gemfile.opensearch-2.x.lock | 2 +- lib/esse/cli/event_listener.rb | 26 +++++++++++ lib/esse/events.rb | 3 ++ lib/esse/index/indices.rb | 9 +++- lib/esse/transport/cluster.rb | 16 +++++-- lib/esse/version.rb | 2 +- spec/esse/cli/event_listener_spec.rb | 57 +++++++++++++++++++++++++ 14 files changed, 114 insertions(+), 15 deletions(-) diff --git a/gemfiles/Gemfile.elasticsearch-1.x.lock b/gemfiles/Gemfile.elasticsearch-1.x.lock index 5c84b72..09889cf 100644 --- a/gemfiles/Gemfile.elasticsearch-1.x.lock +++ b/gemfiles/Gemfile.elasticsearch-1.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-2.x.lock b/gemfiles/Gemfile.elasticsearch-2.x.lock index 4dd2e66..e7a4d4f 100644 --- a/gemfiles/Gemfile.elasticsearch-2.x.lock +++ b/gemfiles/Gemfile.elasticsearch-2.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-5.x.lock b/gemfiles/Gemfile.elasticsearch-5.x.lock index 4dedda8..cd8f2b7 100644 --- a/gemfiles/Gemfile.elasticsearch-5.x.lock +++ b/gemfiles/Gemfile.elasticsearch-5.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-6.x.lock b/gemfiles/Gemfile.elasticsearch-6.x.lock index f8068f9..0a07a98 100644 --- a/gemfiles/Gemfile.elasticsearch-6.x.lock +++ b/gemfiles/Gemfile.elasticsearch-6.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-7.x.lock b/gemfiles/Gemfile.elasticsearch-7.x.lock index 4afc4eb..eb0508e 100644 --- a/gemfiles/Gemfile.elasticsearch-7.x.lock +++ b/gemfiles/Gemfile.elasticsearch-7.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-8.x.lock b/gemfiles/Gemfile.elasticsearch-8.x.lock index 20bb982..b148c7a 100644 --- a/gemfiles/Gemfile.elasticsearch-8.x.lock +++ b/gemfiles/Gemfile.elasticsearch-8.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.opensearch-1.x.lock b/gemfiles/Gemfile.opensearch-1.x.lock index 2c5fbe5..70e6ea5 100644 --- a/gemfiles/Gemfile.opensearch-1.x.lock +++ b/gemfiles/Gemfile.opensearch-1.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.opensearch-2.x.lock b/gemfiles/Gemfile.opensearch-2.x.lock index e6c3827..fb75c2e 100644 --- a/gemfiles/Gemfile.opensearch-2.x.lock +++ b/gemfiles/Gemfile.opensearch-2.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc3) + esse (0.4.0.rc4) multi_json thor (>= 0.19) diff --git a/lib/esse/cli/event_listener.rb b/lib/esse/cli/event_listener.rb index acdddf7..1f2e77e 100644 --- a/lib/esse/cli/event_listener.rb +++ b/lib/esse/cli/event_listener.rb @@ -102,6 +102,32 @@ def elasticsearch_reindex(event) to: colorize(event[:request].dig(:body, :dest, :index), :bold), runtime: formatted_runtime(event[:runtime]) end + + def elasticsearch_task(event) + running_time_in_nanos = event[:response].dig('task', 'running_time_in_nanos') + runtime = running_time_in_nanos ? "#{running_time_in_nanos / 1_000_000} ms" : 'unknown' + + case event[:response]['completed'] + when true + print_message '[%s] Task %s successfuly completed. %s', + task_id: colorize(event[:request][:id], :bold), + runtime: formatted_runtime(event[:runtime]), + total_runtime: colorize("Elapsed time: #{runtime}", :bold) + when false + description = event[:response].dig('task', 'description') + print_message '[%s] Task %s still in progress: %s. %s', + task_id: colorize(event[:request][:id], :bold), + description: description, + runtime: formatted_runtime(event[:runtime]), + total_runtime: colorize("Elapsed time: #{runtime}", :bold) + end + end + + def elasticsearch_cancel_task(event) + print_message '[%s] Task %s successfuly canceled', + task_id: colorize(event[:request][:id], :bold), + runtime: formatted_runtime(event[:runtime]) + end end end end diff --git a/lib/esse/events.rb b/lib/esse/events.rb index 08f5cfc..dfff63e 100644 --- a/lib/esse/events.rb +++ b/lib/esse/events.rb @@ -59,5 +59,8 @@ module Events register_event 'elasticsearch.reindex' register_event 'elasticsearch.update_by_query' register_event 'elasticsearch.delete_by_query' + register_event 'elasticsearch.tasks' + register_event 'elasticsearch.task' + register_event 'elasticsearch.cancel_task' end end diff --git a/lib/esse/index/indices.rb b/lib/esse/index/indices.rb index 3e34278..736f6c1 100644 --- a/lib/esse/index/indices.rb +++ b/lib/esse/index/indices.rb @@ -109,8 +109,13 @@ def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, * task_id = resp['task'] task = nil - while (task = cluster.api.task(id: task_id))['completed'] == false - sleep poll_interval + begin + while (task = cluster.api.task(id: task_id))['completed'] == false + sleep poll_interval + end + rescue Interrupt => e + cluster.api.cancel_task(id: task_id) + raise e end task end diff --git a/lib/esse/transport/cluster.rb b/lib/esse/transport/cluster.rb index 435f023..365295b 100644 --- a/lib/esse/transport/cluster.rb +++ b/lib/esse/transport/cluster.rb @@ -40,16 +40,24 @@ def health(**options) # # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html def tasks(**options) - # coerce_exception { client.perform_request('GET', '/_tasks', options).body } - coerce_exception { client.tasks.list(**options) } + Esse::Events.instrument('elasticsearch.tasks') do |payload| + payload[:request] = options + payload[:response] = coerce_exception { client.tasks.list(**options) } + end end def task(id:, **options) - coerce_exception { client.tasks.get(task_id: id, **options) } + Esse::Events.instrument('elasticsearch.task') do |payload| + payload[:request] = { id: id }.merge(options) + payload[:response] = coerce_exception { client.tasks.get(task_id: id, **options) } + end end def cancel_task(id:, **options) - coerce_exception { client.tasks.cancel(task_id: id, **options) } + Esse::Events.instrument('elasticsearch.cancel_task') do |payload| + payload[:request] = { id: id }.merge(options) + payload[:response] = coerce_exception { client.tasks.cancel(task_id: id, **options) } + end end end diff --git a/lib/esse/version.rb b/lib/esse/version.rb index e5cc85b..3b509bb 100644 --- a/lib/esse/version.rb +++ b/lib/esse/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Esse - VERSION = '0.4.0.rc3' + VERSION = '0.4.0.rc4' end diff --git a/spec/esse/cli/event_listener_spec.rb b/spec/esse/cli/event_listener_spec.rb index d96b6ac..906e99e 100644 --- a/spec/esse/cli/event_listener_spec.rb +++ b/spec/esse/cli/event_listener_spec.rb @@ -252,6 +252,63 @@ end end + describe '.elasticsearch_task' do + subject do + described_class['elasticsearch.task'].call(event) + end + + let(:event_id) { 'elasticsearch.task' } + + let(:event) do + Esse::Events::Event.new(event_id, payload) + end + + let(:payload) do + { + runtime: 1.32, + request: { + id: 'task_id', + }, + response: response, + } + end + + context 'when task is completed' do + let(:response) do + { + 'completed' => true, + 'task' => { + 'running_time_in_nanos' => 1_000_000, + } + } + end + + it 'prints message' do + expect { subject }.to output(<<~MSG).to_stdout + [#{formatted_runtime(1.32)}] Task #{colorize('task_id', :bold)} successfuly completed. #{colorize('Elapsed time: 1 ms', :bold)} + MSG + end + end + + context 'when task is not completed' do + let(:response) do + { + 'completed' => false, + 'task' => { + 'running_time_in_nanos' => 1_000_000, + 'description' => 'task description', + } + } + end + + it 'prints message' do + expect { subject }.to output(<<~MSG).to_stdout + [#{formatted_runtime(1.32)}] Task #{colorize('task_id', :bold)} still in progress: task description. #{colorize('Elapsed time: 1 ms', :bold)} + MSG + end + end + end + def colorize(*args) Esse::Output.colorize(*args) end From 9c83c5920f56dee429da111c7e871ad487bf942d Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Fri, 4 Oct 2024 18:32:58 -0300 Subject: [PATCH 2/3] feat: ensure poll_interval is an integer --- lib/esse/index/indices.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/esse/index/indices.rb b/lib/esse/index/indices.rb index 736f6c1..66159d9 100644 --- a/lib/esse/index/indices.rb +++ b/lib/esse/index/indices.rb @@ -111,7 +111,7 @@ def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, * task = nil begin while (task = cluster.api.task(id: task_id))['completed'] == false - sleep poll_interval + sleep poll_interval.to_i end rescue Interrupt => e cluster.api.cancel_task(id: task_id) From 4438d82695ff4ea837b75f973312f275f08288b0 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Mon, 7 Oct 2024 08:34:42 -0300 Subject: [PATCH 3/3] feat: cast integer values --- lib/esse/cli/parser/bool_or_hash.rb | 18 +++++++++++------- spec/esse/cli/parser/bool_or_hash_spec.rb | 12 ++++++++++-- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/lib/esse/cli/parser/bool_or_hash.rb b/lib/esse/cli/parser/bool_or_hash.rb index 265de09..7a0aaa5 100644 --- a/lib/esse/cli/parser/bool_or_hash.rb +++ b/lib/esse/cli/parser/bool_or_hash.rb @@ -35,16 +35,20 @@ def parse(input) private def may_array(value) - return may_bool(value) unless ARRAY_SEPARATOR.match?(value) + return cast(value) unless ARRAY_SEPARATOR.match?(value) - value.split(ARRAY_SEPARATOR).map { |v| may_bool(v) } + value.split(ARRAY_SEPARATOR).map { |v| cast(v) } end - def may_bool(value) - return true if TRUTHY.include?(value) - return false if FALSEY.include?(value) - - value + def cast(value) + case value + when *TRUTHY then true + when *FALSEY then false + when /\A\d+\z/ then value.to_i + when /\A\d+\.\d+\z/ then value.to_f + else + value + end end end end diff --git a/spec/esse/cli/parser/bool_or_hash_spec.rb b/spec/esse/cli/parser/bool_or_hash_spec.rb index 69b8d8e..8f81213 100644 --- a/spec/esse/cli/parser/bool_or_hash_spec.rb +++ b/spec/esse/cli/parser/bool_or_hash_spec.rb @@ -81,8 +81,8 @@ end it 'split comma separated values' do - expect(parser.parse('a:1,2,3')).to eq(a: %w[1 2 3]) - expect(parser.parse('a:1,2,3 b:4,5,6')).to eq(a: %w[1 2 3], b: %w[4 5 6]) + expect(parser.parse('a:c,d,e')).to eq(a: %w[c d e]) + expect(parser.parse('a:x,y,z b:p,q,r')).to eq(a: %w[x y z], b: %w[p q r]) expect(parser.parse('a:b,c:d')).to eq(a: %w[b c:d]) end @@ -94,6 +94,14 @@ expect(parser.parse('foo:true')).to eq(foo: true) expect(parser.parse('foo:false')).to eq(foo: false) end + + it 'coerces the value of hash to integer' do + expect(parser.parse('foo:123')).to eq(foo: 123) + end + + it 'coerces the value of hash to float' do + expect(parser.parse('foo:123.456')).to eq(foo: 123.456) + end end end end