diff --git a/lib/esse/index/indices.rb b/lib/esse/index/indices.rb index d0f1261..3e34278 100644 --- a/lib/esse/index/indices.rb +++ b/lib/esse/index/indices.rb @@ -53,8 +53,11 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru suffix ||= Esse.timestamp suffix = Esse.timestamp while index_exist?(suffix: suffix) + syncronous_import = true + syncronous_import = false if reindex.is_a?(Hash) && reindex[:wait_for_completion] == false - if optimize && import + optimized_creation = optimize && syncronous_import && (import || reindex) + if optimized_creation definition = [settings_hash(settings: settings), mappings_hash].reduce(&:merge) number_of_replicas = definition.dig(Esse::SETTING_ROOT_KEY, :index, :number_of_replicas) refresh_interval = definition.dig(Esse::SETTING_ROOT_KEY, :index, :refresh_interval) @@ -84,21 +87,32 @@ def reset_index(suffix: index_suffix, settings: nil, optimize: true, import: tru end end - if optimize && import && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval + if optimized_creation && number_of_replicas != new_number_of_replicas || refresh_interval != new_refresh_interval update_settings(suffix: suffix, settings: settings) refresh(suffix: suffix) end - update_aliases(suffix: suffix) + update_aliases(suffix: suffix) if syncronous_import true end # Copies documents from a source to a destination. # + # To avoid http timeout, we are sending the request with `wait_for_completion: false` and polling the task + # until it is completed. + # # @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html - def reindex(body: , wait_for_completion: false, scroll: '30m', **options) - cluster.api.reindex(**options, body: body, scroll: scroll, wait_for_completion: wait_for_completion) + def reindex(body:, wait_for_completion: true, scroll: '30m', poll_interval: 5, **options) + resp = cluster.api.reindex(**options, body: body, scroll: scroll, wait_for_completion: false) + return resp unless wait_for_completion + + task_id = resp['task'] + task = nil + while (task = cluster.api.task(id: task_id))['completed'] == false + sleep poll_interval + end + task end # Checks the index existance. Returns true or false diff --git a/spec/support/shared_examples/index_reset_index.rb b/spec/support/shared_examples/index_reset_index.rb index de7882a..3fa99aa 100644 --- a/spec/support/shared_examples/index_reset_index.rb +++ b/spec/support/shared_examples/index_reset_index.rb @@ -82,29 +82,37 @@ end end - it 'reindex data from the old index to the new index' do + it 'create async task to reindex data from the old index and do not update the alias' do es_client do |client, _conf, cluster| GeosIndex.create_index(alias: true, suffix: '2021') GeosIndex.import(refresh: true) expect { - GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: true, refresh: true) + GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: { wait_for_completion: false }, refresh: true) }.not_to raise_error - expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_#{index_suffix}"]) + expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_2021"]) expect(GeosIndex.index_exist?(suffix: '2021')).to eq(true) expect(GeosIndex.index_exist?(suffix: index_suffix)).to eq(true) - expect(GeosIndex.count).to be_positive + + count = 0 + (0..3).each do |t| + GeosIndex.refresh(suffix: index_suffix) + count = GeosIndex.count(suffix: index_suffix) + break if count.positive? + sleep(t) if t.positive? + end + expect(count).to be_positive end end - it 'forwads the reindex options to the reindex method' do + it 'reindex data from the old index to the new index by awaiting for completion' do es_client do |client, _conf, cluster| GeosIndex.create_index(alias: true, suffix: '2021') GeosIndex.import(refresh: true) expect { - GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: { wait_for_completion: true }, refresh: true) + GeosIndex.reset_index(suffix: index_suffix, import: false, reindex: { wait_for_completion: true, poll_interval: 0.2 }, refresh: true) }.not_to raise_error expect(GeosIndex.indices_pointing_to_alias).to eq(["#{GeosIndex.index_name}_#{index_suffix}"])