Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory Optimization #25

Merged
merged 11 commits into from
Aug 23, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Add preload_lazy_attributes option to the import in order to fetch the lazy attributes in a single query before bulk indexing

## 0.3.6 - 2024-08-07
* Esse::LazyDocumentHeader#to_doc returns an `Esse::LazyDocumentHeader::Document` instance to properly separate context metadata from document source
* Esse::LazyDocumentHeader#to_doc returns an `Esse::DocumentForPartialUpdate` instance to properly separate context metadata from document source
* Add `.collection_class` method to the `Esse::Repository` class to let external plugins and extensions access it instead of reading the @collection_proc variable

## 0.3.5 - 2024-08-02
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ GEM
rainbow (3.1.1)
rake (12.3.3)
regexp_parser (2.9.2)
rexml (3.3.1)
rexml (3.3.6)
strscan
rspec (3.13.0)
rspec-core (~> 3.13.0)
Expand Down
7 changes: 7 additions & 0 deletions lib/esse/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Esse
require_relative 'primitives'
require_relative 'collection'
require_relative 'document'
require_relative 'document_for_partial_update'
require_relative 'document_lazy_attribute'
require_relative 'lazy_document_header'
require_relative 'hash_document'
Expand Down Expand Up @@ -91,4 +92,10 @@ def self.document?(object)

!!(object.is_a?(Esse::Document) && object.id)
end

# Checks whether +document+ is the one identified by the given header values.
#
# The id is compared as a string and the routing must be equal. The types match
# when both are listed in LazyDocumentHeader::ACCEPTABLE_DOC_TYPES or when they
# are equal. A falsy +id+ is returned as-is, so the result is nil for a nil id.
#
# @param document [#id, #routing, #type] document to compare against
# @param id [Object, nil] expected document id
# @param routing [Object, nil] expected routing value
# @param type [Object, nil] expected document type
# @return [Boolean, nil]
def self.document_match_with_header?(document, id, routing, type)
  return id unless id
  return false unless id.to_s == document.id.to_s

  same_routing = routing == document.routing
  return same_routing unless same_routing

  acceptable_types = LazyDocumentHeader::ACCEPTABLE_DOC_TYPES
  return true if acceptable_types.include?(document.type) && acceptable_types.include?(type)

  document.type == type
end
end
23 changes: 19 additions & 4 deletions lib/esse/document.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

module Esse
class Document
MUTATIONS_FALLBACK = {}.freeze

attr_reader :object, :options

def initialize(object, **options)
Expand Down Expand Up @@ -84,11 +86,16 @@ def ignore_on_delete?
id.nil?
end

def ==(other)
other.is_a?(self.class) && (
id == other.id && type == other.type && routing == other.routing && meta == other.meta && source == other.source
)
# Compares this document with +other+.
#
# With +match_lazy_doc_header+ set, the decision is delegated to +other.eql?(self)+.
# Otherwise +other+ must be an Esse::Document whose id (compared as a string),
# type, routing and meta are equal to this document's; the source is not compared.
#
# @param other [Object] object to compare with
# @param match_lazy_doc_header [Boolean] delegate the comparison to +other+
# @return [Boolean]
def eql?(other, match_lazy_doc_header: false)
  return other.eql?(self) if match_lazy_doc_header
  return false unless other.is_a?(Esse::Document)

  id.to_s == other.id.to_s &&
    type == other.type &&
    routing == other.routing &&
    meta == other.meta
end
alias_method :==, :eql?

def doc_header
{ _id: id }.tap do |h|
Expand All @@ -97,6 +104,10 @@ def doc_header
end
end

# Builds a DocumentForPartialUpdate that wraps this document and carries the given +source+.
#
# @param source [Object] value handed to DocumentForPartialUpdate as its source
# @return [DocumentForPartialUpdate]
def document_for_partial_update(source)
DocumentForPartialUpdate.new(self, source: source)
end

def inspect
attributes = %i[id routing source].map do |attr|
value = send(attr)
Expand All @@ -115,6 +126,10 @@ def mutate(key)
instance_variable_set(:@__mutated_source__, nil)
end

# Mutations recorded on this document, or the shared MUTATIONS_FALLBACK when none are set.
#
# @return [Hash]
def mutations
  return MUTATIONS_FALLBACK unless @__mutations__

  @__mutations__
end

def mutated_source
return source unless @__mutations__

Expand Down
16 changes: 16 additions & 0 deletions lib/esse/document_for_partial_update.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

# Forwardable is part of the standard library but is not loaded automatically;
# require it here so this file does not depend on another file loading it first.
require 'forwardable'

module Esse
  # Document that pairs a given source with the identity of another object.
  #
  # +id+, +type+, +routing+ and +options+ are forwarded to the wrapped object
  # (exposed as +object+ by Esse::Document), while +source+ is the value given
  # at construction time.
  class DocumentForPartialUpdate < Esse::Document
    extend Forwardable

    def_delegators :object, :id, :type, :routing, :options

    attr_reader :source

    # @param lazy_header [#id, #type, #routing, #options] object the identity is read from
    # @param source [Object] payload returned by #source (presumably a Hash of attributes)
    def initialize(lazy_header, source:)
      # Assigned before calling super, preserving the original statement order.
      @source = source
      super(lazy_header)
    end
  end
