Skip to content

Commit

Permalink
feat: new functionality to reindex data using _reindex API after reset
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosgz committed Aug 2, 2024
1 parent faa4690 commit 97214f1
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 4 deletions.
7 changes: 7 additions & 0 deletions lib/esse/cli/event_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ def elasticsearch_bulk(event)
end
print_message(stats.join(', ') + '.')
end

def elasticsearch_reindex(event)
print_message '[%<runtime>s] Reindex from %<from>s to %<to>s successfuly completed',
from: colorize(event[:request].dig(:body, :source, :index), :bold),
to: colorize(event[:request].dig(:body, :dest, :index), :bold),
runtime: formatted_runtime(event[:runtime])
end
end
end
end
10 changes: 6 additions & 4 deletions lib/esse/index/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def create_index(suffix: nil, body: nil, settings: nil, **options)
# @return [Hash] the elasticsearch response
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-open-close.html
def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: true, reindex: false, **options)
def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: true, reindex: false, refresh: nil, **options)
cluster.throw_error_when_readonly!

suffix ||= Esse.timestamp
Expand All @@ -69,9 +69,11 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru
cluster.api.delete_index(index: index_name)
end
if import
import(**options, suffix: suffix)
elsif reindex && (_from = indices_pointing_to_alias).any?
# @TODO: Reindex using the reindex API
import(**options, suffix: suffix, refresh: refresh)
elsif reindex && (source_indexes = indices_pointing_to_alias).any?
source_indexes.each do |from|
cluster.api.reindex(**options, body: { source: { index: from }, dest: { index: index_name(suffix: suffix) } }, refresh: refresh)
end
end

if optimize && import && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval
Expand Down
30 changes: 30 additions & 0 deletions spec/esse/cli/event_listener_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,36 @@
end
end

describe '.elasticsearch_reindex' do
subject do
described_class['elasticsearch.reindex'].call(event)
end

let(:event_id) { 'elasticsearch.reindex' }

let(:event) do
Esse::Events::Event.new(event_id, payload)
end

let(:payload) do
{
runtime: 1.32,
request: {
body: {
source: { index: 'source_index' },
dest: { index: 'dest_index' },
}
}
}
end

it 'prints message' do
expect { subject }.to output(<<~MSG).to_stdout
[#{formatted_runtime(1.32)}] Reindex from #{colorize('source_index', :bold)} to #{colorize('dest_index', :bold)} successfuly completed
MSG
end
end

def colorize(*args)
Esse::Output.colorize(*args)
end
Expand Down
32 changes: 32 additions & 0 deletions spec/support/shared_examples/index_reset_index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,36 @@
expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true)
end
end

context 'when the old index has data' do
it 'import data from the old index to the new index' do
es_client do |client, _conf, cluster|
GeosIndex.create_index(alias: true, suffix: '2021')
expect {
GeosIndex.reset_index(suffix: index_suffix, import: true)
}.not_to raise_error

expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_#{index_suffix}"])
expect(GeosIndex.index_exist?(suffix: '2021')).to eq(true)
expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true)
expect(GeosIndex.count).to be_positive
end
end

it 'reindex data from the old index to the new index' do
es_client do |client, _conf, cluster|
GeosIndex.create_index(alias: true, suffix: '2021')
GeosIndex.import(refresh: true)

expect {
GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: true, refresh: true)
}.not_to raise_error

expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_#{index_suffix}"])
expect(GeosIndex.index_exist?(suffix: '2021')).to eq(true)
expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true)
expect(GeosIndex.count).to be_positive
end
end
end
end

0 comments on commit 97214f1

Please sign in to comment.