Skip to content

Commit

Permalink
keep bulk order
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosgz committed Aug 19, 2024
1 parent 8e1c832 commit b700e68
Showing 1 changed file with 40 additions and 27 deletions.
67 changes: 40 additions & 27 deletions lib/esse/index/documents.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,40 +171,53 @@ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: n
}.merge(options)
cluster.may_update_type!(definition)

to_index = Esse::ArrayUtils.wrap(index)
to_create = Esse::ArrayUtils.wrap(create)
to_update = Esse::ArrayUtils.wrap(update)
to_delete = Esse::ArrayUtils.wrap(delete)
index = to_index.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk
value[:_type] ||= type if type
value
to_index = []
to_create = []
to_update = []
to_delete = []
Esse::ArrayUtils.wrap(index).each do |doc|
if doc.is_a?(Hash)
to_index << doc
elsif Esse.document?(doc) && !doc.ignore_on_index?
hash = doc.to_bulk
hash[:_type] ||= type if type
to_index << hash
end
end
index.push(*to_create.select { |item| item.is_a?(Hash) })
create = to_create.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk
value[:_type] ||= type if type
value
Esse::ArrayUtils.wrap(create).each do |doc|
if doc.is_a?(Hash)
to_create << doc
elsif Esse.document?(doc) && !doc.ignore_on_index?
hash = doc.to_bulk
hash[:_type] ||= type if type
to_create << hash
end
end
update = to_update.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc|
value = doc.to_bulk(operation: :update)
value[:_type] ||= type if type
value
Esse::ArrayUtils.wrap(update).each do |doc|
if doc.is_a?(Hash)
to_update << doc
elsif Esse.document?(doc) && !doc.ignore_on_index?
hash = doc.to_bulk(operation: :update)
hash[:_type] ||= type if type
to_update << hash
end
end
update.push(*to_update.select { |item| item.is_a?(Hash) })
delete = to_delete.select(&Esse.method(:document?)).reject(&:ignore_on_delete?).map do |doc|
value = doc.to_bulk(data: false)
value[:_type] ||= type if type
value
Esse::ArrayUtils.wrap(delete).each do |doc|
if doc.is_a?(Hash)
to_delete << doc
elsif Esse.document?(doc) && !doc.ignore_on_delete?
hash = doc.to_bulk(data: false)
hash[:_type] ||= type if type
to_delete << hash
end
end
delete.push(*to_delete.select { |item| item.is_a?(Hash) })

# @TODO Wrap the return in a some other Stats object with more information
Esse::Import::Bulk.new(
create: create,
delete: delete,
index: index,
update: update,
create: to_create,
delete: to_delete,
index: to_index,
update: to_update,
).each_request do |request_body|
cluster.api.bulk(**definition, body: request_body.body) do |event_payload|
event_payload[:body_stats] = request_body.stats
Expand Down

0 comments on commit b700e68

Please sign in to comment.