Skip to content

Commit

Permalink
reduce memory usage by creating raw hash instead of coercing them to …
Browse files Browse the repository at this point in the history
…doc/header doc
  • Loading branch information
marcosgz committed Aug 19, 2024
1 parent e2a21fb commit 8e1c832
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 20 deletions.
82 changes: 62 additions & 20 deletions lib/esse/index/documents.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,36 @@ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: n
}.merge(options)
cluster.may_update_type!(definition)

to_index = Esse::ArrayUtils.wrap(index)
to_create = Esse::ArrayUtils.wrap(create)
to_update = Esse::ArrayUtils.wrap(update)
to_delete = Esse::ArrayUtils.wrap(delete)
index = to_index.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk
value[:_type] ||= type if type
value
end
index.push(*to_create.select { |item| item.is_a?(Hash) })
create = to_create.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk
value[:_type] ||= type if type
value
end
update = to_update.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk(operation: :update)
value[:_type] ||= type if type
value
end
update.push(*to_update.select { |item| item.is_a?(Hash) })
delete = to_delete.select(&Esse.method(:document?)).reject(&:ignore_on_delete?).map do |doc|
value = doc.to_bulk(data: false)
value[:_type] ||= type if type
value
end
delete.push(*to_delete.select { |item| item.is_a?(Hash) })

# @TODO Wrap the return in a some other Stats object with more information
Esse::Import::Bulk.build_from_documents(
**definition.slice(:type),
Esse::Import::Bulk.new(
create: create,
delete: delete,
index: index,
Expand Down Expand Up @@ -213,6 +240,12 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l
update_lazy_attributes = options.delete(:lazy_update_document_attributes) if options.key?(:lazy_update_document_attributes)
eager_load_lazy_attributes = options.delete(:eager_include_document_attributes) if options.key?(:eager_include_document_attributes)

doc_header_check = ->(doc, hash) do
hash['_id'] && hash['_id'].to_s == doc.id.to_s &&
hash['_routing'] == doc.routing &&
(LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(doc.type) && LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.include?(hash['_type']) || doc.type == hash['_type'])
end

repo_hash.slice(*repo_types).each do |repo_name, repo|
# Elasticsearch 6.x and older have multiple types per index.
# This gem supports multiple types per index for backward compatibility, but we recommend to update
Expand Down Expand Up @@ -260,39 +293,48 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l

preload_search_result = Hash.new { |h, arr_name| h[arr_name] = {} }
if lazy_attrs_to_search_preload.any?
hits = repo.index.search(query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload).response.hits
hits.each do |hit|
doc_header = Esse::LazyDocumentHeader.coerce(hit.slice('_id', '_routing', '_type'))
next if doc_header.id.nil?
entries.group_by(&:routing).each do |routing, docs|
search_request = { query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload }
search_request[:routing] = routing if routing
hits = repo.index.search(**search_request).response.hits
hits.each do |hit|
header = hit.slice('_id', '_routing', '_type')
next if header['_id'].nil?

hit.dig('_source')&.each do |attr_name, attr_value|
real_attr_name = repo.lazy_document_attribute_names(attr_name).first
preload_search_result[real_attr_name][doc_header] = attr_value
hit.dig('_source')&.each do |attr_name, attr_value|
real_attr_name = repo.lazy_document_attribute_names(attr_name).first
preload_search_result[real_attr_name][header] = attr_value
end
end
end
preload_search_result.each do |attr_name, values|
values.each do |doc_header, value|
doc = entries.find { |d| d.eql?(doc_header, match_lazy_doc_header: true) }
doc&.mutate(attr_name) { value }
preload_search_result.each do |attr_name, values|
values.each do |header, value|
doc = entries.find { |d| doc_header_check.call(d, header) }
doc&.mutate(attr_name) { value }
end
end
end
end

bulk(**bulk_kwargs, index: entries)

update_lazy_attrs = Hash.new { |h, k| h[k] = {} }
lazy_attrs_to_update_after.each do |attr_name|
preloaded_ids = preload_search_result[attr_name].keys
filtered_docs = entries.reject do |doc|
doc.ignore_on_index? || preloaded_ids.any? { |d| doc.eql?(d, match_lazy_doc_header: true) }
doc.ignore_on_index? || preloaded_ids.any? { |h| doc_header_check.call(doc, h) }
end
next if filtered_docs.empty?

partial_docs = repo.documents_for_lazy_attribute(attr_name, filtered_docs)
next if partial_docs.empty?

bulk(**bulk_kwargs, update: partial_docs)
repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).map do |doc, datum|
update_lazy_attrs[doc.doc_header][attr_name] = datum
end
end
if update_lazy_attrs.any?
bulk_update = update_lazy_attrs.map do |header, values|
header.merge(data: {doc: values})
end
bulk(**bulk_kwargs, update: bulk_update)
end

count += entries.size
end
end
Expand Down
7 changes: 7 additions & 0 deletions lib/esse/lazy_document_header.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ def document_for_partial_update(source)
Esse::DocumentForPartialUpdate.new(self, source: source)
end

def doc_header
{ _id: id }.tap do |hash|
hash[:_type] = type if type
hash[:routing] = routing if routing
end
end

def eql?(other, **)
ACCEPTABLE_CLASSES.any? { |klass| other.is_a?(klass) } &&
id.to_s == other.id.to_s &&
Expand Down

0 comments on commit 8e1c832

Please sign in to comment.