feat: add list menu of recipe, check origin

- fix: zombie thread, safe deseialize, disable backup cycle

Signed-off-by: Pakin <pakin.t@forth.co.th>
This commit is contained in:
Pakin 2026-05-15 15:43:09 +07:00
parent d7f5e12d51
commit b16fa72383
14 changed files with 368 additions and 133 deletions

1
Cargo.lock generated
View file

@ -3236,6 +3236,7 @@ dependencies = [
"tokio", "tokio",
"tokio-cron-scheduler", "tokio-cron-scheduler",
"tokio-stream", "tokio-stream",
"tokio-util",
"uuid", "uuid",
"wasmtime", "wasmtime",
"wasmtime-wasi", "wasmtime-wasi",

View file

@ -24,6 +24,7 @@ sqlx = { version = "0.8.6", features = ["runtime-tokio", "tls-rustls", "sqlite"]
tokio = { version = "1.49.0", features = ["full"] } tokio = { version = "1.49.0", features = ["full"] }
tokio-cron-scheduler = "0.15.1" tokio-cron-scheduler = "0.15.1"
tokio-stream = "0.1.18" tokio-stream = "0.1.18"
tokio-util = "0.7.18"
uuid = { version = "1.20.0", features = ["v4"] } uuid = { version = "1.20.0", features = ["v4"] }
wasmtime = { version = "44.0.1", features = ["async"] } wasmtime = { version = "44.0.1", features = ["async"] }
wasmtime-wasi = "44.0.1" wasmtime-wasi = "44.0.1"

View file

@ -33,6 +33,7 @@ pub struct DevConfig {
pub api_redis_url: String, pub api_redis_url: String,
pub api_resolver: String, pub api_resolver: String,
pub api_sheet_endpoints: Arc<Mutex<Vec<String>>>, pub api_sheet_endpoints: Arc<Mutex<Vec<String>>>,
pub allowed_origins: Vec<String>,
} }
impl DevConfig { impl DevConfig {
@ -51,9 +52,15 @@ impl DevConfig {
api_redis_url, api_redis_url,
api_resolver, api_resolver,
api_sheet_endpoints, api_sheet_endpoints,
allowed_origins: Vec::new(),
} }
} }
pub fn with_allowed_origins(&mut self, raw_origin: &str) -> &mut Self {
self.allowed_origins = raw_origin.split(",").map(|x| x.to_string()).collect();
self
}
pub fn get_recipe_url(&self) -> String { pub fn get_recipe_url(&self) -> String {
format!("{}{}", self.api_domain, self.api_recipe_service) format!("{}{}", self.api_domain, self.api_recipe_service)
} }
@ -126,69 +133,7 @@ impl AppState {
// backup job // backup job
let dev_config_backup = dev_config.clone(); let dev_config_backup = dev_config.clone();
tokio::spawn(async move { // NOTE: removed backup process, let each app handled by themselves
let m_cfg = dev_config_backup.clone();
loop {
// auto sync
if invoke_pull_sync_request(m_cfg.clone()).await.is_err() {
warn!("pulling repo unhealthy, retry again in 5 minutes");
continue;
}
match read_dir(".").await {
Ok(mut d) => {
while let Ok(Some(entry)) = d.next_entry().await {
let ent_path = entry.path();
if let Some(filename) = ent_path.file_name()
&& let Some(filename_str) = filename.to_str()
&& filename_str.starts_with("gtx")
&& filename_str.ends_with(".json")
{
// read file
//
let f = match File::open(ent_path.clone()) {
Ok(f) => f,
Err(_) => continue,
};
let buf = BufReader::new(f);
let commit_from_backup: CommitPayload =
match serde_json::from_reader(buf) {
Ok(cm) => cm,
Err(_) => continue,
};
if invoke_pull_sync_request(m_cfg.clone()).await.is_err() {
warn!("pulling repo unhealthy, retry again in 5 minutes");
continue;
}
let _ =
invoke_commit_request(m_cfg.clone(), commit_from_backup).await;
if invoke_push_request(m_cfg.clone()).await.is_ok() {
// push success
info!("push backup success");
if fs::remove_file(ent_path.clone()).is_ok() {
info!("clean backup");
}
}
} else {
continue;
}
}
}
Err(_) => {}
}
info!("[backup] idle");
tokio::time::sleep(Duration::from_mins(5)).await;
}
});
tokio::spawn(async move { tokio::spawn(async move {
let mut lredis = redis_cli_clone.clone(); let mut lredis = redis_cli_clone.clone();
@ -202,7 +147,7 @@ impl AppState {
let sys_msg = crate::websocket::helper::convert_sys_msg_command(&rmsg); let sys_msg = crate::websocket::helper::convert_sys_msg_command(&rmsg);
// add queue process // add queue process
let command_req: CommandRequestPayload = match serde_json::from_value(rmsg) { let command_req: CommandRequestPayload = match safe_deserialize(&rmsg) {
Ok(cmd) => cmd, Ok(cmd) => cmd,
Err(e) => { Err(e) => {
if sys_msg.is_none() { if sys_msg.is_none() {
@ -291,6 +236,8 @@ impl AppState {
} }
}); });
// spawn product sync process
result result
} }
} }
@ -439,11 +386,13 @@ pub async fn initialize() -> Result<(), Box<dyn std::error::Error>> {
let api_resolver = env::var("RESOLVER_SERVICE_URL").expect("no available resolver"); let api_resolver = env::var("RESOLVER_SERVICE_URL").expect("no available resolver");
let allowed_origins = env::var("ALLOWED_ORIGINS").expect("allowed origin not provided");
// read up sheet config // read up sheet config
// //
let sheet_endpoint_config = read_sheet_config()?; let sheet_endpoint_config = read_sheet_config()?;
let dev_cfg = crate::app::DevConfig::new( let mut dev_cfg = crate::app::DevConfig::new(
api_key, api_key,
api_domain, api_domain,
api_recipe_service, api_recipe_service,
@ -451,6 +400,7 @@ pub async fn initialize() -> Result<(), Box<dyn std::error::Error>> {
api_resolver, api_resolver,
Arc::new(Mutex::new(sheet_endpoint_config)), Arc::new(Mutex::new(sheet_endpoint_config)),
); );
dev_cfg = dev_cfg.with_allowed_origins(&allowed_origins).clone();
// test_send(dev_cfg).await?; // test_send(dev_cfg).await?;
// //
@ -458,7 +408,7 @@ pub async fn initialize() -> Result<(), Box<dyn std::error::Error>> {
let (sys_tx, sys_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16); let (sys_tx, sys_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
let app_state = AppState::new(dev_cfg, redis_cli, sys_tx, sys_rx).await; let app_state = AppState::new(dev_cfg.clone(), redis_cli, sys_tx, sys_rx).await;
let rp_router = create_recipe_repo_router().await; let rp_router = create_recipe_repo_router().await;
// let doc_router = create_tx_patcher_route().await; // let doc_router = create_tx_patcher_route().await;

View file

@ -1,12 +1,14 @@
use std::time::Duration; use std::time::Duration;
use serde::Deserialize;
/// CONFIG: chunk size for each payload /// CONFIG: chunk size for each payload
/// ///
/// note: using in sending recipe /// note: using in sending recipe
pub const CHUNK_SIZE: usize = 5; pub const CHUNK_SIZE: usize = 5;
/// CONFIG: default timeout for each socket connection /// CONFIG: default timeout for each socket connection
pub const TIMEOUT: Duration = Duration::from_secs(60 * 15); pub const TIMEOUT: Duration = Duration::from_secs(60 * 5);
#[derive(Clone)] #[derive(Clone)]
pub enum TxControlMessage { pub enum TxControlMessage {
@ -20,3 +22,30 @@ pub enum UserWebSocketAuthState {
} }
pub type WebsocketMessageResult = Result<(), Box<dyn std::error::Error + Send + Sync>>; pub type WebsocketMessageResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;
pub fn safe_deserialize<'de, T>(value: &'de serde_json::Value) -> Result<T, serde_json::Error>
where
T: Deserialize<'de>,
{
let sanitized = sanitize_json_value(value);
T::deserialize(sanitized)
}
fn sanitize_json_value(value: &serde_json::Value) -> serde_json::Value {
match value {
serde_json::Value::Object(map) => {
let mut sanitized = serde_json::Map::new();
for (k, v) in map {
if k == "__proto__" || k == "constructor" || k == "prototype" {
continue;
}
sanitized.insert(k.clone(), sanitize_json_value(v));
}
serde_json::Value::Object(sanitized)
}
serde_json::Value::Array(arr) => {
serde_json::Value::Array(arr.iter().map(sanitize_json_value).collect())
}
_ => value.clone(),
}
}

View file

@ -1,6 +1,6 @@
use axum::{ use axum::{
Json, Json,
extract::{State, WebSocketUpgrade, ws::WebSocket}, extract::{Request, State, WebSocketUpgrade, ws::WebSocket},
response::IntoResponse, response::IntoResponse,
}; };
use futures::StreamExt; use futures::StreamExt;
@ -95,10 +95,26 @@ pub async fn request_api_session_key(
pub async fn websocket_handler( pub async fn websocket_handler(
State(state): State<Arc<AppState>>, State(state): State<Arc<AppState>>,
ws: WebSocketUpgrade, ws: WebSocketUpgrade,
req: Request,
) -> impl IntoResponse { ) -> impl IntoResponse {
let state_clone = Arc::clone(&state); let state_clone = Arc::clone(&state);
let hub_clone = Arc::clone(&state_clone.connectors_mapping); let hub_clone = Arc::clone(&state_clone.connectors_mapping);
let origin = req
.headers()
.get("origin")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if !state
.dev_config
.allowed_origins
.contains(&origin.to_string())
{
warn!("unexpected origin: {}", origin);
return (axum::http::StatusCode::FORBIDDEN, "".to_string()).into_response();
}
// let mut uid_n = String::new(); // let mut uid_n = String::new();
// if let Some(uid) = headers.get("x-auth-uid") { // if let Some(uid) = headers.get("x-auth-uid") {
@ -143,11 +159,11 @@ async fn handle_socket(
Uuid::new_v4().to_string() Uuid::new_v4().to_string()
)))); ))));
let temp_session = user.try_lock().unwrap().to_string(); let temp_session = user.lock().await.to_string();
info!("{} connected", temp_session); info!("{} connected", temp_session);
{ {
let mut h = hub.try_lock().unwrap(); let mut h = hub.lock().await;
h.clients.insert(temp_session.clone(), tx.clone()); h.clients.insert(temp_session.clone(), tx.clone());
} }
@ -158,16 +174,16 @@ async fn handle_socket(
let reader_last_seen = last_seen.clone(); let reader_last_seen = last_seen.clone();
let watchdog_last_seen = last_seen.clone(); let watchdog_last_seen = last_seen.clone();
let sender = tokio::spawn(super::rw::write(sender, rx, user.clone())); let sender = tokio::spawn(super::rw::write(sender, rx, user.clone(), hub.clone()));
let reader = tokio::spawn(super::rw::read( let reader = tokio::spawn(super::rw::read(
state, state,
receiver, receiver,
tx.clone(), tx.clone(),
user_sys_rx,
reader_last_seen, reader_last_seen,
user.clone(), user.clone(),
hub.clone(), hub.clone(),
)); ));
let callback_to_client = super::rw::recv_sys_msg_send_back_client(tx.clone(), user_sys_rx);
let watchdog = super::tasks::watchdog::get_watchdog_task( let watchdog = super::tasks::watchdog::get_watchdog_task(
tx, tx,
@ -176,7 +192,29 @@ async fn handle_socket(
hub.clone(), hub.clone(),
); );
let _ = tokio::join!(reader, sender, watchdog); let (rf, sf, cbc, wds) = tokio::join!(reader, sender, callback_to_client, watchdog);
if let Ok(rf_js) = rf
&& let Ok(sf_js) = sf
{
info!(
"read end ok: {}, write end ok: {} [{}]",
rf_js.is_ok(),
sf_js.is_ok(),
user.clone().lock().await.to_string()
);
if !cbc.is_finished() {
info!("sys rx still running");
cbc.abort();
if cbc.await.unwrap_err().is_cancelled() {
info!("sys rx force stop ...");
}
}
if !wds.is_finished() {
info!("watchdog still existed");
}
}
Ok(()) Ok(())
} }

View file

@ -1,5 +1,7 @@
use std::{collections::HashMap, fs::File, io::BufReader}; use std::{collections::HashMap, fs::File, io::BufReader};
use crate::websocket::core::safe_deserialize;
use super::model::*; use super::model::*;
use axum::extract::ws::{CloseFrame, Message, WebSocket}; use axum::extract::ws::{CloseFrame, Message, WebSocket};
use redis::{TypedCommands, cmd}; use redis::{TypedCommands, cmd};
@ -61,7 +63,7 @@ pub fn convert_ack_command(cmd_req: &serde_json::Value) -> Option<CommandRequest
} }
pub fn convert_sys_msg_command(msg: &serde_json::Value) -> Option<SysMessage> { pub fn convert_sys_msg_command(msg: &serde_json::Value) -> Option<SysMessage> {
match serde_json::from_value(msg.clone()) { match safe_deserialize(msg) {
Ok(req) => Some(req), Ok(req) => Some(req),
Err(_) => None, Err(_) => None,
} }

View file

@ -140,3 +140,24 @@ pub struct CommitPayload {
// use default backup method // use default backup method
impl Backup for CommitPayload {} impl Backup for CommitPayload {}
impl From<CommitPayload> for WebsocketMessageRequest {
fn from(value: CommitPayload) -> Self {
WebsocketMessageRequest {
type_w: "commit_part".to_string(),
payload: Some(serde_json::json!({
"commit": value,
"plugin": "apply_recipe"
})),
}
}
}
/// For getting list of menus in recipe
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RequestMenuListPayload {
/// User info expect at least id, token, name
pub user_info: serde_json::Value,
/// target country to get recipe, version will always use latest
pub country: String,
}

View file

@ -13,7 +13,7 @@ use wasmtime_wasi_http::{
p2::{WasiHttpCtxView, WasiHttpView}, p2::{WasiHttpCtxView, WasiHttpView},
}; };
use crate::websocket::model::WebsocketMessageRequest; use crate::websocket::{core::safe_deserialize, model::WebsocketMessageRequest};
wasmtime::component::bindgen!({ wasmtime::component::bindgen!({
path: "plugins/plugin.wit", path: "plugins/plugin.wit",
@ -92,6 +92,7 @@ async fn call_plugin_logic(engine: &Engine, component: &Component, input: String
let ctx = WasiCtxBuilder::new() let ctx = WasiCtxBuilder::new()
.inherit_stdout() .inherit_stdout()
.inherit_stderr() .inherit_stderr()
.inherit_env()
.build(); .build();
let http_ctx = WasiHttpCtx::new(); let http_ctx = WasiHttpCtx::new();
let mut store = Store::new( let mut store = Store::new(
@ -114,9 +115,14 @@ async fn call_plugin_logic(engine: &Engine, component: &Component, input: String
return String::new(); return String::new();
} }
let instance_result = PluginWorld::instantiate_async(&mut store, component, &linker) let instance_result = match PluginWorld::instantiate_async(&mut store, component, &linker).await
.await {
.expect("Failed to instantiate plugin"); Ok(r) => r,
Err(e) => {
error!("unable to instantiate plugin: {e}");
return String::new();
}
};
// 3. Call the exported function from the WIT 'handler' interface // 3. Call the exported function from the WIT 'handler' interface
match instance_result match instance_result
@ -149,7 +155,7 @@ pub async fn call_plugin_if_existed(
return req.clone(); return req.clone();
} }
let plugin_payload: PluginPayload = match serde_json::from_value(req.clone().payload.unwrap()) { let plugin_payload: PluginPayload = match safe_deserialize(&req.clone().payload.unwrap()) {
Ok(p) => p, Ok(p) => p,
Err(_) => return req, Err(_) => return req,
}; };
@ -166,8 +172,13 @@ pub async fn call_plugin_if_existed(
for ap in apply_plugins { for ap in apply_plugins {
if all_plugins.contains_key(&ap) { if all_plugins.contains_key(&ap) {
let component = let component = match Component::from_file(&engine, all_plugins.get(&ap).unwrap()) {
Component::from_file(&engine, all_plugins.get(&ap).unwrap()).unwrap(); Ok(c) => c,
Err(e) => {
error!("plugin not found! {ap}");
continue;
}
};
res_str = call_plugin_logic(&engine, &component, res_str).await; res_str = call_plugin_logic(&engine, &component, res_str).await;
} }

View file

@ -3,20 +3,24 @@ use crate::{
app::*, app::*,
websocket::{plugins::call_plugin_if_existed, tasks}, websocket::{plugins::call_plugin_if_existed, tasks},
}; };
use std::{sync::Arc, time::Duration}; use std::{
sync::{Arc, atomic::AtomicBool},
time::Duration,
};
use axum::extract::ws::{Message, WebSocket}; use axum::extract::ws::{Message, WebSocket};
use futures::{ use futures::{
SinkExt, StreamExt, SinkExt, StreamExt,
stream::{SplitSink, SplitStream}, stream::{SplitSink, SplitStream},
}; };
use log::{error, info, warn}; use log::{debug, error, info, warn};
use tokio::{ use tokio::{
sync::{ sync::{
Mutex, Mutex,
mpsc::{Receiver, Sender}, mpsc::{Receiver, Sender},
}, },
task::JoinHandle,
time::Instant, time::Instant,
}; };
use wasmtime::{Config, Engine}; use wasmtime::{Config, Engine};
@ -26,28 +30,12 @@ pub async fn read(
state: Arc<AppState>, state: Arc<AppState>,
mut receiver: SplitStream<WebSocket>, mut receiver: SplitStream<WebSocket>,
tx: Sender<TxControlMessage>, tx: Sender<TxControlMessage>,
mut system_rx: tokio::sync::broadcast::Receiver<serde_json::Value>,
last_seen: Arc<Mutex<Instant>>, // cmd_atom: crossbeam_queue::ArrayQueue<CommandRequestPayload>, last_seen: Arc<Mutex<Instant>>, // cmd_atom: crossbeam_queue::ArrayQueue<CommandRequestPayload>,
uid: Arc<Mutex<String>>, uid: Arc<Mutex<String>>,
hub: Arc<Mutex<Hub>>, hub: Arc<Mutex<Hub>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let redis = state.redis_cli.clone(); let redis = state.redis_cli.clone();
let config = state.dev_config.clone(); let config = state.dev_config.clone();
let tx_to_client = tx.clone();
tokio::spawn(async move {
// Send back to client from services
while let Ok(s_msg) = system_rx.recv().await {
if convert_sys_msg_command(&s_msg).is_some()
&& let Some(err) = tx_to_client
.send(TxControlMessage::Payload(s_msg))
.await
.err()
{
error!("[SYS] failed to send back to client: {err}");
}
}
});
let uid_clone = uid.clone(); let uid_clone = uid.clone();
@ -66,38 +54,54 @@ pub async fn read(
// info!("get msg: {}", req.type_w); // info!("get msg: {}", req.type_w);
match req.type_w.as_str() { match req.type_w.as_str() {
"recipe" if req.payload.is_some() => { "recipe" if req.payload.is_some() => {
tasks::recipe::handle_recipe_request( if tasks::recipe::handle_recipe_request(
config.clone(), config.clone(),
redis.clone(), redis.clone(),
tx.clone(), tx.clone(),
req, req,
uid_clone.clone(), uid_clone.clone(),
) )
.await?; .await
.is_err()
{
continue;
}
} }
"recipe_versions" if req.payload.is_some() => { "recipe_versions" if req.payload.is_some() => {
tasks::recipe::handle_recipe_versions_list_request( if tasks::recipe::handle_recipe_versions_list_request(
config.clone(), config.clone(),
redis.clone(), redis.clone(),
tx.clone(), tx.clone(),
req, req,
uid_clone.clone(), uid_clone.clone(),
) )
.await?; .await
.is_err()
{
continue;
}
} }
"price" if req.payload.is_some() => { "price" if req.payload.is_some() => {
tasks::price::handle_price_request( if tasks::price::handle_price_request(
config.clone(), config.clone(),
redis.clone(), redis.clone(),
tx.clone(), tx.clone(),
req, req,
uid_clone.clone(), uid_clone.clone(),
) )
.await?; .await
.is_err()
{
continue;
}
} }
"command" if req.payload.is_some() => { "command" if req.payload.is_some() => {
tasks::command::handle_command_request(state.clone(), tx.clone(), req) if tasks::command::handle_command_request(state.clone(), tx.clone(), req)
.await?; .await
.is_err()
{
continue;
}
} }
"heartbeat" => { "heartbeat" => {
let new_updated_time = Instant::now(); let new_updated_time = Instant::now();
@ -132,7 +136,7 @@ pub async fn read(
} }
"log_report" if let Some(log_payload) = req.payload => { "log_report" if let Some(log_payload) = req.payload => {
let log_report_payload: LogReportPayload = let log_report_payload: LogReportPayload =
match serde_json::from_value(log_payload) { match safe_deserialize(&log_payload) {
Ok(lreq) => lreq, Ok(lreq) => lreq,
Err(e) => { Err(e) => {
error!("error deserialize body log request: {e:?} ---> Skip"); error!("error deserialize body log request: {e:?} ---> Skip");
@ -164,6 +168,20 @@ pub async fn read(
.await?; .await?;
} }
"list_menu" if req.payload.is_some() => {
if tasks::recipe::handle_request_list_menu_recipe(
config.clone(),
redis.clone(),
tx.clone(),
req,
uid_clone.clone(),
)
.await
.is_err()
{
continue;
}
}
_ => { _ => {
// not implemented // not implemented
} }
@ -173,7 +191,7 @@ pub async fn read(
*last_seen.lock().await = Instant::now(); *last_seen.lock().await = Instant::now();
} }
Message::Close(_) => { Message::Close(_) => {
info!("get close message"); info!("[read] get close message");
// remove current uid // remove current uid
{ {
@ -205,6 +223,9 @@ pub async fn read(
} }
} }
} }
info!("[read] canceling sys rx ...");
Ok(()) Ok(())
} }
@ -212,6 +233,7 @@ pub async fn write(
mut sender: SplitSink<WebSocket, Message>, mut sender: SplitSink<WebSocket, Message>,
mut rx: Receiver<TxControlMessage>, mut rx: Receiver<TxControlMessage>,
uid: Arc<Mutex<String>>, uid: Arc<Mutex<String>>,
hub: Arc<Mutex<Hub>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
while let Some(res) = rx.recv().await { while let Some(res) = rx.recv().await {
match res { match res {
@ -221,7 +243,7 @@ pub async fn write(
&& let Some(from_who) = force_timeout_by.as_str() && let Some(from_who) = force_timeout_by.as_str()
&& (from_who.eq("watchdog") || from_who.eq("disconnection")) && (from_who.eq("watchdog") || from_who.eq("disconnection"))
{ {
warn!("receive close from {from_who}"); warn!("[write] receive close from {from_who}");
if from_who.eq("disconnection") { if from_who.eq("disconnection") {
let _ = sender.close().await; let _ = sender.close().await;
@ -232,7 +254,7 @@ pub async fn write(
break; break;
} }
let current_uid = uid.try_lock().unwrap(); let current_uid = uid.lock().await;
if let Some(res_n) = res.as_object() if let Some(res_n) = res.as_object()
&& let Some(res_payload) = res_n.get("payload") && let Some(res_payload) = res_n.get("payload")
@ -247,14 +269,47 @@ pub async fn write(
if payload_size >= 100000 { if payload_size >= 100000 {
// large payload // large payload
warn!( warn!(
"sending large payload to client ... ({})", "[write] sending large payload to client ... ({})",
res.to_string().len() res.to_string().len()
); );
} }
let _ = sender.send(res.to_string().into()).await; let _ = sender.send(res.to_string().into()).await;
} else { } else {
warn!("failed to send message, as the receiver not detected: {res:?}"); // show error by case
let clients: Vec<String> = hub
.lock()
.await
.clients
.keys()
.map(|x| x.to_string())
.collect();
// step errors
if let Some(res_n) = res.as_object()
&& let Some(res_payload) = res_n.get("payload")
{
if let Some(res_payload_val) = res_payload.as_object() {
if let Some(recv_ident) = res_payload_val.get("to")
&& let Some(recv_ident_str) = recv_ident.as_str()
{
// has recp
if clients.contains(&recv_ident_str.to_string())
&& current_uid.ne(&recv_ident_str.to_string())
{
warn!("oops! receiving other receiver's messages. Ignore this");
} else {
error!("receiver not existed or already went offline");
}
} else {
error!("failed to send message, as the receiver not detected");
}
} else {
error!("incorrect type: payload not object")
}
} else {
error!("incorrect format: missing payload or response is not object");
}
} }
} }
TxControlMessage::CloseExist => { TxControlMessage::CloseExist => {
@ -268,3 +323,32 @@ pub async fn write(
} }
Ok(()) Ok(())
} }
pub async fn recv_sys_msg_send_back_client(
tx: Sender<TxControlMessage>,
mut system_rx: tokio::sync::broadcast::Receiver<serde_json::Value>,
) -> JoinHandle<()> {
let tx_to_client = tx.clone();
tokio::spawn(async move {
loop {
match system_rx.recv().await {
Ok(s_msg) => {
if convert_sys_msg_command(&s_msg).is_some()
&& let Some(err) = tx_to_client
.send(TxControlMessage::Payload(s_msg))
.await
.err()
{
error!("[SYS] failed to send back to client: {err}");
}
}
Err(_) => {
// maybe channel closed
break;
}
}
}
info!("[sysrx-cli] ending client system rx");
})
}

View file

@ -15,7 +15,7 @@ pub async fn handle_auth_request(
// do command send to other services // do command send to other services
// // guard expect value // // guard expect value
let auth_request: AuthPayload = match serde_json::from_value(req.payload.unwrap()) { let auth_request: AuthPayload = match safe_deserialize(&req.clone().payload.unwrap()) {
Ok(areq) => areq, Ok(areq) => areq,
Err(e) => { Err(e) => {
error!("error body auth: {e:?}"); error!("error body auth: {e:?}");
@ -39,7 +39,7 @@ pub async fn handle_auth_request(
warn!("disconnecting old connection"); warn!("disconnecting old connection");
let _ = old_tx.send(TxControlMessage::CloseExist); let _ = old_tx.send(TxControlMessage::CloseExist);
} }
info!("re-new auth successful"); info!("update re-new auth successful ---> {}", new_uid.clone());
} }
{ {

View file

@ -1,7 +1,4 @@
use crate::app::*; use crate::app::*;
use crate::stream::model::{
IntoStreamMessage, StreamDataChunk, StreamDataEnd, StreamDataExtra, StreamDataStart,
};
use crate::websocket::{core::*, helper::*, model::*}; use crate::websocket::{core::*, helper::*, model::*};
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
@ -136,7 +133,7 @@ pub async fn handle_price_request(
) -> WebsocketMessageResult { ) -> WebsocketMessageResult {
let p = req.payload.unwrap(); let p = req.payload.unwrap();
let price_param: PriceRequestPayload = serde_json::from_value(p)?; let price_param: PriceRequestPayload = safe_deserialize(&p)?;
let mut price_file_format = format!( let mut price_file_format = format!(
"{}/profile_{}_master.json", "{}/profile_{}_master.json",
@ -313,15 +310,12 @@ pub async fn handle_price_request(
// return Err("Fail to sync repo, backing up ...".into()); // return Err("Fail to sync repo, backing up ...".into());
// } // }
// let _ = invoke_commit_request(config.clone(), commit_payload.clone()).await; let _ = invoke_commit_request(config.clone(), commit_payload.clone()).await;
// if invoke_push_request(config.clone()).await.is_err() { // if invoke_push_request(config.clone()).await.is_err() {
// let _ = commit_payload.dump_backup(); // let _ = commit_payload.dump_backup();
// return Err("Fail to push repo, backing up ...".into()); // return Err("Fail to push repo, backing up ...".into());
// } // }
let _ = commit_payload.dump_backup();
// push to git
} else { } else {
let _ = tx let _ = tx
.send(TxControlMessage::Payload(serde_json::json!({ .send(TxControlMessage::Payload(serde_json::json!({

View file

@ -2,6 +2,7 @@ use crate::app::*;
use crate::stream::model::{ use crate::stream::model::{
IntoStreamMessage, StreamDataChunk, StreamDataEnd, StreamDataExtra, StreamDataStart, IntoStreamMessage, StreamDataChunk, StreamDataEnd, StreamDataExtra, StreamDataStart,
}; };
use crate::websocket::plugins::call_plugin_if_existed;
use crate::websocket::{core::*, helper::*, model::*}; use crate::websocket::{core::*, helper::*, model::*};
use std::collections::HashMap; use std::collections::HashMap;
@ -28,6 +29,7 @@ use tokio::{
}, },
time::Instant, time::Instant,
}; };
use wasmtime::{Config, Engine};
pub fn is_req_patch(param: &RecipeRequestPayload) -> bool { pub fn is_req_patch(param: &RecipeRequestPayload) -> bool {
param.version != -1 && param.partial.is_some() && param.partial.unwrap() param.version != -1 && param.partial.is_some() && param.partial.unwrap()
@ -188,7 +190,7 @@ pub async fn handle_recipe_request(
) -> WebsocketMessageResult { ) -> WebsocketMessageResult {
// guard expect value // guard expect value
let p = req.payload.unwrap(); let p = req.payload.unwrap();
let recipe_param: RecipeRequestPayload = serde_json::from_value(p)?; let recipe_param: RecipeRequestPayload = safe_deserialize(&p)?;
// get actual version // get actual version
// //
@ -384,7 +386,7 @@ pub async fn handle_recipe_versions_list_request(
uid_clone: Arc<Mutex<String>>, uid_clone: Arc<Mutex<String>>,
) -> WebsocketMessageResult { ) -> WebsocketMessageResult {
let p = req.payload.unwrap(); let p = req.payload.unwrap();
let recipe_param: RecipeRequestPayload = serde_json::from_value(p)?; let recipe_param: RecipeRequestPayload = safe_deserialize(&p)?;
let version_list = format!("{country}", country = recipe_param.country); let version_list = format!("{country}", country = recipe_param.country);
@ -430,7 +432,7 @@ pub async fn handle_recipe_save_change_request(
let timestamp = Local::now(); let timestamp = Local::now();
let p = req.payload.unwrap(); let p = req.payload.unwrap();
let save_recipe_param: SaveRecipePayload = serde_json::from_value(p)?; let save_recipe_param: SaveRecipePayload = safe_deserialize(&p)?;
let single_recipe = serde_json::to_string_pretty(&save_recipe_param.values)?; let single_recipe = serde_json::to_string_pretty(&save_recipe_param.values)?;
@ -465,5 +467,84 @@ pub async fn handle_recipe_save_change_request(
message: format!("resolve-{expected_file_path}"), message: format!("resolve-{expected_file_path}"),
}; };
let engine = Engine::new(Config::new().wasm_component_model(true)).unwrap();
call_plugin_if_existed(
WebsocketMessageRequest::from(commit_payload),
engine.clone(),
)
.await;
Ok(())
}
pub async fn handle_request_list_menu_recipe(
config: DevConfig,
redis: redis::Client,
tx: Sender<TxControlMessage>,
req: WebsocketMessageRequest,
uid_clone: Arc<Mutex<String>>,
) -> WebsocketMessageResult {
// suppose we already guard value
let p = req.payload.unwrap();
let req_menu_list: RequestMenuListPayload = safe_deserialize(&p)?;
let latest_key = format!("{country}/version", country = req_menu_list.country);
let latest_version = match invoke_checkout_request(config.clone(), latest_key).await {
Ok(version) => version,
Err(e) => {
println!("Error on checkout: {e}");
"".to_string()
}
};
let mut result: Vec<String> = Vec::new();
// skip git-like key
let init_key = 3;
for i in init_key..6 {
let r1_key = get_key_cache(
req_menu_list.clone().country,
latest_version.clone(),
false,
i,
);
let content = match invoke_checkout_request(config.clone(), r1_key).await {
Ok(file_content) => file_content,
Err(e) => {
println!("Error on checkout: {e}");
"".to_string()
}
};
info!("[list-menu] content ready: {}", content.len());
let recipe = serde_json::from_str::<Recipe>(&content);
if let Ok(rp) = recipe {
result = rp
.list_menu_product_code()
.iter()
.map(|x| x.to_string())
.collect();
break;
}
}
let uidd = uid_clone.lock().await.to_string();
info!("[list-menu] result: {}", result.len());
if let Err(e) = tx
.send(TxControlMessage::Payload(serde_json::json!({
"type": "notify",
"payload": {
"to": uidd,
"value": result
}
})))
.await
{
error!("ERR@list_menu: send tx error {e:?}");
}
Ok(()) Ok(())
} }

View file

@ -16,7 +16,7 @@ pub async fn handle_sheet_request(
let req_clone = req.clone(); let req_clone = req.clone();
// we can assume the payload is existed from handler // we can assume the payload is existed from handler
let payload_sheet_request: CommandRequestPayload = let payload_sheet_request: CommandRequestPayload =
match serde_json::from_value(req.payload.unwrap()) { match safe_deserialize(&req.clone().payload.unwrap()) {
Ok(sreq) => sreq, Ok(sreq) => sreq,
Err(e) => { Err(e) => {
error!("error deserialize body sheet request: {e:?} ---> Skip"); error!("error deserialize body sheet request: {e:?} ---> Skip");

View file

@ -1,6 +1,6 @@
use crate::{app::Hub, websocket::core::*}; use crate::{app::Hub, websocket::core::*};
use log::{debug, info, warn}; use log::{info, warn};
use std::{sync::Arc, time::Duration}; use std::{ops::Sub, sync::Arc, time::Duration};
use tokio::{ use tokio::{
sync::{Mutex, mpsc::Sender}, sync::{Mutex, mpsc::Sender},
task::JoinHandle, task::JoinHandle,
@ -14,16 +14,22 @@ pub async fn get_watchdog_task(
hub: Arc<Mutex<Hub>>, hub: Arc<Mutex<Hub>>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
tokio::spawn(async move { tokio::spawn(async move {
let uc = user.clone().lock().await.to_string();
info!("start watchdog for {uc}");
loop { loop {
tokio::time::sleep(Duration::from_secs(5)).await; tokio::time::sleep(Duration::from_secs(2)).await;
{ {
let h = hub.try_lock().unwrap(); let h = hub.lock().await;
let curr_user = user.try_lock().unwrap().to_string(); let curr_user = user.lock().await.to_string();
// info!("{}: checking invalid ...", curr_user); // info!("{}: checking invalid ...", curr_user);
if h.clients.contains_key(&curr_user) && curr_user.starts_with("temp") { if !h.clients.contains_key(&curr_user) {
// not known
warn!("killing watchdog thread: {}", curr_user);
break;
} else if h.clients.contains_key(&curr_user) && curr_user.starts_with("temp") {
warn!("detect unauthorized -- {}", curr_user); warn!("detect unauthorized -- {}", curr_user);
let _ = tx let _ = tx
.send(TxControlMessage::Payload(serde_json::json!({ .send(TxControlMessage::Payload(serde_json::json!({
@ -43,7 +49,24 @@ pub async fn get_watchdog_task(
}))) })))
.await; .await;
break; break;
} else if last.elapsed() == TIMEOUT.sub(Duration::from_secs(10)) {
// near last 10 s, send to client that they need to re-auth
//
// CHANGE: check by number of heartbeat instead.
// For sending back to client, confirming re-authentication before timeout.
// If user is actually online, the client should be able to send back auth info
warn!("");
let _ = tx
.send(TxControlMessage::Payload(serde_json::json!({
"type": "reauth",
"payload": {
"to": uc.clone()
}
})))
.await;
} }
} }
info!("stop watchdog for {uc}");
}) })
} }