Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import preloading lazy attribute #24

Merged
merged 4 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## 0.4.0 - 2024-08-16
* Rename lazy_update_document_attributes to update_lazy_attributes
* Rename eager_include_document_attributes to eager_load_lazy_attributes
* Add preload_lazy_attributes option to the import in order to fetch the lazy attributes in a single query before bulk indexing

## 0.3.6 - 2024-08-07
* Esse::LazyDocumentHeader#to_doc return `Esse::LazyDocumentHeader::Document` instance to properly separate context metadata from document source
* Add `.collection_class` method to the `Esse::Repository` class to let external plugins and extensions to access it instead of read @collection_proc variable
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
esse (0.3.6)
esse (0.4.0.rc1)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-1.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.3.6)
esse (0.4.0.rc1)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-2.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.3.6)
esse (0.4.0.rc1)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-5.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.3.6)
esse (0.4.0.rc1)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-6.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.3.6)
esse (0.4.0.rc1)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-7.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.3.6)
esse (0.4.0.rc1)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-8.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.3.6)
esse (0.4.0.rc1)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.opensearch-1.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.3.6)
esse (0.4.0.rc1)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.opensearch-2.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.3.6)
esse (0.4.0.rc1)
multi_json
thor (>= 0.19)

Expand Down
17 changes: 8 additions & 9 deletions lib/esse/cli/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,17 @@ def open(*index_classes)
option :suffix, type: :string, default: nil, aliases: '-s', desc: 'Suffix to append to index name'
option :context, type: :hash, default: {}, required: true, desc: 'List of options to pass to the index class'
option :repo, type: :string, default: nil, alias: '-r', desc: 'Repository to use for import'
option :eager_include_document_attributes, type: :string, default: nil, desc: 'Comma separated list of lazy document attributes to include to the bulk index request. Or pass `true` to include all lazy attributes'
option :lazy_update_document_attributes, type: :string, default: nil, desc: 'Comma separated list of lazy document attributes to bulk update after the bulk index request Or pass `true` to include all lazy attributes'
option :preload_lazy_attributes, type: :string, default: nil, desc: 'Command separated list of lazy document attributes to preload using search API before the bulk import. Or pass `true` to preload all lazy attributes'
option :eager_load_lazy_attributes, type: :string, default: nil, desc: 'Comma separated list of lazy document attributes to include to the bulk index request. Or pass `true` to include all lazy attributes'
option :update_lazy_attributes, type: :string, default: nil, desc: 'Comma separated list of lazy document attributes to bulk update after the bulk index request Or pass `true` to include all lazy attributes'

def import(*index_classes)
require_relative 'index/import'
opts = HashUtils.deep_transform_keys(options.to_h, &:to_sym)
opts.delete(:lazy_update_document_attributes) if opts[:lazy_update_document_attributes] == 'false'
opts.delete(:eager_include_document_attributes) if opts[:eager_include_document_attributes] == 'false'
if (val = opts[:eager_include_document_attributes])
opts[:eager_include_document_attributes] = (val == 'true') ? true : val.split(',')
end
if (val = opts[:lazy_update_document_attributes])
opts[:lazy_update_document_attributes] = (val == 'true') ? true : val.split(',')
%i[preload_lazy_attributes eager_load_lazy_attributes update_lazy_attributes].each do |key|
if (val = opts.delete(key)) && val != 'false'
opts[key] = (val == 'true') ? true : val.split(',')
end
end
Import.new(indices: index_classes, **opts).run
end
Expand Down
2 changes: 0 additions & 2 deletions lib/esse/document.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ def mutate(key)
instance_variable_set(:@__mutated_source__, nil)
end

protected

def mutated_source
return source unless @__mutations__

Expand Down
106 changes: 84 additions & 22 deletions lib/esse/index/documents.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def delete(doc = nil, suffix: nil, **options)
def update(doc = nil, suffix: nil, **options)
if document?(doc)
options[:id] = doc.id
options[:body] = { doc: doc.source }
options[:body] = { doc: doc.mutated_source }
options[:type] = doc.type if doc.type?
options[:routing] = doc.routing if doc.routing?
end
Expand All @@ -140,7 +140,7 @@ def update(doc = nil, suffix: nil, **options)
def index(doc = nil, suffix: nil, **options)
if document?(doc)
options[:id] = doc.id
options[:body] = doc.source
options[:body] = doc.mutated_source
options[:type] = doc.type if doc.type?
options[:routing] = doc.routing if doc.routing?
end
Expand Down Expand Up @@ -198,39 +198,101 @@ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: n
# @option [String, nil] :suffix The index suffix. Defaults to the nil.
# @option [Hash] :context The collection context. This value will be passed as argument to the collection
# May be SQL condition or any other filter you have defined on the collection.
# @option [Boolean, Array<String>] :eager_load_lazy_attributes A list of lazy document attributes to include to the bulk index request.
# Or pass `true` to include all lazy attributes.
# @option [Boolean, Array<String>] :update_lazy_attributes A list of lazy document attributes to bulk update each after the bulk import.
# Or pass `true` to update all lazy attributes.
# @option [Boolean, Array<String>] :preload_lazy_attributes A list of lazy document attributes to preload using search API before the bulk import.
# Or pass `true` to preload all lazy attributes.
# @return [Numeric] The number of documents imported
def import(*repo_types, context: {}, eager_include_document_attributes: false, lazy_update_document_attributes: false, suffix: nil, **options)
def import(*repo_types, context: {}, eager_load_lazy_attributes: false, update_lazy_attributes: false, preload_lazy_attributes: false, suffix: nil, **options)
repo_types = repo_hash.keys if repo_types.empty?
count = 0

