server: wrapping jsonrpsee::TowerService in a new type pattern is not possible #1383

Open
niklasad1 opened this issue May 30, 2024 · 0 comments

niklasad1 commented May 30, 2024

After upgrading hyper to v1.0, users may run into rust-lang/rust#102211, where the error type can't be inferred as Box<dyn std::error::Error + Send + Sync + 'static>.

For instance, when using tower::service_fn like this:

    let svc = tower::service_fn(move |req| {
        let stop_handle = stop_handle2.clone();
        let svc_builder = svc_builder2.clone();
        let methods = methods2.clone();

        let mut svc = svc_builder.build(methods, stop_handle.clone());

        // It's not possible to know whether the websocket upgrade handshake failed or not here.
        let is_websocket = ws::is_upgrade_request(&req);

        if is_websocket {
            println!("websocket")
        } else {
            println!("http")
        }
        
        // This won't work without converting the error to a concrete error....
        async move { svc.call(req).await.map_err(|e| anyhow::anyhow!("{:?}", e)) }
    });
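The `map_err` in the closure above is the workaround: it turns the boxed error into a concrete type so the compiler no longer has to infer it. Here is a minimal std-only sketch of the same idea. `ConcreteError`, `inner_call` and `outer_call` are hypothetical names used for illustration; they are not part of jsonrpsee or tower, and `anyhow` is replaced by a hand-written error type to keep the snippet dependency-free.

```rust
use std::error::Error;
use std::fmt;

/// The boxed error type that the inner service returns.
type BoxError = Box<dyn Error + Send + Sync + 'static>;

/// Hypothetical concrete error that the outer closure exposes instead.
#[derive(Debug)]
pub struct ConcreteError(String);

impl fmt::Display for ConcreteError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl Error for ConcreteError {}

/// Stand-in for `svc.call(req)`: returns a type-erased error on failure.
pub fn inner_call(fail: bool) -> Result<&'static str, BoxError> {
    if fail {
        Err("handshake failed".into())
    } else {
        Ok("response")
    }
}

/// Stand-in for the `service_fn` closure: maps the boxed error into a
/// concrete one, so the error type is spelled out rather than inferred.
pub fn outer_call(fail: bool) -> Result<&'static str, ConcreteError> {
    inner_call(fail).map_err(|e| ConcreteError(e.to_string()))
}
```

The cost of this approach is that the original error is flattened into a string, so callers can no longer downcast it.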

Using tower::service_fn is recommended because the trait bounds on the service itself are hard to get right,
but it's also possible to wrap the TowerService/TowerServiceBuilder in a new type with code like this:

#[derive(Clone)]
struct Svc<RpcMiddleware, HttpMiddleware> {
	svc_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
	methods: Methods,
	stop_handle: StopHandle,
	metrics: Metrics,
}

impl<RpcMiddleware, HttpMiddleware, B> tower::Service<HttpRequest<B>> for Svc<RpcMiddleware, HttpMiddleware>
where
	RpcMiddleware: for<'a> tower::Layer<RpcService> + Send + Clone,
	<RpcMiddleware as tower::Layer<RpcService>>::Service: Send + Sync + 'static,
	for<'a> <RpcMiddleware as tower::Layer<RpcService>>::Service: RpcServiceT<'a>,
	HttpMiddleware: tower::Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static + Clone,
	<HttpMiddleware as tower::Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service:
		Send + tower::Service<HttpRequest<B>, Response = HttpResponse, Error = BoxError>,
	<<HttpMiddleware as tower::Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as tower::Service<HttpRequest<B>>>::Future:
		Send + 'static,
	B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
	B::Error: Into<BoxError>,
{
	type Error = BoxError;
	type Response = HttpResponse;
	type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

	fn call(&mut self, req: HttpRequest<B>) -> Self::Future {
		let is_websocket = jsonrpsee::server::ws::is_upgrade_request(&req);
		let transport_label = if is_websocket { "ws" } else { "http" };

		let metrics = self.metrics.clone();


		let mut svc = self.svc_builder.clone().build(self.methods.clone(), self.stop_handle.clone());

		if is_websocket {
			// Utilize the session close future to know when the actual WebSocket
			// session was closed.
			let session_close = svc.on_session_closed();

			// A little bit weird API but the response to HTTP request must be returned below
			// and we spawn a task to register when the session is closed.
			tokio::spawn(async move {
				session_close.await;
				tracing::info!("Closed WebSocket connection");
				metrics.closed_ws_connections.fetch_add(1, Ordering::Relaxed);
			});

			async move {
				tracing::info!("Opened WebSocket connection");
				metrics.opened_ws_connections.fetch_add(1, Ordering::Relaxed);
				svc.call(req).await
			}
			.boxed()
		} else {
			// HTTP.
			async move {
				tracing::info!("Opened HTTP connection");
				metrics.http_calls.fetch_add(1, Ordering::Relaxed);
				let rp = svc.call(req).await;

				if rp.is_ok() {
					metrics.success_http_calls.fetch_add(1, Ordering::Relaxed);
				}

				tracing::info!("Closed HTTP connection");
				rp
			}
			.boxed()
		}
	}

	fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}
}
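Stripped of the jsonrpsee-specific bounds, the shape of the new type above is roughly the following. This is a std-only sketch under stated assumptions: `Service` is a simplified, hypothetical stand-in for `tower::Service` (no `poll_ready`), `Request`, `Metrics` and `Svc` are made-up types, and `block_on` is a toy executor that only works for futures that never need to wait. The point it illustrates is that both branches are boxed into the same `Pin<Box<dyn Future>>` type, which is why the real code calls `.boxed()` on each branch.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified stand-in for `tower::Service`.
pub trait Service<Req> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;
    fn call(&mut self, req: Req) -> Self::Future;
}

pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = Result<T, BoxError>> + Send>>;

/// Hypothetical request; `upgrade` plays the role of `is_upgrade_request`.
pub struct Request {
    pub upgrade: bool,
}

#[derive(Clone, Default)]
pub struct Metrics {
    pub ws: Arc<AtomicUsize>,
    pub http: Arc<AtomicUsize>,
}

/// The new type: holds the state needed to answer each request.
#[derive(Clone, Default)]
pub struct Svc {
    pub metrics: Metrics,
}

impl Service<Request> for Svc {
    type Response = &'static str;
    type Error = BoxError;
    type Future = BoxFuture<&'static str>;

    fn call(&mut self, req: Request) -> Self::Future {
        let metrics = self.metrics.clone();
        // Each async block has its own anonymous type, so both branches
        // are boxed into the single `Self::Future` type.
        if req.upgrade {
            Box::pin(async move {
                metrics.ws.fetch_add(1, Ordering::Relaxed);
                Ok::<_, BoxError>("ws")
            })
        } else {
            Box::pin(async move {
                metrics.http.fetch_add(1, Ordering::Relaxed);
                Ok::<_, BoxError>("http")
            })
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: busy-polls the future until it is ready.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Because the error type is written out explicitly as `BoxError` in the impl, nothing is left for the compiler to infer, at the price of one allocation per call.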

So the question is really whether we should re-export TowerServiceNoHttp and add an example with this mess.
Maybe it's possible to simplify the trait bounds somehow?
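One possible direction for simplifying the bounds, sketched here with made-up types rather than the real jsonrpsee ones, is a helper trait with a blanket impl that bundles the repeated bounds under one name. `Layer` below is a hypothetical stand-in for `tower::Layer`, and `Inner`, `RpcLayer`, `AddOne` and `build` are illustrative names only. Whether this actually works for the higher-ranked `RpcServiceT<'a>` bound has not been checked.

```rust
/// Hypothetical stand-in for `tower::Layer`.
pub trait Layer<S> {
    type Service;
    fn layer(&self, inner: S) -> Self::Service;
}

/// Hypothetical inner service that the layer wraps.
#[derive(Clone, Debug, PartialEq)]
pub struct Inner(pub u32);

/// "Trait alias" that bundles the bounds users would otherwise repeat.
pub trait RpcLayer: Layer<Inner> + Clone + Send + 'static {}

/// Blanket impl: anything satisfying the bundled bounds is an `RpcLayer`.
impl<T> RpcLayer for T where T: Layer<Inner> + Clone + Send + 'static {}

/// Example layer used to exercise the alias.
#[derive(Clone)]
pub struct AddOne;

impl Layer<Inner> for AddOne {
    type Service = Inner;

    fn layer(&self, inner: Inner) -> Inner {
        Inner(inner.0 + 1)
    }
}

/// A single `L: RpcLayer` bound gives access to `clone`, `layer`
/// and the `Service` associated type of the supertrait.
pub fn build<L: RpcLayer>(layer: &L) -> L::Service {
    let layer = layer.clone();
    layer.layer(Inner(0))
}
```

A known limitation is that only supertrait bounds are implied this way. Where-clauses on associated types, such as `Service: Send + Sync + 'static`, would likely still have to be repeated at each use site.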

//cc @jsdw @lexnv
