Skip to content

Commit

Permalink
feat: add update_by_query action to transport
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosgz committed Aug 2, 2024
1 parent 15bad91 commit ba4e70b
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib/esse/events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,6 @@ module Events
register_event 'elasticsearch.count'
register_event 'elasticsearch.get'
register_event 'elasticsearch.reindex'
register_event 'elasticsearch.update_by_query'
end
end
52 changes: 52 additions & 0 deletions lib/esse/transport/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,58 @@ def reindex(body:, **options)
payload[:response] = coerce_exception { client.reindex(**opts) }
end
end

# Performs an update on every document in the index without changing the source,
# for example to pick up a mapping change.
#
# @option arguments [List] :index A comma-separated list of index names to search; use `_all` or empty string to perform the operation on all indices (*Required*)
# @option arguments [String] :analyzer The analyzer to use for the query string
# @option arguments [Boolean] :analyze_wildcard Specify whether wildcard and prefix queries should be analyzed (default: false)
# @option arguments [String] :default_operator The default operator for query string query (AND or OR) (options: AND, OR)
# @option arguments [String] :df The field to use as default where no field prefix is given in the query string
# @option arguments [Number] :from Starting offset (default: 0)
# @option arguments [Boolean] :ignore_unavailable Whether specified concrete indices should be ignored when unavailable (missing or closed)
# @option arguments [Boolean] :allow_no_indices Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)
# @option arguments [String] :conflicts What to do when the update by query hits version conflicts? (options: abort, proceed)
# @option arguments [String] :expand_wildcards Whether to expand wildcard expression to concrete indices that are open, closed or both. (options: open, closed, hidden, none, all)
# @option arguments [Boolean] :lenient Specify whether format-based query failures (such as providing text to a numeric field) should be ignored
# @option arguments [String] :pipeline Ingest pipeline to set on index requests made by this action. (default: none)
# @option arguments [String] :preference Specify the node or shard the operation should be performed on (default: random)
# @option arguments [String] :q Query in the Lucene query string syntax
# @option arguments [List] :routing A comma-separated list of specific routing values
# @option arguments [Time] :scroll Specify how long a consistent view of the index should be maintained for scrolled search
# @option arguments [String] :search_type Search operation type (options: query_then_fetch, dfs_query_then_fetch)
# @option arguments [Time] :search_timeout Explicit timeout for each search request. Defaults to no timeout.
# @option arguments [Number] :size Deprecated, please use `max_docs` instead
# @option arguments [Number] :max_docs Maximum number of documents to process (default: all documents)
# @option arguments [List] :sort A comma-separated list of <field>:<direction> pairs
# @option arguments [List] :_source True or false to return the _source field or not, or a list of fields to return
# @option arguments [List] :_source_excludes A list of fields to exclude from the returned _source field
# @option arguments [List] :_source_includes A list of fields to extract and return from the _source field
# @option arguments [Number] :terminate_after The maximum number of documents to collect for each shard, upon reaching which the query execution will terminate early.
# @option arguments [List] :stats Specific 'tag' of the request for logging and statistical purposes
# @option arguments [Boolean] :version Specify whether to return document version as part of a hit
# @option arguments [Boolean] :version_type Should the document increment the version number (internal) on hit or not (reindex)
# @option arguments [Boolean] :request_cache Specify if request cache should be used for this request or not, defaults to index level setting
# @option arguments [Boolean] :refresh Should the affected indexes be refreshed?
# @option arguments [Time] :timeout Time each individual bulk request should wait for shards that are unavailable.
# @option arguments [String] :wait_for_active_shards Sets the number of shard copies that must be active before proceeding with the update by query operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)
# @option arguments [Number] :scroll_size Size on the scroll request powering the update by query
# @option arguments [Boolean] :wait_for_completion Whether the request should block until the update by query operation is complete.
# @option arguments [Number] :requests_per_second The throttle to set on this request in sub-requests per second. -1 means no throttle.
# @option arguments [Number, String] :slices The number of slices this task should be divided into. Defaults to 1, meaning the task isn't sliced into subtasks. Can be set to `auto`.
# @option arguments [Hash] :headers Custom HTTP headers
# @option arguments [Hash] :body The search definition using the Query DSL
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
# @return [Object] the value produced by the client call, as handed back by the
#   instrumentation block (the specs read 'total' and 'updated' from it)
# @raise [Esse::Transport::ReadonlyClusterError] when the cluster is readonly
# @raise [Esse::Transport::NotFoundError] when the target index does not exist
def update_by_query(index:, **options)
  # Refuse to issue a write against a readonly cluster before doing anything else.
  throw_error_when_readonly!

  Esse::Events.instrument('elasticsearch.update_by_query') do |event|
    # The required index is folded into the remaining options so the event
    # payload records exactly what is sent to the client.
    request = options.merge(index: index)
    event[:request] = request
    event[:response] = coerce_exception do
      client.update_by_query(**request)
    end
  end
