refactor & remove delay

- move duplication in recipe streaming
- remove unnessecary delay when sending back to client
- increase chunk size from 100 to 200

Signed-off-by: Pakin <pakin.t@forth.co.th>
This commit is contained in:
Pakin 2026-03-05 09:31:33 +07:00
parent 377b0df681
commit 8cdba50c40
3 changed files with 322 additions and 245 deletions

176
Cargo.lock generated
View file

@ -19,6 +19,15 @@ dependencies = [
"cpufeatures",
]
[[package]]
name = "aho-corasick"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301"
dependencies = [
"memchr",
]
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
@ -43,6 +52,56 @@ dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.6.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78"
[[package]]
name = "anstyle-parse"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "anstyle-wincon"
version = "3.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d"
dependencies = [
"anstyle",
"once_cell_polyfill",
"windows-sys 0.61.2",
]
[[package]]
name = "arbitrary"
version = "1.4.2"
@ -320,6 +379,12 @@ dependencies = [
"cc",
]
[[package]]
name = "colorchoice"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "combine"
version = "4.6.7"
@ -617,6 +682,29 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "env_filter"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f"
dependencies = [
"log",
"regex",
]
[[package]]
name = "env_logger"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2daee4ea451f429a58296525ddf28b45a3b64f1acf6587e2067437bb11e218d"
dependencies = [
"anstream",
"anstyle",
"env_filter",
"jiff",
"log",
]
[[package]]
name = "equivalent"
version = "1.0.2"
@ -1139,12 +1227,42 @@ dependencies = [
"serde",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695"
[[package]]
name = "itoa"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2"
[[package]]
name = "jiff"
version = "0.2.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a3546dc96b6d42c5f24902af9e2538e82e39ad350b0c766eb3fbf2d8f3d8359"
dependencies = [
"jiff-static",
"log",
"portable-atomic",
"portable-atomic-util",
"serde_core",
]
[[package]]
name = "jiff-static"
version = "0.2.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a8c8b344124222efd714b73bb41f8b5120b27a7cc1c75593a6ff768d9d05aa4"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "jni"
version = "0.21.1"
@ -1401,6 +1519,12 @@ version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]]
name = "once_cell_polyfill"
version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"
[[package]]
name = "openssl-probe"
version = "0.1.6"
@ -1542,6 +1666,21 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
[[package]]
name = "portable-atomic"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49"
[[package]]
name = "portable-atomic-util"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a9db96d7fa8782dd8c15ce32ffe8680bbd1e978a43bf51a34d39483540495f5"
dependencies = [
"portable-atomic",
]
[[package]]
name = "potential_utf"
version = "0.1.4"
@ -1758,6 +1897,35 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a"
[[package]]
name = "reqwest"
version = "0.13.1"
@ -2040,8 +2208,10 @@ dependencies = [
"chrono",
"crossbeam-queue",
"dotenv",
"env_logger",
"futures",
"libtbr",
"log",
"rayon",
"redis",
"reqwest",
@ -2550,6 +2720,12 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.20.0"

View file

@ -11,8 +11,10 @@ celes = "2.6.0"
chrono = "0.4.43"
crossbeam-queue = "0.3.12"
dotenv = "0.15.0"
env_logger = "0.11.9"
futures = "0.3.32"
libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.git", version = "0.1.1" }
log = "0.4.29"
rayon = "1.11.0"
redis = { version = "1.0.2", features = ["tokio-comp"] }
reqwest = "0.13.1"

View file

@ -17,6 +17,7 @@ use futures::{
stream::{SplitSink, SplitStream},
};
use libtbr::models::recipe::{MaterialSetting, Recipe, Recipe01};
use log::info;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use redis::{TypedCommands, cmd};
use serde::{Deserialize, Serialize};
@ -41,6 +42,8 @@ use crate::{
mod stream;
mod tx;
const CHUNK_SIZE: usize = 200;
// features
// - get result from recipe_repo
// - store in redis
@ -105,6 +108,22 @@ impl DevConfig {
// Ok(())
// }
async fn invoke_checkout_request(
config: DevConfig,
path: String,
) -> Result<String, Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let req_path = config.get_file_from_recipe_repo(path);
// println!("dbg: {req_path}");
let res = client.get(req_path).send().await?;
match res.text().await {
Ok(raw) => Ok(raw),
Err(e) => Err(format!("{e}").into()),
}
}
pub async fn create_recipe_repo_router() -> Router<Arc<AppState>> {
Router::new()
// .route("/", get(get_root_files))
@ -160,59 +179,6 @@ async fn handle_socket(
tokio::spawn(write(sender, rx));
tokio::spawn(read(state, receiver, tx.clone(), user_sys_rx));
// NOTE:
// while let Some(msg) = socket.recv().await {
// if let Ok(msg) = msg {
// match msg {
// Message::Text(utf8_bytes) => {
// println!("text recv: {utf8_bytes}");
// let req = utf8_bytes.clone().as_str().to_string();
// let req_struct: serde_json::Value = match serde_json::from_str(&req) {
// Ok(r) => r,
// Err(_) => serde_json::Value::Null,
// };
// // response
// let n_state = state.clone();
// let response =
// handle_recipe_websocket_message(req_struct, n_state, &mut socket).await?;
// let response_next = match serde_json::to_string(&response) {
// Ok(p) => p,
// Err(_) => "{'result': 'error'}".to_string(),
// };
// let result = socket.send(Message::Text(response_next.into())).await;
// if let Err(error) = result {
// println!("Error sending: {error}");
// send_close_message(socket, 1011, &format!("Error occured: {error}")).await;
// break;
// }
// }
// Message::Binary(bytes) => {
// println!("recv bytes len: {}", bytes.len());
// let result = socket
// .send(Message::Text(
// format!("recv bytes len: {}", bytes.len()).into(),
// ))
// .await;
// if let Err(error) = result {
// println!("Error sending: {error}");
// send_close_message(socket, 1011, &format!("Error occured: {error}")).await;
// break;
// }
// }
// _ => {}
// }
// } else {
// let error = msg.err().unwrap();
// println!("Error while receiving message: {error:?}");
// send_close_message(socket, 1011, &format!("Error occured: {error}")).await;
// break;
// }
// }
Ok(())
}
@ -257,18 +223,6 @@ async fn fetch_content_from_redis_byte(redis: redis::Client, key: &str) -> Resul
return Ok(vec![]);
}
}
// match res {
// Ok(s) => {
// if let Some(res) = s {
// Ok(res.as_bytes().to_vec())
// } else {
// Err(format!("result error from key: {key}"))
// }
// }
// Err(e) => Err(format!("redis get failed: {e}")),
// }
//
}
#[derive(Serialize, Deserialize)]
@ -309,27 +263,6 @@ fn convert_ack_command(cmd_req: &serde_json::Value) -> Option<CommandRequestPayl
}
}
// /// Result to send to client
// #[derive(Serialize, Deserialize)]
// struct RecipeOverview {
// productCode: String,
// name: Option<String>,
// description: Option<String>,
// tags: String,
// status: MenuStatus,
// }
// #[derive(Serialize, Deserialize)]
// enum MenuStatus {
// #[serde(rename = "ready")]
// Ready,
// #[serde(rename = "obsolete")]
// Obsolete,
// #[serde(rename = "drafted")]
// Drafted,
// Status(String), // additional status
// }
async fn read(
// redis: redis::Client,
mut state: Arc<AppState>,
@ -339,6 +272,7 @@ async fn read(
// cmd_atom: crossbeam_queue::ArrayQueue<CommandRequestPayload>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let redis = state.redis_cli.clone();
let config = state.dev_config.clone();
let tx_to_client = tx.clone();
tokio::spawn(async move {
@ -365,10 +299,9 @@ async fn read(
// get actual version
//
let latest_key =
format!("master:{country}/version", country = recipe_param.country);
format!("{country}/version", country = recipe_param.country);
println!("latest key: {latest_key}");
let latest_version =
let mut latest_version =
match fetch_content_from_redis_byte(redis.clone(), &latest_key).await {
Ok(x) => {
// decode brotli
@ -390,6 +323,18 @@ async fn read(
}
};
if latest_version.is_empty() {
// cannot get actual version, try get from git
latest_version =
match invoke_checkout_request(config.clone(), latest_key).await {
Ok(version) => version,
Err(e) => {
println!("Error on checkout: {e}");
"".to_string()
}
};
}
let req_file = if is_req_patch(&recipe_param) {
format!(
"stx_{country}_{version}.json",
@ -405,7 +350,6 @@ async fn read(
};
let mut retry_cnt = 0;
let sid = Uuid::new_v4();
println!("init req: {req_file}");
match get_local_file(req_file) {
@ -416,67 +360,15 @@ async fn read(
// split send
let recipe: Recipe = serde_json::from_str(&file_content)?;
// concat all recipes including subs
let r01s: Vec<Recipe01> = recipe
.Recipe01
.par_iter()
.flat_map(|x| {
let mut v = Vec::new();
v.push(x.clone());
if let Some(sub) = x.clone().SubMenu {
v.extend(sub);
}
v
})
.collect();
let matset: Vec<MaterialSetting> = recipe.MaterialSetting.clone();
let ss = StreamDataStart::new(
r01s.len(),
100,
Some("local".to_string()),
);
let sid = ss.get_id();
if let Some(err) = tx.send(ss.as_msg()).await.err() {
println!("ERR: send tx error, {err:?}");
}
for (index, chunk) in r01s.chunks(100).enumerate() {
let sda =
StreamDataChunk::new(&sid, index * 100, chunk.to_vec());
// no validate
if let Some(err) = tx.send(sda.as_msg()).await.err() {
println!("ERR: send tx error, {err:?}");
}
}
let mat_exid = sid.clone();
let extp = "matset";
for (index, chunk) in matset.chunks(100).enumerate() {
let curr_ch_id = format!("{mat_exid}_{index}");
let extra_matset =
StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec());
if let Some(err) = tx.send(extra_matset.as_msg()).await.err() {
println!("ERR: send tx extra error: {err:?}");
}
}
let end_msg = StreamDataEnd::new(&sid);
if let Some(err) = tx.send(end_msg.as_msg()).await.err() {
println!("ERR: send tx error, {err:?}");
}
throttle_send_recipe(
&recipe,
&tx,
recipe_param.country,
latest_version,
)
.await;
}
Err(_) => {
// savable sid means it could find data but not guarantee if sendable
let mut success_sid = String::new();
// concurrent fetch
for i in 1..5 {
retry_cnt = i;
@ -505,109 +397,23 @@ async fn read(
{
let recipe: Recipe =
serde_json::from_str(&sbuf)?;
let r01s: Vec<Recipe01> = recipe
.Recipe01
.par_iter()
.flat_map(|x| {
let mut v = Vec::new();
v.push(x.clone());
if let Some(sub) = x.clone().SubMenu {
v.extend(sub);
}
v
})
.collect();
let matset: Vec<MaterialSetting> =
recipe.MaterialSetting.clone();
// test stream start model
let ss = StreamDataStart::new(
r01s.len(),
100,
Some("redis".to_string()),
);
let sid = ss.get_id();
success_sid = sid.clone();
if let Some(err) =
tx.send(ss.as_msg()).await.err()
{
println!("ERR: send tx error, {err:?}");
}
// split send
for (index, chunk) in
r01s.chunks(100).enumerate()
{
let sda = StreamDataChunk::new(
&sid,
index * 100,
chunk.to_vec(),
);
// no validate
if let Some(err) =
tx.send(sda.as_msg()).await.err()
{
println!("ERR: send tx error, {err:?}");
}
}
let mat_exid = sid.clone();
let extp = "matset";
for (index, chunk) in
matset.chunks(100).enumerate()
{
let curr_ch_id =
format!("{mat_exid}_{index}");
let extra_matset = StreamDataExtra::new(
&curr_ch_id,
&extp,
chunk.to_vec(),
);
if let Some(err) = tx
.send(extra_matset.as_msg())
.await
.err()
{
println!(
"ERR: send tx extra error: {err:?}"
);
}
}
let rp_clone = recipe.clone();
tokio::task::spawn(async move {
rp_clone.export_to_json_file(Some(
format!(
"result.{country}.{version}.json",
country = recipe_param.country,
version = latest_version
),
));
});
throttle_send_recipe(
&recipe,
&tx,
recipe_param.country,
latest_version,
)
.await;
break;
}
}
Err(_) => {}
}
} else {
// retry get from git
}
}
let end_msg = StreamDataEnd::new(&success_sid);
if let Some(err) = tx.send(end_msg.as_msg()).await.err() {
println!("ERR: send tx error, {err:?}");
}
}
}
}
@ -653,8 +459,11 @@ async fn write(
while let Some(res) = rx.recv().await {
// no check
// println!("sending {res:?}");
info!("sending to client ...");
let _ = sender.send(res.to_string().into()).await;
let _ = tokio::time::sleep(Duration::from_millis(100)).await;
// リミットブレく - limito breaku!! (uncomment to slow down messages)
// let _ = tokio::time::sleep(Duration::from_millis(100)).await;
}
Ok(())
}
@ -691,6 +500,94 @@ fn get_key_cache(country: String, version: String, is_patch: bool, retry_cnt: i3
}
}
async fn throttle_send_recipe(
recipe: &Recipe,
tx: &Sender<serde_json::Value>,
country: String,
version: String,
) {
let r01s: Vec<Recipe01> = recipe
.Recipe01
.par_iter()
.flat_map(|x| {
let mut v = Vec::new();
v.push(x.clone());
if let Some(sub) = x.clone().SubMenu {
v.extend(sub);
}
v
})
.collect();
let matset: Vec<MaterialSetting> = recipe.MaterialSetting.clone();
// test stream start model
let ss = StreamDataStart::new(r01s.len(), CHUNK_SIZE, Some("redis".to_string()));
let sid = ss.get_id();
info!("starting {sid}");
if let Some(err) = tx.send(ss.as_msg()).await.err() {
println!("ERR: send tx error, {err:?}");
}
// split send
for (index, chunk) in r01s.chunks(CHUNK_SIZE).enumerate() {
let sda = StreamDataChunk::new(&sid, index * CHUNK_SIZE, chunk.to_vec());
// no validate
if let Some(err) = tx.send(sda.as_msg()).await.err() {
println!("ERR: send tx error, {err:?}");
}
}
let mat_exid = sid.clone();
let extp = "matset";
for (index, chunk) in matset.chunks(CHUNK_SIZE).enumerate() {
let curr_ch_id = format!("{mat_exid}_{index}");
let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec());
if let Some(err) = tx.send(extra_matset.as_msg()).await.err() {
println!("ERR: send tx extra error: {err:?}");
}
}
let extl = "topplist";
for (index, chunk) in recipe.Topping.ToppingList.chunks(CHUNK_SIZE).enumerate() {
let curr_ch_id = format!("{mat_exid}_tl{index}");
let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec());
if let Some(err) = tx.send(extra_topplist.as_msg()).await.err() {
println!("ERR: send tx extra2 error: {err:?}");
}
}
let extg = "toppgrp";
for (index, chunk) in recipe.Topping.ToppingGroup.chunks(CHUNK_SIZE).enumerate() {
let curr_ch_id = format!("{mat_exid}_tg{index}");
let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec());
if let Some(err) = tx.send(extra_toppgrp.as_msg()).await.err() {
println!("ERR: send tx extra2 error: {err:?}");
}
}
let rp_clone = recipe.clone();
tokio::task::spawn(async move {
rp_clone.export_to_json_file(Some(format!("result.{country}.{version}.json")));
});
info!("sending {sid}");
// return sid;
let end_msg = StreamDataEnd::new(&sid);
if let Some(err) = tx.send(end_msg.as_msg()).await.err() {
println!("ERR: send tx error, {err:?}");
}
}
pub struct AppState {
dev_config: DevConfig,
redis_cli: redis::Client,
@ -809,6 +706,8 @@ impl AppState {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Hello, world!");
dotenv::dotenv().ok();
env_logger::init();
// send req to repo service
let server_port = env::var("PORT").unwrap_or("36579".to_string());