From 0f857445a430fb97ce496ca76f6db1b62193cf11 Mon Sep 17 00:00:00 2001 From: Pakin Date: Tue, 5 May 2026 17:03:33 +0700 Subject: [PATCH] feat: add price handler, commit, push, pull - price handler for getting or editing price (only applied to main profile) - routine pull sync recipe repo & backup commit recover Signed-off-by: Pakin --- Cargo.lock | 18 ++ Cargo.toml | 2 +- src/app.rs | 167 ++++++++++++++++- src/websocket/helper.rs | 20 ++ src/websocket/model.rs | 55 ++++++ src/websocket/rw.rs | 10 + src/websocket/tasks/mod.rs | 1 + src/websocket/tasks/price.rs | 341 ++++++++++++++++++++++++++++++++++ src/websocket/tasks/recipe.rs | 19 -- 9 files changed, 610 insertions(+), 23 deletions(-) create mode 100644 src/websocket/tasks/price.rs diff --git a/Cargo.lock b/Cargo.lock index 7e9cce9..97e4e2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1567,6 +1567,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -2074,6 +2084,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-core", + "futures-util", "h2", "http", "http-body", @@ -2084,6 +2095,7 @@ dependencies = [ "js-sys", "log", "mime", + "mime_guess", "percent-encoding", "pin-project-lite", "quinn", @@ -2827,6 +2839,12 @@ version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" +[[package]] +name = "unicase" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" + [[package]] name = "unicode-ident" version = "1.0.24" diff --git a/Cargo.toml b/Cargo.toml index aeac9d8..0c602b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.g log = "0.4.29" rayon = "1.11.0" redis = { version = "1.0.2", features = ["tokio-comp"] } -reqwest = "0.13.1" +reqwest = { version = "0.13.1", features = ["multipart"] } serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" tokio = { version = "1.49.0", features = ["full"] } diff --git a/src/app.rs b/src/app.rs index cc117ce..15d2454 100644 --- a/src/app.rs +++ b/src/app.rs @@ -4,14 +4,21 @@ use axum::{ routing::{get, post}, serve::ListenerExt, }; -use log::{error, info}; +use log::{error, info, warn}; use redis::TypedCommands; +use reqwest::{StatusCode, multipart}; use std::{ collections::{HashMap, VecDeque}, env, + fs::{self, File}, + io::BufReader, sync::Arc, + time::Duration, +}; +use tokio::{ + fs::read_dir, + sync::{Mutex, mpsc::Sender}, }; -use tokio::sync::{Mutex, mpsc::Sender}; #[derive(Clone)] pub struct Hub { @@ -52,6 +59,18 @@ impl DevConfig { format!("{}/checkout?path={}", self.get_recipe_url(), path) } + pub fn get_post_file_to_recipe_repo(&self) -> String { + format!("{}/commit", self.get_recipe_url()) + } + + pub fn get_pull_recipe_repo(&self) -> String { + format!("{}/pull", self.get_recipe_url()) + } + + pub fn get_push_recipe_repo(&self) -> String { + format!("{}/push", self.get_recipe_url()) + } + pub fn get_api_header(&self) -> (String, String) { ("X-API-Key".to_string(), self.api_key.clone()) } @@ -83,7 +102,7 @@ impl AppState { let redis_cli_clone = redis_cli.clone(); let tx_new = system_tx.clone(); let result = Arc::new(AppState { - dev_config, + dev_config: dev_config.clone(), redis_cli, system_tx, connectors_mapping: Arc::new(Mutex::new(Hub { @@ -91,6 +110,72 @@ impl AppState { })), }); + // backup job + let dev_config_backup = dev_config.clone(); + tokio::spawn(async move { + let m_cfg = dev_config_backup.clone(); + + loop { + // auto sync + if invoke_pull_sync_request(m_cfg.clone()).await.is_err() { + warn!("pulling repo unhealthy, retry again in 5 minutes"); + continue; + } + + match read_dir(".").await { + Ok(mut d) => { + while let Ok(Some(entry)) = d.next_entry().await { + let ent_path = entry.path(); + + if let Some(filename) = ent_path.file_name() + && let Some(filename_str) = filename.to_str() + && filename_str.starts_with("gtx") + && filename_str.ends_with(".json") + { + // read file + // + let f = match File::open(ent_path.clone()) { + Ok(f) => f, + Err(_) => continue, + }; + + let buf = BufReader::new(f); + + let commit_from_backup: CommitPayload = + match serde_json::from_reader(buf) { + Ok(cm) => cm, + Err(_) => continue, + }; + + if invoke_pull_sync_request(m_cfg.clone()).await.is_err() { + warn!("pulling repo unhealthy, retry again in 5 minutes"); + continue; + } + + let _ = + invoke_commit_request(m_cfg.clone(), commit_from_backup).await; + + if invoke_push_request(m_cfg.clone()).await.is_ok() { + // push success + info!("push backup success"); + if fs::remove_file(ent_path.clone()).is_ok() { + info!("clean backup"); + } + } + } else { + continue; + } + } + } + Err(_) => {} + } + + info!("[backup] idle"); + + tokio::time::sleep(Duration::from_mins(5)).await; + } + }); + tokio::spawn(async move { let mut lredis = redis_cli_clone.clone(); let current_queue: crossbeam_queue::ArrayQueue = @@ -212,6 +297,82 @@ pub async fn invoke_checkout_request( } } +/// Invoke git pull, may takes sometime +pub async fn invoke_pull_sync_request( + config: DevConfig, +) -> Result> { + let client = reqwest::Client::new(); + + let req_path = config.get_pull_recipe_repo(); + // println!("dbg: {req_path}"); + let res = client.get(req_path).send().await?; + + if res.status() != StatusCode::OK { + // pull fail + + error!( + "invoke pull fail: [{}] {:?}", + res.status(), + res.text().await + ); + return Err("pull fail".into()); + } + + match res.text().await { + Ok(raw) => Ok(raw), + Err(e) => Err(format!("{e}").into()), + } +} + +/// Invoke sending from server to server for committing +pub async fn invoke_commit_request( + config: DevConfig, + payload: CommitPayload, +) -> Result<(), Box> { + let client = reqwest::Client::new(); + let commit_path = config.get_post_file_to_recipe_repo(); + let form = multipart::Form::new() + .text("message", payload.message) + .text("signature_username", payload.signature_username) + .text("signature_email", payload.signature_email) + .text("path", payload.path) + .part( + "file", + multipart::Part::bytes(payload.file_bytes) + .mime_str("application/octet-stream") + .unwrap(), + ); + let response = client.post(commit_path).multipart(form).send().await?; + + info!("commit status: {}", response.status()); + + Ok(()) +} + +pub async fn invoke_push_request(config: DevConfig) -> Result> { + let client = reqwest::Client::new(); + + let req_path = config.get_push_recipe_repo(); + // println!("dbg: {req_path}"); + let res = client.get(req_path).send().await?; + + if res.status() != StatusCode::OK { + // pull fail + + error!( + "invoke push fail: [{}] {:?}", + res.status(), + res.text().await + ); + return Err("push fail".into()); + } + + match res.text().await { + Ok(raw) => Ok(raw), + Err(e) => Err(format!("{e}").into()), + } +} + pub async fn create_recipe_repo_router() -> Router> { Router::new().route("/ws", get(crate::websocket::handler::websocket_handler)) } diff --git a/src/websocket/helper.rs b/src/websocket/helper.rs index aa8d287..d481a6f 100644 --- a/src/websocket/helper.rs +++ b/src/websocket/helper.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use super::model::*; use axum::extract::ws::{CloseFrame, Message, WebSocket}; use redis::{TypedCommands, cmd}; @@ -64,3 +66,21 @@ pub fn convert_sys_msg_command(msg: &serde_json::Value) -> Option { Err(_) => None, } } + +pub fn get_extra_parameters(s: String) -> HashMap { + let mut result = HashMap::new(); + + let plist: Vec = s.split(",").map(|x| x.to_string()).collect(); + + for pl in plist { + let sm: Vec = pl.split("=").map(|x| x.to_string()).collect(); + + if sm.len() != 2 { + continue; + } + + result.insert(sm[0].to_string(), sm[1].to_string()); + } + + result +} diff --git a/src/websocket/model.rs b/src/websocket/model.rs index 0f289f2..0bff95b 100644 --- a/src/websocket/model.rs +++ b/src/websocket/model.rs @@ -1,5 +1,21 @@ +use std::fs::File; +use std::io::{Read, Write}; + use serde::{Deserialize, Serialize}; +pub trait Backup: Send + Sync + Serialize { + fn dump_backup(&self) -> Result<(), Box> { + let tx = format!("gtx-{}.json", uuid::Uuid::new_v4().to_string()); + let json = serde_json::to_string(&self).unwrap(); + let json2: serde_json::Value = serde_json::from_str(&json).unwrap(); + + let writer = File::create(format!("./{tx}")).unwrap(); + let _ = serde_json::to_writer(writer, &json2); + + Ok(()) + } +} + /// system message to send back to client, this may be called from other services #[derive(Debug, Serialize, Deserialize)] pub struct SysMessage { @@ -40,6 +56,27 @@ pub struct RecipeRequestPayload { pub parameters: Option, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum PriceRequestAction { + View(String), + Edit(String), +} + +/// Price request payload struct +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct PriceRequestPayload { + /// Allowed interactions of price request + pub action: PriceRequestAction, + /// Country of recipe + pub country: String, + /// Filename to override default get, expect path + pub override_file: Option, + /// Extended infos, required parameters or unimplemented fields in the current struct. Expected pattern `=,=,...` + pub parameters: Option, + /// User info expect at least id, token, name + pub user_info: serde_json::Value, +} + /// Command request for external services #[derive(Debug, Serialize, Deserialize, Clone)] pub struct CommandRequestPayload { @@ -84,3 +121,21 @@ pub struct AuthUserField { pub email: String, pub permissions: String, } + +/// For sending to recipe repo, saving/committing value/file to git +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct CommitPayload { + /// file/content as bytes + pub file_bytes: Vec, + /// path to commit file + pub path: String, + /// signature username part (committer) + pub signature_username: String, + /// signature email part (committer) + pub signature_email: String, + /// commit message + pub message: String, +} + +// use default backup method +impl Backup for CommitPayload {} diff --git a/src/websocket/rw.rs b/src/websocket/rw.rs index d8897f0..18911fb 100644 --- a/src/websocket/rw.rs +++ b/src/websocket/rw.rs @@ -73,6 +73,16 @@ pub async fn read( ) .await?; } + "price" if req.payload.is_some() => { + tasks::price::handle_price_request( + config.clone(), + redis.clone(), + tx.clone(), + req, + uid_clone.clone(), + ) + .await?; + } "command" if req.payload.is_some() => { tasks::command::handle_command_request(state.clone(), tx.clone(), req) .await?; diff --git a/src/websocket/tasks/mod.rs b/src/websocket/tasks/mod.rs index 5de2e76..15c63fe 100644 --- a/src/websocket/tasks/mod.rs +++ b/src/websocket/tasks/mod.rs @@ -1,5 +1,6 @@ pub mod auth; pub mod command; +pub mod price; pub mod recipe; pub mod sheet; pub mod watchdog; diff --git a/src/websocket/tasks/price.rs b/src/websocket/tasks/price.rs new file mode 100644 index 0000000..51b6248 --- /dev/null +++ b/src/websocket/tasks/price.rs @@ -0,0 +1,341 @@ +use crate::app::*; +use crate::stream::model::{ + IntoStreamMessage, StreamDataChunk, StreamDataEnd, StreamDataExtra, StreamDataStart, +}; +use crate::websocket::{core::*, helper::*, model::*}; + +use log::{debug, error, info, warn}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::{fs::File, io::Read, sync::Arc}; +use tokio::sync::{Mutex, mpsc::Sender}; + +use crate::websocket::core::WebsocketMessageResult; + +#[allow(non_snake_case)] +#[derive(Debug, Serialize, Deserialize, Clone)] +struct MenuPrice { + ProductCode: String, + NewPrice: serde_json::Value, + #[serde(skip_serializing_if = "Option::is_none")] + StringParam: Option, + #[serde(skip_serializing_if = "Option::is_none")] + Discount: Option, + #[serde(skip_serializing_if = "Option::is_none")] + Percent: Option, + #[serde(skip_serializing_if = "Option::is_none")] + roundup: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct RecipePrice { + #[serde(flatten)] + metadata: serde_json::Value, + content: Vec, +} + +impl RecipePrice { + pub const HIDE_PARAM: &str = "hide=true"; + + pub fn import(path: String) -> RecipePrice { + debug!("try import {path}"); + let mut file = File::open(path).expect("file not found"); + let mut data = String::new(); + + file.read_to_string(&mut data).unwrap(); + + let res: Result = serde_json::from_str(&data); + match res { + Ok(rp) => rp, + Err(e) => { + error!("error while deserialize price: {e}"); + RecipePrice { + content: Vec::new(), + metadata: serde_json::Value::Null, + } + } + } + } + + pub fn import_from_raw_string(raw: String) -> RecipePrice { + let res: Result = serde_json::from_str(&raw); + match res { + Ok(rp) => rp, + Err(e) => { + error!("error while deserialize price: {e}"); + RecipePrice { + content: Vec::new(), + metadata: serde_json::Value::Null, + } + } + } + } + + // only for getting data without modify + pub fn find_by_pd(&self, pd: &str) -> Option { + self.content + .iter() + .find(|x| x.ProductCode.eq(pd)) + .map(|x| x.clone()) + } + + pub fn modify_price_by_pd(&mut self, pd: &str, price: serde_json::Value) { + if let Some(mp) = self.content.iter_mut().find(|x| x.ProductCode.eq(pd)) { + mp.NewPrice = price; + } + } + + fn modify_string_param_by_pd(&mut self, pd: &str, param: &str) { + if let Some(mp) = self.content.iter_mut().find(|x| x.ProductCode.eq(pd)) { + mp.StringParam = Some(param.to_string()); + } + } + + pub fn set_visibility_by_pd(&mut self, pd: &str, show: bool) { + if !show { + self.modify_string_param_by_pd(pd, RecipePrice::HIDE_PARAM); + } else { + self.modify_string_param_by_pd(pd, ""); + } + } + + // NOTE: disable write to file + // pub fn export_to_json_file(self, outpath: Option) { + // let json = serde_json::to_string(&self).unwrap(); + // let json2: serde_json::Value = serde_json::from_str(&json).unwrap(); + + // if let Some(outpath) = outpath { + // let writer = File::create(outpath).unwrap(); + // let _ = serde_json::to_writer_pretty(writer, &json2); + // } else { + // println!("Default save to (execute)/price.json"); + // let writer = File::create("price.json").unwrap(); + // let _ = serde_json::to_writer_pretty(writer, &json2); + // } + // } +} + +/// Get list of price +// pub async fn handle_price_list_request( +// config: DevConfig, +// redis: redis::Client, +// tx: Sender, +// req: WebsocketMessageRequest, +// uid_clone: Arc>, +// ) -> WebsocketMessageResult { + +// } + +/// Get main price profile of country +pub async fn handle_price_request( + config: DevConfig, + redis: redis::Client, + tx: Sender, + req: WebsocketMessageRequest, + uid_clone: Arc>, +) -> WebsocketMessageResult { + let p = req.payload.unwrap(); + + let price_param: PriceRequestPayload = serde_json::from_value(p)?; + + let mut price_file_format = format!( + "{}/profile_{}_master.json", + price_param.country, + price_param.country.to_uppercase() + ); + + if let Some(override_file) = price_param.override_file { + price_file_format = override_file; + } + + let price_action = price_param.action; + + let price_content = + match invoke_checkout_request(config.clone(), price_file_format.clone()).await { + Ok(pc) => pc, + Err(e) => return Err(format!("Cannot find price of expected country: {e:?}").into()), + }; + + info!("price content len: {}", price_content.len()); + + let mut rpp = RecipePrice::import_from_raw_string(price_content); + let _uid = uid_clone.clone(); + let uidd = _uid.try_lock().unwrap(); + + info!("price action: {price_action:?}"); + + let user_info = price_param.user_info.clone(); + + match price_action { + PriceRequestAction::View(view_opt) => { + let viewing_options: HashMap = get_extra_parameters(view_opt); + // sa=all + // sa=get,pd=... + // sa=query,list=1|2|3 + // sa=query,where=contain,kw=... + let sub_action = viewing_options.get("sa"); + let pd = viewing_options.get("pd"); + + if let Some(sa) = sub_action { + let mut result = Vec::new(); + let action_done = match sa.as_str() { + "all" => { + result = rpp.content; + + true + } + "get" + if let Some(pd) = pd + && let Some(mp) = rpp.find_by_pd(pd) => + { + result.push(mp); + true + } + _ => false, + }; + + if action_done { + let _ = tx + .send(TxControlMessage::Payload(serde_json::json!({ + "type": "price", + "payload": { + "req_action": sa, + "status": "ok", + "content": result, + "to": uidd.to_string() + } + }))) + .await; + } else { + let _ = tx + .send(TxControlMessage::Payload(serde_json::json!({ + "type": "price", + "payload": { + "req_action": sa, + "status": "fail", + "to": uidd.to_string() + } + }))) + .await; + } + } + } + PriceRequestAction::Edit(edit_opt) => { + let editing_options: HashMap = get_extra_parameters(edit_opt); + // sa=change,pd=...,to=... + // sa=hide,pd=... + // sa=disable,pd=... disable = hide + // sa=show,pd=... + // sa=toggle,pd=...,state=show|hide + let sub_action = editing_options.get("sa"); + let pd = editing_options.get("pd"); + let to = editing_options.get("to"); + let state = editing_options.get("state"); + + if let Some(sa) = sub_action { + let mut action_message = String::new(); + let action_done = match sa.as_str() { + "change" + if let Some(pd) = pd + && let Some(to) = to + && let Some(mp) = rpp.find_by_pd(pd.as_str()) => + { + info!( + "[CHANGE] price of {pd} from {} to {to}", + mp.NewPrice.as_i64().unwrap() + ); + action_message = format!( + "[CHANGE] price of {pd} from {} to {to}", + mp.NewPrice.as_i64().unwrap() + ); + + let price_int = to.parse::()?; + + rpp.modify_price_by_pd(pd, serde_json::json!(price_int)); + true + } + "toggle" + if let Some(pd) = pd + && let Some(state) = state => + { + info!("[TOGGLE] {pd} to {state}"); + action_message = format!("[TOGGLE] {pd} to {state}"); + + match state.as_str() { + "show" => rpp.set_visibility_by_pd(pd.as_str(), true), + "hide" | "disable" => rpp.set_visibility_by_pd(pd.as_str(), false), + _ => { + warn!("unknown state toggle"); + } + } + true + } + _ => false, + }; + + if action_done { + let _ = tx + .send(TxControlMessage::Payload(serde_json::json!({ + "type": "price", + "payload": { + "req_action": sa, + "status": "ok", + "to": uidd.to_string() + } + }))) + .await; + + // send save + // + + let all_prices_str = serde_json::to_string_pretty(&rpp)?; + let commit_payload = CommitPayload { + file_bytes: all_prices_str.as_bytes().to_vec(), + path: price_file_format.clone(), + signature_username: user_info + .get("displayName") + .unwrap_or_default() + .as_str() + .unwrap_or(&"unknown".to_string()) + .to_string(), + signature_email: user_info + .get("email") + .unwrap_or_default() + .as_str() + .unwrap_or(&"unknown".to_string()) + .to_string(), + message: action_message, + }; + + if invoke_pull_sync_request(config.clone()).await.is_err() { + // backup + let _ = commit_payload.dump_backup(); + return Err("Fail to sync repo, backing up ...".into()); + } + + let _ = invoke_commit_request(config.clone(), commit_payload.clone()).await; + + if invoke_push_request(config.clone()).await.is_err() { + let _ = commit_payload.dump_backup(); + return Err("Fail to push repo, backing up ...".into()); + } + + // push to git + } else { + let _ = tx + .send(TxControlMessage::Payload(serde_json::json!({ + "type": "price", + "payload": { + "req_action": sa, + "status": "fail", + "to": uidd.to_string() + } + }))) + .await; + } + } + } + } + + Ok(()) +} diff --git a/src/websocket/tasks/recipe.rs b/src/websocket/tasks/recipe.rs index d07cf34..e846583 100644 --- a/src/websocket/tasks/recipe.rs +++ b/src/websocket/tasks/recipe.rs @@ -64,24 +64,6 @@ pub fn get_key_cache(country: String, version: String, is_patch: bool, retry_cnt } } -pub fn get_extra_parameters(s: String) -> HashMap { - let mut result = HashMap::new(); - - let plist: Vec = s.split(",").map(|x| x.to_string()).collect(); - - for pl in plist { - let sm: Vec = pl.split("=").map(|x| x.to_string()).collect(); - - if sm.len() != 2 { - continue; - } - - result.insert(sm[0].to_string(), sm[1].to_string()); - } - - result -} - pub async fn throttle_send_recipe( recipe: &Recipe, tx: &Sender, @@ -398,7 +380,6 @@ pub async fn handle_recipe_versions_list_request( req: WebsocketMessageRequest, uid_clone: Arc>, ) -> WebsocketMessageResult { - println!("trigger check versions ... "); let p = req.payload.unwrap(); let recipe_param: RecipeRequestPayload = serde_json::from_value(p)?;