Skip to content

Commit

Permalink
feat: add events to the tasks related actions
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosgz committed Oct 4, 2024
1 parent 70fdc03 commit e60ed85
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 6 deletions.
26 changes: 26 additions & 0 deletions lib/esse/cli/event_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,32 @@ def elasticsearch_reindex(event)
to: colorize(event[:request].dig(:body, :dest, :index), :bold),
runtime: formatted_runtime(event[:runtime])
end

def elasticsearch_task(event)
running_time_in_nanos = event[:response].dig('task', 'running_time_in_nanos')
runtime = running_time_in_nanos ? "#{running_time_in_nanos / 1_000_000} ms" : 'unknown'

case event[:response]['completed']
when true
print_message '[%<runtime>s] Task %<task_id>s successfuly completed. %<total_runtime>s',
task_id: colorize(event[:request][:id], :bold),
runtime: formatted_runtime(event[:runtime]),
total_runtime: colorize("Runtime: #{runtime}", :bold, :green)
when false
description = event[:response].dig('task', 'description')
print_message '[%<runtime>s] Task %<task_id>s still running: %<description>s. %<total_runtime>s',
task_id: colorize(event[:request][:id], :bold),
description: description,
runtime: formatted_runtime(event[:runtime]),
total_runtime: colorize("Runtime: #{runtime}", :bold, :yellow)
end
end

def elasticsearch_cancel_task(event)
print_message '[%<runtime>s] Task %<task_id>s successfuly canceled',
task_id: colorize(event[:request][:id], :bold),
runtime: formatted_runtime(event[:runtime])
end
end
end
end
3 changes: 3 additions & 0 deletions lib/esse/events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,8 @@ module Events
register_event 'elasticsearch.reindex'
register_event 'elasticsearch.update_by_query'
register_event 'elasticsearch.delete_by_query'
register_event 'elasticsearch.tasks'
register_event 'elasticsearch.task'
register_event 'elasticsearch.cancel_task'
end
end
9 changes: 7 additions & 2 deletions lib/esse/index/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,13 @@ def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, *

task_id = resp['task']
task = nil
while (task = cluster.api.task(id: task_id))['completed'] == false
sleep poll_interval
begin
while (task = cluster.api.task(id: task_id))['completed'] == false
sleep poll_interval
end
rescue Interrupt => e
cluster.api.cancel_task(id: task_id)
raise e
end
task
end
Expand Down
16 changes: 12 additions & 4 deletions lib/esse/transport/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,24 @@ def health(**options)
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html
def tasks(**options)
# coerce_exception { client.perform_request('GET', '/_tasks', options).body }
coerce_exception { client.tasks.list(**options) }
Esse::Events.instrument('elasticsearch.tasks') do |payload|
payload[:request] = options
payload[:response] = coerce_exception { client.tasks.list(**options) }
end
end

def task(id:, **options)
coerce_exception { client.tasks.get(task_id: id, **options) }
Esse::Events.instrument('elasticsearch.task') do |payload|
payload[:request] = { id: id }.merge(options)
payload[:response] = coerce_exception { client.tasks.get(task_id: id, **options) }
end
end

def cancel_task(id:, **options)
coerce_exception { client.tasks.cancel(task_id: id, **options) }
Esse::Events.instrument('elasticsearch.cancel_task') do |payload|
payload[:request] = { id: id }.merge(options)
payload[:response] = coerce_exception { client.tasks.cancel(task_id: id, **options) }
end
end
end

Expand Down

0 comments on commit e60ed85

Please sign in to comment.