Skip to content

Commit

Permalink
reset index using /_reindex API (#21)
Browse files Browse the repository at this point in the history
* chore: add --reindex option to the cli index reset command

* feat: Add reindex action to transport api

* feat: new functionality to reindex data using _reindex API after reset
  • Loading branch information
marcosgz authored Aug 2, 2024
1 parent e370f09 commit 15bad91
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 10 deletions.
7 changes: 7 additions & 0 deletions lib/esse/cli/event_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ def elasticsearch_bulk(event)
end
print_message(stats.join(', ') + '.')
end

# Prints a one-line summary for a finished reindex request.
#
# Reads the source and destination index names from the request body stored
# under event[:request] and the elapsed time from event[:runtime].
#
# @param event [#[]] the 'elasticsearch.reindex' event
def elasticsearch_reindex(event)
  request = event[:request]
  source_name = request.dig(:body, :source, :index)
  dest_name = request.dig(:body, :dest, :index)

  print_message(
    '[%<runtime>s] Reindex from %<from>s to %<to>s successfuly completed',
    from: colorize(source_name, :bold),
    to: colorize(dest_name, :bold),
    runtime: formatted_runtime(event[:runtime])
  )
end
end
end
end
6 changes: 4 additions & 2 deletions lib/esse/cli/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ class Index < Base
DESC
option :suffix, type: :string, default: nil, aliases: '-s', desc: 'Suffix to append to index name'
option :import, type: :boolean, default: true, desc: 'Import documents before point alias to the new index'
option :reindex, type: :boolean, default: false, desc: 'Use _reindex API to import documents from the old index to the new index'
option :optimize, type: :boolean, default: true, desc: 'Optimize index before import documents by disabling refresh_interval and setting number_of_replicas to 0'
option :settings, type: :hash, default: nil, desc: 'List of settings to pass to the index class. Example: --settings=refresh_interval:1s,number_of_replicas:0'
# Runs the `reset` command for the given index classes.
#
# The CLI options are converted to a symbol-keyed hash and handed to
# Reset together with the index class names. Because --import defaults to
# true, --reindex must be combined with --no-import.
#
# @param index_classes [Array<String>] index class names given on the command line
# @raise [ArgumentError] when both :import and :reindex are truthy
def reset(*index_classes)
  require_relative 'index/reset'

  symbolized = HashUtils.deep_transform_keys(options.to_h, &:to_sym)
  both_requested = symbolized[:import] && symbolized[:reindex]
  raise ArgumentError, 'You cannot use --import and --reindex together' if both_requested

  Reset.new(indices: index_classes, **symbolized).run
end

# @TODO Add reindex task to create a new index and import documents from the old index using _reindex API

desc 'create *INDEX_CLASSES', 'Creates indices for the given classes'
long_desc <<-DESC
Creates index and applies mapping and settings for the given classes.
Expand Down
1 change: 1 addition & 0 deletions lib/esse/events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,6 @@ module Events
register_event 'elasticsearch.exist'
register_event 'elasticsearch.count'
register_event 'elasticsearch.get'
register_event 'elasticsearch.reindex'
end
end
10 changes: 6 additions & 4 deletions lib/esse/index/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def create_index(suffix: nil, body: nil, settings: nil, **options)
# @return [Hash] the elasticsearch response
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-open-close.html
def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: true, reindex: false, **options)
def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: true, reindex: false, refresh: nil, **options)
cluster.throw_error_when_readonly!

suffix ||= Esse.timestamp
Expand All @@ -69,9 +69,11 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru
cluster.api.delete_index(index: index_name)
end
if import
import(**options, suffix: suffix)
elsif reindex && (_from = indices_pointing_to_alias).any?
# @TODO: Reindex using the reindex API
import(**options, suffix: suffix, refresh: refresh)
elsif reindex && (source_indexes = indices_pointing_to_alias).any?
source_indexes.each do |from|
cluster.api.reindex(**options, body: { source: { index: from }, dest: { index: index_name(suffix: suffix) } }, refresh: refresh)
end
end

if optimize && import && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval
Expand Down
25 changes: 25 additions & 0 deletions lib/esse/transport/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,31 @@ def update_settings(index:, body:, **options)
payload[:response] = coerce_exception { client.indices.put_settings(**opts) }
end
end

