Skip to content

Commit

Permalink
Reset index using _reindex (#26)
Browse files Browse the repository at this point in the history
* chore: add custom cli type that accept both bool and hashes

* chore: hash values may also be a bool

* chore: separate reindex task

* chore: move delete and update by query to proper location

* chore: rename health file to cluster

* feat: add tasks to the cluster api

* feat: improve the index reset task by polling the task id

* chore: bump version
  • Loading branch information
marcosgz authored Oct 4, 2024
1 parent cb839a3 commit 70fdc03
Show file tree
Hide file tree
Showing 25 changed files with 414 additions and 124 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Rename eager_include_document_attributes to eager_load_lazy_attributes
* Add preload_lazy_attributes option to the import in order to fetch the lazy attributes in a single query before bulk indexing
* Add `delete_by_query` action to transport and index APIs
* Extends the index to support the `_reindex` api

## 0.3.6 - 2024-08-07
* Esse::LazyDocumentHeader#to_doc return `Esse::DocumentForPartialUpdate` instance to properly separate context metadata from document source
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-1.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-2.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-5.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-6.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-7.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-8.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.opensearch-1.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.opensearch-2.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.4.0.rc2)
esse (0.4.0.rc3)
multi_json
thor (>= 0.19)

Expand Down
1 change: 1 addition & 0 deletions lib/esse/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'thor'

require_relative 'primitives/output'
require_relative 'cli/parser/bool_or_hash'
require_relative 'cli/index'
require_relative 'cli/generate'
require_relative 'cli/event_listener'
Expand Down
7 changes: 4 additions & 3 deletions lib/esse/cli/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ class Index < Base
* Delete the old index.
DESC
option :suffix, type: :string, default: nil, aliases: '-s', desc: 'Suffix to append to index name'
option :import, type: :boolean, default: true, desc: 'Import documents before point alias to the new index'
option :reindex, type: :boolean, default: false, desc: 'Use _reindex API to import documents from the old index to the new index'
option :import, desc: 'Import documents before point alias to the new index'
option :reindex, desc: 'Use _reindex API to import documents from the old index to the new index'
option :optimize, type: :boolean, default: true, desc: 'Optimize index before import documents by disabling refresh_interval and setting number_of_replicas to 0'
option :settings, type: :hash, default: nil, desc: 'List of settings to pass to the index class. Example: --settings=refresh_interval:1s,number_of_replicas:0'
def reset(*index_classes)
require_relative 'index/reset'
opts = HashUtils.deep_transform_keys(options.to_h, &:to_sym)
opts[:reindex] = Parser::BoolOrHash.new(:reindex, default: false).parse(opts[:reindex])
opts[:import] = Parser::BoolOrHash.new(:import, default: true).parse(opts[:import])
if opts[:import] && opts[:reindex]
raise ArgumentError, 'You cannot use --import and --reindex together'
end
Expand Down Expand Up @@ -100,7 +102,6 @@ def open(*index_classes)
option :preload_lazy_attributes, type: :string, default: nil, desc: 'Command separated list of lazy document attributes to preload using search API before the bulk import. Or pass `true` to preload all lazy attributes'
option :eager_load_lazy_attributes, type: :string, default: nil, desc: 'Comma separated list of lazy document attributes to include to the bulk index request. Or pass `true` to include all lazy attributes'
option :update_lazy_attributes, type: :string, default: nil, desc: 'Comma separated list of lazy document attributes to bulk update after the bulk index request Or pass `true` to include all lazy attributes'

def import(*index_classes)
require_relative 'index/import'
opts = HashUtils.deep_transform_keys(options.to_h, &:to_sym)
Expand Down
52 changes: 52 additions & 0 deletions lib/esse/cli/parser/bool_or_hash.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

module Esse
module CLI
module Parser
FALSEY = [false, 'false', 'FALSE', 'f', 'F'].freeze
TRUTHY = [true, 'true', 'TRUE', 't', 'T'].freeze
HASH_MATCHER = /([\w\.\-]+)\:([^\s]+)/.freeze
HASH_SEPARATOR = /[\s]+/.freeze
ARRAY_SEPARATOR = /[\,]+/.freeze

class BoolOrHash
def initialize(key, default: nil)
@key = key
@default = default
end

def parse(input)
return true if TRUTHY.include?(input)
return false if FALSEY.include?(input)
return input if input.is_a?(Hash)
return @default if input.nil?
return true if @key.to_s == input
return @default unless HASH_MATCHER.match?(input)

compact_hash = input.to_s.split(HASH_SEPARATOR).each_with_object({}) do |pair, hash|
key, val = pair.match(HASH_MATCHER).captures
hash[key.to_sym] = may_array(val)
end
return @default if compact_hash.empty?

Esse::HashUtils.explode_keys(compact_hash)
end

private

