Skip to content

Commit

Permalink
feat: add events to the tasks related actions
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosgz committed Oct 4, 2024
1 parent 70fdc03 commit 8403d25
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 6 deletions.
8 changes: 8 additions & 0 deletions lib/esse/cli/event_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ def elasticsearch_reindex(event)
to: colorize(event[:request].dig(:body, :dest, :index), :bold),
runtime: formatted_runtime(event[:runtime])
end

def elasticsearch_task(event)
require 'pry'; binding.pry
end

def elasticsearch_cancel_task(event)
require 'pry'; binding.pry
end
end
end
end
3 changes: 3 additions & 0 deletions lib/esse/events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,8 @@ module Events
register_event 'elasticsearch.reindex'
register_event 'elasticsearch.update_by_query'
register_event 'elasticsearch.delete_by_query'
register_event 'elasticsearch.tasks'
register_event 'elasticsearch.task'
register_event 'elasticsearch.cancel_task'
end
end
9 changes: 7 additions & 2 deletions lib/esse/index/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,13 @@ def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, *

task_id = resp['task']
task = nil
while (task = cluster.api.task(id: task_id))['completed'] == false
sleep poll_interval
begin
while (task = cluster.api.task(id: task_id))['completed'] == false
sleep poll_interval
end
rescue Interrupt => e
cluster.api.cancel_task(id: task_id)
raise e
end
task
end
Expand Down
16 changes: 12 additions & 4 deletions lib/esse/transport/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,24 @@ def health(**options)
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html
def tasks(**options)
# coerce_exception { client.perform_request('GET', '/_tasks', options).body }
coerce_exception { client.tasks.list(**options) }
Esse::Events.instrument('elasticsearch.tasks') do |payload|
payload[:request] = options
payload[:response] = coerce_exception { client.tasks.list(**options) }
end
end

def task(id:, **options)
coerce_exception { client.tasks.get(task_id: id, **options) }
Esse::Events.instrument('elasticsearch.task') do |payload|
payload[:request] = { id: id }.merge(options)
payload[:response] = coerce_exception { client.tasks.get(task_id: id, **options) }
end
end

def cancel_task(id:, **options)
coerce_exception { client.tasks.cancel(task_id: id, **options) }
Esse::Events.instrument('elasticsearch.cancel_task') do |payload|
payload[:request] = { id: id }.merge(options)
payload[:response] = coerce_exception { client.tasks.cancel(task_id: id, **options) }
end
end
end

Expand Down

0 comments on commit 8403d25

Please sign in to comment.