From b700e6804816fa2888b604c833cdb72ecebae675 Mon Sep 17 00:00:00 2001 From: "Marcos G. Zimmermann" Date: Mon, 19 Aug 2024 13:52:07 -0300 Subject: [PATCH] keep bulk order --- lib/esse/index/documents.rb | 67 ++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 27 deletions(-) diff --git a/lib/esse/index/documents.rb b/lib/esse/index/documents.rb index 196cbd5..b6f9f46 100644 --- a/lib/esse/index/documents.rb +++ b/lib/esse/index/documents.rb @@ -171,40 +171,53 @@ def bulk(create: nil, delete: nil, index: nil, update: nil, type: nil, suffix: n }.merge(options) cluster.may_update_type!(definition) - to_index = Esse::ArrayUtils.wrap(index) - to_create = Esse::ArrayUtils.wrap(create) - to_update = Esse::ArrayUtils.wrap(update) - to_delete = Esse::ArrayUtils.wrap(delete) - index = to_index.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| - value = doc.to_bulk - value[:_type] ||= type if type - value + to_index = [] + to_create = [] + to_update = [] + to_delete = [] + Esse::ArrayUtils.wrap(index).each do |doc| + if doc.is_a?(Hash) + to_index << doc + elsif Esse.document?(doc) && !doc.ignore_on_index? + hash = doc.to_bulk + hash[:_type] ||= type if type + to_index << hash + end end - index.push(*to_create.select { |item| item.is_a?(Hash) }) - create = to_create.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| - value = doc.to_bulk - value[:_type] ||= type if type - value + Esse::ArrayUtils.wrap(create).each do |doc| + if doc.is_a?(Hash) + to_create << doc + elsif Esse.document?(doc) && !doc.ignore_on_index? + hash = doc.to_bulk + hash[:_type] ||= type if type + to_create << hash + end end - update = to_update.select(&Esse.method(:document?)).reject(&:ignore_on_index?).map do |doc| - value = doc.to_bulk(operation: :update) - value[:_type] ||= type if type - value + Esse::ArrayUtils.wrap(update).each do |doc| + if doc.is_a?(Hash) + to_update << doc + elsif Esse.document?(doc) && !doc.ignore_on_index? + hash = doc.to_bulk(operation: :update) + hash[:_type] ||= type if type + to_update << hash + end end - update.push(*to_update.select { |item| item.is_a?(Hash) }) - delete = to_delete.select(&Esse.method(:document?)).reject(&:ignore_on_delete?).map do |doc| - value = doc.to_bulk(data: false) - value[:_type] ||= type if type - value + Esse::ArrayUtils.wrap(delete).each do |doc| + if doc.is_a?(Hash) + to_delete << doc + elsif Esse.document?(doc) && !doc.ignore_on_delete? + hash = doc.to_bulk(data: false) + hash[:_type] ||= type if type + to_delete << hash + end end - delete.push(*to_delete.select { |item| item.is_a?(Hash) }) # @TODO Wrap the return in a some other Stats object with more information Esse::Import::Bulk.new( - create: create, - delete: delete, - index: index, - update: update, + create: to_create, + delete: to_delete, + index: to_index, + update: to_update, ).each_request do |request_body| cluster.api.bulk(**definition, body: request_body.body) do |event_payload| event_payload[:body_stats] = request_body.stats