feat: reduce payload size, nodelay, timeout

- reduce payload from 200 to 5
- switch the TCP listener to no-delay mode (TCP_NODELAY) so payloads are sent without waiting
- add a 15-minute timeout for clients that send no heartbeat

Signed-off-by: Pakin <pakin.t@forth.co.th>
This commit is contained in:
Pakin 2026-03-30 12:02:14 +07:00
parent bdb17de9c3
commit e9803ba8f8

View file

@ -11,13 +11,14 @@ use axum::{
},
response::IntoResponse,
routing::{get, post},
serve::ListenerExt,
};
use futures::{
SinkExt, StreamExt,
stream::{SplitSink, SplitStream},
};
use libtbr::models::recipe::{MaterialSetting, Recipe, Recipe01};
use log::{error, info};
use log::{debug, error, info, warn};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use redis::{TypedCommands, cmd};
use serde::{Deserialize, Serialize};
@ -28,6 +29,7 @@ use tokio::{
Mutex,
mpsc::{self, Receiver, Sender},
},
time::Instant,
};
use uuid::Uuid;
@ -42,7 +44,8 @@ use crate::{
mod stream;
mod tx;
const CHUNK_SIZE: usize = 200;
const CHUNK_SIZE: usize = 5;
const TIMEOUT: Duration = Duration::from_secs(60 * 15);
// features
// - get result from recipe_repo
@ -177,8 +180,38 @@ async fn handle_socket(
let (tx, mut rx) = mpsc::channel::<serde_json::Value>(2);
let user_sys_rx = state.system_tx.subscribe();
tokio::spawn(write(sender, rx));
tokio::spawn(read(state, receiver, tx.clone(), user_sys_rx));
let last_seen = Arc::new(Mutex::new(Instant::now()));
let reader_last_seen = last_seen.clone();
let watchdog_last_seen = last_seen.clone();
let sender = tokio::spawn(write(sender, rx));
let reader = tokio::spawn(read(
state,
receiver,
tx.clone(),
user_sys_rx,
reader_last_seen,
));
let watchdog = tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(30)).await;
let last = *watchdog_last_seen.lock().await;
if last.elapsed() > TIMEOUT {
warn!("Timeout close connection");
let _ = tx
.send(serde_json::json!({
"timeout": "watchdog"
}))
.await;
break;
}
}
});
let _ = tokio::join!(reader, sender, watchdog);
Ok(())
}
@ -269,7 +302,7 @@ async fn read(
mut receiver: SplitStream<WebSocket>,
tx: Sender<serde_json::Value>,
mut system_rx: tokio::sync::broadcast::Receiver<serde_json::Value>,
// cmd_atom: crossbeam_queue::ArrayQueue<CommandRequestPayload>,
last_seen: Arc<Mutex<Instant>>, // cmd_atom: crossbeam_queue::ArrayQueue<CommandRequestPayload>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let redis = state.redis_cli.clone();
let config = state.dev_config.clone();
@ -354,11 +387,38 @@ async fn read(
match get_local_file(req_file) {
Ok(mut f) => {
println!("get local file ok");
let mut file_content = String::new();
f.read_to_string(&mut file_content)?;
if !file_content.is_empty() {
info!("local file -> buffer OK");
}
// split send
let recipe: Recipe = serde_json::from_str(&file_content)?;
let recipe: Recipe = match serde_json::from_str(&file_content) {
Ok(c) => c,
Err(e) => {
error!(
"error deserialize struct fail, file may be corrupted: {e:?}"
);
if !file_content.ends_with("}") {
error!("File corrupted, invalid json format");
}
let _ = tx.send(serde_json::json!({
"type": "notify",
"payload": {
"from": "system_tx",
"level": "error",
"msg": format!("Some requested file on cache is corrupt, {} version {}", recipe_param.country, latest_version),
"to": ""
}
})).await;
return Err(e.into());
}
};
throttle_send_recipe(
&recipe,
@ -369,6 +429,7 @@ async fn read(
.await;
}
Err(_) => {
println!("retry by fetching git");
let lvc = latest_version.clone();
// concurrent fetch
for i in 1..6 {
@ -468,11 +529,17 @@ async fn read(
.await;
}
}
"heartbeat" => {
*last_seen.lock().await = Instant::now();
}
_ => {
// not implemented
}
}
}
Message::Ping(_) => {
*last_seen.lock().await = Instant::now();
}
_ => {
// unhandled, ignore
}
@ -486,9 +553,26 @@ async fn write(
mut rx: Receiver<serde_json::Value>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
while let Some(res) = rx.recv().await {
// no check
// println!("sending {res:?}");
info!("sending to client ...");
// force close
if let Some(force_timeout_by) = res.get("timeout")
&& let Some(from_who) = force_timeout_by.as_str()
&& from_who.eq("watchdog")
{
warn!("receive close from watchdog");
let _ = sender.send(Message::Close(None)).await;
break;
}
let payload_size = res.to_string().len();
if payload_size >= 100000 {
// large payload
warn!(
"sending large payload to client ... ({})",
res.to_string().len()
);
}
let _ = sender.send(res.to_string().into()).await;
// limit break!! (uncomment to slow down messages)
@ -607,10 +691,11 @@ async fn throttle_send_recipe(
}
}
let rp_clone = recipe.clone();
tokio::task::spawn(async move {
rp_clone.export_to_json_file(Some(format!("result.{country}.{version}.json")));
});
// NOTE: disabled because concurrent writes may cause a corrupted file
// let rp_clone = recipe.clone();
// tokio::task::spawn(async move {
// rp_clone.export_to_json_file(Some(format!("result.{country}.{version}.json")));
// });
info!("sending {sid}");
// return sid;
@ -796,8 +881,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.nest("/docs", doc_router)
.with_state(app_state);
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{server_port}")).await?;
axum::serve(listener, app).await?;
// feature: no delay, full throttle
let nodelay_listener = || async {
tokio::net::TcpListener::bind(format!("0.0.0.0:{server_port}"))
.await
.unwrap()
.tap_io(|tcp_stream| {
if let Err(err) = tcp_stream.set_nodelay(true) {
error!("failed to set TCP_NODELAY on incoming connection: {err:#?}");
}
})
};
axum::serve(nodelay_listener().await, app).await?;
Ok(())
}