end
end

include InstanceMethods
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_update_by_query'

# Pulls the shared 'transport#update_by_query' examples into a group declared
# for the 'elasticsearch' '5.x' stack.
# NOTE(review): `stack_describe` is defined outside this file; presumably it
# restricts the group to the given service/version — confirm.
stack_describe 'elasticsearch', '5.x', Esse::Transport, '#update_by_query' do
include_examples 'transport#update_by_query'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_update_by_query'

# Pulls the shared 'transport#update_by_query' examples into a group declared
# for the 'elasticsearch' '6.x' stack.
# NOTE(review): `stack_describe` is defined outside this file; presumably it
# restricts the group to the given service/version — confirm.
stack_describe 'elasticsearch', '6.x', Esse::Transport, '#update_by_query' do
include_examples 'transport#update_by_query'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_update_by_query'

# Pulls the shared 'transport#update_by_query' examples into a group declared
# for the 'elasticsearch' '7.x' stack.
# NOTE(review): `stack_describe` is defined outside this file; presumably it
# restricts the group to the given service/version — confirm.
stack_describe 'elasticsearch', '7.x', Esse::Transport, '#update_by_query' do
include_examples 'transport#update_by_query'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_update_by_query'

# Pulls the shared 'transport#update_by_query' examples into a group declared
# for the 'elasticsearch' '8.x' stack.
# NOTE(review): `stack_describe` is defined outside this file; presumably it
# restricts the group to the given service/version — confirm.
stack_describe 'elasticsearch', '8.x', Esse::Transport, '#update_by_query' do
include_examples 'transport#update_by_query'
end
50 changes: 50 additions & 0 deletions spec/support/shared_examples/transport_update_by_query.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# frozen_string_literal: true

# Shared examples for the transport's #update_by_query, meant to be pulled in
# with `include_examples 'transport#update_by_query'`.
#
# Covers three behaviours: the readonly guard, the error raised for a missing
# index, and a successful scripted update of an existing document.
RSpec.shared_examples 'transport#update_by_query' do
  # Index-creation body used by the happy-path example (single shard, no replicas).
  let(:body) do
    {
      settings: {
        index: {
          number_of_shards: 1,
          number_of_replicas: 0
        }
      }
    }
  end

  it 'raises an Esse::Transport::ReadonlyClusterError exception when the cluster is readonly' do
    es_client do |client, _conf, cluster|
      cluster.warm_up!
      # The readonly guard must fire before any request reaches the client.
      expect(client).not_to receive(:perform_request)
      cluster.readonly = true
      expect {
        cluster.api.update_by_query(index: "#{cluster.index_prefix}_redonly", body: { script: { source: 'ctx._source.title = "foo"' } }, q: '*')
      }.to raise_error(Esse::Transport::ReadonlyClusterError)
    end
  end

  it 'raises an Esse::Transport::NotFoundError exception when the index does not exist' do
    es_client do |_client, _conf, cluster|
      expect {
        cluster.api.update_by_query(index: "#{cluster.index_prefix}_non_existent_index", body: { script: { source: 'ctx._source.title = "foo"' }, query: { match_all: {} } })
      }.to raise_error(Esse::Transport::NotFoundError)
    end
  end

  context 'when the index exists' do
    it 'updates the documents matching the query' do
      es_client do |_client, _conf, cluster|
        index_name = "#{cluster.index_prefix}_update_by_query"
        cluster.api.create_index(index: index_name, body: body)
        # refresh: true makes the document searchable before the update runs.
        cluster.api.index(index: index_name, id: 1, body: { title: 'old title' }, refresh: true)

        resp = nil
        expect {
          resp = cluster.api.update_by_query(index: index_name, body: { script: { source: 'ctx._source.title = "new title"' }, query: { match_all: {} } })
        }.not_to raise_error
        # Exactly the one indexed document is matched and rewritten.
        expect(resp['total']).to eq(1)
        expect(resp['updated']).to eq(1)
      end
    end
  end
end

0 comments on commit ba4e70b

Please sign in to comment.