Skip to content

Commit

Permalink
Memory Optimization (#25)
Browse files Browse the repository at this point in the history
* during import, use the doc directly, which should reduce the number of allocated objects

* feat: start updating bulk to work with std hash as default

* reduce memory usage by creating raw hash instead of coercing them to doc/header doc

* keep bulk order

* minor refactoring

* fix: do not compare document sources; they may have dynamically generated attributes

* feat: do not create a new array when the values are already doc instances

* feat: simplify data structure

* chore: rename lazy_attributes to eager_load_lazy_attributes

* feat: refactoring by reusing each_serialized_batch

* feat: update rexml close #7
  • Loading branch information
marcosgz authored Aug 23, 2024
1 parent 56e9a20 commit 8fd3022
Show file tree
Hide file tree
Showing 16 changed files with 466 additions and 177 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Add preload_lazy_attributes option to the import in order to fetch the lazy attributes in a single query before bulk indexing

## 0.3.6 - 2024-08-07
* Esse::LazyDocumentHeader#to_doc return `Esse::LazyDocumentHeader::Document` instance to properly separate context metadata from document source
* Esse::LazyDocumentHeader#to_doc return `Esse::DocumentForPartialUpdate` instance to properly separate context metadata from document source
* Add `.collection_class` method to the `Esse::Repository` class to let external plugins and extensions access it instead of reading the @collection_proc variable

## 0.3.5 - 2024-08-02
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ GEM
rainbow (3.1.1)
rake (12.3.3)
regexp_parser (2.9.2)
rexml (3.3.1)
rexml (3.3.6)
strscan
rspec (3.13.0)
rspec-core (~> 3.13.0)
Expand Down
7 changes: 7 additions & 0 deletions lib/esse/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Esse
require_relative 'primitives'
require_relative 'collection'
require_relative 'document'
require_relative 'document_for_partial_update'
require_relative 'document_lazy_attribute'
require_relative 'lazy_document_header'
require_relative 'hash_document'
Expand Down Expand Up @@ -91,4 +92,10 @@ def self.document?(object)

!!(object.is_a?(Esse::Document) && object.id)
end

# Checks whether +document+ is the one identified by the given header values.
#
# Ids are compared as strings, so +1+ and +"1"+ identify the same document.
# The routing values must be equal. The types match when they are equal, or
# when both of them are listed in LazyDocumentHeader::ACCEPTABLE_DOC_TYPES.
#
# @param document [#id, #routing, #type] the document to compare against
# @param id [Object, nil] the header id; a nil/false id never matches
# @param routing [Object, nil] the header routing value
# @param type [Object, nil] the header type
# @return [Boolean] a strict +true+ or +false+, like Esse.document?
def self.document_match_with_header?(document, id, routing, type)
  return false unless id
  return false unless id.to_s == document.id.to_s && routing == document.routing
  return true if document.type == type

  # Different type values still match when both are accepted doc types.
  acceptable_types = LazyDocumentHeader::ACCEPTABLE_DOC_TYPES
  acceptable_types.include?(document.type) && acceptable_types.include?(type)
end
end
23 changes: 19 additions & 4 deletions lib/esse/document.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

module Esse
class Document
MUTATIONS_FALLBACK = {}.freeze

attr_reader :object, :options

def initialize(object, **options)
Expand Down Expand Up @@ -84,11 +86,16 @@ def ignore_on_delete?
id.nil?
end

def ==(other)
other.is_a?(self.class) && (
id == other.id && type == other.type && routing == other.routing && meta == other.meta && source == other.source
)
# Compares this document with +other+ by id (as strings), type, routing and
# meta. The document source is not part of the comparison.
#
# @param other [Object] the object to compare with
# @param match_lazy_doc_header [Boolean] when truthy, the comparison is
#   delegated to +other.eql?(self)+ instead of being evaluated here
# @return [Boolean] whether both objects describe the same document
def eql?(other, match_lazy_doc_header: false)
  return other.eql?(self) if match_lazy_doc_header
  return false unless other.is_a?(Esse::Document)

  id.to_s == other.id.to_s &&
    type == other.type &&
    routing == other.routing &&
    meta == other.meta
end
alias_method :==, :eql?

def doc_header
{ _id: id }.tap do |h|
Expand All @@ -97,6 +104,10 @@ def doc_header
end
end

# Builds a DocumentForPartialUpdate that wraps this document and carries the
# given source.
#
# @param source [Object] value passed as the +source:+ of the new instance
# @return [DocumentForPartialUpdate]
def document_for_partial_update(source)
DocumentForPartialUpdate.new(self, source: source)
end

def inspect
attributes = %i[id routing source].map do |attr|
value = send(attr)
Expand All @@ -115,6 +126,10 @@ def mutate(key)
instance_variable_set(:@__mutated_source__, nil)
end

# Returns the value of @__mutations__, falling back to the shared frozen
# MUTATIONS_FALLBACK hash when that variable is nil/false. Callers can
# therefore query the result (e.g. +mutations.key?(name)+) without a nil check.
def mutations
@__mutations__ || MUTATIONS_FALLBACK
end

def mutated_source
return source unless @__mutations__

Expand Down
16 changes: 16 additions & 0 deletions lib/esse/document_for_partial_update.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

module Esse
# Document whose +source+ is supplied explicitly at construction time, while
# +id+, +type+, +routing+ and +options+ are forwarded to the wrapped object.
class DocumentForPartialUpdate < Esse::Document
extend Forwardable

# Header values are read from +object+ rather than computed by this class.
def_delegators :object, :id, :type, :routing, :options

# @return [Object] the source given to the constructor
attr_reader :source

# @param lazy_header [#id, #type, #routing, #options] object the header
#   readers are delegated to; it is passed to Esse::Document#initialize as the
#   wrapped object. Presumably a LazyDocumentHeader or another Document
#   (TODO confirm against callers).
# @param source [Object] the document source exposed through #source
def initialize(lazy_header, source:)
@source = source
super(lazy_header)
end
end
end
30 changes: 17 additions & 13 deletions lib/esse/import/bulk.rb
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
module Esse
module Import
class Bulk
def initialize(type: nil, index: nil, delete: nil, create: nil, update: nil)
@index = Array(index).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
def self.build_from_documents(type: nil, index: nil, delete: nil, create: nil, update: nil)
index = Array(index).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk
value[:_type] ||= type if type
{ index: value }
value
end
@create = Array(create).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
create = Array(create).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk
value[:_type] ||= type if type
{ create: value }
value
end
@update = Array(update).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
update = Array(update).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk(operation: :update)
value[:_type] ||= type if type
{ update: value }
value
end
@delete = Array(delete).select(&method(:valid_doc?)).reject(&:ignore_on_delete?).map do |doc|
delete = Array(delete).select(&Esse.method(:document?)).reject(&:ignore_on_delete?).map do |doc|
value = doc.to_bulk(data: false)
value[:_type] ||= type if type
{ delete: value }
value
end
new(index: index, delete: delete, create: create, update: update)
end

# Stores the payloads of each bulk operation. Every payload is wrapped in a
# single-key hash named after its operation, e.g. +{ index: payload }+.
#
# @param index [Object, nil] payload(s) for the index operation
# @param delete [Object, nil] payload(s) for the delete operation
# @param create [Object, nil] payload(s) for the create operation
# @param update [Object, nil] payload(s) for the update operation
def initialize(index: nil, delete: nil, create: nil, update: nil)
  payloads_by_operation = { index: index, create: create, update: update, delete: delete }

  @index, @create, @update, @delete = payloads_by_operation.map do |operation, payloads|
    Esse::ArrayUtils.wrap(payloads).map { |payload| { operation => payload } }
  end
end

# Return an array of RequestBody instances
Expand Down Expand Up @@ -68,10 +76,6 @@ def each_request(max_retries: 4, last_retry_in_small_chunks: true)

private

def valid_doc?(doc)
Esse.document?(doc)
end

def optimistic_request
request = Import::RequestBodyAsJson.new
request.create = @create
Expand Down
139 changes: 71 additions & 68 deletions lib/esse/index/documents.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,53 @@ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: n
}.merge(options)
cluster.may_update_type!(definition)

