diff --git a/Cargo.lock b/Cargo.lock index a9b5829..a1097a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3236,6 +3236,7 @@ dependencies = [ "tokio", "tokio-cron-scheduler", "tokio-stream", + "tokio-util", "uuid", "wasmtime", "wasmtime-wasi", diff --git a/Cargo.toml b/Cargo.toml index f820be2..65162be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ sqlx = { version = "0.8.6", features = ["runtime-tokio", "tls-rustls", "sqlite"] tokio = { version = "1.49.0", features = ["full"] } tokio-cron-scheduler = "0.15.1" tokio-stream = "0.1.18" +tokio-util = "0.7.18" uuid = { version = "1.20.0", features = ["v4"] } wasmtime = { version = "44.0.1", features = ["async"] } wasmtime-wasi = "44.0.1" diff --git a/src/app.rs b/src/app.rs index 941d2fe..b6ce4c8 100644 --- a/src/app.rs +++ b/src/app.rs @@ -33,6 +33,7 @@ pub struct DevConfig { pub api_redis_url: String, pub api_resolver: String, pub api_sheet_endpoints: Arc>>, + pub allowed_origins: Vec, } impl DevConfig { @@ -51,9 +52,15 @@ impl DevConfig { api_redis_url, api_resolver, api_sheet_endpoints, + allowed_origins: Vec::new(), } } + pub fn with_allowed_origins(&mut self, raw_origin: &str) -> &mut Self { + self.allowed_origins = raw_origin.split(",").map(|x| x.to_string()).collect(); + self + } + pub fn get_recipe_url(&self) -> String { format!("{}{}", self.api_domain, self.api_recipe_service) } @@ -126,69 +133,7 @@ impl AppState { // backup job let dev_config_backup = dev_config.clone(); - tokio::spawn(async move { - let m_cfg = dev_config_backup.clone(); - - loop { - // auto sync - if invoke_pull_sync_request(m_cfg.clone()).await.is_err() { - warn!("pulling repo unhealthy, retry again in 5 minutes"); - continue; - } - - match read_dir(".").await { - Ok(mut d) => { - while let Ok(Some(entry)) = d.next_entry().await { - let ent_path = entry.path(); - - if let Some(filename) = ent_path.file_name() - && let Some(filename_str) = filename.to_str() - && filename_str.starts_with("gtx") - && filename_str.ends_with(".json") - { - // read file - // - let f = match File::open(ent_path.clone()) { - Ok(f) => f, - Err(_) => continue, - }; - - let buf = BufReader::new(f); - - let commit_from_backup: CommitPayload = - match serde_json::from_reader(buf) { - Ok(cm) => cm, - Err(_) => continue, - }; - - if invoke_pull_sync_request(m_cfg.clone()).await.is_err() { - warn!("pulling repo unhealthy, retry again in 5 minutes"); - continue; - } - - let _ = - invoke_commit_request(m_cfg.clone(), commit_from_backup).await; - - if invoke_push_request(m_cfg.clone()).await.is_ok() { - // push success - info!("push backup success"); - if fs::remove_file(ent_path.clone()).is_ok() { - info!("clean backup"); - } - } - } else { - continue; - } - } - } - Err(_) => {} - } - - info!("[backup] idle"); - - tokio::time::sleep(Duration::from_mins(5)).await; - } - }); + // NOTE: removed backup process, let each app handled by themselves tokio::spawn(async move { let mut lredis = redis_cli_clone.clone(); @@ -202,7 +147,7 @@ impl AppState { let sys_msg = crate::websocket::helper::convert_sys_msg_command(&rmsg); // add queue process - let command_req: CommandRequestPayload = match serde_json::from_value(rmsg) { + let command_req: CommandRequestPayload = match safe_deserialize(&rmsg) { Ok(cmd) => cmd, Err(e) => { if sys_msg.is_none() { @@ -291,6 +236,8 @@ impl AppState { } }); + // spawn product sync process + result } } @@ -439,11 +386,13 @@ pub async fn initialize() -> Result<(), Box> { let api_resolver = env::var("RESOLVER_SERVICE_URL").expect("no available resolver"); + let allowed_origins = env::var("ALLOWED_ORIGINS").expect("allowed origin not provided"); + // read up sheet config // let sheet_endpoint_config = read_sheet_config()?; - let dev_cfg = crate::app::DevConfig::new( + let mut dev_cfg = crate::app::DevConfig::new( api_key, api_domain, api_recipe_service, @@ -451,6 +400,7 @@ pub async fn initialize() -> Result<(), Box> { api_resolver, Arc::new(Mutex::new(sheet_endpoint_config)), ); + dev_cfg = dev_cfg.with_allowed_origins(&allowed_origins).clone(); // test_send(dev_cfg).await?; // @@ -458,7 +408,7 @@ pub async fn initialize() -> Result<(), Box> { let (sys_tx, sys_rx) = tokio::sync::broadcast::channel::(16); - let app_state = AppState::new(dev_cfg, redis_cli, sys_tx, sys_rx).await; + let app_state = AppState::new(dev_cfg.clone(), redis_cli, sys_tx, sys_rx).await; let rp_router = create_recipe_repo_router().await; // let doc_router = create_tx_patcher_route().await; diff --git a/src/websocket/core.rs b/src/websocket/core.rs index 19d5bc6..3c6a898 100644 --- a/src/websocket/core.rs +++ b/src/websocket/core.rs @@ -1,12 +1,14 @@ use std::time::Duration; +use serde::Deserialize; + /// CONFIG: chunk size for each payload /// /// note: using in sending recipe pub const CHUNK_SIZE: usize = 5; /// CONFIG: default timeout for each socket connection -pub const TIMEOUT: Duration = Duration::from_secs(60 * 15); +pub const TIMEOUT: Duration = Duration::from_secs(60 * 5); #[derive(Clone)] pub enum TxControlMessage { @@ -20,3 +22,30 @@ pub enum UserWebSocketAuthState { } pub type WebsocketMessageResult = Result<(), Box>; + +pub fn safe_deserialize<'de, T>(value: &'de serde_json::Value) -> Result +where + T: Deserialize<'de>, +{ + let sanitized = sanitize_json_value(value); + T::deserialize(sanitized) +} + +fn sanitize_json_value(value: &serde_json::Value) -> serde_json::Value { + match value { + serde_json::Value::Object(map) => { + let mut sanitized = serde_json::Map::new(); + for (k, v) in map { + if k == "__proto__" || k == "constructor" || k == "prototype" { + continue; + } + sanitized.insert(k.clone(), sanitize_json_value(v)); + } + serde_json::Value::Object(sanitized) + } + serde_json::Value::Array(arr) => { + serde_json::Value::Array(arr.iter().map(sanitize_json_value).collect()) + } + _ => value.clone(), + } +} diff --git a/src/websocket/handler.rs b/src/websocket/handler.rs index ea04f9e..2cd917b 100644 --- a/src/websocket/handler.rs +++ b/src/websocket/handler.rs @@ -1,6 +1,6 @@ use axum::{ Json, - extract::{State, WebSocketUpgrade, ws::WebSocket}, + extract::{Request, State, WebSocketUpgrade, ws::WebSocket}, response::IntoResponse, }; use futures::StreamExt; @@ -95,10 +95,26 @@ pub async fn request_api_session_key( pub async fn websocket_handler( State(state): State>, ws: WebSocketUpgrade, + req: Request, ) -> impl IntoResponse { let state_clone = Arc::clone(&state); let hub_clone = Arc::clone(&state_clone.connectors_mapping); + let origin = req + .headers() + .get("origin") + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + + if !state + .dev_config + .allowed_origins + .contains(&origin.to_string()) + { + warn!("unexpected origin: {}", origin); + return (axum::http::StatusCode::FORBIDDEN, "".to_string()).into_response(); + } + // let mut uid_n = String::new(); // if let Some(uid) = headers.get("x-auth-uid") { @@ -143,11 +159,11 @@ async fn handle_socket( Uuid::new_v4().to_string() )))); - let temp_session = user.try_lock().unwrap().to_string(); + let temp_session = user.lock().await.to_string(); info!("{} connected", temp_session); { - let mut h = hub.try_lock().unwrap(); + let mut h = hub.lock().await; h.clients.insert(temp_session.clone(), tx.clone()); } @@ -158,16 +174,16 @@ async fn handle_socket( let reader_last_seen = last_seen.clone(); let watchdog_last_seen = last_seen.clone(); - let sender = tokio::spawn(super::rw::write(sender, rx, user.clone())); + let sender = tokio::spawn(super::rw::write(sender, rx, user.clone(), hub.clone())); let reader = tokio::spawn(super::rw::read( state, receiver, tx.clone(), - user_sys_rx, reader_last_seen, user.clone(), hub.clone(), )); + let callback_to_client = super::rw::recv_sys_msg_send_back_client(tx.clone(), user_sys_rx); let watchdog = super::tasks::watchdog::get_watchdog_task( tx, @@ -176,7 +192,29 @@ async fn handle_socket( hub.clone(), ); - let _ = tokio::join!(reader, sender, watchdog); + let (rf, sf, cbc, wds) = tokio::join!(reader, sender, callback_to_client, watchdog); + + if let Ok(rf_js) = rf + && let Ok(sf_js) = sf + { + info!( + "read end ok: {}, write end ok: {} [{}]", + rf_js.is_ok(), + sf_js.is_ok(), + user.clone().lock().await.to_string() + ); + if !cbc.is_finished() { + info!("sys rx still running"); + cbc.abort(); + + if cbc.await.unwrap_err().is_cancelled() { + info!("sys rx force stop ..."); + } + } + if !wds.is_finished() { + info!("watchdog still existed"); + } + } Ok(()) } diff --git a/src/websocket/helper.rs b/src/websocket/helper.rs index 91521ff..779cb71 100644 --- a/src/websocket/helper.rs +++ b/src/websocket/helper.rs @@ -1,5 +1,7 @@ use std::{collections::HashMap, fs::File, io::BufReader}; +use crate::websocket::core::safe_deserialize; + use super::model::*; use axum::extract::ws::{CloseFrame, Message, WebSocket}; use redis::{TypedCommands, cmd}; @@ -61,7 +63,7 @@ pub fn convert_ack_command(cmd_req: &serde_json::Value) -> Option Option { - match serde_json::from_value(msg.clone()) { + match safe_deserialize(msg) { Ok(req) => Some(req), Err(_) => None, } diff --git a/src/websocket/model.rs b/src/websocket/model.rs index da738bf..21b8464 100644 --- a/src/websocket/model.rs +++ b/src/websocket/model.rs @@ -140,3 +140,24 @@ pub struct CommitPayload { // use default backup method impl Backup for CommitPayload {} + +impl From for WebsocketMessageRequest { + fn from(value: CommitPayload) -> Self { + WebsocketMessageRequest { + type_w: "commit_part".to_string(), + payload: Some(serde_json::json!({ + "commit": value, + "plugin": "apply_recipe" + })), + } + } +} + +/// For getting list of menus in recipe +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct RequestMenuListPayload { + /// User info expect at least id, token, name + pub user_info: serde_json::Value, + /// target country to get recipe, version will always use latest + pub country: String, +} diff --git a/src/websocket/plugins.rs b/src/websocket/plugins.rs index 4e25bf1..b7e1f5a 100644 --- a/src/websocket/plugins.rs +++ b/src/websocket/plugins.rs @@ -13,7 +13,7 @@ use wasmtime_wasi_http::{ p2::{WasiHttpCtxView, WasiHttpView}, }; -use crate::websocket::model::WebsocketMessageRequest; +use crate::websocket::{core::safe_deserialize, model::WebsocketMessageRequest}; wasmtime::component::bindgen!({ path: "plugins/plugin.wit", @@ -92,6 +92,7 @@ async fn call_plugin_logic(engine: &Engine, component: &Component, input: String let ctx = WasiCtxBuilder::new() .inherit_stdout() .inherit_stderr() + .inherit_env() .build(); let http_ctx = WasiHttpCtx::new(); let mut store = Store::new( @@ -114,9 +115,14 @@ async fn call_plugin_logic(engine: &Engine, component: &Component, input: String return String::new(); } - let instance_result = PluginWorld::instantiate_async(&mut store, component, &linker) - .await - .expect("Failed to instantiate plugin"); + let instance_result = match PluginWorld::instantiate_async(&mut store, component, &linker).await + { + Ok(r) => r, + Err(e) => { + error!("unable to instantiate plugin: {e}"); + return String::new(); + } + }; // 3. Call the exported function from the WIT 'handler' interface match instance_result @@ -149,7 +155,7 @@ pub async fn call_plugin_if_existed( return req.clone(); } - let plugin_payload: PluginPayload = match serde_json::from_value(req.clone().payload.unwrap()) { + let plugin_payload: PluginPayload = match safe_deserialize(&req.clone().payload.unwrap()) { Ok(p) => p, Err(_) => return req, }; @@ -166,8 +172,13 @@ pub async fn call_plugin_if_existed( for ap in apply_plugins { if all_plugins.contains_key(&ap) { - let component = - Component::from_file(&engine, all_plugins.get(&ap).unwrap()).unwrap(); + let component = match Component::from_file(&engine, all_plugins.get(&ap).unwrap()) { + Ok(c) => c, + Err(e) => { + error!("plugin not found! {ap}"); + continue; + } + }; res_str = call_plugin_logic(&engine, &component, res_str).await; } diff --git a/src/websocket/rw.rs b/src/websocket/rw.rs index 9051e02..e9952f4 100644 --- a/src/websocket/rw.rs +++ b/src/websocket/rw.rs @@ -3,20 +3,24 @@ use crate::{ app::*, websocket::{plugins::call_plugin_if_existed, tasks}, }; -use std::{sync::Arc, time::Duration}; +use std::{ + sync::{Arc, atomic::AtomicBool}, + time::Duration, +}; use axum::extract::ws::{Message, WebSocket}; use futures::{ SinkExt, StreamExt, stream::{SplitSink, SplitStream}, }; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use tokio::{ sync::{ Mutex, mpsc::{Receiver, Sender}, }, + task::JoinHandle, time::Instant, }; use wasmtime::{Config, Engine}; @@ -26,28 +30,12 @@ pub async fn read( state: Arc, mut receiver: SplitStream, tx: Sender, - mut system_rx: tokio::sync::broadcast::Receiver, last_seen: Arc>, // cmd_atom: crossbeam_queue::ArrayQueue, uid: Arc>, hub: Arc>, ) -> Result<(), Box> { let redis = state.redis_cli.clone(); let config = state.dev_config.clone(); - let tx_to_client = tx.clone(); - - tokio::spawn(async move { - // Send back to client from services - while let Ok(s_msg) = system_rx.recv().await { - if convert_sys_msg_command(&s_msg).is_some() - && let Some(err) = tx_to_client - .send(TxControlMessage::Payload(s_msg)) - .await - .err() - { - error!("[SYS] failed to send back to client: {err}"); - } - } - }); let uid_clone = uid.clone(); @@ -66,38 +54,54 @@ pub async fn read( // info!("get msg: {}", req.type_w); match req.type_w.as_str() { "recipe" if req.payload.is_some() => { - tasks::recipe::handle_recipe_request( + if tasks::recipe::handle_recipe_request( config.clone(), redis.clone(), tx.clone(), req, uid_clone.clone(), ) - .await?; + .await + .is_err() + { + continue; + } } "recipe_versions" if req.payload.is_some() => { - tasks::recipe::handle_recipe_versions_list_request( + if tasks::recipe::handle_recipe_versions_list_request( config.clone(), redis.clone(), tx.clone(), req, uid_clone.clone(), ) - .await?; + .await + .is_err() + { + continue; + } } "price" if req.payload.is_some() => { - tasks::price::handle_price_request( + if tasks::price::handle_price_request( config.clone(), redis.clone(), tx.clone(), req, uid_clone.clone(), ) - .await?; + .await + .is_err() + { + continue; + } } "command" if req.payload.is_some() => { - tasks::command::handle_command_request(state.clone(), tx.clone(), req) - .await?; + if tasks::command::handle_command_request(state.clone(), tx.clone(), req) + .await + .is_err() + { + continue; + } } "heartbeat" => { let new_updated_time = Instant::now(); @@ -132,7 +136,7 @@ pub async fn read( } "log_report" if let Some(log_payload) = req.payload => { let log_report_payload: LogReportPayload = - match serde_json::from_value(log_payload) { + match safe_deserialize(&log_payload) { Ok(lreq) => lreq, Err(e) => { error!("error deserialize body log request: {e:?} ---> Skip"); @@ -164,6 +168,20 @@ pub async fn read( .await?; } + "list_menu" if req.payload.is_some() => { + if tasks::recipe::handle_request_list_menu_recipe( + config.clone(), + redis.clone(), + tx.clone(), + req, + uid_clone.clone(), + ) + .await + .is_err() + { + continue; + } + } _ => { // not implemented } @@ -173,7 +191,7 @@ pub async fn read( *last_seen.lock().await = Instant::now(); } Message::Close(_) => { - info!("get close message"); + info!("[read] get close message"); // remove current uid { @@ -205,6 +223,9 @@ pub async fn read( } } } + + info!("[read] canceling sys rx ..."); + Ok(()) } @@ -212,6 +233,7 @@ pub async fn write( mut sender: SplitSink, mut rx: Receiver, uid: Arc>, + hub: Arc>, ) -> Result<(), Box> { while let Some(res) = rx.recv().await { match res { @@ -221,7 +243,7 @@ pub async fn write( && let Some(from_who) = force_timeout_by.as_str() && (from_who.eq("watchdog") || from_who.eq("disconnection")) { - warn!("receive close from {from_who}"); + warn!("[write] receive close from {from_who}"); if from_who.eq("disconnection") { let _ = sender.close().await; @@ -232,7 +254,7 @@ pub async fn write( break; } - let current_uid = uid.try_lock().unwrap(); + let current_uid = uid.lock().await; if let Some(res_n) = res.as_object() && let Some(res_payload) = res_n.get("payload") @@ -247,14 +269,47 @@ pub async fn write( if payload_size >= 100000 { // large payload warn!( - "sending large payload to client ... ({})", + "[write] sending large payload to client ... ({})", res.to_string().len() ); } let _ = sender.send(res.to_string().into()).await; } else { - warn!("failed to send message, as the receiver not detected: {res:?}"); + // show error by case + let clients: Vec = hub + .lock() + .await + .clients + .keys() + .map(|x| x.to_string()) + .collect(); + + // step errors + if let Some(res_n) = res.as_object() + && let Some(res_payload) = res_n.get("payload") + { + if let Some(res_payload_val) = res_payload.as_object() { + if let Some(recv_ident) = res_payload_val.get("to") + && let Some(recv_ident_str) = recv_ident.as_str() + { + // has recp + if clients.contains(&recv_ident_str.to_string()) + && current_uid.ne(&recv_ident_str.to_string()) + { + warn!("oops! receiving other receiver's messages. Ignore this"); + } else { + error!("receiver not existed or already went offline"); + } + } else { + error!("failed to send message, as the receiver not detected"); + } + } else { + error!("incorrect type: payload not object") + } + } else { + error!("incorrect format: missing payload or response is not object"); + } } } TxControlMessage::CloseExist => { @@ -268,3 +323,32 @@ pub async fn write( } Ok(()) } + +pub async fn recv_sys_msg_send_back_client( + tx: Sender, + mut system_rx: tokio::sync::broadcast::Receiver, +) -> JoinHandle<()> { + let tx_to_client = tx.clone(); + tokio::spawn(async move { + loop { + match system_rx.recv().await { + Ok(s_msg) => { + if convert_sys_msg_command(&s_msg).is_some() + && let Some(err) = tx_to_client + .send(TxControlMessage::Payload(s_msg)) + .await + .err() + { + error!("[SYS] failed to send back to client: {err}"); + } + } + Err(_) => { + // maybe channel closed + break; + } + } + } + + info!("[sysrx-cli] ending client system rx"); + }) +} diff --git a/src/websocket/tasks/auth.rs b/src/websocket/tasks/auth.rs index fe04e23..b98ba25 100644 --- a/src/websocket/tasks/auth.rs +++ b/src/websocket/tasks/auth.rs @@ -15,7 +15,7 @@ pub async fn handle_auth_request( // do command send to other services // // guard expect value - let auth_request: AuthPayload = match serde_json::from_value(req.payload.unwrap()) { + let auth_request: AuthPayload = match safe_deserialize(&req.clone().payload.unwrap()) { Ok(areq) => areq, Err(e) => { error!("error body auth: {e:?}"); @@ -39,7 +39,7 @@ pub async fn handle_auth_request( warn!("disconnecting old connection"); let _ = old_tx.send(TxControlMessage::CloseExist); } - info!("re-new auth successful"); + info!("update re-new auth successful ---> {}", new_uid.clone()); } { diff --git a/src/websocket/tasks/price.rs b/src/websocket/tasks/price.rs index f6ff50d..a2b1087 100644 --- a/src/websocket/tasks/price.rs +++ b/src/websocket/tasks/price.rs @@ -1,7 +1,4 @@ use crate::app::*; -use crate::stream::model::{ - IntoStreamMessage, StreamDataChunk, StreamDataEnd, StreamDataExtra, StreamDataStart, -}; use crate::websocket::{core::*, helper::*, model::*}; use log::{debug, error, info, warn}; @@ -136,7 +133,7 @@ pub async fn handle_price_request( ) -> WebsocketMessageResult { let p = req.payload.unwrap(); - let price_param: PriceRequestPayload = serde_json::from_value(p)?; + let price_param: PriceRequestPayload = safe_deserialize(&p)?; let mut price_file_format = format!( "{}/profile_{}_master.json", @@ -313,15 +310,12 @@ pub async fn handle_price_request( // return Err("Fail to sync repo, backing up ...".into()); // } - // let _ = invoke_commit_request(config.clone(), commit_payload.clone()).await; + let _ = invoke_commit_request(config.clone(), commit_payload.clone()).await; // if invoke_push_request(config.clone()).await.is_err() { // let _ = commit_payload.dump_backup(); // return Err("Fail to push repo, backing up ...".into()); // } - - let _ = commit_payload.dump_backup(); - // push to git } else { let _ = tx .send(TxControlMessage::Payload(serde_json::json!({ diff --git a/src/websocket/tasks/recipe.rs b/src/websocket/tasks/recipe.rs index 841888c..eb9a38c 100644 --- a/src/websocket/tasks/recipe.rs +++ b/src/websocket/tasks/recipe.rs @@ -2,6 +2,7 @@ use crate::app::*; use crate::stream::model::{ IntoStreamMessage, StreamDataChunk, StreamDataEnd, StreamDataExtra, StreamDataStart, }; +use crate::websocket::plugins::call_plugin_if_existed; use crate::websocket::{core::*, helper::*, model::*}; use std::collections::HashMap; @@ -28,6 +29,7 @@ use tokio::{ }, time::Instant, }; +use wasmtime::{Config, Engine}; pub fn is_req_patch(param: &RecipeRequestPayload) -> bool { param.version != -1 && param.partial.is_some() && param.partial.unwrap() @@ -188,7 +190,7 @@ pub async fn handle_recipe_request( ) -> WebsocketMessageResult { // guard expect value let p = req.payload.unwrap(); - let recipe_param: RecipeRequestPayload = serde_json::from_value(p)?; + let recipe_param: RecipeRequestPayload = safe_deserialize(&p)?; // get actual version // @@ -384,7 +386,7 @@ pub async fn handle_recipe_versions_list_request( uid_clone: Arc>, ) -> WebsocketMessageResult { let p = req.payload.unwrap(); - let recipe_param: RecipeRequestPayload = serde_json::from_value(p)?; + let recipe_param: RecipeRequestPayload = safe_deserialize(&p)?; let version_list = format!("{country}", country = recipe_param.country); @@ -430,7 +432,7 @@ pub async fn handle_recipe_save_change_request( let timestamp = Local::now(); let p = req.payload.unwrap(); - let save_recipe_param: SaveRecipePayload = serde_json::from_value(p)?; + let save_recipe_param: SaveRecipePayload = safe_deserialize(&p)?; let single_recipe = serde_json::to_string_pretty(&save_recipe_param.values)?; @@ -465,5 +467,84 @@ pub async fn handle_recipe_save_change_request( message: format!("resolve-{expected_file_path}"), }; + let engine = Engine::new(Config::new().wasm_component_model(true)).unwrap(); + call_plugin_if_existed( + WebsocketMessageRequest::from(commit_payload), + engine.clone(), + ) + .await; + + Ok(()) +} + +pub async fn handle_request_list_menu_recipe( + config: DevConfig, + redis: redis::Client, + tx: Sender, + req: WebsocketMessageRequest, + uid_clone: Arc>, +) -> WebsocketMessageResult { + // suppose we already guard value + let p = req.payload.unwrap(); + let req_menu_list: RequestMenuListPayload = safe_deserialize(&p)?; + + let latest_key = format!("{country}/version", country = req_menu_list.country); + + let latest_version = match invoke_checkout_request(config.clone(), latest_key).await { + Ok(version) => version, + Err(e) => { + println!("Error on checkout: {e}"); + "".to_string() + } + }; + + let mut result: Vec = Vec::new(); + // skip git-like key + let init_key = 3; + for i in init_key..6 { + let r1_key = get_key_cache( + req_menu_list.clone().country, + latest_version.clone(), + false, + i, + ); + + let content = match invoke_checkout_request(config.clone(), r1_key).await { + Ok(file_content) => file_content, + Err(e) => { + println!("Error on checkout: {e}"); + "".to_string() + } + }; + info!("[list-menu] content ready: {}", content.len()); + let recipe = serde_json::from_str::(&content); + + if let Ok(rp) = recipe { + result = rp + .list_menu_product_code() + .iter() + .map(|x| x.to_string()) + .collect(); + break; + } + } + + let uidd = uid_clone.lock().await.to_string(); + + info!("[list-menu] result: {}", result.len()); + + if let Err(e) = tx + .send(TxControlMessage::Payload(serde_json::json!({ + "type": "notify", + "payload": { + "to": uidd, + "value": result + } + }))) + .await + { + error!("ERR@list_menu: send tx error {e:?}"); + } + Ok(()) } diff --git a/src/websocket/tasks/sheet.rs b/src/websocket/tasks/sheet.rs index 1bff3f4..f62b2a3 100644 --- a/src/websocket/tasks/sheet.rs +++ b/src/websocket/tasks/sheet.rs @@ -16,7 +16,7 @@ pub async fn handle_sheet_request( let req_clone = req.clone(); // we can assume the payload is existed from handler let payload_sheet_request: CommandRequestPayload = - match serde_json::from_value(req.payload.unwrap()) { + match safe_deserialize(&req.clone().payload.unwrap()) { Ok(sreq) => sreq, Err(e) => { error!("error deserialize body sheet request: {e:?} ---> Skip"); diff --git a/src/websocket/tasks/watchdog.rs b/src/websocket/tasks/watchdog.rs index 1092f9d..ddc7108 100644 --- a/src/websocket/tasks/watchdog.rs +++ b/src/websocket/tasks/watchdog.rs @@ -1,6 +1,6 @@ use crate::{app::Hub, websocket::core::*}; -use log::{debug, info, warn}; -use std::{sync::Arc, time::Duration}; +use log::{info, warn}; +use std::{ops::Sub, sync::Arc, time::Duration}; use tokio::{ sync::{Mutex, mpsc::Sender}, task::JoinHandle, @@ -14,16 +14,22 @@ pub async fn get_watchdog_task( hub: Arc>, ) -> JoinHandle<()> { tokio::spawn(async move { + let uc = user.clone().lock().await.to_string(); + info!("start watchdog for {uc}"); loop { - tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs(2)).await; { - let h = hub.try_lock().unwrap(); - let curr_user = user.try_lock().unwrap().to_string(); + let h = hub.lock().await; + let curr_user = user.lock().await.to_string(); // info!("{}: checking invalid ...", curr_user); - if h.clients.contains_key(&curr_user) && curr_user.starts_with("temp") { + if !h.clients.contains_key(&curr_user) { + // not known + warn!("killing watchdog thread: {}", curr_user); + break; + } else if h.clients.contains_key(&curr_user) && curr_user.starts_with("temp") { warn!("detect unauthorized -- {}", curr_user); let _ = tx .send(TxControlMessage::Payload(serde_json::json!({ @@ -43,7 +49,24 @@ pub async fn get_watchdog_task( }))) .await; break; + } else if last.elapsed() == TIMEOUT.sub(Duration::from_secs(10)) { + // near last 10 s, send to client that they need to re-auth + // + // CHANGE: check by number of heartbeat instead. + + // For sending back to client, confirming re-authentication before timeout. + // If user is actually online, the client should be able to send back auth info + warn!(""); + let _ = tx + .send(TxControlMessage::Payload(serde_json::json!({ + "type": "reauth", + "payload": { + "to": uc.clone() + } + }))) + .await; } } + info!("stop watchdog for {uc}"); }) }