diff --git a/Cargo.lock b/Cargo.lock index 97e4e2b..7e9cce9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1567,16 +1567,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" -[[package]] -name = "mime_guess" -version = "2.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" -dependencies = [ - "mime", - "unicase", -] - [[package]] name = "miniz_oxide" version = "0.8.9" @@ -2084,7 +2074,6 @@ dependencies = [ "bytes", "encoding_rs", "futures-core", - "futures-util", "h2", "http", "http-body", @@ -2095,7 +2084,6 @@ dependencies = [ "js-sys", "log", "mime", - "mime_guess", "percent-encoding", "pin-project-lite", "quinn", @@ -2839,12 +2827,6 @@ version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" -[[package]] -name = "unicase" -version = "2.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" - [[package]] name = "unicode-ident" version = "1.0.24" diff --git a/Cargo.toml b/Cargo.toml index 0c602b9..aeac9d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.g log = "0.4.29" rayon = "1.11.0" redis = { version = "1.0.2", features = ["tokio-comp"] } -reqwest = { version = "0.13.1", features = ["multipart"] } +reqwest = "0.13.1" serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" tokio = { version = "1.49.0", features = ["full"] } diff --git a/src/app.rs b/src/app.rs index 15d2454..cc117ce 100644 --- a/src/app.rs +++ b/src/app.rs @@ -4,21 +4,14 @@ use axum::{ routing::{get, post}, serve::ListenerExt, }; -use log::{error, info, warn}; +use log::{error, info}; use redis::TypedCommands; -use reqwest::{StatusCode, multipart}; use std::{ collections::{HashMap, VecDeque}, env, - fs::{self, File}, - io::BufReader, sync::Arc, - time::Duration, -}; -use tokio::{ - fs::read_dir, - sync::{Mutex, mpsc::Sender}, }; +use tokio::sync::{Mutex, mpsc::Sender}; #[derive(Clone)] pub struct Hub { @@ -59,18 +52,6 @@ impl DevConfig { format!("{}/checkout?path={}", self.get_recipe_url(), path) } - pub fn get_post_file_to_recipe_repo(&self) -> String { - format!("{}/commit", self.get_recipe_url()) - } - - pub fn get_pull_recipe_repo(&self) -> String { - format!("{}/pull", self.get_recipe_url()) - } - - pub fn get_push_recipe_repo(&self) -> String { - format!("{}/push", self.get_recipe_url()) - } - pub fn get_api_header(&self) -> (String, String) { ("X-API-Key".to_string(), self.api_key.clone()) } @@ -102,7 +83,7 @@ impl AppState { let redis_cli_clone = redis_cli.clone(); let tx_new = system_tx.clone(); let result = Arc::new(AppState { - dev_config: dev_config.clone(), + dev_config, redis_cli, system_tx, connectors_mapping: Arc::new(Mutex::new(Hub { @@ -110,72 +91,6 @@ impl AppState { })), }); - // backup job - let dev_config_backup = dev_config.clone(); - tokio::spawn(async move { - let m_cfg = dev_config_backup.clone(); - - loop { - // auto sync - if invoke_pull_sync_request(m_cfg.clone()).await.is_err() { - warn!("pulling repo unhealthy, retry again in 5 minutes"); - continue; - } - - match read_dir(".").await { - Ok(mut d) => { - while let Ok(Some(entry)) = d.next_entry().await { - let ent_path = entry.path(); - - if let Some(filename) = ent_path.file_name() - && let Some(filename_str) = filename.to_str() - && filename_str.starts_with("gtx") - && filename_str.ends_with(".json") - { - // read file - // - let f = match File::open(ent_path.clone()) { - Ok(f) => f, - Err(_) => continue, - }; - - let buf = BufReader::new(f); - - let commit_from_backup: CommitPayload = - match serde_json::from_reader(buf) { - Ok(cm) => cm, - Err(_) => continue, - }; - - if invoke_pull_sync_request(m_cfg.clone()).await.is_err() { - warn!("pulling repo unhealthy, retry again in 5 minutes"); - continue; - } - - let _ = - invoke_commit_request(m_cfg.clone(), commit_from_backup).await; - - if invoke_push_request(m_cfg.clone()).await.is_ok() { - // push success - info!("push backup success"); - if fs::remove_file(ent_path.clone()).is_ok() { - info!("clean backup"); - } - } - } else { - continue; - } - } - } - Err(_) => {} - } - - info!("[backup] idle"); - - tokio::time::sleep(Duration::from_mins(5)).await; - } - }); - tokio::spawn(async move { let mut lredis = redis_cli_clone.clone(); let current_queue: crossbeam_queue::ArrayQueue = @@ -297,82 +212,6 @@ pub async fn invoke_checkout_request( } } -/// Invoke git pull, may takes sometime -pub async fn invoke_pull_sync_request( - config: DevConfig, -) -> Result> { - let client = reqwest::Client::new(); - - let req_path = config.get_pull_recipe_repo(); - // println!("dbg: {req_path}"); - let res = client.get(req_path).send().await?; - - if res.status() != StatusCode::OK { - // pull fail - - error!( - "invoke pull fail: [{}] {:?}", - res.status(), - res.text().await - ); - return Err("pull fail".into()); - } - - match res.text().await { - Ok(raw) => Ok(raw), - Err(e) => Err(format!("{e}").into()), - } -} - -/// Invoke sending from server to server for committing -pub async fn invoke_commit_request( - config: DevConfig, - payload: CommitPayload, -) -> Result<(), Box> { - let client = reqwest::Client::new(); - let commit_path = config.get_post_file_to_recipe_repo(); - let form = multipart::Form::new() - .text("message", payload.message) - .text("signature_username", payload.signature_username) - .text("signature_email", payload.signature_email) - .text("path", payload.path) - .part( - "file", - multipart::Part::bytes(payload.file_bytes) - .mime_str("application/octet-stream") - .unwrap(), - ); - let response = client.post(commit_path).multipart(form).send().await?; - - info!("commit status: {}", response.status()); - - Ok(()) -} - -pub async fn invoke_push_request(config: DevConfig) -> Result> { - let client = reqwest::Client::new(); - - let req_path = config.get_push_recipe_repo(); - // println!("dbg: {req_path}"); - let res = client.get(req_path).send().await?; - - if res.status() != StatusCode::OK { - // pull fail - - error!( - "invoke push fail: [{}] {:?}", - res.status(), - res.text().await - ); - return Err("push fail".into()); - } - - match res.text().await { - Ok(raw) => Ok(raw), - Err(e) => Err(format!("{e}").into()), - } -} - pub async fn create_recipe_repo_router() -> Router> { Router::new().route("/ws", get(crate::websocket::handler::websocket_handler)) } diff --git a/src/stream/model.rs b/src/stream/model.rs index 7b61e6e..f8519e8 100644 --- a/src/stream/model.rs +++ b/src/stream/model.rs @@ -20,24 +20,15 @@ pub struct StreamDataStart { #[serde(rename = "ref")] #[serde(skip_serializing_if = "Option::is_none")] pub stream_ref: Option, - /// extra data, information - pub metadata: String, } impl IntoStreamMessage for StreamDataStart { const MSG_NAME: &str = "stream_data_start"; fn build(&self) -> serde_json::Value { - let mut payload = serde_json::to_value(self).unwrap(); - - payload.as_object_mut().unwrap().insert( - "to".to_string(), - serde_json::json!(self.stream_ref.clone().unwrap_or_default()), - ); - serde_json::json!({ "type": StreamDataStart::MSG_NAME, - "payload": payload.clone() + "payload": self.clone() }) } @@ -47,18 +38,12 @@ impl IntoStreamMessage for StreamDataStart { } impl StreamDataStart { - pub fn new( - total_size: usize, - chunk_size: usize, - stream_ref: Option, - metadata: String, - ) -> Self { + pub fn new(total_size: usize, chunk_size: usize, stream_ref: Option) -> Self { Self { stream_id: Uuid::new_v4().to_string(), total_size, chunk_size, stream_ref, - metadata, } } @@ -76,7 +61,6 @@ pub struct StreamDataChunk { pub start_idx: usize, /// Chunked data which splited into N items per chunk pub data: Vec, - #[serde(rename = "to")] uid: String, } @@ -121,8 +105,6 @@ where pub struct StreamDataEnd { /// Uuid v4, client must mapping later values with this stream id pub stream_id: String, - /// endpoint user - pub to: String, } impl IntoStreamMessage for StreamDataEnd { @@ -141,10 +123,9 @@ impl IntoStreamMessage for StreamDataEnd { } impl StreamDataEnd { - pub fn new(sid: &str, to: String) -> Self { + pub fn new(sid: &str) -> Self { Self { stream_id: sid.to_string(), - to, } } @@ -161,7 +142,6 @@ pub struct StreamDataExtra { pub exid: String, pub extp: String, pub payload: Vec, - pub to: String, } impl IntoStreamMessage for StreamDataExtra @@ -186,12 +166,11 @@ impl StreamDataExtra where T: Serialize + Clone, { - pub fn new(exid: &str, extp: &str, data: Vec, to: String) -> Self { + pub fn new(exid: &str, extp: &str, data: Vec) -> Self { Self { exid: exid.to_string(), extp: extp.to_string(), payload: data.to_vec(), - to, } } diff --git a/src/websocket/helper.rs b/src/websocket/helper.rs index d481a6f..aa8d287 100644 --- a/src/websocket/helper.rs +++ b/src/websocket/helper.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use super::model::*; use axum::extract::ws::{CloseFrame, Message, WebSocket}; use redis::{TypedCommands, cmd}; @@ -66,21 +64,3 @@ pub fn convert_sys_msg_command(msg: &serde_json::Value) -> Option { Err(_) => None, } } - -pub fn get_extra_parameters(s: String) -> HashMap { - let mut result = HashMap::new(); - - let plist: Vec = s.split(",").map(|x| x.to_string()).collect(); - - for pl in plist { - let sm: Vec = pl.split("=").map(|x| x.to_string()).collect(); - - if sm.len() != 2 { - continue; - } - - result.insert(sm[0].to_string(), sm[1].to_string()); - } - - result -} diff --git a/src/websocket/model.rs b/src/websocket/model.rs index 0bff95b..0f289f2 100644 --- a/src/websocket/model.rs +++ b/src/websocket/model.rs @@ -1,21 +1,5 @@ -use std::fs::File; -use std::io::{Read, Write}; - use serde::{Deserialize, Serialize}; -pub trait Backup: Send + Sync + Serialize { - fn dump_backup(&self) -> Result<(), Box> { - let tx = format!("gtx-{}.json", uuid::Uuid::new_v4().to_string()); - let json = serde_json::to_string(&self).unwrap(); - let json2: serde_json::Value = serde_json::from_str(&json).unwrap(); - - let writer = File::create(format!("./{tx}")).unwrap(); - let _ = serde_json::to_writer(writer, &json2); - - Ok(()) - } -} - /// system message to send back to client, this may be called from other services #[derive(Debug, Serialize, Deserialize)] pub struct SysMessage { @@ -56,27 +40,6 @@ pub struct RecipeRequestPayload { pub parameters: Option, } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum PriceRequestAction { - View(String), - Edit(String), -} - -/// Price request payload struct -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct PriceRequestPayload { - /// Allowed interactions of price request - pub action: PriceRequestAction, - /// Country of recipe - pub country: String, - /// Filename to override default get, expect path - pub override_file: Option, - /// Extended infos, required parameters or unimplemented fields in the current struct. Expected pattern `=,=,...` - pub parameters: Option, - /// User info expect at least id, token, name - pub user_info: serde_json::Value, -} - /// Command request for external services #[derive(Debug, Serialize, Deserialize, Clone)] pub struct CommandRequestPayload { @@ -121,21 +84,3 @@ pub struct AuthUserField { pub email: String, pub permissions: String, } - -/// For sending to recipe repo, saving/committing value/file to git -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct CommitPayload { - /// file/content as bytes - pub file_bytes: Vec, - /// path to commit file - pub path: String, - /// signature username part (committer) - pub signature_username: String, - /// signature email part (committer) - pub signature_email: String, - /// commit message - pub message: String, -} - -// use default backup method -impl Backup for CommitPayload {} diff --git a/src/websocket/rw.rs b/src/websocket/rw.rs index 18911fb..cedfbb2 100644 --- a/src/websocket/rw.rs +++ b/src/websocket/rw.rs @@ -51,7 +51,7 @@ pub async fn read( Message::Text(t) => { let req: WebsocketMessageRequest = serde_json::from_str(t.as_str())?; - // info!("get msg: {}", req.type_w); + info!("get msg: {}", req.type_w); match req.type_w.as_str() { "recipe" if req.payload.is_some() => { tasks::recipe::handle_recipe_request( @@ -61,27 +61,7 @@ pub async fn read( req, uid_clone.clone(), ) - .await?; - } - "recipe_versions" if req.payload.is_some() => { - tasks::recipe::handle_recipe_versions_list_request( - config.clone(), - redis.clone(), - tx.clone(), - req, - uid_clone.clone(), - ) - .await?; - } - "price" if req.payload.is_some() => { - tasks::price::handle_price_request( - config.clone(), - redis.clone(), - tx.clone(), - req, - uid_clone.clone(), - ) - .await?; + .await? } "command" if req.payload.is_some() => { tasks::command::handle_command_request(state.clone(), tx.clone(), req) @@ -221,8 +201,6 @@ pub async fn write( } let _ = sender.send(res.to_string().into()).await; - } else { - warn!("failed to send message, as the receiver not detected: {res:?}"); } } TxControlMessage::CloseExist => { diff --git a/src/websocket/tasks/mod.rs b/src/websocket/tasks/mod.rs index 15c63fe..5de2e76 100644 --- a/src/websocket/tasks/mod.rs +++ b/src/websocket/tasks/mod.rs @@ -1,6 +1,5 @@ pub mod auth; pub mod command; -pub mod price; pub mod recipe; pub mod sheet; pub mod watchdog; diff --git a/src/websocket/tasks/price.rs b/src/websocket/tasks/price.rs deleted file mode 100644 index 51b6248..0000000 --- a/src/websocket/tasks/price.rs +++ /dev/null @@ -1,341 +0,0 @@ -use crate::app::*; -use crate::stream::model::{ - IntoStreamMessage, StreamDataChunk, StreamDataEnd, StreamDataExtra, StreamDataStart, -}; -use crate::websocket::{core::*, helper::*, model::*}; - -use log::{debug, error, info, warn}; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::{fs::File, io::Read, sync::Arc}; -use tokio::sync::{Mutex, mpsc::Sender}; - -use crate::websocket::core::WebsocketMessageResult; - -#[allow(non_snake_case)] -#[derive(Debug, Serialize, Deserialize, Clone)] -struct MenuPrice { - ProductCode: String, - NewPrice: serde_json::Value, - #[serde(skip_serializing_if = "Option::is_none")] - StringParam: Option, - #[serde(skip_serializing_if = "Option::is_none")] - Discount: Option, - #[serde(skip_serializing_if = "Option::is_none")] - Percent: Option, - #[serde(skip_serializing_if = "Option::is_none")] - roundup: Option, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct RecipePrice { - #[serde(flatten)] - metadata: serde_json::Value, - content: Vec, -} - -impl RecipePrice { - pub const HIDE_PARAM: &str = "hide=true"; - - pub fn import(path: String) -> RecipePrice { - debug!("try import {path}"); - let mut file = File::open(path).expect("file not found"); - let mut data = String::new(); - - file.read_to_string(&mut data).unwrap(); - - let res: Result = serde_json::from_str(&data); - match res { - Ok(rp) => rp, - Err(e) => { - error!("error while deserialize price: {e}"); - RecipePrice { - content: Vec::new(), - metadata: serde_json::Value::Null, - } - } - } - } - - pub fn import_from_raw_string(raw: String) -> RecipePrice { - let res: Result = serde_json::from_str(&raw); - match res { - Ok(rp) => rp, - Err(e) => { - error!("error while deserialize price: {e}"); - RecipePrice { - content: Vec::new(), - metadata: serde_json::Value::Null, - } - } - } - } - - // only for getting data without modify - pub fn find_by_pd(&self, pd: &str) -> Option { - self.content - .iter() - .find(|x| x.ProductCode.eq(pd)) - .map(|x| x.clone()) - } - - pub fn modify_price_by_pd(&mut self, pd: &str, price: serde_json::Value) { - if let Some(mp) = self.content.iter_mut().find(|x| x.ProductCode.eq(pd)) { - mp.NewPrice = price; - } - } - - fn modify_string_param_by_pd(&mut self, pd: &str, param: &str) { - if let Some(mp) = self.content.iter_mut().find(|x| x.ProductCode.eq(pd)) { - mp.StringParam = Some(param.to_string()); - } - } - - pub fn set_visibility_by_pd(&mut self, pd: &str, show: bool) { - if !show { - self.modify_string_param_by_pd(pd, RecipePrice::HIDE_PARAM); - } else { - self.modify_string_param_by_pd(pd, ""); - } - } - - // NOTE: disable write to file - // pub fn export_to_json_file(self, outpath: Option) { - // let json = serde_json::to_string(&self).unwrap(); - // let json2: serde_json::Value = serde_json::from_str(&json).unwrap(); - - // if let Some(outpath) = outpath { - // let writer = File::create(outpath).unwrap(); - // let _ = serde_json::to_writer_pretty(writer, &json2); - // } else { - // println!("Default save to (execute)/price.json"); - // let writer = File::create("price.json").unwrap(); - // let _ = serde_json::to_writer_pretty(writer, &json2); - // } - // } -} - -/// Get list of price -// pub async fn handle_price_list_request( -// config: DevConfig, -// redis: redis::Client, -// tx: Sender, -// req: WebsocketMessageRequest, -// uid_clone: Arc>, -// ) -> WebsocketMessageResult { - -// } - -/// Get main price profile of country -pub async fn handle_price_request( - config: DevConfig, - redis: redis::Client, - tx: Sender, - req: WebsocketMessageRequest, - uid_clone: Arc>, -) -> WebsocketMessageResult { - let p = req.payload.unwrap(); - - let price_param: PriceRequestPayload = serde_json::from_value(p)?; - - let mut price_file_format = format!( - "{}/profile_{}_master.json", - price_param.country, - price_param.country.to_uppercase() - ); - - if let Some(override_file) = price_param.override_file { - price_file_format = override_file; - } - - let price_action = price_param.action; - - let price_content = - match invoke_checkout_request(config.clone(), price_file_format.clone()).await { - Ok(pc) => pc, - Err(e) => return Err(format!("Cannot find price of expected country: {e:?}").into()), - }; - - info!("price content len: {}", price_content.len()); - - let mut rpp = RecipePrice::import_from_raw_string(price_content); - let _uid = uid_clone.clone(); - let uidd = _uid.try_lock().unwrap(); - - info!("price action: {price_action:?}"); - - let user_info = price_param.user_info.clone(); - - match price_action { - PriceRequestAction::View(view_opt) => { - let viewing_options: HashMap = get_extra_parameters(view_opt); - // sa=all - // sa=get,pd=... - // sa=query,list=1|2|3 - // sa=query,where=contain,kw=... - let sub_action = viewing_options.get("sa"); - let pd = viewing_options.get("pd"); - - if let Some(sa) = sub_action { - let mut result = Vec::new(); - let action_done = match sa.as_str() { - "all" => { - result = rpp.content; - - true - } - "get" - if let Some(pd) = pd - && let Some(mp) = rpp.find_by_pd(pd) => - { - result.push(mp); - true - } - _ => false, - }; - - if action_done { - let _ = tx - .send(TxControlMessage::Payload(serde_json::json!({ - "type": "price", - "payload": { - "req_action": sa, - "status": "ok", - "content": result, - "to": uidd.to_string() - } - }))) - .await; - } else { - let _ = tx - .send(TxControlMessage::Payload(serde_json::json!({ - "type": "price", - "payload": { - "req_action": sa, - "status": "fail", - "to": uidd.to_string() - } - }))) - .await; - } - } - } - PriceRequestAction::Edit(edit_opt) => { - let editing_options: HashMap = get_extra_parameters(edit_opt); - // sa=change,pd=...,to=... - // sa=hide,pd=... - // sa=disable,pd=... disable = hide - // sa=show,pd=... - // sa=toggle,pd=...,state=show|hide - let sub_action = editing_options.get("sa"); - let pd = editing_options.get("pd"); - let to = editing_options.get("to"); - let state = editing_options.get("state"); - - if let Some(sa) = sub_action { - let mut action_message = String::new(); - let action_done = match sa.as_str() { - "change" - if let Some(pd) = pd - && let Some(to) = to - && let Some(mp) = rpp.find_by_pd(pd.as_str()) => - { - info!( - "[CHANGE] price of {pd} from {} to {to}", - mp.NewPrice.as_i64().unwrap() - ); - action_message = format!( - "[CHANGE] price of {pd} from {} to {to}", - mp.NewPrice.as_i64().unwrap() - ); - - let price_int = to.parse::()?; - - rpp.modify_price_by_pd(pd, serde_json::json!(price_int)); - true - } - "toggle" - if let Some(pd) = pd - && let Some(state) = state => - { - info!("[TOGGLE] {pd} to {state}"); - action_message = format!("[TOGGLE] {pd} to {state}"); - - match state.as_str() { - "show" => rpp.set_visibility_by_pd(pd.as_str(), true), - "hide" | "disable" => rpp.set_visibility_by_pd(pd.as_str(), false), - _ => { - warn!("unknown state toggle"); - } - } - true - } - _ => false, - }; - - if action_done { - let _ = tx - .send(TxControlMessage::Payload(serde_json::json!({ - "type": "price", - "payload": { - "req_action": sa, - "status": "ok", - "to": uidd.to_string() - } - }))) - .await; - - // send save - // - - let all_prices_str = serde_json::to_string_pretty(&rpp)?; - let commit_payload = CommitPayload { - file_bytes: all_prices_str.as_bytes().to_vec(), - path: price_file_format.clone(), - signature_username: user_info - .get("displayName") - .unwrap_or_default() - .as_str() - .unwrap_or(&"unknown".to_string()) - .to_string(), - signature_email: user_info - .get("email") - .unwrap_or_default() - .as_str() - .unwrap_or(&"unknown".to_string()) - .to_string(), - message: action_message, - }; - - if invoke_pull_sync_request(config.clone()).await.is_err() { - // backup - let _ = commit_payload.dump_backup(); - return Err("Fail to sync repo, backing up ...".into()); - } - - let _ = invoke_commit_request(config.clone(), commit_payload.clone()).await; - - if invoke_push_request(config.clone()).await.is_err() { - let _ = commit_payload.dump_backup(); - return Err("Fail to push repo, backing up ...".into()); - } - - // push to git - } else { - let _ = tx - .send(TxControlMessage::Payload(serde_json::json!({ - "type": "price", - "payload": { - "req_action": sa, - "status": "fail", - "to": uidd.to_string() - } - }))) - .await; - } - } - } - } - - Ok(()) -} diff --git a/src/websocket/tasks/recipe.rs b/src/websocket/tasks/recipe.rs index e846583..539c331 100644 --- a/src/websocket/tasks/recipe.rs +++ b/src/websocket/tasks/recipe.rs @@ -4,7 +4,6 @@ use crate::stream::model::{ }; use crate::websocket::{core::*, helper::*, model::*}; -use std::collections::HashMap; use std::{fs::File, io::Read, path::PathBuf, sync::Arc}; use async_compression::tokio::bufread::BrotliDecoder; @@ -93,7 +92,6 @@ pub async fn throttle_send_recipe( r01s.len(), CHUNK_SIZE, Some(uid.try_lock().unwrap().to_string()), - format!("version={version},country={country}").to_string(), ); let sid = ss.get_id(); @@ -120,7 +118,7 @@ pub async fn throttle_send_recipe( for (index, chunk) in matset.chunks(CHUNK_SIZE).enumerate() { let curr_ch_id = format!("{mat_exid}_{index}"); - let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec(), uidd.clone()); + let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec()); if let Some(err) = tx .send(TxControlMessage::Payload(extra_matset.as_msg())) @@ -134,7 +132,7 @@ pub async fn throttle_send_recipe( let extl = "topplist"; for (index, chunk) in recipe.Topping.ToppingList.chunks(CHUNK_SIZE).enumerate() { let curr_ch_id = format!("{mat_exid}_tl{index}"); - let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec(), uidd.clone()); + let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec()); if let Some(err) = tx .send(TxControlMessage::Payload(extra_topplist.as_msg())) .await @@ -147,7 +145,7 @@ pub async fn throttle_send_recipe( let extg = "toppgrp"; for (index, chunk) in recipe.Topping.ToppingGroup.chunks(CHUNK_SIZE).enumerate() { let curr_ch_id = format!("{mat_exid}_tg{index}"); - let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec(), uidd.clone()); + let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec()); if let Some(err) = tx .send(TxControlMessage::Payload(extra_toppgrp.as_msg())) .await @@ -165,7 +163,7 @@ pub async fn throttle_send_recipe( info!("sending {sid}"); // return sid; - let end_msg = StreamDataEnd::new(&sid, uidd.clone()); + let end_msg = StreamDataEnd::new(&sid); if let Some(err) = tx .send(TxControlMessage::Payload(end_msg.as_msg())) @@ -224,23 +222,6 @@ pub async fn handle_recipe_request( }; } - // detect if use different version - // parameter: use_legacy_version=true,version=888 - if let Some(extra_param) = recipe_param.clone().parameters { - let pmap = get_extra_parameters(extra_param); - - latest_version = if pmap.contains_key("use_legacy_version") - && let Some(legacy_cfg) = pmap.get("use_legacy_version") - && legacy_cfg.eq("true") - { - pmap.get("version").unwrap_or(&latest_version).to_string() - } else { - latest_version - }; - - info!("after param in recipe: {latest_version}"); - } - let req_file = if is_req_patch(&recipe_param) { format!( "stx_{country}_{version}.json", @@ -372,47 +353,3 @@ pub async fn handle_recipe_request( Ok(()) } - -pub async fn handle_recipe_versions_list_request( - config: DevConfig, - redis: redis::Client, - tx: Sender, - req: WebsocketMessageRequest, - uid_clone: Arc>, -) -> WebsocketMessageResult { - let p = req.payload.unwrap(); - let recipe_param: RecipeRequestPayload = serde_json::from_value(p)?; - - let version_list = format!("{country}", country = recipe_param.country); - - let country_versions_str = match invoke_checkout_request(config.clone(), version_list).await { - Ok(vs) => vs, - Err(e) => return Err(format!("Cannot find versions of expected country: {e:?}").into()), - }; - - // extract version as list - let files: Vec = country_versions_str - .split(",") - .map(|x| x.to_string()) - .collect(); - - let result: Vec = files - .iter() - .filter(|x| x.starts_with("coffeethai02") && x.ends_with(".json")) - .map(|x| x.replace("coffeethai02_", "").replace(".json", "")) - .collect(); - - let uidd = uid_clone.clone().try_lock().unwrap().to_string(); - - let _ = tx - .send(TxControlMessage::Payload(serde_json::json!({ - "type": "version_selectors", - "payload": { - "versions": result, - "to": uidd - } - }))) - .await; - - Ok(()) -} diff --git a/src/websocket/tasks/watchdog.rs b/src/websocket/tasks/watchdog.rs index 1092f9d..4c2ddc3 100644 --- a/src/websocket/tasks/watchdog.rs +++ b/src/websocket/tasks/watchdog.rs @@ -21,7 +21,7 @@ pub async fn get_watchdog_task( let h = hub.try_lock().unwrap(); let curr_user = user.try_lock().unwrap().to_string(); - // info!("{}: checking invalid ...", curr_user); + info!("{}: checking invalid ...", curr_user); if h.clients.contains_key(&curr_user) && curr_user.starts_with("temp") { warn!("detect unauthorized -- {}", curr_user);