Skip to content

Commit

Permalink
Fixed websocket close handling
Browse files Browse the repository at this point in the history
  • Loading branch information
myOmikron committed Apr 29, 2023
1 parent 44f2202 commit aa57b5e
Showing 1 changed file with 38 additions and 29 deletions.
67 changes: 38 additions & 29 deletions src/server/handler/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use tokio::sync::Mutex;
use uuid::Uuid;

use crate::chan::{WsManagerChan, WsManagerMessage, WsMessage};
use crate::invalid_msg;
use crate::server::handler::ApiError;
use crate::{invalid_msg, send_to_ws};

struct CommonMessages {
invalid_message: ByteString,
Expand Down Expand Up @@ -78,7 +78,20 @@ pub async fn websocket(
}

tokio::time::sleep(Duration::from_secs(10)).await;
send_to_ws!(hb_tx, Message::Ping(Bytes::from("")));

if let Err(err) = hb_tx.send(Message::Ping(Bytes::from(""))).await {
if let MailboxError::Closed = err {
debug!("Websocket closed");
if let Err(err) = hb_ws_manager
.send(WsManagerMessage::WebsocketClosed(hb_uuid))
.await
{
warn!("Could not send to ws_manager_chan: {err}");
}
break;
}
debug!("Sending to ran into tx timeout");
};
}
});

Expand All @@ -89,20 +102,25 @@ pub async fn websocket(
while let Some(res) = rx.recv().await {
match res {
Ok(msg) => match msg {
Message::Ping(req) => send_to_ws!(rx_tx, Message::Pong(req)),
Message::Ping(req) => {
if let Err(err) = rx_tx.send(Message::Ping(req)).await {
if let MailboxError::Closed = err {
debug!("Websocket closed");
if let Err(err) = rx_ws_manager
.send(WsManagerMessage::WebsocketClosed(rx_uuid))
.await
{
warn!("Could not send to ws_manager_chan: {err}");
}
break;
}
debug!("Sending to ran into tx timeout");
}
}
Message::Pong(_) => {
let mut r = last_hb.lock().await;
*r = Instant::now();
}
Message::Close(_) => {
debug!("Client closed websocket");
if let Err(err) = rx_ws_manager
.send(WsManagerMessage::WebsocketClosed(rx_uuid))
.await
{
warn!("Could not send to ws_manager_chan: {err}");
}
}
_ => {
invalid_msg!(rx_tx);
debug!("Received invalid message type via websocket");
Expand All @@ -113,6 +131,14 @@ pub async fn websocket(
}
}
}

debug!("Websocket closed");
if let Err(err) = rx_ws_manager
.send(WsManagerMessage::WebsocketClosed(rx_uuid))
.await
{
warn!("Could not send to ws_manager_chan: {err}");
}
});

// Give sender to ws manager
Expand Down Expand Up @@ -148,20 +174,3 @@ macro_rules! invalid_msg {
}
};
}

/// Use this macro to send arbitrary messages to the websocket
///
/// First argument is the websocket
/// Second argument the Message to send.
#[macro_export]
macro_rules! send_to_ws {
($tx:expr, $msg:expr) => {
if let Err(err) = $tx.send($msg).await {
if let MailboxError::Closed = err {
debug!("Websocket closed");
break;
}
debug!("Sending to ran into tx timeout");
}
};
}

0 comments on commit aa57b5e

Please sign in to comment.