end
30 changes: 17 additions & 13 deletions lib/esse/import/bulk.rb
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
module Esse
module Import
class Bulk
def initialize(type: nil, index: nil, delete: nil, create: nil, update: nil)
@index = Array(index).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
def self.build_from_documents(type: nil, index: nil, delete: nil, create: nil, update: nil)
index = Array(index).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk
value[:_type] ||= type if type
{ index: value }
value
end
@create = Array(create).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
create = Array(create).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk
value[:_type] ||= type if type
{ create: value }
value
end
@update = Array(update).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
update = Array(update).select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk(operation: :update)
value[:_type] ||= type if type
{ update: value }
value
end
@delete = Array(delete).select(&method(:valid_doc?)).reject(&:ignore_on_delete?).map do |doc|
delete = Array(delete).select(&Esse.method(:document?)).reject(&:ignore_on_delete?).map do |doc|
value = doc.to_bulk(data: false)
value[:_type] ||= type if type
{ delete: value }
value
end
new(index: index, delete: delete, create: create, update: update)
end

# Stores the payloads of each bulk operation wrapped under its operation key.
#
# Every argument is passed through Esse::ArrayUtils.wrap and each resulting
# payload is kept as +{ <operation> => payload }+ in the matching instance variable.
#
# @param index [Object, nil] payload(s) for the index operation
# @param delete [Object, nil] payload(s) for the delete operation
# @param create [Object, nil] payload(s) for the create operation
# @param update [Object, nil] payload(s) for the update operation
def initialize(index: nil, delete: nil, create: nil, update: nil)
  operations = { index: index, create: create, update: update, delete: delete }
  @index, @create, @update, @delete = operations.map do |operation, payloads|
    Esse::ArrayUtils.wrap(payloads).map { |payload| { operation => payload } }
  end
end

# Return an array of RequestBody instances
Expand Down Expand Up @@ -68,10 +76,6 @@ def each_request(max_retries: 4, last_retry_in_small_chunks: true)

private

# Whether +doc+ is accepted by Esse.document?.
#
# @param doc [Object] candidate entry
# @return [Boolean]
def valid_doc?(doc)
Esse.document?(doc)
end

def optimistic_request
request = Import::RequestBodyAsJson.new
request.create = @create
Expand Down
139 changes: 71 additions & 68 deletions lib/esse/index/documents.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,53 @@ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: n
}.merge(options)
cluster.may_update_type!(definition)

to_index = []
to_create = []
to_update = []
to_delete = []
Esse::ArrayUtils.wrap(index).each do |doc|
if doc.is_a?(Hash)
to_index << doc
elsif Esse.document?(doc) && !doc.ignore_on_index?
hash = doc.to_bulk
hash[:_type] ||= type if type
to_index << hash
end
end
Esse::ArrayUtils.wrap(create).each do |doc|
if doc.is_a?(Hash)
to_create << doc
elsif Esse.document?(doc) && !doc.ignore_on_index?
hash = doc.to_bulk
hash[:_type] ||= type if type
to_create << hash
end
end
Esse::ArrayUtils.wrap(update).each do |doc|
if doc.is_a?(Hash)
to_update << doc
elsif Esse.document?(doc) && !doc.ignore_on_index?
hash = doc.to_bulk(operation: :update)
hash[:_type] ||= type if type
to_update << hash
end
end
Esse::ArrayUtils.wrap(delete).each do |doc|
if doc.is_a?(Hash)
to_delete << doc
elsif Esse.document?(doc) && !doc.ignore_on_delete?
hash = doc.to_bulk(data: false)
hash[:_type] ||= type if type
to_delete << hash
end
end

# @TODO Wrap the return value in some other Stats object with more information
Esse::Import::Bulk.new(
**definition.slice(:type),
create: create,
delete: delete,
index: index,
update: update,
create: to_create,
delete: to_delete,
index: to_index,
update: to_update,
).each_request do |request_body|
cluster.api.bulk(**definition, body: request_body.body) do |event_payload|
event_payload[:body_stats] = request_body.stats
Expand Down Expand Up @@ -209,9 +249,14 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l
repo_types = repo_hash.keys if repo_types.empty?
count = 0

# Backward compatibility while I change plugins using it
update_lazy_attributes = options.delete(:lazy_update_document_attributes) if options.key?(:lazy_update_document_attributes)
eager_load_lazy_attributes = options.delete(:eager_include_document_attributes) if options.key?(:eager_include_document_attributes)
if options.key?(:eager_include_document_attributes)
warn 'The `eager_include_document_attributes` option is deprecated. Use `eager_load_lazy_attributes` instead.'
eager_load_lazy_attributes = options.delete(:eager_include_document_attributes)
end
if options.key?(:lazy_update_document_attributes)
warn 'The `lazy_update_document_attributes` option is deprecated. Use `update_lazy_attributes` instead.'
update_lazy_attributes = options.delete(:lazy_update_document_attributes)
end

