feat: save_recipe

- fix: bug commit fail if file exceed size limit
- change: add new product code from previous machine recipe in list-menu

Signed-off-by: Pakin <pakin.t@forth.co.th>
This commit is contained in:
Pakin 2026-05-19 16:38:16 +07:00
parent 32033820a9
commit 9124126637
7 changed files with 333 additions and 27 deletions

View file

@ -290,8 +290,14 @@ pub async fn invoke_commit_request(
config: DevConfig,
payload: CommitPayload,
) -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(60))
.build()?;
let commit_path = config.get_post_file_to_recipe_repo();
let filename = payload.path.split("/").last().unwrap_or("temp").to_string();
info!("committing {}", filename);
let form = multipart::Form::new()
.text("message", payload.message)
.text("signature_username", payload.signature_username)
@ -300,12 +306,17 @@ pub async fn invoke_commit_request(
.part(
"file",
multipart::Part::bytes(payload.file_bytes)
.file_name(filename)
.mime_str("application/octet-stream")
.unwrap(),
);
let response = client.post(commit_path).multipart(form).send().await?;
info!("commit status: {}", response.status());
info!(
"commit status: {}, {:?}",
response.status(),
response.text().await
);
Ok(())
}
@ -315,7 +326,9 @@ pub async fn invoke_commit_multiple_files_request(
config: DevConfig,
payloads: Vec<CommitPayload>,
) -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(60))
.build()?;
let commit_path = config.get_post_file_to_recipe_repo();
let mut form = multipart::Form::new()
.text("message", payloads.first().unwrap().message.clone())
@ -329,11 +342,18 @@ pub async fn invoke_commit_multiple_files_request(
);
for (index, payload) in payloads.iter().enumerate() {
let filename = payload
.path
.split("/")
.last()
.unwrap_or(format!("temp{index}").as_str())
.to_string();
form = form
.text(format!("path{index}"), payload.path.clone())
.part(
format!("file{index}"),
multipart::Part::bytes(payload.file_bytes.clone())
.file_name(filename)
.mime_str("application/octet-stream")
.unwrap(),
);

View file

@ -10,6 +10,9 @@ pub const CHUNK_SIZE: usize = 5;
/// CONFIG: default timeout for each socket connection
pub const TIMEOUT: Duration = Duration::from_secs(60 * 5);
/// CONFIG: date format for using in recipe
pub const LAST_CHANGE_DATE_FORMAT: &str = "%v %T";
#[derive(Clone)]
pub enum TxControlMessage {
Payload(serde_json::Value),

View file

@ -1,9 +1,10 @@
use std::{collections::HashMap, fs::File, io::BufReader};
use std::{cmp::Ordering, collections::HashMap, fs::File, io::BufReader};
use crate::websocket::core::safe_deserialize;
use crate::websocket::core::{LAST_CHANGE_DATE_FORMAT, safe_deserialize};
use super::model::*;
use axum::extract::ws::{CloseFrame, Message, WebSocket};
use chrono::NaiveDateTime;
use redis::{TypedCommands, cmd};
#[deprecated]
@ -106,3 +107,40 @@ pub fn read_sheet_config() -> Result<Vec<String>, Box<dyn std::error::Error>> {
Ok(res)
}
pub fn parse_date_from_string(date: &str, fmt: Option<&str>) -> Option<NaiveDateTime> {
let fmt = match fmt {
Some(fm) => fm,
None => LAST_CHANGE_DATE_FORMAT,
};
NaiveDateTime::parse_from_str(date, fmt).ok()
}
pub fn compare_dates(d1: Option<NaiveDateTime>, d2: Option<NaiveDateTime>) -> usize {
// println!("comparing date: {d1:?} --- {d2:?}");
if d1.is_none() && d2.is_none() {
// case no date provided
return 0;
}
if d1.is_none() && d2.is_some() {
return 2;
}
if d1.is_some() && d2.is_none() {
return 1;
}
if let Some(d1d) = d1
&& let Some(d2d) = d2
{
let cmp_v = d1d.cmp(&d2d);
// println!("compare get {cmp_v:?}");
return match cmp_v {
Ordering::Equal => 0,
Ordering::Greater => 1,
Ordering::Less => 2,
};
} else {
return 0;
}
}

View file

@ -106,6 +106,7 @@ pub struct SaveRecipePayload {
pub user_info: serde_json::Value,
pub country: String,
pub values: serde_json::Value,
pub plugins: Option<String>,
}
/// Message for authentication before use m2 service
@ -147,7 +148,7 @@ impl From<CommitPayload> for WebsocketMessageRequest {
type_w: "commit_part".to_string(),
payload: Some(serde_json::json!({
"commit": value,
"plugin": "apply_recipe"
"plugin": "example-js"
})),
}
}
@ -160,4 +161,6 @@ pub struct RequestMenuListPayload {
pub user_info: serde_json::Value,
/// target country to get recipe, version will always use latest
pub country: String,
/// box id
pub boxid: String,
}

View file

@ -148,14 +148,18 @@ pub async fn read(
let now = Instant::now();
}
"save_recipe" if req.payload.is_some() => {
tasks::recipe::handle_recipe_save_change_request(
if tasks::recipe::handle_recipe_save_change_request(
config.clone(),
redis.clone(),
tx.clone(),
req,
uid_clone.clone(),
)
.await?;
.await
.is_err()
{
continue;
}
}
"auth" if req.payload.is_some() => {
tasks::auth::handle_auth_request(

View file

@ -163,6 +163,10 @@ pub async fn handle_price_request(
let user_info = price_param.user_info.clone();
if let Some(mp) = rpp.find_by_pd("52-03-01-0129") {
info!("found menu price: {mp:?}");
}
match price_action {
PriceRequestAction::View(view_opt) => {
let viewing_options: HashMap<String, String> = get_extra_parameters(view_opt);

View file

@ -31,6 +31,9 @@ use tokio::{
};
use wasmtime::{Config, Engine};
const NO_MERGE_FLAG: i32 = 1000;
const MERGE_DONE_FLAG: i32 = 0;
pub fn is_req_patch(param: &RecipeRequestPayload) -> bool {
param.version != -1 && param.partial.is_some() && param.partial.unwrap()
}
@ -67,6 +70,71 @@ pub fn get_key_cache(country: String, version: String, is_patch: bool, retry_cnt
}
}
async fn get_latest_recipe_from_git(
config: &DevConfig,
country: &str,
) -> Result<Recipe, Box<dyn std::error::Error>> {
let latest_key = format!("{country}/version");
let latest_version = match invoke_checkout_request(config.clone(), latest_key).await {
Ok(version) => version,
Err(e) => {
println!("Error on checkout: {e}");
"".to_string()
}
};
let mut recipe_result: Option<Recipe> = None;
let init_key = 3;
for i in init_key..6 {
let r1_key = get_key_cache(country.to_string(), latest_version.clone(), false, i);
let content = match invoke_checkout_request(config.clone(), r1_key).await {
Ok(file_content) => file_content,
Err(e) => {
println!("Error on checkout: {e}");
"".to_string()
}
};
info!("[get-latest] content ready: {}", content.len());
let recipe = serde_json::from_str::<Recipe>(&content);
if let Ok(rp) = recipe {
recipe_result = Some(rp);
break;
}
}
match recipe_result {
Some(rr) => Ok(rr),
None => return Err("RecipeNotFound".into()),
}
}
async fn get_latest_recipe_saved_machine_from_git(
config: &DevConfig,
country: &str,
boxid: &str,
) -> Result<Recipe, Box<dyn std::error::Error>> {
let latest_key = format!("{country}/coffeethai02_{country}_{boxid}_temp.json");
let content = match invoke_checkout_request(config.clone(), latest_key.clone()).await {
Ok(content) => content,
Err(e) => {
println!("Error on checkout: {e}");
"".to_string()
}
};
info!(
"[get-latest] {} -> content ready: {}",
latest_key,
content.len()
);
match serde_json::from_str::<Recipe>(&content) {
Ok(rr) => Ok(rr),
Err(_) => return Err("RecipeNotFound".into()),
}
}
pub async fn throttle_send_recipe(
recipe: &Recipe,
tx: &Sender<TxControlMessage>,
@ -429,12 +497,63 @@ pub async fn handle_recipe_save_change_request(
req: WebsocketMessageRequest,
uid_clone: Arc<Mutex<String>>,
) -> WebsocketMessageResult {
let timestamp = Local::now();
let p = req.payload.unwrap();
let save_recipe_param: SaveRecipePayload = safe_deserialize(&p)?;
info!(
"[save_recipe] get save recipe request: {} {:?}",
save_recipe_param.country, save_recipe_param.user_info
);
let single_recipe = serde_json::to_string_pretty(&save_recipe_param.values)?;
let (country, mut box_id) = if save_recipe_param.country.contains("_") {
// send with box id
let spl: Vec<String> = save_recipe_param
.country
.split("_")
.map(|x| x.to_string())
.collect();
(spl[0].clone(), spl[1].clone())
} else {
(save_recipe_param.country, "".to_string())
};
if box_id.is_empty() {
box_id = String::from("unknown");
}
let expected_file_path = format!("{country}/coffeethai02_{country}_{box_id}_temp.json");
let mut single_recipe: Recipe01 = serde_json::from_value(save_recipe_param.values)?;
// try get saved machine recipe
let mut result_pre: Option<Recipe> =
match get_latest_recipe_saved_machine_from_git(&config, &country, &box_id).await {
Ok(saved) => Some(saved),
Err(_) => {
error!("[save_recipe] previous save not found ...");
None
}
};
if result_pre.is_none() {
result_pre = match get_latest_recipe_from_git(&config, &country).await {
Ok(r) => Some(r),
Err(e) => {
return Err(format!("{e}").into());
}
};
}
if result_pre.is_none() {
return Err(format!("cannot fetch recipe").into());
}
let mut result = result_pre.unwrap();
let version = result
.MachineSetting
.configNumber
.as_i64()
.unwrap_or_default();
let display_name = save_recipe_param
.user_info
@ -452,27 +571,63 @@ pub async fn handle_recipe_save_change_request(
.unwrap_or(&"unknown".to_string())
.to_string();
let expected_file_path = format!(
"{}/part_coffeethai02_{}_{}.json",
save_recipe_param.country,
display_name,
timestamp.timestamp()
);
let mut commit_message = String::new();
// do resolve within service
if let Some(found) = result.search_pd(single_recipe.productCode.to_string()) {
let merged_result = handle_case_found_existed_recipe(found, &mut single_recipe);
match merged_result {
Ok(mr) => {
let mr_clone = mr.clone();
// apply now
for rp in result.Recipe01.iter_mut() {
if rp.productCode.eq(mr_clone.productCode.as_str()) {
commit_message.push_str(format!("change {}", mr.productCode).as_str());
*rp = mr_clone.clone();
info!("applied success");
}
}
}
Err(e) => {
error!("[save_recipe] error while applying recipe: {e}");
return Err(format!("{e}").into());
}
}
} else {
// CASE: new recipe, do add
info!("[save_recipe] add recipe {}", single_recipe.productCode);
result.Recipe01.push(single_recipe);
}
let serial_recipe = match serde_json::to_string_pretty(&serde_json::json!(result)) {
Ok(s) => s,
Err(e) => {
error!("failed to serialize recipe: {e}");
return Err(format!("{e}").into());
}
};
let commit_payload = CommitPayload {
file_bytes: single_recipe.as_bytes().to_vec(),
file_bytes: serial_recipe.as_bytes().to_vec(),
path: expected_file_path.clone(),
signature_username: display_name,
signature_email: email,
message: format!("resolve-{expected_file_path}"),
message: format!(""),
};
let engine = Engine::new(Config::new().wasm_component_model(true)).unwrap();
call_plugin_if_existed(
WebsocketMessageRequest::from(commit_payload),
engine.clone(),
)
.await;
if let Err(commit_error) = invoke_commit_request(config, commit_payload.clone()).await {
error!("failed to commit: {commit_error}");
let _ = commit_payload.dump_backup();
return Err(format!("{commit_error}").into());
}
// NOTE: disable from reason pure wasm cannot see envs and cannot do fetch by itself.
// let engine = Engine::new(Config::new().wasm_component_model(true)).unwrap();
// call_plugin_if_existed(
// WebsocketMessageRequest::from(commit_payload),
// engine.clone(),
// )
// .await;
Ok(())
}
@ -498,6 +653,19 @@ pub async fn handle_request_list_menu_recipe(
}
};
let country = req_menu_list.clone().country;
let box_id = req_menu_list.clone().boxid;
// merge from already saved recipe
let result_previous_on_same_boxid: Option<Recipe> =
match get_latest_recipe_saved_machine_from_git(&config, &country, &box_id).await {
Ok(saved) => Some(saved),
Err(e) => {
error!("[list-menu-restore] previous save not found, {e}");
None
}
};
let mut result: Vec<String> = Vec::new();
// skip git-like key
let init_key = 3;
@ -509,14 +677,14 @@ pub async fn handle_request_list_menu_recipe(
i,
);
let content = match invoke_checkout_request(config.clone(), r1_key).await {
let content = match invoke_checkout_request(config.clone(), r1_key.clone()).await {
Ok(file_content) => file_content,
Err(e) => {
println!("Error on checkout: {e}");
"".to_string()
}
};
info!("[list-menu] content ready: {}", content.len());
info!("[list-menu] {r1_key} -> content ready: {}", content.len());
let recipe = serde_json::from_str::<Recipe>(&content);
if let Ok(rp) = recipe {
@ -529,6 +697,25 @@ pub async fn handle_request_list_menu_recipe(
}
}
if let Some(rp_from_previous) = result_previous_on_same_boxid {
let previous_product_codes: Vec<String> = rp_from_previous
.list_menu_product_code()
.iter()
.map(|x| x.to_string())
.collect();
let mut cnt_new = 0;
// add only new to result
for pd in previous_product_codes {
if !result.contains(&pd) {
result.push(pd);
cnt_new += 1;
}
}
info!("[list-menu] add more {cnt_new} from previous temp recipe {box_id}");
}
let uidd = uid_clone.lock().await.to_string();
info!("[list-menu] result: {}", result.len());
@ -549,3 +736,50 @@ pub async fn handle_request_list_menu_recipe(
Ok(())
}
fn handle_case_found_existed_recipe(
source: &Recipe01,
apply: &mut Recipe01,
) -> Result<Recipe01, Box<dyn std::error::Error>> {
let mut result = source.clone();
apply.ExtendID = serde_json::json!(NO_MERGE_FLAG);
if result.LastChange.is_none() && apply.LastChange.is_some() {
// CASE: original has no timestamp
// do apply immediately
//
apply.ExtendID = serde_json::json!(MERGE_DONE_FLAG);
result = apply.clone();
info!("[save_recipe] applied recipe success!");
return Ok(result);
} else if (result.LastChange.is_some() && apply.LastChange.is_none())
|| (result.LastChange.is_none() && result.LastChange.is_none())
{
// CASE: original has timestamp but applying recipe has no timestamp
warn!("[save_recipe] ignore applying recipe");
apply.ExtendID = serde_json::json!(MERGE_DONE_FLAG);
} else if let Some(original_time) = &result.LastChange
&& let Some(applied_time) = &apply.LastChange
{
if original_time.eq(applied_time) {
// CASE: original has matched timestamp to applying recipe
apply.ExtendID = serde_json::json!(MERGE_DONE_FLAG);
info!("[save_recipe] recipe identical, skip applying");
} else {
// start comparing date
let result_compare_date = compare_dates(
parse_date_from_string(original_time.as_str().unwrap_or_default(), None),
parse_date_from_string(applied_time.as_str().unwrap_or_default(), None),
);
if result_compare_date == 2 {
// do apply
apply.ExtendID = serde_json::json!(MERGE_DONE_FLAG);
result = apply.clone();
}
}
}
Ok(result.clone())
}