# Copies documents from one index to another through the reindex API. The
# request body may filter the source documents with a query, change the
# destination index settings, or fetch the documents from a remote cluster.
#
# The call is instrumented as 'elasticsearch.reindex'; the request options
# (including the body) and the client response are stored on the event payload.
#
# @option arguments [Boolean] :refresh Should the affected indexes be refreshed?
# @option arguments [Time] :timeout Time each individual bulk request should wait for shards that are unavailable.
# @option arguments [String] :wait_for_active_shards Sets the number of shard copies that must be active before proceeding with the reindex operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)
# @option arguments [Boolean] :wait_for_completion Should the request block until the reindex is complete?
# @option arguments [Number] :requests_per_second The throttle to set on this request in sub-requests per second. -1 means no throttle.
# @option arguments [Time] :scroll Control how long to keep the search context alive
# @option arguments [Number|string] :slices The number of slices this task should be divided into. Defaults to 1, meaning the task isn't sliced into subtasks. Can be set to `auto`.
# @option arguments [Number] :max_docs Maximum number of documents to process (default: all documents)
# @option arguments [Hash] :headers Custom HTTP headers
# @option arguments [Hash] :body The search definition using the Query DSL and the prototype for the index request. (*Required*)
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html
def reindex(body:, **options)
  # Readonly clusters are rejected before any request is built or sent.
  throw_error_when_readonly!

  Esse::Events.instrument('elasticsearch.reindex') do |payload|
    request = options.merge(body: body)
    payload[:request] = request
    payload[:response] = coerce_exception { client.reindex(**request) }
  end
end
end

include InstanceMethods
Expand Down
30 changes: 30 additions & 0 deletions spec/esse/cli/event_listener_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,36 @@
end
end

# Checks that the listener looked up under 'elasticsearch.reindex' prints the
# reindex summary line to stdout.
describe '.elasticsearch_reindex' do
subject do
described_class['elasticsearch.reindex'].call(event)
end

let(:event_id) { 'elasticsearch.reindex' }

let(:event) do
Esse::Events::Event.new(event_id, payload)
end

# Only the keys interpolated into the expected message are provided:
# the runtime plus the source and destination index names.
let(:payload) do
{
runtime: 1.32,
request: {
body: {
source: { index: 'source_index' },
dest: { index: 'dest_index' },
}
}
}
end

it 'prints message' do
expect { subject }.to output(<<~MSG).to_stdout
[#{formatted_runtime(1.32)}] Reindex from #{colorize('source_index', :bold)} to #{colorize('dest_index', :bold)} successfuly completed
MSG
end
end

# Spec helper: forwards all arguments to Esse::Output.colorize so expected
# messages can be built with the same colouring as the listener output.
def colorize(*args)
Esse::Output.colorize(*args)
end
Expand Down
27 changes: 23 additions & 4 deletions spec/esse/cli/index/reset_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
end

context 'with a valid index name' do
let(:defaults) { { import: true, optimize: true, reindex: false } }

before do
stub_index(:counties)
stub_index(:cities)
Expand All @@ -35,7 +37,7 @@
end

specify do
expect(CountiesIndex).to receive(:reset_index).with(suffix: 'foo', import: true, optimize: true).and_return(true)
expect(CountiesIndex).to receive(:reset_index).with(**defaults, suffix: 'foo').and_return(true)
cli_exec(%w[index reset CountiesIndex --suffix=foo])
end

Expand All @@ -51,15 +53,32 @@
cli_exec(%w[index reset all])
end

specify do
expect(CountiesIndex).to receive(:reset_index).with(import: true, optimize: false).and_return(true)
# The example exercises --no-optimize (not --no-import): the optimize flag
# must reach reset_index as false while the other defaults stay untouched.
it 'allows to pass --no-optimize' do
  expect(CountiesIndex).to receive(:reset_index).with(**defaults, optimize: false).and_return(true)
  cli_exec(%w[index reset CountiesIndex --no-optimize])
end

it 'allows to pass --settings as a hash with imploded values' do
expect(CountiesIndex).to receive(:reset_index).with(import: true, optimize: true, settings: { 'index.refresh_interval': '-1' }).and_return(true)
expect(CountiesIndex).to receive(:reset_index).with(**defaults, settings: { 'index.refresh_interval': '-1' }).and_return(true)
cli_exec(%w[index reset CountiesIndex --settings=index.refresh_interval:-1])
end

# Only --reindex is passed here; the error still fires because the import
# option defaults to true (see the `defaults` helper of this context).
it 'raises an error if --reindex is used without --no-import' do
  expect {
    cli_exec(%w[index reset CountiesIndex --reindex])
  }.to raise_error(ArgumentError, 'You cannot use --import and --reindex together')
end

it 'raises an error if --import and --reindex are used together' do
expect {
cli_exec(%w[index reset CountiesIndex --import --reindex])
}.to raise_error(ArgumentError, 'You cannot use --import and --reindex together')
end

# The explicit import:/reindex: keys come after **defaults, so they override
# the default values in the expected keyword arguments.
it 'forwards the --reindex option to the index class' do
expect(CountiesIndex).to receive(:reset_index).with(**defaults, import: false, reindex: true).and_return(true)
cli_exec(%w[index reset CountiesIndex --reindex --no-import])
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_reindex'

# Runs the shared 'transport#reindex' examples for the elasticsearch 5.x stack.
stack_describe 'elasticsearch', '5.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_reindex'

# Runs the shared 'transport#reindex' examples for the elasticsearch 6.x stack.
stack_describe 'elasticsearch', '6.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_reindex'

# Runs the shared 'transport#reindex' examples for the elasticsearch 7.x stack.
stack_describe 'elasticsearch', '7.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_reindex'

# Runs the shared 'transport#reindex' examples for the elasticsearch 8.x stack.
stack_describe 'elasticsearch', '8.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
end
32 changes: 32 additions & 0 deletions spec/support/shared_examples/index_reset_index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,36 @@
expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true)
end
end