def may_array(value)
return may_bool(value) unless ARRAY_SEPARATOR.match?(value)

value.split(ARRAY_SEPARATOR).map { |v| may_bool(v) }
end

def may_bool(value)
return true if TRUTHY.include?(value)
return false if FALSEY.include?(value)

value
end
end
end
end
end
37 changes: 31 additions & 6 deletions lib/esse/index/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru

suffix ||= Esse.timestamp
suffix = Esse.timestamp while index_exist?(suffix: suffix)
syncronous_import = true
syncronous_import = false if reindex.is_a?(Hash) && reindex[:wait_for_completion] == false

if optimize && import
optimized_creation = optimize && syncronous_import && (import || reindex)
if optimized_creation
definition = [settings_hash(settings: settings), mappings_hash].reduce(&:merge)
number_of_replicas = definition.dig(Esse::SETTING_ROOT_KEY, :index, :number_of_replicas)
refresh_interval = definition.dig(Esse::SETTING_ROOT_KEY, :index, :refresh_interval)
Expand All @@ -68,28 +71,50 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru
if index_exist? && aliases.none?
cluster.api.delete_index(index: index_name)
end

if import
import_kwargs = import.is_a?(Hash) ? import : {}
import_kwargs[:refresh] ||= refresh if refresh
import_kwargs[:refresh] ||= refresh unless refresh.nil?
import(**options, **import_kwargs, suffix: suffix)
elsif reindex && (source_indexes = indices_pointing_to_alias).any?
reindex_kwargs = reindex.is_a?(Hash) ? reindex : {}
reindex_kwargs[:wait_for_completion] = true unless reindex_kwargs.key?(:wait_for_completion)
reindex_kwargs[:refresh] ||= refresh unless refresh.nil?
source_indexes.each do |from|
cluster.api.reindex(**options, body: { source: { index: from }, dest: { index: index_name(suffix: suffix) } }, refresh: refresh)
reindex(**reindex_kwargs, body: {
source: { index: from },
dest: { index: index_name(suffix: suffix) }
})
end
end

if optimize && import && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval
if optimized_creation && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval
update_settings(suffix: suffix, settings: settings)
refresh(suffix: suffix)
end

update_aliases(suffix: suffix)
update_aliases(suffix: suffix) if syncronous_import

true
end

# Copies documents from a source to a destination.
#
# To avoid http timeout, we are sending the request with `wait_for_completion: false` and polling the task
# until it is completed.
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html
def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, **options)
resp = cluster.api.reindex(**options, body: body, scroll: scroll, wait_for_completion: false)
return resp unless wait_for_completion

task_id = resp['task']
task = nil
while (task = cluster.api.task(id: task_id))['completed'] == false
sleep poll_interval
end
task
end

# Checks the index existance. Returns true or false
#
# UsersIndex.index_exist? #=> true
Expand Down
2 changes: 1 addition & 1 deletion lib/esse/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Esse
class Transport
require_relative './transport/aliases'
require_relative './transport/health'
require_relative './transport/cluster'
require_relative './transport/indices'
require_relative './transport/search'
require_relative './transport/documents'
Expand Down
28 changes: 28 additions & 0 deletions lib/esse/transport/health.rb → lib/esse/transport/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,34 @@ module InstanceMethods
def health(**options)
coerce_exception { client.cluster.health(**options) }
end

# Returns information about the tasks currently executing on one or more nodes in the cluster.
#
# @option arguments [String] :format a short version of the Accept header, e.g. json, yaml
# @option arguments [List] :nodes A comma-separated list of node IDs or names to limit the returned information; use `_local` to return information from the node you're connecting to, leave empty to get information from all nodes
# @option arguments [List] :actions A comma-separated list of actions that should be returned. Leave empty to return all.
# @option arguments [Boolean] :detailed Return detailed task information (default: false)
# @option arguments [String] :parent_task_id Return tasks with specified parent task id (node_id:task_number). Set to -1 to return all.
# @option arguments [List] :h Comma-separated list of column names to display
# @option arguments [Boolean] :help Return help information
# @option arguments [List] :s Comma-separated list of column names or column aliases to sort by
# @option arguments [String] :time The unit in which to display time values (options: d, h, m, s, ms, micros, nanos)
# @option arguments [Boolean] :v Verbose mode. Display column headers
# @option arguments [Hash] :headers Custom HTTP headers
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html
def tasks(**options)
# coerce_exception { client.perform_request('GET', '/_tasks', options).body }
coerce_exception { client.tasks.list(**options) }
end

def task(id:, **options)
coerce_exception { client.tasks.get(task_id: id, **options) }
end

def cancel_task(id:, **options)
coerce_exception { client.tasks.cancel(task_id: id, **options) }
end
end

include InstanceMethods
Expand Down
Loading

0 comments on commit 70fdc03

Please sign in to comment.