Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kademlia fixes #266

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/libp2p/injector/network_injector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ namespace libp2p::injector {
* );
* @endcode
*/
inline auto useKeyPair(const crypto::KeyPair &key_pair) {
inline auto useKeyPair(crypto::KeyPair key_pair) {
return boost::di::bind<crypto::KeyPair>().template to(
key_pair)[boost::di::override];
std::make_shared<crypto::KeyPair>(key_pair))[boost::di::override];
}

/**
Expand Down
5 changes: 4 additions & 1 deletion include/libp2p/protocol/kademlia/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ namespace libp2p::protocol::kademlia {

using namespace std::chrono_literals;

// https://github.com/libp2p/rust-libp2p/blob/e63975d7742710d4498b941e151c5177e06392ce/protocols/kad/src/lib.rs#L93
constexpr size_t K_VALUE = 20;

namespace {
struct RandomWalk {
/**
Expand Down Expand Up @@ -122,7 +125,7 @@ namespace libp2p::protocol::kademlia {
* This is implementation specified property.
* @note Default: 20
*/
size_t maxBucketSize = 20;
size_t maxBucketSize = K_VALUE;

/**
* Maximum time to waiting response
Expand Down
3 changes: 0 additions & 3 deletions include/libp2p/protocol/kademlia/impl/find_peer_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <libp2p/protocol/kademlia/impl/peer_routing_table.hpp>
#include <libp2p/protocol/kademlia/impl/session.hpp>
#include <libp2p/protocol/kademlia/impl/session_host.hpp>
#include <libp2p/protocol/kademlia/peer_routing.hpp>

namespace libp2p::protocol::kademlia {

Expand All @@ -35,7 +34,6 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<Host> host,
std::shared_ptr<basic::Scheduler> scheduler,
std::shared_ptr<SessionHost> session_host,
std::shared_ptr<PeerRouting> peer_routing,
const std::shared_ptr<PeerRoutingTable> &peer_routing_table,
PeerId peer_id,
FoundPeerInfoHandler handler);
Expand Down Expand Up @@ -70,7 +68,6 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<Host> host_;
std::shared_ptr<basic::Scheduler> scheduler_;
std::shared_ptr<SessionHost> session_host_;
std::shared_ptr<PeerRouting> peer_routing_;

// Secondary
const PeerId sought_peer_id_;
Expand Down
3 changes: 0 additions & 3 deletions include/libp2p/protocol/kademlia/impl/get_value_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <libp2p/protocol/kademlia/impl/peer_routing_table.hpp>
#include <libp2p/protocol/kademlia/impl/session.hpp>
#include <libp2p/protocol/kademlia/impl/session_host.hpp>
#include <libp2p/protocol/kademlia/peer_routing.hpp>
#include <libp2p/protocol/kademlia/validator.hpp>

namespace libp2p::protocol::kademlia {
Expand All @@ -40,7 +39,6 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<Host> host,
std::shared_ptr<basic::Scheduler> scheduler,
std::shared_ptr<SessionHost> session_host,
std::shared_ptr<PeerRouting> peer_routing,
std::shared_ptr<ContentRoutingTable> content_routing_table,
const std::shared_ptr<PeerRoutingTable> &peer_routing_table,
std::shared_ptr<ExecutorsFactory> executor_factory,
Expand Down Expand Up @@ -76,7 +74,6 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<Host> host_;
std::shared_ptr<basic::Scheduler> scheduler_;
std::shared_ptr<SessionHost> session_host_;
std::shared_ptr<PeerRouting> peer_routing_;
std::shared_ptr<ContentRoutingTable> content_routing_table_;
std::shared_ptr<ExecutorsFactory> executor_factory_;
std::shared_ptr<Validator> validator_;
Expand Down
1 change: 1 addition & 0 deletions include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ namespace libp2p::protocol::kademlia {

// Subscribtion to new connections
event::Handle new_connection_subscription_;
event::Handle on_disconnected_;

struct StreamPtrComparator {
bool operator()(const std::shared_ptr<connection::Stream> &lhs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
#include <libp2p/protocol/kademlia/node_id.hpp>

namespace libp2p::protocol::kademlia {

/**
* Used with `priority_queue<PeerIdWithDistance>`.
* `top()` must be minimal distance, so `operator<` is reversed.
*/
struct PeerIdWithDistance {
template <typename T>
PeerIdWithDistance(const PeerId &peer_id, T &&target)
Expand All @@ -21,7 +24,7 @@ namespace libp2p::protocol::kademlia {
bool operator<(const PeerIdWithDistance &other) const {
return std::memcmp(
distance_.data(), other.distance_.data(), distance_.size())
< 0;
> 0;
}

const PeerId &operator*() const {
Expand Down
32 changes: 13 additions & 19 deletions include/libp2p/protocol/kademlia/impl/peer_routing_table_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,19 @@ namespace libp2p::protocol::kademlia {
struct BucketPeerInfo {
peer::PeerId peer_id;
bool is_replaceable;
bool is_connected;
NodeId node_id;
BucketPeerInfo(const peer::PeerId &peer_id, bool is_replaceable)
: peer_id(peer_id), is_replaceable(is_replaceable), node_id(peer_id) {}
BucketPeerInfo(const PeerId &peer_id,
bool is_replaceable,
bool is_connected)
: peer_id{peer_id},
is_replaceable{is_replaceable},
is_connected{is_connected},
node_id{peer_id} {}
};

struct XorDistanceComparator {
explicit XorDistanceComparator(const peer::PeerId &from) {
hfrom = crypto::sha256(from.toVector()).value();
}

explicit XorDistanceComparator(const NodeId &from)
: hfrom(from.getData()) {}

explicit XorDistanceComparator(const Hash256 &hash) : hfrom(hash) {}

bool operator()(const BucketPeerInfo &a, const BucketPeerInfo &b) {
NodeId from(hfrom);
auto d1 = a.node_id.distance(from);
auto d2 = b.node_id.distance(from);
constexpr auto size = Hash256().size();
Expand All @@ -48,7 +44,7 @@ namespace libp2p::protocol::kademlia {
return std::memcmp(d1.data(), d2.data(), size) < 0;
}

Hash256 hfrom{};
NodeId from;
};

/**
Expand All @@ -67,7 +63,9 @@ namespace libp2p::protocol::kademlia {

bool moveToFront(const PeerId &pid);

void emplaceToFront(const PeerId &pid, bool is_replaceable);
void emplaceToFront(const PeerId &pid,
bool is_replaceable,
bool is_connected);

boost::optional<PeerId> removeReplaceableItem();

Expand All @@ -79,8 +77,6 @@ namespace libp2p::protocol::kademlia {

bool remove(const peer::PeerId &p);

Bucket split(size_t commonLenPrefix, const NodeId &target);

private:
std::list<BucketPeerInfo> peers_;
};
Expand Down Expand Up @@ -116,7 +112,7 @@ namespace libp2p::protocol::kademlia {
size_t size() const override;

private:
void nextBucket();
std::optional<size_t> getBucketIndex(const NodeId &key) const;

const Config &config_;
std::shared_ptr<peer::IdentityManager> identity_manager_;
Expand All @@ -125,8 +121,6 @@ namespace libp2p::protocol::kademlia {
const NodeId local_;

std::vector<Bucket> buckets_;

log::SubLogger log_;
};

} // namespace libp2p::protocol::kademlia
Expand Down
17 changes: 7 additions & 10 deletions include/libp2p/protocol/kademlia/node_id.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,17 @@ namespace libp2p::protocol::kademlia {
*/
class NodeId {
public:
explicit NodeId(const Hash256 &h) : data_(h) {}

explicit NodeId(const void *bytes) {
memcpy(data_.data(), bytes, data_.size());
}

explicit NodeId(const peer::PeerId &pid) {
auto digest_res = crypto::sha256(pid.toVector());
BOOST_ASSERT(digest_res.has_value());
data_ = std::move(digest_res.value());
}

explicit NodeId(const ContentId &content_id) {
auto digest_res = crypto::sha256(content_id);
BOOST_ASSERT(digest_res.has_value());
data_ = std::move(digest_res.value());
static NodeId prehashed(Hash256 hash) {
return NodeId{hash};
}
static NodeId hash(BytesIn key) {
return prehashed(crypto::sha256(key).value());
}

inline bool operator==(const NodeId &other) const {
Expand Down Expand Up @@ -106,6 +101,8 @@ namespace libp2p::protocol::kademlia {
}

private:
explicit NodeId(Hash256 hash) : data_{hash} {}

Hash256 data_;
};

Expand Down
4 changes: 2 additions & 2 deletions src/protocol/kademlia/impl/add_provider_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ namespace libp2p::protocol::kademlia {
scheduler_(std::move(scheduler)),
session_host_(std::move(session_host)),
key_(std::move(key)),
target_(key_),
target_{NodeId::hash(key_)},
log_("KademliaExecutor", "kademlia", "AddProvider", ++instance_number) {
auto nearest_peer_ids =
peer_routing_table->getNearestPeers(target_, config_.maxBucketSize);
peer_routing_table->getNearestPeers(target_, K_VALUE);

nearest_peer_ids_.insert(std::move_iterator(nearest_peer_ids.begin()),
std::move_iterator(nearest_peer_ids.end()));
Expand Down
6 changes: 2 additions & 4 deletions src/protocol/kademlia/impl/find_peer_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,19 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<Host> host,
std::shared_ptr<basic::Scheduler> scheduler,
std::shared_ptr<SessionHost> session_host,
std::shared_ptr<PeerRouting> peer_routing,
const std::shared_ptr<PeerRoutingTable> &peer_routing_table,
PeerId sought_peer_id,
FoundPeerInfoHandler handler)
: config_(config),
host_(std::move(host)),
scheduler_(std::move(scheduler)),
session_host_(std::move(session_host)),
peer_routing_(std::move(peer_routing)),
sought_peer_id_(std::move(sought_peer_id)),
target_(sought_peer_id_),
handler_(std::move(handler)),
log_("KademliaExecutor", "kademlia", "FindPeer", ++instance_number) {
auto nearest_peer_ids = peer_routing_table->getNearestPeers(
target_, config_.closerPeerCount * 2);
auto nearest_peer_ids =
peer_routing_table->getNearestPeers(target_, K_VALUE);

nearest_peer_ids_.insert(std::move_iterator(nearest_peer_ids.begin()),
std::move_iterator(nearest_peer_ids.end()));
Expand Down
6 changes: 3 additions & 3 deletions src/protocol/kademlia/impl/find_providers_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace libp2p::protocol::kademlia {
session_host_(std::move(session_host)),
content_id_(std::move(content_id)),
handler_(std::move(handler)),
target_(content_id_),
target_{NodeId::hash(content_id_)},
log_("KademliaExecutor",
"kademlia",
"FindProviders",
Expand All @@ -42,8 +42,8 @@ namespace libp2p::protocol::kademlia {
BOOST_ASSERT(scheduler_ != nullptr);
BOOST_ASSERT(session_host_ != nullptr);

auto nearest_peer_ids = peer_routing_table->getNearestPeers(
target_, config_.closerPeerCount * 2);
auto nearest_peer_ids =
peer_routing_table->getNearestPeers(target_, K_VALUE);

nearest_peer_ids_.insert(std::move_iterator(nearest_peer_ids.begin()),
std::move_iterator(nearest_peer_ids.end()));
Expand Down
9 changes: 3 additions & 6 deletions src/protocol/kademlia/impl/get_value_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<Host> host,
std::shared_ptr<basic::Scheduler> scheduler,
std::shared_ptr<SessionHost> session_host,
std::shared_ptr<PeerRouting> peer_routing,
std::shared_ptr<ContentRoutingTable> content_routing_table,
const std::shared_ptr<PeerRoutingTable> &peer_routing_table,
std::shared_ptr<ExecutorsFactory> executor_factory,
Expand All @@ -40,24 +39,22 @@ namespace libp2p::protocol::kademlia {
host_(std::move(host)),
scheduler_(std::move(scheduler)),
session_host_(std::move(session_host)),
peer_routing_(std::move(peer_routing)),
content_routing_table_(std::move(content_routing_table)),
executor_factory_(std::move(executor_factory)),
validator_(std::move(validator)),
key_(std::move(key)),
handler_(std::move(handler)),
target_(key_),
target_{NodeId::hash(key_)},
log_("KademliaExecutor", "kademlia", "GetValue", ++instance_number) {
BOOST_ASSERT(host_ != nullptr);
BOOST_ASSERT(scheduler_ != nullptr);
BOOST_ASSERT(session_host_ != nullptr);
BOOST_ASSERT(peer_routing_ != nullptr);
BOOST_ASSERT(content_routing_table_ != nullptr);
BOOST_ASSERT(executor_factory_ != nullptr);
BOOST_ASSERT(validator_ != nullptr);

auto nearest_peer_ids = peer_routing_table->getNearestPeers(
target_, config_.closerPeerCount * 2);
auto nearest_peer_ids =
peer_routing_table->getNearestPeers(target_, K_VALUE);

nearest_peer_ids_.insert(std::move_iterator(nearest_peer_ids.begin()),
std::move_iterator(nearest_peer_ids.end()));
Expand Down
20 changes: 15 additions & 5 deletions src/protocol/kademlia/impl/kademlia_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,22 @@ namespace libp2p::protocol::kademlia {
addPeer(
peer::PeerInfo{std::move(remote_peer_res.value()),
{std::move(remote_peer_addr_res.value())}},
false);
false,
true);
}
}
});
on_disconnected_ =
host_->getBus()
.getChannel<event::network::OnPeerDisconnectedChannel>()
.subscribe([weak_self{weak_from_this()}](const PeerId &peer) {
auto self = weak_self.lock();
if (not self) {
return;
}
std::ignore =
self->peer_routing_table_->update(peer, false, false);
});

// start random walking
if (config_.randomWalk.enabled) {
Expand Down Expand Up @@ -434,7 +446,7 @@ namespace libp2p::protocol::kademlia {
}

peer_ids = peer_routing_table_->getNearestPeers(
NodeId(msg.key), config_.closerPeerCount * 2);
NodeId::hash(msg.key), config_.closerPeerCount * 2);

if (not peer_ids.empty()) {
std::vector<Message::Peer> peers;
Expand Down Expand Up @@ -491,7 +503,7 @@ namespace libp2p::protocol::kademlia {
log_.debug("MSG: FindNode ({})", multi::detail::encodeBase58(msg.key));

auto ids = peer_routing_table_->getNearestPeers(
NodeId(msg.key), config_.closerPeerCount * 2);
NodeId::hash(msg.key), config_.closerPeerCount * 2);

std::vector<Message::Peer> peers;
peers.reserve(config_.closerPeerCount);
Expand Down Expand Up @@ -639,7 +651,6 @@ namespace libp2p::protocol::kademlia {
host_,
scheduler_,
shared_from_this(),
shared_from_this(),
content_routing_table_,
peer_routing_table_,
shared_from_this(),
Expand Down Expand Up @@ -676,7 +687,6 @@ namespace libp2p::protocol::kademlia {
host_,
scheduler_,
shared_from_this(),
shared_from_this(),
peer_routing_table_,
std::move(peer_id),
std::move(handler));
Expand Down
Loading
Loading