From e2b9586a6d196c045510bed848ec26b7981bb28f Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Mon, 19 Aug 2024 09:59:20 -0300 Subject: [PATCH 01/11] during import, use the doc directly should reduce amount of allocated objects --- CHANGELOG.md | 2 +- lib/esse/core.rb | 1 + lib/esse/document.rb | 17 +- lib/esse/document_for_partial_update.rb | 16 ++ lib/esse/index/documents.rb | 11 +- lib/esse/index/object_document_mapper.rb | 2 +- lib/esse/lazy_document_header.rb | 36 ++-- lib/esse/repository/documents.rb | 11 +- lib/esse/repository/object_document_mapper.rb | 6 +- spec/esse/document_for_partial_update_spec.rb | 46 +++++ spec/esse/lazzy_document_header_spec.rb | 168 +++++++++++++++--- spec/esse/repository/lazy_document_spec.rb | 30 ++-- 12 files changed, 260 insertions(+), 86 deletions(-) create mode 100644 lib/esse/document_for_partial_update.rb create mode 100644 spec/esse/document_for_partial_update_spec.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 262d2e2..b91a649 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Add preload_lazy_attributes option to the import in order to fetch the lazy attributes in a single query before bulk indexing ## 0.3.6 - 2024-08-07 -* Esse::LazyDocumentHeader#to_doc return `Esse::LazyDocumentHeader::Document` instance to properly separate context metadata from document source +* Esse::LazyDocumentHeader#to_doc return `Esse::DocumentForPartialUpdate` instance to properly separate context metadata from document source * Add `.collection_class` method to the `Esse::Repository` class to let external plugins and extensions to access it instead of read @collection_proc variable ## 0.3.5 - 2024-08-02 diff --git a/lib/esse/core.rb b/lib/esse/core.rb index 1b5bf67..7d844f5 100644 --- a/lib/esse/core.rb +++ b/lib/esse/core.rb @@ -6,6 +6,7 @@ module Esse require_relative 'primitives' require_relative 'collection' require_relative 'document' + require_relative 'document_for_partial_update' require_relative 'document_lazy_attribute' require_relative 'lazy_document_header' require_relative 'hash_document' diff --git a/lib/esse/document.rb b/lib/esse/document.rb index a47f9f6..9c3a413 100644 --- a/lib/esse/document.rb +++ b/lib/esse/document.rb @@ -84,11 +84,16 @@ def ignore_on_delete? id.nil? end - def ==(other) - other.is_a?(self.class) && ( - id == other.id && type == other.type && routing == other.routing && meta == other.meta && source == other.source - ) + def eql?(other, match_lazy_doc_header: false) + if match_lazy_doc_header && other.is_a?(LazyDocumentHeader) + other.eql?(self) + else + other.is_a?(self.class) && ( + id.to_s == other.id.to_s && type == other.type && routing == other.routing && meta == other.meta && source == other.source + ) + end end + alias_method :==, :eql? def doc_header { _id: id }.tap do |h| @@ -120,5 +125,9 @@ def mutated_source @__mutated_source__ ||= source.merge(@__mutations__) end + + def document_for_partial_update(source) + DocumentForPartialUpdate.new(self, source: source) + end end end diff --git a/lib/esse/document_for_partial_update.rb b/lib/esse/document_for_partial_update.rb new file mode 100644 index 0000000..88b19f0 --- /dev/null +++ b/lib/esse/document_for_partial_update.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module Esse + class DocumentForPartialUpdate < Esse::Document + extend Forwardable + + def_delegators :object, :id, :type, :routing, :options + + attr_reader :source + + def initialize(lazy_header, source:) + @source = source + super(lazy_header) + end + end +end diff --git a/lib/esse/index/documents.rb b/lib/esse/index/documents.rb index b6dc6dc..91e88ac 100644 --- a/lib/esse/index/documents.rb +++ b/lib/esse/index/documents.rb @@ -252,7 +252,7 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l attrs = lazy_attrs_to_eager_load.is_a?(Array) ? lazy_attrs_to_eager_load : repo.lazy_document_attribute_names(lazy_attrs_to_eager_load) attrs.each do |attr_name| repo.retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| - doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing } + doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } doc&.mutate(attr_name) { value } end end @@ -262,8 +262,9 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l if lazy_attrs_to_search_preload.any? hits = repo.index.search(query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload).response.hits hits.each do |hit| - doc_header = Esse::LazyDocumentHeader.coerce(hit.slice('_id', '_routing')) # TODO Add '_type', when adjusting eql to tread _doc properly - next unless doc_header.valid? + doc_header = Esse::LazyDocumentHeader.coerce(hit.slice('_id', '_routing', '_type')) + next if doc_header.id.nil? + hit.dig('_source')&.each do |attr_name, attr_value| real_attr_name = repo.lazy_document_attribute_names(attr_name).first preload_search_result[real_attr_name][doc_header] = attr_value @@ -271,7 +272,7 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l end preload_search_result.each do |attr_name, values| values.each do |doc_header, value| - doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing } + doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } doc&.mutate(attr_name) { value } end end @@ -282,7 +283,7 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l lazy_attrs_to_update_after.each do |attr_name| preloaded_ids = preload_search_result[attr_name].keys filtered_docs = entries.reject do |doc| - doc.ignore_on_index? || preloaded_ids.any? { |d| d.id.to_s == doc.id.to_s && d.type == doc.type && d.routing == doc.routing } + doc.ignore_on_index? || preloaded_ids.any? { |d| doc.eql?(d, match_lazy_doc_header: true) } end next if filtered_docs.empty? diff --git a/lib/esse/index/object_document_mapper.rb b/lib/esse/index/object_document_mapper.rb index 933f592..dfcc6e5 100644 --- a/lib/esse/index/object_document_mapper.rb +++ b/lib/esse/index/object_document_mapper.rb @@ -24,7 +24,7 @@ def each_serialized_batch(repo_name = nil, **kwargs, &block) # @return [Enumerator] All serialized entries def documents(repo_name = nil, **kwargs) Enumerator.new do |yielder| - each_serialized_batch(repo_name, **kwargs) do |documents, **_collection_kargs| + each_serialized_batch(repo_name, **kwargs) do |documents| documents.each { |document| yielder.yield(document) } end end diff --git a/lib/esse/lazy_document_header.rb b/lib/esse/lazy_document_header.rb index bfc97a8..7144d3b 100644 --- a/lib/esse/lazy_document_header.rb +++ b/lib/esse/lazy_document_header.rb @@ -2,11 +2,14 @@ module Esse class LazyDocumentHeader + ACCEPTABLE_CLASSES = [Esse::LazyDocumentHeader, Esse::Document].freeze + ACCEPTABLE_DOC_TYPES = [nil, '_doc', 'doc'].freeze + def self.coerce_each(values) arr = [] Esse::ArrayUtils.wrap(values).flatten.map do |value| instance = coerce(value) - arr << instance if instance&.valid? + arr << instance if instance && !instance.id.nil? end arr end @@ -17,7 +20,7 @@ def self.coerce(value) if value.is_a?(Esse::LazyDocumentHeader) value elsif value.is_a?(Esse::Document) - new(**value.options, id: value.id, type: value.type, routing: value.routing) + value elsif value.is_a?(Hash) resp = value.transform_keys do |key| case key @@ -47,10 +50,6 @@ def initialize(id:, type: nil, routing: nil, **extra_attributes) @options = extra_attributes.freeze end - def valid? - !id.nil? - end - def to_h options.merge(_id: id).tap do |hash| hash[:_type] = type if type @@ -58,26 +57,17 @@ def to_h end end - def to_doc(source = {}) - Document.new(self, source: source) + def document_for_partial_update(source) + Esse::DocumentForPartialUpdate.new(self, source: source) end - def eql?(other) - self.class == other.class && id == other.id && type == other.type && routing == other.routing + def eql?(other, **) + ACCEPTABLE_CLASSES.any? { |klass| other.is_a?(klass) } && + id.to_s == other.id.to_s && + routing == other.routing && + ((ACCEPTABLE_DOC_TYPES.include?(type) && ACCEPTABLE_DOC_TYPES.include?(other.type)) || type == other.type) end alias_method :==, :eql? - - class Document < Esse::Document - extend Forwardable - - def_delegators :object, :id, :type, :routing, :options - - attr_reader :source - - def initialize(lazy_header, source: {}) - @source = source - super(lazy_header) - end - end end end + diff --git a/lib/esse/repository/documents.rb b/lib/esse/repository/documents.rb index 0d2a7d6..69e4f3b 100644 --- a/lib/esse/repository/documents.rb +++ b/lib/esse/repository/documents.rb @@ -16,7 +16,7 @@ def update_documents_attribute(name, ids_or_doc_headers = [], kwargs = {}) def documents_for_lazy_attribute(name, ids_or_doc_headers) retrieve_lazy_attribute_values(name, ids_or_doc_headers).map do |doc_header, datum| - doc_header.to_doc(name => datum) + doc_header.document_for_partial_update(name => datum) end end @@ -36,11 +36,10 @@ def retrieve_lazy_attribute_values(name, ids_or_doc_headers) return [] unless result.is_a?(Hash) result.each_with_object({}) do |(key, value), memo| - if key.is_a?(LazyDocumentHeader) && (doc = docs.find { |d| d == key || d.id == key.id }) - memo[doc] = value - elsif (doc = docs.find { |d| d.id == key }) - memo[doc] = value - end + val = docs.find { |doc| doc.eql?(key, match_lazy_doc_header: true) || doc.id == key } + next unless val + + memo[val] = value end end end diff --git a/lib/esse/repository/object_document_mapper.rb b/lib/esse/repository/object_document_mapper.rb index d4f5d95..237b52d 100644 --- a/lib/esse/repository/object_document_mapper.rb +++ b/lib/esse/repository/object_document_mapper.rb @@ -92,13 +92,13 @@ def each_serialized_batch(lazy_attributes: false, **kwargs) attrs = lazy_attributes.is_a?(Array) ? lazy_attributes : lazy_document_attribute_names(lazy_attributes) attrs.each do |attr_name| retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| - doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing } + doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } doc&.mutate(attr_name) { value } end end end - yield entries, **collection_context + yield entries end end @@ -110,7 +110,7 @@ def each_serialized_batch(lazy_attributes: false, **kwargs) # @return [Enumerator] All serialized entries def documents(**kwargs) Enumerator.new do |yielder| - each_serialized_batch(**kwargs) do |docs, **_collection_kargs| + each_serialized_batch(**kwargs) do |docs| docs.each { |document| yielder.yield(document) } end end diff --git a/spec/esse/document_for_partial_update_spec.rb b/spec/esse/document_for_partial_update_spec.rb new file mode 100644 index 0000000..1714b0c --- /dev/null +++ b/spec/esse/document_for_partial_update_spec.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +require 'spec_helper' + +# rubocop:disable RSpec/VerifiedDoubles +RSpec.describe Esse::DocumentForPartialUpdate do + let(:document) { described_class.new(obj, source: source) } + let(:obj) { double(id: 1) } + let(:source) { { foo: :bar } } + + describe '#object' do + subject { document.object } + + it { is_expected.to be obj } + end + + describe '#id' do + subject { document.id } + + it { is_expected.to eq 1 } + end + + describe '#type' do + subject { document.type } + + let(:obj) { double(id: 1, type: 'foo', source: source) } + + it { is_expected.to eq 'foo' } + end + + describe '#routing' do + subject { document.routing } + + let(:obj) { double(id: 1, routing: 'foo', source: source) } + + it { is_expected.to eq 'foo' } + end + + describe '#source' do + subject { document.source } + + let(:obj) { double(id: 1, source: { original: 'source' }) } + + it { is_expected.to eq source } + end +end diff --git a/spec/esse/lazzy_document_header_spec.rb b/spec/esse/lazzy_document_header_spec.rb index f710a53..6e1d628 100644 --- a/spec/esse/lazzy_document_header_spec.rb +++ b/spec/esse/lazzy_document_header_spec.rb @@ -7,22 +7,6 @@ let(:object) { { id: nil } } let(:options) { {} } - describe '#valid?' do - it { expect(doc).to respond_to :valid? } - - it 'returns false' do - expect(doc.valid?).to be_falsey - end - - context 'when id is present' do - let(:object) { { id: 1 } } - - it 'returns true' do - expect(doc.valid?).to be_truthy - end - end - end - describe '#id' do it { expect(doc).to respond_to :id } @@ -118,7 +102,7 @@ let(:object) { Esse::HashDocument.new(_id: 1) } it 'returns a LazyDocumentHeader instance' do - expect(described_class.coerce(object)).to be_a(described_class) + expect(described_class.coerce(object)).to be(object) end end @@ -161,7 +145,7 @@ def id it 'returns a LazyDocumentHeader instance with the proper id' do instance = described_class.coerce(object) - expect(instance).to be_a(described_class) + expect(instance).to be(instance) expect(instance.id).to eq(2) end end @@ -194,40 +178,40 @@ def id expect(described_class.coerce_each([[{_id: 1}], {_id: 2}]).size).to eq(2) end - it 'coerces a list of Esse::Document instances' do + it 'return same instances when the given argument is already a Esse::Document' do list = [Esse::HashDocument.new(_id: 1), Class.new(Esse::HashDocument).new(_id: 2)] - expect(described_class.coerce_each(list)).to all(be_a(described_class)) + expect(described_class.coerce_each(list)).to all(be_a(Esse::HashDocument)) end end - describe '#to_doc' do + describe '#document_for_partial_update' do let(:options) { { admin: true } } let(:object) { { id: 1, routing: 'il', type: 'state' } } - it { expect(doc).to respond_to :to_doc } + it { expect(doc).to respond_to :document_for_partial_update } it 'returns a Esse::Document instance' do - expect(doc.to_doc).to be_a(Esse::Document) + expect(doc.document_for_partial_update({})).to be_a(Esse::DocumentForPartialUpdate) end it 'returns a Esse::Document instance with the id' do - expect(doc.to_doc.id).to eq(1) + expect(doc.document_for_partial_update({}).id).to eq(1) end it 'returns a Esse::Document instance routing' do - expect(doc.to_doc.routing).to eq('il') + expect(doc.document_for_partial_update({}).routing).to eq('il') end it 'returns a Esse::Document instance with the type' do - expect(doc.to_doc.type).to eq('state') + expect(doc.document_for_partial_update({}).type).to eq('state') end it 'returns a Esse::Document instance with the options' do - expect(doc.to_doc.options).to eq(admin: true) + expect(doc.document_for_partial_update({}).options).to eq(admin: true) end it 'returns a Esse::Document instance with the object as source and the given source' do - new_doc = doc.to_doc(foo: 'bar') + new_doc = doc.document_for_partial_update(foo: 'bar') expect(new_doc.source).to eq(foo: 'bar') expect(new_doc.object).to eq(doc) expect(new_doc.options).to eq(admin: true) @@ -249,4 +233,132 @@ def id end end end + + describe '#eql?' do + specify do + same = described_class.new(id: 1) + diff = described_class.new(id: 2) + expect(described_class.new(id: 1)).to eq(same) + expect(described_class.new(id: 1)).not_to eq(diff) + end + + specify do + same = described_class.new(id: 1) + diff = described_class.new(id: 2) + expect(described_class.new(id: '1')).to eq(same) + end + + specify do + same = described_class.new(id: 1, type: 'foo') + diff = described_class.new(id: 1, type: 'bar') + expect(described_class.new(id: 1, type: 'foo')).to eq(same) + expect(described_class.new(id: 1, type: 'foo')).not_to eq(diff) + end + + specify do + same = described_class.new(id: 1, routing: 'foo') + diff = described_class.new(id: 1, routing: 'bar') + expect(described_class.new(id: 1, routing: 'foo')).to eq(same) + expect(described_class.new(id: 1, routing: 'foo')).not_to eq(diff) + end + + specify do + same = described_class.new(id: 1, type: 'foo', routing: 'bar') + diff = described_class.new(id: 1, type: 'bar', routing: 'foo') + expect(described_class.new(id: 1, type: 'foo', routing: 'bar')).to eq(same) + expect(described_class.new(id: 1, type: 'foo', routing: 'bar')).not_to eq(diff) + end + + context 'when comparing doc type' do + specify do + expected = described_class.new(id: 1, type: '_doc') + expect(described_class.new(id: 1, type: '_doc')).to eq(expected) + end + + specify do + expected = described_class.new(id: 1, type: '_doc') + expect(described_class.new(id: 1, type: 'doc')).to eq(expected) + end + + specify do + expected = described_class.new(id: 1, type: '_doc') + expect(described_class.new(id: 1, type: nil)).to eq(expected) + end + + specify do + expected = described_class.new(id: 1, type: nil) + expect(described_class.new(id: 1, type: '_doc')).to eq(expected) + end + + specify do + expected = described_class.new(id: 1, type: nil) + expect(described_class.new(id: 1, type: 'doc')).to eq(expected) + end + end + + context 'when comparing with a Esse::Document' do + specify do + header = described_class.new(id: 1) + doc = Esse::HashDocument.new(_id: 1) + expect(header).to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(true) + end + + specify do + header = described_class.new(id: 1) + doc = Esse::HashDocument.new(_id: 2) + expect(header).not_to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(false) + end + + specify do + header = described_class.new(id: 1, type: '_doc') + doc = Esse::HashDocument.new(_id: 1) + expect(header).to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(true) + end + + specify do + header = described_class.new(id: 1) + doc = Esse::HashDocument.new(_id: 1, _type: '_doc') + expect(header).to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(true) + end + + specify do + header = described_class.new(id: 1, type: '_doc') + doc = Esse::HashDocument.new(_id: 1, _type: 'foo') + expect(header).not_to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(false) + end + + specify do + header = described_class.new(id: 1, routing: 'il') + doc = Esse::HashDocument.new(_id: 1, _routing: 'il') + expect(header).to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(true) + end + + specify do + header = described_class.new(id: 1, routing: 'il') + doc = Esse::HashDocument.new(_id: 1, _routing: 'fl') + expect(header).not_to eq(doc) + expect(doc.eql?(header)).to eq(false) + end + + specify do + header = described_class.new(id: 1, extra: 'il') + doc = Esse::HashDocument.new(_id: 1, extra: 'fl') + expect(header).to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(true) + end + end + end end diff --git a/spec/esse/repository/lazy_document_spec.rb b/spec/esse/repository/lazy_document_spec.rb index a9a8675..bd3b600 100644 --- a/spec/esse/repository/lazy_document_spec.rb +++ b/spec/esse/repository/lazy_document_spec.rb @@ -90,7 +90,7 @@ it 'returns an array of documents that match with the provided ids' do docs = repo.documents_for_lazy_attribute(:city_names, '2') expect(docs).to eq([ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ]) end @@ -98,14 +98,14 @@ header = Esse::LazyDocumentHeader.coerce(id: '2') docs = repo.documents_for_lazy_attribute(:city_names, header) expect(docs).to eq([ - Esse::LazyDocumentHeader::Document.new(header, source: { city_names: 'London' }) + Esse::DocumentForPartialUpdate.new(header, source: { city_names: 'London' }) ]) end it 'returns an array of documents that match with the provided single hash' do docs = repo.documents_for_lazy_attribute(:city_names, {id: '2', admin: true}) expect(docs).to eq([ - Esse::LazyDocumentHeader.new(id: '2', admin: true).to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2', admin: true).document_for_partial_update(city_names: 'London') ]) end end @@ -133,21 +133,21 @@ it 'returns an array of documents that match with the provided ids' do docs = repo.documents_for_lazy_attribute(:city_names, '2') expect(docs).to eq([ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ]) end it 'returns an array of documents that match with the provided LazyDocumentHeader' do docs = repo.documents_for_lazy_attribute(:city_names, Esse::LazyDocumentHeader.coerce(id: '2')) expect(docs).to eq([ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ]) end it 'do not include duplicate documents' do docs = repo.documents_for_lazy_attribute(:city_names, ['2', '2', Esse::LazyDocumentHeader.coerce(id: '2')]) expect(docs).to eq([ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ]) end end @@ -169,9 +169,9 @@ it 'returns an array of documents that match with the provided ids' do docs = repo.documents_for_lazy_attribute(:city_names, ['2', '3', '4']) expect(docs).to eq([ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London'), - Esse::LazyDocumentHeader.new(id: '3').to_doc(city_names: nil), - Esse::LazyDocumentHeader.new(id: '4').to_doc(city_names: ''), + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London'), + Esse::LazyDocumentHeader.new(id: '3').document_for_partial_update(city_names: nil), + Esse::LazyDocumentHeader.new(id: '4').document_for_partial_update(city_names: ''), ]) end end @@ -202,7 +202,7 @@ it 'updates the documents' do expect(repo.index).to receive(:bulk).with( update: [ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ] ) repo.update_documents_attribute(:city_names, '2') @@ -212,7 +212,7 @@ header = Esse::LazyDocumentHeader.coerce(id: '2') expect(repo.index).to receive(:bulk).with( update: [ - Esse::LazyDocumentHeader::Document.new(header, source: { city_names: 'London' }) + Esse::DocumentForPartialUpdate.new(header, source: { city_names: 'London' }) ] ) repo.update_documents_attribute(:city_names, header) @@ -221,7 +221,7 @@ it 'updates the documents when passing a single hash' do expect(repo.index).to receive(:bulk).with( update: [ - Esse::LazyDocumentHeader.new(id: '2', admin: true).to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2', admin: true).document_for_partial_update(city_names: 'London') ] ) repo.update_documents_attribute(:city_names, {id: '2', admin: true}) @@ -230,7 +230,7 @@ it 'updates the documents when passing an array of ids' do expect(repo.index).to receive(:bulk).with( update: [ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ] ) repo.update_documents_attribute(:city_names, ['2']) @@ -240,7 +240,7 @@ header = Esse::LazyDocumentHeader.coerce(id: '2') expect(repo.index).to receive(:bulk).with( update: [ - Esse::LazyDocumentHeader::Document.new(header, source: { city_names: 'London' }) + Esse::DocumentForPartialUpdate.new(header, source: { city_names: 'London' }) ] ) repo.update_documents_attribute(:city_names, [header]) @@ -249,7 +249,7 @@ it 'updates the documents when passing an array of hashes' do expect(repo.index).to receive(:bulk).with( update: [ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ] ) repo.update_documents_attribute(:city_names, [{id: '2', admin: true}]) From e2a21fb6984a30281edbca7ab13c58119033633d Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Mon, 19 Aug 2024 11:24:08 -0300 Subject: [PATCH 02/11] feat: start updating bulk to work with std hash as default --- lib/esse/import/bulk.rb | 30 +++++++++++++++++------------- lib/esse/index/documents.rb | 2 +- spec/esse/import/bulk_spec.rb | 4 ++-- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/lib/esse/import/bulk.rb b/lib/esse/import/bulk.rb index f531a24..60c9f69 100644 --- a/lib/esse/import/bulk.rb +++ b/lib/esse/import/bulk.rb @@ -1,27 +1,35 @@ module Esse module Import class Bulk - def initialize(type: nil, index: nil, delete: nil, create: nil, update: nil) - @index = Array(index).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc| + def self.build_from_documents(type: nil, index: nil, delete: nil, create: nil, update: nil) + index = Array(index).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| value = doc.to_bulk value[:_type] ||= type if type - { index: value } + value end - @create = Array(create).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc| + create = Array(create).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| value = doc.to_bulk value[:_type] ||= type if type - { create: value } + value end - @update = Array(update).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc| + update = Array(update).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| value = doc.to_bulk(operation: :update) value[:_type] ||= type if type - { update: value } + value end - @delete = Array(delete).select(&method(:valid_doc?)).reject(&:ignore_on_delete?).map do |doc| + delete = Array(delete).select(&Esse.method(:document?)).reject(&:ignore_on_delete?).map do |doc| value = doc.to_bulk(data: false) value[:_type] ||= type if type - { delete: value } + value end + new(index: index, delete: delete, create: create, update: update) + end + + def initialize(index: nil, delete: nil, create: nil, update: nil) + @index = Esse::ArrayUtils.wrap(index).map { |payload| { index: payload } } + @create = Esse::ArrayUtils.wrap(create).map { |payload| { create: payload } } + @update = Esse::ArrayUtils.wrap(update).map { |payload| { update: payload } } + @delete = Esse::ArrayUtils.wrap(delete).map { |payload| { delete: payload } } end # Return an array of RequestBody instances @@ -68,10 +76,6 @@ def each_request(max_retries: 4, last_retry_in_small_chunks: true) private - def valid_doc?(doc) - Esse.document?(doc) - end - def optimistic_request request = Import::RequestBodyAsJson.new request.create = @create diff --git a/lib/esse/index/documents.rb b/lib/esse/index/documents.rb index 91e88ac..b2a2124 100644 --- a/lib/esse/index/documents.rb +++ b/lib/esse/index/documents.rb @@ -172,7 +172,7 @@ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: n cluster.may_update_type!(definition) # @TODO Wrap the return in a some other Stats object with more information - Esse::Import::Bulk.new( + Esse::Import::Bulk.build_from_documents( **definition.slice(:type), create: create, delete: delete, diff --git a/spec/esse/import/bulk_spec.rb b/spec/esse/import/bulk_spec.rb index 555fb1a..6bc46d5 100644 --- a/spec/esse/import/bulk_spec.rb +++ b/spec/esse/import/bulk_spec.rb @@ -8,7 +8,7 @@ let(:create) { [Esse::HashDocument.new(_id: 2, foo: 'bar')] } let(:delete) { [Esse::HashDocument.new(_id: 3, foo: 'bar')] } let(:update) { [Esse::HashDocument.new(_id: 4, foo: 'bar')] } - let(:bulk) { described_class.new(index: index, create: create, delete: delete, update: update) } + let(:bulk) { described_class.build_from_documents(index: index, create: create, delete: delete, update: update) } it 'yields a request body instance' do expect { |b| bulk.each_request(&b) }.to yield_with_args(Esse::Import::RequestBodyAsJson) @@ -61,7 +61,7 @@ let(:delete) do %w[foo bar baz].each_with_index.map { |name, idx| Esse::HashDocument.new(id: idx + 30, name: name) } end - let(:bulk) { described_class.new(index: index, create: create, delete: delete) } + let(:bulk) { described_class.build_from_documents(index: index, create: create, delete: delete) } it 'retries in small chunks' do expect(bulk).to receive(:sleep).with(an_instance_of(Integer)).exactly(3).times From 8e1c832e1eff7ee7f264d3f335b009a46da37569 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Mon, 19 Aug 2024 13:39:45 -0300 Subject: [PATCH 03/11] reduce memory usage by creating raw hash instead of coercing them to doc/header doc --- lib/esse/index/documents.rb | 82 ++++++++++++++++++++++++-------- lib/esse/lazy_document_header.rb | 7 +++ 2 files changed, 69 insertions(+), 20 deletions(-) diff --git a/lib/esse/index/documents.rb b/lib/esse/index/documents.rb index b2a2124..196cbd5 100644 --- a/lib/esse/index/documents.rb +++ b/lib/esse/index/documents.rb @@ -171,9 +171,36 @@ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: n }.merge(options) cluster.may_update_type!(definition) + to_index = Esse::ArrayUtils.wrap(index) + to_create = Esse::ArrayUtils.wrap(create) + to_update = Esse::ArrayUtils.wrap(update) + to_delete = Esse::ArrayUtils.wrap(delete) + index = to_index.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| + value = doc.to_bulk + value[:_type] ||= type if type + value + end + index.push(*to_create.select { |item| item.is_a?(Hash) }) + create = to_create.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| + value = doc.to_bulk + value[:_type] ||= type if type + value + end + update = to_update.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| + value = doc.to_bulk(operation: :update) + value[:_type] ||= type if type + value + end + update.push(*to_update.select { |item| item.is_a?(Hash) }) + delete = to_delete.select(&Esse.method(:document?)).reject(&:ignore_on_delete?).map do |doc| + value = doc.to_bulk(data: false) + value[:_type] ||= type if type + value + end + delete.push(*to_delete.select { |item| item.is_a?(Hash) }) + # @TODO Wrap the return in a some other Stats object with more information - Esse::Import::Bulk.build_from_documents( - **definition.slice(:type), + Esse::Import::Bulk.new( create: create, delete: delete, index: index, @@ -213,6 +240,12 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l update_lazy_attributes = options.delete(:lazy_update_document_attributes) if options.key?(:lazy_update_document_attributes) eager_load_lazy_attributes = options.delete(:eager_include_document_attributes) if options.key?(:eager_include_document_attributes) + doc_header_check = ->(doc, hash) do + hash['_id'] && hash['_id'].to_s == doc.id.to_s && + hash['_routing'] == doc.routing && + (LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(doc.type) && LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(hash['_type']) || doc.type == hash['_type']) + end + repo_hash.slice(*repo_types).each do |repo_name, repo| # Elasticsearch 6.x and older have multiple types per index. # This gem supports multiple types per index for backward compatibility, but we recommend to update @@ -260,39 +293,48 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l preload_search_result = Hash.new { |h, arr_name| h[arr_name] = {} } if lazy_attrs_to_search_preload.any? - hits = repo.index.search(query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload).response.hits - hits.each do |hit| - doc_header = Esse::LazyDocumentHeader.coerce(hit.slice('_id', '_routing', '_type')) - next if doc_header.id.nil? + entries.group_by(&:routing).each do |routing, docs| + search_request = { query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload } + search_request[:routing] = routing if routing + hits = repo.index.search(**search_request).response.hits + hits.each do |hit| + header = hit.slice('_id', '_routing', '_type') + next if header['_id'].nil? - hit.dig('_source')&.each do |attr_name, attr_value| - real_attr_name = repo.lazy_document_attribute_names(attr_name).first - preload_search_result[real_attr_name][doc_header] = attr_value + hit.dig('_source')&.each do |attr_name, attr_value| + real_attr_name = repo.lazy_document_attribute_names(attr_name).first + preload_search_result[real_attr_name][header] = attr_value + end end - end - preload_search_result.each do |attr_name, values| - values.each do |doc_header, value| - doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } - doc&.mutate(attr_name) { value } + preload_search_result.each do |attr_name, values| + values.each do |header, value| + doc = entries.find { |d| doc_header_check.call(d, header) } + doc&.mutate(attr_name) { value } + end end end end bulk(**bulk_kwargs, index: entries) + update_lazy_attrs = Hash.new { |h, k| h[k] = {} } lazy_attrs_to_update_after.each do |attr_name| preloaded_ids = preload_search_result[attr_name].keys filtered_docs = entries.reject do |doc| - doc.ignore_on_index? || preloaded_ids.any? { |d| doc.eql?(d, match_lazy_doc_header: true) } + doc.ignore_on_index? || preloaded_ids.any? { |h| doc_header_check.call(doc, h) } end next if filtered_docs.empty? - partial_docs = repo.documents_for_lazy_attribute(attr_name, filtered_docs) - next if partial_docs.empty? - - bulk(**bulk_kwargs, update: partial_docs) + repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).map do |doc, datum| + update_lazy_attrs[doc.doc_header][attr_name] = datum + end + end + if update_lazy_attrs.any? + bulk_update = update_lazy_attrs.map do |header, values| + header.merge(data: {doc: values}) + end + bulk(**bulk_kwargs, update: bulk_update) end - count += entries.size end end diff --git a/lib/esse/lazy_document_header.rb b/lib/esse/lazy_document_header.rb index 7144d3b..972b572 100644 --- a/lib/esse/lazy_document_header.rb +++ b/lib/esse/lazy_document_header.rb @@ -61,6 +61,13 @@ def document_for_partial_update(source) Esse::DocumentForPartialUpdate.new(self, source: source) end + def doc_header + { _id: id }.tap do |hash| + hash[:_type] = type if type + hash[:routing] = routing if routing + end + end + def eql?(other, **) ACCEPTABLE_CLASSES.any? { |klass| other.is_a?(klass) } && id.to_s == other.id.to_s && From b700e6804816fa2888b604c833cdb72ecebae675 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Mon, 19 Aug 2024 13:52:07 -0300 Subject: [PATCH 04/11] keep bulk order --- lib/esse/index/documents.rb | 67 ++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 27 deletions(-) diff --git a/lib/esse/index/documents.rb b/lib/esse/index/documents.rb index 196cbd5..b6f9f46 100644 --- a/lib/esse/index/documents.rb +++ b/lib/esse/index/documents.rb @@ -171,40 +171,53 @@ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: n }.merge(options) cluster.may_update_type!(definition) - to_index = Esse::ArrayUtils.wrap(index) - to_create = Esse::ArrayUtils.wrap(create) - to_update = Esse::ArrayUtils.wrap(update) - to_delete = Esse::ArrayUtils.wrap(delete) - index = to_index.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| - value = doc.to_bulk - value[:_type] ||= type if type - value + to_index = [] + to_create = [] + to_update = [] + to_delete = [] + Esse::ArrayUtils.wrap(index).each do |doc| + if doc.is_a?(Hash) + to_index << doc + elsif Esse.document?(doc) && !doc.ignore_on_index? + hash = doc.to_bulk + hash[:_type] ||= type if type + to_index << hash + end end - index.push(*to_create.select { |item| item.is_a?(Hash) }) - create = to_create.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| - value = doc.to_bulk - value[:_type] ||= type if type - value + Esse::ArrayUtils.wrap(create).each do |doc| + if doc.is_a?(Hash) + to_create << doc + elsif Esse.document?(doc) && !doc.ignore_on_index? + hash = doc.to_bulk + hash[:_type] ||= type if type + to_create << hash + end end - update = to_update.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| - value = doc.to_bulk(operation: :update) - value[:_type] ||= type if type - value + Esse::ArrayUtils.wrap(update).each do |doc| + if doc.is_a?(Hash) + to_update << doc + elsif Esse.document?(doc) && !doc.ignore_on_index? + hash = doc.to_bulk(operation: :update) + hash[:_type] ||= type if type + to_update << hash + end end - update.push(*to_update.select { |item| item.is_a?(Hash) }) - delete = to_delete.select(&Esse.method(:document?)).reject(&:ignore_on_delete?).map do |doc| - value = doc.to_bulk(data: false) - value[:_type] ||= type if type - value + Esse::ArrayUtils.wrap(delete).each do |doc| + if doc.is_a?(Hash) + to_delete << doc + elsif Esse.document?(doc) && !doc.ignore_on_delete? + hash = doc.to_bulk(data: false) + hash[:_type] ||= type if type + to_delete << hash + end end - delete.push(*to_delete.select { |item| item.is_a?(Hash) }) # @TODO Wrap the return in a some other Stats object with more information Esse::Import::Bulk.new( - create: create, - delete: delete, - index: index, - update: update, + create: to_create, + delete: to_delete, + index: to_index, + update: to_update, ).each_request do |request_body| cluster.api.bulk(**definition, body: request_body.body) do |event_payload| event_payload[:body_stats] = request_body.stats From e2e3b6242abbc975ef48c4d6d4ae0c8992baf345 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Mon, 19 Aug 2024 16:55:51 -0300 Subject: [PATCH 05/11] minor refactoring --- lib/esse/document.rb | 2 +- lib/esse/index/documents.rb | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/lib/esse/document.rb b/lib/esse/document.rb index 9c3a413..9c3327a 100644 --- a/lib/esse/document.rb +++ b/lib/esse/document.rb @@ -88,7 +88,7 @@ def eql?(other, match_lazy_doc_header: false) if match_lazy_doc_header && other.is_a?(LazyDocumentHeader) other.eql?(self) else - other.is_a?(self.class) && ( + other.is_a?(Esse::Document) && ( id.to_s == other.id.to_s && type == other.type && routing == other.routing && meta == other.meta && source == other.source ) end diff --git a/lib/esse/index/documents.rb b/lib/esse/index/documents.rb index b6f9f46..cd9b971 100644 --- a/lib/esse/index/documents.rb +++ b/lib/esse/index/documents.rb @@ -294,13 +294,10 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l collection_context ||= {} entries = [*batch].map { |entry| repo.serialize(entry, **collection_context) }.compact - if lazy_attrs_to_eager_load - attrs = lazy_attrs_to_eager_load.is_a?(Array) ? lazy_attrs_to_eager_load : repo.lazy_document_attribute_names(lazy_attrs_to_eager_load) - attrs.each do |attr_name| - repo.retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| - doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } - doc&.mutate(attr_name) { value } - end + lazy_attrs_to_eager_load.each do |attr_name| + repo.retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| + doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } + doc&.mutate(attr_name) { value } end end From 27b8491ff3dd32e4f4fb2e909e9bbe56c0b85692 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Mon, 19 Aug 2024 17:42:36 -0300 Subject: [PATCH 06/11] fix: do not compare document source, they may have attributes dinamically generated --- lib/esse/document.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/esse/document.rb b/lib/esse/document.rb index 9c3327a..53318a3 100644 --- a/lib/esse/document.rb +++ b/lib/esse/document.rb @@ -85,11 +85,11 @@ def ignore_on_delete? end def eql?(other, match_lazy_doc_header: false) - if match_lazy_doc_header && other.is_a?(LazyDocumentHeader) + if match_lazy_doc_header other.eql?(self) else other.is_a?(Esse::Document) && ( - id.to_s == other.id.to_s && type == other.type && routing == other.routing && meta == other.meta && source == other.source + id.to_s == other.id.to_s && type == other.type && routing == other.routing && meta == other.meta ) end end From 1c0df514e4cd57ef895982570f942c108aaf0f52 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Tue, 20 Aug 2024 17:17:53 -0300 Subject: [PATCH 07/11] feat: do not create new array when values are already a doc instances --- lib/esse/lazy_document_header.rb | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/esse/lazy_document_header.rb b/lib/esse/lazy_document_header.rb index 972b572..0dbcfee 100644 --- a/lib/esse/lazy_document_header.rb +++ b/lib/esse/lazy_document_header.rb @@ -6,8 +6,13 @@ class LazyDocumentHeader ACCEPTABLE_DOC_TYPES = [nil, '_doc', 'doc'].freeze def self.coerce_each(values) + values = Esse::ArrayUtils.wrap(values) + return values if values.all? do |value| + ACCEPTABLE_CLASSES.any? { |klass| value.is_a?(klass) } + end + arr = [] - Esse::ArrayUtils.wrap(values).flatten.map do |value| + values.flatten.map do |value| instance = coerce(value) arr << instance if instance && !instance.id.nil? end From dcc4479c24a5550336e75a2a5ccbd62eee458092 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Tue, 20 Aug 2024 17:52:01 -0300 Subject: [PATCH 08/11] feat: simplify data structure --- lib/esse/index/documents.rb | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/esse/index/documents.rb b/lib/esse/index/documents.rb index cd9b971..2ff5347 100644 --- a/lib/esse/index/documents.rb +++ b/lib/esse/index/documents.rb @@ -253,10 +253,10 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l update_lazy_attributes = options.delete(:lazy_update_document_attributes) if options.key?(:lazy_update_document_attributes) eager_load_lazy_attributes = options.delete(:eager_include_document_attributes) if options.key?(:eager_include_document_attributes) - doc_header_check = ->(doc, hash) do - hash['_id'] && hash['_id'].to_s == doc.id.to_s && - hash['_routing'] == doc.routing && - (LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(doc.type) && LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(hash['_type']) || doc.type == hash['_type']) + doc_header_check = ->(doc, (id, routing, type)) do + id && id.to_s == doc.id.to_s && + routing == doc.routing && + (LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(doc.type) && LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(type) || doc.type == type) end repo_hash.slice(*repo_types).each do |repo_name, repo| @@ -308,8 +308,8 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l search_request[:routing] = routing if routing hits = repo.index.search(**search_request).response.hits hits.each do |hit| - header = hit.slice('_id', '_routing', '_type') - next if header['_id'].nil? + header = [hit['_id'], hit['_routing'], hit['_type']] + next if header[0].nil? hit.dig('_source')&.each do |attr_name, attr_value| real_attr_name = repo.lazy_document_attribute_names(attr_name).first @@ -331,7 +331,7 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l lazy_attrs_to_update_after.each do |attr_name| preloaded_ids = preload_search_result[attr_name].keys filtered_docs = entries.reject do |doc| - doc.ignore_on_index? || preloaded_ids.any? { |h| doc_header_check.call(doc, h) } + doc.ignore_on_index? || preloaded_ids.any? { |header| doc_header_check.call(doc, header) } end next if filtered_docs.empty? From 528631171e31bdd135c99664d7d17f80eb31d8f4 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Fri, 23 Aug 2024 16:53:35 -0300 Subject: [PATCH 09/11] chore: rename lazy_attributes to eager_load_lazy_attributes --- lib/esse/index/documents.rb | 13 +++++++++---- lib/esse/repository/object_document_mapper.rb | 10 +++++++--- spec/esse/repository/document_spec.rb | 10 +++++----- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/lib/esse/index/documents.rb b/lib/esse/index/documents.rb index 2ff5347..9eea726 100644 --- a/lib/esse/index/documents.rb +++ b/lib/esse/index/documents.rb @@ -249,9 +249,14 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l repo_types = repo_hash.keys if repo_types.empty? count = 0 - # Backward compatibility while I change plugins using it - update_lazy_attributes = options.delete(:lazy_update_document_attributes) if options.key?(:lazy_update_document_attributes) - eager_load_lazy_attributes = options.delete(:eager_include_document_attributes) if options.key?(:eager_include_document_attributes) + if options.key?(:eager_include_document_attributes) + warn 'The `eager_include_document_attributes` option is deprecated. Use `eager_load_lazy_attributes` instead.' + eager_load_lazy_attributes = options.delete(:eager_include_document_attributes) + end + if options.key?(:lazy_update_document_attributes) + warn 'The `lazy_update_document_attributes` option is deprecated. Use `update_lazy_attributes` instead.' + update_lazy_attributes = options.delete(:lazy_update_document_attributes) + end doc_header_check = ->(doc, (id, routing, type)) do id && id.to_s == doc.id.to_s && @@ -276,7 +281,7 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load # @TODO Refactor this by combining the upcoming code again with repo.each_serialized_batch as it was before: - # context[:lazy_attributes] = lazy_attrs_to_eager_load if lazy_attrs_to_eager_load.any? + # context[:eager_load_lazy_attributes] = lazy_attrs_to_eager_load if lazy_attrs_to_eager_load.any? # repo.each_serialized_batch(**context) do |batch| # bulk(**bulk_kwargs, index: batch) diff --git a/lib/esse/repository/object_document_mapper.rb b/lib/esse/repository/object_document_mapper.rb index 237b52d..9e5a0e0 100644 --- a/lib/esse/repository/object_document_mapper.rb +++ b/lib/esse/repository/object_document_mapper.rb @@ -83,13 +83,17 @@ def collection_class # @param [Hash] kwargs The context # @return [Enumerator] The enumerator # @yield [Array, **context] serialized collection and the optional context from the collection - def each_serialized_batch(lazy_attributes: false, **kwargs) + def each_serialized_batch(eager_load_lazy_attributes: false, **kwargs) + if kwargs.key?(:lazy_attributes) + warn 'The `lazy_attributes` option is deprecated. Use `eager_load_lazy_attributes` instead.' + eager_load_lazy_attributes = kwargs.delete(:lazy_attributes) + end each_batch(**kwargs) do |*args| batch, collection_context = args collection_context ||= {} entries = [*batch].map { |entry| serialize(entry, **collection_context) }.compact - if lazy_attributes - attrs = lazy_attributes.is_a?(Array) ? lazy_attributes : lazy_document_attribute_names(lazy_attributes) + if eager_load_lazy_attributes + attrs = eager_load_lazy_attributes.is_a?(Array) ? eager_load_lazy_attributes : lazy_document_attribute_names(eager_load_lazy_attributes) attrs.each do |attr_name| retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } diff --git a/spec/esse/repository/document_spec.rb b/spec/esse/repository/document_spec.rb index 0409c8d..0476b72 100644 --- a/spec/esse/repository/document_spec.rb +++ b/spec/esse/repository/document_spec.rb @@ -157,20 +157,20 @@ context 'with lazy_load_attributes' do include_context 'with stories index definition' - it 'yields serialized objects with lazy attributes when passing lazy_attributes: true' do + it 'yields serialized objects with lazy attributes when passing eager_load_lazy_attributes: true' do expected_data = [] expect { - StoriesIndex::Story.each_serialized_batch(lazy_attributes: true) do |batch| + StoriesIndex::Story.each_serialized_batch(eager_load_lazy_attributes: true) do |batch| expected_data.push(*batch) end }.not_to raise_error expect(expected_data.select { |doc| doc.to_h.key?(:tags) && doc.to_h.key?(:tags_count) }).not_to be_empty end - it 'yields serialized objects without lazy attributes when passing lazy_attributes: false' do + it 'yields serialized objects without lazy attributes when passing eager_load_lazy_attributes: false' do expected_data = [] expect { - StoriesIndex::Story.each_serialized_batch(lazy_attributes: false) do |batch| + StoriesIndex::Story.each_serialized_batch(eager_load_lazy_attributes: false) do |batch| expected_data.push(*batch) end }.not_to raise_error @@ -180,7 +180,7 @@ it 'yields serialized objects with lazy attributes when passing specific attributes' do expected_data = [] expect { - StoriesIndex::Story.each_serialized_batch(lazy_attributes: %i[tags]) do |batch| + StoriesIndex::Story.each_serialized_batch(eager_load_lazy_attributes: %i[tags]) do |batch| expected_data.push(*batch) end }.not_to raise_error From 0d7142ae9424a8a5e00a5424de9b590843c1e009 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Fri, 23 Aug 2024 20:08:19 -0300 Subject: [PATCH 10/11] feat: refactoring by reusing each_serialized_batch --- lib/esse/core.rb | 6 ++ lib/esse/document.rb | 14 ++- lib/esse/index/documents.rb | 95 ++++--------------- lib/esse/repository/object_document_mapper.rb | 39 ++++++-- spec/esse/repository/document_spec.rb | 58 ++++++++++- 5 files changed, 125 insertions(+), 87 deletions(-) diff --git a/lib/esse/core.rb b/lib/esse/core.rb index 7d844f5..f3d4bb5 100644 --- a/lib/esse/core.rb +++ b/lib/esse/core.rb @@ -92,4 +92,10 @@ def self.document?(object) !!(object.is_a?(Esse::Document) && object.id) end + + def self.document_match_with_header?(document, id, routing, type) + id && id.to_s == document.id.to_s && + routing == document.routing && + (LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(document.type) && LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(type) || document.type == type) + end end diff --git a/lib/esse/document.rb b/lib/esse/document.rb index 53318a3..bb663d5 100644 --- a/lib/esse/document.rb +++ b/lib/esse/document.rb @@ -2,6 +2,8 @@ module Esse class Document + MUTATIONS_FALLBACK = {}.freeze + attr_reader :object, :options def initialize(object, **options) @@ -102,6 +104,10 @@ def doc_header end end + def document_for_partial_update(source) + DocumentForPartialUpdate.new(self, source: source) + end + def inspect attributes = %i[id routing source].map do |attr| value = send(attr) @@ -120,14 +126,14 @@ def mutate(key) instance_variable_set(:@__mutated_source__, nil) end + def mutations + @__mutations__ || MUTATIONS_FALLBACK + end + def mutated_source return source unless @__mutations__ @__mutated_source__ ||= source.merge(@__mutations__) end - - def document_for_partial_update(source) - DocumentForPartialUpdate.new(self, source: source) - end end end diff --git a/lib/esse/index/documents.rb b/lib/esse/index/documents.rb index 9eea726..f611751 100644 --- a/lib/esse/index/documents.rb +++ b/lib/esse/index/documents.rb @@ -258,12 +258,6 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l update_lazy_attributes = options.delete(:lazy_update_document_attributes) end - doc_header_check = ->(doc, (id, routing, type)) do - id && id.to_s == doc.id.to_s && - routing == doc.routing && - (LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(doc.type) && LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(type) || doc.type == type) - end - repo_hash.slice(*repo_types).each do |repo_name, repo| # Elasticsearch 6.x and older have multiple types per index. # This gem supports multiple types per index for backward compatibility, but we recommend to update @@ -274,83 +268,34 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l bulk_kwargs = { suffix: suffix, type: repo_name, **options } cluster.may_update_type!(bulk_kwargs) - lazy_attrs_to_eager_load = repo.lazy_document_attribute_names(eager_load_lazy_attributes) - lazy_attrs_to_search_preload = repo.lazy_document_attribute_names(preload_lazy_attributes) - lazy_attrs_to_update_after = repo.lazy_document_attribute_names(update_lazy_attributes) - lazy_attrs_to_update_after -= lazy_attrs_to_eager_load - lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load - - # @TODO Refactor this by combining the upcoming code again with repo.each_serialized_batch as it was before: - # context[:eager_load_lazy_attributes] = lazy_attrs_to_eager_load if lazy_attrs_to_eager_load.any? - # repo.each_serialized_batch(**context) do |batch| - # bulk(**bulk_kwargs, index: batch) - - # lazy_attrs_to_update_after.each do |attr_name| - # partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?)) - # next if partial_docs.empty? - - # bulk(**bulk_kwargs, update: partial_docs) - # end - # count += batch.size - # end context ||= {} - repo.send(:each_batch, **context) do |*args| - batch, collection_context = args - collection_context ||= {} - entries = [*batch].map { |entry| repo.serialize(entry, **collection_context) }.compact + context[:eager_load_lazy_attributes] = eager_load_lazy_attributes + context[:preload_lazy_attributes] = preload_lazy_attributes + repo.each_serialized_batch(**context) do |batch| + bulk(**bulk_kwargs, index: batch) - lazy_attrs_to_eager_load.each do |attr_name| - repo.retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| - doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } - doc&.mutate(attr_name) { value } - end - end - - preload_search_result = Hash.new { |h, arr_name| h[arr_name] = {} } - if lazy_attrs_to_search_preload.any? - entries.group_by(&:routing).each do |routing, docs| - search_request = { query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload } - search_request[:routing] = routing if routing - hits = repo.index.search(**search_request).response.hits - hits.each do |hit| - header = [hit['_id'], hit['_routing'], hit['_type']] - next if header[0].nil? + if update_lazy_attributes != false + attrs = repo.lazy_document_attribute_names(update_lazy_attributes) + attrs -= repo.lazy_document_attribute_names(eager_load_lazy_attributes) + update_attrs = attrs.each_with_object(Hash.new { |h, k| h[k] = {} }) do |attr_name, memo| + filtered_docs = batch.reject do |doc| + doc.ignore_on_index? || doc.mutations.key?(attr_name) + end + next if filtered_docs.empty? - hit.dig('_source')&.each do |attr_name, attr_value| - real_attr_name = repo.lazy_document_attribute_names(attr_name).first - preload_search_result[real_attr_name][header] = attr_value - end + repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).each do |doc, value| + memo[doc.doc_header][attr_name] = value end - preload_search_result.each do |attr_name, values| - values.each do |header, value| - doc = entries.find { |d| doc_header_check.call(d, header) } - doc&.mutate(attr_name) { value } - end + end + if update_attrs.any? + bulk_update = update_attrs.map do |header, values| + header.merge(data: {doc: values}) end + bulk(**bulk_kwargs, update: bulk_update) end end - bulk(**bulk_kwargs, index: entries) - - update_lazy_attrs = Hash.new { |h, k| h[k] = {} } - lazy_attrs_to_update_after.each do |attr_name| - preloaded_ids = preload_search_result[attr_name].keys - filtered_docs = entries.reject do |doc| - doc.ignore_on_index? || preloaded_ids.any? { |header| doc_header_check.call(doc, header) } - end - next if filtered_docs.empty? - - repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).map do |doc, datum| - update_lazy_attrs[doc.doc_header][attr_name] = datum - end - end - if update_lazy_attrs.any? - bulk_update = update_lazy_attrs.map do |header, values| - header.merge(data: {doc: values}) - end - bulk(**bulk_kwargs, update: bulk_update) - end - count += entries.size + count += batch.size end end count diff --git a/lib/esse/repository/object_document_mapper.rb b/lib/esse/repository/object_document_mapper.rb index 9e5a0e0..9f038a2 100644 --- a/lib/esse/repository/object_document_mapper.rb +++ b/lib/esse/repository/object_document_mapper.rb @@ -83,21 +83,46 @@ def collection_class # @param [Hash] kwargs The context # @return [Enumerator] The enumerator # @yield [Array, **context] serialized collection and the optional context from the collection - def each_serialized_batch(eager_load_lazy_attributes: false, **kwargs) + def each_serialized_batch(eager_load_lazy_attributes: false, preload_lazy_attributes: false, **kwargs) if kwargs.key?(:lazy_attributes) warn 'The `lazy_attributes` option is deprecated. Use `eager_load_lazy_attributes` instead.' eager_load_lazy_attributes = kwargs.delete(:lazy_attributes) end + + lazy_attrs_to_eager_load = lazy_document_attribute_names(eager_load_lazy_attributes) + lazy_attrs_to_search_preload = lazy_document_attribute_names(preload_lazy_attributes) + lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load + each_batch(**kwargs) do |*args| batch, collection_context = args collection_context ||= {} entries = [*batch].map { |entry| serialize(entry, **collection_context) }.compact - if eager_load_lazy_attributes - attrs = eager_load_lazy_attributes.is_a?(Array) ? eager_load_lazy_attributes : lazy_document_attribute_names(eager_load_lazy_attributes) - attrs.each do |attr_name| - retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| - doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } - doc&.mutate(attr_name) { value } + lazy_attrs_to_eager_load.each do |attr_name| + retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| + doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } + doc&.mutate(attr_name) { value } + end + end + + if lazy_attrs_to_search_preload.any? + entries.group_by(&:routing).each do |routing, docs| + search_request = { + query: { ids: { values: docs.map(&:id) } }, + size: docs.size, + _source: lazy_attrs_to_search_preload + } + search_request[:routing] = routing if routing + index.search(**search_request).response.hits.each do |hit| + header = [hit['_id'], hit['_routing'], hit['_type']] + next if header[0].nil? + + hit.dig('_source')&.each do |attr_name, attr_value| + real_attr_name = lazy_document_attribute_names(attr_name).first + next if real_attr_name.nil? + + doc = entries.find { |d| Esse.document_match_with_header?(d, *header) } + doc&.mutate(real_attr_name) { attr_value } + end end end end diff --git a/spec/esse/repository/document_spec.rb b/spec/esse/repository/document_spec.rb index 0476b72..85afbcc 100644 --- a/spec/esse/repository/document_spec.rb +++ b/spec/esse/repository/document_spec.rb @@ -154,7 +154,7 @@ end end - context 'with lazy_load_attributes' do + context 'with eager_load_lazy_attributes' do include_context 'with stories index definition' it 'yields serialized objects with lazy attributes when passing eager_load_lazy_attributes: true' do @@ -187,6 +187,62 @@ expect(expected_data.select { |doc| doc.to_h.key?(:tags) && !doc.to_h.key?(:tags_count) }).not_to be_empty end end + + context 'with preload_lazy_attributes' do + include_context 'with stories index definition' + + let(:nyt_hits) do + nyt_stories.map do |hash| + { + '_id' => hash[:id].to_s, + '_routing' => 'nyt', + '_type' => '_doc', + '_source' => { + 'tags' => hash[:tags], + 'tags_count' => hash[:tags].size, + }, + } + end + end + + let(:wsj_hits) do + wsj_stories.map do |hash| + { + '_id' => hash[:id].to_s, + '_routing' => 'wsj', + '_type' => '_doc', + '_source' => { + 'tags' => hash[:tags], + 'tags_count' => hash[:tags].size, + }, + } + end + end + + it 'yields serialized objects with lazy attributes when passing preload_lazy_attributes: true' do + expect(StoriesIndex).to receive(:search).with( + query: { ids: { values: nyt_stories.map { |hash| hash[:id] } } }, + size: nyt_stories.size, + _source: %i[tags tags_count], + routing: 'nyt', + ).and_return(double(response: double(hits: nyt_hits))) + expect(StoriesIndex).to receive(:search).with( + query: { ids: { values: wsj_stories.map { |hash| hash[:id] } } }, + size: wsj_stories.size, + _source: %i[tags tags_count], + routing: 'wsj', + ).and_return(double(response: double(hits: wsj_hits))) + expected_data = [] + expect { + StoriesIndex::Story.each_serialized_batch(preload_lazy_attributes: true) do |batch| + expected_data.push(*batch) + end + }.not_to raise_error + expect(expected_data.map(&:mutations)).to all( + include(tags: be_a(Array), tags_count: be_a(Integer)), + ) + end + end end describe '.documents' do From 7db14e7e20c7898e017135a34d41de4153add422 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Fri, 23 Aug 2024 20:09:26 -0300 Subject: [PATCH 11/11] feat: update rexml close #7 --- Gemfile.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Gemfile.lock b/Gemfile.lock index 4120646..3637805 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -37,7 +37,7 @@ GEM rainbow (3.1.1) rake (12.3.3) regexp_parser (2.9.2) - rexml (3.3.1) + rexml (3.3.6) strscan rspec (3.13.0) rspec-core (~> 3.13.0)