diff --git a/lib/esse/cli/event_listener.rb b/lib/esse/cli/event_listener.rb index 71e14c6..acdddf7 100644 --- a/lib/esse/cli/event_listener.rb +++ b/lib/esse/cli/event_listener.rb @@ -95,6 +95,13 @@ def elasticsearch_bulk(event) end print_message(stats.join(', ') + '.') end + + def elasticsearch_reindex(event) + print_message '[%<runtime>s] Reindex from %<from>s to %<to>s successfuly completed', + from: colorize(event[:request].dig(:body, :source, :index), :bold), + to: colorize(event[:request].dig(:body, :dest, :index), :bold), + runtime: formatted_runtime(event[:runtime]) + end end end end diff --git a/lib/esse/cli/index.rb b/lib/esse/cli/index.rb index 8650fc5..c39abbe 100644 --- a/lib/esse/cli/index.rb +++ b/lib/esse/cli/index.rb @@ -16,16 +16,18 @@ class Index < Base DESC option :suffix, type: :string, default: nil, aliases: '-s', desc: 'Suffix to append to index name' option :import, type: :boolean, default: true, desc: 'Import documents before point alias to the new index' + option :reindex, type: :boolean, default: false, desc: 'Use _reindex API to import documents from the old index to the new index' option :optimize, type: :boolean, default: true, desc: 'Optimize index before import documents by disabling refresh_interval and setting number_of_replicas to 0' option :settings, type: :hash, default: nil, desc: 'List of settings to pass to the index class. Example: --settings=refresh_interval:1s,number_of_replicas:0' def reset(*index_classes) require_relative 'index/reset' opts = HashUtils.deep_transform_keys(options.to_h, &:to_sym) + if opts[:import] && opts[:reindex] + raise ArgumentError, 'You cannot use --import and --reindex together' + end Reset.new(indices: index_classes, **opts).run end - # @TODO Add reindex task to create a new index and import documents from the old index using _reindex API - desc 'create *INDEX_CLASSES', 'Creates indices for the given classes' long_desc <<-DESC Creates index and applies mapping and settings for the given classes. 
diff --git a/lib/esse/events.rb b/lib/esse/events.rb index fbc1ab5..83e6574 100644 --- a/lib/esse/events.rb +++ b/lib/esse/events.rb @@ -56,5 +56,6 @@ module Events register_event 'elasticsearch.exist' register_event 'elasticsearch.count' register_event 'elasticsearch.get' + register_event 'elasticsearch.reindex' end end diff --git a/lib/esse/index/indices.rb b/lib/esse/index/indices.rb index 2b9fa09..148152c 100644 --- a/lib/esse/index/indices.rb +++ b/lib/esse/index/indices.rb @@ -48,7 +48,7 @@ def create_index(suffix: nil, body: nil, settings: nil, **options) # @return [Hash] the elasticsearch response # # @see https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-open-close.html - def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: true, reindex: false, **options) + def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: true, reindex: false, refresh: nil, **options) cluster.throw_error_when_readonly! suffix ||= Esse.timestamp @@ -69,9 +69,11 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru cluster.api.delete_index(index: index_name) end if import - import(**options, suffix: suffix) - elsif reindex && (_from = indices_pointing_to_alias).any? - # @TODO: Reindex using the reindex API + import(**options, suffix: suffix, refresh: refresh) + elsif reindex && (source_indexes = indices_pointing_to_alias).any? 
+ source_indexes.each do |from| + cluster.api.reindex(**options, body: { source: { index: from }, dest: { index: index_name(suffix: suffix) } }, refresh: refresh) + end end if optimize && import && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval diff --git a/lib/esse/transport/indices.rb b/lib/esse/transport/indices.rb index 4e77aa8..58034d7 100644 --- a/lib/esse/transport/indices.rb +++ b/lib/esse/transport/indices.rb @@ -185,6 +185,31 @@ def update_settings(index:, body:, **options) payload[:response] = coerce_exception { client.indices.put_settings(**opts) } end end + + # Copies documents from one index to another, optionally filtering the source + # documents by a query, changing the destination index settings, or fetching the + # documents from a remote cluster. + # + # @option arguments [Boolean] :refresh Should the affected indexes be refreshed? + # @option arguments [Time] :timeout Time each individual bulk request should wait for shards that are unavailable. + # @option arguments [String] :wait_for_active_shards Sets the number of shard copies that must be active before proceeding with the reindex operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1) + # @option arguments [Boolean] :wait_for_completion Whether the request should block until the reindex is complete. + # @option arguments [Number] :requests_per_second The throttle to set on this request in sub-requests per second. -1 means no throttle. + # @option arguments [Time] :scroll Control how long to keep the search context alive + # @option arguments [Number, String] :slices The number of slices this task should be divided into. Defaults to 1, meaning the task isn't sliced into subtasks. Can be set to `auto`. 
+ # @option arguments [Number] :max_docs Maximum number of documents to process (default: all documents) + # @option arguments [Hash] :headers Custom HTTP headers + # @option arguments [Hash] :body The search definition using the Query DSL and the prototype for the index request. (*Required*) + # + # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html + def reindex(body:, **options) + throw_error_when_readonly! + + Esse::Events.instrument('elasticsearch.reindex') do |payload| + payload[:request] = opts = options.merge(body: body) + payload[:response] = coerce_exception { client.reindex(**opts) } + end + end end include InstanceMethods diff --git a/spec/esse/cli/event_listener_spec.rb b/spec/esse/cli/event_listener_spec.rb index 0055cfb..d96b6ac 100644 --- a/spec/esse/cli/event_listener_spec.rb +++ b/spec/esse/cli/event_listener_spec.rb @@ -222,6 +222,36 @@ end end + describe '.elasticsearch_reindex' do + subject do + described_class['elasticsearch.reindex'].call(event) + end + + let(:event_id) { 'elasticsearch.reindex' } + + let(:event) do + Esse::Events::Event.new(event_id, payload) + end + + let(:payload) do + { + runtime: 1.32, + request: { + body: { + source: { index: 'source_index' }, + dest: { index: 'dest_index' }, + } + } + } + end + + it 'prints message' do + expect { subject }.to output(<<~MSG).to_stdout + [#{formatted_runtime(1.32)}] Reindex from #{colorize('source_index', :bold)} to #{colorize('dest_index', :bold)} successfuly completed + MSG + end + end + def colorize(*args) Esse::Output.colorize(*args) end diff --git a/spec/esse/cli/index/reset_spec.rb b/spec/esse/cli/index/reset_spec.rb index 172fffe..875ba51 100644 --- a/spec/esse/cli/index/reset_spec.rb +++ b/spec/esse/cli/index/reset_spec.rb @@ -24,6 +24,8 @@ end context 'with a valid index name' do + let(:defaults) { { import: true, optimize: true, reindex: false } } + before do stub_index(:counties) stub_index(:cities) @@ -35,7 +37,7 @@ end specify do - 
+ expect(CountiesIndex).to receive(:reset_index).with(**defaults, suffix: 'foo').and_return(true) cli_exec(%w[index reset CountiesIndex --suffix=foo]) end @@ -51,15 +53,32 @@ cli_exec(%w[index reset all]) end - specify do - expect(CountiesIndex).to receive(:reset_index).with(import: true, optimize: false).and_return(true) + it 'allows to pass --no-optimize' do + expect(CountiesIndex).to receive(:reset_index).with(**defaults, optimize: false).and_return(true) cli_exec(%w[index reset CountiesIndex --no-optimize]) end it 'allows to pass --settings as a hash with imploded values' do - expect(CountiesIndex).to receive(:reset_index).with(import: true, optimize: true, settings: { 'index.refresh_interval': '-1' }).and_return(true) + expect(CountiesIndex).to receive(:reset_index).with(**defaults, settings: { 'index.refresh_interval': '-1' }).and_return(true) cli_exec(%w[index reset CountiesIndex --settings=index.refresh_interval:-1]) end + + it 'raises an error if --reindex is used while --import is left at its default' do + expect { + cli_exec(%w[index reset CountiesIndex --reindex]) + }.to raise_error(ArgumentError, 'You cannot use --import and --reindex together') + end + + it 'raises an error if --import and --reindex are used together' do + expect { + cli_exec(%w[index reset CountiesIndex --import --reindex]) + }.to raise_error(ArgumentError, 'You cannot use --import and --reindex together') + end + + it 'forwards the --reindex option to the index class' do + expect(CountiesIndex).to receive(:reset_index).with(**defaults, import: false, reindex: true).and_return(true) + cli_exec(%w[index reset CountiesIndex --reindex --no-import]) + end end end end diff --git a/spec/esse/integrations/elasticsearch-5/transport/reindex_spec.rb b/spec/esse/integrations/elasticsearch-5/transport/reindex_spec.rb new file mode 100644 index 0000000..d8f7bea --- /dev/null +++ 
b/spec/esse/integrations/elasticsearch-5/transport/reindex_spec.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/transport_reindex' + +stack_describe 'elasticsearch', '5.x', Esse::Transport, '#reindex' do + include_examples 'transport#reindex' +end diff --git a/spec/esse/integrations/elasticsearch-6/transport/reindex_spec.rb b/spec/esse/integrations/elasticsearch-6/transport/reindex_spec.rb new file mode 100644 index 0000000..6bc0409 --- /dev/null +++ b/spec/esse/integrations/elasticsearch-6/transport/reindex_spec.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/transport_reindex' + +stack_describe 'elasticsearch', '6.x', Esse::Transport, '#reindex' do + include_examples 'transport#reindex' +end diff --git a/spec/esse/integrations/elasticsearch-7/transport/reindex_spec.rb b/spec/esse/integrations/elasticsearch-7/transport/reindex_spec.rb new file mode 100644 index 0000000..d2900d5 --- /dev/null +++ b/spec/esse/integrations/elasticsearch-7/transport/reindex_spec.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/transport_reindex' + +stack_describe 'elasticsearch', '7.x', Esse::Transport, '#reindex' do + include_examples 'transport#reindex' +end diff --git a/spec/esse/integrations/elasticsearch-8/transport/reindex_spec.rb b/spec/esse/integrations/elasticsearch-8/transport/reindex_spec.rb new file mode 100644 index 0000000..4f4d76c --- /dev/null +++ b/spec/esse/integrations/elasticsearch-8/transport/reindex_spec.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/transport_reindex' + +stack_describe 'elasticsearch', '8.x', Esse::Transport, '#reindex' do + include_examples 'transport#reindex' +end diff --git a/spec/support/shared_examples/index_reset_index.rb b/spec/support/shared_examples/index_reset_index.rb index 4f5858a..b51c02b 
100644 --- a/spec/support/shared_examples/index_reset_index.rb +++ b/spec/support/shared_examples/index_reset_index.rb @@ -52,4 +52,36 @@ expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true) end end + + context 'when the old index has data' do + it 'import data from the old index to the new index' do + es_client do |client, _conf, cluster| + GeosIndex.create_index(alias: true, suffix: '2021') + expect { + GeosIndex.reset_index(suffix: index_suffix, import: true) + }.not_to raise_error + + expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_#{index_suffix}"]) + expect(GeosIndex.index_exist?(suffix: '2021')).to eq(true) + expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true) + expect(GeosIndex.count).to be_positive + end + end + + it 'reindex data from the old index to the new index' do + es_client do |client, _conf, cluster| + GeosIndex.create_index(alias: true, suffix: '2021') + GeosIndex.import(refresh: true) + + expect { + GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: true, refresh: true) + }.not_to raise_error + + expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_#{index_suffix}"]) + expect(GeosIndex.index_exist?(suffix: '2021')).to eq(true) + expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true) + expect(GeosIndex.count).to be_positive + end + end + end end diff --git a/spec/support/shared_examples/transport_reindex.rb b/spec/support/shared_examples/transport_reindex.rb new file mode 100644 index 0000000..2beeb2c --- /dev/null +++ b/spec/support/shared_examples/transport_reindex.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +RSpec.shared_examples 'transport#reindex' do + let(:body) do + { + settings: { + index: { + number_of_shards: 1, + number_of_replicas: 0 + } + } + } + end + + it 'raises an Esse::Transport::ReadonlyClusterError exception when the cluster is readonly' do + es_client do |client, _conf, cluster| + cluster.warm_up! 
+ expect(client).not_to receive(:perform_request) + cluster.readonly = true + expect { + cluster.api.reindex(body: { source: { index: "#{cluster.index_prefix}_ro_from" }, dest: { index: "#{cluster.index_prefix}_ro_to" } }) + }.to raise_error(Esse::Transport::ReadonlyClusterError) + end + end + + it 'raises an #