From 8e1c832e1eff7ee7f264d3f335b009a46da37569 Mon Sep 17 00:00:00 2001
From: "Marcos G. Zimmermann"
Date: Mon, 19 Aug 2024 13:39:45 -0300
Subject: [PATCH] reduce memory usage by creating raw hash instead of coercing them to doc/header doc

---
Review notes (this section is ignored by `git am`):
- bulk: Hash entries given through `index:` are appended to the index
  operations and Hash entries given through `create:` are appended to the
  create operations. Before this revision the `create:` hashes were pushed
  into `index` and the `index:` hashes were dropped.
- import: the preload search issued for each routing group only asks for the
  ids of the documents that belong to that group (`docs`), not for every
  entry of the batch.
- import: the lazy attribute values are walked with `each`, because the value
  of the block is not used.
- Hunk offsets and the diffstat account for the one extra added line; the blob
  ids on the `index` lines still refer to the original commit.

 lib/esse/index/documents.rb      | 83 ++++++++++++++++++++++++--------
 lib/esse/lazy_document_header.rb |  7 +++
 2 files changed, 70 insertions(+), 20 deletions(-)

diff --git a/lib/esse/index/documents.rb b/lib/esse/index/documents.rb
index b2a2124..196cbd5 100644
--- a/lib/esse/index/documents.rb
+++ b/lib/esse/index/documents.rb
@@ -171,9 +171,37 @@ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: n
         }.merge(options)
         cluster.may_update_type!(definition)
 
+        to_index = Esse::ArrayUtils.wrap(index)
+        to_create = Esse::ArrayUtils.wrap(create)
+        to_update = Esse::ArrayUtils.wrap(update)
+        to_delete = Esse::ArrayUtils.wrap(delete)
+        index = to_index.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
+          value = doc.to_bulk
+          value[:_type] ||= type if type
+          value
+        end
+        index.push(*to_index.select { |item| item.is_a?(Hash) })
+        create = to_create.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
+          value = doc.to_bulk
+          value[:_type] ||= type if type
+          value
+        end
+        create.push(*to_create.select { |item| item.is_a?(Hash) })
+        update = to_update.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
+          value = doc.to_bulk(operation: :update)
+          value[:_type] ||= type if type
+          value
+        end
+        update.push(*to_update.select { |item| item.is_a?(Hash) })
+        delete = to_delete.select(&Esse.method(:document?)).reject(&:ignore_on_delete?).map do |doc|
+          value = doc.to_bulk(data: false)
+          value[:_type] ||= type if type
+          value
+        end
+        delete.push(*to_delete.select { |item| item.is_a?(Hash) })
+
         # @TODO Wrap the return in a some other Stats object with more information
-        Esse::Import::Bulk.build_from_documents(
-          **definition.slice(:type),
+        Esse::Import::Bulk.new(
           create: create,
           delete: delete,
           index: index,
@@ -213,6 +241,12 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l
         update_lazy_attributes = options.delete(:lazy_update_document_attributes) if options.key?(:lazy_update_document_attributes)
         eager_load_lazy_attributes = options.delete(:eager_include_document_attributes) if options.key?(:eager_include_document_attributes)
 
+        doc_header_check = ->(doc, hash) do
+          hash['_id'] && hash['_id'].to_s == doc.id.to_s &&
+            hash['_routing'] == doc.routing &&
+            (LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(doc.type) && LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(hash['_type']) || doc.type == hash['_type'])
+        end
+
         repo_hash.slice(*repo_types).each do |repo_name, repo|
           # Elasticsearch 6.x and older have multiple types per index.
           # This gem supports multiple types per index for backward compatibility, but we recommend to update
@@ -260,39 +294,48 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l
             preload_search_result = Hash.new { |h, arr_name| h[arr_name] = {} }
             if lazy_attrs_to_search_preload.any?
-              hits = repo.index.search(query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload).response.hits
-              hits.each do |hit|
-                doc_header = Esse::LazyDocumentHeader.coerce(hit.slice('_id', '_routing', '_type'))
-                next if doc_header.id.nil?
+              entries.group_by(&:routing).each do |routing, docs|
+                search_request = { query: {ids: {values: docs.map(&:id)} }, _source: lazy_attrs_to_search_preload }
+                search_request[:routing] = routing if routing
+                hits = repo.index.search(**search_request).response.hits
+                hits.each do |hit|
+                  header = hit.slice('_id', '_routing', '_type')
+                  next if header['_id'].nil?
 
-                hit.dig('_source')&.each do |attr_name, attr_value|
-                  real_attr_name = repo.lazy_document_attribute_names(attr_name).first
-                  preload_search_result[real_attr_name][doc_header] = attr_value
+                  hit.dig('_source')&.each do |attr_name, attr_value|
+                    real_attr_name = repo.lazy_document_attribute_names(attr_name).first
+                    preload_search_result[real_attr_name][header] = attr_value
+                  end
                 end
-              end
 
-              preload_search_result.each do |attr_name, values|
-                values.each do |doc_header, value|
-                  doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) }
-                  doc&.mutate(attr_name) { value }
+                preload_search_result.each do |attr_name, values|
+                  values.each do |header, value|
+                    doc = entries.find { |d| doc_header_check.call(d, header) }
+                    doc&.mutate(attr_name) { value }
+                  end
                 end
               end
             end
 
             bulk(**bulk_kwargs, index: entries)
 
+            update_lazy_attrs = Hash.new { |h, k| h[k] = {} }
             lazy_attrs_to_update_after.each do |attr_name|
               preloaded_ids = preload_search_result[attr_name].keys
               filtered_docs = entries.reject do |doc|
-                doc.ignore_on_index? || preloaded_ids.any? { |d| doc.eql?(d, match_lazy_doc_header: true) }
+                doc.ignore_on_index? || preloaded_ids.any? { |h| doc_header_check.call(doc, h) }
               end
               next if filtered_docs.empty?
 
-              partial_docs = repo.documents_for_lazy_attribute(attr_name, filtered_docs)
-              next if partial_docs.empty?
-
-              bulk(**bulk_kwargs, update: partial_docs)
+              repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).each do |doc, datum|
+                update_lazy_attrs[doc.doc_header][attr_name] = datum
+              end
+            end
+            if update_lazy_attrs.any?
+              bulk_update = update_lazy_attrs.map do |header, values|
+                header.merge(data: {doc: values})
+              end
+              bulk(**bulk_kwargs, update: bulk_update)
             end
-
             count += entries.size
           end
         end
diff --git a/lib/esse/lazy_document_header.rb b/lib/esse/lazy_document_header.rb
index 7144d3b..972b572 100644
--- a/lib/esse/lazy_document_header.rb
+++ b/lib/esse/lazy_document_header.rb
@@ -61,6 +61,13 @@ def document_for_partial_update(source)
       Esse::DocumentForPartialUpdate.new(self, source: source)
     end
 
+    def doc_header
+      { _id: id }.tap do |hash|
+        hash[:_type] = type if type
+        hash[:routing] = routing if routing
+      end
+    end
+
     def eql?(other, **)
       ACCEPTABLE_CLASSES.any? { |klass| other.is_a?(klass) } &&
         id.to_s == other.id.to_s &&