diff --git a/CHANGELOG.md b/CHANGELOG.md index 25b083d..c1ab6c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Rename eager_include_document_attributes to eager_load_lazy_attributes * Add preload_lazy_attributes option to the import in order to fetch the lazy attributes in a single query before bulk indexing * Add `delete_by_query` action to transport and index APIs +* Extends the index to support the `_reindex` api ## 0.3.6 - 2024-08-07 * Esse::LazyDocumentHeader#to_doc return `Esse::DocumentForPartialUpdate` instance to properly separate context metadata from document source diff --git a/gemfiles/Gemfile.elasticsearch-1.x.lock b/gemfiles/Gemfile.elasticsearch-1.x.lock index 08a30f9..5c84b72 100644 --- a/gemfiles/Gemfile.elasticsearch-1.x.lock +++ b/gemfiles/Gemfile.elasticsearch-1.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc2) + esse (0.4.0.rc3) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-2.x.lock b/gemfiles/Gemfile.elasticsearch-2.x.lock index 6bb2f70..4dd2e66 100644 --- a/gemfiles/Gemfile.elasticsearch-2.x.lock +++ b/gemfiles/Gemfile.elasticsearch-2.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc2) + esse (0.4.0.rc3) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-5.x.lock b/gemfiles/Gemfile.elasticsearch-5.x.lock index f4b4952..4dedda8 100644 --- a/gemfiles/Gemfile.elasticsearch-5.x.lock +++ b/gemfiles/Gemfile.elasticsearch-5.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc2) + esse (0.4.0.rc3) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-6.x.lock b/gemfiles/Gemfile.elasticsearch-6.x.lock index 6b79618..f8068f9 100644 --- a/gemfiles/Gemfile.elasticsearch-6.x.lock +++ b/gemfiles/Gemfile.elasticsearch-6.x.lock @@ -1,7 +1,7 @@ PATH remote: .. 
specs: - esse (0.4.0.rc2) + esse (0.4.0.rc3) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-7.x.lock b/gemfiles/Gemfile.elasticsearch-7.x.lock index b055529..4afc4eb 100644 --- a/gemfiles/Gemfile.elasticsearch-7.x.lock +++ b/gemfiles/Gemfile.elasticsearch-7.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc2) + esse (0.4.0.rc3) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.elasticsearch-8.x.lock b/gemfiles/Gemfile.elasticsearch-8.x.lock index e47e461..20bb982 100644 --- a/gemfiles/Gemfile.elasticsearch-8.x.lock +++ b/gemfiles/Gemfile.elasticsearch-8.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc2) + esse (0.4.0.rc3) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.opensearch-1.x.lock b/gemfiles/Gemfile.opensearch-1.x.lock index 780d07f..2c5fbe5 100644 --- a/gemfiles/Gemfile.opensearch-1.x.lock +++ b/gemfiles/Gemfile.opensearch-1.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc2) + esse (0.4.0.rc3) multi_json thor (>= 0.19) diff --git a/gemfiles/Gemfile.opensearch-2.x.lock b/gemfiles/Gemfile.opensearch-2.x.lock index b36c8f7..e6c3827 100644 --- a/gemfiles/Gemfile.opensearch-2.x.lock +++ b/gemfiles/Gemfile.opensearch-2.x.lock @@ -1,7 +1,7 @@ PATH remote: .. specs: - esse (0.4.0.rc2) + esse (0.4.0.rc3) multi_json thor (>= 0.19) diff --git a/lib/esse/cli.rb b/lib/esse/cli.rb index 21bd4c0..74d389f 100644 --- a/lib/esse/cli.rb +++ b/lib/esse/cli.rb @@ -3,6 +3,7 @@ require 'thor' require_relative 'primitives/output' +require_relative 'cli/parser/bool_or_hash' require_relative 'cli/index' require_relative 'cli/generate' require_relative 'cli/event_listener' diff --git a/lib/esse/cli/index.rb b/lib/esse/cli/index.rb index 4b0feda..131cfe7 100644 --- a/lib/esse/cli/index.rb +++ b/lib/esse/cli/index.rb @@ -15,13 +15,15 @@ class Index < Base * Delete the old index. 
DESC option :suffix, type: :string, default: nil, aliases: '-s', desc: 'Suffix to append to index name' - option :import, type: :boolean, default: true, desc: 'Import documents before point alias to the new index' - option :reindex, type: :boolean, default: false, desc: 'Use _reindex API to import documents from the old index to the new index' + option :import, desc: 'Import documents before point alias to the new index' + option :reindex, desc: 'Use _reindex API to import documents from the old index to the new index' option :optimize, type: :boolean, default: true, desc: 'Optimize index before import documents by disabling refresh_interval and setting number_of_replicas to 0' option :settings, type: :hash, default: nil, desc: 'List of settings to pass to the index class. Example: --settings=refresh_interval:1s,number_of_replicas:0' def reset(*index_classes) require_relative 'index/reset' opts = HashUtils.deep_transform_keys(options.to_h, &:to_sym) + opts[:reindex] = Parser::BoolOrHash.new(:reindex, default: false).parse(opts[:reindex]) + opts[:import] = Parser::BoolOrHash.new(:import, default: true).parse(opts[:import]) if opts[:import] && opts[:reindex] raise ArgumentError, 'You cannot use --import and --reindex together' end @@ -100,7 +102,6 @@ def open(*index_classes) option :preload_lazy_attributes, type: :string, default: nil, desc: 'Command separated list of lazy document attributes to preload using search API before the bulk import. Or pass `true` to preload all lazy attributes' option :eager_load_lazy_attributes, type: :string, default: nil, desc: 'Comma separated list of lazy document attributes to include to the bulk index request. 
Or pass `true` to include all lazy attributes' option :update_lazy_attributes, type: :string, default: nil, desc: 'Comma separated list of lazy document attributes to bulk update after the bulk index request Or pass `true` to include all lazy attributes' - def import(*index_classes) require_relative 'index/import' opts = HashUtils.deep_transform_keys(options.to_h, &:to_sym) diff --git a/lib/esse/cli/parser/bool_or_hash.rb b/lib/esse/cli/parser/bool_or_hash.rb new file mode 100644 index 0000000..265de09 --- /dev/null +++ b/lib/esse/cli/parser/bool_or_hash.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +module Esse + module CLI + module Parser + FALSEY = [false, 'false', 'FALSE', 'f', 'F'].freeze + TRUTHY = [true, 'true', 'TRUE', 't', 'T'].freeze + HASH_MATCHER = /([\w\.\-]+)\:([^\s]+)/.freeze + HASH_SEPARATOR = /[\s]+/.freeze + ARRAY_SEPARATOR = /[\,]+/.freeze + + class BoolOrHash + def initialize(key, default: nil) + @key = key + @default = default + end + + def parse(input) + return true if TRUTHY.include?(input) + return false if FALSEY.include?(input) + return input if input.is_a?(Hash) + return @default if input.nil? + return true if @key.to_s == input + return @default unless HASH_MATCHER.match?(input) + + compact_hash = input.to_s.split(HASH_SEPARATOR).each_with_object({}) do |pair, hash| + key, val = pair.match(HASH_MATCHER).captures + hash[key.to_sym] = may_array(val) + end + return @default if compact_hash.empty? 
+ + Esse::HashUtils.explode_keys(compact_hash) + end + + private + + def may_array(value) + return may_bool(value) unless ARRAY_SEPARATOR.match?(value) + + value.split(ARRAY_SEPARATOR).map { |v| may_bool(v) } + end + + def may_bool(value) + return true if TRUTHY.include?(value) + return false if FALSEY.include?(value) + + value + end + end + end + end +end diff --git a/lib/esse/index/indices.rb b/lib/esse/index/indices.rb index 495dfea..3e34278 100644 --- a/lib/esse/index/indices.rb +++ b/lib/esse/index/indices.rb @@ -53,8 +53,11 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru suffix ||= Esse.timestamp suffix = Esse.timestamp while index_exist?(suffix: suffix) + syncronous_import = true + syncronous_import = false if reindex.is_a?(Hash) && reindex[:wait_for_completion] == false - if optimize && import + optimized_creation = optimize && syncronous_import && (import || reindex) + if optimized_creation definition = [settings_hash(settings: settings), mappings_hash].reduce(&:merge) number_of_replicas = definition.dig(Esse::SETTING_ROOT_KEY, :index, :number_of_replicas) refresh_interval = definition.dig(Esse::SETTING_ROOT_KEY, :index, :refresh_interval) @@ -68,28 +71,50 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru if index_exist? && aliases.none? cluster.api.delete_index(index: index_name) end + if import import_kwargs = import.is_a?(Hash) ? import : {} - import_kwargs[:refresh] ||= refresh if refresh + import_kwargs[:refresh] ||= refresh unless refresh.nil? import(**options, **import_kwargs, suffix: suffix) elsif reindex && (source_indexes = indices_pointing_to_alias).any? reindex_kwargs = reindex.is_a?(Hash) ? reindex : {} - reindex_kwargs[:wait_for_completion] = true unless reindex_kwargs.key?(:wait_for_completion) + reindex_kwargs[:refresh] ||= refresh unless refresh.nil? 
source_indexes.each do |from| - cluster.api.reindex(**options, body: { source: { index: from }, dest: { index: index_name(suffix: suffix) } }, refresh: refresh) + reindex(**reindex_kwargs, body: { + source: { index: from }, + dest: { index: index_name(suffix: suffix) } + }) end end - if optimize && import && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval + if optimized_creation && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval update_settings(suffix: suffix, settings: settings) refresh(suffix: suffix) end - update_aliases(suffix: suffix) + update_aliases(suffix: suffix) if syncronous_import true end + # Copies documents from a source to a destination. + # + # To avoid http timeout, we are sending the request with `wait_for_completion: false` and polling the task + # until it is completed. + # + # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html + def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, **options) + resp = cluster.api.reindex(**options, body: body, scroll: scroll, wait_for_completion: false) + return resp unless wait_for_completion + + task_id = resp['task'] + task = nil + while (task = cluster.api.task(id: task_id))['completed'] == false + sleep poll_interval + end + task + end + # Checks the index existance. Returns true or false # # UsersIndex.index_exist? 
#=> true diff --git a/lib/esse/transport.rb b/lib/esse/transport.rb index 6c73e4a..90ec541 100644 --- a/lib/esse/transport.rb +++ b/lib/esse/transport.rb @@ -3,7 +3,7 @@ module Esse class Transport require_relative './transport/aliases' - require_relative './transport/health' + require_relative './transport/cluster' require_relative './transport/indices' require_relative './transport/search' require_relative './transport/documents' diff --git a/lib/esse/transport/health.rb b/lib/esse/transport/cluster.rb similarity index 50% rename from lib/esse/transport/health.rb rename to lib/esse/transport/cluster.rb index 5d740a1..435f023 100644 --- a/lib/esse/transport/health.rb +++ b/lib/esse/transport/cluster.rb @@ -23,6 +23,34 @@ module InstanceMethods def health(**options) coerce_exception { client.cluster.health(**options) } end + + # Returns information about the tasks currently executing on one or more nodes in the cluster. + # + # @option arguments [String] :format a short version of the Accept header, e.g. json, yaml + # @option arguments [List] :nodes A comma-separated list of node IDs or names to limit the returned information; use `_local` to return information from the node you're connecting to, leave empty to get information from all nodes + # @option arguments [List] :actions A comma-separated list of actions that should be returned. Leave empty to return all. + # @option arguments [Boolean] :detailed Return detailed task information (default: false) + # @option arguments [String] :parent_task_id Return tasks with specified parent task id (node_id:task_number). Set to -1 to return all. 
+ # @option arguments [List] :h Comma-separated list of column names to display + # @option arguments [Boolean] :help Return help information + # @option arguments [List] :s Comma-separated list of column names or column aliases to sort by + # @option arguments [String] :time The unit in which to display time values (options: d, h, m, s, ms, micros, nanos) + # @option arguments [Boolean] :v Verbose mode. Display column headers + # @option arguments [Hash] :headers Custom HTTP headers + # + # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html + def tasks(**options) + # coerce_exception { client.perform_request('GET', '/_tasks', options).body } + coerce_exception { client.tasks.list(**options) } + end + + def task(id:, **options) + coerce_exception { client.tasks.get(task_id: id, **options) } + end + + def cancel_task(id:, **options) + coerce_exception { client.tasks.cancel(task_id: id, **options) } + end end include InstanceMethods diff --git a/lib/esse/transport/documents.rb b/lib/esse/transport/documents.rb index 6e910eb..83c58a2 100644 --- a/lib/esse/transport/documents.rb +++ b/lib/esse/transport/documents.rb @@ -192,6 +192,104 @@ def bulk(body:, **options) response end end + + + # Performs an update on every document in the index without changing the source, + # for example to pick up a mapping change. 
+ # + # @option arguments [List] :index A comma-separated list of index names to search; use `_all` or empty string to perform the operation on all indices (*Required*) + # @option arguments [String] :analyzer The analyzer to use for the query string + # @option arguments [Boolean] :analyze_wildcard Specify whether wildcard and prefix queries should be analyzed (default: false) + # @option arguments [String] :default_operator The default operator for query string query (AND or OR) (options: AND, OR) + # @option arguments [String] :df The field to use as default where no field prefix is given in the query string + # @option arguments [Number] :from Starting offset (default: 0) + # @option arguments [Boolean] :ignore_unavailable Whether specified concrete indices should be ignored when unavailable (missing or closed) + # @option arguments [Boolean] :allow_no_indices Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified) + # @option arguments [String] :conflicts What to do when the update by query hits version conflicts? (options: abort, proceed) + # @option arguments [String] :expand_wildcards Whether to expand wildcard expression to concrete indices that are open, closed or both. (options: open, closed, hidden, none, all) + # @option arguments [Boolean] :lenient Specify whether format-based query failures (such as providing text to a numeric field) should be ignored + # @option arguments [String] :pipeline Ingest pipeline to set on index requests made by this action. 
(default: none) + # @option arguments [String] :preference Specify the node or shard the operation should be performed on (default: random) + # @option arguments [String] :q Query in the Lucene query string syntax + # @option arguments [List] :routing A comma-separated list of specific routing values + # @option arguments [Time] :scroll Specify how long a consistent view of the index should be maintained for scrolled search + # @option arguments [String] :search_type Search operation type (options: query_then_fetch, dfs_query_then_fetch) + # @option arguments [Time] :search_timeout Explicit timeout for each search request. Defaults to no timeout. + # @option arguments [Number] :size Deprecated, please use `max_docs` instead + # @option arguments [Number] :max_docs Maximum number of documents to process (default: all documents) + # @option arguments [List] :sort A comma-separated list of : pairs + # @option arguments [List] :_source True or false to return the _source field or not, or a list of fields to return + # @option arguments [List] :_source_excludes A list of fields to exclude from the returned _source field + # @option arguments [List] :_source_includes A list of fields to extract and return from the _source field + # @option arguments [Number] :terminate_after The maximum number of documents to collect for each shard, upon reaching which the query execution will terminate early. + # @option arguments [List] :stats Specific 'tag' of the request for logging and statistical purposes + # @option arguments [Boolean] :version Specify whether to return document version as part of a hit + # @option arguments [Boolean] :version_type Should the document increment the version number (internal) on hit or not (reindex) + # @option arguments [Boolean] :request_cache Specify if request cache should be used for this request or not, defaults to index level setting + # @option arguments [Boolean] :refresh Should the affected indexes be refreshed? 
+ # @option arguments [Time] :timeout Time each individual bulk request should wait for shards that are unavailable. + # @option arguments [String] :wait_for_active_shards Sets the number of shard copies that must be active before proceeding with the update by query operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1) + # @option arguments [Number] :scroll_size Size on the scroll request powering the update by query + # @option arguments [Boolean] :wait_for_completion Should the request should block until the update by query operation is complete. + # @option arguments [Number] :requests_per_second The throttle to set on this request in sub-requests per second. -1 means no throttle. + # @option arguments [Number|string] :slices The number of slices this task should be divided into. Defaults to 1, meaning the task isn't sliced into subtasks. Can be set to `auto`. + # @option arguments [Hash] :headers Custom HTTP headers + # @option arguments [Hash] :body The search definition using the Query DSL + # + # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html + def update_by_query(index:, **options) + throw_error_when_readonly! + + Esse::Events.instrument('elasticsearch.update_by_query') do |payload| + payload[:request] = opts = options.merge(index: index) + payload[:response] = coerce_exception { client.update_by_query(**opts) } + end + end + + # Deletes documents matching the provided query. 
+ # + # @option arguments [List] :index A comma-separated list of index names to search; use `_all` or empty string to perform the operation on all indices + # @option arguments [String] :analyzer The analyzer to use for the query string + # @option arguments [Boolean] :analyze_wildcard Specify whether wildcard and prefix queries should be analyzed (default: false) + # @option arguments [String] :default_operator The default operator for query string query (AND or OR) (options: AND, OR) + # @option arguments [String] :df The field to use as default where no field prefix is given in the query string + # @option arguments [Number] :from Starting offset (default: 0) + # @option arguments [Boolean] :ignore_unavailable Whether specified concrete indices should be ignored when unavailable (missing or closed) + # @option arguments [Boolean] :allow_no_indices Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified) + # @option arguments [String] :conflicts What to do when the delete by query hits version conflicts? (options: abort, proceed) + # @option arguments [String] :expand_wildcards Whether to expand wildcard expression to concrete indices that are open, closed or both. 
(options: open, closed, hidden, none, all) + # @option arguments [Boolean] :lenient Specify whether format-based query failures (such as providing text to a numeric field) should be ignored + # @option arguments [String] :preference Specify the node or shard the operation should be performed on (default: random) + # @option arguments [String] :q Query in the Lucene query string syntax + # @option arguments [List] :routing A comma-separated list of specific routing values + # @option arguments [Time] :scroll Specify how long a consistent view of the index should be maintained for scrolled search + # @option arguments [String] :search_type Search operation type (options: query_then_fetch, dfs_query_then_fetch) + # @option arguments [Time] :search_timeout Explicit timeout for each search request. Defaults to no timeout. + # @option arguments [Number] :max_docs Maximum number of documents to process (default: all documents) + # @option arguments [List] :sort A comma-separated list of : pairs + # @option arguments [Number] :terminate_after The maximum number of documents to collect for each shard, upon reaching which the query execution will terminate early. + # @option arguments [List] :stats Specific 'tag' of the request for logging and statistical purposes + # @option arguments [Boolean] :version Specify whether to return document version as part of a hit + # @option arguments [Boolean] :request_cache Specify if request cache should be used for this request or not, defaults to index level setting + # @option arguments [Boolean] :refresh Should the affected indexes be refreshed? + # @option arguments [Time] :timeout Time each individual bulk request should wait for shards that are unavailable. + # @option arguments [String] :wait_for_active_shards Sets the number of shard copies that must be active before proceeding with the delete by query operation. Defaults to 1, meaning the primary shard only. 
Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1) + # @option arguments [Number] :scroll_size Size on the scroll request powering the delete by query + # @option arguments [Boolean] :wait_for_completion Should the request should block until the delete by query is complete. + # @option arguments [Number] :requests_per_second The throttle for this request in sub-requests per second. -1 means no throttle. + # @option arguments [Number|string] :slices The number of slices this task should be divided into. Defaults to 1, meaning the task isn't sliced into subtasks. Can be set to `auto`. + # @option arguments [Hash] :headers Custom HTTP headers + # @option arguments [Hash] :body The search definition using the Query DSL (*Required*) + # + # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html + def delete_by_query(index:, **options) + throw_error_when_readonly! + + Esse::Events.instrument('elasticsearch.delete_by_query') do |payload| + payload[:request] = opts = options.merge(index: index) + payload[:response] = coerce_exception { client.delete_by_query(**opts) } + end + end end include InstanceMethods diff --git a/lib/esse/transport/indices.rb b/lib/esse/transport/indices.rb index 4e5e636..58034d7 100644 --- a/lib/esse/transport/indices.rb +++ b/lib/esse/transport/indices.rb @@ -210,103 +210,6 @@ def reindex(body:, **options) payload[:response] = coerce_exception { client.reindex(**opts) } end end - - # Performs an update on every document in the index without changing the source, - # for example to pick up a mapping change. 
- # - # @option arguments [List] :index A comma-separated list of index names to search; use `_all` or empty string to perform the operation on all indices (*Required*) - # @option arguments [String] :analyzer The analyzer to use for the query string - # @option arguments [Boolean] :analyze_wildcard Specify whether wildcard and prefix queries should be analyzed (default: false) - # @option arguments [String] :default_operator The default operator for query string query (AND or OR) (options: AND, OR) - # @option arguments [String] :df The field to use as default where no field prefix is given in the query string - # @option arguments [Number] :from Starting offset (default: 0) - # @option arguments [Boolean] :ignore_unavailable Whether specified concrete indices should be ignored when unavailable (missing or closed) - # @option arguments [Boolean] :allow_no_indices Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified) - # @option arguments [String] :conflicts What to do when the update by query hits version conflicts? (options: abort, proceed) - # @option arguments [String] :expand_wildcards Whether to expand wildcard expression to concrete indices that are open, closed or both. (options: open, closed, hidden, none, all) - # @option arguments [Boolean] :lenient Specify whether format-based query failures (such as providing text to a numeric field) should be ignored - # @option arguments [String] :pipeline Ingest pipeline to set on index requests made by this action. 
(default: none) - # @option arguments [String] :preference Specify the node or shard the operation should be performed on (default: random) - # @option arguments [String] :q Query in the Lucene query string syntax - # @option arguments [List] :routing A comma-separated list of specific routing values - # @option arguments [Time] :scroll Specify how long a consistent view of the index should be maintained for scrolled search - # @option arguments [String] :search_type Search operation type (options: query_then_fetch, dfs_query_then_fetch) - # @option arguments [Time] :search_timeout Explicit timeout for each search request. Defaults to no timeout. - # @option arguments [Number] :size Deprecated, please use `max_docs` instead - # @option arguments [Number] :max_docs Maximum number of documents to process (default: all documents) - # @option arguments [List] :sort A comma-separated list of : pairs - # @option arguments [List] :_source True or false to return the _source field or not, or a list of fields to return - # @option arguments [List] :_source_excludes A list of fields to exclude from the returned _source field - # @option arguments [List] :_source_includes A list of fields to extract and return from the _source field - # @option arguments [Number] :terminate_after The maximum number of documents to collect for each shard, upon reaching which the query execution will terminate early. - # @option arguments [List] :stats Specific 'tag' of the request for logging and statistical purposes - # @option arguments [Boolean] :version Specify whether to return document version as part of a hit - # @option arguments [Boolean] :version_type Should the document increment the version number (internal) on hit or not (reindex) - # @option arguments [Boolean] :request_cache Specify if request cache should be used for this request or not, defaults to index level setting - # @option arguments [Boolean] :refresh Should the affected indexes be refreshed? 
- # @option arguments [Time] :timeout Time each individual bulk request should wait for shards that are unavailable. - # @option arguments [String] :wait_for_active_shards Sets the number of shard copies that must be active before proceeding with the update by query operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1) - # @option arguments [Number] :scroll_size Size on the scroll request powering the update by query - # @option arguments [Boolean] :wait_for_completion Should the request should block until the update by query operation is complete. - # @option arguments [Number] :requests_per_second The throttle to set on this request in sub-requests per second. -1 means no throttle. - # @option arguments [Number|string] :slices The number of slices this task should be divided into. Defaults to 1, meaning the task isn't sliced into subtasks. Can be set to `auto`. - # @option arguments [Hash] :headers Custom HTTP headers - # @option arguments [Hash] :body The search definition using the Query DSL - # - # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html - def update_by_query(index:, **options) - throw_error_when_readonly! - - Esse::Events.instrument('elasticsearch.update_by_query') do |payload| - payload[:request] = opts = options.merge(index: index) - payload[:response] = coerce_exception { client.update_by_query(**opts) } - end - end - - # Deletes documents matching the provided query. 
- # - # @option arguments [List] :index A comma-separated list of index names to search; use `_all` or empty string to perform the operation on all indices - # @option arguments [String] :analyzer The analyzer to use for the query string - # @option arguments [Boolean] :analyze_wildcard Specify whether wildcard and prefix queries should be analyzed (default: false) - # @option arguments [String] :default_operator The default operator for query string query (AND or OR) (options: AND, OR) - # @option arguments [String] :df The field to use as default where no field prefix is given in the query string - # @option arguments [Number] :from Starting offset (default: 0) - # @option arguments [Boolean] :ignore_unavailable Whether specified concrete indices should be ignored when unavailable (missing or closed) - # @option arguments [Boolean] :allow_no_indices Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified) - # @option arguments [String] :conflicts What to do when the delete by query hits version conflicts? (options: abort, proceed) - # @option arguments [String] :expand_wildcards Whether to expand wildcard expression to concrete indices that are open, closed or both. 
(options: open, closed, hidden, none, all) - # @option arguments [Boolean] :lenient Specify whether format-based query failures (such as providing text to a numeric field) should be ignored - # @option arguments [String] :preference Specify the node or shard the operation should be performed on (default: random) - # @option arguments [String] :q Query in the Lucene query string syntax - # @option arguments [List] :routing A comma-separated list of specific routing values - # @option arguments [Time] :scroll Specify how long a consistent view of the index should be maintained for scrolled search - # @option arguments [String] :search_type Search operation type (options: query_then_fetch, dfs_query_then_fetch) - # @option arguments [Time] :search_timeout Explicit timeout for each search request. Defaults to no timeout. - # @option arguments [Number] :max_docs Maximum number of documents to process (default: all documents) - # @option arguments [List] :sort A comma-separated list of : pairs - # @option arguments [Number] :terminate_after The maximum number of documents to collect for each shard, upon reaching which the query execution will terminate early. - # @option arguments [List] :stats Specific 'tag' of the request for logging and statistical purposes - # @option arguments [Boolean] :version Specify whether to return document version as part of a hit - # @option arguments [Boolean] :request_cache Specify if request cache should be used for this request or not, defaults to index level setting - # @option arguments [Boolean] :refresh Should the affected indexes be refreshed? - # @option arguments [Time] :timeout Time each individual bulk request should wait for shards that are unavailable. - # @option arguments [String] :wait_for_active_shards Sets the number of shard copies that must be active before proceeding with the delete by query operation. Defaults to 1, meaning the primary shard only. 
Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1) - # @option arguments [Number] :scroll_size Size on the scroll request powering the delete by query - # @option arguments [Boolean] :wait_for_completion Should the request should block until the delete by query is complete. - # @option arguments [Number] :requests_per_second The throttle for this request in sub-requests per second. -1 means no throttle. - # @option arguments [Number|string] :slices The number of slices this task should be divided into. Defaults to 1, meaning the task isn't sliced into subtasks. Can be set to `auto`. - # @option arguments [Hash] :headers Custom HTTP headers - # @option arguments [Hash] :body The search definition using the Query DSL (*Required*) - # - # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html - def delete_by_query(index:, **options) - throw_error_when_readonly! 
- - Esse::Events.instrument('elasticsearch.delete_by_query') do |payload| - payload[:request] = opts = options.merge(index: index) - payload[:response] = coerce_exception { client.delete_by_query(**opts) } - end - end end include InstanceMethods diff --git a/lib/esse/version.rb b/lib/esse/version.rb index d13131b..e5cc85b 100644 --- a/lib/esse/version.rb +++ b/lib/esse/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Esse - VERSION = '0.4.0.rc2' + VERSION = '0.4.0.rc3' end diff --git a/spec/esse/cli/index/reset_spec.rb b/spec/esse/cli/index/reset_spec.rb index 875ba51..b08043b 100644 --- a/spec/esse/cli/index/reset_spec.rb +++ b/spec/esse/cli/index/reset_spec.rb @@ -53,7 +53,7 @@ cli_exec(%w[index reset all]) end - it 'allows to pass --no-import' do + it 'allows to pass --no-optimize' do expect(CountiesIndex).to receive(:reset_index).with(**defaults, optimize: false).and_return(true) cli_exec(%w[index reset CountiesIndex --no-optimize]) end @@ -77,7 +77,12 @@ it 'forwards the --reindex option to the index class' do expect(CountiesIndex).to receive(:reset_index).with(**defaults, import: false, reindex: true).and_return(true) - cli_exec(%w[index reset CountiesIndex --reindex --no-import]) + cli_exec(%w[index reset CountiesIndex --reindex --import=false]) + end + + it 'forwards the --import hash option to the index class' do + expect(CountiesIndex).to receive(:reset_index).with(**defaults, import: { lazy_document_attributes: true }).and_return(true) + cli_exec(%w[index reset CountiesIndex --import=lazy_document_attributes:true]) end end end diff --git a/spec/esse/cli/parser/bool_or_hash_spec.rb b/spec/esse/cli/parser/bool_or_hash_spec.rb new file mode 100644 index 0000000..69b8d8e --- /dev/null +++ b/spec/esse/cli/parser/bool_or_hash_spec.rb @@ -0,0 +1,99 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'esse/cli' + +RSpec.describe Esse::CLI::Parser::BoolOrHash do + describe '#parse' do + let(:options) { {} } + let(:key) { :testkey } 
+ + context 'when passing a boolean' do + let(:parser) { described_class.new(key, **options) } + + it { expect(parser.parse('true')).to eq(true) } + it { expect(parser.parse('TRUE')).to eq(true) } + it { expect(parser.parse('t')).to eq(true) } + it { expect(parser.parse('T')).to eq(true) } + it { expect(parser.parse(true)).to eq(true) } + + it { expect(parser.parse('false')).to eq(false) } + it { expect(parser.parse('FALSE')).to eq(false) } + it { expect(parser.parse('f')).to eq(false) } + it { expect(parser.parse('F')).to eq(false) } + it { expect(parser.parse(false)).to eq(false) } + end + + context 'when passing a string' do + let(:parser) { described_class.new(key, **options) } + + it { expect(parser.parse('foo')).to eq(nil) } + it { expect(parser.parse('')).to eq(nil) } + it { expect(parser.parse(nil)).to eq(nil) } + + it "returns true when the key is the same as the input" do + expect(parser.parse(key.to_s)).to eq(true) + end + end + + context 'when passing true as default' do + let(:parser) { described_class.new(key, default: true) } + + it { expect(parser.parse('foo')).to eq(true) } + it { expect(parser.parse('')).to eq(true) } + it { expect(parser.parse(nil)).to eq(true) } + it { expect(parser.parse('true')).to eq(true) } + it { expect(parser.parse('false')).to eq(false) } + end + + context 'when passing false as default' do + let(:parser) { described_class.new(key, default: false) } + + it { expect(parser.parse('foo')).to eq(false) } + it { expect(parser.parse('')).to eq(false) } + it { expect(parser.parse(nil)).to eq(false) } + it { expect(parser.parse('true')).to eq(true) } + it { expect(parser.parse('false')).to eq(false) } + end + + context 'when passing a hash as default' do + let(:parser) { described_class.new(key, default: { foo: 'bar' }) } + + it { expect(parser.parse('foo')).to eq(foo: 'bar') } + it { expect(parser.parse('')).to eq(foo: 'bar') } + it { expect(parser.parse(nil)).to eq(foo: 'bar') } + it { expect(parser.parse('true')).to eq(true) } 
+ it { expect(parser.parse('false')).to eq(false) } + end + + context 'when passing a hash' do + let(:parser) { described_class.new(key, **options) } + + it { expect(parser.parse('foo:bar')).to eq(foo: 'bar') } + it { expect(parser.parse('f_o:bar')).to eq(f_o: 'bar') } + it { expect(parser.parse('f0o:bar')).to eq(f0o: 'bar') } + it { expect(parser.parse('f-o:bar')).to eq('f-o': 'bar') } + it { expect(parser.parse('foo:bar baz:qux')).to eq(foo: 'bar', baz: 'qux') } + + it 'explodes keys' do + expect(parser.parse('a.b.c:d')).to eq(a: { b: { c: 'd' } }) + expect(parser.parse('a.b.c:d a.b.x:y')).to eq(a: { b: { c: 'd', x: 'y' } }) + end + + it 'split comma separated values' do + expect(parser.parse('a:1,2,3')).to eq(a: %w[1 2 3]) + expect(parser.parse('a:1,2,3 b:4,5,6')).to eq(a: %w[1 2 3], b: %w[4 5 6]) + expect(parser.parse('a:b,c:d')).to eq(a: %w[b c:d]) + end + + it 'returns the given value when it is already a hash' do + expect(parser.parse(foo: 'bar')).to eq(foo: 'bar') + end + + it 'coerces the value of hash to boolean' do + expect(parser.parse('foo:true')).to eq(foo: true) + expect(parser.parse('foo:false')).to eq(foo: false) + end + end + end +end diff --git a/spec/esse/integrations/elasticsearch-6/transport/tasks_spec.rb b/spec/esse/integrations/elasticsearch-6/transport/tasks_spec.rb new file mode 100644 index 0000000..29b119e --- /dev/null +++ b/spec/esse/integrations/elasticsearch-6/transport/tasks_spec.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/transport_tasks' + +stack_describe 'elasticsearch', '6.x', Esse::Transport, '#tasks' do + include_examples 'transport#tasks' + include_examples 'transport#task' + include_examples 'transport#cancel_task' +end diff --git a/spec/esse/integrations/elasticsearch-7/transport/tasks_spec.rb b/spec/esse/integrations/elasticsearch-7/transport/tasks_spec.rb new file mode 100644 index 0000000..dba6389 --- /dev/null +++ 
b/spec/esse/integrations/elasticsearch-7/transport/tasks_spec.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/transport_tasks' + +stack_describe 'elasticsearch', '7.x', Esse::Transport, '#tasks' do + include_examples 'transport#tasks' + include_examples 'transport#task' + include_examples 'transport#cancel_task' +end diff --git a/spec/esse/integrations/elasticsearch-8/transport/tasks_spec.rb b/spec/esse/integrations/elasticsearch-8/transport/tasks_spec.rb new file mode 100644 index 0000000..27341df --- /dev/null +++ b/spec/esse/integrations/elasticsearch-8/transport/tasks_spec.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/transport_tasks' + +stack_describe 'elasticsearch', '8.x', Esse::Transport, '#tasks' do + include_examples 'transport#tasks' + include_examples 'transport#task' + include_examples 'transport#cancel_task' +end diff --git a/spec/support/shared_examples/index_reset_index.rb b/spec/support/shared_examples/index_reset_index.rb index de7882a..3fa99aa 100644 --- a/spec/support/shared_examples/index_reset_index.rb +++ b/spec/support/shared_examples/index_reset_index.rb @@ -82,29 +82,37 @@ end end - it 'reindex data from the old index to the new index' do + it 'create async task to reindex data from the old index and do not update the alias' do es_client do |client, _conf, cluster| GeosIndex.create_index(alias: true, suffix: '2021') GeosIndex.import(refresh: true) expect { - GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: true, refresh: true) + GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: { wait_for_completion: false }, refresh: true) }.not_to raise_error - expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_#{index_suffix}"]) + expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_2021"]) expect(GeosIndex.index_exist?(suffix: '2021')).to 
eq(true) expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true) - expect(GeosIndex.count).to be_positive + + count = 0 + (0..3).each do |t| + GeosIndex.refresh(suffix: index_suffix) + count = GeosIndex.count(suffix: index_suffix) + break if count.positive? + sleep(t) if t.positive? + end + expect(count).to be_positive end end - it 'forwads the reindex options to the reindex method' do + it 'reindex data from the old index to the new index by awaiting for completion' do es_client do |client, _conf, cluster| GeosIndex.create_index(alias: true, suffix: '2021') GeosIndex.import(refresh: true) expect { - GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: { wait_for_completion: true }, refresh: true) + GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: { wait_for_completion: true, poll_interval: 0.2 }, refresh: true) }.not_to raise_error expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_#{index_suffix}"]) diff --git a/spec/support/shared_examples/transport_tasks.rb b/spec/support/shared_examples/transport_tasks.rb new file mode 100644 index 0000000..b64e771 --- /dev/null +++ b/spec/support/shared_examples/transport_tasks.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +RSpec.shared_examples 'transport#tasks' do + it 'retrieves the tasks of cluster' do + es_client do |_client, _conf, cluster| + expect(resp = cluster.api.tasks).to be_a(Hash) + expect(resp).to have_key('nodes') + end + end + + it 'does not raise Esse::Transport::ReadonlyClusterError error when the cluster is readonly' do + es_client do |_client, _conf, cluster| + cluster.readonly = true + expect { + cluster.api.tasks + }.not_to raise_error + end + end +end + +RSpec.shared_examples 'transport#task' do + it 'retrieves the task of cluster' do + es_client do |_client, _conf, cluster| + expect(cluster.client.tasks).to receive(:get).with(task_id: '1').and_return({ 'task' => { 'id' => '1' } }) + + expect(resp = cluster.api.task(id: '1')).to 
be_a(Hash) + end + end +end + +RSpec.shared_examples 'transport#cancel_task' do + it 'cancels the task of the cluster' do + es_client do |_client, _conf, cluster| + expect(cluster.client.tasks).to receive(:cancel).with(task_id: '1').and_return({ 'task' => { 'id' => '1' } }) + + expect(resp = cluster.api.cancel_task(id: '1')).to be_a(Hash) + end + end +end