Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reset index using /_reindex API #21

Merged
merged 3 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/esse/cli/event_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ def elasticsearch_bulk(event)
end
print_message(stats.join(', ') + '.')
end

def elasticsearch_reindex(event)
print_message '[%<runtime>s] Reindex from %<from>s to %<to>s successfuly completed',
from: colorize(event[:request].dig(:body, :source, :index), :bold),
to: colorize(event[:request].dig(:body, :dest, :index), :bold),
runtime: formatted_runtime(event[:runtime])
end
end
end
end
6 changes: 4 additions & 2 deletions lib/esse/cli/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ class Index < Base
DESC
option :suffix, type: :string, default: nil, aliases: '-s', desc: 'Suffix to append to index name'
option :import, type: :boolean, default: true, desc: 'Import documents before point alias to the new index'
option :reindex, type: :boolean, default: false, desc: 'Use _reindex API to import documents from the old index to the new index'
option :optimize, type: :boolean, default: true, desc: 'Optimize index before import documents by disabling refresh_interval and setting number_of_replicas to 0'
option :settings, type: :hash, default: nil, desc: 'List of settings to pass to the index class. Example: --settings=refresh_interval:1s,number_of_replicas:0'
def reset(*index_classes)
require_relative 'index/reset'
opts = HashUtils.deep_transform_keys(options.to_h, &:to_sym)
if opts[:import] && opts[:reindex]
raise ArgumentError, 'You cannot use --import and --reindex together'
end
Reset.new(indices: index_classes, **opts).run
end

# @TODO Add reindex task to create a new index and import documents from the old index using _reindex API

desc 'create *INDEX_CLASSES', 'Creates indices for the given classes'
long_desc <<-DESC
Creates index and applies mapping and settings for the given classes.
Expand Down
1 change: 1 addition & 0 deletions lib/esse/events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,6 @@ module Events
register_event 'elasticsearch.exist'
register_event 'elasticsearch.count'
register_event 'elasticsearch.get'
register_event 'elasticsearch.reindex'
end
end
10 changes: 6 additions & 4 deletions lib/esse/index/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def create_index(suffix: nil, body: nil, settings: nil, **options)
# @return [Hash] the elasticsearch response
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-open-close.html
def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: true, reindex: false, **options)
def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: true, reindex: false, refresh: nil, **options)
cluster.throw_error_when_readonly!

suffix ||= Esse.timestamp
Expand All @@ -69,9 +69,11 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru
cluster.api.delete_index(index: index_name)
end
if import
import(**options, suffix: suffix)
elsif reindex && (_from = indices_pointing_to_alias).any?
# @TODO: Reindex using the reindex API
import(**options, suffix: suffix, refresh: refresh)
elsif reindex && (source_indexes = indices_pointing_to_alias).any?
source_indexes.each do |from|
cluster.api.reindex(**options, body: { source: { index: from }, dest: { index: index_name(suffix: suffix) } }, refresh: refresh)
end
end

if optimize && import && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval
Expand Down
25 changes: 25 additions & 0 deletions lib/esse/transport/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,31 @@ def update_settings(index:, body:, **options)
payload[:response] = coerce_exception { client.indices.put_settings(**opts) }
end
end

# Allows to copy documents from one index to another, optionally filtering the source
# documents by a query, changing the destination index settings, or fetching the
# documents from a remote cluster.
#
# @option arguments [Boolean] :refresh Should the affected indexes be refreshed?
# @option arguments [Time] :timeout Time each individual bulk request should wait for shards that are unavailable.
# @option arguments [String] :wait_for_active_shards Sets the number of shard copies that must be active before proceeding with the reindex operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)
# @option arguments [Boolean] :wait_for_completion Should the request should block until the reindex is complete.
# @option arguments [Number] :requests_per_second The throttle to set on this request in sub-requests per second. -1 means no throttle.
# @option arguments [Time] :scroll Control how long to keep the search context alive
# @option arguments [Number|string] :slices The number of slices this task should be divided into. Defaults to 1, meaning the task isn't sliced into subtasks. Can be set to `auto`.
# @option arguments [Number] :max_docs Maximum number of documents to process (default: all documents)
# @option arguments [Hash] :headers Custom HTTP headers
# @option arguments [Hash] :body The search definition using the Query DSL and the prototype for the index request. (*Required*)
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html
def reindex(body:, **options)
throw_error_when_readonly!

Esse::Events.instrument('elasticsearch.reindex') do |payload|
payload[:request] = opts = options.merge(body: body)
payload[:response] = coerce_exception { client.reindex(**opts) }
end
end
end

include InstanceMethods
Expand Down
30 changes: 30 additions & 0 deletions spec/esse/cli/event_listener_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,36 @@
end
end

describe '.elasticsearch_reindex' do
subject do
described_class['elasticsearch.reindex'].call(event)
end

let(:event_id) { 'elasticsearch.reindex' }

let(:event) do
Esse::Events::Event.new(event_id, payload)
end

let(:payload) do
{
runtime: 1.32,
request: {
body: {
source: { index: 'source_index' },
dest: { index: 'dest_index' },
}
}
}
end

