Skip to content

Commit

Permalink
Lazy document attribute (#16)
Browse files Browse the repository at this point in the history
* feat: add lazy_document_attribute method to the repository definition

* feat: add update action to the bulk import

* feat: pass instance of LazyDocumentHeader to preserve extra meta data like routing

* feat: lazy doc may be initialized with extra data

* feat: add new functionality to fetch lazy attributes and bulk update documents

* feat: lazy update documents during the import

* feat: eager include lazy documents during the import

* feat: add lazy-update-document-attributes and eager-include-document-attributes to import cli command

* chore: refactoring

* chore: refactoring
  • Loading branch information
marcosgz authored Jul 10, 2024
1 parent 3b6f343 commit 9d9e0fd
Show file tree
Hide file tree
Showing 31 changed files with 1,034 additions and 63 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
esse (0.2.6)
esse (0.3.0)
multi_json
thor (>= 0.19)

Expand Down
8 changes: 5 additions & 3 deletions bin/console
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ require 'dotenv/load'
require 'esse'
require 'pry'
require 'awesome_print'
require 'elasticsearch'

Esse.config.clusters.client = { url: ENV.fetch('ESSE_URL', ENV.fetch('ELASTICSEARCH_URL', 'http://localhost:9200')) }
Esse.config.clusters.client = Elasticsearch::Client.new url: ENV.fetch('ESSE_URL', ENV.fetch('ELASTICSEARCH_URL', 'http://localhost:9200'))
Esse.config.clusters.index_prefix = 'esse_console'

US_STATES = {
Expand Down Expand Up @@ -64,7 +65,7 @@ class GeosIndex < ApplicationIndex
autocomplete: {
type: 'custom',
tokenizer: 'standard',
filter: %w[lowercase asciifolding]
filter: %w[lowercase asciifolding]
},
},
}
Expand Down Expand Up @@ -99,6 +100,7 @@ class GeosIndex < ApplicationIndex

document do |(state, name), **|
{
_id: state.downcase,
name: name,
routing: state,
}
Expand All @@ -114,7 +116,7 @@ class GeosIndex < ApplicationIndex

document do |(abbr, name), **|
{
id: abbr,
_id: abbr,
name: name,
routing: abbr,
}
Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-1.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.2.6)
esse (0.3.0)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-2.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.2.6)
esse (0.3.0)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-5.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.2.6)
esse (0.3.0)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-6.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.2.6)
esse (0.3.0)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-7.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.2.6)
esse (0.3.0)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-8.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.2.6)
esse (0.3.0)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.opensearch-1.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.2.6)
esse (0.3.0)
multi_json
thor (>= 0.19)

Expand Down
2 changes: 1 addition & 1 deletion gemfiles/Gemfile.opensearch-2.x.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
esse (0.2.6)
esse (0.3.0)
multi_json
thor (>= 0.19)

Expand Down
13 changes: 12 additions & 1 deletion lib/esse/cli/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,20 @@ def open(*index_classes)
option :suffix, type: :string, default: nil, aliases: '-s', desc: 'Suffix to append to index name'
option :context, type: :hash, default: {}, required: true, desc: 'List of options to pass to the index class'
option :repo, type: :string, default: nil, alias: '-r', desc: 'Repository to use for import'
option :eager_include_document_attributes, type: :string, default: nil, desc: 'Comma separated list of lazy document attributes to include to the bulk index request'
option :lazy_update_document_attributes, type: :string, default: nil, desc: 'Comma separated list of lazy document attributes to bulk update after the bulk index request'
# Thor CLI action: import documents for the given index classes.
# Normalizes the two lazy-attribute options before delegating to Import:
#   'false'  -> option removed entirely (feature disabled)
#   'true'   -> true (all lazy attributes)
#   'a,b,c'  -> ['a', 'b', 'c'] (comma-separated attribute names)
# NOTE: the scraped diff had retained the pre-change `Import.new(...).run`
# line alongside the new body, which would have run the import twice;
# only the normalized invocation is kept here.
def import(*index_classes)
  require_relative 'index/import'
  opts = HashUtils.deep_transform_keys(options.to_h, &:to_sym)
  opts.delete(:lazy_update_document_attributes) if opts[:lazy_update_document_attributes] == 'false'
  opts.delete(:eager_include_document_attributes) if opts[:eager_include_document_attributes] == 'false'
  if (val = opts[:eager_include_document_attributes])
    opts[:eager_include_document_attributes] = (val == 'true') ? true : val.split(',')
  end
  if (val = opts[:lazy_update_document_attributes])
    opts[:lazy_update_document_attributes] = (val == 'true') ? true : val.split(',')
  end
  Import.new(indices: index_classes, **opts).run
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions lib/esse/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ module Esse
require_relative 'primitives'
require_relative 'collection'
require_relative 'document'
require_relative 'document_lazy_attribute'
require_relative 'lazy_document_header'
require_relative 'hash_document'
require_relative 'null_document'
require_relative 'repository'
Expand Down
43 changes: 38 additions & 5 deletions lib/esse/document.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ def to_h
end
end