to_index = []
to_create = []
to_update = []
to_delete = []
Esse::ArrayUtils.wrap(index).each do |doc|
if doc.is_a?(Hash)
to_index << doc
elsif Esse.document?(doc) && !doc.ignore_on_index?
hash = doc.to_bulk
hash[:_type] ||= type if type
to_index << hash
end
end
Esse::ArrayUtils.wrap(create).each do |doc|
if doc.is_a?(Hash)
to_create << doc
elsif Esse.document?(doc) && !doc.ignore_on_index?
hash = doc.to_bulk
hash[:_type] ||= type if type
to_create << hash
end
end
Esse::ArrayUtils.wrap(update).each do |doc|
if doc.is_a?(Hash)
to_update << doc
elsif Esse.document?(doc) && !doc.ignore_on_index?
hash = doc.to_bulk(operation: :update)
hash[:_type] ||= type if type
to_update << hash
end
end
Esse::ArrayUtils.wrap(delete).each do |doc|
if doc.is_a?(Hash)
to_delete << doc
elsif Esse.document?(doc) && !doc.ignore_on_delete?
hash = doc.to_bulk(data: false)
hash[:_type] ||= type if type
to_delete << hash
end
end

# @TODO Wrap the return value in some other Stats object with more information
Esse::Import::Bulk.new(
**definition.slice(:type),
create: create,
delete: delete,
index: index,
update: update,
create: to_create,
delete: to_delete,
index: to_index,
update: to_update,
).each_request do |request_body|
cluster.api.bulk(**definition, body: request_body.body) do |event_payload|
event_payload[:body_stats] = request_body.stats
Expand Down Expand Up @@ -209,9 +249,14 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l
repo_types = repo_hash.keys if repo_types.empty?
count = 0

