Skip to content

Commit 6db2794

Browse files
fix: Handle routing using worker header
1 parent e8f0876 commit 6db2794

File tree

5 files changed

+114
-2
lines changed

5 files changed

+114
-2
lines changed

Cargo.lock

+3-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

instance/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ edition = "2021"
77
balius-runtime = { git = "https://github.com/gonzalezzfelipe/balius.git", branch="feat/support-object-urls", features = ["http", "aws"] }
88
config = { version = "0.15.9", default-features = false, features = ["toml", "json"] }
99
futures-util = "0.3.30"
10+
hex = "0.4.3"
1011
k8s-openapi = { version = "0.24.0", features = ["latest"] }
1112
kube = { version = "0.98.0", features = ["runtime", "client", "derive"] }
1213
lazy_static = "1.4.0"
@@ -21,3 +22,4 @@ tokio-util = "0.7.13"
2122
tracing = "0.1.40"
2223
tracing-subscriber = "0.3.18"
2324
url = "2.5.4"
25+
warp = { version = "0.3.7" }

instance/src/main.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use tracing::{debug, warn, Level};
55

66
mod config;
77
mod runtime;
8+
mod server;
89

910
async fn wait_for_exit_signal() {
1011
let mut sigterm =
@@ -59,7 +60,7 @@ async fn main() -> miette::Result<()> {
5960
let cancel = hook_exit_token();
6061

6162
let jsonrpc_server = async {
62-
balius_runtime::drivers::jsonrpc::serve(config.rpc.clone(), runtime.clone(), cancel.clone())
63+
server::serve(config.rpc.clone(), runtime.clone(), cancel.clone())
6364
.await
6465
.into_diagnostic()
6566
.context("Running JsonRPC server")

instance/src/server.rs

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use serde::{Deserialize, Serialize};
2+
use serde_json::json;
3+
use std::net::SocketAddr;
4+
use tokio_util::sync::CancellationToken;
5+
use tracing::{debug, error};
6+
use warp::Filter as _;
7+
8+
use balius_runtime::{wit, Error, Runtime};
9+
10+
#[derive(Deserialize)]
11+
struct Request {
12+
pub id: Option<String>,
13+
pub method: String,
14+
pub params: serde_json::Value,
15+
}
16+
17+
#[derive(Serialize)]
18+
struct ErrorResponse {
19+
error: String,
20+
}
21+
22+
fn parse_request(body: serde_json::Value) -> Result<Request, ErrorResponse> {
23+
match serde_json::from_value(body) {
24+
Ok(x) => Ok(x),
25+
Err(x) => Err(ErrorResponse {
26+
error: x.to_string(),
27+
}),
28+
}
29+
}
30+
31+
pub async fn handle_request(
32+
runtime: Runtime,
33+
worker: String,
34+
body: serde_json::Value,
35+
) -> warp::reply::Json {
36+
let request = match parse_request(body) {
37+
Ok(x) => x,
38+
Err(err) => return warp::reply::json(&err),
39+
};
40+
41+
debug!(
42+
worker,
43+
id = request.id,
44+
method = request.method,
45+
"handling request"
46+
);
47+
48+
let params = serde_json::to_vec(&request.params).unwrap();
49+
50+
let reply = runtime
51+
.handle_request(&worker, &request.method, params)
52+
.await;
53+
54+
match reply {
55+
Ok(x) => {
56+
debug!(worker, id = request.id, "request successful");
57+
58+
let x = match x {
59+
wit::Response::Acknowledge => json!({}),
60+
wit::Response::Json(x) => serde_json::from_slice(&x).unwrap(),
61+
wit::Response::Cbor(x) => json!({ "cbor": hex::encode(x) }),
62+
wit::Response::PartialTx(x) => json!({ "tx": hex::encode(x) }),
63+
};
64+
65+
warp::reply::json(&x)
66+
}
67+
Err(err) => {
68+
error!(worker, id = request.id, "request failed");
69+
warp::reply::json(&ErrorResponse {
70+
error: err.to_string(),
71+
})
72+
}
73+
}
74+
}
75+
76+
pub async fn serve(
77+
config: balius_runtime::drivers::jsonrpc::Config,
78+
runtime: Runtime,
79+
cancel: CancellationToken,
80+
) -> Result<(), Error> {
81+
let filter = warp::any()
82+
.map(move || runtime.clone())
83+
.and(warp::header::<String>("worker"))
84+
.and(warp::post())
85+
.and(warp::body::json())
86+
.then(handle_request);
87+
88+
let address: SocketAddr = config
89+
.listen_address
90+
.parse()
91+
.map_err(|x: std::net::AddrParseError| Error::Config(x.to_string()))?;
92+
93+
let (addr, server) =
94+
warp::serve(filter).bind_with_graceful_shutdown(address, cancel.cancelled_owned());
95+
96+
tracing::info!(%addr, "Json-RPC server listening");
97+
98+
server.await;
99+
100+
Ok(())
101+
}

proxy/src/proxy.rs

+6
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,12 @@ impl ProxyHttp for BaliusProxy {
142142
ctx.consumer.network, self.config.balius_dns, self.config.balius_port
143143
);
144144

145+
// replace existing header if any
146+
session
147+
.req_header_mut()
148+
.insert_header("worker", &ctx.consumer.port_name)
149+
.unwrap();
150+
145151
if self.limiter(&ctx.consumer).await? {
146152
session.respond_error(429).await?;
147153
return Ok(true);

0 commit comments

Comments
 (0)