Fix bulk indexing documents with routing (#18)
* fix: renaming :_routing to :routing in the bulk action

* feat: let the each_serialized_batch eager load attributes

* chore: update changelog with release updates

* chore: update changelog with release updates

* fix: es 1.x does not include _routing in the response

* chore: rename attributes kwarg to lazy_attributes
marcosgz authored Jul 12, 2024
1 parent a6dc226 commit 7ad918a
Showing 31 changed files with 375 additions and 74 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

+## 0.3.2 - 2024-07-12
+* fix bulk indexing routing issue
+* add `attributes:` to the `Esse::Repository.each_serialized_batch` to preload `lazy_document_attributes`
+* Stop stringifying the `lazy_document_attributes` attribute name
+* The `Esse::Repository.update_documents_attribute` was not working when calling with a single hash as document
+
## 0.3.0 - 2024-07-10
* Extend bulk indexing API to support `update`.
* Last attempt of bulk, index each document individually if the bulk fails.
2 changes: 1 addition & 1 deletion Gemfile.lock
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
-esse (0.3.1)
+esse (0.3.2)
multi_json
thor (>= 0.19)

2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-1.x.lock
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
-esse (0.3.1)
+esse (0.3.2)
multi_json
thor (>= 0.19)

2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-2.x.lock
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
-esse (0.3.1)
+esse (0.3.2)
multi_json
thor (>= 0.19)

2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-5.x.lock
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
-esse (0.3.1)
+esse (0.3.2)
multi_json
thor (>= 0.19)

2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-6.x.lock
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
-esse (0.3.1)
+esse (0.3.2)
multi_json
thor (>= 0.19)

2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-7.x.lock
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
-esse (0.3.1)
+esse (0.3.2)
multi_json
thor (>= 0.19)

2 changes: 1 addition & 1 deletion gemfiles/Gemfile.elasticsearch-8.x.lock
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
-esse (0.3.1)
+esse (0.3.2)
multi_json
thor (>= 0.19)

2 changes: 1 addition & 1 deletion gemfiles/Gemfile.opensearch-1.x.lock
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
-esse (0.3.1)
+esse (0.3.2)
multi_json
thor (>= 0.19)

2 changes: 1 addition & 1 deletion gemfiles/Gemfile.opensearch-2.x.lock
@@ -1,7 +1,7 @@
PATH
remote: ..
specs:
-esse (0.3.1)
+esse (0.3.2)
multi_json
thor (>= 0.19)

3 changes: 2 additions & 1 deletion lib/esse/document.rb
@@ -93,7 +93,7 @@ def ==(other)
def doc_header
{ _id: id }.tap do |h|
h[:_type] = type if type
-h[:_routing] = routing if routing?
+h[:routing] = routing if routing?
end
end

@@ -102,6 +102,7 @@ def inspect
value = send(attr)
"#{attr}: #{value.inspect}" if value
end.compact.join(', ')
+attributes << " mutations: #{@__mutations__.inspect}" if @__mutations__
"#<#{self.class.name || 'Esse::Document'} #{attributes}>"
end
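
Review note: the `doc_header` rename above changes the key that reaches the bulk action metadata. The following is a minimal sketch of the resulting shape, not the gem's actual serialization path. `DocSketch` is a hypothetical stand-in for `Esse::Document`, and the JSON step is only an assumption about how the header ends up on the wire.

```ruby
require 'json'

# Hypothetical stand-in for Esse::Document; only the header logic from the diff is copied.
DocSketch = Struct.new(:id, :type, :routing) do
  def routing?
    !routing.nil?
  end

  # Same body as the patched doc_header: routing is emitted without the underscore.
  def doc_header
    { _id: id }.tap do |h|
      h[:_type] = type if type
      h[:routing] = routing if routing?
    end
  end
end

with_routing = DocSketch.new(1, nil, 'bar')
without_routing = DocSketch.new(2, nil, nil)

# Assumed illustration of a bulk action line built from the header.
routed_line = JSON.generate({ index: with_routing.doc_header })
plain_line = JSON.generate({ index: without_routing.doc_header })

puts routed_line # {"index":{"_id":1,"routing":"bar"}}
puts plain_line  # {"index":{"_id":2}}
```

The key is omitted entirely when the document has no routing, which matches the `routing?` guard in the patched method.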

13 changes: 4 additions & 9 deletions lib/esse/index/documents.rb
@@ -209,7 +209,9 @@ def import(*repo_types, context: {}, eager_include_document_attributes: false, l
doc_attrs[:lazy] = repo.lazy_document_attribute_names(lazy_update_document_attributes)
doc_attrs[:lazy] -= doc_attrs[:eager]

-repo.each_serialized_batch(**(context || {})) do |batch|
+context ||= {}
+context[:lazy_attributes] = doc_attrs[:eager] if doc_attrs[:eager].any?
+repo.each_serialized_batch(**context) do |batch|
# Elasticsearch 6.x and older have multiple types per index.
# This gem supports multiple types per index for backward compatibility, but we recommend updating
# Elasticsearch to at least version 7.x and using a single type per index.
@@ -219,17 +221,10 @@ def import(*repo_types, context: {}, eager_include_document_attributes: false, l
kwargs = { suffix: suffix, type: repo_name, **options }
cluster.may_update_type!(kwargs)

-doc_attrs[:eager].each do |attr_name|
-repo.retrieve_lazy_attribute_values(attr_name, *batch.reject(&:ignore_on_index?)).each do |doc_header, value|
-doc = batch.find { |d| doc_header.id == d.id && doc_header.type == d.type && doc_header.routing == d.routing }
-doc&.mutate(attr_name) { value }
-end
-end
-
bulk(**kwargs, index: batch)

doc_attrs[:lazy].each do |attr_name|
-partial_docs = repo.documents_for_lazy_attribute(attr_name, *batch.reject(&:ignore_on_index?))
+partial_docs = repo.documents_for_lazy_attribute(attr_name, batch.reject(&:ignore_on_index?))
next if partial_docs.empty?

bulk(**kwargs, update: partial_docs)
6 changes: 3 additions & 3 deletions lib/esse/lazy_document_header.rb
@@ -4,7 +4,7 @@ module Esse
class LazyDocumentHeader
def self.coerce_each(values)
arr = []
-Array(values).map do |value|
+Esse::ArrayUtils.wrap(values).map do |value|
instance = coerce(value)
arr << instance if instance&.valid?
end
@@ -24,7 +24,7 @@ def self.coerce(value)
when :_id, :id, '_id', 'id'
:_id
when :_routing, :routing, '_routing', 'routing'
-:_routing
+:routing
when :_type, :type, '_type', 'type'
:_type
else
@@ -58,7 +58,7 @@ def type
end

def routing
-@attributes[:_routing]
+@attributes[:routing]
end

def to_doc(source = {})
1 change: 1 addition & 0 deletions lib/esse/primitives.rb
@@ -2,3 +2,4 @@

require_relative 'primitives/hstring'
require_relative 'primitives/hash_utils'
+require_relative 'primitives/array_utils'
17 changes: 17 additions & 0 deletions lib/esse/primitives/array_utils.rb
@@ -0,0 +1,17 @@
+module Esse
+# The idea here is to add useful methods to the ruby standard objects without
+# monkey patching them
+module ArrayUtils
+module_function
+
+def wrap(object)
+if object.nil?
+[]
+elsif object.respond_to?(:to_ary)
+object.to_ary || [object]
+else
+[object]
+end
+end
+end
+end
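
Review note: `LazyDocumentHeader.coerce_each` now calls `Esse::ArrayUtils.wrap` instead of `Array(...)`. A likely reason, given the changelog entry about a single hash not working, is that `Kernel#Array` splits a Hash into key/value pairs. The sketch below repeats the module from this diff so it runs on its own; the `header` value is a made-up example.

```ruby
# Module body copied from the diff so the example is self-contained.
module Esse
  module ArrayUtils
    module_function

    def wrap(object)
      if object.nil?
        []
      elsif object.respond_to?(:to_ary)
        object.to_ary || [object]
      else
        [object]
      end
    end
  end
end

header = { _id: 1, routing: 'bar' } # hypothetical single document header

# Kernel#Array falls back to Hash#to_a and breaks the hash apart.
split = Array(header)

# wrap keeps the hash as one element, because Hash does not respond to to_ary.
wrapped = Esse::ArrayUtils.wrap(header)

p split   # [[:_id, 1], [:routing, "bar"]]
p wrapped # one-element array holding the original hash
p Esse::ArrayUtils.wrap(nil)    # []
p Esse::ArrayUtils.wrap([1, 2]) # [1, 2]
```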
12 changes: 6 additions & 6 deletions lib/esse/repository/documents.rb
@@ -7,20 +7,20 @@ def import(**kwargs)
index.import(repo_name, **kwargs)
end

-def update_documents_attribute(name, *ids_or_doc_headers, **kwargs)
-batch = documents_for_lazy_attribute(name, *ids_or_doc_headers)
+def update_documents_attribute(name, ids_or_doc_headers = [], kwargs = {})
+batch = documents_for_lazy_attribute(name, ids_or_doc_headers)
return if batch.empty?

-index.bulk(**kwargs, update: batch)
+index.bulk(**kwargs.transform_keys(&:to_sym), update: batch)
end

-def documents_for_lazy_attribute(name, *ids_or_doc_headers)
-retrieve_lazy_attribute_values(name, *ids_or_doc_headers).map do |doc_header, datum|
+def documents_for_lazy_attribute(name, ids_or_doc_headers)
+retrieve_lazy_attribute_values(name, ids_or_doc_headers).map do |doc_header, datum|
doc_header.to_doc(name => datum)
end
end

-def retrieve_lazy_attribute_values(name, *ids_or_doc_headers)
+def retrieve_lazy_attribute_values(name, ids_or_doc_headers)
unless lazy_document_attribute?(name)
raise ArgumentError, <<~MSG
The attribute `#{name}` is not defined as a lazy document attribute.
26 changes: 16 additions & 10 deletions lib/esse/repository/lazy_document_attributes.rb
@@ -15,23 +15,23 @@ def lazy_document_attribute_names(all = true)
when true
lazy_document_attributes.keys
else
-Array(all).map(&:to_s) & lazy_document_attributes.keys
+filtered = Array(all).map(&:to_s)
+lazy_document_attributes.keys.select { |name| filtered.include?(name.to_s) }
end
end

-def lazy_document_attribute?(attr_name)
-lazy_document_attributes.key?(attr_name.to_s)
-end
-
def fetch_lazy_document_attribute(attr_name)
-klass, kwargs = lazy_document_attributes.fetch(attr_name.to_s)
+klass, kwargs = lazy_document_attributes.fetch(attr_name)
klass.new(**kwargs)
rescue KeyError
raise ArgumentError, format('Attribute %<attr>p is not defined as a lazy document attribute', attr: attr_name)
end

def lazy_document_attribute(attr_name, klass = nil, **kwargs, &block)
-if lazy_document_attribute?(attr_name)
+if attr_name.nil?
+raise ArgumentError, 'Attribute name is required to define a lazy document attribute'
+end
+if lazy_document_attribute?(attr_name.to_sym) || lazy_document_attribute?(attr_name.to_s)
raise ArgumentError, format('Attribute %<attr>p is already defined as a lazy document attribute', attr: attr_name)
end

@@ -40,11 +40,11 @@ def lazy_document_attribute(attr_name, klass = nil, **kwargs, &block)
klass = Class.new(Esse::DocumentLazyAttribute) do
define_method(:call, &block)
end
-@lazy_document_attributes[attr_name.to_s] = [klass, kwargs]
+@lazy_document_attributes[attr_name] = [klass, kwargs]
elsif klass.is_a?(Class) && klass <= Esse::DocumentLazyAttribute
-@lazy_document_attributes[attr_name.to_s] = [klass, kwargs]
+@lazy_document_attributes[attr_name] = [klass, kwargs]
elsif klass.is_a?(Class) && klass.instance_methods.include?(:call)
-@lazy_document_attributes[attr_name.to_s] = [klass, kwargs]
+@lazy_document_attributes[attr_name] = [klass, kwargs]
elsif klass.nil?
raise ArgumentError, format('A block or a class that responds to `call` is required to define a lazy document attribute')
else
@@ -53,6 +53,12 @@ def lazy_document_attribute(attr_name, klass = nil, **kwargs, &block)
ensure
@lazy_document_attributes&.freeze
end

+protected
+
+def lazy_document_attribute?(attr_name)
+lazy_document_attributes.key?(attr_name)
+end
end

extend ClassMethods
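
Review note: once attribute names are stored as given (symbol or string) instead of being stringified, the old intersection in `lazy_document_attribute_names` would silently drop symbol keys. A small sketch of the difference follows; the `registry` hash and resolver values are invented for illustration and only mimic the shape of `lazy_document_attributes`.

```ruby
# Hypothetical registry with one symbol key and one string key.
registry = { comments_count: :resolver_a, 'tags' => :resolver_b }
requested = [:comments_count, :tags]

# Old approach: stringify the request and intersect with the stored keys.
# Symbol keys never equal their string form, so :comments_count is lost.
old_style = requested.map(&:to_s) & registry.keys

# New approach from the diff: compare on the string form but return the stored key.
filtered = requested.map(&:to_s)
new_style = registry.keys.select { |name| filtered.include?(name.to_s) }

p old_style # ["tags"]
p new_style # [:comments_count, "tags"]
```

Returning the stored key rather than the stringified one keeps later lookups such as `lazy_document_attributes.fetch(attr_name)` consistent with how the attribute was registered.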
12 changes: 11 additions & 1 deletion lib/esse/repository/object_document_mapper.rb
@@ -74,11 +74,21 @@ def collection(collection_klass = nil, **_, &block)
# @param [Hash] kwargs The context
# @return [Enumerator] The enumerator
# @yield [Array, **context] serialized collection and the optional context from the collection
-def each_serialized_batch(**kwargs)
+def each_serialized_batch(lazy_attributes: false, **kwargs)
each_batch(**kwargs) do |*args|
batch, collection_context = args
collection_context ||= {}
entries = [*batch].map { |entry| serialize(entry, **collection_context) }.compact
+if lazy_attributes
+attrs = lazy_attributes.is_a?(Array) ? lazy_attributes : lazy_document_attribute_names(lazy_attributes)
+attrs.each do |attr_name|
+retrieve_lazy_attribute_values(attr_name, entries).each do |doc_header, value|
+doc = entries.find { |d| doc_header.id.to_s == d.id.to_s && doc_header.type == d.type && doc_header.routing == d.routing }
+doc&.mutate(attr_name) { value }
+end
+end
+end
+
yield entries, **kwargs
end
end
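
Review note: the eager-loading loop added to `each_serialized_batch` matches each returned header back to a serialized document by id (compared as strings), type, and routing. The sketch below isolates that matching step. `HeaderSketch`, `EntrySketch`, the `:comments_count` attribute, and the sample values are all hypothetical; the real classes are `Esse::LazyDocumentHeader` and `Esse::Document`.

```ruby
# Hypothetical stand-ins for the header and document classes.
HeaderSketch = Struct.new(:id, :type, :routing)
EntrySketch = Struct.new(:id, :type, :routing, :mutations) do
  # Stores the block's value under the attribute name, similar in spirit to `mutate`.
  def mutate(attr_name)
    self.mutations ||= {}
    mutations[attr_name] = yield
  end
end

entries = [EntrySketch.new(1, nil, 'us'), EntrySketch.new(2, nil, 'eu')]

# Values shaped like the header => value pairs the loop iterates over.
values = {
  HeaderSketch.new('2', nil, 'eu') => 10, # string id still matches integer id 2
  HeaderSketch.new('1', nil, 'eu') => 99  # same id, different routing: no match
}

values.each do |header, value|
  doc = entries.find do |d|
    header.id.to_s == d.id.to_s && header.type == d.type && header.routing == d.routing
  end
  doc&.mutate(:comments_count) { value }
end

p entries.map(&:mutations)
```

Only the second entry receives a value here: the first header matches on id and routing after the `to_s` comparison, while the second is rejected because the routing differs.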
2 changes: 1 addition & 1 deletion lib/esse/version.rb
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Esse
-VERSION = '0.3.1'
+VERSION = '0.3.2'
end
16 changes: 8 additions & 8 deletions spec/esse/document_spec.rb
@@ -91,19 +91,19 @@ def source
context 'with data: true' do
subject { document.to_bulk(data: true) }

-it { is_expected.to eq(_id: 1, _type: 'foo', _routing: 'bar', timeout: 10, data: { foo: 'bar' }) }
+it { is_expected.to eq(_id: 1, _type: 'foo', routing: 'bar', timeout: 10, data: { foo: 'bar' }) }
end

context 'with data: false' do
subject { document.to_bulk(data: false) }

-it { is_expected.to eq(_id: 1, _type: 'foo', _routing: 'bar', timeout: 10) }
+it { is_expected.to eq(_id: 1, _type: 'foo', routing: 'bar', timeout: 10) }
end

context 'with operation: :update' do
subject { document.to_bulk(data: true, operation: :update) }

-it { is_expected.to eq(_id: 1, _type: 'foo', _routing: 'bar', timeout: 10, data: { doc: { foo: 'bar' } }) }
+it { is_expected.to eq(_id: 1, _type: 'foo', routing: 'bar', timeout: 10, data: { doc: { foo: 'bar' } }) }
end

context 'when document does not have a routing' do
@@ -116,21 +116,21 @@ def source
context 'when document does not have a type' do
it 'should not include the type' do
allow(document).to receive(:type).and_return(nil)
-expect(document.to_bulk(data: true)).to eq(_id: 1, _routing: 'bar', timeout: 10, data: { foo: 'bar' })
+expect(document.to_bulk(data: true)).to eq(_id: 1, routing: 'bar', timeout: 10, data: { foo: 'bar' })
end
end

context 'when document does not have a meta' do
it 'should not include the meta' do
allow(document).to receive(:meta).and_return({})
-expect(document.to_bulk(data: true)).to eq(_id: 1, _type: 'foo', _routing: 'bar', data: { foo: 'bar' })
+expect(document.to_bulk(data: true)).to eq(_id: 1, _type: 'foo', routing: 'bar', data: { foo: 'bar' })
end
end

context 'when document does not have a source' do
it 'should not include the source' do
allow(document).to receive(:source).and_return({})
-expect(document.to_bulk(data: true)).to eq(_id: 1, _type: 'foo', _routing: 'bar', timeout: 10, data: {})
+expect(document.to_bulk(data: true)).to eq(_id: 1, _type: 'foo', routing: 'bar', timeout: 10, data: {})
end
end
end
@@ -156,7 +156,7 @@ def routing

subject { document.doc_header }

-it { is_expected.to eq(_id: 1, _type: 'foo', _routing: 'bar') }
+it { is_expected.to eq(_id: 1, _type: 'foo', routing: 'bar') }

context 'when document does not have a routing' do
it 'should not include the routing' do
@@ -168,7 +168,7 @@ def routing
context 'when document does not have a type' do
it 'should not include the type' do
allow(document).to receive(:type).and_return(nil)
-expect(document.doc_header).to eq(_id: 1, _routing: 'bar')
+expect(document.doc_header).to eq(_id: 1, routing: 'bar')
end
end
end
@@ -0,0 +1,8 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+require 'support/shared_examples/repository_documents_update_documents_attribute'
+
+stack_describe 'elasticsearch', '6.x', Esse::Repository, '.update_documents_attribute' do
+include_examples 'repository.update_documents_attribute'
+end
@@ -0,0 +1,8 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+require 'support/shared_examples/repository_documents_update_documents_attribute'
+
+stack_describe 'elasticsearch', '7.x', Esse::Repository, '.update_documents_attribute' do
+include_examples 'repository.update_documents_attribute'
+end
@@ -0,0 +1,8 @@
+# frozen_string_literal: true
+
+require 'spec_helper'
+require 'support/shared_examples/repository_documents_update_documents_attribute'
+
+stack_describe 'elasticsearch', '8.x', Esse::Repository, '.update_documents_attribute' do
+include_examples 'repository.update_documents_attribute'
+end