Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handshake followup #138

Merged
merged 9 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion anchor/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ libp2p = { version = "0.54", default-features = false, features = [
"request-response",
] }
lighthouse_network = { workspace = true }
parking_lot = { workspace = true }
quick-protobuf = "0.8.1"
serde = { workspace = true }
serde_json = "1.0.137"
Expand All @@ -44,3 +43,4 @@ async-channel = { workspace = true }
libp2p-swarm = { version = "0.45.1", features = ["macros"] }
libp2p-swarm-test = { version = "0.4.0" }
tokio = { workspace = true, features = ["rt", "macros", "time"] }
tracing-subscriber = { workspace = true }
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::handshake::envelope;
use crate::handshake::envelope::{parse_envelope, Envelope};
use crate::handshake::envelope::Envelope;
use crate::handshake::node_info::NodeInfo;
use crate::handshake::{envelope, node_info};
use async_trait::async_trait;
use futures::{AsyncReadExt, AsyncWriteExt};
use libp2p::futures::{AsyncRead, AsyncWrite};
use libp2p::identity::Keypair;
use libp2p::{request_response, StreamProtocol};
use std::io;
use tracing::debug;
use tracing::trace;

const MAXIMUM_SIZE: u64 = 1024;

Expand All @@ -15,15 +17,53 @@ impl From<envelope::Error> for io::Error {
}
}

impl From<node_info::Error> for io::Error {
fn from(err: node_info::Error) -> io::Error {
io::Error::new(io::ErrorKind::InvalidData, err)
}
}

/// A `Codec` that reads/writes an **`Envelope`**.
#[derive(Clone, Debug, Default)]
pub struct Codec;
#[derive(Clone, Debug)]
pub struct Codec {
keypair: Keypair,
}

impl Codec {
pub fn new(keypair: Keypair) -> Self {
Self { keypair }
}

async fn read<T>(&mut self, io: &mut T) -> io::Result<NodeInfo>
where
T: AsyncRead + Unpin + Send,
{
let mut msg_buf = Vec::new();
let num_bytes_read = io.take(MAXIMUM_SIZE).read_to_end(&mut msg_buf).await?;
trace!(?num_bytes_read, "read handshake");
let env = Envelope::parse_and_verify(&msg_buf)?;
let node_info = NodeInfo::unmarshal(&env.payload)?;
Ok(node_info)
}

async fn write<T>(&mut self, io: &mut T, node_info: NodeInfo) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let envelope = node_info.seal(&self.keypair)?;
let raw = envelope.encode_to_vec()?;
io.write_all(&raw).await?;
io.flush().await?;
io.close().await?;
Ok(())
}
}

#[async_trait]
impl request_response::Codec for Codec {
type Protocol = StreamProtocol;
type Request = Envelope;
type Response = Envelope;
type Request = NodeInfo;
type Response = NodeInfo;

async fn read_request<T>(
&mut self,
Expand All @@ -33,14 +73,8 @@ impl request_response::Codec for Codec {
where
T: AsyncRead + Unpin + Send,
{
debug!("reading handsake request");
let mut msg_buf = Vec::new();
let num_bytes_read = io.take(MAXIMUM_SIZE).read_to_end(&mut msg_buf).await?;
// TODO potentially try to read one more byte here and create a "message too large error"
debug!(?num_bytes_read, "read handshake request");
let env = Envelope::decode_from_slice(&msg_buf)?;
debug!(?env, "decoded handshake request");
Ok(env)
trace!("reading handshake request");
self.read(io).await
}

async fn read_response<T>(
Expand All @@ -51,17 +85,8 @@ impl request_response::Codec for Codec {
where
T: AsyncRead + Unpin + Send,
{
debug!("reading handshake response");
let mut msg_buf = Vec::new();
// We don't need a varint here because we always read only one message in the protocol.
// In this way we can just read until the end of the stream.
let num_bytes_read = io.take(MAXIMUM_SIZE).read_to_end(&mut msg_buf).await?;
debug!(?num_bytes_read, "read handshake response");

let env = parse_envelope(&msg_buf)?;

debug!(?env, "decoded handshake response");
Ok(env)
trace!("reading handshake response");
self.read(io).await
}

async fn write_request<T>(
Expand All @@ -73,12 +98,8 @@ impl request_response::Codec for Codec {
where
T: AsyncWrite + Unpin + Send,
{
debug!(req = ?req, "writing handshake request");
let raw = req.encode_to_vec()?;
io.write_all(&raw).await?;
io.close().await?;
debug!("wrote handshake request");
Ok(())
trace!(req = ?req, "writing handshake request");
self.write(io, req).await
}

async fn write_response<T>(
Expand All @@ -90,13 +111,7 @@ impl request_response::Codec for Codec {
where
T: AsyncWrite + Unpin + Send,
{
debug!("writing handshake response");
let raw = res
.encode_to_vec()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
io.write_all(&raw).await?;
io.close().await?;
debug!("wrote handshake response");
Ok(())
trace!("writing handshake response");
self.write(io, res).await
}
}
1 change: 0 additions & 1 deletion anchor/network/src/handshake/envelope/generated/mod.rs

This file was deleted.

41 changes: 20 additions & 21 deletions anchor/network/src/handshake/envelope/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
mod codec;
mod generated;
mod generated {
pub mod message;
}

use crate::handshake::envelope::Error::SignatureVerification;
use crate::handshake::node_info::NodeInfo;
use discv5::libp2p_identity::PublicKey;
pub use generated::message::pb::Envelope;
use libp2p::identity::DecodingError;
use quick_protobuf::{BytesReader, Error as ProtoError, MessageRead, MessageWrite, Writer};
use thiserror::Error;
Expand All @@ -29,31 +32,31 @@ impl Envelope {
}

/// Decode an Envelope from a Protobuf byte array (like `proto.Unmarshal` in Go).
pub fn decode_from_slice(data: &[u8]) -> Result<Self, Error> {
fn decode_from_slice(data: &[u8]) -> Result<Self, Error> {
let mut reader = BytesReader::from_bytes(data);
let env = Envelope::from_reader(&mut reader, data).map_err(Error::Coding)?;
Ok(env)
}
}

/// Decodes an Envelope and verify signature.
pub fn parse_envelope(bytes: &[u8]) -> Result<Envelope, Error> {
let env = Envelope::decode_from_slice(bytes)?;
/// Decodes an Envelope and verify signature.
pub fn parse_and_verify(bytes: &[u8]) -> Result<Envelope, Error> {
let env = Envelope::decode_from_slice(bytes)?;

let domain = NodeInfo::DOMAIN;
let payload_type = NodeInfo::CODEC;
let domain = NodeInfo::DOMAIN;
let payload_type = NodeInfo::CODEC;

let unsigned = make_unsigned(domain.as_bytes(), payload_type, &env.payload);
let unsigned = make_unsigned(domain.as_bytes(), payload_type, &env.payload);

let pk = PublicKey::try_decode_protobuf(&env.public_key.to_vec())?;
let pk = PublicKey::try_decode_protobuf(&env.public_key.to_vec())?;

if !pk.verify(&unsigned?, &env.signature) {
return Err(SignatureVerification(
"signature verification failed".into(),
));
}
if !pk.verify(&unsigned?, &env.signature) {
return Err(SignatureVerification(
"signature verification failed".into(),
));
}

Ok(env)
Ok(env)
}
}

pub fn make_unsigned(
Expand All @@ -70,7 +73,3 @@ pub fn make_unsigned(
}
Ok(buf)
}

use crate::handshake::envelope::Error::SignatureVerification;
pub use codec::Codec;
pub use generated::message::pb::Envelope;
Loading