it 'prints message' do
expect { subject }.to output(<<~MSG).to_stdout
[#{formatted_runtime(1.32)}] Reindex from #{colorize('source_index', :bold)} to #{colorize('dest_index', :bold)} successfuly completed
MSG
end
end

def colorize(*args)
Esse::Output.colorize(*args)
end
Expand Down
27 changes: 23 additions & 4 deletions spec/esse/cli/index/reset_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
end

context 'with a valid index name' do
let(:defaults) { { import: true, optimize: true, reindex: false } }

before do
stub_index(:counties)
stub_index(:cities)
Expand All @@ -35,7 +37,7 @@
end

specify do
expect(CountiesIndex).to receive(:reset_index).with(suffix: 'foo', import: true, optimize: true).and_return(true)
expect(CountiesIndex).to receive(:reset_index).with(**defaults, suffix: 'foo').and_return(true)
cli_exec(%w[index reset CountiesIndex --suffix=foo])
end

Expand All @@ -51,15 +53,32 @@
cli_exec(%w[index reset all])
end

specify do
expect(CountiesIndex).to receive(:reset_index).with(import: true, optimize: false).and_return(true)
it 'allows to pass --no-import' do
expect(CountiesIndex).to receive(:reset_index).with(**defaults, optimize: false).and_return(true)
cli_exec(%w[index reset CountiesIndex --no-optimize])
end

it 'allows to pass --settings as a hash with imploded values' do
expect(CountiesIndex).to receive(:reset_index).with(import: true, optimize: true, settings: { 'index.refresh_interval': '-1' }).and_return(true)
expect(CountiesIndex).to receive(:reset_index).with(**defaults, settings: { 'index.refresh_interval': '-1' }).and_return(true)
cli_exec(%w[index reset CountiesIndex --settings=index.refresh_interval:-1])
end

it 'raises an error if --import and --reindex are used together' do
expect {
cli_exec(%w[index reset CountiesIndex --reindex])
}.to raise_error(ArgumentError, 'You cannot use --import and --reindex together')
end

it 'raises an error if --import and --reindex are used together' do
expect {
cli_exec(%w[index reset CountiesIndex --import --reindex])
}.to raise_error(ArgumentError, 'You cannot use --import and --reindex together')
end

it 'forwards the --reindex option to the index class' do
expect(CountiesIndex).to receive(:reset_index).with(**defaults, import: false, reindex: true).and_return(true)
cli_exec(%w[index reset CountiesIndex --reindex --no-import])
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_reindex'

stack_describe 'elasticsearch', '5.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_reindex'

stack_describe 'elasticsearch', '6.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_reindex'

stack_describe 'elasticsearch', '7.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_reindex'

stack_describe 'elasticsearch', '8.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
end
32 changes: 32 additions & 0 deletions spec/support/shared_examples/index_reset_index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,36 @@
expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true)
end
end

context 'when the old index has data' do
it 'import data from the old index to the new index' do
es_client do |client, _conf, cluster|
GeosIndex.create_index(alias: true, suffix: '2021')
expect {
GeosIndex.reset_index(suffix: index_suffix, import: true)
}.not_to raise_error

expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_#{index_suffix}"])
expect(GeosIndex.index_exist?(suffix: '2021')).to eq(true)
expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true)
expect(GeosIndex.count).to be_positive
end
end

it 'reindex data from the old index to the new index' do
es_client do |client, _conf, cluster|
GeosIndex.create_index(alias: true, suffix: '2021')
GeosIndex.import(refresh: true)

expect {
GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: true, refresh: true)
}.not_to raise_error

expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_#{index_suffix}"])
expect(GeosIndex.index_exist?(suffix: '2021')).to eq(true)
expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true)
expect(GeosIndex.count).to be_positive
end
end
end
end
54 changes: 54 additions & 0 deletions spec/support/shared_examples/transport_reindex.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# frozen_string_literal: true

RSpec.shared_examples 'transport#reindex' do
let(:body) do
{
settings: {
index: {
number_of_shards: 1,
number_of_replicas: 0
}
}
}
end

it 'raises an Esse::Transport::ReadonlyClusterError exception when the cluster is readonly' do
es_client do |client, _conf, cluster|
cluster.warm_up!
expect(client).not_to receive(:perform_request)
cluster.readonly = true
expect {
cluster.api.reindex(body: { source: { index: "#{cluster.index_prefix}_ro_from" }, dest: { index: "#{cluster.index_prefix}_ro_to" } })
}.to raise_error(Esse::Transport::ReadonlyClusterError)
end
end

it 'raises an #<Esse::Transport::NotFoundError exception when the source index does not exist' do
es_client do |_client, _conf, cluster|
expect {
cluster.api.reindex(body: { source: { index: "#{cluster.index_prefix}_non_existent_index" }, dest: { index: "#{cluster.index_prefix}_to" } })
}.to raise_error(Esse::Transport::NotFoundError)
end
end

context 'when the source index exists' do
it 'reindexes the source index to the destination index' do
es_client do |client, _conf, cluster|
source_index = "#{cluster.index_prefix}_reindex_from"
dest_index = "#{cluster.index_prefix}_reindex_to"
cluster.api.create_index(index: source_index, body: body)
cluster.api.create_index(index: dest_index, body: body)
cluster.api.index(index: source_index, id: 1, body: { title: 'foo' }, refresh: true)

resp = nil
expect {
resp = cluster.api.reindex(body: { source: { index: source_index }, dest: { index: dest_index } }, refresh: true)
}.not_to raise_error
expect(resp['total']).to eq(1)

resp = cluster.api.get(index: dest_index, id: 1, _source: false)
expect(resp['found']).to eq(true)
end
end
end
end
Loading