Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update by query #22

Merged
merged 4 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/esse/events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,6 @@ module Events
register_event 'elasticsearch.count'
register_event 'elasticsearch.get'
register_event 'elasticsearch.reindex'
register_event 'elasticsearch.update_by_query'
end
end
14 changes: 14 additions & 0 deletions lib/esse/index/documents.rb
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,20 @@ def import(*repo_types, context: {}, eager_include_document_attributes: false, l
count
end

# Runs an update-by-query request against this index.
#
# @param suffix [String, nil] Optional index suffix used to resolve the index name. Defaults to nil.
# @param options [Hash] Extra parameters merged into the request and forwarded to the
#   elasticsearch request (e.g. :body). Keys given here take precedence over the resolved :index.
#
# @return [Hash] The elasticsearch response hash
def update_by_query(suffix: nil, **options)
  # :index comes first so caller-supplied options are layered on top of it.
  request = { index: index_name(suffix: suffix), **options }
  # Gives the cluster a chance to adjust the request before it is sent
  # (see `may_update_type!`, defined elsewhere).
  cluster.may_update_type!(request)
  cluster.api.update_by_query(**request)
end

protected

def document?(doc)
Expand Down
52 changes: 52 additions & 0 deletions lib/esse/transport/indices.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,58 @@ def reindex(body:, **options)
payload[:response] = coerce_exception { client.reindex(**opts) }
end
end

# Performs an update on every document in the index without changing the source,
# for example to pick up a mapping change.
#
# @option arguments [List] :index A comma-separated list of index names to search; use `_all` or empty string to perform the operation on all indices (*Required*)
# @option arguments [String] :analyzer The analyzer to use for the query string
# @option arguments [Boolean] :analyze_wildcard Specify whether wildcard and prefix queries should be analyzed (default: false)
# @option arguments [String] :default_operator The default operator for query string query (AND or OR) (options: AND, OR)
# @option arguments [String] :df The field to use as default where no field prefix is given in the query string
# @option arguments [Number] :from Starting offset (default: 0)
# @option arguments [Boolean] :ignore_unavailable Whether specified concrete indices should be ignored when unavailable (missing or closed)
# @option arguments [Boolean] :allow_no_indices Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)
# @option arguments [String] :conflicts What to do when the update by query hits version conflicts? (options: abort, proceed)
# @option arguments [String] :expand_wildcards Whether to expand wildcard expression to concrete indices that are open, closed or both. (options: open, closed, hidden, none, all)
# @option arguments [Boolean] :lenient Specify whether format-based query failures (such as providing text to a numeric field) should be ignored
# @option arguments [String] :pipeline Ingest pipeline to set on index requests made by this action. (default: none)
# @option arguments [String] :preference Specify the node or shard the operation should be performed on (default: random)
# @option arguments [String] :q Query in the Lucene query string syntax
# @option arguments [List] :routing A comma-separated list of specific routing values
# @option arguments [Time] :scroll Specify how long a consistent view of the index should be maintained for scrolled search
# @option arguments [String] :search_type Search operation type (options: query_then_fetch, dfs_query_then_fetch)
# @option arguments [Time] :search_timeout Explicit timeout for each search request. Defaults to no timeout.
# @option arguments [Number] :size Deprecated, please use `max_docs` instead
# @option arguments [Number] :max_docs Maximum number of documents to process (default: all documents)
# @option arguments [List] :sort A comma-separated list of <field>:<direction> pairs
# @option arguments [List] :_source True or false to return the _source field or not, or a list of fields to return
# @option arguments [List] :_source_excludes A list of fields to exclude from the returned _source field
# @option arguments [List] :_source_includes A list of fields to extract and return from the _source field
# @option arguments [Number] :terminate_after The maximum number of documents to collect for each shard, upon reaching which the query execution will terminate early.
# @option arguments [List] :stats Specific 'tag' of the request for logging and statistical purposes
# @option arguments [Boolean] :version Specify whether to return document version as part of a hit
# @option arguments [Boolean] :version_type Should the document increment the version number (internal) on hit or not (reindex)
# @option arguments [Boolean] :request_cache Specify if request cache should be used for this request or not, defaults to index level setting
# @option arguments [Boolean] :refresh Should the affected indexes be refreshed?
# @option arguments [Time] :timeout Time each individual bulk request should wait for shards that are unavailable.
# @option arguments [String] :wait_for_active_shards Sets the number of shard copies that must be active before proceeding with the update by query operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)
# @option arguments [Number] :scroll_size Size on the scroll request powering the update by query
# @option arguments [Boolean] :wait_for_completion Whether the request should block until the update by query operation is complete.
# @option arguments [Number] :requests_per_second The throttle to set on this request in sub-requests per second. -1 means no throttle.
# @option arguments [Number|string] :slices The number of slices this task should be divided into. Defaults to 1, meaning the task isn't sliced into subtasks. Can be set to `auto`.
# @option arguments [Hash] :headers Custom HTTP headers
# @option arguments [Hash] :body The search definition using the Query DSL
#
# @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
def update_by_query(index:, **options)
  # Checked before instrumentation starts, so the event block is not entered
  # if this guard raises.
  throw_error_when_readonly!

  Esse::Events.instrument('elasticsearch.update_by_query') do |payload|
    # The same hash is recorded on the event payload and sent to the client.
    request = options.merge(index: index)
    payload[:request] = request
    payload[:response] = coerce_exception { client.update_by_query(**request) }
  end
end
end

include InstanceMethods
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_update_by_query'

# Runs the shared 'transport#update_by_query' examples for Esse::Transport under
# the `stack_describe 'elasticsearch', '5.x'` grouping (stack selection is handled
# by that helper, defined elsewhere). `doc_type: true` makes the shared examples
# send `type: 'geo'` with each request.
stack_describe 'elasticsearch', '5.x', Esse::Transport, '#update_by_query' do
include_examples 'transport#update_by_query', doc_type: true
end
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
require 'support/shared_examples/transport_reindex'

stack_describe 'elasticsearch', '5.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
include_examples 'transport#reindex', doc_type: true
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/index_documents_update_by_query'

# Runs the shared 'index.update_by_query' examples for Esse::Index under the
# `stack_describe 'elasticsearch', '6.x'` grouping (stack selection is handled by
# that helper, defined elsewhere). `doc_type: true` makes the shared examples
# send `type: 'venue'` with each request.
stack_describe 'elasticsearch', '6.x', Esse::Index, '.update_by_query' do
include_examples 'index.update_by_query', doc_type: true
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_update_by_query'

# Runs the shared 'transport#update_by_query' examples for Esse::Transport under
# the `stack_describe 'elasticsearch', '6.x'` grouping (stack selection is handled
# by that helper, defined elsewhere). `doc_type: true` makes the shared examples
# send `type: 'geo'` with each request.
stack_describe 'elasticsearch', '6.x', Esse::Transport, '#update_by_query' do
include_examples 'transport#update_by_query', doc_type: true
end
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
require 'support/shared_examples/transport_reindex'

stack_describe 'elasticsearch', '6.x', Esse::Transport, '#reindex' do
include_examples 'transport#reindex'
include_examples 'transport#reindex', doc_type: true
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/index_documents_update_by_query'

# Runs the shared 'index.update_by_query' examples for Esse::Index under the
# `stack_describe 'elasticsearch', '7.x'` grouping (stack selection is handled by
# that helper, defined elsewhere). No `doc_type` is passed, so the shared
# examples use their default (`doc_type: false`) and send no document type.
stack_describe 'elasticsearch', '7.x', Esse::Index, '.update_by_query' do
include_examples 'index.update_by_query'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_update_by_query'

# Runs the shared 'transport#update_by_query' examples for Esse::Transport under
# the `stack_describe 'elasticsearch', '7.x'` grouping (stack selection is handled
# by that helper, defined elsewhere). No `doc_type` is passed, so the shared
# examples use their default (`doc_type: false`) and send no document type.
stack_describe 'elasticsearch', '7.x', Esse::Transport, '#update_by_query' do
include_examples 'transport#update_by_query'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/index_documents_update_by_query'

# Runs the shared 'index.update_by_query' examples for Esse::Index under the
# `stack_describe 'elasticsearch', '8.x'` grouping (stack selection is handled by
# that helper, defined elsewhere). No `doc_type` is passed, so the shared
# examples use their default (`doc_type: false`) and send no document type.
stack_describe 'elasticsearch', '8.x', Esse::Index, '.update_by_query' do
include_examples 'index.update_by_query'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

require 'spec_helper'
require 'support/shared_examples/transport_update_by_query'

# Runs the shared 'transport#update_by_query' examples for Esse::Transport under
# the `stack_describe 'elasticsearch', '8.x'` grouping (stack selection is handled
# by that helper, defined elsewhere). No `doc_type` is passed, so the shared
# examples use their default (`doc_type: false`) and send no document type.
stack_describe 'elasticsearch', '8.x', Esse::Transport, '#update_by_query' do
include_examples 'transport#update_by_query'
end
61 changes: 61 additions & 0 deletions spec/support/shared_examples/index_documents_update_by_query.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# frozen_string_literal: true

# Shared examples for `Esse::Index.update_by_query`, exercised through VenuesIndex.
#
# @param doc_type [Boolean] when true, `type: 'venue'` is sent with each request
#   (the 5.x/6.x stack specs include these examples with `doc_type: true`).
RSpec.shared_examples 'index.update_by_query' do |doc_type: false|
  include_context 'with venues index definition'

  let(:params) do
    doc_type ? { type: 'venue' } : {}
  end
  let(:doc_params) do
    doc_type ? { _type: 'venue' } : {}
  end
  let(:index_suffix) { SecureRandom.hex(8) }
  # Script that rewrites the title of every matched document.
  let(:body) { { script: { source: 'ctx._source.title = "foo"' }, query: { match_all: {} } } }

  it 'raises an Esse::Transport::ReadonlyClusterError exception when the cluster is readonly' do
    es_client do |client, _conf, cluster|
      cluster.warm_up!
      # A readonly cluster must reject the call before any HTTP request is made.
      expect(client).not_to receive(:perform_request)
      cluster.readonly = true
      expect {
        VenuesIndex.update_by_query(body: body, **params)
      }.to raise_error(Esse::Transport::ReadonlyClusterError)
    end
  end

  # No index is created in this example, so the request targets a missing index.
  it 'raises an Esse::Transport::NotFoundError exception when the index does not exist' do
    es_client do |_client, _conf, _cluster|
      expect {
        VenuesIndex.update_by_query(body: body, **params)
      }.to raise_error(Esse::Transport::NotFoundError)
    end
  end

  it 'updates the documents in the aliased index' do
    es_client do |_client, _conf, _cluster|
      VenuesIndex.create_index(alias: true, suffix: index_suffix)
      VenuesIndex.import(refresh: true, suffix: index_suffix, **params)

      # No suffix is given here: the request goes through the alias.
      resp = nil
      expect {
        resp = VenuesIndex.update_by_query(body: body, **params)
      }.not_to raise_error
      expect(resp['total']).to eq(total_venues)
      expect(resp['updated']).to eq(total_venues)
    end
  end

  it 'updates the documents in the unaliased index' do
    es_client do |_client, _conf, _cluster|
      VenuesIndex.create_index(alias: false, suffix: index_suffix)
      VenuesIndex.import(refresh: true, suffix: index_suffix, **params)

      # Without an alias the suffixed index must be addressed explicitly.
      resp = nil
      expect {
        resp = VenuesIndex.update_by_query(body: body, suffix: index_suffix, **params)
      }.not_to raise_error
      expect(resp['total']).to eq(total_venues)
      expect(resp['updated']).to eq(total_venues)
    end
  end
end
15 changes: 9 additions & 6 deletions spec/support/shared_examples/transport_reindex.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# frozen_string_literal: true

RSpec.shared_examples 'transport#reindex' do
RSpec.shared_examples 'transport#reindex' do |doc_type: false|
let(:params) do
doc_type ? { type: 'geo' } : {}
end
let(:body) do
{
settings: {
Expand All @@ -18,15 +21,15 @@
expect(client).not_to receive(:perform_request)
cluster.readonly = true
expect {
cluster.api.reindex(body: { source: { index: "#{cluster.index_prefix}_ro_from" }, dest: { index: "#{cluster.index_prefix}_ro_to" } })
cluster.api.reindex(**params, body: { source: { index: "#{cluster.index_prefix}_ro_from" }, dest: { index: "#{cluster.index_prefix}_ro_to" } })
}.to raise_error(Esse::Transport::ReadonlyClusterError)
end
end

it 'raises an #<Esse::Transport::NotFoundError exception when the source index does not exist' do
es_client do |_client, _conf, cluster|
expect {
cluster.api.reindex(body: { source: { index: "#{cluster.index_prefix}_non_existent_index" }, dest: { index: "#{cluster.index_prefix}_to" } })
cluster.api.reindex(**params, body: { source: { index: "#{cluster.index_prefix}_non_existent_index" }, dest: { index: "#{cluster.index_prefix}_to" } })
}.to raise_error(Esse::Transport::NotFoundError)
end
end
Expand All @@ -38,15 +41,15 @@
dest_index = "#{cluster.index_prefix}_reindex_to"
cluster.api.create_index(index: source_index, body: body)
cluster.api.create_index(index: dest_index, body: body)
cluster.api.index(index: source_index, id: 1, body: { title: 'foo' }, refresh: true)
cluster.api.index(**params, index: source_index, id: 1, body: { title: 'foo' }, refresh: true)

resp = nil
expect {
resp = cluster.api.reindex(body: { source: { index: source_index }, dest: { index: dest_index } }, refresh: true)
resp = cluster.api.reindex(**params, body: { source: { index: source_index }, dest: { index: dest_index } }, refresh: true)
}.not_to raise_error
expect(resp['total']).to eq(1)

resp = cluster.api.get(index: dest_index, id: 1, _source: false)
resp = cluster.api.get(**params, index: dest_index, id: 1, _source: false)
expect(resp['found']).to eq(true)
end
end
Expand Down
53 changes: 53 additions & 0 deletions spec/support/shared_examples/transport_update_by_query.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# frozen_string_literal: true

# Shared examples for `Esse::Transport#update_by_query`, exercised through `cluster.api`.
#
# @param doc_type [Boolean] when true, `type: 'geo'` is sent with each request
#   (the 5.x/6.x stack specs include these examples with `doc_type: true`).
RSpec.shared_examples 'transport#update_by_query' do |doc_type: false|
  let(:params) do
    doc_type ? { type: 'geo' } : {}
  end
  # Index creation body: a single shard with no replicas.
  let(:body) do
    {
      settings: {
        index: {
          number_of_shards: 1,
          number_of_replicas: 0
        }
      }
    }
  end

  it 'raises an Esse::Transport::ReadonlyClusterError exception when the cluster is readonly' do
    es_client do |client, _conf, cluster|
      cluster.warm_up!
      # A readonly cluster must reject the call before any HTTP request is made.
      expect(client).not_to receive(:perform_request)
      cluster.readonly = true
      expect {
        cluster.api.update_by_query(**params, index: "#{cluster.index_prefix}_readonly", body: { script: { source: 'ctx._source.title = "foo"' } }, q: '*')
      }.to raise_error(Esse::Transport::ReadonlyClusterError)
    end
  end

  it 'raises an Esse::Transport::NotFoundError exception when the index does not exist' do
    es_client do |_client, _conf, cluster|
      expect {
        cluster.api.update_by_query(**params, index: "#{cluster.index_prefix}_non_existent_index", body: { script: { source: 'ctx._source.title = "foo"' }, query: { match_all: {} } })
      }.to raise_error(Esse::Transport::NotFoundError)
    end
  end

  context 'when the index exists' do
    it 'updates the documents matching the query' do
      es_client do |_client, _conf, cluster|
        index_name = "#{cluster.index_prefix}_update_by_query"
        cluster.api.create_index(index: index_name, body: body)
        # refresh: true so the document is searchable by the update-by-query request.
        cluster.api.index(**params, index: index_name, id: 1, body: { title: 'old title' }, refresh: true)

        resp = nil
        expect {
          resp = cluster.api.update_by_query(**params, index: index_name, body: { script: { source: 'ctx._source.title = "new title"' }, query: { match_all: {} } })
        }.not_to raise_error
        expect(resp['total']).to eq(1)
        expect(resp['updated']).to eq(1)
      end
    end
  end
end
Loading