# Both examples start from an existing aliased index with suffix '2021' and
# check that, after the reset, the alias points only to the new suffixed index
# while the old index is still present.
context 'when the old index has data' do
  it 'import data from the old index to the new index' do
    es_client do |_client, _conf, _cluster|
      GeosIndex.create_index(alias: true, suffix: '2021')
      expect {
        GeosIndex.reset_index(suffix: index_suffix, import: true)
      }.not_to raise_error

      new_index = "#{GeosIndex.index_name}_#{index_suffix}"
      expect(GeosIndex.indices_pointing_to_alias).to eq([new_index])
      expect(GeosIndex.index_exist?(suffix: '2021')).to eq(true)
      expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true)
      expect(GeosIndex.count).to be_positive
    end
  end

  it 'reindex data from the old index to the new index' do
    es_client do |_client, _conf, _cluster|
      GeosIndex.create_index(alias: true, suffix: '2021')
      # Populate the old index first so the reindex has documents to copy.
      GeosIndex.import(refresh: true)

      expect {
        GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: true, refresh: true)
      }.not_to raise_error

      new_index = "#{GeosIndex.index_name}_#{index_suffix}"
      expect(GeosIndex.indices_pointing_to_alias).to eq([new_index])
      expect(GeosIndex.index_exist?(suffix: '2021')).to eq(true)
      expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true)
      expect(GeosIndex.count).to be_positive
    end
  end
end
end
54 changes: 54 additions & 0 deletions spec/support/shared_examples/transport_reindex.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# frozen_string_literal: true

# Shared examples for the transport `reindex` call, covering the readonly
# guard, a missing source index, and a successful copy between two indices.
RSpec.shared_examples 'transport#reindex' do
  # Index definition used to create both the source and the destination index.
  let(:body) do
    {
      settings: {
        index: {
          number_of_shards: 1,
          number_of_replicas: 0
        }
      }
    }
  end

  it 'raises an Esse::Transport::ReadonlyClusterError exception when the cluster is readonly' do
    es_client do |client, _conf, cluster|
      cluster.warm_up!
      # No request may reach the client once the cluster is readonly.
      expect(client).not_to receive(:perform_request)
      cluster.readonly = true
      expect {
        cluster.api.reindex(body: { source: { index: "#{cluster.index_prefix}_ro_from" }, dest: { index: "#{cluster.index_prefix}_ro_to" } })
      }.to raise_error(Esse::Transport::ReadonlyClusterError)
    end
  end

  it 'raises an Esse::Transport::NotFoundError exception when the source index does not exist' do
    es_client do |_client, _conf, cluster|
      expect {
        cluster.api.reindex(body: { source: { index: "#{cluster.index_prefix}_non_existent_index" }, dest: { index: "#{cluster.index_prefix}_to" } })
      }.to raise_error(Esse::Transport::NotFoundError)
    end
  end

  context 'when the source index exists' do
    it 'reindexes the source index to the destination index' do
      es_client do |_client, _conf, cluster|
        source_index = "#{cluster.index_prefix}_reindex_from"
        dest_index = "#{cluster.index_prefix}_reindex_to"
        cluster.api.create_index(index: source_index, body: body)
        cluster.api.create_index(index: dest_index, body: body)
        # Index one document with refresh so it is visible to the reindex request.
        cluster.api.index(index: source_index, id: 1, body: { title: 'foo' }, refresh: true)

        resp = nil
        expect {
          resp = cluster.api.reindex(body: { source: { index: source_index }, dest: { index: dest_index } }, refresh: true)
        }.not_to raise_error
        expect(resp['total']).to eq(1)

        # The copied document must be retrievable from the destination index.
        resp = cluster.api.get(index: dest_index, id: 1, _source: false)
        expect(resp['found']).to eq(true)
      end
    end
  end
end

0 comments on commit 15bad91

Please sign in to comment.