# Backward compatibility while I change plugins using it
update_lazy_attributes = options.delete(:lazy_update_document_attributes) if options.key?(:lazy_update_document_attributes)
eager_load_lazy_attributes = options.delete(:eager_include_document_attributes) if options.key?(:eager_include_document_attributes)

repo_hash.slice(*repo_types).each do |repo_name, repo|
doc_attrs = {eager: [], lazy: []}
doc_attrs[:eager] = repo.lazy_document_attribute_names(eager_include_document_attributes)
doc_attrs[:lazy] = repo.lazy_document_attribute_names(lazy_update_document_attributes)
doc_attrs[:lazy] -= doc_attrs[:eager]
# Elasticsearch 6.x and older have multiple types per index.
# This gem supports multiple types per index for backward compatibility, but we recommend to update
# your elasticsearch to a at least 7.x version and use a single type per index.
#
# Note that the repository name will be used as the document type.
# mapping_default_type
bulk_kwargs = { suffix: suffix, type: repo_name, **options }
cluster.may_update_type!(bulk_kwargs)

lazy_attrs_to_eager_load = repo.lazy_document_attribute_names(eager_load_lazy_attributes)
lazy_attrs_to_search_preload = repo.lazy_document_attribute_names(preload_lazy_attributes)
lazy_attrs_to_update_after = repo.lazy_document_attribute_names(update_lazy_attributes)
lazy_attrs_to_update_after -= lazy_attrs_to_eager_load
lazy_attrs_to_search_preload -= lazy_attrs_to_eager_load

# @TODO Refactor this by combining the upcoming code again with repo.each_serialized_batch as it was before:
# context[:lazy_attributes] = lazy_attrs_to_eager_load if lazy_attrs_to_eager_load.any?
# repo.each_serialized_batch(**context) do |batch|
# bulk(**bulk_kwargs, index: batch)

# lazy_attrs_to_update_after.each do |attr_name|
# partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?))
# next if partial_docs.empty?

# bulk(**bulk_kwargs, update: partial_docs)
# end
# count += batch.size
# end
context ||= {}
context[:lazy_attributes] = doc_attrs[:eager] if doc_attrs[:eager].any?
repo.each_serialized_batch(**context) do |batch|
# Elasticsearch 6.x and older have multiple types per index.
# This gem supports multiple types per index for backward compatibility, but we recommend to update
# your elasticsearch to a at least 7.x version and use a single type per index.
#
# Note that the repository name will be used as the document type.
# mapping_default_type
kwargs = { suffix: suffix, type: repo_name, **options }
cluster.may_update_type!(kwargs)
repo.send(:each_batch, **context) do |*args|
batch, collection_context = args
collection_context ||= {}
entries = [*batch].map { |entry| repo.serialize(entry, **collection_context) }.compact

if lazy_attrs_to_eager_load
attrs = lazy_attrs_to_eager_load.is_a?(Array) ? lazy_attrs_to_eager_load : repo.lazy_document_attribute_names(lazy_attrs_to_eager_load)
attrs.each do |attr_name|
repo.retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value|
doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing }
doc&.mutate(attr_name) { value }
end
end
end

preload_search_result = Hash.new { |h, arr_name| h[arr_name] = {} }
if lazy_attrs_to_search_preload.any?
hits = repo.index.search(query: {ids: {values: entries.map(&:id)} }, _source: lazy_attrs_to_search_preload).response.hits
hits.each do |hit|
doc_header = Esse::LazyDocumentHeader.coerce(hit.slice('_id', '_routing')) # TODO Add '_type', when adjusting eql to tread _doc properly
next unless doc_header.valid?
hit.dig('_source')&.each do |attr_name, attr_value|
real_attr_name = repo.lazy_document_attribute_names(attr_name).first
preload_search_result[real_attr_name][doc_header] = attr_value
end
end
preload_search_result.each do |attr_name, values|
values.each do |doc_header, value|
doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing }
doc&.mutate(attr_name) { value }
end
end
end

