Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset index using _reindex #26

Merged
merged 8 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Rename eager_include_document_attributes to eager_load_lazy_attributes
* Add preload_lazy_attributes option to the import in order to fetch the lazy attributes in a single query before bulk indexing
* Add `delete_by_query` action to transport and index APIs
* Extends the index to support the `_reindex` api

## 0.3.6 - 2024-08-07
* Esse::LazyDocumentHeader#to_doc return `Esse::DocumentForPartialUpdate` instance to properly separate context metadata from document source
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-1.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-2.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-5.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-6.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-7.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-8.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.opensearch-1.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.opensearch-2.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
1 change: 1 addition & 0 deletions lib/esse/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'thor'

require_relative 'primitives/output'
require_relative 'cli/parser/bool_or_hash'
require_relative 'cli/index'
require_relative 'cli/generate'
require_relative 'cli/event_listener'
Expand Down
7 changes: 4 additions & 3 deletions lib/esse/cli/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ class Index < Base
* Delete the old index.
DESC
option :suffix, type: :string, default: nil, aliases: '-s', desc: 'Suffix to append to index name'
option :import, type: :boolean, default: true, desc: 'Import documents before point alias to the new index'
option :reindex, type: :boolean, default: false, desc: 'Use _reindex API to import documents from the old index to the new index'
option :import, desc: 'Import documents before point alias to the new index'
option :reindex, desc: 'Use _reindex API to import documents from the old index to the new index'
option :optimize, type: :boolean, default: true, desc: 'Optimize index before import documents by disabling refresh_interval and setting number_of_replicas to 0'
option :settings, type: :hash, default: nil, desc: 'List of settings to pass to the index class. Example: --settings=refresh_interval:1s,number_of_replicas:0'
def reset(*index_classes)
require_relative 'index/reset'
opts = HashUtils.deep_transform_keys(options.to_h, &:to_sym)
opts[:reindex] = Parser::BoolOrHash.new(:reindex, default: false).parse(opts[:reindex])
opts[:import] = Parser::BoolOrHash.new(:import, default: true).parse(opts[:import])
if opts[:import] && opts[:reindex]
raise ArgumentError, 'You cannot use --import and --reindex together'
end
Expand Down Expand Up @@ -100,7 +102,6 @@ def open(*index_classes)
option :preload_lazy_attributes, type: :string, default: nil, desc: 'Command separated list of lazy document attributes to preload using search API before the bulk import. Or pass `true` to preload all lazy attributes'
option :eager_load_lazy_attributes, type: :string, default: nil, desc: 'Comma separated list of lazy document attributes to include to the bulk index request. Or pass `true` to include all lazy attributes'
option :update_lazy_attributes, type: :string, default: nil, desc: 'Comma separated list of lazy document attributes to bulk update after the bulk index request Or pass `true` to include all lazy attributes'

def import(*index_classes)
require_relative 'index/import'
opts = HashUtils.deep_transform_keys(options.to_h, &:to_sym)
Expand Down
52 changes: 52 additions & 0 deletions lib/esse/cli/parser/bool_or_hash.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

module Esse
module CLI
module Parser
FALSEY = [false, 'false', 'FALSE', 'f', 'F'].freeze
TRUTHY = [true, 'true', 'TRUE', 't', 'T'].freeze
HASH_MATCHER = /([\w\.\-]+)\:([^\s]+)/.freeze
HASH_SEPARATOR = /[\s]+/.freeze
ARRAY_SEPARATOR = /[\,]+/.freeze

class BoolOrHash
def initialize(key, default: nil)
@key = key
@default = default
end

def parse(input)
return true if TRUTHY.include?(input)
return false if FALSEY.include?(input)
return input if input.is_a?(Hash)
return @default if input.nil?
return true if @key.to_s == input
return @default unless HASH_MATCHER.match?(input)

compact_hash = input.to_s.split(HASH_SEPARATOR).each_with_object({}) do |pair, hash|
key, val = pair.match(HASH_MATCHER).captures
hash[key.to_sym] = may_array(val)
end
return @default if compact_hash.empty?

Esse::HashUtils.explode_keys(compact_hash)
end

private

def may_array(value)
return may_bool(value) unless ARRAY_SEPARATOR.match?(value)

value.split(ARRAY_SEPARATOR).map { |v| may_bool(v) }
end

def may_bool(value)
return true if TRUTHY.include?(value)
return false if FALSEY.include?(value)

