From ba4e70b7c27e6718e1d74de5b3466b24796e4082 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Fri, 2 Aug 2024 14:18:16 -0300 Subject: [PATCH 1/4] feat: add update_by_query action to transport --- lib/esse/events.rb | 1 + lib/esse/transport/indices.rb | 52 +++++++++++++++++++ .../transport/update_by_query_spec.rb | 8 +++ .../transport/update_by_query_spec.rb | 8 +++ .../transport/update_by_query_spec.rb | 8 +++ .../transport/update_by_query_spec.rb | 8 +++ .../transport_update_by_query.rb | 50 ++++++++++++++++++ 7 files changed, 135 insertions(+) create mode 100644 spec/esse/integrations/elasticsearch-5/transport/update_by_query_spec.rb create mode 100644 spec/esse/integrations/elasticsearch-6/transport/update_by_query_spec.rb create mode 100644 spec/esse/integrations/elasticsearch-7/transport/update_by_query_spec.rb create mode 100644 spec/esse/integrations/elasticsearch-8/transport/update_by_query_spec.rb create mode 100644 spec/support/shared_examples/transport_update_by_query.rb diff --git a/lib/esse/events.rb b/lib/esse/events.rb index 83e6574..3d443c3 100644 --- a/lib/esse/events.rb +++ b/lib/esse/events.rb @@ -57,5 +57,6 @@ module Events register_event 'elasticsearch.count' register_event 'elasticsearch.get' register_event 'elasticsearch.reindex' + register_event 'elasticsearch.update_by_query' end end diff --git a/lib/esse/transport/indices.rb b/lib/esse/transport/indices.rb index 58034d7..2a9f437 100644 --- a/lib/esse/transport/indices.rb +++ b/lib/esse/transport/indices.rb @@ -210,6 +210,58 @@ def reindex(body:, **options) payload[:response] = coerce_exception { client.reindex(**opts) } end end + + # Performs an update on every document in the index without changing the source, + # for example to pick up a mapping change. + # + # @option arguments [List] :index A comma-separated list of index names to search; use `_all` or empty string to perform the operation on all indices (*Required*) + # @option arguments [String] :analyzer The analyzer to use for the query string + # @option arguments [Boolean] :analyze_wildcard Specify whether wildcard and prefix queries should be analyzed (default: false) + # @option arguments [String] :default_operator The default operator for query string query (AND or OR) (options: AND, OR) + # @option arguments [String] :df The field to use as default where no field prefix is given in the query string + # @option arguments [Number] :from Starting offset (default: 0) + # @option arguments [Boolean] :ignore_unavailable Whether specified concrete indices should be ignored when unavailable (missing or closed) + # @option arguments [Boolean] :allow_no_indices Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified) + # @option arguments [String] :conflicts What to do when the update by query hits version conflicts? (options: abort, proceed) + # @option arguments [String] :expand_wildcards Whether to expand wildcard expression to concrete indices that are open, closed or both. (options: open, closed, hidden, none, all) + # @option arguments [Boolean] :lenient Specify whether format-based query failures (such as providing text to a numeric field) should be ignored + # @option arguments [String] :pipeline Ingest pipeline to set on index requests made by this action. (default: none) + # @option arguments [String] :preference Specify the node or shard the operation should be performed on (default: random) + # @option arguments [String] :q Query in the Lucene query string syntax + # @option arguments [List] :routing A comma-separated list of specific routing values + # @option arguments [Time] :scroll Specify how long a consistent view of the index should be maintained for scrolled search + # @option arguments [String] :search_type Search operation type (options: query_then_fetch, dfs_query_then_fetch) + # @option arguments [Time] :search_timeout Explicit timeout for each search request. Defaults to no timeout. + # @option arguments [Number] :size Deprecated, please use `max_docs` instead + # @option arguments [Number] :max_docs Maximum number of documents to process (default: all documents) + # @option arguments [List] :sort A comma-separated list of : pairs + # @option arguments [List] :_source True or false to return the _source field or not, or a list of fields to return + # @option arguments [List] :_source_excludes A list of fields to exclude from the returned _source field + # @option arguments [List] :_source_includes A list of fields to extract and return from the _source field + # @option arguments [Number] :terminate_after The maximum number of documents to collect for each shard, upon reaching which the query execution will terminate early. + # @option arguments [List] :stats Specific 'tag' of the request for logging and statistical purposes + # @option arguments [Boolean] :version Specify whether to return document version as part of a hit + # @option arguments [Boolean] :version_type Should the document increment the version number (internal) on hit or not (reindex) + # @option arguments [Boolean] :request_cache Specify if request cache should be used for this request or not, defaults to index level setting + # @option arguments [Boolean] :refresh Should the affected indexes be refreshed? + # @option arguments [Time] :timeout Time each individual bulk request should wait for shards that are unavailable. + # @option arguments [String] :wait_for_active_shards Sets the number of shard copies that must be active before proceeding with the update by query operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1) + # @option arguments [Number] :scroll_size Size on the scroll request powering the update by query + # @option arguments [Boolean] :wait_for_completion Should the request should block until the update by query operation is complete. + # @option arguments [Number] :requests_per_second The throttle to set on this request in sub-requests per second. -1 means no throttle. + # @option arguments [Number|string] :slices The number of slices this task should be divided into. Defaults to 1, meaning the task isn't sliced into subtasks. Can be set to `auto`. + # @option arguments [Hash] :headers Custom HTTP headers + # @option arguments [Hash] :body The search definition using the Query DSL + # + # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html + def update_by_query(index:, **options) + throw_error_when_readonly! + + Esse::Events.instrument('elasticsearch.update_by_query') do |payload| + payload[:request] = opts = options.merge(index: index) + payload[:response] = coerce_exception { client.update_by_query(**opts) } + end + end end include InstanceMethods diff --git a/spec/esse/integrations/elasticsearch-5/transport/update_by_query_spec.rb b/spec/esse/integrations/elasticsearch-5/transport/update_by_query_spec.rb new file mode 100644 index 0000000..63c62e7 --- /dev/null +++ b/spec/esse/integrations/elasticsearch-5/transport/update_by_query_spec.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/transport_update_by_query' + +stack_describe 'elasticsearch', '5.x', Esse::Transport, '#update_by_query' do + include_examples 'transport#update_by_query' +end diff --git a/spec/esse/integrations/elasticsearch-6/transport/update_by_query_spec.rb b/spec/esse/integrations/elasticsearch-6/transport/update_by_query_spec.rb new file mode 100644 index 0000000..e512a4d --- /dev/null +++ b/spec/esse/integrations/elasticsearch-6/transport/update_by_query_spec.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/transport_update_by_query' + +stack_describe 'elasticsearch', '6.x', Esse::Transport, '#update_by_query' do + include_examples 'transport#update_by_query' +end diff --git a/spec/esse/integrations/elasticsearch-7/transport/update_by_query_spec.rb b/spec/esse/integrations/elasticsearch-7/transport/update_by_query_spec.rb new file mode 100644 index 0000000..5fe3e1e --- /dev/null +++ b/spec/esse/integrations/elasticsearch-7/transport/update_by_query_spec.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/transport_update_by_query' + +stack_describe 'elasticsearch', '7.x', Esse::Transport, '#update_by_query' do + include_examples 'transport#update_by_query' +end diff --git a/spec/esse/integrations/elasticsearch-8/transport/update_by_query_spec.rb b/spec/esse/integrations/elasticsearch-8/transport/update_by_query_spec.rb new file mode 100644 index 0000000..c4949ec --- /dev/null +++ b/spec/esse/integrations/elasticsearch-8/transport/update_by_query_spec.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/transport_update_by_query' + +stack_describe 'elasticsearch', '8.x', Esse::Transport, '#update_by_query' do + include_examples 'transport#update_by_query' +end diff --git a/spec/support/shared_examples/transport_update_by_query.rb b/spec/support/shared_examples/transport_update_by_query.rb new file mode 100644 index 0000000..79e6230 --- /dev/null +++ b/spec/support/shared_examples/transport_update_by_query.rb @@ -0,0 +1,50 @@ +# frozen_string_literal: true + +RSpec.shared_examples 'transport#update_by_query' do + let(:body) do + { + settings: { + index: { + number_of_shards: 1, + number_of_replicas: 0 + } + } + } + end + + it 'raises an Esse::Transport::ReadonlyClusterError exception when the cluster is readonly' do + es_client do |client, _conf, cluster| + cluster.warm_up! + expect(client).not_to receive(:perform_request) + cluster.readonly = true + expect { + cluster.api.update_by_query(index: "#{cluster.index_prefix}_redonly", body: { script: { source: 'ctx._source.title = "foo"' } }, q: '*') + }.to raise_error(Esse::Transport::ReadonlyClusterError) + end + end + + it 'raises an # Date: Fri, 2 Aug 2024 14:25:59 -0300 Subject: [PATCH 2/4] feat: add update_by_query method to index --- lib/esse/index/documents.rb | 14 +++++ .../index/documents_update_by_query_spec.rb | 8 +++ .../index/documents_update_by_query_spec.rb | 8 +++ .../indices/documents_update_by_query_spec.rb | 8 +++ .../index_documents_update_by_query.rb | 61 +++++++++++++++++++ 5 files changed, 99 insertions(+) create mode 100644 spec/esse/integrations/elasticsearch-6/index/documents_update_by_query_spec.rb create mode 100644 spec/esse/integrations/elasticsearch-7/index/documents_update_by_query_spec.rb create mode 100644 spec/esse/integrations/elasticsearch-8/indices/documents_update_by_query_spec.rb create mode 100644 spec/support/shared_examples/index_documents_update_by_query.rb diff --git a/lib/esse/index/documents.rb b/lib/esse/index/documents.rb index fe61f7f..74f365c 100644 --- a/lib/esse/index/documents.rb +++ b/lib/esse/index/documents.rb @@ -236,6 +236,20 @@ def import(*repo_types, context: {}, eager_include_document_attributes: false, l count end + # Update documents by query + # + # @param options [Hash] Hash of paramenters that will be passed along to elasticsearch request + # @option [String, nil] :suffix The index suffix. Defaults to the nil. + # + # @return [Hash] The elasticsearch response hash + def update_by_query(suffix: nil, **options) + definition = { + index: index_name(suffix: suffix), + }.merge(options) + cluster.may_update_type!(definition) + cluster.api.update_by_query(**definition) + end + protected def document?(doc) diff --git a/spec/esse/integrations/elasticsearch-6/index/documents_update_by_query_spec.rb b/spec/esse/integrations/elasticsearch-6/index/documents_update_by_query_spec.rb new file mode 100644 index 0000000..cbc0485 --- /dev/null +++ b/spec/esse/integrations/elasticsearch-6/index/documents_update_by_query_spec.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/index_documents_update_by_query' + +stack_describe 'elasticsearch', '6.x', Esse::Index, '.update_by_query' do + include_examples 'index.update_by_query' +end diff --git a/spec/esse/integrations/elasticsearch-7/index/documents_update_by_query_spec.rb b/spec/esse/integrations/elasticsearch-7/index/documents_update_by_query_spec.rb new file mode 100644 index 0000000..37759c3 --- /dev/null +++ b/spec/esse/integrations/elasticsearch-7/index/documents_update_by_query_spec.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/index_documents_update_by_query' + +stack_describe 'elasticsearch', '7.x', Esse::Index, '.update_by_query' do + include_examples 'index.update_by_query' +end diff --git a/spec/esse/integrations/elasticsearch-8/indices/documents_update_by_query_spec.rb b/spec/esse/integrations/elasticsearch-8/indices/documents_update_by_query_spec.rb new file mode 100644 index 0000000..ba1c4f6 --- /dev/null +++ b/spec/esse/integrations/elasticsearch-8/indices/documents_update_by_query_spec.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'support/shared_examples/index_documents_update_by_query' + +stack_describe 'elasticsearch', '8.x', Esse::Index, '.update_by_query' do + include_examples 'index.update_by_query' +end diff --git a/spec/support/shared_examples/index_documents_update_by_query.rb b/spec/support/shared_examples/index_documents_update_by_query.rb new file mode 100644 index 0000000..6fb44eb --- /dev/null +++ b/spec/support/shared_examples/index_documents_update_by_query.rb @@ -0,0 +1,61 @@ +# frozen_string_literal: true + +RSpec.shared_examples 'index.update_by_query' do |doc_type: false| + include_context 'with venues index definition' + + let(:params) do + doc_type ? { type: 'venue' } : {} + end + let(:doc_params) do + doc_type ? { _type: 'venue' } : {} + end + let(:index_suffix) { SecureRandom.hex(8) } + let(:body) { { script: { source: 'ctx._source.title = "foo"' }, query: { match_all: {} } } } + + it 'raises an Esse::Transport::ReadonlyClusterError exception when the cluster is readonly' do + es_client do |client, _conf, cluster| + cluster.warm_up! + expect(client).not_to receive(:perform_request) + cluster.readonly = true + expect { + VenuesIndex.update_by_query(body: body, **params) + }.to raise_error(Esse::Transport::ReadonlyClusterError) + end + end + + it 'raises an Esse::Transport::ServerError exception when api throws an error' do + es_client do |client, _conf, cluster| + expect { + VenuesIndex.update_by_query(body: body, **params) + }.to raise_error(Esse::Transport::NotFoundError) + end + end + + it 'updates the documents in the aliased index' do + es_client do |client, _conf, cluster| + VenuesIndex.create_index(alias: true, suffix: index_suffix) + VenuesIndex.import(refresh: true, suffix: index_suffix, **params) + + resp = nil + expect { + resp = VenuesIndex.update_by_query(body: body, **params) + }.not_to raise_error + expect(resp['total']).to eq(total_venues) + expect(resp['updated']).to eq(total_venues) + end + end + + it 'updates the documents in the unaliased index' do + es_client do |client, _conf, cluster| + VenuesIndex.create_index(alias: false, suffix: index_suffix) + VenuesIndex.import(refresh: true, suffix: index_suffix, **params) + + resp = nil + expect { + resp = VenuesIndex.update_by_query(body: body, suffix: index_suffix, **params) + }.not_to raise_error + expect(resp['total']).to eq(total_venues) + expect(resp['updated']).to eq(total_venues) + end + end +end From 7d469ed98e8650eca790d2281ff5b006aff2e8dc Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Fri, 2 Aug 2024 14:37:54 -0300 Subject: [PATCH 3/4] feat: fix specs for es that use type --- ...ry_spec.rb => documents_update_by_query_spec.rb} | 2 +- ...ry_spec.rb => documents_update_by_query_spec.rb} | 2 +- ...ry_spec.rb => documents_update_by_query_spec.rb} | 0 ...ry_spec.rb => documents_update_by_query_spec.rb} | 0 .../shared_examples/transport_update_by_query.rb | 13 ++++++++----- 5 files changed, 10 insertions(+), 7 deletions(-) rename spec/esse/integrations/elasticsearch-5/transport/{update_by_query_spec.rb => documents_update_by_query_spec.rb} (75%) rename spec/esse/integrations/elasticsearch-6/transport/{update_by_query_spec.rb => documents_update_by_query_spec.rb} (75%) rename spec/esse/integrations/elasticsearch-7/transport/{update_by_query_spec.rb => documents_update_by_query_spec.rb} (100%) rename spec/esse/integrations/elasticsearch-8/transport/{update_by_query_spec.rb => documents_update_by_query_spec.rb} (100%) diff --git a/spec/esse/integrations/elasticsearch-5/transport/update_by_query_spec.rb b/spec/esse/integrations/elasticsearch-5/transport/documents_update_by_query_spec.rb similarity index 75% rename from spec/esse/integrations/elasticsearch-5/transport/update_by_query_spec.rb rename to spec/esse/integrations/elasticsearch-5/transport/documents_update_by_query_spec.rb index 63c62e7..9240c73 100644 --- a/spec/esse/integrations/elasticsearch-5/transport/update_by_query_spec.rb +++ b/spec/esse/integrations/elasticsearch-5/transport/documents_update_by_query_spec.rb @@ -4,5 +4,5 @@ require 'support/shared_examples/transport_update_by_query' stack_describe 'elasticsearch', '5.x', Esse::Transport, '#update_by_query' do - include_examples 'transport#update_by_query' + include_examples 'transport#update_by_query', doc_type: true end diff --git a/spec/esse/integrations/elasticsearch-6/transport/update_by_query_spec.rb b/spec/esse/integrations/elasticsearch-6/transport/documents_update_by_query_spec.rb similarity index 75% rename from spec/esse/integrations/elasticsearch-6/transport/update_by_query_spec.rb rename to spec/esse/integrations/elasticsearch-6/transport/documents_update_by_query_spec.rb index e512a4d..9328280 100644 --- a/spec/esse/integrations/elasticsearch-6/transport/update_by_query_spec.rb +++ b/spec/esse/integrations/elasticsearch-6/transport/documents_update_by_query_spec.rb @@ -4,5 +4,5 @@ require 'support/shared_examples/transport_update_by_query' stack_describe 'elasticsearch', '6.x', Esse::Transport, '#update_by_query' do - include_examples 'transport#update_by_query' + include_examples 'transport#update_by_query', doc_type: true end diff --git a/spec/esse/integrations/elasticsearch-7/transport/update_by_query_spec.rb b/spec/esse/integrations/elasticsearch-7/transport/documents_update_by_query_spec.rb similarity index 100% rename from spec/esse/integrations/elasticsearch-7/transport/update_by_query_spec.rb rename to spec/esse/integrations/elasticsearch-7/transport/documents_update_by_query_spec.rb diff --git a/spec/esse/integrations/elasticsearch-8/transport/update_by_query_spec.rb b/spec/esse/integrations/elasticsearch-8/transport/documents_update_by_query_spec.rb similarity index 100% rename from spec/esse/integrations/elasticsearch-8/transport/update_by_query_spec.rb rename to spec/esse/integrations/elasticsearch-8/transport/documents_update_by_query_spec.rb diff --git a/spec/support/shared_examples/transport_update_by_query.rb b/spec/support/shared_examples/transport_update_by_query.rb index 79e6230..5f2feb9 100644 --- a/spec/support/shared_examples/transport_update_by_query.rb +++ b/spec/support/shared_examples/transport_update_by_query.rb @@ -1,6 +1,9 @@ # frozen_string_literal: true -RSpec.shared_examples 'transport#update_by_query' do +RSpec.shared_examples 'transport#update_by_query' do |doc_type: false| + let(:params) do + doc_type ? { type: 'geo' } : {} + end let(:body) do { settings: { @@ -18,7 +21,7 @@ expect(client).not_to receive(:perform_request) cluster.readonly = true expect { - cluster.api.update_by_query(index: "#{cluster.index_prefix}_redonly", body: { script: { source: 'ctx._source.title = "foo"' } }, q: '*') + cluster.api.update_by_query(**params, index: "#{cluster.index_prefix}_redonly", body: { script: { source: 'ctx._source.title = "foo"' } }, q: '*') }.to raise_error(Esse::Transport::ReadonlyClusterError) end end @@ -26,7 +29,7 @@ it 'raises an # Date: Fri, 2 Aug 2024 14:46:23 -0300 Subject: [PATCH 4/4] fix: fix broken specs related es version with type --- .../elasticsearch-5/transport/reindex_spec.rb | 2 +- .../index/documents_update_by_query_spec.rb | 2 +- .../elasticsearch-6/transport/reindex_spec.rb | 2 +- spec/support/shared_examples/transport_reindex.rb | 15 +++++++++------ 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/spec/esse/integrations/elasticsearch-5/transport/reindex_spec.rb b/spec/esse/integrations/elasticsearch-5/transport/reindex_spec.rb index d8f7bea..6591aa8 100644 --- a/spec/esse/integrations/elasticsearch-5/transport/reindex_spec.rb +++ b/spec/esse/integrations/elasticsearch-5/transport/reindex_spec.rb @@ -4,5 +4,5 @@ require 'support/shared_examples/transport_reindex' stack_describe 'elasticsearch', '5.x', Esse::Transport, '#reindex' do - include_examples 'transport#reindex' + include_examples 'transport#reindex', doc_type: true end diff --git a/spec/esse/integrations/elasticsearch-6/index/documents_update_by_query_spec.rb b/spec/esse/integrations/elasticsearch-6/index/documents_update_by_query_spec.rb index cbc0485..b5b3758 100644 --- a/spec/esse/integrations/elasticsearch-6/index/documents_update_by_query_spec.rb +++ b/spec/esse/integrations/elasticsearch-6/index/documents_update_by_query_spec.rb @@ -4,5 +4,5 @@ require 'support/shared_examples/index_documents_update_by_query' stack_describe 'elasticsearch', '6.x', Esse::Index, '.update_by_query' do - include_examples 'index.update_by_query' + include_examples 'index.update_by_query', doc_type: true end diff --git a/spec/esse/integrations/elasticsearch-6/transport/reindex_spec.rb b/spec/esse/integrations/elasticsearch-6/transport/reindex_spec.rb index 6bc0409..c23fd3e 100644 --- a/spec/esse/integrations/elasticsearch-6/transport/reindex_spec.rb +++ b/spec/esse/integrations/elasticsearch-6/transport/reindex_spec.rb @@ -4,5 +4,5 @@ require 'support/shared_examples/transport_reindex' stack_describe 'elasticsearch', '6.x', Esse::Transport, '#reindex' do - include_examples 'transport#reindex' + include_examples 'transport#reindex', doc_type: true end diff --git a/spec/support/shared_examples/transport_reindex.rb b/spec/support/shared_examples/transport_reindex.rb index 2beeb2c..cd5e810 100644 --- a/spec/support/shared_examples/transport_reindex.rb +++ b/spec/support/shared_examples/transport_reindex.rb @@ -1,6 +1,9 @@ # frozen_string_literal: true -RSpec.shared_examples 'transport#reindex' do +RSpec.shared_examples 'transport#reindex' do |doc_type: false| + let(:params) do + doc_type ? { type: 'geo' } : {} + end let(:body) do { settings: { @@ -18,7 +21,7 @@ expect(client).not_to receive(:perform_request) cluster.readonly = true expect { - cluster.api.reindex(body: { source: { index: "#{cluster.index_prefix}_ro_from" }, dest: { index: "#{cluster.index_prefix}_ro_to" } }) + cluster.api.reindex(**params, body: { source: { index: "#{cluster.index_prefix}_ro_from" }, dest: { index: "#{cluster.index_prefix}_ro_to" } }) }.to raise_error(Esse::Transport::ReadonlyClusterError) end end @@ -26,7 +29,7 @@ it 'raises an #