diff --git a/Cargo.lock b/Cargo.lock index ec0a2fb..1a3f576 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2698,6 +2698,7 @@ dependencies = [ "axum-macros", "bb8", "bb8-redis", + "brotli", "bytes", "env_logger", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index 9d35c40..9ec73b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ axum = "0.8.7" axum-macros = "0.5.0" bb8 = "0.9.1" bb8-redis = "0.26.0" +brotli = "8.0.2" bytes = "1.11.1" env_logger = "0.11.8" futures-util = "0.3.31" diff --git a/src/app.rs b/src/app.rs index 1358f81..60c29d8 100644 --- a/src/app.rs +++ b/src/app.rs @@ -9,20 +9,21 @@ use axum::{ routing::{get, post}, }; use axum_macros::debug_handler; -use bb8::Pool; +use bb8::{Pool, PooledConnection}; use bb8_redis::RedisConnectionManager; -use git2::{Cred, FetchOptions, PushOptions, RemoteCallbacks, Repository}; +use brotli::CompressorWriter; +use git2::{Cred, FetchOptions, Object, PushOptions, RemoteCallbacks, Repository}; use image::load_from_memory; use log::{error, info, warn}; -use redis::{AsyncTypedCommands, TypedCommands, aio::MultiplexedConnection}; +use redis::{AsyncTypedCommands, Connection, TypedCommands, aio::MultiplexedConnection}; use reqwest::header; use serde::Deserialize; use serde_json::{Value, json}; use tokio::{ io::{AsyncReadExt, BufReader}, sync::{ - Mutex, + Mutex, MutexGuard, mpsc::{Receiver, Sender, channel}, }, task::JoinSet, @@ -45,7 +46,7 @@ type RedisPool = Pool<RedisConnectionManager>; #[derive(Debug, Clone)] pub struct CacheJob { pub rel_path: String, - pub file_data: Option<Value>, + pub file_data: Option<Vec<u8>>, } #[derive(Clone)] @@ -82,16 +83,34 @@ fn is_supported_image_file_type(name: String) -> bool { SUPPORTED_IMAGES.contains(&ext) } +fn detect_content_type(path: &str) -> String { + let ext = path.split(".").last().unwrap_or(""); + if ext.eq("json") { + "application/json".to_string() + } else if SUPPORTED_IMAGES.contains(&ext) { + format!("image/{ext}").to_string() + } else { + "application/octet-stream".to_string() + } +} + 
#[derive(Deserialize, Debug)] struct CheckoutParams { path: String, } async fn checkout_handler( - State(mut state): State, + State(state): State, Query(param): Query, ) -> impl IntoResponse { - let mut response: HashMap = HashMap::new(); + let fpath = format!( + "{}:{}", + state + .get_config("GIT_REPO_BRANCH_NAME") + .map(|x| x.to_string()) + .unwrap_or("master".to_string()), + param.path + ); // println!( // "req: {param:?}, has_cached: {}, saved: {:?}", @@ -102,140 +121,147 @@ async fn checkout_handler( // ); // quick response from redis - if let Ok(mut conn) = state.redis.get().await - && state - .fetched - .contains(¶m.path.clone().as_str().to_string()) - { - if let Ok(res) = conn.get(param.path.clone().as_str().to_string()).await { - if let Some(res) = res { - response.insert("result".to_string(), json!(res)); - return (axum::http::StatusCode::OK, Json(json!(response))).into_response(); - } - } - } + if let Ok(conn) = state.redis.dedicated_connection().await { + println!("conneciton ok"); + if let Ok(raw) = fetch_content_from_redis_byte(conn, &fpath).await + && !raw.is_empty() + { + // detect content type + let content_type = detect_content_type(&fpath); - let repo_path = state.get_config("GIT_REPO_LOCAL_DEST"); - if repo_path.is_none() { - response.insert( - "error".to_string(), - json!("config repo dest not found".to_string()), - ); - return ( - axum::http::StatusCode::INTERNAL_SERVER_ERROR, - Json(json!(response)), - ) - .into_response(); - } - - let legit_path = param.path.as_str(); - - // match param.path.as_str() { - // legit_path if param.path.contains("/") || state.check_country_existed(param.path.as_str()) || param.path.is_empty() => { - - // } - // _ => { - // let error_log = "requested path is unexpected"; - // error!("{error_log}"); - // response.insert("error".to_string(), json!(error_log)); - // } - // } - let rpath = repo_path.unwrap().clone(); - let repo = match Repository::open_bare(rpath) { - Ok(repo) => repo, - Err(_) => { - let error_log 
= "unable to open repo as bare"; - error!("{error_log}"); - response.insert("error".to_string(), json!(error_log)); - return ( - axum::http::StatusCode::INTERNAL_SERVER_ERROR, - Json(json!(response)), - ) - .into_response(); - } - }; - - let fpath = format!( - "{}:{}", - state - .get_config("GIT_REPO_BRANCH_NAME") - .map(|x| x.to_string()) - .unwrap_or("master".to_string()), - legit_path - ); - let obj = match repo.revparse_single(&fpath) { - Ok(obj) => obj, - Err(e) => { - let error_log = format!("unexpected revparse single: {err}", err = e.message()); - error!("{error_log}"); - response.insert("error".to_string(), json!(error_log.clone())); - return ( - axum::http::StatusCode::INTERNAL_SERVER_ERROR, - Json(json!(response)), - ) - .into_response(); - } - }; - - if let Some(blob) = obj.as_blob() { - // image - if is_supported_image_file_type(fpath.clone()) { - let fc = fpath.clone(); - let ext = fc.split(".").last().unwrap_or(""); - let content: &[u8] = blob.content(); - let img = match load_from_memory(content) { - Ok(img) => img, - Err(err) => { - //.map_err(|err| format!("Failed tp load image from memory: {err}")) - return ( - axum::http::StatusCode::INTERNAL_SERVER_ERROR, - Json(json!(format!("Failed tp load image from memory: {err}"))), - ) - .into_response(); - } - }; - - let mut image_buffer = Vec::new(); - img.write_to( - &mut Cursor::new(&mut image_buffer), - image::ImageFormat::from_extension(ext).unwrap_or(image::ImageFormat::Png), - ) - .unwrap(); + println!("content-len: {}", raw.len()); return Response::builder() - .header(header::CONTENT_TYPE, format!("image/{ext}")) - .body(Body::from(image_buffer)) + .header(header::CONTENT_TYPE, content_type) + .body(Body::from(raw)) .unwrap(); } - - let content = unsafe { str::from_utf8_unchecked(blob.content()) }; - - return (axum::http::StatusCode::OK, Json(json!(content))).into_response(); - - // if let Ok(mut conn) = state.redis.get().await { - // let _ = conn.set(legit_path.to_string(), 
content.to_string()).await; - // state.fetched.push(legit_path.to_string()); - // } - } else if let Some(tree) = obj.as_tree() { - let dir_list = tree - .iter() - .map(|x| x.name().unwrap_or("").to_string()) - .collect::>(); - response.insert("result".to_string(), json!(dir_list)); - - return (axum::http::StatusCode::OK, Json(json!(dir_list))).into_response(); - - // if let Ok(mut conn) = state.redis.get().await { - // let _ = conn.set(legit_path.to_string(), dir_list.join(",")).await; - // state.fetched.push(legit_path.to_string()); - // } - } else { - let error_log = "not obj nor tree"; - error!("{error_log}"); - response.insert("error".to_string(), json!(error_log)); - return (axum::http::StatusCode::BAD_REQUEST, Json(json!(response))).into_response(); } + println!("failed to get from redis, continue ..."); + + let repo = state.repo.clone(); + + // let obj = match repo.revparse_single(&fpath) { + // Ok(obj) => obj, + // Err(e) => { + // let error_log = format!("unexpected revparse single: {err}", err = e.message()); + // error!("{error_log}"); + // response.insert("error".to_string(), json!(error_log.clone())); + // return ( + // axum::http::StatusCode::INTERNAL_SERVER_ERROR, + // Json(json!(response)), + // ) + // .into_response(); + // } + // }; + // + + // let obj_bytes = match get_git_object(repo, &fpath).await { + // Ok(ob) => ob, + // Err(e) => { + // return ( + // axum::http::StatusCode::INTERNAL_SERVER_ERROR, + // Json(json!(format!("{e}"))), + // ) + // .into_response(); + // } + // }; + + let fpath_clone = fpath.clone(); + let result = tokio::task::spawn_blocking(move || { + let repo = repo.try_lock()?; + + get_git_object(repo, &fpath_clone) + }) + .await; + + match result { + Ok(Ok(o)) => { + let state_clone = state.clone(); + let key_clone = fpath.clone(); + let data_clone = o.clone(); + + tokio::spawn(async move { + if let Ok(mut conn) = state_clone.redis.dedicated_connection().await { + let _: Result<(), _> = redis::cmd("SET") + .arg(&key_clone) + 
.arg(&data_clone) + .arg("EX") + .arg(3600) + .query_async(&mut conn) + .await; + } + }); + + let content_type = detect_content_type(&fpath); + Response::builder() + .header(header::CONTENT_TYPE, content_type) + .body(Body::from(o)) + .unwrap() + } + _ => (axum::http::StatusCode::NOT_FOUND, "File not found").into_response(), + } + + // if let Some(blob) = obj.as_blob() { + // // image + // if is_supported_image_file_type(fpath.clone()) { + // let fc = fpath.clone(); + // let ext = fc.split(".").last().unwrap_or(""); + // let content: &[u8] = blob.content(); + // let img = match load_from_memory(content) { + // Ok(img) => img, + // Err(err) => { + // //.map_err(|err| format!("Failed tp load image from memory: {err}")) + // return ( + // axum::http::StatusCode::INTERNAL_SERVER_ERROR, + // Json(json!(format!("Failed tp load image from memory: {err}"))), + // ) + // .into_response(); + // } + // }; + + // let mut image_buffer = Vec::new(); + // img.write_to( + // &mut Cursor::new(&mut image_buffer), + // image::ImageFormat::from_extension(ext).unwrap_or(image::ImageFormat::Png), + // ) + // .unwrap(); + + // return Response::builder() + // .header(header::CONTENT_TYPE, format!("image/{ext}")) + // .body(Body::from(image_buffer)) + // .unwrap(); + // } + + // let content = unsafe { str::from_utf8_unchecked(blob.content()) }; + + // return (axum::http::StatusCode::OK, Json(json!(content))).into_response(); + + // // if let Ok(mut conn) = state.redis.get().await { + // // let _ = conn.set(legit_path.to_string(), content.to_string()).await; + // // state.fetched.push(legit_path.to_string()); + // // } + // } else if let Some(tree) = obj.as_tree() { + // let dir_list = tree + // .iter() + // .map(|x| x.name().unwrap_or("").to_string()) + // .collect::>(); + // response.insert("result".to_string(), json!(dir_list)); + + // return (axum::http::StatusCode::OK, Json(json!(dir_list))).into_response(); + + // // if let Ok(mut conn) = state.redis.get().await { + // // let _ = 
conn.set(legit_path.to_string(), dir_list.join(",")).await; + // // state.fetched.push(legit_path.to_string()); + // // } + // } else { + // let error_log = "not obj nor tree"; + // error!("{error_log}"); + // response.insert("error".to_string(), json!(error_log)); + // return (axum::http::StatusCode::BAD_REQUEST, Json(json!(response))).into_response(); + // } + // println!("checkout response: {response:?}"); // (axum::http::StatusCode::OK, Json(json!(response))) @@ -244,20 +270,7 @@ async fn checkout_handler( async fn fetch_handler(State(state): State) -> impl IntoResponse { let mut response: HashMap = HashMap::new(); if let Some(repo_path) = state.clone().get_config("GIT_REPO_LOCAL_DEST") { - let rpath = repo_path.clone(); - - let repo = match Repository::open_bare(rpath) { - Ok(repo) => repo, - Err(_) => { - let error_log = "unable to open bare repo"; - error!("{error_log}"); - response.insert("error".to_string(), json!(error_log)); - return ( - axum::http::StatusCode::INTERNAL_SERVER_ERROR, - Json(json!(response)), - ); - } - }; + let repo = state.repo.lock().await; let mut remote = match repo.find_remote("origin") { Ok(remote) => remote, @@ -437,6 +450,38 @@ async fn fetch_content_from_redis(redis: RedisPool, key: &str) -> Result Result, String> { + let res = redis::cmd("GET").arg(key).query_async(&mut conn).await; + + match res { + Ok(res) => Ok(res), + Err(e) => { + return Err(format!("{e}")); + } + } +} + +fn get_git_object( + repo: MutexGuard<'_, Repository>, + path: &str, +) -> Result, Box> { + let obj = repo.revparse_single(&path)?; + if let Some(blob) = obj.as_blob() { + Ok(blob.content().to_vec()) + } else if let Some(tree) = obj.as_tree() { + let dir_list = tree + .iter() + .map(|x| x.name().unwrap_or("").to_string()) + .collect::>(); + Ok(dir_list.join(",").as_bytes().to_vec()) + } else { + Err("Unknown or not supported".into()) + } +} + fn read_patch_from_str(patch_changes: &str) -> Option { // @@ -667,161 +712,7 @@ async fn health_handler(State(_): 
State) -> impl IntoResponse { ) } -async fn handle_system_message(redis: Arc>, mut msg: serde_json::Value) { - let topic = if let Some(t) = msg["topic"].as_str() { - t - } else { - "" - }; - - match topic { - "generate_stx" => { - // expect target, base_version, value - let target = msg["target"].as_str().unwrap_or("").to_string(); - let base_version = msg["base_version"].as_str().unwrap_or("").to_string(); - let full_base_filename = msg["full_base_filename"].as_str().unwrap_or("").to_string(); - let value = msg["value"].take(); - - let rclone = redis.clone(); - let mut rcli = rclone.lock().await; - - let doc_id = format!("{target}_{base_version}"); - let doc_version_key = doc_ver_key(&doc_id); - let doc_snap = doc_snap_key(&doc_id); - - let _ = rcli.set( - format!("{target}/{full_base_filename}"), - serde_json::to_string(&value).unwrap_or("".to_string()), - ); - - let _ = rcli.set(format!("{target}/version"), base_version); - - info!("publish {full_base_filename} as main file"); - - // create commit transaction base - - let mut con = match rcli.get_multiplexed_async_connection().await { - Ok(conn) => conn, - Err(e) => { - return; - } - }; - - let stream_key = doc_tx_stream_key(&doc_id); - let tx_event = json!({ - "doc_id": doc_id, - "tx_id": Uuid::new_v4().to_string(), - "version": 1, - "base_version": 0, - "author": "from stx", - "ts": now_unix(), - "ops": vec![ - PatchOp::Add { path: "/".to_string(), value: value.clone() } - ] - }); - - let tx_str = tx_event.to_string(); - - let _: String = redis::cmd("XADD") - .arg(&stream_key) - .arg("*") - .arg("data") - .arg(&tx_str) - .query_async(&mut con) - .await - .unwrap(); - let _: () = con.set(doc_ver_key(&doc_id), 1.to_string()).await.unwrap(); - - // if let Ok(state) = build_state_at(&mut con, &doc_id, 1).await { - // let _: () = con - // .set(doc_snap_key(&doc_id), state.to_string()) - // .await - // .unwrap(); - // let _: () = con - // .set(doc_snapver_key(&doc_id), 1.to_string()) - // .await - // .unwrap(); - // 
} - } - _ => {} - } -} - -async fn build_state_at( - con: &mut MultiplexedConnection, - doc_id: &str, - target_version: u64, -) -> Result { - let snap_ver: u64 = con - .get(doc_snap_key(&doc_id)) - .await - .ok() - .flatten() - .and_then(|s| s.parse().ok()) - .unwrap_or(0); - - let snap_json: Value = if snap_ver > 0 { - let raw: Option = con.get(doc_snap_key(&doc_id)).await.unwrap_or(None); - raw.and_then(|s| serde_json::from_str(&s).ok()) - .unwrap_or_else(|| json!({})) - } else { - json!({}) - }; - - let stream_key = doc_tx_stream_key(doc_id); - - let start_id = "0-0"; - let entries: Vec<(String, Vec<(String, String)>)> = redis::cmd("XRANGE") - .arg(&stream_key) - .arg(start_id) - .arg("+") - .query_async(con) - .await - .map_err(|e| e.to_string())?; - - let mut ops_list: Vec = Vec::new(); - for (_id, fields) in entries { - let mut data_opts = None; - for (k, v) in fields { - if k == "data" { - data_opts = Some(v); - break; - } - } - let data = match data_opts { - Some(x) => x, - None => continue, - }; - - let ev: Value = serde_json::from_str(&data).map_err(|e| e.to_string())?; - let ver = ev.get("version").and_then(|v| v.as_u64()).unwrap_or(0); - if ver == 0 || ver > target_version { - continue; - } - - if ver <= snap_ver { - continue; - } - - let ops: Vec = - serde_json::from_value(ev["ops"].clone()).map_err(|e| e.to_string())?; - ops_list.extend(ops); - } - - apply_ops(snap_json, &ops_list) -} - pub async fn run(config: gcm::Configure) -> gcm::StandardResult { - let queue_capacity = config - .get("BOOT_QUEUE_CAP") - .and_then(|x| x.parse().ok()) - .unwrap_or(512); - - let worker_count: usize = config - .get("CACHE_WORKERS") - .and_then(|x| x.parse().ok()) - .unwrap_or(4); - // REDIS POOL // // @@ -834,12 +725,12 @@ pub async fn run(config: gcm::Configure) -> gcm::StandardResult { )) .expect("failed to create redis conn manager"); let redis_pool = Pool::builder() - .max_size(16) + .max_size(8) .build(manager) .await .expect("failed to build redis pool"); - let 
(enqueue_tx, enqueue_rx) = channel::(queue_capacity); + let (enqueue_tx, enqueue_rx) = channel::(8); let state = AppState { // cached_country_names: common::valid_country_name(), @@ -852,38 +743,6 @@ pub async fn run(config: gcm::Configure) -> gcm::StandardResult { enqueue_tx: enqueue_tx.clone(), }; - let sys_redis_cli = state.redis.clone(); - - // tokio::spawn(async move { - // while let Some(msg) = sys_rx.recv().await { - // handle_system_message(sys_redis_cli.clone(), msg).await; - // } - // }) - // .await?; - - // SPAWN WORKER - let worker_handle = spawn_cache_workers(redis_pool.clone(), enqueue_rx, worker_count); - - // COLD BOOT PRELOAD - { - let enqueue_tx = enqueue_tx.clone(); - let cfg = config.clone(); - - let enable_cold_start = if let Some(flag) = config.clone().get("COLD_START_ENABLE") { - flag.contains("1") - } else { - false - }; - - if enable_cold_start { - tokio::spawn(async move { - if let Err(e) = cold_boot_enqueue_json(cfg, enqueue_tx).await { - error!("cold boot enqueue failed {e}"); - } - }); - } - } - let app = Router::new() .route("/checkout", get(checkout_handler)) .route("/fetch", get(fetch_handler)) @@ -904,131 +763,5 @@ pub async fn run(config: gcm::Configure) -> gcm::StandardResult { axum::serve(listener, app).await?; - drop(worker_handle); - - Ok(()) -} - -// Cold Boot -// -// -async fn cold_boot_enqueue_json( - config: gcm::Configure, - enqueue_tx: Sender, -) -> Result<(), Box> { - info!("cold boot scanning"); - - let root_repo = config - .get("GIT_REPO_LOCAL_DEST") - .expect("git repo local not define"); - - tx::helpers::setup_prebuild_tx_cache(enqueue_tx, root_repo).await?; - - Ok(()) -} - -// Worker pool -// spawn_cache_workers(config.clone(), redis_pool.clone(), enqueue_rx, worker_count); -fn spawn_cache_workers( - redis_pool: RedisPool, - mut rx: Receiver, - worker_count: usize, -) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move { - info!("starting {worker_count} cache workers"); - - let mut joinset = JoinSet::new(); - 
- for worker_id in 0..worker_count { - // let redis_pool = redis_pool.clone(); - // let repo_root = config - // .clone() - // .get("GIT_REPO_LOCAL_DEST") - // .expect("unable to get git repo local dest"); - - joinset.spawn(async move { - loop { - tokio::time::sleep(Duration::from_millis(10)).await; - // pull job - let _ = worker_id; - break; - } - }); - } - - // dispatcher - - let max_inflight = worker_count; - - while let Some(job) = rx.recv().await { - while joinset.len() >= max_inflight { - if let Some(res) = joinset.join_next().await { - if let Err(e) = res { - warn!("job task join error: {e}"); - } - } - } - - let redis_pool = redis_pool.clone(); - // let repo_root = config - // .clone() - // .get("GIT_REPO_LOCAL_DEST") - // .expect("unable to get git repo local dest"); - - joinset.spawn(async move { - if let Err(e) = cache_one_file_streaming(redis_pool, job.clone()).await { - warn!("cache job failed: {e:?} {job:?}"); - } - }); - } - - while let Some(res) = joinset.join_next().await { - if let Err(e) = res { - warn!("job task join error: {e:?}"); - } - } - - info!("cache worker stopped (queue closed)"); - }) -} - -async fn cache_one_file_streaming(redis_pool: RedisPool, job: CacheJob) -> gcm::StandardResult { - if job.file_data.is_none() { - return Ok(()); - } - - let final_key = job.rel_path.clone(); - let tmp_key = format!("tmp:{}:{}", final_key, Uuid::new_v4()); - - let file_string = job.file_data.unwrap().to_string(); - let cursor = Cursor::new(file_string.into_bytes()); - let reader = BufReader::new(cursor); - - // brotli - let mut encoder = BrotliEncoder::new(reader); - - let mut conn = redis_pool.get().await?; - let _ = conn.del(&tmp_key).await?; - - let mut buf = vec![0u8; 64 * 1024]; - let mut compressed_total = 0u64; - - loop { - let n = encoder.read(&mut buf).await?; - if n == 0 { - break; - } - - compressed_total += n as u64; - - // append chunk to redis - let _: usize = conn.append(&tmp_key, &buf[..n]).await?; - } - - let _ = 
conn.del(&final_key).await?; - let _ = conn.rename(tmp_key, &final_key).await?; - - info!("cached {} ({} bytes)", job.rel_path, compressed_total); - Ok(()) } diff --git a/src/tx/helpers.rs b/src/tx/helpers.rs index 07b5e5b..c935bc6 100644 --- a/src/tx/helpers.rs +++ b/src/tx/helpers.rs @@ -158,7 +158,7 @@ pub async fn setup_prebuild_tx_cache( if tx .send(CacheJob { rel_path: expected_version_path.clone(), - file_data: Some(serde_json::json!(version_str)), + file_data: Some(serde_json::to_vec(&version_str)?), }) .await .is_ok() @@ -246,11 +246,11 @@ pub async fn setup_prebuild_tx_cache( // process diffing // - let base_recipe_clone = current_recipes.get("base").unwrap().clone(); + let base_recipe_ref = current_recipes.get("base").unwrap(); for (key, value) in current_recipes.iter() { if key != "base" { - let patch_diff = diff(&base_recipe_clone, value); + let patch_diff = diff(base_recipe_ref, value); let tx_name = format!( "stx_{}", key.replace("coffeethai02_", "").replace(".json", "") @@ -277,10 +277,12 @@ pub async fn setup_prebuild_tx_cache( // })) // .await; + let json_bytes = serde_json::to_vec(base_recipe_ref)?; + if tx .send(CacheJob { rel_path: format!("{current_path}/{base_file}"), - file_data: Some(json_bytes), + file_data: Some(json_bytes), }) .await .is_ok()