value
end
end
end
end
end
37 changes: 31 additions & 6 deletions lib/esse/index/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru

suffix ||= Esse.timestamp
suffix = Esse.timestamp while index_exist?(suffix: suffix)
syncronous_import = true
syncronous_import = false if reindex.is_a?(Hash) && reindex[:wait_for_completion] == false

if optimize && import
optimized_creation = optimize && syncronous_import && (import || reindex)
if optimized_creation
definition = [settings_hash(settings: settings), mappings_hash].reduce(&:merge)
number_of_replicas = definition.dig(Esse::SETTING_ROOT_KEY, :index, :number_of_replicas)
refresh_interval = definition.dig(Esse::SETTING_ROOT_KEY, :index, :refresh_interval)
Expand All @@ -68,28 +71,50 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru
if index_exist? && aliases.none?
cluster.api.delete_index(index: index_name)
end

if import
import_kwargs = import.is_a?(Hash) ? import : {}
import_kwargs[:refresh] ||= refresh if refresh
import_kwargs[:refresh] ||= refresh unless refresh.nil?
import(**options, **import_kwargs, suffix: suffix)
elsif reindex && (source_indexes = indices_pointing_to_alias).any?
reindex_kwargs = reindex.is_a?(Hash) ? reindex : {}
reindex_kwargs[:wait_for_completion] = true unless reindex_kwargs.key?(:wait_for_completion)
reindex_kwargs[:refresh] ||= refresh unless refresh.nil?
source_indexes.each do |from|
cluster.api.reindex(**options, body: { source: { index: from }, dest: { index: index_name(suffix: suffix) } }, refresh: refresh)
reindex(**reindex_kwargs, body: {
source: { index: from },
dest: { index: index_name(suffix: suffix) }
})
end
end

if optimize && import && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval
if optimized_creation && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval
update_settings(suffix: suffix, settings: settings)
refresh(suffix: suffix)
end

update_aliases(suffix: suffix)
update_aliases(suffix: suffix) if syncronous_import

true
end

# Copies documents from a source to a destination.
#
# To avoid http timeout, we are sending the request with `wait_for_completion: false` and polling the task
# until it is completed.
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html
def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, **options)
resp = cluster.api.reindex(**options, body: body, scroll: scroll, wait_for_completion: false)
return resp unless wait_for_completion

task_id = resp['task']
task = nil
while (task = cluster.api.task(id: task_id))['completed'] == false
sleep poll_interval
end
task
end

# Checks the index existance. Returns true or false
#
# UsersIndex.index_exist? #=> true
Expand Down
2 changes: 1 addition & 1 deletion lib/esse/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Esse
class Transport
require_relative './transport/aliases'
require_relative './transport/health'
require_relative './transport/cluster'
require_relative './transport/indices'
require_relative './transport/search'
require_relative './transport/documents'
Expand Down
28 changes: 28 additions & 0 deletions lib/esse/transport/health.rb → lib/esse/transport/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,34 @@ module InstanceMethods
def health(**options)
coerce_exception { client.cluster.health(**options) }
end

# Returns information about the tasks currently executing on one or more nodes in the cluster.
#
# @option arguments [String] :format a short version of the Accept header, e.g. json, yaml
# @option arguments [List] :nodes A comma-separated list of node IDs or names to limit the returned information; use `_local` to return information from the node you're connecting to, leave empty to get information from all nodes
# @option arguments [List] :actions A comma-separated list of actions that should be returned. Leave empty to return all.
# @option arguments [Boolean] :detailed Return detailed task information (default: false)
# @option arguments [String] :parent_task_id Return tasks with specified parent task id (node_id:task_number). Set to -1 to return all.
# @option arguments [List] :h Comma-separated list of column names to display
# @option arguments [Boolean] :help Return help information
# @option arguments [List] :s Comma-separated list of column names or column aliases to sort by
# @option arguments [String] :time The unit in which to display time values (options: d, h, m, s, ms, micros, nanos)
# @option arguments [Boolean] :v Verbose mode. Display column headers
# @option arguments [Hash] :headers Custom HTTP headers
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html
def tasks(**options)
# coerce_exception { client.perform_request('GET', '/_tasks', options).body }
coerce_exception { client.tasks.list(**options) }
end

def task(id:, **options)
coerce_exception { client.tasks.get(task_id: id, **options) }
end

def cancel_task(id:, **options)
coerce_exception { client.tasks.cancel(task_id: id, **options) }
end
end

include InstanceMethods
Expand Down
Loading
Loading