Skip to content

Commit

Permalink
Add: saturate API
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvardanian committed Aug 21, 2024
1 parent 5bddeed commit c07751b
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 3 deletions.
13 changes: 10 additions & 3 deletions cpp/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,16 @@ void test_collection(index_at& index, typename index_at::vector_key_t const star

// Invoke the search kernel
if constexpr (punned_ak) {
index_search_result_t result = index.search(task_data, count_requested, args...);
expect(result);
matched_count = result.dump_to(matched_keys.data(), matched_distances.data());
{
index_search_result_t result = index.search(task_data, count_requested, args...);
expect(result);
matched_count = result.dump_to(matched_keys.data(), matched_distances.data());
}

if (matched_count != max_possible_matches) {
auto unreachable_count = index.unreachable_nodes();
index_search_result_t other_result = index.search(task_data, count_requested, args...);
}

// In approximate search we can't always expect the right answer to be found
expect_eq(matched_count, max_possible_matches);
Expand Down
95 changes: 95 additions & 0 deletions include/usearch/index.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3552,6 +3552,101 @@ class index_gt {
progress(processed.load(), nodes_count);
}

/**
* @brief Scans the whole collection, maximizing the number of links
* from every entry, and ensuring that the graph is fully connected.
*
* @param[in] executor Thread-pool to execute the job in parallel.
* @param[in] progress Callback to report the execution progress.
* @return The number of added links.
*/
template < //
typename allow_member_at = dummy_predicate_t, //
typename executor_at = dummy_executor_t, //
typename progress_at = dummy_progress_t //
>
expected_gt<std::size_t> saturate( //
executor_at&& executor = executor_at{}, //
progress_at&& progress = progress_at{}) noexcept {

expected_gt<std::size_t> expected{};
std::size_t total_nodes = size();

// We can use as little as just a bitset to track the presence of an incoming link,
// but as we start rebalancing the graph, we may need to prune and replace existing links.
// That may produce new isolated components of the graph, so instead of a boolean - let's
// keep a reference counter. For simplicity, let's use STL's `std::atomic`.
// For performance, let's avoid `compressed_slot_t` if it's a non-trivial integral
// type and use a larger integer instead.
using ref_counter_t = typename std::conditional< //
std::is_integral<compressed_slot_t>::value || (sizeof(compressed_slot_t) > sizeof(std::uint64_t)),
compressed_slot_t, std::uint64_t>::type;
using atomic_ref_counter_t = std::atomic<ref_counter_t>;
buffer_gt<atomic_ref_counter_t> incoming_links(total_nodes);
if (!incoming_links)
return expected.failed("Can't allocate flags");

for (level_t level = 0; level <= max_level_; ++level) {

// First of all, ensure we don't have disconnected entries in this layer
incoming_links.clear();
executor.dynamic(total_nodes, [&](std::size_t, std::size_t node_idx) {
node_t node = node_at_(node_idx);
if (static_cast<std::size_t>(node.level()) < level)
return true;
for (auto neighbor : neighbors_(node, level))
incoming_links[static_cast<compressed_slot_t>(neighbor)].fetch_add(1, std::memory_order_relaxed);
return true;
});

// If there are no unreachable nodes, we can save some time.
// Generally, in large graphs, no more than 0.1% of nodes are unreachable.
// Unfortunatelly, the `std::transform_reduce` is only available in C++17 and newer.
std::size_t count_unreachable = 0;
for (auto const& ref_counter : incoming_links)
count_unreachable += ref_counter.load(std::memory_order_relaxed) == 0;

if (count_unreachable) {
for (std::size_t i = 0; i != incoming_links.size(); ++i) {
// Skip connected and reachable nodes
if (incoming_links[i])
continue;
}
}

// Now iterate through all the nodes again and add "skip connections",
// that would lead to the closes second-degree connections.
}

// Progress status
std::atomic<bool> do_tasks{true};
std::atomic<std::size_t> processed{0};

// Erase all the incoming links
std::size_t nodes_count = size();
executor.dynamic(nodes_count, [&](std::size_t thread_idx, std::size_t node_idx) {
node_t node = node_at_(node_idx);
for (level_t level = 0; level <= node.level(); ++level) {
neighbors_ref_t neighbors = neighbors_(node, level);
std::size_t old_size = neighbors.size();
neighbors.clear();
for (std::size_t i = 0; i != old_size; ++i) {
compressed_slot_t neighbor_slot = neighbors[i];
node_t neighbor = node_at_(neighbor_slot);
if (allow_member(member_cref_t{neighbor.ckey(), neighbor_slot}))
neighbors.push_back(neighbor_slot);
}
}
++processed;
if (thread_idx == 0)
do_tasks = progress(processed.load(), nodes_count);
return do_tasks.load();
});

// At the end report the latest numbers, because the reporter thread may be finished earlier
progress(processed.load(), nodes_count);
}

private:
inline static precomputed_constants_t precompute_(index_config_t const& config) noexcept {
precomputed_constants_t pre;
Expand Down

0 comments on commit c07751b

Please sign in to comment.