repo_hash.slice(*repo_types).each do |repo_name, repo|
# Elasticsearch 6.x and older have multiple types per index.
Expand All @@ -223,76 +268,34 @@ def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_l
bulk_kwargs = { suffix: suffix, type: repo_name, **options }
cluster.may_update_type!(bulk_kwargs)

lazy_attrs_to_eager_load = repo.lazy_document_attribute_names(eager_load_lazy_attributes)
lazy_attrs_to_search_preload = repo.lazy_document_attribute_names(preload_lazy_attributes)
lazy_attrs_to_update_after = repo.lazy_document_attribute_names(update_lazy_attributes)
lazy_attrs_to_update_after -= lazy_attrs_to_eager_load
lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load

# @TODO Refactor this by combining the upcoming code again with repo.each_serialized_batch as it was before:
# context[:lazy_attributes] = lazy_attrs_to_eager_load if lazy_attrs_to_eager_load.any?
# repo.each_serialized_batch(**context) do |batch|
# bulk(**bulk_kwargs, index: batch)

# lazy_attrs_to_update_after.each do |attr_name|
# partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?))
# next if partial_docs.empty?

# bulk(**bulk_kwargs, update: partial_docs)
# end
# count += batch.size
# end
context ||= {}
repo.send(:each_batch, **context) do |*args|
batch, collection_context = args
collection_context ||= {}
entries = [*batch].map { |entry| repo.serialize(entry, **collection_context) }.compact
context[:eager_load_lazy_attributes] = eager_load_lazy_attributes
context[:preload_lazy_attributes] = preload_lazy_attributes
repo.each_serialized_batch(**context) do |batch|
bulk(**bulk_kwargs, index: batch)

if lazy_attrs_to_eager_load
attrs = lazy_attrs_to_eager_load.is_a?(Array) ? lazy_attrs_to_eager_load : repo.lazy_document_attribute_names(lazy_attrs_to_eager_load)
attrs.each do |attr_name|
repo.retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value|
doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing }
doc&.mutate(attr_name) { value }
if update_lazy_attributes != false
attrs = repo.lazy_document_attribute_names(update_lazy_attributes)
attrs -= repo.lazy_document_attribute_names(eager_load_lazy_attributes)
update_attrs = attrs.each_with_object(Hash.new { |h, k| h[k] = {} }) do |attr_name, memo|
filtered_docs = batch.reject do |doc|
doc.ignore_on_index? || doc.mutations.key?(attr_name)
end
end
end
next if filtered_docs.empty?

preload_search_result = Hash.new { |h, arr_name| h[arr_name] = {} }
if lazy_attrs_to_search_preload.any?
hits = repo.index.search(query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload).response.hits
hits.each do |hit|
doc_header = Esse::LazyDocumentHeader.coerce(hit.slice('_id', '_routing')) # TODO Add '_type', when adjusting eql to tread _doc properly
next unless doc_header.valid?
hit.dig('_source')&.each do |attr_name, attr_value|
real_attr_name = repo.lazy_document_attribute_names(attr_name).first
preload_search_result[real_attr_name][doc_header] = attr_value
repo.retrieve_lazy_attribute_values(attr_name, filtered_docs).each do |doc, value|
memo[doc.doc_header][attr_name] = value
end
end
preload_search_result.each do |attr_name, values|
values.each do |doc_header, value|
doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing }
doc&.mutate(attr_name) { value }
if update_attrs.any?
bulk_update = update_attrs.map do |header, values|
header.merge(data: {doc: values})
end
bulk(**bulk_kwargs, update: bulk_update)
end
end

bulk(**bulk_kwargs, index: entries)

lazy_attrs_to_update_after.each do |attr_name|
preloaded_ids = preload_search_result[attr_name].keys
filtered_docs = entries.reject do |doc|
doc.ignore_on_index? || preloaded_ids.any? { |d| d.id.to_s == doc.id.to_s && d.type == doc.type && d.routing == doc.routing }
end
next if filtered_docs.empty?

partial_docs = repo.documents_for_lazy_attribute(attr_name, filtered_docs)
next if partial_docs.empty?

bulk(**bulk_kwargs, update: partial_docs)
end

count += entries.size
count += batch.size
end
end
count
Expand Down
2 changes: 1 addition & 1 deletion lib/esse/index/object_document_mapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def each_serialized_batch(repo_name = nil, **kwargs, &block)
# @return [Enumerator] All serialized entries
def documents(repo_name = nil, **kwargs)
Enumerator.new do |yielder|
each_serialized_batch(repo_name, **kwargs) do |documents, **_collection_kargs|
each_serialized_batch(repo_name, **kwargs) do |documents|
documents.each { |document| yielder.yield(document) }
end
end
Expand Down
Loading
Loading