bulk(**bulk_kwargs, index: entries)

bulk(**kwargs, index: batch)
lazy_attrs_to_update_after.each do |attr_name|
preloaded_ids = preload_search_result[attr_name].keys
filtered_docs = entries.reject do |doc|
doc.ignore_on_index? || preloaded_ids.any? { |d| d.id.to_s == doc.id.to_s && d.type == doc.type && d.routing == doc.routing }
end
next if filtered_docs.empty?

doc_attrs[:lazy].each do |attr_name|
partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?))
partial_docs = repo.documents_for_lazy_attribute(attr_name, filtered_docs)
next if partial_docs.empty?

bulk(**kwargs, update: partial_docs)
bulk(**bulk_kwargs, update: partial_docs)
end

count += batch.size
count += entries.size
end
end
count
Expand Down
2 changes: 1 addition & 1 deletion lib/esse/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Esse
VERSION = '0.3.6'
VERSION = '0.4.0.rc1'
end
59 changes: 37 additions & 22 deletions spec/esse/cli/index/import_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,44 +58,59 @@
cli_exec(%w[index import CountiesIndex CitiesIndex])
end

it 'allows --eager-include-document-attributes as a comma separated list' do
expect(CountiesIndex).to receive(:import).with(eager_include_document_attributes: %w[foo bar], context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --eager-include-document-attributes=foo,bar])
it 'allows --eager-load-lazy-attributes as a comma separated list' do
expect(CountiesIndex).to receive(:import).with(eager_load_lazy_attributes: %w[foo bar], context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --eager-load-lazy-attributes=foo,bar])
end

it 'allows --lazy-update-document-attributes as a single value' do
expect(CountiesIndex).to receive(:import).with(lazy_update_document_attributes: %w[foo], context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --lazy-update-document-attributes=foo])
it 'allows --update-lazy-attributes as a single value' do
expect(CountiesIndex).to receive(:import).with(update_lazy_attributes: %w[foo], context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --update-lazy-attributes=foo])
end

it 'allows --lazy-update-document-attributes as true' do
expect(CountiesIndex).to receive(:import).with(lazy_update_document_attributes: true, context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --lazy-update-document-attributes=true])
it 'allows --update-lazy-attributes as true' do
expect(CountiesIndex).to receive(:import).with(update_lazy_attributes: true, context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --update-lazy-attributes=true])
end

it 'allows --lazy-update-document-attributes as false' do
it 'allows --update-lazy-attributes as false' do
expect(CountiesIndex).to receive(:import).with(context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --lazy-update-document-attributes=false])
cli_exec(%w[index import CountiesIndex --update-lazy-attributes=false])
end

it 'allows --lazy-update-document-attributes as a comma separated list' do
expect(CountiesIndex).to receive(:import).with(lazy_update_document_attributes: %w[foo bar], context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --lazy-update-document-attributes=foo,bar])
it 'allows --update-lazy-attributes as a comma separated list' do
expect(CountiesIndex).to receive(:import).with(update_lazy_attributes: %w[foo bar], context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --update-lazy-attributes=foo,bar])
end

it 'allows --lazy-update-document-attributes as a single value' do
expect(CountiesIndex).to receive(:import).with(lazy_update_document_attributes: %w[foo], context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --lazy-update-document-attributes=foo])
it 'allows --update-lazy-attributes as a single value' do
expect(CountiesIndex).to receive(:import).with(update_lazy_attributes: %w[foo], context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --update-lazy-attributes=foo])
end

it 'allows --lazy-update-document-attributes as true' do
expect(CountiesIndex).to receive(:import).with(lazy_update_document_attributes: true, context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --lazy-update-document-attributes=true])
it 'allows --update-lazy-attributes as true' do
expect(CountiesIndex).to receive(:import).with(update_lazy_attributes: true, context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --update-lazy-attributes=true])
end

it 'allows --lazy-update-document-attributes as false' do
it 'allows --update-lazy-attributes as false' do
expect(CountiesIndex).to receive(:import).with(context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --lazy-update-document-attributes=false])
cli_exec(%w[index import CountiesIndex --update-lazy-attributes=false])
end

it 'allows --preload-lazy-attributes as a comma separated list' do
expect(CountiesIndex).to receive(:import).with(preload_lazy_attributes: %w[foo bar], context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --preload-lazy-attributes=foo,bar])
end

it 'allows --preload-lazy-attributes as true' do
expect(CountiesIndex).to receive(:import).with(preload_lazy_attributes: true, context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --preload-lazy-attributes=true])
end

it 'allows --preload-lazy-attributes as false' do
expect(CountiesIndex).to receive(:import).with(context: {}).and_return(true)
cli_exec(%w[index import CountiesIndex --preload-lazy-attributes=false])
end
end
end
Expand Down
Loading
Loading