From 9124126637cfddcaee4cc3f14d03470b3130a23e Mon Sep 17 00:00:00 2001 From: Pakin Date: Tue, 19 May 2026 16:38:16 +0700 Subject: [PATCH] feat: save_recipe - fix: bug commit fail if file exceed size limit - change: add new product code from previous machine recipe in list-menu Signed-off-by: Pakin --- src/app.rs | 26 +++- src/websocket/core.rs | 3 + src/websocket/helper.rs | 42 +++++- src/websocket/model.rs | 5 +- src/websocket/rw.rs | 8 +- src/websocket/tasks/price.rs | 4 + src/websocket/tasks/recipe.rs | 272 +++++++++++++++++++++++++++++++--- 7 files changed, 333 insertions(+), 27 deletions(-) diff --git a/src/app.rs b/src/app.rs index b6ce4c8..374fcb0 100644 --- a/src/app.rs +++ b/src/app.rs @@ -290,8 +290,14 @@ pub async fn invoke_commit_request( config: DevConfig, payload: CommitPayload, ) -> Result<(), Box> { - let client = reqwest::Client::new(); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(60)) + .build()?; let commit_path = config.get_post_file_to_recipe_repo(); + let filename = payload.path.split("/").last().unwrap_or("temp").to_string(); + + info!("committing {}", filename); + let form = multipart::Form::new() .text("message", payload.message) .text("signature_username", payload.signature_username) @@ -300,12 +306,17 @@ pub async fn invoke_commit_request( .part( "file", multipart::Part::bytes(payload.file_bytes) + .file_name(filename) .mime_str("application/octet-stream") .unwrap(), ); let response = client.post(commit_path).multipart(form).send().await?; - info!("commit status: {}", response.status()); + info!( + "commit status: {}, {:?}", + response.status(), + response.text().await + ); Ok(()) } @@ -315,7 +326,9 @@ pub async fn invoke_commit_multiple_files_request( config: DevConfig, payloads: Vec, ) -> Result<(), Box> { - let client = reqwest::Client::new(); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(60)) + .build()?; let commit_path = config.get_post_file_to_recipe_repo(); let mut form = multipart::Form::new() .text("message", payloads.first().unwrap().message.clone()) @@ -329,11 +342,18 @@ pub async fn invoke_commit_multiple_files_request( ); for (index, payload) in payloads.iter().enumerate() { + let filename = payload + .path + .split("/") + .last() + .unwrap_or(format!("temp{index}").as_str()) + .to_string(); form = form .text(format!("path{index}"), payload.path.clone()) .part( format!("file{index}"), multipart::Part::bytes(payload.file_bytes.clone()) + .file_name(filename) .mime_str("application/octet-stream") .unwrap(), ); diff --git a/src/websocket/core.rs b/src/websocket/core.rs index 3c6a898..ec8c8fa 100644 --- a/src/websocket/core.rs +++ b/src/websocket/core.rs @@ -10,6 +10,9 @@ pub const CHUNK_SIZE: usize = 5; /// CONFIG: default timeout for each socket connection pub const TIMEOUT: Duration = Duration::from_secs(60 * 5); +/// CONFIG: date format for using in recipe +pub const LAST_CHANGE_DATE_FORMAT: &str = "%v %T"; + #[derive(Clone)] pub enum TxControlMessage { Payload(serde_json::Value), diff --git a/src/websocket/helper.rs b/src/websocket/helper.rs index 779cb71..1ba53ef 100644 --- a/src/websocket/helper.rs +++ b/src/websocket/helper.rs @@ -1,9 +1,10 @@ -use std::{collections::HashMap, fs::File, io::BufReader}; +use std::{cmp::Ordering, collections::HashMap, fs::File, io::BufReader}; -use crate::websocket::core::safe_deserialize; +use crate::websocket::core::{LAST_CHANGE_DATE_FORMAT, safe_deserialize}; use super::model::*; use axum::extract::ws::{CloseFrame, Message, WebSocket}; +use chrono::NaiveDateTime; use redis::{TypedCommands, cmd}; #[deprecated] @@ -106,3 +107,40 @@ pub fn read_sheet_config() -> Result, Box> { Ok(res) } + +pub fn parse_date_from_string(date: &str, fmt: Option<&str>) -> Option { + let fmt = match fmt { + Some(fm) => fm, + None => LAST_CHANGE_DATE_FORMAT, + }; + + NaiveDateTime::parse_from_str(date, fmt).ok() +} + +pub fn compare_dates(d1: Option, d2: Option) -> usize { + // println!("comparing date: {d1:?} --- {d2:?}"); + if d1.is_none() && d2.is_none() { + // case no date provided + return 0; + } + if d1.is_none() && d2.is_some() { + return 2; + } + if d1.is_some() && d2.is_none() { + return 1; + } + + if let Some(d1d) = d1 + && let Some(d2d) = d2 + { + let cmp_v = d1d.cmp(&d2d); + // println!("compare get {cmp_v:?}"); + return match cmp_v { + Ordering::Equal => 0, + Ordering::Greater => 1, + Ordering::Less => 2, + }; + } else { + return 0; + } +} diff --git a/src/websocket/model.rs b/src/websocket/model.rs index 21b8464..6604083 100644 --- a/src/websocket/model.rs +++ b/src/websocket/model.rs @@ -106,6 +106,7 @@ pub struct SaveRecipePayload { pub user_info: serde_json::Value, pub country: String, pub values: serde_json::Value, + pub plugins: Option, } /// Message for authentication before use m2 service @@ -147,7 +148,7 @@ impl From for WebsocketMessageRequest { type_w: "commit_part".to_string(), payload: Some(serde_json::json!({ "commit": value, - "plugin": "apply_recipe" + "plugin": "example-js" })), } } @@ -160,4 +161,6 @@ pub struct RequestMenuListPayload { pub user_info: serde_json::Value, /// target country to get recipe, version will always use latest pub country: String, + /// box id + pub boxid: String, } diff --git a/src/websocket/rw.rs b/src/websocket/rw.rs index 99bb1e3..f368bfa 100644 --- a/src/websocket/rw.rs +++ b/src/websocket/rw.rs @@ -148,14 +148,18 @@ pub async fn read( let now = Instant::now(); } "save_recipe" if req.payload.is_some() => { - tasks::recipe::handle_recipe_save_change_request( + if tasks::recipe::handle_recipe_save_change_request( config.clone(), redis.clone(), tx.clone(), req, uid_clone.clone(), ) - .await?; + .await + .is_err() + { + continue; + } } "auth" if req.payload.is_some() => { tasks::auth::handle_auth_request( diff --git a/src/websocket/tasks/price.rs b/src/websocket/tasks/price.rs index a2b1087..582d1e2 100644 --- a/src/websocket/tasks/price.rs +++ b/src/websocket/tasks/price.rs @@ -163,6 +163,10 @@ pub async fn handle_price_request( let user_info = price_param.user_info.clone(); + if let Some(mp) = rpp.find_by_pd("52-03-01-0129") { + info!("found menu price: {mp:?}"); + } + match price_action { PriceRequestAction::View(view_opt) => { let viewing_options: HashMap = get_extra_parameters(view_opt); diff --git a/src/websocket/tasks/recipe.rs b/src/websocket/tasks/recipe.rs index 8b975ad..428dc2e 100644 --- a/src/websocket/tasks/recipe.rs +++ b/src/websocket/tasks/recipe.rs @@ -31,6 +31,9 @@ use tokio::{ }; use wasmtime::{Config, Engine}; +const NO_MERGE_FLAG: i32 = 1000; +const MERGE_DONE_FLAG: i32 = 0; + pub fn is_req_patch(param: &RecipeRequestPayload) -> bool { param.version != -1 && param.partial.is_some() && param.partial.unwrap() } @@ -67,6 +70,71 @@ pub fn get_key_cache(country: String, version: String, is_patch: bool, retry_cnt } } +async fn get_latest_recipe_from_git( + config: &DevConfig, + country: &str, +) -> Result> { + let latest_key = format!("{country}/version"); + let latest_version = match invoke_checkout_request(config.clone(), latest_key).await { + Ok(version) => version, + Err(e) => { + println!("Error on checkout: {e}"); + "".to_string() + } + }; + + let mut recipe_result: Option = None; + let init_key = 3; + for i in init_key..6 { + let r1_key = get_key_cache(country.to_string(), latest_version.clone(), false, i); + + let content = match invoke_checkout_request(config.clone(), r1_key).await { + Ok(file_content) => file_content, + Err(e) => { + println!("Error on checkout: {e}"); + "".to_string() + } + }; + info!("[get-latest] content ready: {}", content.len()); + let recipe = serde_json::from_str::(&content); + + if let Ok(rp) = recipe { + recipe_result = Some(rp); + break; + } + } + + match recipe_result { + Some(rr) => Ok(rr), + None => return Err("RecipeNotFound".into()), + } +} + +async fn get_latest_recipe_saved_machine_from_git( + config: &DevConfig, + country: &str, + boxid: &str, +) -> Result> { + let latest_key = format!("{country}/coffeethai02_{country}_{boxid}_temp.json"); + let content = match invoke_checkout_request(config.clone(), latest_key.clone()).await { + Ok(content) => content, + Err(e) => { + println!("Error on checkout: {e}"); + "".to_string() + } + }; + info!( + "[get-latest] {} -> content ready: {}", + latest_key, + content.len() + ); + + match serde_json::from_str::(&content) { + Ok(rr) => Ok(rr), + Err(_) => return Err("RecipeNotFound".into()), + } +} + pub async fn throttle_send_recipe( recipe: &Recipe, tx: &Sender, @@ -429,12 +497,63 @@ pub async fn handle_recipe_save_change_request( req: WebsocketMessageRequest, uid_clone: Arc>, ) -> WebsocketMessageResult { - let timestamp = Local::now(); - let p = req.payload.unwrap(); let save_recipe_param: SaveRecipePayload = safe_deserialize(&p)?; + info!( + "[save_recipe] get save recipe request: {} {:?}", + save_recipe_param.country, save_recipe_param.user_info + ); - let single_recipe = serde_json::to_string_pretty(&save_recipe_param.values)?; + let (country, mut box_id) = if save_recipe_param.country.contains("_") { + // send with box id + let spl: Vec = save_recipe_param + .country + .split("_") + .map(|x| x.to_string()) + .collect(); + (spl[0].clone(), spl[1].clone()) + } else { + (save_recipe_param.country, "".to_string()) + }; + + if box_id.is_empty() { + box_id = String::from("unknown"); + } + + let expected_file_path = format!("{country}/coffeethai02_{country}_{box_id}_temp.json"); + + let mut single_recipe: Recipe01 = serde_json::from_value(save_recipe_param.values)?; + // try get saved machine recipe + + let mut result_pre: Option = + match get_latest_recipe_saved_machine_from_git(&config, &country, &box_id).await { + Ok(saved) => Some(saved), + Err(_) => { + error!("[save_recipe] previous save not found ..."); + None + } + }; + + if result_pre.is_none() { + result_pre = match get_latest_recipe_from_git(&config, &country).await { + Ok(r) => Some(r), + Err(e) => { + return Err(format!("{e}").into()); + } + }; + } + + if result_pre.is_none() { + return Err(format!("cannot fetch recipe").into()); + } + + let mut result = result_pre.unwrap(); + + let version = result + .MachineSetting + .configNumber + .as_i64() + .unwrap_or_default(); let display_name = save_recipe_param .user_info @@ -452,27 +571,63 @@ pub async fn handle_recipe_save_change_request( .unwrap_or(&"unknown".to_string()) .to_string(); - let expected_file_path = format!( - "{}/part_coffeethai02_{}_{}.json", - save_recipe_param.country, - display_name, - timestamp.timestamp() - ); + let mut commit_message = String::new(); + + // do resolve within service + if let Some(found) = result.search_pd(single_recipe.productCode.to_string()) { + let merged_result = handle_case_found_existed_recipe(found, &mut single_recipe); + match merged_result { + Ok(mr) => { + let mr_clone = mr.clone(); + // apply now + for rp in result.Recipe01.iter_mut() { + if rp.productCode.eq(mr_clone.productCode.as_str()) { + commit_message.push_str(format!("change {}", mr.productCode).as_str()); + *rp = mr_clone.clone(); + info!("applied success"); + } + } + } + Err(e) => { + error!("[save_recipe] error while applying recipe: {e}"); + return Err(format!("{e}").into()); + } + } + } else { + // CASE: new recipe, do add + info!("[save_recipe] add recipe {}", single_recipe.productCode); + result.Recipe01.push(single_recipe); + } + + let serial_recipe = match serde_json::to_string_pretty(&serde_json::json!(result)) { + Ok(s) => s, + Err(e) => { + error!("failed to serialize recipe: {e}"); + return Err(format!("{e}").into()); + } + }; let commit_payload = CommitPayload { - file_bytes: single_recipe.as_bytes().to_vec(), + file_bytes: serial_recipe.as_bytes().to_vec(), path: expected_file_path.clone(), signature_username: display_name, signature_email: email, - message: format!("resolve-{expected_file_path}"), + message: format!(""), }; - let engine = Engine::new(Config::new().wasm_component_model(true)).unwrap(); - call_plugin_if_existed( - WebsocketMessageRequest::from(commit_payload), - engine.clone(), - ) - .await; + if let Err(commit_error) = invoke_commit_request(config, commit_payload.clone()).await { + error!("failed to commit: {commit_error}"); + let _ = commit_payload.dump_backup(); + return Err(format!("{commit_error}").into()); + } + + // NOTE: disable from reason pure wasm cannot see envs and cannot do fetch by itself. + // let engine = Engine::new(Config::new().wasm_component_model(true)).unwrap(); + // call_plugin_if_existed( + // WebsocketMessageRequest::from(commit_payload), + // engine.clone(), + // ) + // .await; Ok(()) } @@ -498,6 +653,19 @@ pub async fn handle_request_list_menu_recipe( } }; + let country = req_menu_list.clone().country; + let box_id = req_menu_list.clone().boxid; + + // merge from already saved recipe + let result_previous_on_same_boxid: Option = + match get_latest_recipe_saved_machine_from_git(&config, &country, &box_id).await { + Ok(saved) => Some(saved), + Err(e) => { + error!("[list-menu-restore] previous save not found, {e}"); + None + } + }; + let mut result: Vec = Vec::new(); // skip git-like key let init_key = 3; @@ -509,14 +677,14 @@ pub async fn handle_request_list_menu_recipe( i, ); - let content = match invoke_checkout_request(config.clone(), r1_key).await { + let content = match invoke_checkout_request(config.clone(), r1_key.clone()).await { Ok(file_content) => file_content, Err(e) => { println!("Error on checkout: {e}"); "".to_string() } }; - info!("[list-menu] content ready: {}", content.len()); + info!("[list-menu] {r1_key} -> content ready: {}", content.len()); let recipe = serde_json::from_str::(&content); if let Ok(rp) = recipe { @@ -529,6 +697,25 @@ pub async fn handle_request_list_menu_recipe( } } + if let Some(rp_from_previous) = result_previous_on_same_boxid { + let previous_product_codes: Vec = rp_from_previous + .list_menu_product_code() + .iter() + .map(|x| x.to_string()) + .collect(); + + let mut cnt_new = 0; + // add only new to result + for pd in previous_product_codes { + if !result.contains(&pd) { + result.push(pd); + cnt_new += 1; + } + } + + info!("[list-menu] add more {cnt_new} from previous temp recipe {box_id}"); + } + let uidd = uid_clone.lock().await.to_string(); info!("[list-menu] result: {}", result.len()); @@ -549,3 +736,50 @@ pub async fn handle_request_list_menu_recipe( Ok(()) } + +fn handle_case_found_existed_recipe( + source: &Recipe01, + apply: &mut Recipe01, +) -> Result> { + let mut result = source.clone(); + + apply.ExtendID = serde_json::json!(NO_MERGE_FLAG); + + if result.LastChange.is_none() && apply.LastChange.is_some() { + // CASE: original has no timestamp + // do apply immediately + // + apply.ExtendID = serde_json::json!(MERGE_DONE_FLAG); + result = apply.clone(); + info!("[save_recipe] applied recipe success!"); + return Ok(result); + } else if (result.LastChange.is_some() && apply.LastChange.is_none()) + || (result.LastChange.is_none() && result.LastChange.is_none()) + { + // CASE: original has timestamp but applying recipe has no timestamp + warn!("[save_recipe] ignore applying recipe"); + apply.ExtendID = serde_json::json!(MERGE_DONE_FLAG); + } else if let Some(original_time) = &result.LastChange + && let Some(applied_time) = &apply.LastChange + { + if original_time.eq(applied_time) { + // CASE: original has matched timestamp to applying recipe + apply.ExtendID = serde_json::json!(MERGE_DONE_FLAG); + info!("[save_recipe] recipe identical, skip applying"); + } else { + // start comparing date + let result_compare_date = compare_dates( + parse_date_from_string(original_time.as_str().unwrap_or_default(), None), + parse_date_from_string(applied_time.as_str().unwrap_or_default(), None), + ); + + if result_compare_date == 2 { + // do apply + apply.ExtendID = serde_json::json!(MERGE_DONE_FLAG); + result = apply.clone(); + } + } + } + + Ok(result.clone()) +}