diff --git a/CHANGELOG.md b/CHANGELOG.md index 262d2e2..b91a649 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * Add preload_lazy_attributes option to the import in order to fetch the lazy attributes in a single query before bulk indexing ## 0.3.6 - 2024-08-07 -* Esse::LazyDocumentHeader#to_doc return `Esse::LazyDocumentHeader::Document` instance to properly separate context metadata from document source +* Esse::LazyDocumentHeader#to_doc returns an `Esse::DocumentForPartialUpdate` instance to properly separate context metadata from document source * Add `.collection_class` method to the `Esse::Repository` class to let external plugins and extensions to access it instead of read @collection_proc variable ## 0.3.5 - 2024-08-02 diff --git a/Gemfile.lock b/Gemfile.lock index 4120646..3637805 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -37,7 +37,7 @@ GEM rainbow (3.1.1) rake (12.3.3) regexp_parser (2.9.2) - rexml (3.3.1) + rexml (3.3.6) strscan rspec (3.13.0) rspec-core (~> 3.13.0) diff --git a/lib/esse/core.rb b/lib/esse/core.rb index 1b5bf67..f3d4bb5 100644 --- a/lib/esse/core.rb +++ b/lib/esse/core.rb @@ -6,6 +6,7 @@ module Esse require_relative 'primitives' require_relative 'collection' require_relative 'document' + require_relative 'document_for_partial_update' require_relative 'document_lazy_attribute' require_relative 'lazy_document_header' require_relative 'hash_document' @@ -91,4 +92,10 @@ def self.document?(object) !!(object.is_a?(Esse::Document) && object.id) end + + def self.document_match_with_header?(document, id, routing, type) + id && id.to_s == document.id.to_s && + routing == document.routing && + (LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(document.type) && LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(type) || document.type == type) + end end diff --git a/lib/esse/document.rb b/lib/esse/document.rb index a47f9f6..bb663d5 100644 --- 
a/lib/esse/document.rb +++ b/lib/esse/document.rb @@ -2,6 +2,8 @@ module Esse class Document + MUTATIONS_FALLBACK = {}.freeze + attr_reader :object, :options def initialize(object, **options) @@ -84,11 +86,16 @@ def ignore_on_delete? id.nil? end - def ==(other) - other.is_a?(self.class) && ( - id == other.id && type == other.type && routing == other.routing && meta == other.meta && source == other.source - ) + def eql?(other, match_lazy_doc_header: false) + if match_lazy_doc_header + other.eql?(self) + else + other.is_a?(Esse::Document) && ( + id.to_s == other.id.to_s && type == other.type && routing == other.routing && meta == other.meta + ) + end end + alias_method :==, :eql? def doc_header { _id: id }.tap do |h| @@ -97,6 +104,10 @@ def doc_header end end + def document_for_partial_update(source) + DocumentForPartialUpdate.new(self, source: source) + end + def inspect attributes = %i[id routing source].map do |attr| value = send(attr) @@ -115,6 +126,10 @@ def mutate(key) instance_variable_set(:@__mutated_source__, nil) end + def mutations + @__mutations__ || MUTATIONS_FALLBACK + end + def mutated_source return source unless @__mutations__ diff --git a/lib/esse/document_for_partial_update.rb b/lib/esse/document_for_partial_update.rb new file mode 100644 index 0000000..88b19f0 --- /dev/null +++ b/lib/esse/document_for_partial_update.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module Esse + class DocumentForPartialUpdate < Esse::Document + extend Forwardable + + def_delegators :object, :id, :type, :routing, :options + + attr_reader :source + + def initialize(lazy_header, source:) + @source = source + super(lazy_header) + end + end +end diff --git a/lib/esse/import/bulk.rb b/lib/esse/import/bulk.rb index f531a24..60c9f69 100644 --- a/lib/esse/import/bulk.rb +++ b/lib/esse/import/bulk.rb @@ -1,27 +1,35 @@ module Esse module Import class Bulk - def initialize(type: nil, index: nil, delete: nil, create: nil, update: nil) - @index = 
Array(index).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc| + def self.build_from_documents(type: nil, index: nil, delete: nil, create: nil, update: nil) + index = Array(index).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| value = doc.to_bulk value[:_type] ||= type if type - { index: value } + value end - @create = Array(create).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc| + create = Array(create).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| value = doc.to_bulk value[:_type] ||= type if type - { create: value } + value end - @update = Array(update).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc| + update = Array(update).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| value = doc.to_bulk(operation: :update) value[:_type] ||= type if type - { update: value } + value end - @delete = Array(delete).select(&method(:valid_doc?)).reject(&:ignore_on_delete?).map do |doc| + delete = Array(delete).select(&Esse.method(:document?)).reject(&:ignore_on_delete?).map do |doc| value = doc.to_bulk(data: false) value[:_type] ||= type if type - { delete: value } + value end + new(index: index, delete: delete, create: create, update: update) + end + + def initialize(index: nil, delete: nil, create: nil, update: nil) + @index = Esse::ArrayUtils.wrap(index).map { |payload| { index: payload } } + @create = Esse::ArrayUtils.wrap(create).map { |payload| { create: payload } } + @update = Esse::ArrayUtils.wrap(update).map { |payload| { update: payload } } + @delete = Esse::ArrayUtils.wrap(delete).map { |payload| { delete: payload } } end # Return an array of RequestBody instances @@ -68,10 +76,6 @@ def each_request(max_retries: 4, last_retry_in_small_chunks: true) private - def valid_doc?(doc) - Esse.document?(doc) - end - def optimistic_request request = Import::RequestBodyAsJson.new request.create = @create diff --git a/lib/esse/index/documents.rb 
b/lib/esse/index/documents.rb index b6dc6dc..f611751 100644 --- a/lib/esse/index/documents.rb +++ b/lib/esse/index/documents.rb @@ -171,13 +171,53 @@ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: n }.merge(options) cluster.may_update_type!(definition) + to_index = [] + to_create = [] + to_update = [] + to_delete = [] + Esse::ArrayUtils.wrap(index).each do |doc| + if doc.is_a?(Hash) + to_index << doc + elsif Esse.document?(doc) && !doc.ignore_on_index? + hash = doc.to_bulk + hash[:_type] ||= type if type + to_index << hash + end + end + Esse::ArrayUtils.wrap(create).each do |doc| + if doc.is_a?(Hash) + to_create << doc + elsif Esse.document?(doc) && !doc.ignore_on_index? + hash = doc.to_bulk + hash[:_type] ||= type if type + to_create << hash + end + end + Esse::ArrayUtils.wrap(update).each do |doc| + if doc.is_a?(Hash) + to_update << doc + elsif Esse.document?(doc) && !doc.ignore_on_index? + hash = doc.to_bulk(operation: :update) + hash[:_type] ||= type if type + to_update << hash + end + end + Esse::ArrayUtils.wrap(delete).each do |doc| + if doc.is_a?(Hash) + to_delete << doc + elsif Esse.document?(doc) && !doc.ignore_on_delete? + hash = doc.to_bulk(data: false) + hash[:_type] ||= type if type + to_delete << hash + end + end + # @TODO Wrap the return in a some other Stats object with more information Esse::Import::Bulk.new( - **definition.slice(:type), - create: create, - delete: delete, - index: index, - update: update, + create: to_create, + delete: to_delete, + index: to_index, + update: to_update, ).each_request do |request_body| cluster.api.bulk(**definition, body: request_body.body) do |event_payload| event_payload[:body_stats] = request_body.stats @@ -209,9 +249,14 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l repo_types = repo_hash.keys if repo_types.empty? 
count = 0 - # Backward compatibility while I change plugins using it - update_lazy_attributes = options.delete(:lazy_update_document_attributes) if options.key?(:lazy_update_document_attributes) - eager_load_lazy_attributes = options.delete(:eager_include_document_attributes) if options.key?(:eager_include_document_attributes) + if options.key?(:eager_include_document_attributes) + warn 'The `eager_include_document_attributes` option is deprecated. Use `eager_load_lazy_attributes` instead.' + eager_load_lazy_attributes = options.delete(:eager_include_document_attributes) + end + if options.key?(:lazy_update_document_attributes) + warn 'The `lazy_update_document_attributes` option is deprecated. Use `update_lazy_attributes` instead.' + update_lazy_attributes = options.delete(:lazy_update_document_attributes) + end repo_hash.slice(*repo_types).each do |repo_name, repo| # Elasticsearch 6.x and older have multiple types per index. @@ -223,76 +268,34 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l bulk_kwargs = { suffix: suffix, type: repo_name, **options } cluster.may_update_type!(bulk_kwargs) - lazy_attrs_to_eager_load = repo.lazy_document_attribute_names(eager_load_lazy_attributes) - lazy_attrs_to_search_preload = repo.lazy_document_attribute_names(preload_lazy_attributes) - lazy_attrs_to_update_after = repo.lazy_document_attribute_names(update_lazy_attributes) - lazy_attrs_to_update_after -= lazy_attrs_to_eager_load - lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load - - # @TODO Refactor this by combining the upcoming code again with repo.each_serialized_batch as it was before: - # context[:lazy_attributes] = lazy_attrs_to_eager_load if lazy_attrs_to_eager_load.any? 
- # repo.each_serialized_batch(**context) do |batch| - # bulk(**bulk_kwargs, index: batch) - - # lazy_attrs_to_update_after.each do |attr_name| - # partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?)) - # next if partial_docs.empty? - - # bulk(**bulk_kwargs, update: partial_docs) - # end - # count += batch.size - # end context ||= {} - repo.send(:each_batch, **context) do |*args| - batch, collection_context = args - collection_context ||= {} - entries = [*batch].map { |entry| repo.serialize(entry, **collection_context) }.compact + context[:eager_load_lazy_attributes] = eager_load_lazy_attributes + context[:preload_lazy_attributes] = preload_lazy_attributes + repo.each_serialized_batch(**context) do |batch| + bulk(**bulk_kwargs, index: batch) - if lazy_attrs_to_eager_load - attrs = lazy_attrs_to_eager_load.is_a?(Array) ? lazy_attrs_to_eager_load : repo.lazy_document_attribute_names(lazy_attrs_to_eager_load) - attrs.each do |attr_name| - repo.retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| - doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing } - doc&.mutate(attr_name) { value } + if update_lazy_attributes != false + attrs = repo.lazy_document_attribute_names(update_lazy_attributes) + attrs -= repo.lazy_document_attribute_names(eager_load_lazy_attributes) + update_attrs = attrs.each_with_object(Hash.new { |h, k| h[k] = {} }) do |attr_name, memo| + filtered_docs = batch.reject do |doc| + doc.ignore_on_index? || doc.mutations.key?(attr_name) end - end - end + next if filtered_docs.empty? - preload_search_result = Hash.new { |h, arr_name| h[arr_name] = {} } - if lazy_attrs_to_search_preload.any? 
- hits = repo.index.search(query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload).response.hits - hits.each do |hit| - doc_header = Esse::LazyDocumentHeader.coerce(hit.slice('_id', '_routing')) # TODO Add '_type', when adjusting eql to tread _doc properly - next unless doc_header.valid? - hit.dig('_source')&.each do |attr_name, attr_value| - real_attr_name = repo.lazy_document_attribute_names(attr_name).first - preload_search_result[real_attr_name][doc_header] = attr_value + repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).each do |doc, value| + memo[doc.doc_header][attr_name] = value end end - preload_search_result.each do |attr_name, values| - values.each do |doc_header, value| - doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing } - doc&.mutate(attr_name) { value } + if update_attrs.any? + bulk_update = update_attrs.map do |header, values| + header.merge(data: {doc: values}) end + bulk(**bulk_kwargs, update: bulk_update) end end - bulk(**bulk_kwargs, index: entries) - - lazy_attrs_to_update_after.each do |attr_name| - preloaded_ids = preload_search_result[attr_name].keys - filtered_docs = entries.reject do |doc| - doc.ignore_on_index? || preloaded_ids.any? { |d| d.id.to_s == doc.id.to_s && d.type == doc.type && d.routing == doc.routing } - end - next if filtered_docs.empty? - - partial_docs = repo.documents_for_lazy_attribute(attr_name, filtered_docs) - next if partial_docs.empty? 
- - bulk(**bulk_kwargs, update: partial_docs) - end - - count += entries.size + count += batch.size end end count diff --git a/lib/esse/index/object_document_mapper.rb b/lib/esse/index/object_document_mapper.rb index 933f592..dfcc6e5 100644 --- a/lib/esse/index/object_document_mapper.rb +++ b/lib/esse/index/object_document_mapper.rb @@ -24,7 +24,7 @@ def each_serialized_batch(repo_name = nil, **kwargs, &block) # @return [Enumerator] All serialized entries def documents(repo_name = nil, **kwargs) Enumerator.new do |yielder| - each_serialized_batch(repo_name, **kwargs) do |documents, **_collection_kargs| + each_serialized_batch(repo_name, **kwargs) do |documents| documents.each { |document| yielder.yield(document) } end end diff --git a/lib/esse/lazy_document_header.rb b/lib/esse/lazy_document_header.rb index bfc97a8..0dbcfee 100644 --- a/lib/esse/lazy_document_header.rb +++ b/lib/esse/lazy_document_header.rb @@ -2,11 +2,19 @@ module Esse class LazyDocumentHeader + ACCEPTABLE_CLASSES = [Esse::LazyDocumentHeader, Esse::Document].freeze + ACCEPTABLE_DOC_TYPES = [nil, '_doc', 'doc'].freeze + def self.coerce_each(values) + values = Esse::ArrayUtils.wrap(values) + return values if values.all? do |value| + ACCEPTABLE_CLASSES.any? { |klass| value.is_a?(klass) } + end + arr = [] - Esse::ArrayUtils.wrap(values).flatten.map do |value| + values.flatten.map do |value| instance = coerce(value) - arr << instance if instance&.valid? + arr << instance if instance && !instance.id.nil? end arr end @@ -17,7 +25,7 @@ def self.coerce(value) if value.is_a?(Esse::LazyDocumentHeader) value elsif value.is_a?(Esse::Document) - new(**value.options, id: value.id, type: value.type, routing: value.routing) + value elsif value.is_a?(Hash) resp = value.transform_keys do |key| case key @@ -47,10 +55,6 @@ def initialize(id:, type: nil, routing: nil, **extra_attributes) @options = extra_attributes.freeze end - def valid? - !id.nil? 
- end - def to_h options.merge(_id: id).tap do |hash| hash[:_type] = type if type @@ -58,26 +62,24 @@ def to_h end end - def to_doc(source = {}) - Document.new(self, source: source) + def document_for_partial_update(source) + Esse::DocumentForPartialUpdate.new(self, source: source) end - def eql?(other) - self.class == other.class && id == other.id && type == other.type && routing == other.routing + def doc_header + { _id: id }.tap do |hash| + hash[:_type] = type if type + hash[:routing] = routing if routing + end end - alias_method :==, :eql? - - class Document < Esse::Document - extend Forwardable - - def_delegators :object, :id, :type, :routing, :options - attr_reader :source - - def initialize(lazy_header, source: {}) - @source = source - super(lazy_header) - end + def eql?(other, **) + ACCEPTABLE_CLASSES.any? { |klass| other.is_a?(klass) } && + id.to_s == other.id.to_s && + routing == other.routing && + ((ACCEPTABLE_DOC_TYPES.include?(type) && ACCEPTABLE_DOC_TYPES.include?(other.type)) || type == other.type) end + alias_method :==, :eql? 
end end + diff --git a/lib/esse/repository/documents.rb b/lib/esse/repository/documents.rb index 0d2a7d6..69e4f3b 100644 --- a/lib/esse/repository/documents.rb +++ b/lib/esse/repository/documents.rb @@ -16,7 +16,7 @@ def update_documents_attribute(name, ids_or_doc_headers = [], kwargs = {}) def documents_for_lazy_attribute(name, ids_or_doc_headers) retrieve_lazy_attribute_values(name, ids_or_doc_headers).map do |doc_header, datum| - doc_header.to_doc(name => datum) + doc_header.document_for_partial_update(name => datum) end end @@ -36,11 +36,10 @@ def retrieve_lazy_attribute_values(name, ids_or_doc_headers) return [] unless result.is_a?(Hash) result.each_with_object({}) do |(key, value), memo| - if key.is_a?(LazyDocumentHeader) && (doc = docs.find { |d| d == key || d.id == key.id }) - memo[doc] = value - elsif (doc = docs.find { |d| d.id == key }) - memo[doc] = value - end + val = docs.find { |doc| doc.eql?(key, match_lazy_doc_header: true) || doc.id == key } + next unless val + + memo[val] = value end end end diff --git a/lib/esse/repository/object_document_mapper.rb b/lib/esse/repository/object_document_mapper.rb index d4f5d95..9f038a2 100644 --- a/lib/esse/repository/object_document_mapper.rb +++ b/lib/esse/repository/object_document_mapper.rb @@ -83,22 +83,51 @@ def collection_class # @param [Hash] kwargs The context # @return [Enumerator] The enumerator # @yield [Array, **context] serialized collection and the optional context from the collection - def each_serialized_batch(lazy_attributes: false, **kwargs) + def each_serialized_batch(eager_load_lazy_attributes: false, preload_lazy_attributes: false, **kwargs) + if kwargs.key?(:lazy_attributes) + warn 'The `lazy_attributes` option is deprecated. Use `eager_load_lazy_attributes` instead.' 
+ eager_load_lazy_attributes = kwargs.delete(:lazy_attributes) + end + + lazy_attrs_to_eager_load = lazy_document_attribute_names(eager_load_lazy_attributes) + lazy_attrs_to_search_preload = lazy_document_attribute_names(preload_lazy_attributes) + lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load + each_batch(**kwargs) do |*args| batch, collection_context = args collection_context ||= {} entries = [*batch].map { |entry| serialize(entry, **collection_context) }.compact - if lazy_attributes - attrs = lazy_attributes.is_a?(Array) ? lazy_attributes : lazy_document_attribute_names(lazy_attributes) - attrs.each do |attr_name| - retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| - doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing } - doc&.mutate(attr_name) { value } + lazy_attrs_to_eager_load.each do |attr_name| + retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value| + doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) } + doc&.mutate(attr_name) { value } + end + end + + if lazy_attrs_to_search_preload.any? + entries.group_by(&:routing).each do |routing, docs| + search_request = { + query: { ids: { values: docs.map(&:id) } }, + size: docs.size, + _source: lazy_attrs_to_search_preload + } + search_request[:routing] = routing if routing + index.search(**search_request).response.hits.each do |hit| + header = [hit['_id'], hit['_routing'], hit['_type']] + next if header[0].nil? + + hit.dig('_source')&.each do |attr_name, attr_value| + real_attr_name = lazy_document_attribute_names(attr_name).first + next if real_attr_name.nil? 
+ + doc = entries.find { |d| Esse.document_match_with_header?(d, *header) } + doc&.mutate(real_attr_name) { attr_value } + end end end end - yield entries, **collection_context + yield entries end end @@ -110,7 +139,7 @@ def each_serialized_batch(lazy_attributes: false, **kwargs) # @return [Enumerator] All serialized entries def documents(**kwargs) Enumerator.new do |yielder| - each_serialized_batch(**kwargs) do |docs, **_collection_kargs| + each_serialized_batch(**kwargs) do |docs| docs.each { |document| yielder.yield(document) } end end diff --git a/spec/esse/document_for_partial_update_spec.rb b/spec/esse/document_for_partial_update_spec.rb new file mode 100644 index 0000000..1714b0c --- /dev/null +++ b/spec/esse/document_for_partial_update_spec.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +require 'spec_helper' + +# rubocop:disable RSpec/VerifiedDoubles +RSpec.describe Esse::DocumentForPartialUpdate do + let(:document) { described_class.new(obj, source: source) } + let(:obj) { double(id: 1) } + let(:source) { { foo: :bar } } + + describe '#object' do + subject { document.object } + + it { is_expected.to be obj } + end + + describe '#id' do + subject { document.id } + + it { is_expected.to eq 1 } + end + + describe '#type' do + subject { document.type } + + let(:obj) { double(id: 1, type: 'foo', source: source) } + + it { is_expected.to eq 'foo' } + end + + describe '#routing' do + subject { document.routing } + + let(:obj) { double(id: 1, routing: 'foo', source: source) } + + it { is_expected.to eq 'foo' } + end + + describe '#source' do + subject { document.source } + + let(:obj) { double(id: 1, source: { original: 'source' }) } + + it { is_expected.to eq source } + end +end diff --git a/spec/esse/import/bulk_spec.rb b/spec/esse/import/bulk_spec.rb index 555fb1a..6bc46d5 100644 --- a/spec/esse/import/bulk_spec.rb +++ b/spec/esse/import/bulk_spec.rb @@ -8,7 +8,7 @@ let(:create) { [Esse::HashDocument.new(_id: 2, foo: 'bar')] } let(:delete) { 
[Esse::HashDocument.new(_id: 3, foo: 'bar')] } let(:update) { [Esse::HashDocument.new(_id: 4, foo: 'bar')] } - let(:bulk) { described_class.new(index: index, create: create, delete: delete, update: update) } + let(:bulk) { described_class.build_from_documents(index: index, create: create, delete: delete, update: update) } it 'yields a request body instance' do expect { |b| bulk.each_request(&b) }.to yield_with_args(Esse::Import::RequestBodyAsJson) @@ -61,7 +61,7 @@ let(:delete) do %w[foo bar baz].each_with_index.map { |name, idx| Esse::HashDocument.new(id: idx + 30, name: name) } end - let(:bulk) { described_class.new(index: index, create: create, delete: delete) } + let(:bulk) { described_class.build_from_documents(index: index, create: create, delete: delete) } it 'retries in small chunks' do expect(bulk).to receive(:sleep).with(an_instance_of(Integer)).exactly(3).times diff --git a/spec/esse/lazzy_document_header_spec.rb b/spec/esse/lazzy_document_header_spec.rb index f710a53..6e1d628 100644 --- a/spec/esse/lazzy_document_header_spec.rb +++ b/spec/esse/lazzy_document_header_spec.rb @@ -7,22 +7,6 @@ let(:object) { { id: nil } } let(:options) { {} } - describe '#valid?' do - it { expect(doc).to respond_to :valid? 
} - - it 'returns false' do - expect(doc.valid?).to be_falsey - end - - context 'when id is present' do - let(:object) { { id: 1 } } - - it 'returns true' do - expect(doc.valid?).to be_truthy - end - end - end - describe '#id' do it { expect(doc).to respond_to :id } @@ -118,7 +102,7 @@ let(:object) { Esse::HashDocument.new(_id: 1) } it 'returns a LazyDocumentHeader instance' do - expect(described_class.coerce(object)).to be_a(described_class) + expect(described_class.coerce(object)).to be(object) end end @@ -161,7 +145,7 @@ def id it 'returns a LazyDocumentHeader instance with the proper id' do instance = described_class.coerce(object) - expect(instance).to be_a(described_class) + expect(instance).to be(instance) expect(instance.id).to eq(2) end end @@ -194,40 +178,40 @@ def id expect(described_class.coerce_each([[{_id: 1}], {_id: 2}]).size).to eq(2) end - it 'coerces a list of Esse::Document instances' do + it 'return same instances when the given argument is already a Esse::Document' do list = [Esse::HashDocument.new(_id: 1), Class.new(Esse::HashDocument).new(_id: 2)] - expect(described_class.coerce_each(list)).to all(be_a(described_class)) + expect(described_class.coerce_each(list)).to all(be_a(Esse::HashDocument)) end end - describe '#to_doc' do + describe '#document_for_partial_update' do let(:options) { { admin: true } } let(:object) { { id: 1, routing: 'il', type: 'state' } } - it { expect(doc).to respond_to :to_doc } + it { expect(doc).to respond_to :document_for_partial_update } it 'returns a Esse::Document instance' do - expect(doc.to_doc).to be_a(Esse::Document) + expect(doc.document_for_partial_update({})).to be_a(Esse::DocumentForPartialUpdate) end it 'returns a Esse::Document instance with the id' do - expect(doc.to_doc.id).to eq(1) + expect(doc.document_for_partial_update({}).id).to eq(1) end it 'returns a Esse::Document instance routing' do - expect(doc.to_doc.routing).to eq('il') + expect(doc.document_for_partial_update({}).routing).to eq('il') end 
it 'returns a Esse::Document instance with the type' do - expect(doc.to_doc.type).to eq('state') + expect(doc.document_for_partial_update({}).type).to eq('state') end it 'returns a Esse::Document instance with the options' do - expect(doc.to_doc.options).to eq(admin: true) + expect(doc.document_for_partial_update({}).options).to eq(admin: true) end it 'returns a Esse::Document instance with the object as source and the given source' do - new_doc = doc.to_doc(foo: 'bar') + new_doc = doc.document_for_partial_update(foo: 'bar') expect(new_doc.source).to eq(foo: 'bar') expect(new_doc.object).to eq(doc) expect(new_doc.options).to eq(admin: true) @@ -249,4 +233,132 @@ def id end end end + + describe '#eql?' do + specify do + same = described_class.new(id: 1) + diff = described_class.new(id: 2) + expect(described_class.new(id: 1)).to eq(same) + expect(described_class.new(id: 1)).not_to eq(diff) + end + + specify do + same = described_class.new(id: 1) + diff = described_class.new(id: 2) + expect(described_class.new(id: '1')).to eq(same) + end + + specify do + same = described_class.new(id: 1, type: 'foo') + diff = described_class.new(id: 1, type: 'bar') + expect(described_class.new(id: 1, type: 'foo')).to eq(same) + expect(described_class.new(id: 1, type: 'foo')).not_to eq(diff) + end + + specify do + same = described_class.new(id: 1, routing: 'foo') + diff = described_class.new(id: 1, routing: 'bar') + expect(described_class.new(id: 1, routing: 'foo')).to eq(same) + expect(described_class.new(id: 1, routing: 'foo')).not_to eq(diff) + end + + specify do + same = described_class.new(id: 1, type: 'foo', routing: 'bar') + diff = described_class.new(id: 1, type: 'bar', routing: 'foo') + expect(described_class.new(id: 1, type: 'foo', routing: 'bar')).to eq(same) + expect(described_class.new(id: 1, type: 'foo', routing: 'bar')).not_to eq(diff) + end + + context 'when comparing doc type' do + specify do + expected = described_class.new(id: 1, type: '_doc') + 
expect(described_class.new(id: 1, type: '_doc')).to eq(expected) + end + + specify do + expected = described_class.new(id: 1, type: '_doc') + expect(described_class.new(id: 1, type: 'doc')).to eq(expected) + end + + specify do + expected = described_class.new(id: 1, type: '_doc') + expect(described_class.new(id: 1, type: nil)).to eq(expected) + end + + specify do + expected = described_class.new(id: 1, type: nil) + expect(described_class.new(id: 1, type: '_doc')).to eq(expected) + end + + specify do + expected = described_class.new(id: 1, type: nil) + expect(described_class.new(id: 1, type: 'doc')).to eq(expected) + end + end + + context 'when comparing with a Esse::Document' do + specify do + header = described_class.new(id: 1) + doc = Esse::HashDocument.new(_id: 1) + expect(header).to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(true) + end + + specify do + header = described_class.new(id: 1) + doc = Esse::HashDocument.new(_id: 2) + expect(header).not_to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(false) + end + + specify do + header = described_class.new(id: 1, type: '_doc') + doc = Esse::HashDocument.new(_id: 1) + expect(header).to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(true) + end + + specify do + header = described_class.new(id: 1) + doc = Esse::HashDocument.new(_id: 1, _type: '_doc') + expect(header).to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(true) + end + + specify do + header = described_class.new(id: 1, type: '_doc') + doc = Esse::HashDocument.new(_id: 1, _type: 'foo') + expect(header).not_to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(false) + end + + specify do + header = described_class.new(id: 1, routing: 'il') + doc = 
Esse::HashDocument.new(_id: 1, _routing: 'il') + expect(header).to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(true) + end + + specify do + header = described_class.new(id: 1, routing: 'il') + doc = Esse::HashDocument.new(_id: 1, _routing: 'fl') + expect(header).not_to eq(doc) + expect(doc.eql?(header)).to eq(false) + end + + specify do + header = described_class.new(id: 1, extra: 'il') + doc = Esse::HashDocument.new(_id: 1, extra: 'fl') + expect(header).to eq(doc) + expect(doc.eql?(header)).to eq(false) + expect(doc.eql?(header, match_lazy_doc_header: true)).to eq(true) + end + end + end end diff --git a/spec/esse/repository/document_spec.rb b/spec/esse/repository/document_spec.rb index 0409c8d..85afbcc 100644 --- a/spec/esse/repository/document_spec.rb +++ b/spec/esse/repository/document_spec.rb @@ -154,23 +154,23 @@ end end - context 'with lazy_load_attributes' do + context 'with eager_load_lazy_attributes' do include_context 'with stories index definition' - it 'yields serialized objects with lazy attributes when passing lazy_attributes: true' do + it 'yields serialized objects with lazy attributes when passing eager_load_lazy_attributes: true' do expected_data = [] expect { - StoriesIndex::Story.each_serialized_batch(lazy_attributes: true) do |batch| + StoriesIndex::Story.each_serialized_batch(eager_load_lazy_attributes: true) do |batch| expected_data.push(*batch) end }.not_to raise_error expect(expected_data.select { |doc| doc.to_h.key?(:tags) && doc.to_h.key?(:tags_count) }).not_to be_empty end - it 'yields serialized objects without lazy attributes when passing lazy_attributes: false' do + it 'yields serialized objects without lazy attributes when passing eager_load_lazy_attributes: false' do expected_data = [] expect { - StoriesIndex::Story.each_serialized_batch(lazy_attributes: false) do |batch| + StoriesIndex::Story.each_serialized_batch(eager_load_lazy_attributes: false) do |batch| 
expected_data.push(*batch) end }.not_to raise_error @@ -180,13 +180,69 @@ it 'yields serialized objects with lazy attributes when passing specific attributes' do expected_data = [] expect { - StoriesIndex::Story.each_serialized_batch(lazy_attributes: %i[tags]) do |batch| + StoriesIndex::Story.each_serialized_batch(eager_load_lazy_attributes: %i[tags]) do |batch| expected_data.push(*batch) end }.not_to raise_error expect(expected_data.select { |doc| doc.to_h.key?(:tags) && !doc.to_h.key?(:tags_count) }).not_to be_empty end end + + context 'with preload_lazy_attributes' do + include_context 'with stories index definition' + + let(:nyt_hits) do + nyt_stories.map do |hash| + { + '_id' => hash[:id].to_s, + '_routing' => 'nyt', + '_type' => '_doc', + '_source' => { + 'tags' => hash[:tags], + 'tags_count' => hash[:tags].size, + }, + } + end + end + + let(:wsj_hits) do + wsj_stories.map do |hash| + { + '_id' => hash[:id].to_s, + '_routing' => 'wsj', + '_type' => '_doc', + '_source' => { + 'tags' => hash[:tags], + 'tags_count' => hash[:tags].size, + }, + } + end + end + + it 'yields serialized objects with lazy attributes when passing preload_lazy_attributes: true' do + expect(StoriesIndex).to receive(:search).with( + query: { ids: { values: nyt_stories.map { |hash| hash[:id] } } }, + size: nyt_stories.size, + _source: %i[tags tags_count], + routing: 'nyt', + ).and_return(double(response: double(hits: nyt_hits))) + expect(StoriesIndex).to receive(:search).with( + query: { ids: { values: wsj_stories.map { |hash| hash[:id] } } }, + size: wsj_stories.size, + _source: %i[tags tags_count], + routing: 'wsj', + ).and_return(double(response: double(hits: wsj_hits))) + expected_data = [] + expect { + StoriesIndex::Story.each_serialized_batch(preload_lazy_attributes: true) do |batch| + expected_data.push(*batch) + end + }.not_to raise_error + expect(expected_data.map(&:mutations)).to all( + include(tags: be_a(Array), tags_count: be_a(Integer)), + ) + end + end end describe 
'.documents' do diff --git a/spec/esse/repository/lazy_document_spec.rb b/spec/esse/repository/lazy_document_spec.rb index a9a8675..bd3b600 100644 --- a/spec/esse/repository/lazy_document_spec.rb +++ b/spec/esse/repository/lazy_document_spec.rb @@ -90,7 +90,7 @@ it 'returns an array of documents that match with the provided ids' do docs = repo.documents_for_lazy_attribute(:city_names, '2') expect(docs).to eq([ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ]) end @@ -98,14 +98,14 @@ header = Esse::LazyDocumentHeader.coerce(id: '2') docs = repo.documents_for_lazy_attribute(:city_names, header) expect(docs).to eq([ - Esse::LazyDocumentHeader::Document.new(header, source: { city_names: 'London' }) + Esse::DocumentForPartialUpdate.new(header, source: { city_names: 'London' }) ]) end it 'returns an array of documents that match with the provided single hash' do docs = repo.documents_for_lazy_attribute(:city_names, {id: '2', admin: true}) expect(docs).to eq([ - Esse::LazyDocumentHeader.new(id: '2', admin: true).to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2', admin: true).document_for_partial_update(city_names: 'London') ]) end end @@ -133,21 +133,21 @@ it 'returns an array of documents that match with the provided ids' do docs = repo.documents_for_lazy_attribute(:city_names, '2') expect(docs).to eq([ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ]) end it 'returns an array of documents that match with the provided LazyDocumentHeader' do docs = repo.documents_for_lazy_attribute(:city_names, Esse::LazyDocumentHeader.coerce(id: '2')) expect(docs).to eq([ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ]) end it 'do not 
include duplicate documents' do docs = repo.documents_for_lazy_attribute(:city_names, ['2', '2', Esse::LazyDocumentHeader.coerce(id: '2')]) expect(docs).to eq([ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ]) end end @@ -169,9 +169,9 @@ it 'returns an array of documents that match with the provided ids' do docs = repo.documents_for_lazy_attribute(:city_names, ['2', '3', '4']) expect(docs).to eq([ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London'), - Esse::LazyDocumentHeader.new(id: '3').to_doc(city_names: nil), - Esse::LazyDocumentHeader.new(id: '4').to_doc(city_names: ''), + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London'), + Esse::LazyDocumentHeader.new(id: '3').document_for_partial_update(city_names: nil), + Esse::LazyDocumentHeader.new(id: '4').document_for_partial_update(city_names: ''), ]) end end @@ -202,7 +202,7 @@ it 'updates the documents' do expect(repo.index).to receive(:bulk).with( update: [ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ] ) repo.update_documents_attribute(:city_names, '2') @@ -212,7 +212,7 @@ header = Esse::LazyDocumentHeader.coerce(id: '2') expect(repo.index).to receive(:bulk).with( update: [ - Esse::LazyDocumentHeader::Document.new(header, source: { city_names: 'London' }) + Esse::DocumentForPartialUpdate.new(header, source: { city_names: 'London' }) ] ) repo.update_documents_attribute(:city_names, header) @@ -221,7 +221,7 @@ it 'updates the documents when passing a single hash' do expect(repo.index).to receive(:bulk).with( update: [ - Esse::LazyDocumentHeader.new(id: '2', admin: true).to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2', admin: true).document_for_partial_update(city_names: 'London') ] ) 
repo.update_documents_attribute(:city_names, {id: '2', admin: true}) @@ -230,7 +230,7 @@ it 'updates the documents when passing an array of ids' do expect(repo.index).to receive(:bulk).with( update: [ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ] ) repo.update_documents_attribute(:city_names, ['2']) @@ -240,7 +240,7 @@ header = Esse::LazyDocumentHeader.coerce(id: '2') expect(repo.index).to receive(:bulk).with( update: [ - Esse::LazyDocumentHeader::Document.new(header, source: { city_names: 'London' }) + Esse::DocumentForPartialUpdate.new(header, source: { city_names: 'London' }) ] ) repo.update_documents_attribute(:city_names, [header]) @@ -249,7 +249,7 @@ it 'updates the documents when passing an array of hashes' do expect(repo.index).to receive(:bulk).with( update: [ - Esse::LazyDocumentHeader.new(id: '2').to_doc(city_names: 'London') + Esse::LazyDocumentHeader.new(id: '2').document_for_partial_update(city_names: 'London') ] ) repo.update_documents_attribute(:city_names, [{id: '2', admin: true}])