# Backward compatibility while I change plugins using it
update_lazy_attributes = options.delete(:lazy_update_document_attributes) if options.key?(:lazy_update_document_attributes)
eager_load_lazy_attributes = options.delete(:eager_include_document_attributes) if options.key?(:eager_include_document_attributes)
if options.key?(:eager_include_document_attributes)
warn 'The `eager_include_document_attributes` option is deprecated. Use `eager_load_lazy_attributes` instead.'
eager_load_lazy_attributes = options.delete(:eager_include_document_attributes)
end
if options.key?(:lazy_update_document_attributes)
warn 'The `lazy_update_document_attributes` option is deprecated. Use `update_lazy_attributes` instead.'
update_lazy_attributes = options.delete(:lazy_update_document_attributes)
end

repo_hash.slice(*repo_types).each do |repo_name, repo|
# Elasticsearch 6.x and older have multiple types per index.
Expand All @@ -223,76 +268,34 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l
bulk_kwargs = { suffix: suffix, type: repo_name, **options }
cluster.may_update_type!(bulk_kwargs)

lazy_attrs_to_eager_load = repo.lazy_document_attribute_names(eager_load_lazy_attributes)
lazy_attrs_to_search_preload = repo.lazy_document_attribute_names(preload_lazy_attributes)
lazy_attrs_to_update_after = repo.lazy_document_attribute_names(update_lazy_attributes)
lazy_attrs_to_update_after -= lazy_attrs_to_eager_load
lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load

