Skip to content

Commit

Permalink
feat: Add reindex action to transport api
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosgz committed Aug 2, 2024
1 parent 876648f commit faa4690
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib/esse/events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,6 @@ module Events
register_event 'elasticsearch.exist'
register_event 'elasticsearch.count'
register_event 'elasticsearch.get'
register_event 'elasticsearch.reindex'
end
end
25 changes: 25 additions & 0 deletions lib/esse/transport/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,31 @@ def update_settings(index:, body:, **options)
payload[:response] = coerce_exception { client.indices.put_settings(**opts) }
end
end

# Allows to copy documents from one index to another, optionally filtering the source
# documents by a query, changing the destination index settings, or fetching the
# documents from a remote cluster.
#
# @option arguments [Boolean] :refresh Should the affected indexes be refreshed?
# @option arguments [Time] :timeout Time each individual bulk request should wait for shards that are unavailable.
# @option arguments [String] :wait_for_active_shards Sets the number of shard copies that must be active before proceeding with the reindex operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)
# @option arguments [Boolean] :wait_for_completion Should the request should block until the reindex is complete.
# @option arguments [Number] :requests_per_second The throttle to set on this request in sub-requests per second. -1 means no throttle.
# @option arguments [Time] :scroll Control how long to keep the search context alive
# @option arguments [Number|string] :slices The number of slices this task should be divided into. Defaults to 1, meaning the task isn't sliced into subtasks. Can be set to `auto`.
# @option arguments [Number] :max_docs Maximum number of documents to process (default: all documents)
# @option arguments [Hash] :headers Custom HTTP headers
# @option arguments [Hash] :body The search definition using the Query DSL and the prototype for the index request. (*Required*)
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html
def reindex(body:, **options)
throw_error_when_readonly!

Esse::Events.instrument('elasticsearch.reindex') do |payload|
payload[:request] = opts = options.merge(body: body)
payload[:response] = coerce_exception { client.reindex(**opts) }
end
end
end

include InstanceMethods
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_reindex'

stack_describe 'elasticsearch', '5.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_reindex'

stack_describe 'elasticsearch', '6.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_reindex'

stack_describe 'elasticsearch', '7.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_reindex'

stack_describe 'elasticsearch', '8.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
end
54 changes: 54 additions & 0 deletions spec/support/shared_examples/transport_reindex.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# frozen_string_literal: true

RSpec.shared_examples 'transport#reindex' do
let(:body) do
{
settings: {
index: {
number_of_shards: 1,
number_of_replicas: 0
}
}
}
end

it 'raises an Esse::Transport::ReadonlyClusterError exception when the cluster is readonly' do
es_client do |client, _conf, cluster|
cluster.warm_up!
expect(client).not_to receive(:perform_request)
cluster.readonly = true
expect {
cluster.api.reindex(body: { source: { index: "#{cluster.index_prefix}_ro_from" }, dest: { index: "#{cluster.index_prefix}_ro_to" } })
}.to raise_error(Esse::Transport::ReadonlyClusterError)
end
end

it 'raises an #<Esse::Transport::NotFoundError exception when the source index does not exist' do
es_client do |_client, _conf, cluster|
expect {
cluster.api.reindex(body: { source: { index: "#{cluster.index_prefix}_non_existent_index" }, dest: { index: "#{cluster.index_prefix}_to" } })
}.to raise_error(Esse::Transport::NotFoundError)
end
end

context 'when the source index exists' do
it 'reindexes the source index to the destination index' do
es_client do |client, _conf, cluster|
source_index = "#{cluster.index_prefix}_reindex_from"
dest_index = "#{cluster.index_prefix}_reindex_to"
cluster.api.create_index(index: source_index, body: body)
cluster.api.create_index(index: dest_index, body: body)
cluster.api.index(index: source_index, id: 1, body: { title: 'foo' }, refresh: true)

resp = nil
expect {
resp = cluster.api.reindex(body: { source: { index: source_index }, dest: { index: dest_index } }, refresh: true)
}.not_to raise_error
expect(resp['total']).to eq(1)

resp = cluster.api.get(index: dest_index, id: 1, _source: false)
expect(resp['found']).to eq(true)
end
end
end
end

0 comments on commit faa4690

Please sign in to comment.