Skip to content

Commit

Permalink
Close lobby if websocket of owner disconnects. Closes #27
Browse files Browse the repository at this point in the history
  • Loading branch information
myOmikron committed Apr 27, 2023
1 parent a6b6736 commit 196b3e0
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 4 deletions.
90 changes: 88 additions & 2 deletions src/chan/ws_manager_chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use std::collections::HashMap;

use actix_toolbox::ws;
use actix_toolbox::ws::Message;
use log::error;
use log::{error, info, warn};
use rorm::{delete, query, Database, Model};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::Sender;
use tokio::sync::{mpsc, oneshot};
use tokio::task;
use uuid::Uuid;

use crate::models::{ChatRoom, Lobby, LobbyAccount};
use crate::server::handler::{AccountResponse, ChatMessage};

pub(crate) async fn start_ws_sender(tx: ws::Sender, mut rx: mpsc::Receiver<WsMessage>) {
Expand Down Expand Up @@ -177,6 +179,8 @@ pub type WsManagerChan = Sender<WsManagerMessage>;

/// Messages to control the websocket manager
pub enum WsManagerMessage {
/// The websocket was closed by the client (timeout, or closed event)
WebsocketClosed(Uuid),
/// Close the socket from the server side
CloseSocket(Uuid),
/// Client with given uuid initialized a websocket
Expand All @@ -203,14 +207,96 @@ pub enum WsManagerMessage {
/// Start the websocket manager
///
/// It will return a channel to this manager
pub async fn start_ws_manager() -> Result<WsManagerChan, String> {
pub async fn start_ws_manager(db: Database) -> Result<WsManagerChan, String> {
let mut lookup: HashMap<Uuid, Vec<Sender<WsMessage>>> = HashMap::new();

let (tx, mut rx) = mpsc::channel(16);

let rx_tx = tx.clone();
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
match msg {
WsManagerMessage::WebsocketClosed(uuid) => {
lookup.remove(&uuid);

// Start cleanup task
let db = db.clone();
let cleanup_tx = rx_tx.clone();
tokio::spawn(async move {
let mut tx = match db.start_transaction().await {
Ok(tx) => tx,
Err(err) => {
error!("Database error: {err}");
return;
}
};

// Check if the account was a lobby owner
match query!(&mut tx, Lobby)
.condition(LobbyAccount::F.lobby.owner.equals(uuid.as_ref()))
.optional()
.await
{
Ok(lobby) => {
if let Some(mut lobby) = lobby {
info!(
"Closing lobby {} due to missing ws connection of owner {uuid}",
lobby.uuid
);

if let Err(err) =
Lobby::F.current_player.populate(&mut tx, &mut lobby).await
{
error!("Database error: {err}");
return;
}

if let Err(err) = delete!(&mut tx, ChatRoom)
.condition(
ChatRoom::F.uuid.equals(lobby.chat_room.key().as_ref()),
)
.await
{
error!("Database error: {err}");
return;
}

if let Err(err) = delete!(&mut tx, Lobby)
.condition(Lobby::F.uuid.equals(lobby.uuid.as_ref()))
.await
{
error!("Database error: {err}");
return;
}

// Queried beforehand
#[allow(clippy::unwrap_used)]
for player in lobby.current_player.cached.unwrap() {
if let Err(err) = cleanup_tx
.send(WsManagerMessage::SendMessage(
*player.player.key(),
WsMessage::LobbyClosed {
lobby_uuid: lobby.uuid,
},
))
.await
{
warn!("Could not send to ws manager chan: {err}");
}
}
}
}
Err(err) => {
error!("Database error: {err}");
return;
}
}

if let Err(err) = tx.commit().await {
error!("Database error: {err}");
}
});
}
WsManagerMessage::CloseSocket(uuid) => {
// Trigger close for all websockets associated with uuid
if let Some(sockets) = lookup.get(&uuid) {
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn main() -> Result<(), String> {
let db = get_db(&conf).await?;
info!("Connected to database");

let ws_manager_chan = start_ws_manager().await?;
let ws_manager_chan = start_ws_manager(db.clone()).await?;

if let Err(err) = start_server(&conf, db, ws_manager_chan).await {
error!("Error while starting server: {err}");
Expand Down
21 changes: 20 additions & 1 deletion src/server/handler/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use actix_web::web::{Data, Payload};
use actix_web::{get, HttpRequest, HttpResponse};
use bytes::Bytes;
use bytestring::ByteString;
use log::{debug, error};
use log::{debug, error, warn};
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
use uuid::Uuid;
Expand Down Expand Up @@ -61,11 +61,19 @@ pub async fn websocket(
// Heartbeat task
let hb_tx = tx.clone();
let hb_time = last_hb.clone();
let hb_ws_manager = ws_manager_chan.clone();
let hb_uuid = uuid.clone();
tokio::spawn(async move {
loop {
if Instant::now().duration_since(*hb_time.lock().await) > CLIENT_TIMEOUT
&& hb_tx.close().await.is_ok()
{
if let Err(err) = hb_ws_manager
.send(WsManagerMessage::WebsocketClosed(hb_uuid))
.await
{
warn!("Could not send to ws_manager_chan: {err}");
}
debug!("Closed websocket due to missing heartbeat responses");
}

Expand All @@ -75,6 +83,8 @@ pub async fn websocket(
});

let rx_tx = tx.clone();
let rx_ws_manager = ws_manager_chan.clone();
let rx_uuid = uuid.clone();
tokio::spawn(async move {
while let Some(res) = rx.recv().await {
match res {
Expand All @@ -84,6 +94,15 @@ pub async fn websocket(
let mut r = last_hb.lock().await;
*r = Instant::now();
}
Message::Close(_) => {
debug!("Client closed websocket");
if let Err(err) = rx_ws_manager
.send(WsManagerMessage::WebsocketClosed(rx_uuid))
.await
{
warn!("Could not send to ws_manager_chan: {err}");
}
}
_ => {
invalid_msg!(rx_tx);
debug!("Received invalid message type via websocket");
Expand Down

0 comments on commit 196b3e0

Please sign in to comment.