# @TODO Refactor this by combining the upcoming code again with repo.each_serialized_batch as it was before:
# context[:lazy_attributes] = lazy_attrs_to_eager_load if lazy_attrs_to_eager_load.any?
# repo.each_serialized_batch(**context) do |batch|
# bulk(**bulk_kwargs, index: batch)

# lazy_attrs_to_update_after.each do |attr_name|
# partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?))
# next if partial_docs.empty?

# bulk(**bulk_kwargs, update: partial_docs)
# end
# count += batch.size
# end
context ||= {}
repo.send(:each_batch, **context) do |*args|
batch, collection_context = args
collection_context ||= {}
entries = [*batch].map { |entry| repo.serialize(entry, **collection_context) }.compact
context[:eager_load_lazy_attributes] = eager_load_lazy_attributes
context[:preload_lazy_attributes] = preload_lazy_attributes
repo.each_serialized_batch(**context) do |batch|
bulk(**bulk_kwargs, index: batch)

if lazy_attrs_to_eager_load
attrs = lazy_attrs_to_eager_load.is_a?(Array) ? lazy_attrs_to_eager_load : repo.lazy_document_attribute_names(lazy_attrs_to_eager_load)
attrs.each do |attr_name|
repo.retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value|
doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing }
doc&.mutate(attr_name) { value }
if update_lazy_attributes != false
attrs = repo.lazy_document_attribute_names(update_lazy_attributes)
attrs -= repo.lazy_document_attribute_names(eager_load_lazy_attributes)
update_attrs = attrs.each_with_object(Hash.new { |h, k| h[k] = {} }) do |attr_name, memo|
filtered_docs = batch.reject do |doc|
doc.ignore_on_index? || doc.mutations.key?(attr_name)
end
end
end
next if filtered_docs.empty?

preload_search_result = Hash.new { |h, arr_name| h[arr_name] = {} }
if lazy_attrs_to_search_preload.any?
hits = repo.index.search(query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload).response.hits
hits.each do |hit|
doc_header = Esse::LazyDocumentHeader.coerce(hit.slice('_id', '_routing')) # TODO Add '_type', when adjusting eql to tread _doc properly
next unless doc_header.valid?
hit.dig('_source')&.each do |attr_name, attr_value|
real_attr_name = repo.lazy_document_attribute_names(attr_name).first
preload_search_result[real_attr_name][doc_header] = attr_value
repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).each do |doc, value|
memo[doc.doc_header][attr_name] = value
end
end
preload_search_result.each do |attr_name, values|
values.each do |doc_header, value|
doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing }
doc&.mutate(attr_name) { value }
if update_attrs.any?
bulk_update = update_attrs.map do |header, values|
header.merge(data: {doc: values})
end
bulk(**bulk_kwargs, update: bulk_update)
end
end

bulk(**bulk_kwargs, index: entries)

lazy_attrs_to_update_after.each do |attr_name|
preloaded_ids = preload_search_result[attr_name].keys
filtered_docs = entries.reject do |doc|
doc.ignore_on_index? || preloaded_ids.any? { |d| d.id.to_s == doc.id.to_s && d.type == doc.type && d.routing == doc.routing }
end
next if filtered_docs.empty?

partial_docs = repo.documents_for_lazy_attribute(attr_name, filtered_docs)
next if partial_docs.empty?

bulk(**bulk_kwargs, update: partial_docs)
end

count += entries.size
count += batch.size
end
end
count
Expand Down
2 changes: 1 addition & 1 deletion lib/esse/index/object_document_mapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def each_serialized_batch(repo_name = nil, **kwargs, &block)
# @return [Enumerator] All serialized entries
def documents(repo_name = nil, **kwargs)
Enumerator.new do |yielder|
each_serialized_batch(repo_name, **kwargs) do |documents, **_collection_kargs|
each_serialized_batch(repo_name, **kwargs) do |documents|
documents.each { |document| yielder.yield(document) }
end
end
Expand Down
Loading

0 comments on commit 8fd3022

Please sign in to comment.