Skip to content

Commit

Permalink
feat: add events to the tasks related actions
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosgz committed Oct 4, 2024
1 parent 70fdc03 commit 5252413
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 15 deletions.
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-1.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc3)
esse (0.4.0.rc4)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-2.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc3)
esse (0.4.0.rc4)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-5.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc3)
esse (0.4.0.rc4)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-6.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc3)
esse (0.4.0.rc4)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-7.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc3)
esse (0.4.0.rc4)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-8.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc3)
esse (0.4.0.rc4)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.opensearch-1.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc3)
esse (0.4.0.rc4)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.opensearch-2.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc3)
esse (0.4.0.rc4)
multi_json
thor (>= 0.19)

Expand Down
26 changes: 26 additions & 0 deletions lib/esse/cli/event_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,32 @@ def elasticsearch_reindex(event)
to: colorize(event[:request].dig(:body, :dest, :index), :bold),
runtime: formatted_runtime(event[:runtime])
end

def elasticsearch_task(event)
running_time_in_nanos = event[:response].dig('task', 'running_time_in_nanos')
runtime = running_time_in_nanos ? "#{running_time_in_nanos / 1_000_000} ms" : 'unknown'

case event[:response]['completed']
when true
print_message '[%<runtime>s] Task %<task_id>s successfuly completed. %<total_runtime>s',
task_id: colorize(event[:request][:id], :bold),
runtime: formatted_runtime(event[:runtime]),
total_runtime: colorize("Elapsed time: #{runtime}", :bold)
when false
description = event[:response].dig('task', 'description')
print_message '[%<runtime>s] Task %<task_id>s still in progress: %<description>s. %<total_runtime>s',
task_id: colorize(event[:request][:id], :bold),
description: description,
runtime: formatted_runtime(event[:runtime]),
total_runtime: colorize("Elapsed time: #{runtime}", :bold)
end
end

def elasticsearch_cancel_task(event)
print_message '[%<runtime>s] Task %<task_id>s successfuly canceled',
task_id: colorize(event[:request][:id], :bold),
runtime: formatted_runtime(event[:runtime])
end
end
end
end
3 changes: 3 additions & 0 deletions lib/esse/events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,8 @@ module Events
register_event 'elasticsearch.reindex'
register_event 'elasticsearch.update_by_query'
register_event 'elasticsearch.delete_by_query'
register_event 'elasticsearch.tasks'
register_event 'elasticsearch.task'
register_event 'elasticsearch.cancel_task'
end
end
9 changes: 7 additions & 2 deletions lib/esse/index/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,13 @@ def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, *

task_id = resp['task']
task = nil
while (task = cluster.api.task(id: task_id))['completed'] == false
sleep poll_interval
begin
while (task = cluster.api.task(id: task_id))['completed'] == false
sleep poll_interval
end
rescue Interrupt => e
cluster.api.cancel_task(id: task_id)
raise e
end
task
end
Expand Down
16 changes: 12 additions & 4 deletions lib/esse/transport/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,24 @@ def health(**options)
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html
def tasks(**options)
# coerce_exception { client.perform_request('GET', '/_tasks', options).body }
coerce_exception { client.tasks.list(**options) }
Esse::Events.instrument('elasticsearch.tasks') do |payload|
payload[:request] = options
payload[:response] = coerce_exception { client.tasks.list(**options) }
end
end

def task(id:, **options)
coerce_exception { client.tasks.get(task_id: id, **options) }
Esse::Events.instrument('elasticsearch.task') do |payload|
payload[:request] = { id: id }.merge(options)
payload[:response] = coerce_exception { client.tasks.get(task_id: id, **options) }
end
end

def cancel_task(id:, **options)
coerce_exception { client.tasks.cancel(task_id: id, **options) }
Esse::Events.instrument('elasticsearch.cancel_task') do |payload|
payload[:request] = { id: id }.merge(options)
payload[:response] = coerce_exception { client.tasks.cancel(task_id: id, **options) }
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/esse/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Esse
VERSION = '0.4.0.rc3'
VERSION = '0.4.0.rc4'
end
57 changes: 57 additions & 0 deletions spec/esse/cli/event_listener_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,63 @@
end
end

describe '.elasticsearch_task' do
subject do
described_class['elasticsearch.task'].call(event)
end

let(:event_id) { 'elasticsearch.task' }

let(:event) do
Esse::Events::Event.new(event_id, payload)
end

let(:payload) do
{
runtime: 1.32,
request: {
id: 'task_id',
},
response: response,
}
end

context 'when task is completed' do
let(:response) do
{
'completed' => true,
'task' => {
'running_time_in_nanos' => 1_000_000,
}
}
end

it 'prints message' do
expect { subject }.to output(<<~MSG).to_stdout
[#{formatted_runtime(1.32)}] Task #{colorize('task_id', :bold)} successfuly completed. #{colorize('Elapsed time: 1 ms', :bold)}
MSG
end
end

context 'when task is not completed' do
let(:response) do
{
'completed' => false,
'task' => {
'running_time_in_nanos' => 1_000_000,
'description' => 'task description',
}
}
end

it 'prints message' do
expect { subject }.to output(<<~MSG).to_stdout
[#{formatted_runtime(1.32)}] Task #{colorize('task_id', :bold)} still in progress: task description. #{colorize('Elapsed time: 1 ms', :bold)}
MSG
end
end
end

def colorize(*args)
Esse::Output.colorize(*args)
end
Expand Down

0 comments on commit 5252413

Please sign in to comment.