disable cold boot caching & optimization

- caused by excessive RAM usage
- optimize memory usage in checkout code

Signed-off-by: Pakin <pakin.t@forth.co.th>
This commit is contained in:
Pakin 2026-02-25 15:11:29 +07:00
parent 6fe3357efe
commit febf91d417
4 changed files with 203 additions and 466 deletions

View file

@ -9,20 +9,21 @@ use axum::{
routing::{get, post},
};
use axum_macros::debug_handler;
use bb8::Pool;
use bb8::{Pool, PooledConnection};
use bb8_redis::RedisConnectionManager;
use git2::{Cred, FetchOptions, PushOptions, RemoteCallbacks, Repository};
use brotli::CompressorWriter;
use git2::{Cred, FetchOptions, Object, PushOptions, RemoteCallbacks, Repository};
use image::load_from_memory;
use log::{error, info, warn};
use redis::{AsyncTypedCommands, TypedCommands, aio::MultiplexedConnection};
use redis::{AsyncTypedCommands, Connection, TypedCommands, aio::MultiplexedConnection};
use reqwest::header;
use serde::Deserialize;
use serde_json::{Value, json};
use tokio::{
io::{AsyncReadExt, BufReader},
sync::{
Mutex,
Mutex, MutexGuard,
mpsc::{Receiver, Sender, channel},
},
task::JoinSet,
@ -45,7 +46,7 @@ type RedisPool = Pool<RedisConnectionManager>;
#[derive(Debug, Clone)]
pub struct CacheJob {
pub rel_path: String,
pub file_data: Option<Value>,
pub file_data: Option<Vec<u8>>,
}
#[derive(Clone)]
@ -82,16 +83,34 @@ fn is_supported_image_file_type(name: String) -> bool {
SUPPORTED_IMAGES.contains(&ext)
}
/// Map a file path to an HTTP Content-Type based on its extension.
///
/// Returns `application/json` for `.json` files, `image/<ext>` for
/// extensions listed in `SUPPORTED_IMAGES`, and
/// `application/octet-stream` for everything else.
fn detect_content_type(path: &str) -> String {
    // `split('.')` yields the extension WITHOUT the leading dot (or the
    // whole path when there is no dot), so compare against "json" — the
    // old `ext.eq(".json")` could never match and JSON always fell
    // through to octet-stream.
    let ext = path.split('.').next_back().unwrap_or("");
    if ext == "json" {
        "application/json".to_string()
    } else if SUPPORTED_IMAGES.contains(&ext) {
        // NOTE(review): for "jpg" this produces "image/jpg"; browsers
        // accept it, but the canonical type is "image/jpeg".
        format!("image/{ext}")
    } else {
        "application/octet-stream".to_string()
    }
}
/// Query parameters for the `/checkout` endpoint.
#[derive(Deserialize, Debug)]
struct CheckoutParams {
    // Repo-relative path of the requested file or directory.
    path: String,
}
async fn checkout_handler(
State(mut state): State<AppState>,
State(state): State<AppState>,
Query(param): Query<CheckoutParams>,
) -> impl IntoResponse {
let mut response: HashMap<String, Value> = HashMap::new();
let fpath = format!(
"{}:{}",
state
.get_config("GIT_REPO_BRANCH_NAME")
.map(|x| x.to_string())
.unwrap_or("master".to_string()),
param.path
);
// println!(
// "req: {param:?}, has_cached: {}, saved: {:?}",
@ -102,140 +121,147 @@ async fn checkout_handler(
// );
// quick response from redis
if let Ok(mut conn) = state.redis.get().await
&& state
.fetched
.contains(&param.path.clone().as_str().to_string())
{
if let Ok(res) = conn.get(param.path.clone().as_str().to_string()).await {
if let Some(res) = res {
response.insert("result".to_string(), json!(res));
return (axum::http::StatusCode::OK, Json(json!(response))).into_response();
}
}
}
if let Ok(conn) = state.redis.dedicated_connection().await {
println!("conneciton ok");
if let Ok(raw) = fetch_content_from_redis_byte(conn, &fpath).await
&& !raw.is_empty()
{
// detect content type
let content_type = detect_content_type(&fpath);
let repo_path = state.get_config("GIT_REPO_LOCAL_DEST");
if repo_path.is_none() {
response.insert(
"error".to_string(),
json!("config repo dest not found".to_string()),
);
return (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
Json(json!(response)),
)
.into_response();
}
let legit_path = param.path.as_str();
// match param.path.as_str() {
// legit_path if param.path.contains("/") || state.check_country_existed(param.path.as_str()) || param.path.is_empty() => {
// }
// _ => {
// let error_log = "requested path is unexpected";
// error!("{error_log}");
// response.insert("error".to_string(), json!(error_log));
// }
// }
let rpath = repo_path.unwrap().clone();
let repo = match Repository::open_bare(rpath) {
Ok(repo) => repo,
Err(_) => {
let error_log = "unable to open repo as bare";
error!("{error_log}");
response.insert("error".to_string(), json!(error_log));
return (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
Json(json!(response)),
)
.into_response();
}
};
let fpath = format!(
"{}:{}",
state
.get_config("GIT_REPO_BRANCH_NAME")
.map(|x| x.to_string())
.unwrap_or("master".to_string()),
legit_path
);
let obj = match repo.revparse_single(&fpath) {
Ok(obj) => obj,
Err(e) => {
let error_log = format!("unexpected revparse single: {err}", err = e.message());
error!("{error_log}");
response.insert("error".to_string(), json!(error_log.clone()));
return (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
Json(json!(response)),
)
.into_response();
}
};
if let Some(blob) = obj.as_blob() {
// image
if is_supported_image_file_type(fpath.clone()) {
let fc = fpath.clone();
let ext = fc.split(".").last().unwrap_or("");
let content: &[u8] = blob.content();
let img = match load_from_memory(content) {
Ok(img) => img,
Err(err) => {
//.map_err(|err| format!("Failed tp load image from memory: {err}"))
return (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
Json(json!(format!("Failed tp load image from memory: {err}"))),
)
.into_response();
}
};
let mut image_buffer = Vec::new();
img.write_to(
&mut Cursor::new(&mut image_buffer),
image::ImageFormat::from_extension(ext).unwrap_or(image::ImageFormat::Png),
)
.unwrap();
println!("content-len: {}", raw.len());
return Response::builder()
.header(header::CONTENT_TYPE, format!("image/{ext}"))
.body(Body::from(image_buffer))
.header(header::CONTENT_TYPE, content_type)
.body(Body::from(raw))
.unwrap();
}
let content = unsafe { str::from_utf8_unchecked(blob.content()) };
return (axum::http::StatusCode::OK, Json(json!(content))).into_response();
// if let Ok(mut conn) = state.redis.get().await {
// let _ = conn.set(legit_path.to_string(), content.to_string()).await;
// state.fetched.push(legit_path.to_string());
// }
} else if let Some(tree) = obj.as_tree() {
let dir_list = tree
.iter()
.map(|x| x.name().unwrap_or("").to_string())
.collect::<Vec<String>>();
response.insert("result".to_string(), json!(dir_list));
return (axum::http::StatusCode::OK, Json(json!(dir_list))).into_response();
// if let Ok(mut conn) = state.redis.get().await {
// let _ = conn.set(legit_path.to_string(), dir_list.join(",")).await;
// state.fetched.push(legit_path.to_string());
// }
} else {
let error_log = "not obj nor tree";
error!("{error_log}");
response.insert("error".to_string(), json!(error_log));
return (axum::http::StatusCode::BAD_REQUEST, Json(json!(response))).into_response();
}
println!("failed to get from redis, continue ...");
let repo = state.repo.clone();
// let obj = match repo.revparse_single(&fpath) {
// Ok(obj) => obj,
// Err(e) => {
// let error_log = format!("unexpected revparse single: {err}", err = e.message());
// error!("{error_log}");
// response.insert("error".to_string(), json!(error_log.clone()));
// return (
// axum::http::StatusCode::INTERNAL_SERVER_ERROR,
// Json(json!(response)),
// )
// .into_response();
// }
// };
//
// let obj_bytes = match get_git_object(repo, &fpath).await {
// Ok(ob) => ob,
// Err(e) => {
// return (
// axum::http::StatusCode::INTERNAL_SERVER_ERROR,
// Json(json!(format!("{e}"))),
// )
// .into_response();
// }
// };
let fpath_clone = fpath.clone();
let result = tokio::task::spawn_blocking(move || {
let repo = repo.try_lock()?;
get_git_object(repo, &fpath_clone)
})
.await;
match result {
Ok(Ok(o)) => {
let state_clone = state.clone();
let key_clone = fpath.clone();
let data_clone = o.clone();
tokio::spawn(async move {
if let Ok(mut conn) = state_clone.redis.dedicated_connection().await {
let _: Result<(), _> = redis::cmd("SET")
.arg(&key_clone)
.arg(&data_clone)
.arg("EX")
.arg(3600)
.query_async(&mut conn)
.await;
}
});
let content_type = detect_content_type(&fpath);
Response::builder()
.header(header::CONTENT_TYPE, content_type)
.body(Body::from(o))
.unwrap()
}
_ => (axum::http::StatusCode::NOT_FOUND, "File not found").into_response(),
}
// if let Some(blob) = obj.as_blob() {
// // image
// if is_supported_image_file_type(fpath.clone()) {
// let fc = fpath.clone();
// let ext = fc.split(".").last().unwrap_or("");
// let content: &[u8] = blob.content();
// let img = match load_from_memory(content) {
// Ok(img) => img,
// Err(err) => {
// //.map_err(|err| format!("Failed tp load image from memory: {err}"))
// return (
// axum::http::StatusCode::INTERNAL_SERVER_ERROR,
// Json(json!(format!("Failed tp load image from memory: {err}"))),
// )
// .into_response();
// }
// };
// let mut image_buffer = Vec::new();
// img.write_to(
// &mut Cursor::new(&mut image_buffer),
// image::ImageFormat::from_extension(ext).unwrap_or(image::ImageFormat::Png),
// )
// .unwrap();
// return Response::builder()
// .header(header::CONTENT_TYPE, format!("image/{ext}"))
// .body(Body::from(image_buffer))
// .unwrap();
// }
// let content = unsafe { str::from_utf8_unchecked(blob.content()) };
// return (axum::http::StatusCode::OK, Json(json!(content))).into_response();
// // if let Ok(mut conn) = state.redis.get().await {
// // let _ = conn.set(legit_path.to_string(), content.to_string()).await;
// // state.fetched.push(legit_path.to_string());
// // }
// } else if let Some(tree) = obj.as_tree() {
// let dir_list = tree
// .iter()
// .map(|x| x.name().unwrap_or("").to_string())
// .collect::<Vec<String>>();
// response.insert("result".to_string(), json!(dir_list));
// return (axum::http::StatusCode::OK, Json(json!(dir_list))).into_response();
// // if let Ok(mut conn) = state.redis.get().await {
// // let _ = conn.set(legit_path.to_string(), dir_list.join(",")).await;
// // state.fetched.push(legit_path.to_string());
// // }
// } else {
// let error_log = "not obj nor tree";
// error!("{error_log}");
// response.insert("error".to_string(), json!(error_log));
// return (axum::http::StatusCode::BAD_REQUEST, Json(json!(response))).into_response();
// }
// println!("checkout response: {response:?}");
// (axum::http::StatusCode::OK, Json(json!(response)))
@ -244,20 +270,7 @@ async fn checkout_handler(
async fn fetch_handler(State(state): State<AppState>) -> impl IntoResponse {
let mut response: HashMap<String, Value> = HashMap::new();
if let Some(repo_path) = state.clone().get_config("GIT_REPO_LOCAL_DEST") {
let rpath = repo_path.clone();
let repo = match Repository::open_bare(rpath) {
Ok(repo) => repo,
Err(_) => {
let error_log = "unable to open bare repo";
error!("{error_log}");
response.insert("error".to_string(), json!(error_log));
return (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
Json(json!(response)),
);
}
};
let repo = state.repo.lock().await;
let mut remote = match repo.find_remote("origin") {
Ok(remote) => remote,
@ -437,6 +450,38 @@ async fn fetch_content_from_redis(redis: RedisPool, key: &str) -> Result<String,
}
}
/// Fetch the raw byte value stored at `key` from Redis.
///
/// Errors from Redis are returned as their stringified form. A missing
/// key presumably deserializes to an empty `Vec` (nil reply), so callers
/// should treat an empty result as "not cached" — TODO confirm against
/// the redis-rs `FromRedisValue` impl in use.
async fn fetch_content_from_redis_byte(
    mut conn: MultiplexedConnection,
    key: &str,
) -> Result<Vec<u8>, String> {
    redis::cmd("GET")
        .arg(key)
        .query_async(&mut conn)
        .await
        .map_err(|e| e.to_string())
}
/// Resolve `path` (a git revspec such as `branch:path/to/file`) in the
/// repository and return its raw bytes.
///
/// * Blob  -> the file's content.
/// * Tree  -> the tree's entry names joined with ",".
/// * Other -> an error.
fn get_git_object(
    repo: MutexGuard<'_, Repository>,
    path: &str,
) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
    let obj = repo.revparse_single(path)?;
    if let Some(blob) = obj.as_blob() {
        Ok(blob.content().to_vec())
    } else if let Some(tree) = obj.as_tree() {
        // Directory listing. `into_bytes` reuses the joined String's
        // buffer instead of copying it a second time (the old
        // `as_bytes().to_vec()` allocated twice).
        let names = tree
            .iter()
            .map(|entry| entry.name().unwrap_or("").to_string())
            .collect::<Vec<String>>();
        Ok(names.join(",").into_bytes())
    } else {
        Err("Unknown or not supported".into())
    }
}
fn read_patch_from_str(patch_changes: &str) -> Option<Value> {
//
@ -667,161 +712,7 @@ async fn health_handler(State(_): State<AppState>) -> impl IntoResponse {
)
}
async fn handle_system_message(redis: Arc<Mutex<redis::Client>>, mut msg: serde_json::Value) {
let topic = if let Some(t) = msg["topic"].as_str() {
t
} else {
""
};
match topic {
"generate_stx" => {
// expect target, base_version, value
let target = msg["target"].as_str().unwrap_or("").to_string();
let base_version = msg["base_version"].as_str().unwrap_or("").to_string();
let full_base_filename = msg["full_base_filename"].as_str().unwrap_or("").to_string();
let value = msg["value"].take();
let rclone = redis.clone();
let mut rcli = rclone.lock().await;
let doc_id = format!("{target}_{base_version}");
let doc_version_key = doc_ver_key(&doc_id);
let doc_snap = doc_snap_key(&doc_id);
let _ = rcli.set(
format!("{target}/{full_base_filename}"),
serde_json::to_string(&value).unwrap_or("".to_string()),
);
let _ = rcli.set(format!("{target}/version"), base_version);
info!("publish {full_base_filename} as main file");
// create commit transaction base
let mut con = match rcli.get_multiplexed_async_connection().await {
Ok(conn) => conn,
Err(e) => {
return;
}
};
let stream_key = doc_tx_stream_key(&doc_id);
let tx_event = json!({
"doc_id": doc_id,
"tx_id": Uuid::new_v4().to_string(),
"version": 1,
"base_version": 0,
"author": "from stx",
"ts": now_unix(),
"ops": vec![
PatchOp::Add { path: "/".to_string(), value: value.clone() }
]
});
let tx_str = tx_event.to_string();
let _: String = redis::cmd("XADD")
.arg(&stream_key)
.arg("*")
.arg("data")
.arg(&tx_str)
.query_async(&mut con)
.await
.unwrap();
let _: () = con.set(doc_ver_key(&doc_id), 1.to_string()).await.unwrap();
// if let Ok(state) = build_state_at(&mut con, &doc_id, 1).await {
// let _: () = con
// .set(doc_snap_key(&doc_id), state.to_string())
// .await
// .unwrap();
// let _: () = con
// .set(doc_snapver_key(&doc_id), 1.to_string())
// .await
// .unwrap();
// }
}
_ => {}
}
}
async fn build_state_at(
con: &mut MultiplexedConnection,
doc_id: &str,
target_version: u64,
) -> Result<Value, String> {
let snap_ver: u64 = con
.get(doc_snap_key(&doc_id))
.await
.ok()
.flatten()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let snap_json: Value = if snap_ver > 0 {
let raw: Option<String> = con.get(doc_snap_key(&doc_id)).await.unwrap_or(None);
raw.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_else(|| json!({}))
} else {
json!({})
};
let stream_key = doc_tx_stream_key(doc_id);
let start_id = "0-0";
let entries: Vec<(String, Vec<(String, String)>)> = redis::cmd("XRANGE")
.arg(&stream_key)
.arg(start_id)
.arg("+")
.query_async(con)
.await
.map_err(|e| e.to_string())?;
let mut ops_list: Vec<PatchOp> = Vec::new();
for (_id, fields) in entries {
let mut data_opts = None;
for (k, v) in fields {
if k == "data" {
data_opts = Some(v);
break;
}
}
let data = match data_opts {
Some(x) => x,
None => continue,
};
let ev: Value = serde_json::from_str(&data).map_err(|e| e.to_string())?;
let ver = ev.get("version").and_then(|v| v.as_u64()).unwrap_or(0);
if ver == 0 || ver > target_version {
continue;
}
if ver <= snap_ver {
continue;
}
let ops: Vec<PatchOp> =
serde_json::from_value(ev["ops"].clone()).map_err(|e| e.to_string())?;
ops_list.extend(ops);
}
apply_ops(snap_json, &ops_list)
}
pub async fn run(config: gcm::Configure) -> gcm::StandardResult {
let queue_capacity = config
.get("BOOT_QUEUE_CAP")
.and_then(|x| x.parse().ok())
.unwrap_or(512);
let worker_count: usize = config
.get("CACHE_WORKERS")
.and_then(|x| x.parse().ok())
.unwrap_or(4);
// REDIS POOL
//
//
@ -834,12 +725,12 @@ pub async fn run(config: gcm::Configure) -> gcm::StandardResult {
))
.expect("failed to create redis conn manager");
let redis_pool = Pool::builder()
.max_size(16)
.max_size(8)
.build(manager)
.await
.expect("failed to build redis pool");
let (enqueue_tx, enqueue_rx) = channel::<CacheJob>(queue_capacity);
let (enqueue_tx, enqueue_rx) = channel::<CacheJob>(8);
let state = AppState {
// cached_country_names: common::valid_country_name(),
@ -852,38 +743,6 @@ pub async fn run(config: gcm::Configure) -> gcm::StandardResult {
enqueue_tx: enqueue_tx.clone(),
};
let sys_redis_cli = state.redis.clone();
// tokio::spawn(async move {
// while let Some(msg) = sys_rx.recv().await {
// handle_system_message(sys_redis_cli.clone(), msg).await;
// }
// })
// .await?;
// SPAWN WORKER
let worker_handle = spawn_cache_workers(redis_pool.clone(), enqueue_rx, worker_count);
// COLD BOOT PRELOAD
{
let enqueue_tx = enqueue_tx.clone();
let cfg = config.clone();
let enable_cold_start = if let Some(flag) = config.clone().get("COLD_START_ENABLE") {
flag.contains("1")
} else {
false
};
if enable_cold_start {
tokio::spawn(async move {
if let Err(e) = cold_boot_enqueue_json(cfg, enqueue_tx).await {
error!("cold boot enqueue failed {e}");
}
});
}
}
let app = Router::new()
.route("/checkout", get(checkout_handler))
.route("/fetch", get(fetch_handler))
@ -904,131 +763,5 @@ pub async fn run(config: gcm::Configure) -> gcm::StandardResult {
axum::serve(listener, app).await?;
drop(worker_handle);
Ok(())
}
// Cold Boot
//
//
/// Scan the local git repository and enqueue its files as `CacheJob`s for
/// the cache workers.
///
/// Returns an error when the `GIT_REPO_LOCAL_DEST` config key is missing
/// (the old `expect` panicked instead, even though the signature already
/// returns a `Result`).
async fn cold_boot_enqueue_json(
    config: gcm::Configure,
    enqueue_tx: Sender<CacheJob>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    info!("cold boot scanning");
    let root_repo = config
        .get("GIT_REPO_LOCAL_DEST")
        .ok_or("git repo local not define")?;
    tx::helpers::setup_prebuild_tx_cache(enqueue_tx, root_repo).await?;
    Ok(())
}
// Worker pool
// spawn_cache_workers(config.clone(), redis_pool.clone(), enqueue_rx, worker_count);
/// Spawn the cache-job dispatcher.
///
/// Jobs received on `rx` run on a `JoinSet` with at most `worker_count`
/// in flight at any time. The returned handle completes once the channel
/// closes and all in-flight jobs have finished.
///
/// The old version also pre-spawned `worker_count` placeholder tasks that
/// slept 10 ms and immediately broke out of their loop; they did no work
/// and only occupied in-flight slots briefly, so they are removed.
fn spawn_cache_workers(
    redis_pool: RedisPool,
    mut rx: Receiver<CacheJob>,
    worker_count: usize,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        info!("starting {worker_count} cache workers");
        let mut joinset = JoinSet::new();
        let max_inflight = worker_count;
        // Dispatcher: pull a job, and if the set is full, drain completed
        // tasks until a slot frees up.
        while let Some(job) = rx.recv().await {
            while joinset.len() >= max_inflight {
                if let Some(res) = joinset.join_next().await {
                    if let Err(e) = res {
                        warn!("job task join error: {e}");
                    }
                }
            }
            let redis_pool = redis_pool.clone();
            joinset.spawn(async move {
                if let Err(e) = cache_one_file_streaming(redis_pool, job.clone()).await {
                    warn!("cache job failed: {e:?} {job:?}");
                }
            });
        }
        // Channel closed: wait for the remaining in-flight jobs.
        while let Some(res) = joinset.join_next().await {
            if let Err(e) = res {
                warn!("job task join error: {e:?}");
            }
        }
        info!("cache worker stopped (queue closed)");
    })
}
/// Compress one cache job's payload with brotli and store it in Redis.
///
/// The compressed stream is appended chunk-by-chunk to a temporary key and
/// then renamed over the final key, so readers never observe a partially
/// written value. Jobs without a payload are a no-op.
async fn cache_one_file_streaming(redis_pool: RedisPool, job: CacheJob) -> gcm::StandardResult {
    if job.file_data.is_none() {
        return Ok(());
    }
    let final_key = job.rel_path.clone();
    // Unique temp key so concurrent jobs for the same path cannot collide.
    let tmp_key = format!("tmp:{}:{}", final_key, Uuid::new_v4());
    // NOTE(review): `.to_string()` presumes `file_data` holds a
    // serde_json::Value; if the field type is now `Vec<u8>` this line
    // needs revisiting — confirm against the current CacheJob definition.
    let file_string = job.file_data.unwrap().to_string();
    let cursor = Cursor::new(file_string.into_bytes());
    let reader = BufReader::new(cursor);
    // brotli
    let mut encoder = BrotliEncoder::new(reader);
    let mut conn = redis_pool.get().await?;
    // Drop any stale temp data left by a previously failed run.
    let _ = conn.del(&tmp_key).await?;
    let mut buf = vec![0u8; 64 * 1024];
    let mut compressed_total = 0u64;
    loop {
        let n = encoder.read(&mut buf).await?;
        if n == 0 {
            break;
        }
        compressed_total += n as u64;
        // append chunk to redis
        let _: usize = conn.append(&tmp_key, &buf[..n]).await?;
    }
    // Swap in the new value: delete the old key, rename temp over it.
    let _ = conn.del(&final_key).await?;
    let _ = conn.rename(tmp_key, &final_key).await?;
    info!("cached {} ({} bytes)", job.rel_path, compressed_total);
    Ok(())
}

View file

@ -158,7 +158,7 @@ pub async fn setup_prebuild_tx_cache(
if tx
.send(CacheJob {
rel_path: expected_version_path.clone(),
file_data: Some(serde_json::json!(version_str)),
file_data: Some(serde_json::to_vec(&version_str)?),
})
.await
.is_ok()
@ -246,11 +246,11 @@ pub async fn setup_prebuild_tx_cache(
// process diffing
//
let base_recipe_clone = current_recipes.get("base").unwrap().clone();
let base_recipe_ref = current_recipes.get("base").unwrap();
for (key, value) in current_recipes.iter() {
if key != "base" {
let patch_diff = diff(&base_recipe_clone, value);
let patch_diff = diff(&base_recipe_ref, value);
let tx_name = format!(
"stx_{}",
key.replace("coffeethai02_", "").replace(".json", "")
@ -277,10 +277,12 @@ pub async fn setup_prebuild_tx_cache(
// }))
// .await;
let json_bytes = serde_json::to_vec(base_recipe_ref)?;
if tx
.send(CacheJob {
rel_path: format!("{current_path}/{base_file}"),
file_data: Some(base_recipe_clone),
file_data: Some(json_bytes),
})
.await
.is_ok()