diff --git a/Cargo.toml b/Cargo.toml index e3cb00264..dac5af3af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ + "abci", "light-client", "light-node", "proto", diff --git a/abci/.gitignore b/abci/.gitignore new file mode 100644 index 000000000..33db11266 --- /dev/null +++ b/abci/.gitignore @@ -0,0 +1 @@ +/src/proto/*.rs diff --git a/abci/Cargo.toml b/abci/Cargo.toml new file mode 100644 index 000000000..7459ecc64 --- /dev/null +++ b/abci/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "abci" +version = "0.10.0" +authors = ["Devashish Dixit "] +license = "MIT/Apache-2.0" +description = "A Rust crate for creating ABCI applications" +homepage = "https://github.com/devashishdxt/abci-rs" +repository = "https://github.com/devashishdxt/abci-rs" +categories = ["network-programming"] +keywords = ["blockchain", "tendermint", "abci"] +readme = "README.md" +include = ["Cargo.toml", "src/**/*.rs", "README.md"] +edition = "2018" + +[lib] +name = "abci" + +[package.metadata.docs.rs] +features = ["doc"] +rustdoc-args = ["--cfg", "feature=\"doc\""] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-std = { version = "1.6", optional = true } +async-trait = "0.1" +bytes = "0.5" +integer-encoding = { version = "1.1", optional = true } +prost = "0.6" +prost-types = "0.6" +tendermint-proto = { path = "../proto" } +tokio = { version = "0.2", optional = true, features = ["io-util", "sync", "tcp", "stream", "rt-core", "uds"] } +tracing = { version = "0.1", features = ["log"] } +tracing-futures = "0.2" + +[dev-dependencies] +tokio = { version = "0.2", features = ["macros"] } +tracing-subscriber = "0.2" + +[features] +default = ["use-tokio"] +doc = [] +use-async-std = ["async-std", "integer-encoding/futures_async"] +use-tokio = ["tokio", "integer-encoding/tokio_async"] diff --git a/abci/README.md b/abci/README.md new file mode 100644 index 000000000..de6fdfac7 --- /dev/null +++ b/abci/README.md @@ -0,0 +1,83 @@ +# abci + +[![Continuous Integration](https://github.com/devashishdxt/abci-rs/workflows/Continuous%20Integration/badge.svg)](https://github.com/devashishdxt/abci-rs/actions?query=workflow%3A%22Continuous+Integration%22) +[![Crates.io](https://img.shields.io/crates/v/abci-rs)](https://crates.io/crates/abci-rs) +[![Documentation](https://docs.rs/abci-rs/badge.svg)](https://docs.rs/abci-rs) +[![License](https://img.shields.io/crates/l/abci-rs)](https://github.com/devashishdxt/abci-rs/blob/master/LICENSE-MIT) + +A Rust crate for creating ABCI applications. + +## ABCI Overview + +ABCI is the interface between Tendermint (a state-machine replication engine) and your application (the actual state +machine). It consists of a set of methods, where each method has a corresponding `Request` and `Response` message type. +Tendermint calls the ABCI methods on the ABCI application by sending the `Request` messages and receiving the `Response` +messages in return. + +ABCI methods are split across 4 separate ABCI connections: + +- `Consensus` Connection: `InitChain`, `BeginBlock`, `DeliverTx`, `EndBlock`, `Commit` +- `Mempool` Connection: `CheckTx` +- `Info` Connection: `Info`, `SetOption`, `Query` +- `Snapshot` Connection: `ListSnapshots`, `LoadSnapshotChunk`, `OfferSnapshot`, `ApplySnapshotChunk` + +Additionally, there is a `Flush` method that is called on every connection, and an `Echo` method that is just for +debugging. + +To know more about ABCI protocol specifications, go to official ABCI [documentation](https://tendermint.com/docs/spec/abci/). + +## Usage + +Add `abci` in your `Cargo.toml`'s `dependencies` section: + +```toml +[dependencies] +abci = "0.10" +``` + +Each ABCI application has to implement three core traits corresponding to all three ABCI connections, `Consensus`, +`Mempool` and `Info`. + +> Note: Implementations of these traits are expected to be `Send + Sync` and methods take immutable reference of `self`. +So, internal mutability must be handled using thread safe (`Arc`, `Mutex`, etc.) constructs. + +After implementing all three above mentioned `trait`s, you can create a `Server` object and use `Server::run()` to start +ABCI application. + +`Server::run()` is an `async` function and returns a `Future`. So, you'll need an executor to drive `Future` returned +from `Server::run()`. `async-std` and `tokio` are two popular options. In `counter` example, we use `tokio`'s executor. + +To know more, go to `examples/` to see a sample ABCI application. + +### Documentation + +- [`master`](https://devashishdxt.github.io/abci-rs/abci/) +- [`release`](https://docs.rs/abci-rs/) + +### Features + +- `use-tokio`: Enables `tokio` backend for running ABCI TCP/UDS server + - **Enabled** by default. +- `use-async-std`: Enables `async-std` backend for running ABCI TCP/UDS server + - **Disabled** by default. + +> Features `use-tokio` and `use-async-std` are mutually exclusive, i.e., only one of them can be enabled at a time. +Compilation will fail if either both of them are enabled or none of them are enabled. + +## Minimum Supported Versions + +- Tendermint: [`v0.33.6`](https://github.com/tendermint/tendermint/releases/tag/v0.33.6) + +## License + +Licensed under either of + +- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE)) +- MIT license ([LICENSE-MIT](LICENSE-MIT)) + +at your option. + +## Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as +defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions. diff --git a/abci/examples/counter.rs b/abci/examples/counter.rs new file mode 100644 index 000000000..c3f56fb5d --- /dev/null +++ b/abci/examples/counter.rs @@ -0,0 +1,213 @@ +use std::{ + net::SocketAddr, + sync::{Arc, Mutex}, +}; + +use abci::{async_trait, Consensus, Info, Mempool, Server, Snapshot}; +use tendermint_proto::abci::*; +use tracing::{subscriber::set_global_default, Level}; +use tracing_subscriber::FmtSubscriber; + +/// Simple counter +#[derive(Debug, Default, Clone)] +pub struct CounterState { + block_height: i64, + app_hash: Vec, + counter: u64, +} + +#[derive(Debug)] +pub struct ConsensusConnection { + committed_state: Arc>, + current_state: Arc>>, +} + +impl ConsensusConnection { + pub fn new( + committed_state: Arc>, + current_state: Arc>>, + ) -> Self { + Self { + committed_state, + current_state, + } + } +} + +#[async_trait] +impl Consensus for ConsensusConnection { + async fn init_chain(&self, _init_chain_request: RequestInitChain) -> ResponseInitChain { + Default::default() + } + + async fn begin_block(&self, _begin_block_request: RequestBeginBlock) -> ResponseBeginBlock { + let committed_state = self.committed_state.lock().unwrap().clone(); + + let mut current_state = self.current_state.lock().unwrap(); + *current_state = Some(committed_state); + + Default::default() + } + + async fn deliver_tx(&self, deliver_tx_request: RequestDeliverTx) -> ResponseDeliverTx { + let new_counter = parse_bytes_to_counter(&deliver_tx_request.tx); + + if new_counter.is_err() { + let mut error = ResponseDeliverTx::default(); + error.code = 1; + error.codespace = "Parsing error".to_owned(); + error.log = "Transaction should be 8 bytes long".to_owned(); + error.info = "Transaction is big-endian encoding of 64-bit integer".to_owned(); + + return error; + } + + let new_counter = new_counter.unwrap(); + + let mut current_state_lock = self.current_state.lock().unwrap(); + let mut current_state = current_state_lock.as_mut().unwrap(); + + if current_state.counter + 1 != new_counter { + let mut error = ResponseDeliverTx::default(); + error.code = 2; + error.codespace = "Validation error".to_owned(); + error.log = "Only consecutive integers are allowed".to_owned(); + error.info = "Numbers to counter app should be supplied in increasing order of consecutive integers staring from 1".to_owned(); + + return error; + } + + current_state.counter = new_counter; + + Default::default() + } + + async fn end_block(&self, end_block_request: RequestEndBlock) -> ResponseEndBlock { + let mut current_state_lock = self.current_state.lock().unwrap(); + let mut current_state = current_state_lock.as_mut().unwrap(); + + current_state.block_height = end_block_request.height; + current_state.app_hash = current_state.counter.to_be_bytes().to_vec(); + + Default::default() + } + + async fn commit(&self, _commit_request: RequestCommit) -> ResponseCommit { + let current_state = self.current_state.lock().unwrap().as_ref().unwrap().clone(); + let mut committed_state = self.committed_state.lock().unwrap(); + *committed_state = current_state; + + ResponseCommit { + data: (*committed_state).app_hash.clone(), + retain_height: 0, + } + } +} + +#[derive(Debug)] +pub struct MempoolConnection { + state: Arc>>, +} + +impl MempoolConnection { + pub fn new(state: Arc>>) -> Self { + Self { state } + } +} + +#[async_trait] +impl Mempool for MempoolConnection { + async fn check_tx(&self, check_tx_request: RequestCheckTx) -> ResponseCheckTx { + let new_counter = parse_bytes_to_counter(&check_tx_request.tx); + + if new_counter.is_err() { + let mut error = ResponseCheckTx::default(); + error.code = 1; + error.codespace = "Parsing error".to_owned(); + error.log = "Transaction should be 8 bytes long".to_owned(); + error.info = "Transaction is big-endian encoding of 64-bit integer".to_owned(); + + return error; + } + + let new_counter = new_counter.unwrap(); + + let state_lock = self.state.lock().unwrap(); + let state = state_lock.as_ref().unwrap(); + + if state.counter + 1 != new_counter { + let mut error = ResponseCheckTx::default(); + error.code = 2; + error.codespace = "Validation error".to_owned(); + error.log = "Only consecutive integers are allowed".to_owned(); + error.info = "Numbers to counter app should be supplied in increasing order of consecutive integers staring from 1".to_owned(); + + return error; + } else { + Default::default() + } + } +} + +pub struct InfoConnection { + state: Arc>, +} + +impl InfoConnection { + pub fn new(state: Arc>) -> Self { + Self { state } + } +} + +#[async_trait] +impl Info for InfoConnection { + async fn info(&self, _info_request: RequestInfo) -> ResponseInfo { + let state = self.state.lock().unwrap(); + + ResponseInfo { + data: Default::default(), + version: Default::default(), + app_version: Default::default(), + last_block_height: (*state).block_height, + last_block_app_hash: (*state).app_hash.clone(), + } + } +} + +pub struct SnapshotConnection; + +#[async_trait] +impl Snapshot for SnapshotConnection {} + +fn parse_bytes_to_counter(bytes: &[u8]) -> Result { + if bytes.len() != 8 { + return Err(()); + } + + let mut counter_bytes = [0; 8]; + counter_bytes.copy_from_slice(bytes); + + Ok(u64::from_be_bytes(counter_bytes)) +} + +#[tokio::main] +async fn main() -> std::io::Result<()> { + let subscriber = FmtSubscriber::builder() + .with_max_level(Level::DEBUG) + .finish(); + set_global_default(subscriber).unwrap(); + + let committed_state: Arc> = Default::default(); + let current_state: Arc>> = Default::default(); + + let consensus = ConsensusConnection::new(committed_state.clone(), current_state.clone()); + let mempool = MempoolConnection::new(current_state.clone()); + let info = InfoConnection::new(committed_state.clone()); + let snapshot = SnapshotConnection; + + let server = Server::new(consensus, mempool, info, snapshot)?; + + server + .run("127.0.0.1:26658".parse::().unwrap()) + .await +} diff --git a/abci/src/application.rs b/abci/src/application.rs new file mode 100644 index 000000000..4000c02ab --- /dev/null +++ b/abci/src/application.rs @@ -0,0 +1,284 @@ +use async_trait::async_trait; +use tendermint_proto::abci::*; + +/// Trait for initialization and for queries from the user. +#[async_trait] +pub trait Info: Send + Sync { + /// Echo a string to test abci client/server implementation. + async fn echo(&self, echo_request: RequestEcho) -> ResponseEcho { + ResponseEcho { + message: echo_request.message, + } + } + + /// Return information about the application state. + /// + /// # Crash Recovery + /// + /// On startup, Tendermint calls the [`info`] method to get the **latest committed state** of + /// the app. The app **MUST** return information consistent with the last block it + /// successfully completed [`commit`] for. + /// + /// If the app succesfully committed block `H` but not `H+1`, then + /// - `last_block_height = H` + /// - `last_block_app_hash = ` + /// + /// If the app failed during the [`commit`] of block `H`, then + /// - `last_block_height = H-1` + /// - `last_block_app_hash = ` + /// + /// [`info`]: trait.Info.html#tymethod.info + /// [`commit`]: trait.Consensus.html#tymethod.commit + /// + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn info(&self, info_request: RequestInfo) -> ResponseInfo + /// ``` + async fn info(&self, info_request: RequestInfo) -> ResponseInfo; + + /// Set non-consensus critical application specific options. + /// + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn set_option(&self, set_option_request: RequestSetOption) -> ResponseSetOption + /// ``` + async fn set_option(&self, _set_option_request: RequestSetOption) -> ResponseSetOption { + Default::default() + } + + /// Query for data from the application at current or past height. + /// + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn query(&self, query_request: RequestQuery) -> ResponseQuery + /// ``` + async fn query(&self, _query_request: RequestQuery) -> ResponseQuery { + Default::default() + } +} + +/// Trait for managing consensus of blockchain. +/// +/// # Details +/// +/// [_Consensus_] should maintain a `consensus_state` - the working state for block execution. It +/// should be updated by the calls to [`begin_block`], [`deliver_tx`], and [`end_block`] during +/// block execution and committed to disk as the **latest committed state** during [`commit`]. +/// +/// Updates made to the `consensus_state` by each method call must be readable by each subsequent +/// method - ie. the updates are linearizable. +/// +/// [_Consensus_]: trait.Consensus.html#details +/// [`begin_block`]: trait.Consensus.html#tymethod.begin_block +/// [`deliver_tx`]: trait.Consensus.html#tymethod.deliver_tx +/// [`end_block`]: trait.Consensus.html#tymethod.end_block +/// [`commit`]: trait.Consensus.html#tymethod.commit +#[async_trait] +pub trait Consensus: Send + Sync { + /// Called once upon genesis. Usually used to establish initial (genesis) state. + /// + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn init_chain(&self, init_chain_request: RequestInitChain) -> ResponseInitChain + /// ``` + async fn init_chain(&self, init_chain_request: RequestInitChain) -> ResponseInitChain; + + /// Signals the beginning of a new block. Called prior to any + /// [`deliver_tx`](trait.Consensus.html#tymethod.deliver_tx)s. + /// + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn begin_block(&self, begin_block_request: RequestBeginBlock) -> ResponseBeginBlock + /// ``` + async fn begin_block(&self, begin_block_request: RequestBeginBlock) -> ResponseBeginBlock; + + /// Execute the transaction in full. The workhorse of the application. + /// + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn deliver_tx(&self, deliver_tx_request: RequestDeliverTx) -> ResponseDeliverTx + /// ``` + async fn deliver_tx(&self, deliver_tx_request: RequestDeliverTx) -> ResponseDeliverTx; + + /// Signals the end of a block. Called after all transactions, prior to each + /// [`commit`](trait.Commit.html#tymethod.commit). + /// + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn end_block(&self, end_block_request: RequestEndBlock) -> ResponseEndBlock + /// ``` + async fn end_block(&self, end_block_request: RequestEndBlock) -> ResponseEndBlock; + + /// Persist the application state. + /// + /// # Details + /// + /// Application state should only be persisted to disk during [`commit`]. + /// + /// Before [`commit`] is called, Tendermint locks and flushes the mempool so that no new + /// messages will be received on the mempool connection. This provides an opportunity to + /// safely update all three states ([_Consensus_], [_Mempool_] and [_Info_]) to the **latest + /// committed state** at once. + /// + /// When [`commit`] completes, it unlocks the mempool. + /// + /// # Warning + /// + /// If the ABCI application logic processing the [`commit`] message sends a `/broadcast_tx_sync` + /// or `/broadcast_tx_commit` and waits for the response before proceeding, it will + /// deadlock. Executing those `broadcast_tx` calls involves acquiring a lock that is held + /// during the [`commit`] call, so it's not possible. If you make the call to the + /// `broadcast_tx` endpoints concurrently, that's no problem, it just can't be part of the + /// sequential logic of the [`commit`] function. + /// + /// [`commit`]: trait.Commit.html#tymethod.commit + /// [_Consensus_]: trait.Consensus.html#details + /// [_Mempool_]: trait.Mempool.html#details + /// [_Info_]: trait.Info.html + /// + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn commit(&self, commit_request: RequestCommit) -> ResponseCommit + /// ``` + async fn commit(&self, commit_request: RequestCommit) -> ResponseCommit; + + /// Signals that messages queued on the client should be flushed to the server. + /// + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn flush(&self, flush_request: RequestFlush) -> ResponseFlush + /// ``` + async fn flush(&self, _flush_request: RequestFlush) -> ResponseFlush { + Default::default() + } +} + +/// Trait for managing tendermint's mempool. +/// +/// # Details +/// +/// [_Mempool_] should maintain a `mempool_state` to sequentially process pending transactions in +/// the mempool that have not yet been committed. It should be initialized to the latest committed +/// state at the end of every [`commit`]. +/// +/// The `mempool_state` may be updated concurrently with the `consensus_state`, as messages may be +/// sent concurrently on [_Consensus_] and [_Mempool_] connections. However, before calling +/// [`commit`], Tendermint will lock and flush the mempool connection, ensuring that all existing +/// [`check_tx`] are responded to and no new ones can begin. +/// +/// After [`commit`], [`check_tx`] is run again on all transactions that remain in the node's local +/// mempool after filtering those included in the block. To prevent the mempool from rechecking all +/// transactions every time a block is committed, set the configuration option +/// `mempool.recheck=false`. +/// +/// Finally, the mempool will unlock and new transactions can be processed through [`check_tx`] +/// again. +/// +/// Note that [`check_tx`] doesn't have to check everything that affects transaction validity; the +/// expensive things can be skipped. In fact, [`check_tx`] doesn't have to check anything; it might +/// say that any transaction is a valid transaction. Unlike [`deliver_tx`], [`check_tx`] is just +/// there as a sort of weak filter to keep invalid transactions out of the blockchain. It's weak, +/// because a Byzantine node doesn't care about [`check_tx`]; it can propose a block full of invalid +/// transactions if it wants. +/// +/// [_Mempool_]: trait.Mempool.html#details +/// [`commit`]: trait.Consensus.html#tymethod.commit +/// [_Consensus_]: trait.Consensus.html#details +/// [`deliver_tx`]: trait.Consensus.html#tymethod.deliver_tx +/// [`check_tx`]: trait.Mempool.html#method.check_tx +#[async_trait] +pub trait Mempool: Send + Sync { + /// Guardian of the mempool: every node runs CheckTx before letting a transaction into its local + /// mempool. Technically optional - not involved in processing blocks + /// + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn check_tx(&self, check_tx_request: RequestCheckTx) -> ResponseCheckTx + /// ``` + async fn check_tx(&self, check_tx_request: RequestCheckTx) -> ResponseCheckTx; +} + +/// Trait for serving and restoring tendermint's state sync snapshots. +/// +/// # Details +/// +/// State sync allows new nodes to rapidly bootstrap by discovering, fetching, and applying state +/// machine snapshots instead of replaying historical blocks. For more details, see the state sync +/// section. +/// +/// When a new node is discovering snapshots in the P2P network, existing nodes will call +/// [`list_snapshots`] on the application to retrieve any local state snapshots. The new node will +/// offer these snapshots to its local application via [`offer_snapshot`]. +/// +/// Once the application accepts a snapshot and begins restoring it, Tendermint will fetch snapshot +/// chunks from existing nodes via [`load_snapshot_chunk`] and apply them sequentially to the local +/// application with `apply_snapshot_chunk`. When all chunks have been applied, the application +/// `app_hash` is retrieved via an [`info`] query and compared to the blockchain's `app_hash` +/// verified via light client. +/// +/// [`list_snapshots`]: trait.StateSync.html#method.list_snapshots +/// [`offer_snapshot`]: trait.StateSync.html#method.offer_snapshot +/// [`load_snapshot_chunk`]: trait.StateSync.html#method.load_snapshot_chunk +/// [`apply_snapshot_chunk`]: trait.StateSync.html#method.apply_snapshot_chunk +/// [`info`]: trait.Info.html#tymethod.info +#[async_trait] +pub trait Snapshot: Send + Sync { + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn list_snapshots(&self, list_snapshots_request: RequestListSnapshots) -> ResponseListSnapshots + /// ``` + async fn list_snapshots( + &self, + _list_snapshots_request: RequestListSnapshots, + ) -> ResponseListSnapshots { + Default::default() + } + + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn offer_snapshot(&self: offer_snapshot_request: RequestOfferSnapshot) -> ResponseOfferSnapshot + /// ``` + async fn offer_snapshot( + &self, + _offer_snapshot_request: RequestOfferSnapshot, + ) -> ResponseOfferSnapshot { + Default::default() + } + + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn load_snapshot_chunk(&self, load_snapshot_chunk_request: RequestLoadSnapshotChunk) -> ResponseLoadSnapshotChunk + /// ``` + async fn load_snapshot_chunk( + &self, + _load_snapshot_chunk_request: RequestLoadSnapshotChunk, + ) -> ResponseLoadSnapshotChunk { + Default::default() + } + + /// # Equivalent to + /// + /// ```rust,ignore + /// async fn apply_snapshot_chunk(&self: apply_snapshot_chunk_request: RequestApplySnapshotChunk) -> ResponseApplySnapshotChunk + /// ``` + async fn apply_snapshot_chunk( + &self, + _apply_snapshot_chunk_request: RequestApplySnapshotChunk, + ) -> ResponseApplySnapshotChunk { + Default::default() + } +} diff --git a/abci/src/lib.rs b/abci/src/lib.rs new file mode 100644 index 000000000..d7f016d6e --- /dev/null +++ b/abci/src/lib.rs @@ -0,0 +1,85 @@ +#![deny(missing_docs, unsafe_code)] +//! A Rust crate for creating ABCI applications. +//! +//! ## ABCI Overview +//! +//! ABCI is the interface between Tendermint (a state-machine replication engine) and your +//! application (the actual state machine). It consists of a set of methods, where each method has a +//! corresponding `Request` and `Response` message type. Tendermint calls the ABCI methods on the +//! ABCI application by sending the `Request` messages and receiving the `Response` messages in +//! return. +//! +//! ABCI methods are split across 4 separate ABCI connections: +//! +//! - `Consensus` Connection: `InitChain`, `BeginBlock`, `DeliverTx`, `EndBlock`, `Commit` +//! - `Mempool` Connection: `CheckTx` +//! - `Info` Connection: `Info`, `SetOption`, `Query` +//! - `Snapshot` Connection: `ListSnapshots`, `LoadSnapshotChunk`, `OfferSnapshot`, +//! `ApplySnapshotChunk` +//! +//! Additionally, there is a `Flush` method that is called on every connection, and an `Echo` method +//! that is just for debugging. +//! +//! To know more about ABCI protocol specifications, go to official ABCI [documentation](https://tendermint.com/docs/spec/abci/). +//! +//! ## Usage +//! +//! Add `abci` in your `Cargo.toml`'s `dependencies` section: +//! +//! ```toml +//! [dependencies] +//! abci = "0.10" +//! ``` +//! +//! Each ABCI application has to implement three core traits corresponding to all three ABCI +//! connections, `Consensus`, `Mempool` and `Info`. +//! +//! > Note: Implementations of these traits are expected to be `Send + Sync` and methods take +//! immutable reference of `self`. So, internal mutability must be handled using thread safe (`Arc`, +//! `Mutex`, etc.) constructs. +//! +//! After implementing all three above mentioned `trait`s, you can create a `Server` object and use +//! `Server::run()`to start ABCI application. +//! +//! `Server::run()` is an `async` function and returns a `Future`. So, you'll need an executor to +//! drive `Future` returned from `Server::run()`. `async-std` and `tokio` are two popular options. +//! In `counter` example, we use `tokio`'s executor. +//! +//! To know more, go to `examples/` to see a sample ABCI application. +//! +//! ### Features +//! +//! - `use-tokio`: Enables `tokio` backend for running ABCI TCP/UDS server +//! - **Enabled** by default. +//! - `use-async-std`: Enables `async-std` backend for running ABCI TCP/UDS server +//! - **Disabled** by default. +//! +//! > Note: Features `use-tokio` and `use-async-std` are mutually exclusive, i.e., only one of them +//! can be enabled at a time. Compilation will fail if either both of them are enabled or none of +//! them are enabled. +//! +//! ## Minimum Supported Versions +//! +//! - Tendermint: [`v0.33.6`](https://github.com/tendermint/tendermint/releases/tag/v0.33.6) +#![cfg_attr(feature = "doc", feature(doc_cfg))] + +#[cfg(all(feature = "use-async-std", feature = "use-tokio"))] +compile_error!("Features `use-async-std` and `use-tokio` are mutually exclusive"); + +#[cfg(not(any(feature = "use-async-std", feature = "use-tokio")))] +compile_error!("Either feature `use-async-std` or `use-tokio` must be enabled for this crate"); + +mod application; +mod server; +mod state; +#[cfg(test)] +mod tests; + +pub mod types; + +/// Utility macro for implementing [`Consensus`](trait.Consensus.html), +/// [`Mempool`](trait.Mempool.html) and [`Info`](trait.Info.html) traits. +pub use async_trait::async_trait; + +pub use self::application::{Consensus, Info, Mempool, Snapshot}; +pub use self::server::{Address, Server}; diff --git a/abci/src/server.rs b/abci/src/server.rs new file mode 100644 index 000000000..ac0cafa69 --- /dev/null +++ b/abci/src/server.rs @@ -0,0 +1,278 @@ +#[cfg(unix)] +use std::path::PathBuf; +use std::{io::Result, net::SocketAddr, sync::Arc}; + +#[cfg(all(unix, feature = "use-async-std"))] +use async_std::os::unix::net::UnixListener; +#[cfg(feature = "use-async-std")] +use async_std::{ + io::{Read, Write}, + net::TcpListener, + prelude::*, + sync::Mutex, + task::spawn, +}; +use tendermint_proto::abci::{ + request::Value as RequestValue, response::Value as ResponseValue, Request, Response, +}; +#[cfg(all(unix, feature = "use-tokio"))] +use tokio::net::UnixListener; +#[cfg(feature = "use-tokio")] +use tokio::{ + io::{AsyncRead as Read, AsyncWrite as Write}, + net::TcpListener, + spawn, + stream::StreamExt, + sync::Mutex, +}; +use tracing::{debug, error, info, instrument}; + +use crate::{ + state::ConsensusStateValidator, + types::{decode, encode}, + Consensus, Info, Mempool, Snapshot, +}; + +/// ABCI Server +pub struct Server +where + C: Consensus + 'static, + M: Mempool + 'static, + I: Info + 'static, + S: Snapshot + 'static, +{ + /// Wrapping inner type in `Arc` so that it becomes clonable and can be shared between multiple + /// async tasks + pub(crate) inner: Arc>, +} + +/// Inner type that contains all the trait implementations +pub(crate) struct Inner +where + C: Consensus + 'static, + M: Mempool + 'static, + I: Info + 'static, + S: Snapshot + 'static, +{ + consensus: C, + mempool: M, + info: I, + snapshot: S, + consensus_state: Mutex, +} + +impl Server +where + C: Consensus + 'static, + M: Mempool + 'static, + I: Info + 'static, + S: Snapshot + 'static, +{ + /// Creates a new instance of [`Server`](struct.Server.html) + pub fn new(consensus: C, mempool: M, info: I, snapshot: S) -> Result { + Ok(Self { + inner: Arc::new(Inner { + consensus, + mempool, + info, + snapshot, + consensus_state: Default::default(), + }), + }) + } + + /// Starts ABCI server + /// + /// # Note + /// + /// This is an `async` function and returns a `Future`. So, you'll need an executor to drive the + /// `Future` returned from this function. `async-std` and `tokio` are two popular options. + pub async fn run(&self, addr: T) -> Result<()> + where + T: Into
, + { + let addr = addr.into(); + + match addr { + Address::Tcp(addr) => { + #[cfg(feature = "use-async-std")] + let listener = TcpListener::bind(addr).await?; + + #[cfg(feature = "use-tokio")] + let mut listener = TcpListener::bind(addr).await?; + + info!(message = "Started ABCI server at", %addr); + + let mut incoming = listener.incoming(); + + while let Some(stream) = incoming.next().await { + let stream = stream?; + let peer_addr = stream.peer_addr().ok(); + self.handle_connection(stream, peer_addr); + } + } + #[cfg(unix)] + Address::Uds(path) => { + #[cfg(feature = "use-async-std")] + let listener = UnixListener::bind(&path).await?; + + #[cfg(feature = "use-tokio")] + let mut listener = UnixListener::bind(&path)?; + + info!(message = "Started ABCI server at", path = %path.display()); + + let mut incoming = listener.incoming(); + + while let Some(stream) = incoming.next().await { + let stream = stream?; + let peer_addr = stream.peer_addr().ok(); + self.handle_connection(stream, peer_addr); + } + } + } + + Ok(()) + } + + #[instrument(skip(self, stream))] + pub(crate) fn handle_connection(&self, mut stream: D, peer_addr: Option

) + where + D: Read + Write + Send + Unpin + 'static, + P: std::fmt::Debug + Send + 'static, + { + info!("New peer connection"); + + let inner = self.inner.clone(); + + spawn(async move { + while let Ok(request) = decode(&mut stream).await { + match request { + Some(request) => { + let response = inner.process(request).await; + + if let Err(err) = encode(response, &mut stream).await { + error!(message = "Error while writing to stream", %err, ?peer_addr); + } + } + None => debug!(message = "Received empty request", ?peer_addr), + } + } + + error!( + message = "Error while receiving ABCI request from socket", + ?peer_addr + ); + }); + } +} + +impl Inner +where + C: Consensus + 'static, + M: Mempool + 'static, + I: Info + 'static, + S: Snapshot + 'static, +{ + #[instrument(skip(self))] + pub(crate) async fn process(&self, request: Request) -> Response { + if request.value.is_none() { + debug!(message = "Received a request without value", ?request); + return Response::default(); + } + + let value = match request.value.unwrap() { + RequestValue::Echo(request) => ResponseValue::Echo(self.info.echo(request).await), + RequestValue::Flush(request) => { + ResponseValue::Flush(self.consensus.flush(request).await) + } + RequestValue::Info(request) => { + let info_response = self.info.info(request).await; + self.consensus_state + .lock() + .await + .on_info_response(&info_response); + ResponseValue::Info(info_response) + } + RequestValue::SetOption(request) => { + ResponseValue::SetOption(self.info.set_option(request).await) + } + RequestValue::InitChain(request) => { + self.consensus_state.lock().await.on_init_chain_request(); + ResponseValue::InitChain(self.consensus.init_chain(request).await) + } + RequestValue::Query(request) => ResponseValue::Query(self.info.query(request).await), + RequestValue::BeginBlock(request) => { + self.consensus_state + .lock() + .await + .on_begin_block_request(&request); + ResponseValue::BeginBlock(self.consensus.begin_block(request).await) + } + RequestValue::CheckTx(request) => { + ResponseValue::CheckTx(self.mempool.check_tx(request).await) + } + RequestValue::DeliverTx(request) => { + self.consensus_state.lock().await.on_deliver_tx_request(); + ResponseValue::DeliverTx(self.consensus.deliver_tx(request).await) + } + RequestValue::EndBlock(request) => { + self.consensus_state + .lock() + .await + .on_end_block_request(&request); + ResponseValue::EndBlock(self.consensus.end_block(request).await) + } + RequestValue::Commit(request) => { + let mut consensus_state = self.consensus_state.lock().await; + consensus_state.on_commit_request(); + + let response = self.consensus.commit(request).await; + consensus_state.on_commit_response(&response); + ResponseValue::Commit(response) + } + RequestValue::ListSnapshots(request) => { + ResponseValue::ListSnapshots(self.snapshot.list_snapshots(request).await) + } + RequestValue::OfferSnapshot(request) => { + ResponseValue::OfferSnapshot(self.snapshot.offer_snapshot(request).await) + } + RequestValue::LoadSnapshotChunk(request) => { + ResponseValue::LoadSnapshotChunk(self.snapshot.load_snapshot_chunk(request).await) + } + RequestValue::ApplySnapshotChunk(request) => { + ResponseValue::ApplySnapshotChunk(self.snapshot.apply_snapshot_chunk(request).await) + } + }; + + let mut response = Response::default(); + response.value = Some(value); + + debug!(message = "Sending response", ?response); + + response + } +} + +/// Address of ABCI Server +#[derive(Debug)] +pub enum Address { + /// TCP Address + Tcp(SocketAddr), + /// UDS Address + #[cfg(unix)] + #[cfg_attr(feature = "doc", doc(cfg(unix)))] + Uds(PathBuf), +} + +impl From for Address { + fn from(addr: SocketAddr) -> Self { + Self::Tcp(addr) + } +} + +#[cfg(unix)] +impl From for Address { + fn from(path: PathBuf) -> Self { + Self::Uds(path) + } +} diff --git a/abci/src/state.rs b/abci/src/state.rs new file mode 100644 index 000000000..0efd156ad --- /dev/null +++ b/abci/src/state.rs @@ -0,0 +1,184 @@ +use tendermint_proto::abci::{RequestBeginBlock, RequestEndBlock, ResponseCommit, ResponseInfo}; + +#[derive(Debug, Default, Clone)] +pub struct ConsensusStateValidator { + state: ConsensusState, +} + +impl ConsensusStateValidator { + pub fn on_info_response(&mut self, info_response: &ResponseInfo) { + if self.state == ConsensusState::NoInfo { + let block_height = info_response.last_block_height; + + if block_height == 0 { + self.state = ConsensusState::NotInitialized; + } else { + self.state = ConsensusState::WaitingForBlock { + block_height: block_height + 1, + app_hash: info_response.last_block_app_hash.clone(), + }; + } + } + } + + pub fn on_init_chain_request(&mut self) { + if self.state != ConsensusState::NotInitialized { + panic!("Received `InitChain` call when chain is already initialized"); + } + + self.state = ConsensusState::InitChain; + } + + pub fn on_begin_block_request(&mut self, begin_block_request: &RequestBeginBlock) { + let new_state = match self.state { + ConsensusState::InitChain => { + let header = begin_block_request + .header + .as_ref() + .expect("`BeginBlock` request does not contain a header"); + + ConsensusState::ExecutingBlock { + block_height: header.height, + execution_state: BlockExecutionState::BeginBlock, + } + } + ConsensusState::WaitingForBlock { + ref block_height, + ref app_hash, + } => { + let block_height = *block_height; + + let header = begin_block_request + .header + .as_ref() + .expect("`BeginBlock` request does not contain a header"); + + if header.height != block_height { + panic!( + "Expected height {} in `BeginBlock` request. Got {}", + block_height, header.height + ); + } + + if &header.app_hash != app_hash { + panic!( + "Expected app hash {:?} in `BeginBlock`. Got {:?}", + app_hash, header.app_hash + ); + } + + ConsensusState::ExecutingBlock { + block_height, + execution_state: BlockExecutionState::BeginBlock, + } + } + _ => panic!("`BeginBlock` cannot be called after {:?}", self.state), + }; + + self.state = new_state; + } + + pub fn on_deliver_tx_request(&mut self) { + match self.state { + ConsensusState::ExecutingBlock { + ref mut execution_state, + .. + } => execution_state.validate(BlockExecutionState::DeliverTx), + _ => panic!("`DeliverTx` cannot be called after {:?}", self.state), + } + } + + pub fn on_end_block_request(&mut self, end_block_request: &RequestEndBlock) { + match self.state { + ConsensusState::ExecutingBlock { + ref mut execution_state, + ref block_height, + } => { + let block_height = *block_height; + + if block_height != end_block_request.height { + panic!( + "Expected `EndBlock` for height {}. But received for {}", + block_height, end_block_request.height + ) + } + + execution_state.validate(BlockExecutionState::EndBlock); + } + _ => panic!("`EndBlock` cannot be called after {:?}", self.state), + } + } + + pub fn on_commit_request(&mut self) { + match self.state { + ConsensusState::ExecutingBlock { + ref mut execution_state, + .. + } => execution_state.validate(BlockExecutionState::Commit), + _ => panic!("`Commit` cannot be called after {:?}", self.state), + } + } + + pub fn on_commit_response(&mut self, commit_response: &ResponseCommit) { + let new_state = match self.state { + ConsensusState::ExecutingBlock { + execution_state: BlockExecutionState::Commit, + block_height, + } => ConsensusState::WaitingForBlock { + block_height: block_height + 1, + app_hash: commit_response.data.clone(), + }, + _ => panic!("Received `CommitResponse` after {:?}", self.state), + }; + + self.state = new_state; + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ConsensusState { + NoInfo, + NotInitialized, + InitChain, + WaitingForBlock { + block_height: i64, + app_hash: Vec, + }, + ExecutingBlock { + block_height: i64, + execution_state: BlockExecutionState, + }, +} + +impl Default for ConsensusState { + fn default() -> Self { + Self::NoInfo + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BlockExecutionState { + BeginBlock, + DeliverTx, + EndBlock, + Commit, +} + +impl BlockExecutionState { + pub fn validate(&mut self, next: Self) { + let is_valid = match (*self, next) { + (Self::BeginBlock, Self::DeliverTx) => true, + (Self::BeginBlock, Self::EndBlock) => true, + (Self::DeliverTx, Self::DeliverTx) => true, + (Self::DeliverTx, Self::EndBlock) => true, + (Self::EndBlock, Self::Commit) => true, + _ => false, + }; + + if is_valid { + *self = next; + } else { + panic!("{:?} cannot be called after {:?}", next, self); + } + } +} diff --git a/abci/src/tests.rs b/abci/src/tests.rs new file mode 100644 index 000000000..c6a96ee02 --- /dev/null +++ b/abci/src/tests.rs @@ -0,0 +1,564 @@ +mod counter; +mod request_generator; + +use tendermint_proto::abci::{response::Value as ResponseValue, Request, Response}; + +#[tokio::test] +async fn check_valid_abci_flow() { + let server = counter::server(); + + // First, tendermint calls `info` to get information about ABCI application + let response = server.inner.process(request_generator::info()).await; + assert!(response.value.is_some()); + if let ResponseValue::Info(info_response) = response.value.unwrap() { + assert_eq!(0, info_response.last_block_height); + assert!(info_response.last_block_app_hash.is_empty()); + } else { + panic!("Info request should generate info response"); + } + + // Because the `block_height` returned by `info` call is `0`, tendermint will next call + // `init_chain` + let response = server.inner.process(request_generator::init_chain()).await; + assert!(response.value.is_some()); + + // Next, tendermint will call `begin_block` with `block_height = 1` + let response = server + .inner + .process(request_generator::begin_block(1, Default::default())) + .await; + assert!(response.value.is_some()); + + // Next, tendermint may call multiple `deliver_tx` + let response = server.inner.process(request_generator::deliver_tx(1)).await; + assert!(response.value.is_some()); + + let response = server.inner.process(request_generator::deliver_tx(2)).await; + assert!(response.value.is_some()); + + // After all the transactions are delivered, tendermint will call `end_block` + let response = server.inner.process(request_generator::end_block(1)).await; + assert!(response.value.is_some()); + + // Finally, tendermint will call `commit` + let response = server.inner.process(request_generator::commit()).await; + assert!(response.value.is_some()); + if let ResponseValue::Commit(commit_response) = response.value.unwrap() { + assert_eq!(2u64.to_be_bytes().to_vec(), commit_response.data); + } else { + panic!("Commit request should generate commit response"); + } + + // Next, tendermint will call `begin_block` with `block_height = 2` + let response = server + .inner + .process(request_generator::begin_block( + 2, + 2u64.to_be_bytes().to_vec(), + )) + .await; + assert!(response.value.is_some()); + + // Next, tendermint may call multiple `deliver_tx` + let response = server.inner.process(request_generator::deliver_tx(3)).await; + assert!(response.value.is_some()); + + let response = server.inner.process(request_generator::deliver_tx(4)).await; + assert!(response.value.is_some()); + + // After all the transactions are delivered, tendermint will call `end_block` + let response = server.inner.process(request_generator::end_block(2)).await; + assert!(response.value.is_some()); + + // Finally, tendermint will call `commit` + let response = server.inner.process(request_generator::commit()).await; + assert!(response.value.is_some()); + if let ResponseValue::Commit(commit_response) = response.value.unwrap() { + assert_eq!(4u64.to_be_bytes().to_vec(), commit_response.data); + } else { + panic!("Commit request should generate commit response"); + } +} + +#[tokio::test] +async fn check_valid_abci_flow_with_init_state() { + let server = counter::server_with_state(4, 2); + + // First, tendermint calls `info` to get information about ABCI application + let response = server.inner.process(request_generator::info()).await; + assert!(response.value.is_some()); + if let ResponseValue::Info(info_response) = response.value.unwrap() { + assert_eq!(2, info_response.last_block_height); + assert_eq!( + 4u64.to_be_bytes().to_vec(), + info_response.last_block_app_hash + ); + } else { + panic!("Info request should generate info response"); + } + + // Because the `block_height` returned by `info` call is `2`, tendermint will next call + // `begin_block` with `block_height = 3` + let response = server + .inner + .process(request_generator::begin_block( + 3, + 4u64.to_be_bytes().to_vec(), + )) + .await; + assert!(response.value.is_some()); + + // Next, tendermint may call multiple `deliver_tx` + let response = server.inner.process(request_generator::deliver_tx(5)).await; + assert!(response.value.is_some()); + + let response = server.inner.process(request_generator::deliver_tx(6)).await; + assert!(response.value.is_some()); + + // After all the transactions are delivered, tendermint will call `end_block` + let response = server.inner.process(request_generator::end_block(3)).await; + assert!(response.value.is_some()); + + // Finally, tendermint will call `commit` + let response = server.inner.process(request_generator::commit()).await; + assert!(response.value.is_some()); + if let ResponseValue::Commit(commit_response) = response.value.unwrap() { + assert_eq!(6u64.to_be_bytes().to_vec(), commit_response.data); + } else { + panic!("Commit request should generate commit response"); + } +} + +#[tokio::test] +async fn can_call_init_chain_after_startup() { + let response = call_after_startup(request_generator::init_chain(), None).await; + assert!(response.value.is_some()); +} + +#[tokio::test] +#[should_panic(expected = "`BeginBlock` cannot be called after NotInitialized")] +async fn cannot_call_begin_block_after_startup() { + call_after_startup(request_generator::begin_block(0, Default::default()), None).await; +} + +#[tokio::test] +#[should_panic(expected = "`DeliverTx` cannot be called after NotInitialized")] +async fn cannot_call_deliver_tx_after_startup() { + call_after_startup(request_generator::deliver_tx(0), None).await; +} + +#[tokio::test] +#[should_panic(expected = "`EndBlock` cannot be called after NotInitialized")] +async fn cannot_call_end_block_after_startup() { + call_after_startup(request_generator::end_block(0), None).await; +} + +#[tokio::test] +#[should_panic(expected = "`Commit` cannot be called after NotInitialized")] +async fn cannot_call_commit_after_startup() { + call_after_startup(request_generator::commit(), None).await; +} + +#[tokio::test] +#[should_panic(expected = "Received `InitChain` call when chain is already initialized")] +async fn cannot_call_init_chain_after_startup_with_state() { + call_after_startup(request_generator::init_chain(), Some((1, 1))).await; +} + +#[tokio::test] +#[should_panic( + expected = "`DeliverTx` cannot be called after WaitingForBlock { block_height: 2, app_hash: [0, 0, 0, 0, 0, 0, 0, 1] }" +)] +async fn cannot_call_deliver_tx_after_startup_with_state() { + call_after_startup(request_generator::deliver_tx(0), Some((1, 1))).await; +} + +#[tokio::test] +#[should_panic( + expected = "`EndBlock` cannot be called after WaitingForBlock { block_height: 2, app_hash: [0, 0, 0, 0, 0, 0, 0, 1] }" +)] +async fn cannot_call_end_block_after_startup_with_state() { + call_after_startup(request_generator::end_block(0), Some((1, 1))).await; +} + +#[tokio::test] +#[should_panic( + expected = "`Commit` cannot be called after WaitingForBlock { block_height: 2, app_hash: [0, 0, 0, 0, 0, 0, 0, 1] }" +)] +async fn cannot_call_commit_after_startup_with_state() { + call_after_startup(request_generator::commit(), Some((1, 1))).await; +} + +#[tokio::test] +async fn can_call_begin_block_after_startup_with_state() { + let response = call_after_startup( + request_generator::begin_block(2, 1u64.to_be_bytes().to_vec()), + Some((1, 1)), + ) + .await; + assert!(response.value.is_some()) +} + +#[tokio::test] +#[should_panic(expected = "Expected height 2 in `BeginBlock` request. Got 3")] +async fn cannot_call_begin_block_with_different_block_height_after_startup_with_state() { + call_after_startup( + request_generator::begin_block(3, 1u64.to_be_bytes().to_vec()), + Some((1, 1)), + ) + .await; +} + +#[tokio::test] +#[should_panic( + expected = "Expected app hash [0, 0, 0, 0, 0, 0, 0, 1] in `BeginBlock`. Got [0, 0, 0, 0, 0, 0, 0, 2]" +)] +async fn cannot_call_begin_block_with_different_app_hash_after_startup_with_state() { + call_after_startup( + request_generator::begin_block(2, 2u64.to_be_bytes().to_vec()), + Some((1, 1)), + ) + .await; +} + +async fn call_after_startup(request: Request, state: Option<(u64, i64)>) -> Response { + let (server, block_height, app_hash) = match state { + None => (counter::server(), 0, Vec::new()), + Some((counter, block_height)) => ( + counter::server_with_state(counter, block_height), + block_height, + counter.to_be_bytes().to_vec(), + ), + }; + + // First, tendermint calls `info` to get information about ABCI application + let response = server.inner.process(request_generator::info()).await; + assert!(response.value.is_some()); + if let ResponseValue::Info(info_response) = response.value.unwrap() { + assert_eq!(block_height, info_response.last_block_height); + assert_eq!(app_hash, info_response.last_block_app_hash); + } else { + panic!("Info request should generate info response"); + } + + // Send provided request + server.inner.process(request).await +} + +#[tokio::test] +#[should_panic(expected = "Received `InitChain` call when chain is already initialized")] +async fn cannot_call_init_chain_after_begin_block() { + call_after_begin_block(request_generator::init_chain()).await; +} + +#[tokio::test] +#[should_panic( + expected = "`BeginBlock` cannot be called after ExecutingBlock { block_height: 1, execution_state: BeginBlock }" +)] +async fn cannot_call_begin_block_after_begin_block() { + call_after_begin_block(request_generator::begin_block(2, Default::default())).await; +} + +#[tokio::test] +#[should_panic(expected = "Commit cannot be called after BeginBlock")] +async fn cannot_call_commit_after_begin_block() { + call_after_begin_block(request_generator::commit()).await; +} + +#[tokio::test] +#[should_panic(expected = "Expected `EndBlock` for height 1. But received for 2")] +async fn cannot_call_end_block_with_different_block_height_after_begin_block() { + call_after_begin_block(request_generator::end_block(2)).await; +} + +#[tokio::test] +async fn can_call_deliver_tx_after_begin_block() { + let response = call_after_begin_block(request_generator::deliver_tx(1)).await; + assert!(response.value.is_some()); +} + +#[tokio::test] +async fn can_call_end_block_after_begin_block() { + let response = call_after_begin_block(request_generator::end_block(1)).await; + assert!(response.value.is_some()); +} + +async fn call_after_begin_block(request: Request) -> Response { + let server = counter::server(); + + // First, tendermint calls `info` to get information about ABCI application + let response = server.inner.process(request_generator::info()).await; + assert!(response.value.is_some()); + if let ResponseValue::Info(info_response) = response.value.unwrap() { + assert_eq!(0, info_response.last_block_height); + assert!(info_response.last_block_app_hash.is_empty()); + } else { + panic!("Info request should generate info response"); + } + + // Because the `block_height` returned by `info` call is `0`, tendermint will next call + // `init_chain` + let response = server.inner.process(request_generator::init_chain()).await; + assert!(response.value.is_some()); + + // Next, tendermint will call `begin_block` with `block_height = 1` + let response = server + .inner + .process(request_generator::begin_block(1, Default::default())) + .await; + assert!(response.value.is_some()); + + // Send provided request + server.inner.process(request).await +} + +#[tokio::test] +#[should_panic(expected = "Received `InitChain` call when chain is already initialized")] +async fn cannot_call_init_chain_after_deliver_tx() { + call_after_deliver_tx(request_generator::init_chain()).await; +} + +#[tokio::test] +#[should_panic( + expected = "`BeginBlock` cannot be called after ExecutingBlock { block_height: 1, execution_state: DeliverTx }" +)] +async fn cannot_call_begin_block_after_deliver_tx() { + call_after_deliver_tx(request_generator::begin_block( + 2, + 1u64.to_be_bytes().to_vec(), + )) + .await; +} + +#[tokio::test] +#[should_panic(expected = "Commit cannot be called after DeliverTx")] +async fn cannot_call_commit_after_deliver_tx() { + call_after_deliver_tx(request_generator::commit()).await; +} + +#[tokio::test] +#[should_panic(expected = "Expected `EndBlock` for height 1. But received for 2")] +async fn cannot_call_end_block_with_different_height_after_deliver_tx() { + call_after_deliver_tx(request_generator::end_block(2)).await; +} + +#[tokio::test] +async fn can_call_deliver_tx_after_deliver_tx() { + let response = call_after_deliver_tx(request_generator::deliver_tx(1)).await; + assert!(response.value.is_some()) +} + +#[tokio::test] +async fn can_call_end_block_after_deliver_tx() { + let response = call_after_deliver_tx(request_generator::end_block(1)).await; + assert!(response.value.is_some()) +} + +async fn call_after_deliver_tx(request: Request) -> Response { + let server = counter::server(); + + // First, tendermint calls `info` to get information about ABCI application + let response = server.inner.process(request_generator::info()).await; + assert!(response.value.is_some()); + if let ResponseValue::Info(info_response) = response.value.unwrap() { + assert_eq!(0, info_response.last_block_height); + assert!(info_response.last_block_app_hash.is_empty()); + } else { + panic!("Info request should generate info response"); + } + + // Because the `block_height` returned by `info` call is `0`, tendermint will next call + // `init_chain` + let response = server.inner.process(request_generator::init_chain()).await; + assert!(response.value.is_some()); + + // Next, tendermint will call `begin_block` with `block_height = 1` + let response = server + .inner + .process(request_generator::begin_block(1, Default::default())) + .await; + assert!(response.value.is_some()); + + // Next, tendermint will call `deliver_tx` + let response = server.inner.process(request_generator::deliver_tx(1)).await; + assert!(response.value.is_some()); + + // Send provided request + server.inner.process(request).await +} + +#[tokio::test] +#[should_panic(expected = "Received `InitChain` call when chain is already initialized")] +async fn cannot_call_init_chain_after_end_block() { + call_after_end_block(request_generator::init_chain()).await; +} + +#[tokio::test] +#[should_panic( + expected = "`BeginBlock` cannot be called after ExecutingBlock { block_height: 1, execution_state: EndBlock }" +)] +async fn cannot_call_begin_block_after_end_block() { + call_after_end_block(request_generator::begin_block( + 2, + 1u64.to_be_bytes().to_vec(), + )) + .await; +} + +#[tokio::test] +#[should_panic(expected = "DeliverTx cannot be called after EndBlock")] +async fn cannot_call_deliver_tx_after_end_block() { + call_after_end_block(request_generator::deliver_tx(2)).await; +} + +#[tokio::test] +#[should_panic(expected = "EndBlock cannot be called after EndBlock")] +async fn cannot_call_end_block_after_end_block() { + let respone = call_after_end_block(request_generator::end_block(1)).await; + assert!(respone.value.is_some()) +} + +#[tokio::test] +async fn can_call_commit_after_end_block() { + call_after_end_block(request_generator::commit()).await; +} + +async fn call_after_end_block(request: Request) -> Response { + let server = counter::server(); + + // First, tendermint calls `info` to get information about ABCI application + let response = server.inner.process(request_generator::info()).await; + assert!(response.value.is_some()); + if let ResponseValue::Info(info_response) = response.value.unwrap() { + assert_eq!(0, info_response.last_block_height); + assert!(info_response.last_block_app_hash.is_empty()); + } else { + panic!("Info request should generate info response"); + } + + // Because the `block_height` returned by `info` call is `0`, tendermint will next call + // `init_chain` + let response = server.inner.process(request_generator::init_chain()).await; + assert!(response.value.is_some()); + + // Next, tendermint will call `begin_block` with `block_height = 1` + let response = server + .inner + .process(request_generator::begin_block(1, Default::default())) + .await; + assert!(response.value.is_some()); + + // Next, tendermint will call `deliver_tx` + let response = server.inner.process(request_generator::deliver_tx(1)).await; + assert!(response.value.is_some()); + + // Next, tendermint will call `end_block` + let response = server.inner.process(request_generator::end_block(1)).await; + assert!(response.value.is_some()); + + // Send provided request + server.inner.process(request).await +} + +#[tokio::test] +#[should_panic(expected = "Received `InitChain` call when chain is already initialized")] +async fn cannot_call_init_chain_after_commit() { + call_after_commit(request_generator::init_chain()).await; +} + +#[tokio::test] +#[should_panic(expected = "Expected height 2 in `BeginBlock` request. Got 3")] +async fn cannot_call_begin_block_with_different_height_after_commit() { + call_after_commit(request_generator::begin_block( + 3, + 1u64.to_be_bytes().to_vec(), + )) + .await; +} + +#[tokio::test] +#[should_panic( + expected = "Expected app hash [0, 0, 0, 0, 0, 0, 0, 1] in `BeginBlock`. Got [0, 0, 0, 0, 0, 0, 0, 2]" +)] +async fn cannot_call_begin_block_with_different_app_hash_after_commit() { + call_after_commit(request_generator::begin_block( + 2, + 2u64.to_be_bytes().to_vec(), + )) + .await; +} + +#[tokio::test] +#[should_panic( + expected = "`DeliverTx` cannot be called after WaitingForBlock { block_height: 2, app_hash: [0, 0, 0, 0, 0, 0, 0, 1] }" +)] +async fn cannot_call_deliver_tx_after_commit() { + call_after_commit(request_generator::deliver_tx(2)).await; +} + +#[tokio::test] +#[should_panic( + expected = "`EndBlock` cannot be called after WaitingForBlock { block_height: 2, app_hash: [0, 0, 0, 0, 0, 0, 0, 1] }" +)] +async fn cannot_call_end_block_after_commit() { + call_after_commit(request_generator::end_block(2)).await; +} + +#[tokio::test] +#[should_panic( + expected = "`Commit` cannot be called after WaitingForBlock { block_height: 2, app_hash: [0, 0, 0, 0, 0, 0, 0, 1] }" +)] +async fn cannot_call_commit_after_commit() { + call_after_commit(request_generator::commit()).await; +} + +#[tokio::test] +async fn can_call_begin_block_after_commit() { + let response = call_after_commit(request_generator::begin_block( + 2, + 1u64.to_be_bytes().to_vec(), + )) + .await; + assert!(response.value.is_some()); +} + +async fn call_after_commit(request: Request) -> Response { + let server = counter::server(); + + // First, tendermint calls `info` to get information about ABCI application + let response = server.inner.process(request_generator::info()).await; + assert!(response.value.is_some()); + if let ResponseValue::Info(info_response) = response.value.unwrap() { + assert_eq!(0, info_response.last_block_height); + assert!(info_response.last_block_app_hash.is_empty()); + } else { + panic!("Info request should generate info response"); + } + + // Because the `block_height` returned by `info` call is `0`, tendermint will next call + // `init_chain` + let response = server.inner.process(request_generator::init_chain()).await; + assert!(response.value.is_some()); + + // Next, tendermint will call `begin_block` with `block_height = 1` + let response = server + .inner + .process(request_generator::begin_block(1, Default::default())) + .await; + assert!(response.value.is_some()); + + // Next, tendermint will call `deliver_tx` + let response = server.inner.process(request_generator::deliver_tx(1)).await; + assert!(response.value.is_some()); + + // Next, tendermint will call `end_block` + let response = server.inner.process(request_generator::end_block(1)).await; + assert!(response.value.is_some()); + + // Next, tendermint will call `commit` + let response = server.inner.process(request_generator::commit()).await; + assert!(response.value.is_some()); + + // Send provided request + server.inner.process(request).await +} diff --git a/abci/src/tests/counter.rs b/abci/src/tests/counter.rs new file mode 100644 index 000000000..b8799a6f0 --- /dev/null +++ b/abci/src/tests/counter.rs @@ -0,0 +1,219 @@ +use std::sync::{Arc, Mutex}; + +use tendermint_proto::abci::*; + +use crate::{async_trait, Consensus, Info, Mempool, Server, Snapshot}; + +/// Simple counter +#[derive(Debug, Default, Clone)] +pub struct CounterState { + block_height: i64, + app_hash: Vec, + counter: u64, +} + +#[derive(Debug)] +pub struct ConsensusConnection { + committed_state: Arc>, + current_state: Arc>>, +} + +impl ConsensusConnection { + pub fn new( + committed_state: Arc>, + current_state: Arc>>, + ) -> Self { + Self { + committed_state, + current_state, + } + } +} + +#[async_trait] +impl Consensus for ConsensusConnection { + async fn init_chain(&self, _init_chain_request: RequestInitChain) -> ResponseInitChain { + Default::default() + } + + async fn begin_block(&self, _begin_block_request: RequestBeginBlock) -> ResponseBeginBlock { + let committed_state = self.committed_state.lock().unwrap().clone(); + + let mut current_state = self.current_state.lock().unwrap(); + *current_state = Some(committed_state); + + Default::default() + } + + async fn deliver_tx(&self, deliver_tx_request: RequestDeliverTx) -> ResponseDeliverTx { + let new_counter = parse_bytes_to_counter(&deliver_tx_request.tx); + + if new_counter.is_err() { + let mut error = ResponseDeliverTx::default(); + error.code = 1; + error.codespace = "Parsing error".to_owned(); + error.log = "Transaction should be 8 bytes long".to_owned(); + error.info = "Transaction is big-endian encoding of 64-bit integer".to_owned(); + + return error; + } + + let new_counter = new_counter.unwrap(); + + let mut current_state_lock = self.current_state.lock().unwrap(); + let mut current_state = current_state_lock.as_mut().unwrap(); + + if current_state.counter + 1 != new_counter { + let mut error = ResponseDeliverTx::default(); + error.code = 2; + error.codespace = "Validation error".to_owned(); + error.log = "Only consecutive integers are allowed".to_owned(); + error.info = "Numbers to counter app should be supplied in increasing order of consecutive integers staring from 1".to_owned(); + + return error; + } + + current_state.counter = new_counter; + + Default::default() + } + + async fn end_block(&self, end_block_request: RequestEndBlock) -> ResponseEndBlock { + let mut current_state_lock = self.current_state.lock().unwrap(); + let mut current_state = current_state_lock.as_mut().unwrap(); + + current_state.block_height = end_block_request.height; + current_state.app_hash = current_state.counter.to_be_bytes().to_vec(); + + Default::default() + } + + async fn commit(&self, _commit_request: RequestCommit) -> ResponseCommit { + let current_state = self.current_state.lock().unwrap().as_ref().unwrap().clone(); + let mut committed_state = self.committed_state.lock().unwrap(); + *committed_state = current_state; + + ResponseCommit { + data: (*committed_state).app_hash.clone(), + retain_height: 0, + } + } +} + +#[derive(Debug)] +pub struct MempoolConnection { + state: Arc>>, +} + +impl MempoolConnection { + pub fn new(state: Arc>>) -> Self { + Self { state } + } +} + +#[async_trait] +impl Mempool for MempoolConnection { + async fn check_tx(&self, check_tx_request: RequestCheckTx) -> ResponseCheckTx { + let new_counter = parse_bytes_to_counter(&check_tx_request.tx); + + if new_counter.is_err() { + let mut error = ResponseCheckTx::default(); + error.code = 1; + error.codespace = "Parsing error".to_owned(); + error.log = "Transaction should be 8 bytes long".to_owned(); + error.info = "Transaction is big-endian encoding of 64-bit integer".to_owned(); + + return error; + } + + let new_counter = new_counter.unwrap(); + + let state_lock = self.state.lock().unwrap(); + let state = state_lock.as_ref().unwrap(); + + if state.counter + 1 != new_counter { + let mut error = ResponseCheckTx::default(); + error.code = 2; + error.codespace = "Validation error".to_owned(); + error.log = "Only consecutive integers are allowed".to_owned(); + error.info = "Numbers to counter app should be supplied in increasing order of consecutive integers staring from 1".to_owned(); + + return error; + } else { + Default::default() + } + } +} + +pub struct InfoConnection { + state: Arc>, +} + +impl InfoConnection { + pub fn new(state: Arc>) -> Self { + Self { state } + } +} + +#[async_trait] +impl Info for InfoConnection { + async fn info(&self, _info_request: RequestInfo) -> ResponseInfo { + let state = self.state.lock().unwrap(); + + ResponseInfo { + data: Default::default(), + version: Default::default(), + app_version: Default::default(), + last_block_height: (*state).block_height, + last_block_app_hash: (*state).app_hash.clone(), + } + } +} + +pub struct SnapshotConnection; + +#[async_trait] +impl Snapshot for SnapshotConnection {} + +fn parse_bytes_to_counter(bytes: &[u8]) -> Result { + if bytes.len() != 8 { + return Err(()); + } + + let mut counter_bytes = [0; 8]; + counter_bytes.copy_from_slice(bytes); + + Ok(u64::from_be_bytes(counter_bytes)) +} + +pub fn server() -> Server +{ + let committed_state: Arc> = Default::default(); + let current_state: Arc>> = Default::default(); + + let consensus = ConsensusConnection::new(committed_state.clone(), current_state.clone()); + let mempool = MempoolConnection::new(current_state.clone()); + let info = InfoConnection::new(committed_state.clone()); + let snapshot = SnapshotConnection; + + Server::new(consensus, mempool, info, snapshot).expect("Unable to create ABCI server") +} + +pub fn server_with_state( + counter: u64, + block_height: i64, +) -> Server { + let committed_state = Arc::new(Mutex::new(CounterState { + block_height, + counter, + app_hash: counter.to_be_bytes().to_vec(), + })); + let current_state: Arc>> = Default::default(); + + let consensus = ConsensusConnection::new(committed_state.clone(), current_state.clone()); + let mempool = MempoolConnection::new(current_state.clone()); + let info = InfoConnection::new(committed_state.clone()); + let snapshot = SnapshotConnection; + + Server::new(consensus, mempool, info, snapshot).expect("Unable to create ABCI server") +} diff --git a/abci/src/tests/request_generator.rs b/abci/src/tests/request_generator.rs new file mode 100644 index 000000000..ff9060b61 --- /dev/null +++ b/abci/src/tests/request_generator.rs @@ -0,0 +1,57 @@ +use tendermint_proto::{ + abci::{request::Value as RequestValue, *}, + types::*, +}; + +pub fn info() -> Request { + let mut request = Request::default(); + request.value = Some(RequestValue::Info(RequestInfo::default())); + request +} + +pub fn init_chain() -> Request { + let mut request = Request::default(); + request.value = Some(RequestValue::InitChain(Default::default())); + request +} + +pub fn begin_block(block_height: i64, app_hash: Vec) -> Request { + let mut begin_block_request = RequestBeginBlock::default(); + + let mut header = Header::default(); + header.height = block_height; + header.app_hash = app_hash; + + begin_block_request.header = Some(header).into(); + + let mut request = Request::default(); + request.value = Some(RequestValue::BeginBlock(begin_block_request)); + + request +} + +pub fn deliver_tx(counter: u64) -> Request { + let mut deliver_tx_request = RequestDeliverTx::default(); + deliver_tx_request.tx = counter.to_be_bytes().to_vec(); + + let mut request = Request::default(); + request.value = Some(RequestValue::DeliverTx(deliver_tx_request)); + + request +} + +pub fn end_block(block_height: i64) -> Request { + let mut end_block_request = RequestEndBlock::default(); + end_block_request.height = block_height; + + let mut request = Request::default(); + request.value = Some(RequestValue::EndBlock(end_block_request)); + + request +} + +pub fn commit() -> Request { + let mut request = Request::default(); + request.value = Some(RequestValue::Commit(RequestCommit::default())); + request +} diff --git a/abci/src/types.rs b/abci/src/types.rs new file mode 100644 index 000000000..79c747054 --- /dev/null +++ b/abci/src/types.rs @@ -0,0 +1,57 @@ +#![allow(missing_docs)] +//! Types used in ABCI +pub use prost_types::{Duration, Timestamp}; + +use std::{ + convert::TryFrom, + io::{Error, ErrorKind, Result}, +}; + +#[cfg(feature = "use-async-std")] +use async_std::{ + io::{Read, Write}, + prelude::*, +}; +#[cfg(feature = "use-tokio")] +use tokio::io::{AsyncRead as Read, AsyncReadExt, AsyncWrite as Write, AsyncWriteExt}; + +use integer_encoding::{VarIntAsyncReader, VarIntAsyncWriter}; +use prost::Message; + +/// Decodes a `Request` from stream +pub(crate) async fn decode( + mut reader: R, +) -> Result> { + let length: i64 = reader.read_varint_async().await?; + + if length == 0 { + return Ok(None); + } + + let mut bytes = vec![0; length as usize]; + reader.take(length as u64).read(&mut bytes).await?; + + ::decode(bytes.as_slice()) + .map(Some) + .map_err(|e| Error::new(ErrorKind::InvalidData, e)) +} + +/// Encodes a `Response` to stream +pub(crate) async fn encode( + message: M, + mut writer: W, +) -> Result<()> { + writer + .write_varint_async( + i64::try_from(message.encoded_len()).expect("Cannot convert from `i64` to `usize`"), + ) + .await?; + + let mut bytes = vec![]; + + message + .encode(&mut bytes) + .map_err(|e| Error::new(ErrorKind::Other, e))?; + + writer.write_all(&bytes).await +}