Skip to content

Commit

Permalink
feat: preload_lazy_attributes on import fetch the existing value from…
Browse files Browse the repository at this point in the history
… doc search before index
  • Loading branch information
marcosgz committed Aug 16, 2024
1 parent 95ba06b commit 6803804
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 23 deletions.
2 changes: 0 additions & 2 deletions lib/esse/document.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ def mutate(key)
instance_variable_set(:@__mutated_source__, nil)
end

protected

def mutated_source
return source unless @__mutations__

Expand Down
100 changes: 79 additions & 21 deletions lib/esse/index/documents.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def delete(doc = nil, suffix: nil, **options)
def update(doc = nil, suffix: nil, **options)
if document?(doc)
options[:id] = doc.id
options[:body] = { doc: doc.source }
options[:body] = { doc: doc.mutated_source }
options[:type] = doc.type if doc.type?
options[:routing] = doc.routing if doc.routing?
end
Expand All @@ -140,7 +140,7 @@ def update(doc = nil, suffix: nil, **options)
def index(doc = nil, suffix: nil, **options)
if document?(doc)
options[:id] = doc.id
options[:body] = doc.source
options[:body] = doc.mutated_source
options[:type] = doc.type if doc.type?
options[:routing] = doc.routing if doc.routing?
end
Expand Down Expand Up @@ -198,39 +198,97 @@ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: n
# @option [String, nil] :suffix The index suffix. Defaults to nil.
# @option [Hash] :context The collection context. This value will be passed as an argument to the collection.
# It may be a SQL condition or any other filter you have defined on the collection.
# @option [Boolean, Array<String>] :eager_load_lazy_attributes A list of lazy document attributes to include in the bulk index request.
# Or pass `true` to include all lazy attributes.
# @option [Boolean, Array<String>] :update_lazy_attributes A list of lazy document attributes to bulk update, one attribute at a time, after the bulk import.
# Or pass `true` to update all lazy attributes.
# @option [Boolean, Array<String>] :preload_lazy_attributes A list of lazy document attributes to preload using the search API before the bulk import.
# Or pass `true` to preload all lazy attributes.
# @return [Numeric] The number of documents imported
def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_lazy_attributes: false, preload_lazy_attributes: false, suffix: nil, **options)
repo_types = repo_hash.keys if repo_types.empty?
count = 0

repo_hash.slice(*repo_types).each do |repo_name, repo|
doc_attrs = {eager: [], lazy: []}
doc_attrs[:eager] = repo.lazy_document_attribute_names(eager_load_lazy_attributes)
doc_attrs[:lazy] = repo.lazy_document_attribute_names(update_lazy_attributes)
doc_attrs[:lazy] -= doc_attrs[:eager]
# Elasticsearch 6.x and older have multiple types per index.
# This gem supports multiple types per index for backward compatibility, but we recommend updating
# your Elasticsearch to at least version 7.x and using a single type per index.
#
# Note that the repository name will be used as the document type.
# mapping_default_type
bulk_kwargs = { suffix: suffix, type: repo_name, **options }
cluster.may_update_type!(bulk_kwargs)

lazy_attrs_to_eager_load = repo.lazy_document_attribute_names(eager_load_lazy_attributes)
lazy_attrs_to_search_preload = repo.lazy_document_attribute_names(preload_lazy_attributes)
lazy_attrs_to_update_after = repo.lazy_document_attribute_names(update_lazy_attributes)
lazy_attrs_to_update_after -= lazy_attrs_to_eager_load
lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load

# @TODO Refactor this by combining the upcoming code again with repo.each_serialized_batch as it was before:
# context[:lazy_attributes] = lazy_attrs_to_eager_load if lazy_attrs_to_eager_load.any?
# repo.each_serialized_batch(**context) do |batch|
# bulk(**bulk_kwargs, index: batch)

# lazy_attrs_to_update_after.each do |attr_name|
# partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?))
# next if partial_docs.empty?

# bulk(**bulk_kwargs, update: partial_docs)
# end
# count += batch.size
# end
context ||= {}
context[:lazy_attributes] = doc_attrs[:eager] if doc_attrs[:eager].any?
repo.each_serialized_batch(**context) do |batch|
# Elasticsearch 6.x and older have multiple types per index.
# This gem supports multiple types per index for backward compatibility, but we recommend updating
# your Elasticsearch to at least version 7.x and using a single type per index.
#
# Note that the repository name will be used as the document type.
# mapping_default_type
kwargs = { suffix: suffix, type: repo_name, **options }
cluster.may_update_type!(kwargs)
repo.send(:each_batch, **context) do |*args|
batch, collection_context = args
collection_context ||= {}
entries = [*batch].map { |entry| repo.serialize(entry, **collection_context) }.compact

if lazy_attrs_to_eager_load
attrs = lazy_attrs_to_eager_load.is_a?(Array) ? lazy_attrs_to_eager_load : repo.lazy_document_attribute_names(lazy_attrs_to_eager_load)
attrs.each do |attr_name|
repo.retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value|
doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing }
doc&.mutate(attr_name) { value }
end
end
end

