use crate::app::*; use crate::stream::model::{ IntoStreamMessage, StreamDataChunk, StreamDataEnd, StreamDataExtra, StreamDataStart, }; use crate::websocket::{core::*, helper::*, model::*}; use std::{fs::File, io::Read, path::PathBuf, sync::Arc}; use async_compression::tokio::bufread::BrotliDecoder; use axum::extract::ws::{Message, WebSocket}; use futures::{ SinkExt, StreamExt, stream::{SplitSink, SplitStream}, }; use libtbr::models::recipe::{MaterialSetting, Recipe, Recipe01}; use log::{error, info, warn}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use redis::{self, TypedCommands}; use tokio::{ io::{AsyncReadExt, BufReader}, sync::{ Mutex, mpsc::{Receiver, Sender}, }, time::Instant, }; pub fn is_req_patch(param: &RecipeRequestPayload) -> bool { param.version != -1 && param.partial.is_some() && param.partial.unwrap() } pub fn get_local_file(filename: String) -> Result { File::open(PathBuf::from(filename)) } pub fn get_key_cache(country: String, version: String, is_patch: bool, retry_cnt: i32) -> String { if is_patch { format!("stx_{country}_{version}.json") } else { match retry_cnt { 1 => { format!("master:{country}/coffeethai02_{version}_{country}.json") } 2 => { format!("master:{country}/coffeethai02_{version}.json") } 3 => { // do checkout format!("{country}/coffeethai02_{version}_{country}.json") } 4 => { // do checkout format!("{country}/coffeethai02_{version}.json") } 5 => { // checkout case premium format!("{country}/coffeethai02_1{version}.json") } _ => "".to_string(), } } } pub async fn throttle_send_recipe( recipe: &Recipe, tx: &Sender, country: String, version: String, uid: Arc>, ) { let r01s: Vec = recipe .Recipe01 .par_iter() .flat_map(|x| { let mut v = Vec::new(); v.push(x.clone()); if let Some(sub) = x.clone().SubMenu { v.extend(sub); } v }) .collect(); let matset: Vec = recipe.MaterialSetting.clone(); // test stream start model let ss = StreamDataStart::new( r01s.len(), CHUNK_SIZE, Some(uid.try_lock().unwrap().to_string()), ); let sid = ss.get_id(); info!("starting {sid}"); if let Some(err) = tx.send(TxControlMessage::Payload(ss.as_msg())).await.err() { println!("ERR: send tx error, {err:?}"); } // split send let uidd = uid.try_lock().unwrap().to_string(); for (index, chunk) in r01s.chunks(CHUNK_SIZE).enumerate() { let sda = StreamDataChunk::new(&sid, index * CHUNK_SIZE, chunk.to_vec(), uidd.to_string()); // no validate if let Some(err) = tx.send(TxControlMessage::Payload(sda.as_msg())).await.err() { println!("ERR: send tx error, {err:?}"); } } let mat_exid = sid.clone(); let extp = "matset"; for (index, chunk) in matset.chunks(CHUNK_SIZE).enumerate() { let curr_ch_id = format!("{mat_exid}_{index}"); let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec()); if let Some(err) = tx .send(TxControlMessage::Payload(extra_matset.as_msg())) .await .err() { println!("ERR: send tx extra error: {err:?}"); } } let extl = "topplist"; for (index, chunk) in recipe.Topping.ToppingList.chunks(CHUNK_SIZE).enumerate() { let curr_ch_id = format!("{mat_exid}_tl{index}"); let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec()); if let Some(err) = tx .send(TxControlMessage::Payload(extra_topplist.as_msg())) .await .err() { println!("ERR: send tx extra2 error: {err:?}"); } } let extg = "toppgrp"; for (index, chunk) in recipe.Topping.ToppingGroup.chunks(CHUNK_SIZE).enumerate() { let curr_ch_id = format!("{mat_exid}_tg{index}"); let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec()); if let Some(err) = tx .send(TxControlMessage::Payload(extra_toppgrp.as_msg())) .await .err() { println!("ERR: send tx extra2 error: {err:?}"); } } // NOTE: disable from case concurrent write may causes corrupted file // let rp_clone = recipe.clone(); // tokio::task::spawn(async move { // rp_clone.export_to_json_file(Some(format!("result.{country}.{version}.json"))); // }); info!("sending {sid}"); // return sid; let end_msg = StreamDataEnd::new(&sid); if let Some(err) = tx .send(TxControlMessage::Payload(end_msg.as_msg())) .await .err() { println!("ERR: send tx error, {err:?}"); } } // TODO: split cases into sub function pub async fn handle_recipe_request( config: DevConfig, redis: redis::Client, tx: Sender, req: WebsocketMessageRequest, uid_clone: Arc>, ) -> WebsocketMessageResult { // guard expect value let p = req.payload.unwrap(); let recipe_param: RecipeRequestPayload = serde_json::from_value(p)?; // get actual version // let latest_key = format!("{country}/version", country = recipe_param.country); let mut latest_version = match fetch_content_from_redis_byte(redis.clone(), &latest_key).await { Ok(x) => { // decode brotli let mut sbuf = String::new(); let mut decoder = BrotliDecoder::new(x.as_slice()); match decoder.read_to_string(&mut sbuf).await { Ok(_) => sbuf.replace('"', ""), Err(e) => { println!("decode fail: {e}"); "".to_string() } } } Err(e) => { println!("get latest fail: {e}"); "".to_string() } }; if latest_version.is_empty() { // cannot get actual version, try get from git latest_version = match invoke_checkout_request(config.clone(), latest_key).await { Ok(version) => version, Err(e) => { println!("Error on checkout: {e}"); "".to_string() } }; } let req_file = if is_req_patch(&recipe_param) { format!( "stx_{country}_{version}.json", country = recipe_param.country, version = latest_version ) } else { format!( "result.{country}.{version}.json", country = recipe_param.country, version = latest_version ) }; let mut retry_cnt = 0; println!("init req: {req_file}"); match get_local_file(req_file) { Ok(mut f) => { println!("get local file ok"); let mut file_content = String::new(); f.read_to_string(&mut file_content)?; if !file_content.is_empty() { info!("local file -> buffer OK"); } // split send let recipe: Recipe = match serde_json::from_str(&file_content) { Ok(c) => c, Err(e) => { error!("error deserialize struct fail, file may be corrupted: {e:?}"); if !file_content.ends_with("}") { error!("File corrupted, invalid json format"); } let _ = tx.send(TxControlMessage::Payload(serde_json::json!({ "type": "notify", "payload": { "from": "system_tx", "level": "error", "msg": format!("Some requested file on cache is corrupt, {} version {}", recipe_param.country, latest_version), "to": "" } }))).await; return Err(e.into()); } }; throttle_send_recipe( &recipe, &tx, recipe_param.country, latest_version, uid_clone.clone(), ) .await; } Err(_) => { println!("retry by fetching git"); let lvc = latest_version.clone(); // concurrent fetch for i in 1..6 { let latest_version_c = lvc.clone(); retry_cnt = i; // retry #1: get from redis let r1_key = get_key_cache( recipe_param.clone().country, latest_version_c.clone(), is_req_patch(&recipe_param), retry_cnt, ); println!("curr key: {r1_key}"); if retry_cnt < 3 { match fetch_content_from_redis_byte(redis.clone(), &r1_key).await { Ok(res) => { let buf = BufReader::new(res.as_slice()); let mut sbuf = String::new(); let mut decoder = BrotliDecoder::new(buf); if let Ok(_) = decoder.read_to_string(&mut sbuf).await { let recipe: Recipe = serde_json::from_str(&sbuf)?; throttle_send_recipe( &recipe, &tx, recipe_param.country, latest_version, uid_clone.clone(), ) .await; break; } } Err(_) => {} } } else { // retry get from git let content = match invoke_checkout_request(config.clone(), r1_key).await { Ok(file_content) => file_content, Err(e) => { println!("Error on checkout: {e}"); "".to_string() } }; let recipe = serde_json::from_str::(&content); if let Ok(rp) = recipe { throttle_send_recipe( &rp, &tx, recipe_param.clone().country, latest_version_c.clone(), uid_clone.clone(), ) .await; break; } else { info!("fail to deserialize: {}", content); } } } } } Ok(()) }