# Builds the metadata/payload hash for a bulk request line.
#
# @param data [Boolean] when false, only the header (_id/_type/_routing) is
#   returned — used for delete operations, which carry no source.
# @param operation [Symbol, nil] when :update, the source is wrapped in
#   +{ doc: ... }+ as required by the Elasticsearch bulk update action.
# @return [Hash] header plus optional :data, merged with +meta+.
# NOTE: the scraped diff had retained the removed pre-change method fragment
# (the old `def to_bulk(data: true)` body) before this method; only the
# current implementation is kept here.
def to_bulk(data: true, operation: nil)
  doc_header.tap do |h|
    if data && operation == :update
      h[:data] = { doc: source_with_lazy_attributes }
    elsif data
      h[:data] = source_with_lazy_attributes
    end
    h.merge!(meta)
  end
end
Expand All @@ -87,5 +89,36 @@ def ==(other)
id == other.id && type == other.type && routing == other.routing && meta == other.meta && source == other.source
)
end

# Header portion of a bulk action: always carries :_id, and adds
# :_type / :_routing only when the document defines them.
def doc_header
  header = { _id: id }
  header[:_type] = type if type
  header[:_routing] = routing if routing?
  header
end

# Human-readable representation showing only the attributes that are set.
# Falls back to 'Esse::Document' when the subclass is anonymous.
def inspect
  shown = %i[id routing source].filter_map do |attr|
    value = send(attr)
    "#{attr}: #{value.inspect}" if value
  end
  "#<#{self.class.name || 'Esse::Document'} #{shown.join(', ')}>"
end

protected

# Merges any accumulated lazy attribute data under the document source.
# The document's own source keys win on conflict (source is merged last).
def source_with_lazy_attributes
  lazy = @__lazy_source_data__
  lazy ? lazy.merge(source) : source
end

# @api private
# Accumulates lazy attribute data to be blended into the source later.
# Non-Hash input is returned untouched (no accumulation happens).
def __add_lazy_data_to_source__(hash)
  return hash unless hash.is_a?(Hash)

  store = (@__lazy_source_data__ ||= {})
  store.merge!(hash)
end
end
end
18 changes: 18 additions & 0 deletions lib/esse/document_lazy_attribute.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

module Esse
  # Base class for attributes that are resolved lazily, in a separate step
  # from the main document source (e.g. fetched in bulk after indexing).
  # Subclasses must override #call.
  class DocumentLazyAttribute
    # @return [Hash] the keyword options given to the constructor
    attr_reader :options

    # @param kwargs [Hash] arbitrary options kept for use by subclass implementations
    def initialize(**kwargs)
      @options = kwargs
    end

    # Returns a Hash with the document header as key and attribute data as value.
    # NOTE(review): the original comment also described the key as "the document ID";
    # the @param type suggests header instances — confirm which is canonical.
    # @param doc_headers [Array<Esse::LazyDocumentHeader>] the document headers
    # @return [Hash] a Hash keyed by document header with the attribute data as value
    def call(doc_headers)
      raise NotImplementedError, 'Override this method to return the document attribute data'
    end
  end
end
15 changes: 11 additions & 4 deletions lib/esse/import/bulk.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Esse
module Import
class Bulk
def initialize(type: nil, index: nil, delete: nil, create: nil)
def initialize(type: nil, index: nil, delete: nil, create: nil, update: nil)
@index = Array(index).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk
value[:_type] ||= type if type
Expand All @@ -12,6 +12,11 @@ def initialize(type: nil, index: nil, delete: nil, create: nil)
value[:_type] ||= type if type
{ create: value }
end
@update = Array(update).select(&method(:valid_doc?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk(operation: :update)
value[:_type] ||= type if type
{ update: value }
end
@delete = Array(delete).select(&method(:valid_doc?)).reject(&:ignore_on_delete?).map do |doc|
value = doc.to_bulk(data: false)
value[:_type] ||= type if type
Expand Down Expand Up @@ -69,17 +74,19 @@ def valid_doc?(doc)

# Builds a single request body containing all bulk operations, in the
# order create -> index -> update -> delete.
# NOTE: the scraped diff had retained the removed `request.delete = @delete`
# line in addition to the new one; assigning delete twice would append the
# delete operations (and their stats) twice, so only one assignment is kept.
# @return [Import::RequestBodyAsJson]
def optimistic_request
  request = Import::RequestBodyAsJson.new
  request.create = @create
  request.index = @index
  request.update = @update
  request.delete = @delete
  request
end

def requests_in_small_chunks(chunk_size: 1)
arr = []
@delete.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.delete = slice } }
@create.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.create = slice } }
@index.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.index = slice } }
@update.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.update = slice } }
@delete.each_slice(chunk_size) { |slice| arr << Import::RequestBodyAsJson.new.tap { |r| r.delete = slice } }
Esse.logger.warn <<~MSG
Retrying the last request in small chunks of #{chunk_size} documents.
This is a last resort to avoid timeout errors, consider increasing the bulk size or reducing the batch size.
Expand All @@ -90,7 +97,7 @@ def requests_in_small_chunks(chunk_size: 1)
# @return [Array<RequestBody>]
def balance_requests_size(err)
if (bulk_size = err.message.scan(/exceeded.(\d+).bytes/).dig(0, 0).to_i) > 0
requests = (@delete + @create + @index).each_with_object([Import::RequestBodyRaw.new]) do |as_json, result|
requests = (@create + @index + @update + @delete).each_with_object([Import::RequestBodyRaw.new]) do |as_json, result|
operation, meta = as_json.to_a.first
meta = meta.dup
data = meta.delete(:data)
Expand Down
7 changes: 6 additions & 1 deletion lib/esse/import/request_body.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class RequestBody