preload_search_result = Hash.new { |h, arr_name| h[arr_name] = {} }
if lazy_attrs_to_search_preload.any?
hits = repo.index.search(query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload).response.hits
hits.each do |hit|
doc_header = Esse::LazyDocumentHeader.coerce(hit.slice('_id', '_routing')) # TODO Add '_type', when adjusting eql to tread _doc properly
next unless doc_header.valid?
hit.dig('_source')&.each do |attr_name, attr_value|
real_attr_name = repo.lazy_document_attribute_names(attr_name).first
preload_search_result[real_attr_name][doc_header] = attr_value
end
end
preload_search_result.each do |attr_name, values|
values.each do |doc_header, value|
doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing }
doc&.mutate(attr_name) { value }
end
end
end

bulk(**bulk_kwargs, index: entries)

bulk(**kwargs, index: batch)
lazy_attrs_to_update_after.each do |attr_name|
preloaded_ids = preload_search_result[attr_name].keys
filtered_docs = entries.reject do |doc|
doc.ignore_on_index? || preloaded_ids.any? { |d| d.id.to_s == doc.id.to_s && d.type == doc.type && d.routing == doc.routing }
end
next if filtered_docs.empty?

doc_attrs[:lazy].each do |attr_name|
partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?))
partial_docs = repo.documents_for_lazy_attribute(attr_name, filtered_docs)
next if partial_docs.empty?

bulk(**kwargs, update: partial_docs)
bulk(**bulk_kwargs, update: partial_docs)
end

count += batch.size
count += entries.size
end
end
count
Expand Down
54 changes: 54 additions & 0 deletions spec/support/shared_examples/repository_documents_import.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,60 @@
end
end

context 'when the preload_lazy_attributes is set' do
  # Seeds one county document carrying a `country` value, then runs the import with
  # `preload_lazy_attributes` and checks that the previously indexed value is kept
  # while a document that was not seeded ends up without the attribute.
  it 'search the given lazy document attributes before the bulk import' do
    es_client do |_client, _conf, _cluster|
      GeosIndex.create_index(alias: true)

      # Index county 888 up-front with a mutated `country`, so the import has an
      # existing value to find through the search preload.
      seeded_doc = GeosIndex::County.documents(conditions: ->(row) { row[:id] == 888 }).first
      seeded_doc.mutate(:country) { 'BR' }
      GeosIndex.index(seeded_doc, refresh: :wait_for)

      imported_total = nil
      expect do
        imported_total = GeosIndex::County.import(preload_lazy_attributes: %i[country])
      end.not_to raise_error
      expect(imported_total).to eq(total_counties)

      GeosIndex.refresh
      expect(GeosIndex.count).to eq(total_counties)

      # The seeded document keeps the value that was already in the index.
      seeded_hit = GeosIndex.get(id: '888')
      expect(seeded_hit.dig('_source', 'country')).to eq('BR')

      # County 999 was not seeded in this example, so no `country` is expected.
      other_hit = GeosIndex.get(id: '999')
      expect(other_hit.dig('_source', 'country')).to be_nil
    end
  end
end

context 'when the both preload_lazy_attributes and update_lazy_attributes are set' do
  # Seeds one county document carrying a `country` value, then runs the import with
  # both `preload_lazy_attributes` and `update_lazy_attributes` for the same attribute.
  # The seeded document must keep its indexed value; the other document is expected to
  # receive the value written by the follow-up bulk update.
  it 'search the given lazy document attributes before the bulk import, and do an additional bulk update for the not preloaded attributes' do
    es_client do |_client, _conf, _cluster|
      GeosIndex.create_index(alias: true)

      # Index county 888 up-front with a mutated `country`, so the import has an
      # existing value to find through the search preload.
      seeded_doc = GeosIndex::County.documents(conditions: ->(row) { row[:id] == 888 }).first
      seeded_doc.mutate(:country) { 'BR' }
      GeosIndex.index(seeded_doc, refresh: :wait_for)

      imported_total = nil
      expect do
        imported_total = GeosIndex::County.import(
          preload_lazy_attributes: %i[country],
          update_lazy_attributes: %i[country]
        )
      end.not_to raise_error
      expect(imported_total).to eq(total_counties)

      GeosIndex.refresh
      expect(GeosIndex.count).to eq(total_counties)

      # The preloaded value wins for the seeded document.
      seeded_hit = GeosIndex.get(id: '888')
      expect(seeded_hit.dig('_source', 'country')).to eq('BR')

      # County 999 had nothing to preload, so its `country` comes from the bulk update.
      other_hit = GeosIndex.get(id: '999')
      expect(other_hit.dig('_source', 'country')).to eq('US')
    end
  end
end

context 'when the document routing is set' do
include_context 'with stories index definition'

Expand Down

0 comments on commit 6803804

Please sign in to comment.