From 68038042d9d6274628c030e881a8d086042bd157 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Fri, 16 Aug 2024 18:33:10 -0300 Subject: [PATCH] feat: preload_lazy_attributes on import fetch the existing value from doc search before index --- lib/esse/document.rb | 2 - lib/esse/index/documents.rb | 100 ++++++++++++++---- .../repository_documents_import.rb | 54 ++++++++++ 3 files changed, 133 insertions(+), 23 deletions(-) diff --git a/lib/esse/document.rb b/lib/esse/document.rb index 841d907..a47f9f6 100644 --- a/lib/esse/document.rb +++ b/lib/esse/document.rb @@ -115,8 +115,6 @@ def mutate(key) instance_variable_set(:@__mutated_source__, nil) end - protected - def mutated_source return source unless @__mutations__ diff --git a/lib/esse/index/documents.rb b/lib/esse/index/documents.rb index ae3725d..c64167a 100644 --- a/lib/esse/index/documents.rb +++ b/lib/esse/index/documents.rb @@ -114,7 +114,7 @@ def delete(doc = nil, suffix: nil, **options) def update(doc = nil, suffix: nil, **options) if document?(doc) options[:id] = doc.id - options[:body] = { doc: doc.source } + options[:body] = { doc: doc.mutated_source } options[:type] = doc.type if doc.type? options[:routing] = doc.routing if doc.routing? end @@ -140,7 +140,7 @@ def update(doc = nil, suffix: nil, **options) def index(doc = nil, suffix: nil, **options) if document?(doc) options[:id] = doc.id - options[:body] = doc.source + options[:body] = doc.mutated_source options[:type] = doc.type if doc.type? options[:routing] = doc.routing if doc.routing? end @@ -198,39 +198,97 @@ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: n # @option [String, nil] :suffix The index suffix. Defaults to the nil. # @option [Hash] :context The collection context. This value will be passed as argument to the collection # May be SQL condition or any other filter you have defined on the collection. 
+ # @option [Boolean, Array] :eager_load_lazy_attributes A list of lazy document attributes to include in the bulk index request. + # Or pass `true` to include all lazy attributes. + # @option [Boolean, Array] :update_lazy_attributes A list of lazy document attributes to bulk update, one attribute at a time, after the bulk import. + # Or pass `true` to update all lazy attributes. + # @option [Boolean, Array] :preload_lazy_attributes A list of lazy document attributes to preload using the search API before the bulk import. + # Or pass `true` to preload all lazy attributes. # @return [Numeric] The number of documents imported def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_lazy_attributes: false, preload_lazy_attributes: false, suffix: nil, **options) repo_types = repo_hash.keys if repo_types.empty? count = 0 repo_hash.slice(*repo_types).each do |repo_name, repo| - doc_attrs = {eager: [], lazy: []} - doc_attrs[:eager] = repo.lazy_document_attribute_names(eager_load_lazy_attributes) - doc_attrs[:lazy] = repo.lazy_document_attribute_names(update_lazy_attributes) - doc_attrs[:lazy] -= doc_attrs[:eager] + # Elasticsearch 6.x and older have multiple types per index. + # This gem supports multiple types per index for backward compatibility, but we recommend updating + # your Elasticsearch to at least version 7.x and using a single type per index. + # + # Note that the repository name will be used as the document type. 
+ # mapping_default_type + bulk_kwargs = { suffix: suffix, type: repo_name, **options } + cluster.may_update_type!(bulk_kwargs) + lazy_attrs_to_eager_load = repo.lazy_document_attribute_names(eager_load_lazy_attributes) + lazy_attrs_to_search_preload = repo.lazy_document_attribute_names(preload_lazy_attributes) + lazy_attrs_to_update_after = repo.lazy_document_attribute_names(update_lazy_attributes) + lazy_attrs_to_update_after -= lazy_attrs_to_eager_load + lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load + + # @TODO Refactor this by combining the upcoming code again with repo.each_serialized_batch as it was before: + # context[:lazy_attributes] = lazy_attrs_to_eager_load if lazy_attrs_to_eager_load.any? + # repo.each_serialized_batch(**context) do |batch| + # bulk(**bulk_kwargs, index: batch) + + # lazy_attrs_to_update_after.each do |attr_name| + # partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?)) + # next if partial_docs.empty? + + # bulk(**bulk_kwargs, update: partial_docs) + # end + # count += batch.size + # end context ||= {} - context[:lazy_attributes] = doc_attrs[:eager] if doc_attrs[:eager].any? - repo.each_serialized_batch(**context) do |batch| - # Elasticsearch 6.x and older have multiple types per index. - # This gem supports multiple types per index for backward compatibility, but we recommend to update - # your elasticsearch to a at least 7.x version and use a single type per index. - # - # Note that the repository name will be used as the document type. - # mapping_default_type - kwargs = { suffix: suffix, type: repo_name, **options } - cluster.may_update_type!(kwargs) + repo.send(:each_batch, **context) do |*args| + batch, collection_context = args + collection_context ||= {} + entries = [*batch].map { |entry| repo.serialize(entry, **collection_context) }.compact + + if lazy_attrs_to_eager_load + attrs = lazy_attrs_to_eager_load.is_a?(Array) ? 
lazy_attrs_to_eager_load : repo.lazy_document_attribute_names(lazy_attrs_to_eager_load) + attrs.each do |attr_name| + repo.retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| + doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing } + doc&.mutate(attr_name) { value } + end + end + end + + preload_search_result = Hash.new { |h, arr_name| h[arr_name] = {} } + if lazy_attrs_to_search_preload.any? + hits = repo.index.search(query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload).response.hits + hits.each do |hit| + doc_header = Esse::LazyDocumentHeader.coerce(hit.slice('_id', '_routing')) # TODO Add '_type', when adjusting eql to treat _doc properly + next unless doc_header.valid? + hit.dig('_source')&.each do |attr_name, attr_value| + real_attr_name = repo.lazy_document_attribute_names(attr_name).first + preload_search_result[real_attr_name][doc_header] = attr_value + end + end + preload_search_result.each do |attr_name, values| + values.each do |doc_header, value| + doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing } + doc&.mutate(attr_name) { value } + end + end + end + + bulk(**bulk_kwargs, index: entries) - bulk(**kwargs, index: batch) + lazy_attrs_to_update_after.each do |attr_name| + preloaded_ids = preload_search_result[attr_name].keys + filtered_docs = entries.reject do |doc| + doc.ignore_on_index? || preloaded_ids.any? { |d| d.id.to_s == doc.id.to_s && d.type == doc.type && d.routing == doc.routing } + end + next if filtered_docs.empty? - doc_attrs[:lazy].each do |attr_name| - partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?)) + partial_docs = repo.documents_for_lazy_attribute(attr_name, filtered_docs) next if partial_docs.empty? 
- bulk(**kwargs, update: partial_docs) + bulk(**bulk_kwargs, update: partial_docs) end - count += batch.size + count += entries.size end end count diff --git a/spec/support/shared_examples/repository_documents_import.rb b/spec/support/shared_examples/repository_documents_import.rb index 955041b..587cc3e 100644 --- a/spec/support/shared_examples/repository_documents_import.rb +++ b/spec/support/shared_examples/repository_documents_import.rb @@ -143,6 +143,60 @@ end end + context 'when the preload_lazy_attributes is set' do + it 'search the given lazy document attributes before the bulk import' do + es_client do |client, _conf, cluster| + GeosIndex.create_index(alias: true) + + doc_to_import = GeosIndex::County.documents(conditions: ->(h) { h[:id] == 888 }).first + doc_to_import.mutate(:country) { 'BR' } + GeosIndex.index(doc_to_import, refresh: :wait_for) + + resp = nil + expect { + resp = GeosIndex::County.import(preload_lazy_attributes: %i[country]) + }.not_to raise_error + expect(resp).to eq(total_counties) + + GeosIndex.refresh + expect(GeosIndex.count).to eq(total_counties) + + doc = GeosIndex.get(id: '888') + expect(doc.dig('_source', 'country')).to eq('BR') + + doc = GeosIndex.get(id: '999') + expect(doc.dig('_source', 'country')).to eq(nil) + end + end + end + + context 'when the both preload_lazy_attributes and update_lazy_attributes are set' do + it 'search the given lazy document attributes before the bulk import, and do an additional bulk update for the not preloaded attributes' do + es_client do |client, _conf, cluster| + GeosIndex.create_index(alias: true) + + doc_to_import = GeosIndex::County.documents(conditions: ->(h) { h[:id] == 888 }).first + doc_to_import.mutate(:country) { 'BR' } + GeosIndex.index(doc_to_import, refresh: :wait_for) + + resp = nil + expect { + resp = GeosIndex::County.import(preload_lazy_attributes: %i[country], update_lazy_attributes: %i[country]) + }.not_to raise_error + expect(resp).to eq(total_counties) + + GeosIndex.refresh 
+ expect(GeosIndex.count).to eq(total_counties) + + doc = GeosIndex.get(id: '888') + expect(doc.dig('_source', 'country')).to eq('BR') + + doc = GeosIndex.get(id: '999') + expect(doc.dig('_source', 'country')).to eq('US') + end + end + end + context 'when the document routing is set' do include_context 'with stories index definition'