From e9803ba8f823af28806c7c30422f99ec15e21d07 Mon Sep 17 00:00:00 2001 From: Pakin Date: Mon, 30 Mar 2026 12:02:14 +0700 Subject: [PATCH] feat: reduce payload size, nodelay, timeout - reduce payload from 200 to 5 - change tcp connection to nodelay, enable no wait payload - add timeout 15 minutes for client with no heartbeat Signed-off-by: Pakin --- src/main.rs | 126 +++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 111 insertions(+), 15 deletions(-) diff --git a/src/main.rs b/src/main.rs index f6cb67a..037456b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,13 +11,14 @@ use axum::{ }, response::IntoResponse, routing::{get, post}, + serve::ListenerExt, }; use futures::{ SinkExt, StreamExt, stream::{SplitSink, SplitStream}, }; use libtbr::models::recipe::{MaterialSetting, Recipe, Recipe01}; -use log::{error, info}; +use log::{debug, error, info, warn}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use redis::{TypedCommands, cmd}; use serde::{Deserialize, Serialize}; @@ -28,6 +29,7 @@ use tokio::{ Mutex, mpsc::{self, Receiver, Sender}, }, + time::Instant, }; use uuid::Uuid; @@ -42,7 +44,8 @@ use crate::{ mod stream; mod tx; -const CHUNK_SIZE: usize = 200; +const CHUNK_SIZE: usize = 5; +const TIMEOUT: Duration = Duration::from_secs(60 * 15); // features // - get result from recipe_repo @@ -177,8 +180,38 @@ async fn handle_socket( let (tx, mut rx) = mpsc::channel::(2); let user_sys_rx = state.system_tx.subscribe(); - tokio::spawn(write(sender, rx)); - tokio::spawn(read(state, receiver, tx.clone(), user_sys_rx)); + let last_seen = Arc::new(Mutex::new(Instant::now())); + + let reader_last_seen = last_seen.clone(); + let watchdog_last_seen = last_seen.clone(); + + let sender = tokio::spawn(write(sender, rx)); + let reader = tokio::spawn(read( + state, + receiver, + tx.clone(), + user_sys_rx, + reader_last_seen, + )); + + let watchdog = tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(30)).await; + let last = *watchdog_last_seen.lock().await; + if last.elapsed() > TIMEOUT { + warn!("Timeout close connection"); + let _ = tx + .send(serde_json::json!({ + "timeout": "watchdog" + })) + .await; + break; + } + } + }); + + let _ = tokio::join!(reader, sender, watchdog); + Ok(()) } @@ -269,7 +302,7 @@ async fn read( mut receiver: SplitStream, tx: Sender, mut system_rx: tokio::sync::broadcast::Receiver, - // cmd_atom: crossbeam_queue::ArrayQueue, + last_seen: Arc>, // cmd_atom: crossbeam_queue::ArrayQueue, ) -> Result<(), Box> { let redis = state.redis_cli.clone(); let config = state.dev_config.clone(); @@ -354,11 +387,38 @@ async fn read( match get_local_file(req_file) { Ok(mut f) => { + println!("get local file ok"); let mut file_content = String::new(); f.read_to_string(&mut file_content)?; + if !file_content.is_empty() { + info!("local file -> buffer OK"); + } + // split send - let recipe: Recipe = serde_json::from_str(&file_content)?; + let recipe: Recipe = match serde_json::from_str(&file_content) { + Ok(c) => c, + Err(e) => { + error!( + "error deserialize struct fail, file may be corrupted: {e:?}" + ); + if !file_content.ends_with("}") { + error!("File corrupted, invalid json format"); + } + + let _ = tx.send(serde_json::json!({ + "type": "notify", + "payload": { + "from": "system_tx", + "level": "error", + "msg": format!("Some requested file on cache is corrupt, {} version {}", recipe_param.country, latest_version), + "to": "" + } + })).await; + + return Err(e.into()); + } + }; throttle_send_recipe( &recipe, @@ -369,6 +429,7 @@ async fn read( .await; } Err(_) => { + println!("retry by fetching git"); let lvc = latest_version.clone(); // concurrent fetch for i in 1..6 { @@ -468,11 +529,17 @@ async fn read( .await; } } + "heartbeat" => { + *last_seen.lock().await = Instant::now(); + } _ => { // not implemented } } } + Message::Ping(_) => { + *last_seen.lock().await = Instant::now(); + } _ => { // unhanled, ignore } @@ -486,9 +553,26 @@ async fn write( mut rx: Receiver, ) -> Result<(), Box> { while let Some(res) = rx.recv().await { - // no check - // println!("sending {res:?}"); - info!("sending to client ..."); + // force close + if let Some(force_timeout_by) = res.get("timeout") + && let Some(from_who) = force_timeout_by.as_str() + && from_who.eq("watchdog") + { + warn!("receive close from watchdog"); + let _ = sender.send(Message::Close(None)).await; + break; + } + + let payload_size = res.to_string().len(); + + if payload_size >= 100000 { + // large payload + warn!( + "sending large payload to client ... ({})", + res.to_string().len() + ); + } + let _ = sender.send(res.to_string().into()).await; // リミットブレく - limito breaku!! (uncomment to slow down messages) @@ -607,10 +691,11 @@ async fn throttle_send_recipe( } } - let rp_clone = recipe.clone(); - tokio::task::spawn(async move { - rp_clone.export_to_json_file(Some(format!("result.{country}.{version}.json"))); - }); + // NOTE: disable from case concurrent write may causes corrupted file + // let rp_clone = recipe.clone(); + // tokio::task::spawn(async move { + // rp_clone.export_to_json_file(Some(format!("result.{country}.{version}.json"))); + // }); info!("sending {sid}"); // return sid; @@ -796,8 +881,19 @@ async fn main() -> Result<(), Box> { .nest("/docs", doc_router) .with_state(app_state); - let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{server_port}")).await?; - axum::serve(listener, app).await?; + // feature: no delay, full throttle + let nodelay_listener = || async { + tokio::net::TcpListener::bind(format!("0.0.0.0:{server_port}")) + .await + .unwrap() + .tap_io(|tcp_stream| { + if let Err(err) = tcp_stream.set_nodelay(true) { + error!("failed to set TCP_NODELAY on incoming connection: {err:#?}"); + } + }) + }; + + axum::serve(nodelay_listener().await, app).await?; Ok(()) }