From a2da030a9958de50066eb2da190e4cb87fa22e1e Mon Sep 17 00:00:00 2001 From: Pakin Date: Tue, 17 Feb 2026 14:23:31 +0700 Subject: [PATCH] add tx, cold boot --- Cargo.lock | 174 +++++++++++++++- Cargo.toml | 10 +- src/app.rs | 522 ++++++++++++++++++++++++++++++++++++++++------ src/main.rs | 30 +-- src/tx/handler.rs | 323 ++++++++++++++++++++++++++++ src/tx/helpers.rs | 294 ++++++++++++++++++++++++++ src/tx/mod.rs | 4 + src/tx/patcher.rs | 119 +++++++++++ src/tx/types.rs | 51 +++++ 9 files changed, 1433 insertions(+), 94 deletions(-) create mode 100644 src/tx/handler.rs create mode 100644 src/tx/helpers.rs create mode 100644 src/tx/mod.rs create mode 100644 src/tx/patcher.rs create mode 100644 src/tx/types.rs diff --git a/Cargo.lock b/Cargo.lock index b68a3f5..90a1483 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,6 +28,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "android_system_properties" version = "0.1.5" @@ -108,6 +123,18 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03918c3dbd7701a85c6b9887732e2921175f26c350b4563841d0958c21d57e6d" +[[package]] +name = "async-compression" +version = "0.4.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68650b7df54f0293fd061972a0fb05aaf4fc0879d3b3d21a638a182c5c543b9f" +dependencies = [ + "compression-codecs", + "compression-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -200,6 +227,28 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bb8" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "457d7ed3f888dfd2c7af56d4975cade43c622f74bdcddfed6d4352f57acc6310" +dependencies = [ + "futures-util", + "parking_lot", + "portable-atomic", + "tokio", +] + +[[package]] +name = "bb8-redis" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc1063effc7f6cf848bcbcc6e31b5962be75215835587d3109607c643d616f66" +dependencies = [ + "bb8", + "redis", +] + [[package]] name = "bitflags" version = "2.10.0" @@ -215,6 +264,27 @@ dependencies = [ "generic-array", ] +[[package]] +name = "brotli" +version = "8.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bumpalo" version = "3.19.1" @@ -223,9 +293,9 @@ checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" [[package]] name = "bytes" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" [[package]] name = "bzip2" @@ -290,9 +360,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" dependencies = [ "bytes", + "futures-core", "memchr", + "pin-project-lite", + "tokio", + "tokio-util", ] +[[package]] +name = "compression-codecs" +version = "0.4.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00828ba6fd27b45a448e57dbfe84f1029d4c9f26b368157e9a448a5f49a2ec2a" +dependencies = [ + "brotli", + "compression-core", +] + +[[package]] +name = "compression-core" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" + [[package]] name = "constant_time_eq" version = "0.3.1" @@ -576,6 +666,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -595,6 +696,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", + "futures-macro", + "futures-sink", "futures-task", "pin-project-lite", "pin-utils", @@ -1063,6 +1166,28 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json-patch" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f300e415e2134745ef75f04562dd0145405c2f7fd92065db029ac4b16b57fe90" +dependencies = [ + "jsonptr", + "serde", + "serde_json", + "thiserror", +] + +[[package]] +name = "jsonptr" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5a3cc660ba5d72bce0b3bb295bf20847ccbb40fd423f3f05b61273672e561fe" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "libbz2-rs-sys" version = "0.2.2" @@ -1629,13 +1754,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dfe20977fe93830c0e9817a16fbf1ed1cfd8d4bba366087a1841d2c6033c251" dependencies = [ "arcstr", + "bytes", + "cfg-if", "combine", + "futures-util", "itoa", "num-bigint", "percent-encoding", + "pin-project-lite", "ryu", "sha1_smol", "socket2", + "tokio", + "tokio-util", "url", "xxhash-rust", ] @@ -2064,10 +2195,16 @@ dependencies = [ name = "tbm-git-repo-service" version = "0.1.0" dependencies = [ + "async-compression", "axum", "axum-macros", + "bb8", + "bb8-redis", + "bytes", "env_logger", + "futures-util", "git2", + "json-patch", "libgit2-sys", "libtbr", "log", @@ -2077,9 +2214,11 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-util", "tonic", "tonic-prost", "tonic-prost-build", + "uuid", ] [[package]] @@ -2095,6 +2234,26 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "time" version = "0.3.45" @@ -2399,6 +2558,17 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f" +dependencies = [ + "getrandom 0.3.4", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index 24a0aa3..1b72069 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,21 +4,29 @@ version = "0.1.0" edition = "2024" [dependencies] +async-compression = { version = "0.4.39", features = ["tokio", "brotli"] } axum = "0.8.7" axum-macros = "0.5.0" +bb8 = "0.9.1" +bb8-redis = "0.26.0" +bytes = "1.11.1" env_logger = "0.11.8" +futures-util = "0.3.31" git2 = { version = "0.20.3", features = ["https", "ssh"] } +json-patch = "4.1.0" libgit2-sys = { version = "0.18.3", features = ["ssh"] } libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.git", version = "0.1.1" } log = "0.4.29" prost = "0.14.1" -redis = "1.0.2" +redis = { version = "1.0.2", features = ["tokio-comp"] } reqwest = { version = "0.12.25", features = ["json"] } serde = { version = "1.0.228", features = ["derive"] } serde_json = { version = "1.0.145", features = ["preserve_order"] } tokio = { version = "1.48.0", features = ["full"] } +tokio-util = { version = "0.7.18", features = ["io"] } tonic = { version = "0.14.2", features = ["transport"] } tonic-prost = "0.14.2" +uuid = { version = "1.20.0", features = ["v4"] } [build-dependencies] tonic-prost-build = "0.14.2" diff --git a/src/app.rs b/src/app.rs index 5a6dcc6..108d9a2 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,5 +1,6 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, io::Cursor, sync::Arc, time::Duration}; +use async_compression::tokio::bufread::BrotliEncoder; use axum::{ Json, Router, extract::{Query, State}, @@ -7,24 +8,53 @@ use axum::{ routing::{get, post}, }; use axum_macros::debug_handler; +use bb8::Pool; +use bb8_redis::RedisConnectionManager; use git2::{Cred, FetchOptions, PushOptions, RemoteCallbacks, Repository}; use log::{error, info, warn}; -use redis::TypedCommands; +use redis::{AsyncTypedCommands, TypedCommands, aio::MultiplexedConnection}; use serde::Deserialize; use serde_json::{Value, json}; -use tokio::sync::Mutex; +use tokio::{ + io::{AsyncReadExt, BufReader}, + sync::{ + Mutex, + mpsc::{Receiver, Sender, channel}, + }, + task::JoinSet, +}; -use crate::{gcm, reg}; +use uuid::Uuid; + +use crate::{ + gcm, + tx::{ + self, + helpers::{doc_snap_key, doc_tx_stream_key, doc_ver_key, now_unix}, + patcher::apply_ops, + types::PatchOp, + }, +}; + +type RedisPool = Pool; + +#[derive(Debug, Clone)] +pub struct CacheJob { + pub rel_path: String, + pub file_data: Option, +} #[derive(Clone)] pub struct AppState { // cached_country_names: Vec<&'static str>, configures: gcm::Configure, repo: Arc>, - redis: Arc>, + redis: RedisPool, // save already fetched path, further calls should fetch from redis instead. fetched: Vec, + // queue + enqueue_tx: Sender, } impl AppState { @@ -41,7 +71,7 @@ impl AppState { } } -#[derive(Deserialize)] +#[derive(Deserialize, Debug)] struct CheckoutParams { path: String, } @@ -52,21 +82,24 @@ async fn checkout_handler( ) -> impl IntoResponse { let mut response: HashMap = HashMap::new(); + // println!( + // "req: {param:?}, has_cached: {}, saved: {:?}", + // state + // .fetched + // .contains(¶m.path.clone().as_str().to_string()), + // state.fetched.clone() + // ); + // quick response from redis - if let Ok(mut rcli) = state.redis.try_lock() + if let Ok(mut conn) = state.redis.get().await && state .fetched .contains(¶m.path.clone().as_str().to_string()) { - if let Ok(res) = rcli.get(param.path.clone().as_str().to_string()) { - match res { - Some(res) => { - response.insert("result".to_string(), json!(res)); - return (axum::http::StatusCode::OK, Json(json!(response))); - } - None => { - // continue - } + if let Ok(res) = conn.get(param.path.clone().as_str().to_string()).await { + if let Some(res) = res { + response.insert("result".to_string(), json!(res)); + return (axum::http::StatusCode::OK, Json(json!(response))); } } } @@ -109,7 +142,14 @@ async fn checkout_handler( } }; - let fpath = format!("master:{}", legit_path); + let fpath = format!( + "{}:{}", + state + .get_config("GIT_REPO_BRANCH_NAME") + .map(|x| x.to_string()) + .unwrap_or("master".to_string()), + legit_path + ); let obj = match repo.revparse_single(&fpath) { Ok(obj) => obj, Err(e) => { @@ -127,10 +167,12 @@ async fn checkout_handler( let content = unsafe { str::from_utf8_unchecked(blob.content()) }; response.insert("result".to_string(), json!(content.to_string())); - if let Ok(mut rcli) = state.redis.try_lock() { - let _ = rcli.set(legit_path.to_string(), content.to_string()); - state.fetched.push(legit_path.to_string()); - } + return (axum::http::StatusCode::OK, Json(json!(content))); + + // if let Ok(mut conn) = state.redis.get().await { + // let _ = conn.set(legit_path.to_string(), content.to_string()).await; + // state.fetched.push(legit_path.to_string()); + // } } else if let Some(tree) = obj.as_tree() { let dir_list = tree .iter() @@ -138,10 +180,12 @@ async fn checkout_handler( .collect::>(); response.insert("result".to_string(), json!(dir_list)); - if let Ok(mut rcli) = state.redis.try_lock() { - let _ = rcli.set(legit_path.to_string(), dir_list.join(",")); - state.fetched.push(legit_path.to_string()); - } + return (axum::http::StatusCode::OK, Json(json!(dir_list))); + + // if let Ok(mut conn) = state.redis.get().await { + // let _ = conn.set(legit_path.to_string(), dir_list.join(",")).await; + // state.fetched.push(legit_path.to_string()); + // } } else { let error_log = "not obj nor tree"; error!("{error_log}"); @@ -149,7 +193,9 @@ async fn checkout_handler( return (axum::http::StatusCode::BAD_REQUEST, Json(json!(response))); } - (axum::http::StatusCode::OK, Json(json!(response))) + // println!("checkout response: {response:?}"); + + // (axum::http::StatusCode::OK, Json(json!(response))) } async fn fetch_handler(State(state): State) -> impl IntoResponse { @@ -223,9 +269,9 @@ async fn fetch_handler(State(state): State) -> impl IntoResponse { ); } - tokio::spawn(async move { - let _ = broadcast_need_update_new_data_after_fetch(state.clone()).await; - }); + // tokio::spawn(async move { + // let _ = broadcast_need_update_new_data_after_fetch(state.clone()).await; + // }); ( axum::http::StatusCode::OK, @@ -234,21 +280,21 @@ async fn fetch_handler(State(state): State) -> impl IntoResponse { } // refetch data after request `/fetch` -async fn broadcast_need_update_new_data_after_fetch( - state: AppState, -) -> Result<(), Box> { - // clear out saved state, user must request again - // state.fetched.clear(); // ----> background fetch into redis - // - // - let state_cl = state.clone(); - let mut rcli = state_cl.redis.try_lock()?; - for s in state.fetched { - let _ = rcli.publish("recipe_repo_news", s); - } +// async fn broadcast_need_update_new_data_after_fetch( +// state: AppState, +// ) -> Result<(), Box> { +// // clear out saved state, user must request again +// // state.fetched.clear(); // ----> background fetch into redis +// // +// // +// let state_cl = state.clone(); +// let mut rcli = state_cl.redis.try_lock()?; +// for s in state.fetched { +// let _ = rcli.publish("recipe_repo_news", s); +// } - Ok(()) -} +// Ok(()) +// } // { path: "/path/to/file", signature: { // username: "", email: "" @@ -297,11 +343,16 @@ async fn commit_handler( } let commit_oid = match commit_file_content( - state.repo, + state.clone().repo, &payload.path, &content.as_bytes(), payload.signature, &payload.message.unwrap_or("update: from api".to_string()), + state + .clone() + .get_config("GIT_REPO_BRANCH_NAME") + .map(|x| x.to_string()) + .unwrap_or("master".to_string()), ) .await { @@ -317,7 +368,7 @@ async fn commit_handler( // save history let redis_pre_lock = state.redis.clone(); { - if let Ok(mut rl) = redis_pre_lock.try_lock() { + if let Ok(mut rl) = redis_pre_lock.get().await { let _ = rl.rpush(format!("{}.history", payload.path), payload.patch_key); } } @@ -328,13 +379,12 @@ async fn commit_handler( ) } -async fn fetch_content_from_redis( - redis: Arc>, - key: &str, -) -> Result { - match redis.lock().await.get(key) { - Ok(s) => { - if let Some(res) = s { +async fn fetch_content_from_redis(redis: RedisPool, key: &str) -> Result { + match redis.get().await { + Ok(mut s) => { + if let Ok(res) = s.get(key).await + && let Some(res) = res + { Ok(res) } else { Err(format!("result error from key: {key}")) @@ -395,11 +445,7 @@ fn diff_apply(target: &mut Value, patch: &Value) -> Result<(), String> { } } -async fn apply_patch_to_file( - redis: Arc>, - path: &str, - patch_changes: &mut String, -) -> String { +async fn apply_patch_to_file(redis: RedisPool, path: &str, patch_changes: &mut String) -> String { use libtbr::*; // expect the path to already has in redis let full_file = match fetch_content_from_redis(redis, path).await { @@ -446,6 +492,7 @@ async fn commit_file_content( content: &[u8], author: Signature, message: &str, + branch: String, ) -> Result> { let repo_clone = repo.clone(); let blob_oid = repo_clone.lock().await.blob(content)?; @@ -494,7 +541,8 @@ async fn commit_file_content( let parents: Vec<&git2::Commit> = parent.iter().collect(); let oid = rlock.commit( - Some("refs/heads/master"), + //"refs/heads/master" + Some(&format!("refs/heads/{branch}")), &sig, &sig, message, @@ -520,7 +568,10 @@ async fn push_handler(State(state): State) -> impl IntoResponse { } }; - let branch = "master"; + let branch = &config + .get("GIT_REPO_BRANCH_NAME") + .map(|x| x.to_string()) + .unwrap_or("master".to_string()); if let Err(e) = push(config, repo, remote_name, branch) { return ( @@ -561,27 +612,242 @@ fn push( Err("cannot lock repo".into()) } +/// Return git state & server state +async fn health_handler(State(_): State) -> impl IntoResponse { + ( + axum::http::StatusCode::OK, + Json(json!({ + "status": "ok", + "git": "", + "server": "" + })), + ) +} + +async fn handle_system_message(redis: Arc>, mut msg: serde_json::Value) { + let topic = if let Some(t) = msg["topic"].as_str() { + t + } else { + "" + }; + + match topic { + "generate_stx" => { + // expect target, base_version, value + let target = msg["target"].as_str().unwrap_or("").to_string(); + let base_version = msg["base_version"].as_str().unwrap_or("").to_string(); + let full_base_filename = msg["full_base_filename"].as_str().unwrap_or("").to_string(); + let value = msg["value"].take(); + + let rclone = redis.clone(); + let mut rcli = rclone.lock().await; + + let doc_id = format!("{target}_{base_version}"); + let doc_version_key = doc_ver_key(&doc_id); + let doc_snap = doc_snap_key(&doc_id); + + let _ = rcli.set( + format!("{target}/{full_base_filename}"), + serde_json::to_string(&value).unwrap_or("".to_string()), + ); + + let _ = rcli.set(format!("{target}/version"), base_version); + + info!("publish {full_base_filename} as main file"); + + // create commit transaction base + + let mut con = match rcli.get_multiplexed_async_connection().await { + Ok(conn) => conn, + Err(e) => { + return; + } + }; + + let stream_key = doc_tx_stream_key(&doc_id); + let tx_event = json!({ + "doc_id": doc_id, + "tx_id": Uuid::new_v4().to_string(), + "version": 1, + "base_version": 0, + "author": "from stx", + "ts": now_unix(), + "ops": vec![ + PatchOp::Add { path: "/".to_string(), value: value.clone() } + ] + }); + + let tx_str = tx_event.to_string(); + + let _: String = redis::cmd("XADD") + .arg(&stream_key) + .arg("*") + .arg("data") + .arg(&tx_str) + .query_async(&mut con) + .await + .unwrap(); + let _: () = con.set(doc_ver_key(&doc_id), 1.to_string()).await.unwrap(); + + // if let Ok(state) = build_state_at(&mut con, &doc_id, 1).await { + // let _: () = con + // .set(doc_snap_key(&doc_id), state.to_string()) + // .await + // .unwrap(); + // let _: () = con + // .set(doc_snapver_key(&doc_id), 1.to_string()) + // .await + // .unwrap(); + // } + } + _ => {} + } +} + +async fn build_state_at( + con: &mut MultiplexedConnection, + doc_id: &str, + target_version: u64, +) -> Result { + let snap_ver: u64 = con + .get(doc_snap_key(&doc_id)) + .await + .ok() + .flatten() + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + + let snap_json: Value = if snap_ver > 0 { + let raw: Option = con.get(doc_snap_key(&doc_id)).await.unwrap_or(None); + raw.and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or_else(|| json!({})) + } else { + json!({}) + }; + + let stream_key = doc_tx_stream_key(doc_id); + + let start_id = "0-0"; + let entries: Vec<(String, Vec<(String, String)>)> = redis::cmd("XRANGE") + .arg(&stream_key) + .arg(start_id) + .arg("+") + .query_async(con) + .await + .map_err(|e| e.to_string())?; + + let mut ops_list: Vec = Vec::new(); + for (_id, fields) in entries { + let mut data_opts = None; + for (k, v) in fields { + if k == "data" { + data_opts = Some(v); + break; + } + } + let data = match data_opts { + Some(x) => x, + None => continue, + }; + + let ev: Value = serde_json::from_str(&data).map_err(|e| e.to_string())?; + let ver = ev.get("version").and_then(|v| v.as_u64()).unwrap_or(0); + if ver == 0 || ver > target_version { + continue; + } + + if ver <= snap_ver { + continue; + } + + let ops: Vec = + serde_json::from_value(ev["ops"].clone()).map_err(|e| e.to_string())?; + ops_list.extend(ops); + } + + apply_ops(snap_json, &ops_list) +} + pub async fn run(config: gcm::Configure) -> gcm::StandardResult { + let queue_capacity = config + .get("BOOT_QUEUE_CAP") + .and_then(|x| x.parse().ok()) + .unwrap_or(512); + + let worker_count: usize = config + .get("CACHE_WORKERS") + .and_then(|x| x.parse().ok()) + .unwrap_or(4); + + // REDIS POOL + // + // + // + // + let manager = RedisConnectionManager::new(format!( + "redis://{}:{}", + config.get("REDIS_URI").unwrap_or(&"".to_string()), + config.get("REDIS_PORT").unwrap_or(&"".to_string()) + )) + .expect("failed to create redis conn manager"); + let redis_pool = Pool::builder() + .max_size(16) + .build(manager) + .await + .expect("failed to build redis pool"); + + let (enqueue_tx, enqueue_rx) = channel::(queue_capacity); + let state = AppState { // cached_country_names: common::valid_country_name(), configures: config.clone(), repo: Arc::new(Mutex::new(Repository::open_bare( config.get("GIT_REPO_LOCAL_DEST").unwrap_or(&"".to_string()), )?)), - redis: Arc::new(Mutex::new(redis::Client::open(format!( - "redis://{}:{}", - config.get("REDIS_URI").unwrap_or(&"".to_string()), - config.get("REDIS_PORT").unwrap_or(&"".to_string()) - ))?)), + redis: redis_pool.clone(), fetched: Vec::new(), + enqueue_tx: enqueue_tx.clone(), }; + let sys_redis_cli = state.redis.clone(); + + // tokio::spawn(async move { + // while let Some(msg) = sys_rx.recv().await { + // handle_system_message(sys_redis_cli.clone(), msg).await; + // } + // }) + // .await?; + + // SPAWN WORKER + let worker_handle = spawn_cache_workers(redis_pool.clone(), enqueue_rx, worker_count); + + // COLD BOOT PRELOAD + { + let enqueue_tx = enqueue_tx.clone(); + let cfg = config.clone(); + + let enable_cold_start = if let Some(flag) = config.clone().get("COLD_START_ENABLE") { + flag.contains("1") + } else { + false + }; + + if enable_cold_start { + tokio::spawn(async move { + if let Err(e) = cold_boot_enqueue_json(cfg, enqueue_tx).await { + error!("cold boot enqueue failed {e}"); + } + }); + } + } + let app = Router::new() .route("/checkout", get(checkout_handler)) .route("/fetch", get(fetch_handler)) .route("/commit", post(commit_handler)) .route("/push", get(push_handler)) - .route("/healthz", get(reg::health)) + .route("/health", get(health_handler)) + // .route("/healthz", get(reg::health)) .with_state(state); let listener = tokio::net::TcpListener::bind(format!( @@ -595,5 +861,131 @@ pub async fn run(config: gcm::Configure) -> gcm::StandardResult { axum::serve(listener, app).await?; + drop(worker_handle); + + Ok(()) +} + +// Cold Boot +// +// +async fn cold_boot_enqueue_json( + config: gcm::Configure, + enqueue_tx: Sender, +) -> Result<(), Box> { + info!("cold boot scanning"); + + let root_repo = config + .get("GIT_REPO_LOCAL_DEST") + .expect("git repo local not define"); + + tx::helpers::setup_prebuild_tx_cache(enqueue_tx, root_repo).await?; + + Ok(()) +} + +// Worker pool +// spawn_cache_workers(config.clone(), redis_pool.clone(), enqueue_rx, worker_count); +fn spawn_cache_workers( + redis_pool: RedisPool, + mut rx: Receiver, + worker_count: usize, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + info!("starting {worker_count} cache workers"); + + let mut joinset = JoinSet::new(); + + for worker_id in 0..worker_count { + // let redis_pool = redis_pool.clone(); + // let repo_root = config + // .clone() + // .get("GIT_REPO_LOCAL_DEST") + // .expect("unable to get git repo local dest"); + + joinset.spawn(async move { + loop { + tokio::time::sleep(Duration::from_millis(10)).await; + // pull job + let _ = worker_id; + break; + } + }); + } + + // dispatcher + + let max_inflight = worker_count; + + while let Some(job) = rx.recv().await { + while joinset.len() >= max_inflight { + if let Some(res) = joinset.join_next().await { + if let Err(e) = res { + warn!("job task join error: {e}"); + } + } + } + + let redis_pool = redis_pool.clone(); + // let repo_root = config + // .clone() + // .get("GIT_REPO_LOCAL_DEST") + // .expect("unable to get git repo local dest"); + + joinset.spawn(async move { + if let Err(e) = cache_one_file_streaming(redis_pool, job.clone()).await { + warn!("cache job failed: {e:?} {job:?}"); + } + }); + } + + while let Some(res) = joinset.join_next().await { + if let Err(e) = res { + warn!("job task join error: {e:?}"); + } + } + + info!("cache worker stopped (queue closed)"); + }) +} + +async fn cache_one_file_streaming(redis_pool: RedisPool, job: CacheJob) -> gcm::StandardResult { + if job.file_data.is_none() { + return Ok(()); + } + + let final_key = job.rel_path.clone(); + let tmp_key = format!("tmp:{}:{}", final_key, Uuid::new_v4()); + + let file_string = job.file_data.unwrap().to_string(); + let cursor = Cursor::new(file_string.into_bytes()); + let reader = BufReader::new(cursor); + + // brotli + let mut encoder = BrotliEncoder::new(reader); + + let mut conn = redis_pool.get().await?; + let _ = conn.del(&tmp_key).await?; + + let mut buf = vec![0u8; 64 * 1024]; + let mut compressed_total = 0u64; + + loop { + let n = encoder.read(&mut buf).await?; + if n == 0 { + break; + } + + compressed_total += n as u64; + + // append chunk to redis + let _: usize = conn.append(&tmp_key, &buf[..n]).await?; + } + + let _ = conn.del(&final_key).await?; + let _ = conn.rename(tmp_key, &final_key).await?; + + info!("cached {} ({} bytes)", job.rel_path, compressed_total); + Ok(()) } diff --git a/src/main.rs b/src/main.rs index a201bfd..2f77339 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::{env, fs::File}; +use std::fs::File; use env_logger::Builder; use libtbr::recipe_functions::common; @@ -6,13 +6,14 @@ use log::info; use crate::{ git::setup_git_repo, - reg::{heartbeat_loop, registry::registry_client::RegistryClient}, + // reg::{heartbeat_loop, registry::registry_client::RegistryClient}, }; mod app; mod gcm; mod git; -mod reg; +mod tx; +// mod reg; fn setup_log(config: gcm::Configure) -> gcm::StandardResult { let logfile = File::create(config.get("LOG_NAME").unwrap_or(&"run.log".to_string()))?; @@ -24,27 +25,6 @@ fn setup_log(config: gcm::Configure) -> gcm::StandardResult { Ok(()) } -async fn setup_registry(config: gcm::Configure) -> gcm::StandardResult { - let cfg_clone = config.clone(); - let register_server_address = cfg_clone - .get("REGISTRY_SERVER") - .expect("not found registry server"); - let name = cfg_clone.get("SERVICE_NAME").expect("missing service name"); - let url = cfg_clone.get("SERVICE_URL").expect("missing service url"); - - let token = cfg_clone.get("REGISTRY_TOKEN").expect("token not provided"); - unsafe { - env::set_var("REGISTRY_TOKEN", token); - } - - let mut client = RegistryClient::connect(register_server_address.clone()).await?; - reg::register_service(&mut client, name, url).await; - - tokio::spawn(heartbeat_loop(name.to_string(), url.to_string())); - - Ok(()) -} - #[tokio::main] async fn main() -> gcm::StandardResult { let config: gcm::Configure = common::get_config(); @@ -68,8 +48,6 @@ async fn main() -> gcm::StandardResult { .get("CONFIG_VERSION") .expect("config version not defined") ); - - setup_registry(config.clone()).await?; app::run(config.clone()).await?; Ok(()) diff --git a/src/tx/handler.rs b/src/tx/handler.rs new file mode 100644 index 0000000..f6c5968 --- /dev/null +++ b/src/tx/handler.rs @@ -0,0 +1,323 @@ +use axum::{ + Json, Router, + extract::{Path, Query, State}, + response::IntoResponse, + routing::{get, post}, +}; +use redis::{AsyncTypedCommands, TypedCommands, aio::MultiplexedConnection}; +use reqwest::StatusCode; +use serde::Deserialize; +use serde_json::{Value, json}; +use uuid::Uuid; + +use crate::tx::{ + helpers::*, + patcher::apply_ops, + types::{PatchOp, Reservation, ReserveReq, ReserveRes, TxCommitReq, TxCommitRes}, +}; + +async fn reserve_tx( + State(state): State, + Path(doc_id): Path, + Json(req): Json, +) -> impl IntoResponse { + let rsv_id = Uuid::new_v4().to_string(); + let ttl_secs: i64 = 30; // ttl reserve 30s + let expires_at = now_unix() + ttl_secs; + + let mut rcli = state.redis_cli.clone(); + + // check version + let ver_key = doc_ver_key(&doc_id); + let base_version: u64 = match rcli.get(ver_key) { + Ok(res) => { + if let Some(r) = res { + r.parse().ok().unwrap() + } else { + 0 + } + } + Err(_) => 0, + }; + + let rsv = Reservation { + reservation_id: rsv_id.clone(), + doc_id: doc_id.clone(), + author: req.author, + base_version, + expires_at, + }; + + let key = rsv_key(&doc_id, &rsv_id); + let payload = serde_json::to_string(&rsv).unwrap(); + + let _: () = rcli.set_ex(key, payload, ttl_secs as u64).unwrap(); + + ( + StatusCode::OK, + Json(ReserveRes { + reservation_id: rsv_id, + base_version, + expires_at, + }), + ) +} + +async fn commit_tx( + State(state): State, + Path(doc_id): Path, + Json(req): Json, +) -> impl IntoResponse { + if req.tx_id.is_empty() || req.reservation_id.is_empty() { + return ( + StatusCode::BAD_REQUEST, + "missing tx_id or reservation_id ".into_response(), + ); + } + + let mut con = match state.redis_cli.get_multiplexed_async_connection().await { + Ok(conn) => conn, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("{}", e.to_string()).into_response(), + ); + } + }; + + // lock commit + let lock_key = commit_lock_key(&doc_id); + if let Err(_) = wait_lock(&mut con, &lock_key, 1200, 20).await { + return (StatusCode::CONFLICT, "doc busy, retry".into_response()); + } + + let txid_k = txid_key(&doc_id, &req.tx_id); + if let Ok(Some(v)) = con.get(&txid_k).await { + if let Ok(committed_version) = v.parse::() { + let res = TxCommitRes { + doc_id, + tx_id: req.tx_id, + committed_version, + }; + return (StatusCode::OK, Json(res).into_response()); + } + } + + let rsv_k = rsv_key(&doc_id, &req.reservation_id); + let rsv_raw: Option = con.get(&rsv_k).await.unwrap_or(None); + let rsv_raw = match rsv_raw { + Some(x) => x, + None => { + return ( + StatusCode::CONFLICT, + "reservation missing/expired".into_response(), + ); + } + }; + + let rsv: Reservation = match serde_json::from_str(&rsv_raw) { + Ok(x) => x, + Err(_) => return (StatusCode::CONFLICT, "bad reservation".into_response()), + }; + + if rsv.doc_id != doc_id || rsv.author != req.author || rsv.base_version != req.base_version { + return (StatusCode::CONFLICT, "reservation mismatch".into_response()); + } + + if now_unix() > rsv.expires_at { + return (StatusCode::CONFLICT, "reservation expired".into_response()); + } + + let curr_ver: u64 = con + .get(doc_ver_key(&doc_id)) + .await + .ok() + .flatten() + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + + if req.base_version != curr_ver { + let body = json!({ + "error": "version_conflict", + "server_version": curr_ver, + }); + return (StatusCode::CONFLICT, Json(body).into_response()); + } + + let next_ver = curr_ver + 1; + + let stream_key = doc_tx_stream_key(&doc_id); + + let tx_event = json!({ + "doc_id": doc_id, + "tx_id": req.tx_id, + "version": next_ver, + "base_version": req.base_version, + "author": req.author, + "ts": now_unix(), + "ops": req.ops + }); + + let tx_event_str = tx_event.to_string(); + let _: String = redis::cmd("XADD") + .arg(&stream_key) + .arg("*") + .arg("data") + .arg(&tx_event_str) + .query_async(&mut con) + .await + .unwrap(); + + let _: () = con + .set(doc_ver_key(&doc_id), next_ver.to_string()) + .await + .unwrap(); + let _: () = con.set(&txid_k, next_ver.to_string()).await.unwrap(); + let _: usize = con.del(&rsv_k).await.unwrap(); + + let snapshot_every: u64 = 50; + if next_ver % snapshot_every == 0 { + if let Ok(state) = build_state_at(&mut con, &doc_id, next_ver).await { + let _: () = con + .set(doc_snap_key(&doc_id), state.to_string()) + .await + .unwrap(); + let _: () = con + .set(doc_snapver_key(&doc_id), next_ver.to_string()) + .await + .unwrap(); + } + } + + ( + StatusCode::OK, + Json(TxCommitRes { + doc_id, + tx_id: req.tx_id, + committed_version: next_ver, + }) + .into_response(), + ) +} + +async fn build_state_at( + con: &mut MultiplexedConnection, + doc_id: &str, + target_version: u64, +) -> Result { + let snap_ver: u64 = con + .get(doc_snap_key(&doc_id)) + .await + .ok() + .flatten() + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + + let snap_json: Value = if snap_ver > 0 { + let raw: Option = con.get(doc_snap_key(&doc_id)).await.unwrap_or(None); + raw.and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or_else(|| json!({})) + } else { + json!({}) + }; + + let stream_key = doc_tx_stream_key(doc_id); + + let start_id = "0-0"; + let entries: Vec<(String, Vec<(String, String)>)> = redis::cmd("XRANGE") + .arg(&stream_key) + .arg(start_id) + .arg("+") + .query_async(con) + .await + .map_err(|e| e.to_string())?; + + let mut ops_list: Vec = Vec::new(); + for (_id, fields) in entries { + let mut data_opts = None; + for (k, v) in fields { + if k == "data" { + data_opts = Some(v); + break; + } + } + let data = match data_opts { + Some(x) => x, + None => continue, + }; + + let ev: Value = serde_json::from_str(&data).map_err(|e| e.to_string())?; + let ver = ev.get("version").and_then(|v| v.as_u64()).unwrap_or(0); + if ver == 0 || ver > target_version { + continue; + } + + if ver <= snap_ver { + continue; + } + + let ops: Vec = + serde_json::from_value(ev["ops"].clone()).map_err(|e| e.to_string())?; + ops_list.extend(ops); + } + + apply_ops(snap_json, &ops_list) +} + +#[derive(Deserialize)] +struct StateQuery { + at_version: Option, +} + +async fn get_state( + State(state): State, + Path(doc_id): Path, + Query(q): Query, +) -> impl IntoResponse { + let rcli = state.redis_cli.clone(); + let mut con = match rcli.get_multiplexed_async_connection().await { + Ok(sc) => sc, + Err(_) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + "cannot get connection".into_response(), + ); + } + }; + + let cur_ver: u64 = con + .get(doc_ver_key(&doc_id)) + .await + .ok() + .flatten() + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + + let target = q.at_version.unwrap_or(cur_ver).min(cur_ver); + + match build_state_at(&mut con, &doc_id, target).await { + Ok(state) => ( + StatusCode::OK, + Json(json!({ + "doc_id": doc_id, + "version": target, + "state": state + })) + .into_response(), + ), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "error": e + })) + .into_response(), + ), + } +} + +// pub async fn create_tx_patcher_route() -> Router { +// Router::new() +// .route("/{id}/tx-reserve", post(reserve_tx)) +// .route("/{id}/tx-commit", post(commit_tx)) +// .route("/{id}/state", get(get_state)) +// } diff --git a/src/tx/helpers.rs b/src/tx/helpers.rs new file mode 100644 index 0000000..07b5e5b --- /dev/null +++ b/src/tx/helpers.rs @@ -0,0 +1,294 @@ +use std::{ + clone, + collections::HashMap, + sync::{Arc, Mutex}, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use git2::Repository; +use json_patch::{Patch, diff}; +use log::info; +use redis::{RedisResult, aio::MultiplexedConnection}; +use serde::Deserialize; +use tokio::{sync::mpsc::Sender, time::sleep}; +use uuid::Uuid; + +use crate::app::CacheJob; + +pub fn now_unix() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() as i64 +} + +pub fn doc_ver_key(doc: &str) -> String { + format!("doc:{doc}:ver") +} +pub fn doc_tx_stream_key(doc: &str) -> String { + format!("doc:{doc}:tx") +} +pub fn doc_snap_key(doc: &str) -> String { + format!("doc:{doc}:snap") +} +pub fn doc_snapver_key(doc: &str) -> String { + format!("doc:{doc}:snapver") +} +pub fn rsv_key(doc: &str, rsv: &str) -> String { + format!("doc:{doc}:rsv:{rsv}") +} +pub fn txid_key(doc: &str, tx_id: &str) -> String { + format!("doc:{doc}:txid:{tx_id}") +} +pub fn commit_lock_key(doc: &str) -> String { + format!("doc:{doc}:lock") +} + +pub async fn acquire_lock( + con: &mut MultiplexedConnection, + key: &str, + ttl_ms: u64, +) -> RedisResult { + let token = Uuid::new_v4().to_string(); + let ok: Option = redis::cmd("SET") + .arg(key) + .arg(token) + .arg("NX") + .arg("PX") + .arg(ttl_ms) + .query_async(con) + .await?; + Ok(ok.is_some()) +} + +pub async fn wait_lock( + con: &mut MultiplexedConnection, + key: &str, + ttl_ms: u64, + attempts: u32, +) -> RedisResult<()> { + for _ in 0..attempts { + if acquire_lock(con, key, ttl_ms).await? { + return Ok(()); + } + + sleep(Duration::from_millis(30)).await; + } + Err(redis::RedisError::from((redis::ErrorKind::Io, "lock busy"))) +} + +pub fn build_tx_from_diff(base: serde_json::Value, alt: serde_json::Value) -> Patch { + diff(&base, &alt) +} + +fn get_file_from_repo( + repo: Arc>, + path: &str, +) -> Result> { + if let Ok(repo_m) = repo.try_lock() { + let obj = repo_m.revparse_single(path)?; + if let Some(blob) = obj.as_blob() { + let content = unsafe { str::from_utf8_unchecked(blob.content()) }; + return Ok(content.to_string()); + } else if let Some(tree) = obj.as_tree() { + let dir_list = tree + .iter() + .map(|x| x.name().unwrap_or("").to_string()) + .collect::>(); + + return Ok(dir_list.join(",")); + } + } + Err("FileNotFound".into()) +} + +pub async fn setup_prebuild_tx_cache( + tx: Sender, + git_repo: &str, +) -> Result<(), Box> { + // import tx files + let repo = match Repository::open_bare(git_repo) { + Ok(repo) => repo, + Err(_) => return Err("cannot open repo".into()), + }; + + let repo_m = Arc::new(Mutex::new(repo)); + + let root_path = String::from("master:"); + let all_folders = match get_file_from_repo(repo_m.clone(), root_path.as_str()) { + Ok(af) => af, + Err(e) => return Err(e), + }; + + let folders = all_folders + .split(",") + .map(|x| x.to_string()) + .collect::>(); + + for folder in folders { + let current_path = format!("master:{folder}"); + + println!("trying {folder}..."); + + // test get version + // + // NOTE: version may miss if there is file with `versionXXXX` + // + // + if let Ok(current_files) = get_file_from_repo(repo_m.clone(), current_path.as_str()) + && current_files.contains(",version") + { + let expected_version_path = format!("{current_path}/version"); + println!("\tmay have version --> {expected_version_path}, \ncurrent: {current_files}"); + + // get version content + + let version_str = + if let Ok(v) = get_file_from_repo(repo_m.clone(), expected_version_path.as_str()) { + v + } else { + "".to_string() + }; + // let version = version_str.parse::().unwrap_or(0); + if version_str.is_empty() { + // unexpected + return Err("cannot get version".into()); + } + + if tx + .send(CacheJob { + rel_path: expected_version_path.clone(), + file_data: Some(serde_json::json!(version_str)), + }) + .await + .is_ok() + { + info!("cache ok {current_path}/version"); + } + + println!("try get {folder}:{version_str}"); + + let files = current_files + .split(",") + .map(|x| x.to_string()) + .collect::>(); + + let base_file: String = match files.iter().find(|x| { + x.ends_with(".json") && x.starts_with(&format!("coffeethai02_{version_str}")) + }) { + Some(f) => f.to_string(), + None => "".to_string(), + }; + + println!("{folder}: {base_file}"); + + if base_file.is_empty() { + println!("base file empty skip"); + continue; + } + + let base_recipe_str = if let Ok(r) = + get_file_from_repo(repo_m.clone(), &format!("{current_path}/{base_file}")) + { + r.to_string() + } else { + "".to_string() + }; + + if base_recipe_str.is_empty() { + println!("empty recipe skip!"); + continue; + } + + let base_recipe: serde_json::Value = match serde_json::from_str(&base_recipe_str) { + Ok(r) => r, + Err(e) => return Err(e.into()), + }; + + let mut current_recipes = HashMap::new(); + + current_recipes.insert("base".to_string(), base_recipe); + + for cfile in files { + if cfile.ne(&base_file) + && cfile.ends_with(".json") + && cfile.starts_with("coffeethai02") + { + // + // fetch file content + // read into value + // do diff + + let current_fetch_path = format!("{current_path}/{cfile}"); + let current_file_str = + if let Ok(cr) = get_file_from_repo(repo_m.clone(), ¤t_fetch_path) { + cr.to_string() + } else { + "".to_string() + }; + + if !current_file_str.is_empty() { + // into value + let alt_recipe: serde_json::Value = + match serde_json::from_str(¤t_file_str) { + Ok(crr) => crr, + Err(e) => { + println!("get error while alt {current_fetch_path}: {e}"); + continue; + } + }; + + current_recipes.insert(cfile, alt_recipe); + } + } + } + + // process diffing + // + + let base_recipe_clone = current_recipes.get("base").unwrap().clone(); + + for (key, value) in current_recipes.iter() { + if key != "base" { + let patch_diff = diff(&base_recipe_clone, value); + let tx_name = format!( + "stx_{}", + key.replace("coffeethai02_", "").replace(".json", "") + ); + + let base_path_out = format!("./recipe_tx/{folder}"); + if !std::path::Path::new(&base_path_out).exists() { + std::fs::create_dir_all(&base_path_out)?; + } + + let file_out = format!("{base_path_out}/{tx_name}.json"); + + std::fs::write(file_out, serde_json::to_string_pretty(&patch_diff)?)?; + } + } + + // let _ = tx + // .send(serde_json::json!({ + // "topic": "generate_stx", + // "target": folder, + // "base_version": version_str, + // "full_base_filename": base_file, + // "value": base_recipe_clone + // })) + // .await; + + if tx + .send(CacheJob { + rel_path: format!("{current_path}/{base_file}"), + file_data: Some(base_recipe_clone), + }) + .await + .is_ok() + { + info!("cache ok {current_path}/{base_file}"); + } + } + } + + Ok(()) +} diff --git a/src/tx/mod.rs b/src/tx/mod.rs new file mode 100644 index 0000000..b4579dd --- /dev/null +++ b/src/tx/mod.rs @@ -0,0 +1,4 @@ +// pub mod handler; +pub mod helpers; +pub mod patcher; +pub mod types; diff --git a/src/tx/patcher.rs b/src/tx/patcher.rs new file mode 100644 index 0000000..d96121b --- /dev/null +++ b/src/tx/patcher.rs @@ -0,0 +1,119 @@ +use serde_json::Value; + +use crate::tx::types::PatchOp; + +pub fn json_pointer_parent<'a>( + root: &'a mut Value, + path: &str, +) -> Result<(&'a mut Value, String), String> { + if !path.starts_with('/') && path != "" { + return Err("path must be JSON pointer starting with '/'".into()); + } + + let part: Vec = path + .split('/') + .skip(1) + .map(|x| x.replace("~1", "/").replace("~0", "~")) + .collect(); + if path.is_empty() { + return Ok((root, "".into())); + } + + let last = part.last().unwrap().clone(); + + let mut cur = root; + + for key in &part[..part.len() - 1] { + match cur { + Value::Object(map) => { + cur = map + .get_mut(key) + .ok_or_else(|| format!("missing object key {key}"))?; + } + Value::Array(arr) => { + let idx: usize = key.parse().map_err(|_| format!("bad array idx {key}"))?; + cur = arr + .get_mut(idx) + .ok_or_else(|| format!("array index out of bounds {idx}"))?; + } + _ => return Err("cannot traverse non-container".into()), + } + } + + Ok((cur, last)) +} + +pub fn apply_ops(mut state: Value, ops: &[PatchOp]) -> Result { + for op in ops { + match op { + PatchOp::Replace { path, value } => { + if path == "" || path == "/" { + state = value.clone(); + continue; + } + let (parent, last) = json_pointer_parent(&mut state, path)?; + match parent { + Value::Object(map) => { + if !map.contains_key(&last) { + return Err(format!("replace target missing key {last}")); + } + map.insert(last, value.clone()); + } + Value::Array(arr) => { + let idx: usize = last + .parse() + .map_err(|_| format!("bad array index {last}"))?; + if idx >= arr.len() { + return Err(format!("replace index out of bounds {idx}")); + } + arr[idx] = value.clone(); + } + _ => return Err("replace parent not container".into()), + } + } + PatchOp::Add { path, value } => { + let (parent, last) = json_pointer_parent(&mut state, path)?; + match parent { + Value::Object(map) => { + map.insert(last, value.clone()); + } + Value::Array(arr) => { + if last == "-" { + arr.push(value.clone()); + } else { + let idx: usize = last + .parse() + .map_err(|_| format!("bad array index {last}"))?; + if idx > arr.len() { + return Err(format!("add index out of bounds {idx}")); + } + arr.insert(idx, value.clone()); + } + } + _ => return Err("add parent not container".into()), + } + } + PatchOp::Remove { path } => { + let (parent, last) = json_pointer_parent(&mut state, path)?; + match parent { + Value::Object(map) => { + map.remove(&last) + .ok_or_else(|| format!("remove missing key {last}"))?; + } + Value::Array(arr) => { + let idx: usize = last + .parse() + .map_err(|_| format!("bad array index {last}"))?; + if idx > arr.len() { + return Err(format!("remove index out of bounds {idx}")); + } + arr.remove(idx); + } + _ => return Err("remove parent not container".into()), + } + } + } + } + + Ok(state) +} diff --git a/src/tx/types.rs b/src/tx/types.rs new file mode 100644 index 0000000..661ad90 --- /dev/null +++ b/src/tx/types.rs @@ -0,0 +1,51 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug, Serialize, Deserialize)] +pub struct ReserveReq { + pub author: String, + hint_paths: Option>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ReserveRes { + pub reservation_id: String, + pub base_version: u64, + pub expires_at: i64, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct TxCommitReq { + pub reservation_id: String, + pub tx_id: String, + pub base_version: u64, + pub author: String, + pub ops: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct TxCommitRes { + pub doc_id: String, + pub tx_id: String, + pub committed_version: u64, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(tag = "op")] +pub enum PatchOp { + #[serde(rename = "replace")] + Replace { path: String, value: Value }, + #[serde(rename = "add")] + Add { path: String, value: Value }, + #[serde(rename = "remove")] + Remove { path: String }, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct Reservation { + pub reservation_id: String, + pub doc_id: String, + pub author: String, + pub base_version: u64, + pub expires_at: i64, +}