Skip to content

Commit dd4d3a8

Browse files
committed
horsed: use custom run_on_socket implement
1 parent 3763612 commit dd4d3a8

File tree

2 files changed

+67
-5
lines changed

2 files changed

+67
-5
lines changed

Cargo.lock

+7-5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

horsed/src/ssh/mod.rs

+60
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use stable::{
2727
task::TaskManager,
2828
};
2929
use tokio::io::{AsyncReadExt, AsyncWriteExt};
30+
use tokio::net::TcpListener;
3031
use tokio::process::Command;
3132
use tokio::sync::Mutex;
3233

@@ -409,6 +410,7 @@ impl AppServer {
409410
cout.write_all(&buf[..len]).await?;
410411
}
411412

413+
tracing::info!("目录传输完成!");
412414
cout.shutdown().await?;
413415
handle.eof().await?;
414416
return Ok(());
@@ -454,6 +456,7 @@ impl AppServer {
454456
cout.write_all(&buf[..len]).await?;
455457
}
456458

459+
tracing::info!("文件传输完成!");
457460
cout.shutdown().await?;
458461
handle.eof().await?;
459462
return Ok(());
@@ -1004,6 +1007,7 @@ impl AppServer {
10041007
}
10051008
}
10061009

1010+
#[async_trait::async_trait]
10071011
impl Server for AppServer {
10081012
type Handler = Self;
10091013
/// 创建新连接
@@ -1016,6 +1020,62 @@ impl Server for AppServer {
10161020
fn handle_session_error(&mut self, error: <Self::Handler as Handler>::Error) {
10171021
tracing::error!("会话错误: {:?}", error);
10181022
}
1023+
1024+
async fn run_on_socket(
1025+
&mut self,
1026+
config: Arc<Config>,
1027+
socket: &TcpListener,
1028+
) -> Result<(), std::io::Error> {
1029+
if config.maximum_packet_size > 65535 {
1030+
tracing::error!(
1031+
"Maximum packet size ({:?}) should not larger than a TCP packet (65535)",
1032+
config.maximum_packet_size
1033+
);
1034+
}
1035+
1036+
let (error_tx, mut error_rx) = tokio::sync::mpsc::unbounded_channel();
1037+
let mut tm = TaskManager::default();
1038+
let handle = tm.spawn_handle();
1039+
1040+
loop {
1041+
tokio::select! {
1042+
accept_result = socket.accept() => {
1043+
match accept_result {
1044+
Ok((socket, _)) => {
1045+
let config = config.clone();
1046+
let handler = self.new_client(socket.peer_addr().ok());
1047+
let error_tx = error_tx.clone();
1048+
handle.spawn(async move {
1049+
let session = match run_stream(config, socket, handler).await {
1050+
Ok(s) => s,
1051+
Err(e) => {
1052+
let _ = error_tx.send(e);
1053+
panic!("Connection setup failed");
1054+
}
1055+
};
1056+
match session.await {
1057+
Ok(_) => tracing::debug!("Connection closed"),
1058+
Err(e) => {
1059+
tracing::debug!("Connection closed with error");
1060+
let _ = error_tx.send(e.into());
1061+
}
1062+
}
1063+
Ok(())
1064+
});
1065+
}
1066+
_ => break,
1067+
}
1068+
},
1069+
Some(error) = error_rx.recv() => {
1070+
self.handle_session_error(error);
1071+
}
1072+
}
1073+
}
1074+
1075+
tm.terminate();
1076+
1077+
Ok(())
1078+
}
10191079
}
10201080

10211081
#[async_trait::async_trait]

0 commit comments

Comments
 (0)