Skip to content

Commit d2eb4c6

Browse files
authored
feat: implement utxorpc watch module (txpipe#270)
1 parent 4b3a140 commit d2eb4c6

File tree

2 files changed

+100
-2
lines changed

2 files changed

+100
-2
lines changed

src/serve/grpc/mod.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::{prelude::*, submit::Transaction};
1212
mod query;
1313
mod submit;
1414
mod sync;
15+
mod watch;
1516

1617
#[derive(Serialize, Deserialize, Clone)]
1718
pub struct Config {
@@ -29,13 +30,16 @@ pub async fn serve(
2930
) -> Result<(), Error> {
3031
let addr = config.listen_address.parse().unwrap();
3132

32-
let sync_service = sync::ChainSyncServiceImpl::new(wal, ledger.clone());
33+
let sync_service = sync::ChainSyncServiceImpl::new(wal.clone(), ledger.clone());
3334
let sync_service =
3435
u5c::sync::chain_sync_service_server::ChainSyncServiceServer::new(sync_service);
3536

36-
let query_service = query::QueryServiceImpl::new(ledger);
37+
let query_service = query::QueryServiceImpl::new(ledger.clone());
3738
let query_service = u5c::query::query_service_server::QueryServiceServer::new(query_service);
3839

40+
let watch_service = watch::WatchServiceImpl::new(wal.clone(), ledger.clone());
41+
let watch_service = u5c::watch::watch_service_server::WatchServiceServer::new(watch_service);
42+
3943
let submit_service = submit::SubmitServiceImpl::new(txs_out, mempool);
4044
let submit_service =
4145
u5c::submit::submit_service_server::SubmitServiceServer::new(submit_service);
@@ -45,6 +49,7 @@ pub async fn serve(
4549
.register_encoded_file_descriptor_set(u5c::sync::FILE_DESCRIPTOR_SET)
4650
.register_encoded_file_descriptor_set(u5c::query::FILE_DESCRIPTOR_SET)
4751
.register_encoded_file_descriptor_set(u5c::submit::FILE_DESCRIPTOR_SET)
52+
.register_encoded_file_descriptor_set(u5c::watch::FILE_DESCRIPTOR_SET)
4853
.register_encoded_file_descriptor_set(protoc_wkt::google::protobuf::FILE_DESCRIPTOR_SET)
4954
.build()
5055
.unwrap();
@@ -68,6 +73,7 @@ pub async fn serve(
6873
.add_service(tonic_web::enable(sync_service))
6974
.add_service(tonic_web::enable(query_service))
7075
.add_service(tonic_web::enable(submit_service))
76+
.add_service(tonic_web::enable(watch_service))
7177
.add_service(reflection)
7278
.serve_with_shutdown(addr, exit.cancelled())
7379
.await

src/serve/grpc/watch.rs

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use crate::{
2+
ledger,
3+
wal::{self, WalReader as _},
4+
};
5+
use futures_core::Stream;
6+
use futures_util::StreamExt;
7+
use pallas::interop::utxorpc as interop;
8+
use pallas::interop::utxorpc::spec as u5c;
9+
use pallas::ledger::traverse::MultiEraBlock;
10+
use std::pin::Pin;
11+
use tonic::{Request, Response, Status};
12+
13+
fn block_to_txs(
14+
block: &wal::RawBlock,
15+
mapper: &interop::Mapper<ledger::store::LedgerStore>,
16+
) -> Vec<u5c::watch::AnyChainTx> {
17+
let wal::RawBlock { body, .. } = block;
18+
let block = MultiEraBlock::decode(body).unwrap();
19+
let txs = block.txs();
20+
21+
txs.iter()
22+
.map(|x| mapper.map_tx(x))
23+
.map(|x| u5c::watch::AnyChainTx {
24+
chain: Some(u5c::watch::any_chain_tx::Chain::Cardano(x)),
25+
})
26+
.collect()
27+
}
28+
29+
fn roll_to_watch_response(
30+
mapper: &interop::Mapper<ledger::store::LedgerStore>,
31+
log: &wal::LogValue,
32+
) -> impl Stream<Item = u5c::watch::WatchTxResponse> {
33+
let txs: Vec<_> = match log {
34+
wal::LogValue::Apply(block) => block_to_txs(block, mapper)
35+
.into_iter()
36+
.map(u5c::watch::watch_tx_response::Action::Apply)
37+
.map(|x| u5c::watch::WatchTxResponse { action: Some(x) })
38+
.collect(),
39+
wal::LogValue::Undo(block) => block_to_txs(block, mapper)
40+
.into_iter()
41+
.map(u5c::watch::watch_tx_response::Action::Undo)
42+
.map(|x| u5c::watch::WatchTxResponse { action: Some(x) })
43+
.collect(),
44+
// TODO: shouldn't we have a u5c event for origin?
45+
wal::LogValue::Mark(..) => vec![],
46+
};
47+
48+
tokio_stream::iter(txs)
49+
}
50+
51+
pub struct WatchServiceImpl {
52+
wal: wal::redb::WalStore,
53+
mapper: interop::Mapper<ledger::store::LedgerStore>,
54+
}
55+
56+
impl WatchServiceImpl {
57+
pub fn new(wal: wal::redb::WalStore, ledger: ledger::store::LedgerStore) -> Self {
58+
Self {
59+
wal,
60+
mapper: interop::Mapper::new(ledger),
61+
}
62+
}
63+
}
64+
65+
#[async_trait::async_trait]
66+
impl u5c::watch::watch_service_server::WatchService for WatchServiceImpl {
67+
type WatchTxStream = Pin<
68+
Box<dyn Stream<Item = Result<u5c::watch::WatchTxResponse, tonic::Status>> + Send + 'static>,
69+
>;
70+
71+
async fn watch_tx(
72+
&self,
73+
request: Request<u5c::watch::WatchTxRequest>,
74+
) -> Result<Response<Self::WatchTxStream>, Status> {
75+
let _ = request.into_inner();
76+
77+
let from_seq = self
78+
.wal
79+
.find_tip()
80+
.map_err(|_err| Status::internal("can't read WAL"))?
81+
.map(|(x, _)| x)
82+
.unwrap_or_default();
83+
84+
let mapper = self.mapper.clone();
85+
86+
let stream = wal::WalStream::start(self.wal.clone(), from_seq)
87+
.flat_map(move |(_, log)| roll_to_watch_response(&mapper, &log))
88+
.map(Ok);
89+
90+
Ok(Response::new(Box::pin(stream)))
91+
}
92+
}

0 commit comments

Comments
 (0)