def initialize(body:)
@body = body # body may be String or Array<Hash>
@stats = { index: 0, create: 0, delete: 0 }
@stats = { index: 0, create: 0, delete: 0, update: 0 }
end

def body?
Expand Down Expand Up @@ -46,6 +46,11 @@ def index=(docs)
@stats[:index] += docs.size
end

# Appends update operations to the request body and tracks their count.
# Uses non-mutating concatenation because @body may be a String or an Array.
def update=(docs)
  @body = @body + docs
  @stats[:update] = @stats[:update] + docs.size
end

def create=(docs)
@body += docs
@stats[:create] += docs.size
Expand Down
46 changes: 40 additions & 6 deletions lib/esse/index/documents.rb
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def index(doc = nil, suffix: nil, **options)
# @see https://www.elastic.co/guide/en/elasticsearch/reference/7.5/docs-bulk.html
# @see https://github.com/elastic/elasticsearch-ruby/blob/main/elasticsearch-api/lib/elasticsearch/api/utils.rb
# @see https://github.com/elastic/elasticsearch-ruby/blob/main/elasticsearch-api/lib/elasticsearch/api/actions/bulk.rb
def bulk(index: nil, delete: nil, create: nil, type: nil, suffix: nil, **options)
def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: nil, **options)
definition = {
index: index_name(suffix: suffix),
type: type,
Expand All @@ -174,9 +174,10 @@ def bulk(index: nil, delete: nil, create: nil, type: nil, suffix: nil, **options
# @TODO Wrap the return in a some other Stats object with more information
Esse::Import::Bulk.new(
**definition.slice(:type),
index: index,
delete: delete,
create: create,
delete: delete,
index: index,
update: update,
).each_request do |request_body|
cluster.api.bulk(**definition, body: request_body.body) do |event_payload|
event_payload[:body_stats] = request_body.stats
Expand All @@ -198,20 +199,53 @@ def bulk(index: nil, delete: nil, create: nil, type: nil, suffix: nil, **options
# @option [Hash] :context The collection context. This value will be passed as argument to the collection
# May be SQL condition or any other filter you have defined on the collection.
# @return [Numeric] The number of documents imported
def import(*repo_types, context: {}, suffix: nil, **options)
def import(*repo_types, context: {}, eager_include_document_attributes: false, lazy_update_document_attributes: false, suffix: nil, **options)
repo_types = repo_hash.keys if repo_types.empty?
count = 0

repo_hash.slice(*repo_types).each do |repo_name, repo|
doc_attrs = {eager: [], lazy: []}
if (expected = eager_include_document_attributes) != false
allowed = repo.lazy_document_attributes.keys
doc_attrs[:eager] = (expected == true) ? allowed : Array(expected).map(&:to_s) & allowed
end
if (expected = lazy_update_document_attributes) != false
allowed = repo.lazy_document_attributes.keys
doc_attrs[:lazy] = (expected == true) ? allowed : Array(expected).map(&:to_s) & allowed
doc_attrs[:lazy] -= doc_attrs[:eager]
end

repo.each_serialized_batch(**(context || {})) do |batch|
# Elasticsearch 6.x and older have multiple types per index.
# This gem supports multiple types per index for backward compatibility, but we recommend to update
# your elasticsearch to a at least 7.x version and use a single type per index.
#
# Note that the repository name will be used as the document type.
# mapping_default_type
kwargs = { index: batch, suffix: suffix, type: repo_name, **options }
kwargs = { suffix: suffix, type: repo_name, **options }
cluster.may_update_type!(kwargs)
bulk(**kwargs)

doc_attrs[:eager].each do |attr_name|
partial_docs = repo.documents_for_lazy_attribute(attr_name, *batch.reject(&:ignore_on_index?))
next if partial_docs.empty?

partial_docs.each do |part_doc|
doc = batch.find { |d| part_doc.id == d.id && part_doc.type == d.type && part_doc.routing == d.routing }
next unless doc

doc.send(:__add_lazy_data_to_source__, part_doc.source)
end
end

bulk(**kwargs, index: batch)

doc_attrs[:lazy].each do |attr_name|
partial_docs = repo.documents_for_lazy_attribute(attr_name, *batch.reject(&:ignore_on_index?))
next if partial_docs.empty?

bulk(**kwargs, update: partial_docs)
end

count += batch.size
end
end
Expand Down
Loading

0 comments on commit 9d9e0fd

Please sign in to comment.