diff --git a/Cargo.lock b/Cargo.lock index 5d955e5..4e23c48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,6 +19,15 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + [[package]] name = "alloc-no-stdlib" version = "2.0.4" @@ -43,6 +52,56 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] + [[package]] name = "arbitrary" version = "1.4.2" @@ -320,6 +379,12 @@ dependencies = [ "cc", ] +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + [[package]] name = "combine" version = "4.6.7" @@ -617,6 +682,29 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "env_filter" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2daee4ea451f429a58296525ddf28b45a3b64f1acf6587e2067437bb11e218d" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "jiff", + "log", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -1139,12 +1227,42 @@ dependencies = [ "serde", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itoa" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" +[[package]] +name = "jiff" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a3546dc96b6d42c5f24902af9e2538e82e39ad350b0c766eb3fbf2d8f3d8359" +dependencies = [ + "jiff-static", + "log", + "portable-atomic", + "portable-atomic-util", + "serde_core", +] + +[[package]] +name = "jiff-static" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a8c8b344124222efd714b73bb41f8b5120b27a7cc1c75593a6ff768d9d05aa4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "jni" version = "0.21.1" @@ -1401,6 +1519,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "openssl-probe" version = "0.1.6" @@ -1542,6 +1666,21 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + +[[package]] +name = "portable-atomic-util" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a9db96d7fa8782dd8c15ce32ffe8680bbd1e978a43bf51a34d39483540495f5" +dependencies = [ + "portable-atomic", +] + [[package]] name = "potential_utf" version = "0.1.4" @@ -1758,6 +1897,35 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + [[package]] name = "reqwest" version = "0.13.1" @@ -2040,8 +2208,10 @@ dependencies = [ "chrono", "crossbeam-queue", "dotenv", + "env_logger", "futures", "libtbr", + "log", "rayon", "redis", "reqwest", @@ -2550,6 +2720,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.20.0" diff --git a/Cargo.toml b/Cargo.toml index 7fc0931..aeac9d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,8 +11,10 @@ celes = "2.6.0" chrono = "0.4.43" crossbeam-queue = "0.3.12" dotenv = "0.15.0" +env_logger = "0.11.9" futures = "0.3.32" libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.git", version = "0.1.1" } +log = "0.4.29" rayon = "1.11.0" redis = { version = "1.0.2", features = ["tokio-comp"] } reqwest = "0.13.1" diff --git a/src/main.rs b/src/main.rs index efc0ac7..2bcc4d0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ use futures::{ stream::{SplitSink, SplitStream}, }; use libtbr::models::recipe::{MaterialSetting, Recipe, Recipe01}; +use log::info; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use redis::{TypedCommands, cmd}; use serde::{Deserialize, Serialize}; @@ -41,6 +42,8 @@ use crate::{ mod stream; mod tx; +const CHUNK_SIZE: usize = 200; + // features // - get result from recipe_repo // - store in redis @@ -105,6 +108,22 @@ impl DevConfig { // Ok(()) // } +async fn invoke_checkout_request( + config: DevConfig, + path: String, +) -> Result> { + let client = reqwest::Client::new(); + + let req_path = config.get_file_from_recipe_repo(path); + // println!("dbg: {req_path}"); + let res = client.get(req_path).send().await?; + + match res.text().await { + Ok(raw) => Ok(raw), + Err(e) => Err(format!("{e}").into()), + } +} + pub async fn create_recipe_repo_router() -> Router> { Router::new() // .route("/", get(get_root_files)) @@ -160,59 +179,6 @@ async fn handle_socket( tokio::spawn(write(sender, rx)); tokio::spawn(read(state, receiver, tx.clone(), user_sys_rx)); - // NOTE: - - // while let Some(msg) = socket.recv().await { - // if let Ok(msg) = msg { - // match msg { - // Message::Text(utf8_bytes) => { - // println!("text recv: {utf8_bytes}"); - - // let req = utf8_bytes.clone().as_str().to_string(); - // let req_struct: serde_json::Value = match serde_json::from_str(&req) { - // Ok(r) => r, - // Err(_) => serde_json::Value::Null, - // }; - - // // response - // let n_state = state.clone(); - // let response = - // handle_recipe_websocket_message(req_struct, n_state, &mut socket).await?; - // let response_next = match serde_json::to_string(&response) { - // Ok(p) => p, - // Err(_) => "{'result': 'error'}".to_string(), - // }; - - // let result = socket.send(Message::Text(response_next.into())).await; - // if let Err(error) = result { - // println!("Error sending: {error}"); - // send_close_message(socket, 1011, &format!("Error occured: {error}")).await; - // break; - // } - // } - // Message::Binary(bytes) => { - // println!("recv bytes len: {}", bytes.len()); - // let result = socket - // .send(Message::Text( - // format!("recv bytes len: {}", bytes.len()).into(), - // )) - // .await; - // if let Err(error) = result { - // println!("Error sending: {error}"); - // send_close_message(socket, 1011, &format!("Error occured: {error}")).await; - // break; - // } - // } - // _ => {} - // } - // } else { - // let error = msg.err().unwrap(); - // println!("Error while receiving message: {error:?}"); - // send_close_message(socket, 1011, &format!("Error occured: {error}")).await; - // break; - // } - // } - Ok(()) } @@ -257,18 +223,6 @@ async fn fetch_content_from_redis_byte(redis: redis::Client, key: &str) -> Resul return Ok(vec![]); } } - - // match res { - // Ok(s) => { - // if let Some(res) = s { - // Ok(res.as_bytes().to_vec()) - // } else { - // Err(format!("result error from key: {key}")) - // } - // } - // Err(e) => Err(format!("redis get failed: {e}")), - // } - // } #[derive(Serialize, Deserialize)] @@ -309,27 +263,6 @@ fn convert_ack_command(cmd_req: &serde_json::Value) -> Option, -// description: Option, -// tags: String, -// status: MenuStatus, -// } - -// #[derive(Serialize, Deserialize)] -// enum MenuStatus { -// #[serde(rename = "ready")] -// Ready, -// #[serde(rename = "obsolete")] -// Obsolete, -// #[serde(rename = "drafted")] -// Drafted, -// Status(String), // additional status -// } - async fn read( // redis: redis::Client, mut state: Arc, @@ -339,6 +272,7 @@ async fn read( // cmd_atom: crossbeam_queue::ArrayQueue, ) -> Result<(), Box> { let redis = state.redis_cli.clone(); + let config = state.dev_config.clone(); let tx_to_client = tx.clone(); tokio::spawn(async move { @@ -365,10 +299,9 @@ async fn read( // get actual version // let latest_key = - format!("master:{country}/version", country = recipe_param.country); + format!("{country}/version", country = recipe_param.country); - println!("latest key: {latest_key}"); - let latest_version = + let mut latest_version = match fetch_content_from_redis_byte(redis.clone(), &latest_key).await { Ok(x) => { // decode brotli @@ -390,6 +323,18 @@ async fn read( } }; + if latest_version.is_empty() { + // cannot get actual version, try get from git + latest_version = + match invoke_checkout_request(config.clone(), latest_key).await { + Ok(version) => version, + Err(e) => { + println!("Error on checkout: {e}"); + "".to_string() + } + }; + } + let req_file = if is_req_patch(&recipe_param) { format!( "stx_{country}_{version}.json", @@ -405,7 +350,6 @@ async fn read( }; let mut retry_cnt = 0; - let sid = Uuid::new_v4(); println!("init req: {req_file}"); match get_local_file(req_file) { @@ -416,67 +360,15 @@ async fn read( // split send let recipe: Recipe = serde_json::from_str(&file_content)?; - // concat all recipes including subs - let r01s: Vec = recipe - .Recipe01 - .par_iter() - .flat_map(|x| { - let mut v = Vec::new(); - v.push(x.clone()); - - if let Some(sub) = x.clone().SubMenu { - v.extend(sub); - } - - v - }) - .collect(); - - let matset: Vec = recipe.MaterialSetting.clone(); - - let ss = StreamDataStart::new( - r01s.len(), - 100, - Some("local".to_string()), - ); - let sid = ss.get_id(); - - if let Some(err) = tx.send(ss.as_msg()).await.err() { - println!("ERR: send tx error, {err:?}"); - } - - for (index, chunk) in r01s.chunks(100).enumerate() { - let sda = - StreamDataChunk::new(&sid, index * 100, chunk.to_vec()); - - // no validate - if let Some(err) = tx.send(sda.as_msg()).await.err() { - println!("ERR: send tx error, {err:?}"); - } - } - - let mat_exid = sid.clone(); - let extp = "matset"; - for (index, chunk) in matset.chunks(100).enumerate() { - let curr_ch_id = format!("{mat_exid}_{index}"); - - let extra_matset = - StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec()); - - if let Some(err) = tx.send(extra_matset.as_msg()).await.err() { - println!("ERR: send tx extra error: {err:?}"); - } - } - - let end_msg = StreamDataEnd::new(&sid); - - if let Some(err) = tx.send(end_msg.as_msg()).await.err() { - println!("ERR: send tx error, {err:?}"); - } + throttle_send_recipe( + &recipe, + &tx, + recipe_param.country, + latest_version, + ) + .await; } Err(_) => { - // savable sid means it could find data but not guarantee if sendable - let mut success_sid = String::new(); // concurrent fetch for i in 1..5 { retry_cnt = i; @@ -505,109 +397,23 @@ async fn read( { let recipe: Recipe = serde_json::from_str(&sbuf)?; - - let r01s: Vec = recipe - .Recipe01 - .par_iter() - .flat_map(|x| { - let mut v = Vec::new(); - v.push(x.clone()); - - if let Some(sub) = x.clone().SubMenu { - v.extend(sub); - } - - v - }) - .collect(); - - let matset: Vec = - recipe.MaterialSetting.clone(); - - // test stream start model - let ss = StreamDataStart::new( - r01s.len(), - 100, - Some("redis".to_string()), - ); - - let sid = ss.get_id(); - success_sid = sid.clone(); - - if let Some(err) = - tx.send(ss.as_msg()).await.err() - { - println!("ERR: send tx error, {err:?}"); - } - - // split send - - for (index, chunk) in - r01s.chunks(100).enumerate() - { - let sda = StreamDataChunk::new( - &sid, - index * 100, - chunk.to_vec(), - ); - - // no validate - if let Some(err) = - tx.send(sda.as_msg()).await.err() - { - println!("ERR: send tx error, {err:?}"); - } - } - - let mat_exid = sid.clone(); - let extp = "matset"; - for (index, chunk) in - matset.chunks(100).enumerate() - { - let curr_ch_id = - format!("{mat_exid}_{index}"); - - let extra_matset = StreamDataExtra::new( - &curr_ch_id, - &extp, - chunk.to_vec(), - ); - - if let Some(err) = tx - .send(extra_matset.as_msg()) - .await - .err() - { - println!( - "ERR: send tx extra error: {err:?}" - ); - } - } - - let rp_clone = recipe.clone(); - tokio::task::spawn(async move { - rp_clone.export_to_json_file(Some( - format!( - "result.{country}.{version}.json", - country = recipe_param.country, - version = latest_version - ), - )); - }); + throttle_send_recipe( + &recipe, + &tx, + recipe_param.country, + latest_version, + ) + .await; break; } } Err(_) => {} } + } else { + // retry get from git } } - - let end_msg = StreamDataEnd::new(&success_sid); - - if let Some(err) = tx.send(end_msg.as_msg()).await.err() { - println!("ERR: send tx error, {err:?}"); - } } } } @@ -653,8 +459,11 @@ async fn write( while let Some(res) = rx.recv().await { // no check // println!("sending {res:?}"); + info!("sending to client ..."); let _ = sender.send(res.to_string().into()).await; - let _ = tokio::time::sleep(Duration::from_millis(100)).await; + + // リミットブレく - limito breaku!! (uncomment to slow down messages) + // let _ = tokio::time::sleep(Duration::from_millis(100)).await; } Ok(()) } @@ -691,6 +500,94 @@ fn get_key_cache(country: String, version: String, is_patch: bool, retry_cnt: i3 } } +async fn throttle_send_recipe( + recipe: &Recipe, + tx: &Sender, + country: String, + version: String, +) { + let r01s: Vec = recipe + .Recipe01 + .par_iter() + .flat_map(|x| { + let mut v = Vec::new(); + v.push(x.clone()); + + if let Some(sub) = x.clone().SubMenu { + v.extend(sub); + } + + v + }) + .collect(); + + let matset: Vec = recipe.MaterialSetting.clone(); + + // test stream start model + let ss = StreamDataStart::new(r01s.len(), CHUNK_SIZE, Some("redis".to_string())); + + let sid = ss.get_id(); + info!("starting {sid}"); + + if let Some(err) = tx.send(ss.as_msg()).await.err() { + println!("ERR: send tx error, {err:?}"); + } + + // split send + + for (index, chunk) in r01s.chunks(CHUNK_SIZE).enumerate() { + let sda = StreamDataChunk::new(&sid, index * CHUNK_SIZE, chunk.to_vec()); + + // no validate + if let Some(err) = tx.send(sda.as_msg()).await.err() { + println!("ERR: send tx error, {err:?}"); + } + } + + let mat_exid = sid.clone(); + let extp = "matset"; + for (index, chunk) in matset.chunks(CHUNK_SIZE).enumerate() { + let curr_ch_id = format!("{mat_exid}_{index}"); + + let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec()); + + if let Some(err) = tx.send(extra_matset.as_msg()).await.err() { + println!("ERR: send tx extra error: {err:?}"); + } + } + + let extl = "topplist"; + for (index, chunk) in recipe.Topping.ToppingList.chunks(CHUNK_SIZE).enumerate() { + let curr_ch_id = format!("{mat_exid}_tl{index}"); + let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec()); + if let Some(err) = tx.send(extra_topplist.as_msg()).await.err() { + println!("ERR: send tx extra2 error: {err:?}"); + } + } + + let extg = "toppgrp"; + for (index, chunk) in recipe.Topping.ToppingGroup.chunks(CHUNK_SIZE).enumerate() { + let curr_ch_id = format!("{mat_exid}_tg{index}"); + let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec()); + if let Some(err) = tx.send(extra_toppgrp.as_msg()).await.err() { + println!("ERR: send tx extra2 error: {err:?}"); + } + } + + let rp_clone = recipe.clone(); + tokio::task::spawn(async move { + rp_clone.export_to_json_file(Some(format!("result.{country}.{version}.json"))); + }); + info!("sending {sid}"); + + // return sid; + let end_msg = StreamDataEnd::new(&sid); + + if let Some(err) = tx.send(end_msg.as_msg()).await.err() { + println!("ERR: send tx error, {err:?}"); + } +} + pub struct AppState { dev_config: DevConfig, redis_cli: redis::Client, @@ -809,6 +706,8 @@ impl AppState { async fn main() -> Result<(), Box> { println!("Hello, world!"); dotenv::dotenv().ok(); + + env_logger::init(); // send req to repo service let server_port = env::var("PORT").unwrap_or("36579".to_string());