From 0d7142ae9424a8a5e00a5424de9b590843c1e009 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Fri, 23 Aug 2024 20:08:19 -0300 Subject: [PATCH] feat: refactoring by reusing each_serialized_batch --- lib/esse/core.rb | 6 ++ lib/esse/document.rb | 14 ++- lib/esse/index/documents.rb | 95 ++++--------------- lib/esse/repository/object_document_mapper.rb | 39 ++++++-- spec/esse/repository/document_spec.rb | 58 ++++++++++- 5 files changed, 125 insertions(+), 87 deletions(-) diff --git a/lib/esse/core.rb b/lib/esse/core.rb index 7d844f5..f3d4bb5 100644 --- a/lib/esse/core.rb +++ b/lib/esse/core.rb @@ -92,4 +92,10 @@ def self.document?(object) !!(object.is_a?(Esse::Document) && object.id) end + + def self.document_match_with_header?(document, id, routing, type) + id && id.to_s == document.id.to_s && + routing == document.routing && + (LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(document.type) && LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(type) || document.type == type) + end end diff --git a/lib/esse/document.rb b/lib/esse/document.rb index 53318a3..bb663d5 100644 --- a/lib/esse/document.rb +++ b/lib/esse/document.rb @@ -2,6 +2,8 @@ module Esse class Document + MUTATIONS_FALLBACK = {}.freeze + attr_reader :object, :options def initialize(object, **options) @@ -102,6 +104,10 @@ def doc_header end end + def document_for_partial_update(source) + DocumentForPartialUpdate.new(self, source: source) + end + def inspect attributes = %i[id routing source].map do |attr| value = send(attr) @@ -120,14 +126,14 @@ def mutate(key) instance_variable_set(:@__mutated_source__, nil) end + def mutations + @__mutations__ || MUTATIONS_FALLBACK + end + def mutated_source return source unless @__mutations__ @__mutated_source__ ||= source.merge(@__mutations__) end - - def document_for_partial_update(source) - DocumentForPartialUpdate.new(self, source: source) - end end end diff --git a/lib/esse/index/documents.rb b/lib/esse/index/documents.rb index 9eea726..f611751 100644 --- a/lib/esse/index/documents.rb +++ b/lib/esse/index/documents.rb @@ -258,12 +258,6 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l update_lazy_attributes = options.delete(:lazy_update_document_attributes) end - doc_header_check = ->(doc, (id, routing, type)) do - id && id.to_s == doc.id.to_s && - routing == doc.routing && - (LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(doc.type) && LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(type) || doc.type == type) - end - repo_hash.slice(*repo_types).each do |repo_name, repo| # Elasticsearch 6.x and older have multiple types per index. # This gem supports multiple types per index for backward compatibility, but we recommend to update @@ -274,83 +268,34 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l bulk_kwargs = { suffix: suffix, type: repo_name, **options } cluster.may_update_type!(bulk_kwargs) - lazy_attrs_to_eager_load = repo.lazy_document_attribute_names(eager_load_lazy_attributes) - lazy_attrs_to_search_preload = repo.lazy_document_attribute_names(preload_lazy_attributes) - lazy_attrs_to_update_after = repo.lazy_document_attribute_names(update_lazy_attributes) - lazy_attrs_to_update_after -= lazy_attrs_to_eager_load - lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load - - # @TODO Refactor this by combining the upcoming code again with repo.each_serialized_batch as it was before: - # context[:eager_load_lazy_attributes] = lazy_attrs_to_eager_load if lazy_attrs_to_eager_load.any? 
- # repo.each_serialized_batch(**context) do |batch| - # bulk(**bulk_kwargs, index: batch) - - # lazy_attrs_to_update_after.each do |attr_name| - # partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?)) - # next if partial_docs.empty? - - # bulk(**bulk_kwargs, update: partial_docs) - # end - # count += batch.size - # end context ||= {} - repo.send(:each_batch, **context) do |*args| - batch, collection_context = args - collection_context ||= {} - entries = [*batch].map { |entry| repo.serialize(entry, **collection_context) }.compact + context[:eager_load_lazy_attributes] = eager_load_lazy_attributes + context[:preload_lazy_attributes] = preload_lazy_attributes + repo.each_serialized_batch(**context) do |batch| + bulk(**bulk_kwargs, index: batch) - lazy_attrs_to_eager_load.each do |attr_name| - repo.retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| - doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } - doc&.mutate(attr_name) { value } - end - end - - preload_search_result = Hash.new { |h, arr_name| h[arr_name] = {} } - if lazy_attrs_to_search_preload.any? - entries.group_by(&:routing).each do |routing, docs| - search_request = { query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload } - search_request[:routing] = routing if routing - hits = repo.index.search(**search_request).response.hits - hits.each do |hit| - header = [hit['_id'], hit['_routing'], hit['_type']] - next if header[0].nil? + if update_lazy_attributes != false + attrs = repo.lazy_document_attribute_names(update_lazy_attributes) + attrs -= repo.lazy_document_attribute_names(eager_load_lazy_attributes) + update_attrs = attrs.each_with_object(Hash.new { |h, k| h[k] = {} }) do |attr_name, memo| + filtered_docs = batch.reject do |doc| + doc.ignore_on_index? || doc.mutations.key?(attr_name) + end + next if filtered_docs.empty? - hit.dig('_source')&.each do |attr_name, attr_value| - real_attr_name = repo.lazy_document_attribute_names(attr_name).first - preload_search_result[real_attr_name][header] = attr_value - end + repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).each do |doc, value| + memo[doc.doc_header][attr_name] = value end - preload_search_result.each do |attr_name, values| - values.each do |header, value| - doc = entries.find { |d| doc_header_check.call(d, header) } - doc&.mutate(attr_name) { value } - end + end + if update_attrs.any? + bulk_update = update_attrs.map do |header, values| + header.merge(data: {doc: values}) end + bulk(**bulk_kwargs, update: bulk_update) end end - bulk(**bulk_kwargs, index: entries) - - update_lazy_attrs = Hash.new { |h, k| h[k] = {} } - lazy_attrs_to_update_after.each do |attr_name| - preloaded_ids = preload_search_result[attr_name].keys - filtered_docs = entries.reject do |doc| - doc.ignore_on_index? || preloaded_ids.any? { |header| doc_header_check.call(doc, header) } - end - next if filtered_docs.empty? - - repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).map do |doc, datum| - update_lazy_attrs[doc.doc_header][attr_name] = datum - end - end - if update_lazy_attrs.any? 
- bulk_update = update_lazy_attrs.map do |header, values| - header.merge(data: {doc: values}) - end - bulk(**bulk_kwargs, update: bulk_update) - end - count += entries.size + count += batch.size end end count diff --git a/lib/esse/repository/object_document_mapper.rb b/lib/esse/repository/object_document_mapper.rb index 9e5a0e0..9f038a2 100644 --- a/lib/esse/repository/object_document_mapper.rb +++ b/lib/esse/repository/object_document_mapper.rb @@ -83,21 +83,46 @@ def collection_class # @param [Hash] kwargs The context # @return [Enumerator] The enumerator # @yield [Array, **context] serialized collection and the optional context from the collection - def each_serialized_batch(eager_load_lazy_attributes: false, **kwargs) + def each_serialized_batch(eager_load_lazy_attributes: false, preload_lazy_attributes: false, **kwargs) if kwargs.key?(:lazy_attributes) warn 'The `lazy_attributes` option is deprecated. Use `eager_load_lazy_attributes` instead.' eager_load_lazy_attributes = kwargs.delete(:lazy_attributes) end + + lazy_attrs_to_eager_load = lazy_document_attribute_names(eager_load_lazy_attributes) + lazy_attrs_to_search_preload = lazy_document_attribute_names(preload_lazy_attributes) + lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load + each_batch(**kwargs) do |*args| batch, collection_context = args collection_context ||= {} entries = [*batch].map { |entry| serialize(entry, **collection_context) }.compact - if eager_load_lazy_attributes - attrs = eager_load_lazy_attributes.is_a?(Array) ? eager_load_lazy_attributes : lazy_document_attribute_names(eager_load_lazy_attributes) - attrs.each do |attr_name| - retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| - doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } - doc&.mutate(attr_name) { value } + lazy_attrs_to_eager_load.each do |attr_name| + retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| + doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } + doc&.mutate(attr_name) { value } + end + end + + if lazy_attrs_to_search_preload.any? + entries.group_by(&:routing).each do |routing, docs| + search_request = { + query: { ids: { values: docs.map(&:id) } }, + size: docs.size, + _source: lazy_attrs_to_search_preload + } + search_request[:routing] = routing if routing + index.search(**search_request).response.hits.each do |hit| + header = [hit['_id'], hit['_routing'], hit['_type']] + next if header[0].nil? + + hit.dig('_source')&.each do |attr_name, attr_value| + real_attr_name = lazy_document_attribute_names(attr_name).first + next if real_attr_name.nil? 
+ + doc = entries.find { |d| Esse.document_match_with_header?(d, *header) } + doc&.mutate(real_attr_name) { attr_value } + end end end end diff --git a/spec/esse/repository/document_spec.rb b/spec/esse/repository/document_spec.rb index 0476b72..85afbcc 100644 --- a/spec/esse/repository/document_spec.rb +++ b/spec/esse/repository/document_spec.rb @@ -154,7 +154,7 @@ end end - context 'with lazy_load_attributes' do + context 'with eager_load_lazy_attributes' do include_context 'with stories index definition' it 'yields serialized objects with lazy attributes when passing eager_load_lazy_attributes: true' do @@ -187,6 +187,62 @@ expect(expected_data.select { |doc| doc.to_h.key?(:tags) && !doc.to_h.key?(:tags_count) }).not_to be_empty end end + + context 'with preload_lazy_attributes' do + include_context 'with stories index definition' + + let(:nyt_hits) do + nyt_stories.map do |hash| + { + '_id' => hash[:id].to_s, + '_routing' => 'nyt', + '_type' => '_doc', + '_source' => { + 'tags' => hash[:tags], + 'tags_count' => hash[:tags].size, + }, + } + end + end + + let(:wsj_hits) do + wsj_stories.map do |hash| + { + '_id' => hash[:id].to_s, + '_routing' => 'wsj', + '_type' => '_doc', + '_source' => { + 'tags' => hash[:tags], + 'tags_count' => hash[:tags].size, + }, + } + end + end + + it 'yields serialized objects with lazy attributes when passing preload_lazy_attributes: true' do + expect(StoriesIndex).to receive(:search).with( + query: { ids: { values: nyt_stories.map { |hash| hash[:id] } } }, + size: nyt_stories.size, + _source: %i[tags tags_count], + routing: 'nyt', + ).and_return(double(response: double(hits: nyt_hits))) + expect(StoriesIndex).to receive(:search).with( + query: { ids: { values: wsj_stories.map { |hash| hash[:id] } } }, + size: wsj_stories.size, + _source: %i[tags tags_count], + routing: 'wsj', + ).and_return(double(response: double(hits: wsj_hits))) + expected_data = [] + expect { + StoriesIndex::Story.each_serialized_batch(preload_lazy_attributes: true) do |batch| + expected_data.push(*batch) + end + }.not_to raise_error + expect(expected_data.map(&:mutations)).to all( + include(tags: be_a(Array), tags_count: be_a(Integer)), + ) + end + end end describe '.documents' do
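
For context, a minimal usage sketch of the entry points this patch touches, assuming the StoriesIndex / StoriesIndex::Story definitions exercised in the specs above (the index and repository names are illustrative, taken from the test setup rather than introduced by the patch):

  # Serialize each batch and eager-load lazy attributes: values resolved by the
  # repository's lazy attribute definitions are merged into each document's
  # mutations before the batch is yielded.
  StoriesIndex::Story.each_serialized_batch(eager_load_lazy_attributes: true) do |batch|
    batch.each { |doc| doc.to_h } # includes the lazy attribute values
  end

  # Preload lazy attributes from documents already stored in the index instead
  # of recomputing them: entries are grouped by routing, fetched with an ids
  # query limited to the requested _source fields, and the returned values are
  # merged into the documents' mutations.
  StoriesIndex::Story.each_serialized_batch(preload_lazy_attributes: true) do |batch|
    batch.each { |doc| doc.mutations } # e.g. { tags: [...], tags_count: 2 }
  end

  # Index.import now delegates batching to each_serialized_batch and issues a
  # follow-up bulk update for the attributes requested via
  # update_lazy_attributes, skipping documents that already carry a mutation
  # for that attribute.
  StoriesIndex.import(update_lazy_attributes: true)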