add tx, cold boot

This commit is contained in:
Pakin 2026-02-17 14:23:31 +07:00
parent 03263815e6
commit a2da030a99
9 changed files with 1433 additions and 94 deletions

174
Cargo.lock generated
View file

@ -28,6 +28,21 @@ dependencies = [
"memchr",
]
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3"
[[package]]
name = "alloc-stdlib"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece"
dependencies = [
"alloc-no-stdlib",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
@ -108,6 +123,18 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03918c3dbd7701a85c6b9887732e2921175f26c350b4563841d0958c21d57e6d"
[[package]]
name = "async-compression"
version = "0.4.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68650b7df54f0293fd061972a0fb05aaf4fc0879d3b3d21a638a182c5c543b9f"
dependencies = [
"compression-codecs",
"compression-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "async-trait"
version = "0.1.89"
@ -200,6 +227,28 @@ version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bb8"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "457d7ed3f888dfd2c7af56d4975cade43c622f74bdcddfed6d4352f57acc6310"
dependencies = [
"futures-util",
"parking_lot",
"portable-atomic",
"tokio",
]
[[package]]
name = "bb8-redis"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc1063effc7f6cf848bcbcc6e31b5962be75215835587d3109607c643d616f66"
dependencies = [
"bb8",
"redis",
]
[[package]]
name = "bitflags"
version = "2.10.0"
@ -215,6 +264,27 @@ dependencies = [
"generic-array",
]
[[package]]
name = "brotli"
version = "8.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bd8b9603c7aa97359dbd97ecf258968c95f3adddd6db2f7e7a5bef101c84560"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
"brotli-decompressor",
]
[[package]]
name = "brotli-decompressor"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
]
[[package]]
name = "bumpalo"
version = "3.19.1"
@ -223,9 +293,9 @@ checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510"
[[package]]
name = "bytes"
version = "1.11.0"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3"
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
[[package]]
name = "bzip2"
@ -290,9 +360,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
dependencies = [
"bytes",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "compression-codecs"
version = "0.4.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00828ba6fd27b45a448e57dbfe84f1029d4c9f26b368157e9a448a5f49a2ec2a"
dependencies = [
"brotli",
"compression-core",
]
[[package]]
name = "compression-core"
version = "0.4.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d"
[[package]]
name = "constant_time_eq"
version = "0.3.1"
@ -576,6 +666,17 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.31"
@ -595,6 +696,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-core",
"futures-macro",
"futures-sink",
"futures-task",
"pin-project-lite",
"pin-utils",
@ -1063,6 +1166,28 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "json-patch"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f300e415e2134745ef75f04562dd0145405c2f7fd92065db029ac4b16b57fe90"
dependencies = [
"jsonptr",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "jsonptr"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5a3cc660ba5d72bce0b3bb295bf20847ccbb40fd423f3f05b61273672e561fe"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "libbz2-rs-sys"
version = "0.2.2"
@ -1629,13 +1754,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dfe20977fe93830c0e9817a16fbf1ed1cfd8d4bba366087a1841d2c6033c251"
dependencies = [
"arcstr",
"bytes",
"cfg-if",
"combine",
"futures-util",
"itoa",
"num-bigint",
"percent-encoding",
"pin-project-lite",
"ryu",
"sha1_smol",
"socket2",
"tokio",
"tokio-util",
"url",
"xxhash-rust",
]
@ -2064,10 +2195,16 @@ dependencies = [
name = "tbm-git-repo-service"
version = "0.1.0"
dependencies = [
"async-compression",
"axum",
"axum-macros",
"bb8",
"bb8-redis",
"bytes",
"env_logger",
"futures-util",
"git2",
"json-patch",
"libgit2-sys",
"libtbr",
"log",
@ -2077,9 +2214,11 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tokio-util",
"tonic",
"tonic-prost",
"tonic-prost-build",
"uuid",
]
[[package]]
@ -2095,6 +2234,26 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "thiserror"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "time"
version = "0.3.45"
@ -2399,6 +2558,17 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f"
dependencies = [
"getrandom 0.3.4",
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "vcpkg"
version = "0.2.15"

View file

@ -4,21 +4,29 @@ version = "0.1.0"
edition = "2024"
[dependencies]
async-compression = { version = "0.4.39", features = ["tokio", "brotli"] }
axum = "0.8.7"
axum-macros = "0.5.0"
bb8 = "0.9.1"
bb8-redis = "0.26.0"
bytes = "1.11.1"
env_logger = "0.11.8"
futures-util = "0.3.31"
git2 = { version = "0.20.3", features = ["https", "ssh"] }
json-patch = "4.1.0"
libgit2-sys = { version = "0.18.3", features = ["ssh"] }
libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.git", version = "0.1.1" }
log = "0.4.29"
prost = "0.14.1"
redis = "1.0.2"
redis = { version = "1.0.2", features = ["tokio-comp"] }
reqwest = { version = "0.12.25", features = ["json"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = { version = "1.0.145", features = ["preserve_order"] }
tokio = { version = "1.48.0", features = ["full"] }
tokio-util = { version = "0.7.18", features = ["io"] }
tonic = { version = "0.14.2", features = ["transport"] }
tonic-prost = "0.14.2"
uuid = { version = "1.20.0", features = ["v4"] }
[build-dependencies]
tonic-prost-build = "0.14.2"

View file

@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, io::Cursor, sync::Arc, time::Duration};
use async_compression::tokio::bufread::BrotliEncoder;
use axum::{
Json, Router,
extract::{Query, State},
@ -7,24 +8,53 @@ use axum::{
routing::{get, post},
};
use axum_macros::debug_handler;
use bb8::Pool;
use bb8_redis::RedisConnectionManager;
use git2::{Cred, FetchOptions, PushOptions, RemoteCallbacks, Repository};
use log::{error, info, warn};
use redis::TypedCommands;
use redis::{AsyncTypedCommands, TypedCommands, aio::MultiplexedConnection};
use serde::Deserialize;
use serde_json::{Value, json};
use tokio::sync::Mutex;
use tokio::{
io::{AsyncReadExt, BufReader},
sync::{
Mutex,
mpsc::{Receiver, Sender, channel},
},
task::JoinSet,
};
use crate::{gcm, reg};
use uuid::Uuid;
use crate::{
gcm,
tx::{
self,
helpers::{doc_snap_key, doc_tx_stream_key, doc_ver_key, now_unix},
patcher::apply_ops,
types::PatchOp,
},
};
type RedisPool = Pool<RedisConnectionManager>;
#[derive(Debug, Clone)]
pub struct CacheJob {
pub rel_path: String,
pub file_data: Option<Value>,
}
#[derive(Clone)]
pub struct AppState {
// cached_country_names: Vec<&'static str>,
configures: gcm::Configure,
repo: Arc<Mutex<Repository>>,
redis: Arc<Mutex<redis::Client>>,
redis: RedisPool,
// save already fetched path, further calls should fetch from redis instead.
fetched: Vec<String>,
// queue
enqueue_tx: Sender<CacheJob>,
}
impl AppState {
@ -41,7 +71,7 @@ impl AppState {
}
}
#[derive(Deserialize)]
#[derive(Deserialize, Debug)]
struct CheckoutParams {
path: String,
}
@ -52,21 +82,24 @@ async fn checkout_handler(
) -> impl IntoResponse {
let mut response: HashMap<String, Value> = HashMap::new();
// println!(
// "req: {param:?}, has_cached: {}, saved: {:?}",
// state
// .fetched
// .contains(&param.path.clone().as_str().to_string()),
// state.fetched.clone()
// );
// quick response from redis
if let Ok(mut rcli) = state.redis.try_lock()
if let Ok(mut conn) = state.redis.get().await
&& state
.fetched
.contains(&param.path.clone().as_str().to_string())
{
if let Ok(res) = rcli.get(param.path.clone().as_str().to_string()) {
match res {
Some(res) => {
response.insert("result".to_string(), json!(res));
return (axum::http::StatusCode::OK, Json(json!(response)));
}
None => {
// continue
}
if let Ok(res) = conn.get(param.path.clone().as_str().to_string()).await {
if let Some(res) = res {
response.insert("result".to_string(), json!(res));
return (axum::http::StatusCode::OK, Json(json!(response)));
}
}
}
@ -109,7 +142,14 @@ async fn checkout_handler(
}
};
let fpath = format!("master:{}", legit_path);
let fpath = format!(
"{}:{}",
state
.get_config("GIT_REPO_BRANCH_NAME")
.map(|x| x.to_string())
.unwrap_or("master".to_string()),
legit_path
);
let obj = match repo.revparse_single(&fpath) {
Ok(obj) => obj,
Err(e) => {
@ -127,10 +167,12 @@ async fn checkout_handler(
let content = unsafe { str::from_utf8_unchecked(blob.content()) };
response.insert("result".to_string(), json!(content.to_string()));
if let Ok(mut rcli) = state.redis.try_lock() {
let _ = rcli.set(legit_path.to_string(), content.to_string());
state.fetched.push(legit_path.to_string());
}
return (axum::http::StatusCode::OK, Json(json!(content)));
// if let Ok(mut conn) = state.redis.get().await {
// let _ = conn.set(legit_path.to_string(), content.to_string()).await;
// state.fetched.push(legit_path.to_string());
// }
} else if let Some(tree) = obj.as_tree() {
let dir_list = tree
.iter()
@ -138,10 +180,12 @@ async fn checkout_handler(
.collect::<Vec<String>>();
response.insert("result".to_string(), json!(dir_list));
if let Ok(mut rcli) = state.redis.try_lock() {
let _ = rcli.set(legit_path.to_string(), dir_list.join(","));
state.fetched.push(legit_path.to_string());
}
return (axum::http::StatusCode::OK, Json(json!(dir_list)));
// if let Ok(mut conn) = state.redis.get().await {
// let _ = conn.set(legit_path.to_string(), dir_list.join(",")).await;
// state.fetched.push(legit_path.to_string());
// }
} else {
let error_log = "not obj nor tree";
error!("{error_log}");
@ -149,7 +193,9 @@ async fn checkout_handler(
return (axum::http::StatusCode::BAD_REQUEST, Json(json!(response)));
}
(axum::http::StatusCode::OK, Json(json!(response)))
// println!("checkout response: {response:?}");
// (axum::http::StatusCode::OK, Json(json!(response)))
}
async fn fetch_handler(State(state): State<AppState>) -> impl IntoResponse {
@ -223,9 +269,9 @@ async fn fetch_handler(State(state): State<AppState>) -> impl IntoResponse {
);
}
tokio::spawn(async move {
let _ = broadcast_need_update_new_data_after_fetch(state.clone()).await;
});
// tokio::spawn(async move {
// let _ = broadcast_need_update_new_data_after_fetch(state.clone()).await;
// });
(
axum::http::StatusCode::OK,
@ -234,21 +280,21 @@ async fn fetch_handler(State(state): State<AppState>) -> impl IntoResponse {
}
// refetch data after request `/fetch`
async fn broadcast_need_update_new_data_after_fetch(
state: AppState,
) -> Result<(), Box<dyn std::error::Error>> {
// clear out saved state, user must request again
// state.fetched.clear(); // ----> background fetch into redis
//
//
let state_cl = state.clone();
let mut rcli = state_cl.redis.try_lock()?;
for s in state.fetched {
let _ = rcli.publish("recipe_repo_news", s);
}
// async fn broadcast_need_update_new_data_after_fetch(
// state: AppState,
// ) -> Result<(), Box<dyn std::error::Error>> {
// // clear out saved state, user must request again
// // state.fetched.clear(); // ----> background fetch into redis
// //
// //
// let state_cl = state.clone();
// let mut rcli = state_cl.redis.try_lock()?;
// for s in state.fetched {
// let _ = rcli.publish("recipe_repo_news", s);
// }
Ok(())
}
// Ok(())
// }
// { path: "/path/to/file", signature: {
// username: "", email: ""
@ -297,11 +343,16 @@ async fn commit_handler(
}
let commit_oid = match commit_file_content(
state.repo,
state.clone().repo,
&payload.path,
&content.as_bytes(),
payload.signature,
&payload.message.unwrap_or("update: from api".to_string()),
state
.clone()
.get_config("GIT_REPO_BRANCH_NAME")
.map(|x| x.to_string())
.unwrap_or("master".to_string()),
)
.await
{
@ -317,7 +368,7 @@ async fn commit_handler(
// save history
let redis_pre_lock = state.redis.clone();
{
if let Ok(mut rl) = redis_pre_lock.try_lock() {
if let Ok(mut rl) = redis_pre_lock.get().await {
let _ = rl.rpush(format!("{}.history", payload.path), payload.patch_key);
}
}
@ -328,13 +379,12 @@ async fn commit_handler(
)
}
async fn fetch_content_from_redis(
redis: Arc<Mutex<redis::Client>>,
key: &str,
) -> Result<String, String> {
match redis.lock().await.get(key) {
Ok(s) => {
if let Some(res) = s {
async fn fetch_content_from_redis(redis: RedisPool, key: &str) -> Result<String, String> {
match redis.get().await {
Ok(mut s) => {
if let Ok(res) = s.get(key).await
&& let Some(res) = res
{
Ok(res)
} else {
Err(format!("result error from key: {key}"))
@ -395,11 +445,7 @@ fn diff_apply(target: &mut Value, patch: &Value) -> Result<(), String> {
}
}
async fn apply_patch_to_file(
redis: Arc<Mutex<redis::Client>>,
path: &str,
patch_changes: &mut String,
) -> String {
async fn apply_patch_to_file(redis: RedisPool, path: &str, patch_changes: &mut String) -> String {
use libtbr::*;
// expect the path to already has in redis
let full_file = match fetch_content_from_redis(redis, path).await {
@ -446,6 +492,7 @@ async fn commit_file_content(
content: &[u8],
author: Signature,
message: &str,
branch: String,
) -> Result<git2::Oid, Box<dyn std::error::Error>> {
let repo_clone = repo.clone();
let blob_oid = repo_clone.lock().await.blob(content)?;
@ -494,7 +541,8 @@ async fn commit_file_content(
let parents: Vec<&git2::Commit> = parent.iter().collect();
let oid = rlock.commit(
Some("refs/heads/master"),
//"refs/heads/master"
Some(&format!("refs/heads/{branch}")),
&sig,
&sig,
message,
@ -520,7 +568,10 @@ async fn push_handler(State(state): State<AppState>) -> impl IntoResponse {
}
};
let branch = "master";
let branch = &config
.get("GIT_REPO_BRANCH_NAME")
.map(|x| x.to_string())
.unwrap_or("master".to_string());
if let Err(e) = push(config, repo, remote_name, branch) {
return (
@ -561,27 +612,242 @@ fn push(
Err("cannot lock repo".into())
}
/// Return git state & server state (both currently placeholders).
async fn health_handler(State(_): State<AppState>) -> impl IntoResponse {
    let body = json!({
        "status": "ok",
        "git": "",
        "server": ""
    });
    (axum::http::StatusCode::OK, Json(body))
}
async fn handle_system_message(redis: Arc<Mutex<redis::Client>>, mut msg: serde_json::Value) {
let topic = if let Some(t) = msg["topic"].as_str() {
t
} else {
""
};
match topic {
"generate_stx" => {
// expect target, base_version, value
let target = msg["target"].as_str().unwrap_or("").to_string();
let base_version = msg["base_version"].as_str().unwrap_or("").to_string();
let full_base_filename = msg["full_base_filename"].as_str().unwrap_or("").to_string();
let value = msg["value"].take();
let rclone = redis.clone();
let mut rcli = rclone.lock().await;
let doc_id = format!("{target}_{base_version}");
let doc_version_key = doc_ver_key(&doc_id);
let doc_snap = doc_snap_key(&doc_id);
let _ = rcli.set(
format!("{target}/{full_base_filename}"),
serde_json::to_string(&value).unwrap_or("".to_string()),
);
let _ = rcli.set(format!("{target}/version"), base_version);
info!("publish {full_base_filename} as main file");
// create commit transaction base
let mut con = match rcli.get_multiplexed_async_connection().await {
Ok(conn) => conn,
Err(e) => {
return;
}
};
let stream_key = doc_tx_stream_key(&doc_id);
let tx_event = json!({
"doc_id": doc_id,
"tx_id": Uuid::new_v4().to_string(),
"version": 1,
"base_version": 0,
"author": "from stx",
"ts": now_unix(),
"ops": vec![
PatchOp::Add { path: "/".to_string(), value: value.clone() }
]
});
let tx_str = tx_event.to_string();
let _: String = redis::cmd("XADD")
.arg(&stream_key)
.arg("*")
.arg("data")
.arg(&tx_str)
.query_async(&mut con)
.await
.unwrap();
let _: () = con.set(doc_ver_key(&doc_id), 1.to_string()).await.unwrap();
// if let Ok(state) = build_state_at(&mut con, &doc_id, 1).await {
// let _: () = con
// .set(doc_snap_key(&doc_id), state.to_string())
// .await
// .unwrap();
// let _: () = con
// .set(doc_snapver_key(&doc_id), 1.to_string())
// .await
// .unwrap();
// }
}
_ => {}
}
}
async fn build_state_at(
con: &mut MultiplexedConnection,
doc_id: &str,
target_version: u64,
) -> Result<Value, String> {
let snap_ver: u64 = con
.get(doc_snap_key(&doc_id))
.await
.ok()
.flatten()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let snap_json: Value = if snap_ver > 0 {
let raw: Option<String> = con.get(doc_snap_key(&doc_id)).await.unwrap_or(None);
raw.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_else(|| json!({}))
} else {
json!({})
};
let stream_key = doc_tx_stream_key(doc_id);
let start_id = "0-0";
let entries: Vec<(String, Vec<(String, String)>)> = redis::cmd("XRANGE")
.arg(&stream_key)
.arg(start_id)
.arg("+")
.query_async(con)
.await
.map_err(|e| e.to_string())?;
let mut ops_list: Vec<PatchOp> = Vec::new();
for (_id, fields) in entries {
let mut data_opts = None;
for (k, v) in fields {
if k == "data" {
data_opts = Some(v);
break;
}
}
let data = match data_opts {
Some(x) => x,
None => continue,
};
let ev: Value = serde_json::from_str(&data).map_err(|e| e.to_string())?;
let ver = ev.get("version").and_then(|v| v.as_u64()).unwrap_or(0);
if ver == 0 || ver > target_version {
continue;
}
if ver <= snap_ver {
continue;
}
let ops: Vec<PatchOp> =
serde_json::from_value(ev["ops"].clone()).map_err(|e| e.to_string())?;
ops_list.extend(ops);
}
apply_ops(snap_json, &ops_list)
}
pub async fn run(config: gcm::Configure) -> gcm::StandardResult {
let queue_capacity = config
.get("BOOT_QUEUE_CAP")
.and_then(|x| x.parse().ok())
.unwrap_or(512);
let worker_count: usize = config
.get("CACHE_WORKERS")
.and_then(|x| x.parse().ok())
.unwrap_or(4);
// REDIS POOL
//
//
//
//
let manager = RedisConnectionManager::new(format!(
"redis://{}:{}",
config.get("REDIS_URI").unwrap_or(&"".to_string()),
config.get("REDIS_PORT").unwrap_or(&"".to_string())
))
.expect("failed to create redis conn manager");
let redis_pool = Pool::builder()
.max_size(16)
.build(manager)
.await
.expect("failed to build redis pool");
let (enqueue_tx, enqueue_rx) = channel::<CacheJob>(queue_capacity);
let state = AppState {
// cached_country_names: common::valid_country_name(),
configures: config.clone(),
repo: Arc::new(Mutex::new(Repository::open_bare(
config.get("GIT_REPO_LOCAL_DEST").unwrap_or(&"".to_string()),
)?)),
redis: Arc::new(Mutex::new(redis::Client::open(format!(
"redis://{}:{}",
config.get("REDIS_URI").unwrap_or(&"".to_string()),
config.get("REDIS_PORT").unwrap_or(&"".to_string())
))?)),
redis: redis_pool.clone(),
fetched: Vec::new(),
enqueue_tx: enqueue_tx.clone(),
};
let sys_redis_cli = state.redis.clone();
// tokio::spawn(async move {
// while let Some(msg) = sys_rx.recv().await {
// handle_system_message(sys_redis_cli.clone(), msg).await;
// }
// })
// .await?;
// SPAWN WORKER
let worker_handle = spawn_cache_workers(redis_pool.clone(), enqueue_rx, worker_count);
// COLD BOOT PRELOAD
{
let enqueue_tx = enqueue_tx.clone();
let cfg = config.clone();
let enable_cold_start = if let Some(flag) = config.clone().get("COLD_START_ENABLE") {
flag.contains("1")
} else {
false
};
if enable_cold_start {
tokio::spawn(async move {
if let Err(e) = cold_boot_enqueue_json(cfg, enqueue_tx).await {
error!("cold boot enqueue failed {e}");
}
});
}
}
let app = Router::new()
.route("/checkout", get(checkout_handler))
.route("/fetch", get(fetch_handler))
.route("/commit", post(commit_handler))
.route("/push", get(push_handler))
.route("/healthz", get(reg::health))
.route("/health", get(health_handler))
// .route("/healthz", get(reg::health))
.with_state(state);
let listener = tokio::net::TcpListener::bind(format!(
@ -595,5 +861,131 @@ pub async fn run(config: gcm::Configure) -> gcm::StandardResult {
axum::serve(listener, app).await?;
drop(worker_handle);
Ok(())
}
// Cold Boot
//
//
/// Scan the local git repo and enqueue cache jobs for its JSON documents so
/// Redis is warm before the first request.
///
/// # Errors
/// Returns an error when `GIT_REPO_LOCAL_DEST` is missing or the prebuild
/// scan fails.
async fn cold_boot_enqueue_json(
    config: gcm::Configure,
    enqueue_tx: Sender<CacheJob>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    info!("cold boot scanning");
    // Propagate a missing config key as an error instead of panicking: this
    // runs on a spawned background task, and the caller already logs the Err.
    let root_repo = config
        .get("GIT_REPO_LOCAL_DEST")
        .ok_or("GIT_REPO_LOCAL_DEST not defined")?;
    tx::helpers::setup_prebuild_tx_cache(enqueue_tx, root_repo).await?;
    Ok(())
}
// Worker pool
// spawn_cache_workers(config.clone(), redis_pool.clone(), enqueue_rx, worker_count);
/// Spawn the cache dispatcher task: pulls `CacheJob`s from `rx` and runs up
/// to `worker_count` compression/caching tasks concurrently, then drains the
/// remaining tasks once the queue closes.
fn spawn_cache_workers(
    redis_pool: RedisPool,
    mut rx: Receiver<CacheJob>,
    worker_count: usize,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        info!("starting {worker_count} cache workers");
        let mut joinset = JoinSet::new();
        // NOTE(review): these startup tasks are placeholders — each sleeps
        // 10 ms and exits immediately. They briefly occupy join-set slots at
        // boot; confirm whether this warm-up is intentional or dead code.
        for worker_id in 0..worker_count {
            // let redis_pool = redis_pool.clone();
            // let repo_root = config
            //     .clone()
            //     .get("GIT_REPO_LOCAL_DEST")
            //     .expect("unable to get git repo local dest");
            joinset.spawn(async move {
                loop {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                    // pull job
                    let _ = worker_id;
                    break;
                }
            });
        }
        // dispatcher
        // Bound concurrency: before spawning a new job, join finished tasks
        // until fewer than `max_inflight` remain.
        let max_inflight = worker_count;
        while let Some(job) = rx.recv().await {
            while joinset.len() >= max_inflight {
                if let Some(res) = joinset.join_next().await {
                    if let Err(e) = res {
                        warn!("job task join error: {e}");
                    }
                }
            }
            let redis_pool = redis_pool.clone();
            // let repo_root = config
            //     .clone()
            //     .get("GIT_REPO_LOCAL_DEST")
            //     .expect("unable to get git repo local dest");
            joinset.spawn(async move {
                if let Err(e) = cache_one_file_streaming(redis_pool, job.clone()).await {
                    warn!("cache job failed: {e:?} {job:?}");
                }
            });
        }
        // Queue closed (all senders dropped): wait for in-flight jobs.
        while let Some(res) = joinset.join_next().await {
            if let Err(e) = res {
                warn!("job task join error: {e:?}");
            }
        }
        info!("cache worker stopped (queue closed)");
    })
}
/// Brotli-compress one file's JSON payload and store it in Redis.
///
/// The compressed bytes are APPENDed chunk-by-chunk under a unique temporary
/// key, then swapped into place with RENAME so readers never observe a
/// partially written value. Jobs without file data (e.g. directories) are a
/// no-op.
async fn cache_one_file_streaming(redis_pool: RedisPool, job: CacheJob) -> gcm::StandardResult {
    // Idiomatic guard instead of is_none() + unwrap().
    let Some(file_data) = job.file_data else {
        return Ok(());
    };
    let final_key = job.rel_path.clone();
    // Unique temp key so concurrent jobs for the same path cannot collide.
    let tmp_key = format!("tmp:{}:{}", final_key, Uuid::new_v4());
    let reader = BufReader::new(Cursor::new(file_data.to_string().into_bytes()));
    let mut encoder = BrotliEncoder::new(reader);
    let mut conn = redis_pool.get().await?;
    // Drop any leftover temp key from a previously failed run.
    conn.del(&tmp_key).await?;
    let mut buf = vec![0u8; 64 * 1024];
    let mut compressed_total = 0u64;
    loop {
        let n = encoder.read(&mut buf).await?;
        if n == 0 {
            break;
        }
        compressed_total += n as u64;
        // Stream the compressed chunk into the temp key.
        conn.append(&tmp_key, &buf[..n]).await?;
    }
    conn.del(&final_key).await?;
    conn.rename(tmp_key, &final_key).await?;
    info!("cached {} ({} bytes)", job.rel_path, compressed_total);
    Ok(())
}

View file

@ -1,4 +1,4 @@
use std::{env, fs::File};
use std::fs::File;
use env_logger::Builder;
use libtbr::recipe_functions::common;
@ -6,13 +6,14 @@ use log::info;
use crate::{
git::setup_git_repo,
reg::{heartbeat_loop, registry::registry_client::RegistryClient},
// reg::{heartbeat_loop, registry::registry_client::RegistryClient},
};
mod app;
mod gcm;
mod git;
mod reg;
mod tx;
// mod reg;
fn setup_log(config: gcm::Configure) -> gcm::StandardResult {
let logfile = File::create(config.get("LOG_NAME").unwrap_or(&"run.log".to_string()))?;
@ -24,27 +25,6 @@ fn setup_log(config: gcm::Configure) -> gcm::StandardResult {
Ok(())
}
async fn setup_registry(config: gcm::Configure) -> gcm::StandardResult {
let cfg_clone = config.clone();
let register_server_address = cfg_clone
.get("REGISTRY_SERVER")
.expect("not found registry server");
let name = cfg_clone.get("SERVICE_NAME").expect("missing service name");
let url = cfg_clone.get("SERVICE_URL").expect("missing service url");
let token = cfg_clone.get("REGISTRY_TOKEN").expect("token not provided");
unsafe {
env::set_var("REGISTRY_TOKEN", token);
}
let mut client = RegistryClient::connect(register_server_address.clone()).await?;
reg::register_service(&mut client, name, url).await;
tokio::spawn(heartbeat_loop(name.to_string(), url.to_string()));
Ok(())
}
#[tokio::main]
async fn main() -> gcm::StandardResult {
let config: gcm::Configure = common::get_config();
@ -68,8 +48,6 @@ async fn main() -> gcm::StandardResult {
.get("CONFIG_VERSION")
.expect("config version not defined")
);
setup_registry(config.clone()).await?;
app::run(config.clone()).await?;
Ok(())

323
src/tx/handler.rs Normal file
View file

@ -0,0 +1,323 @@
use axum::{
Json, Router,
extract::{Path, Query, State},
response::IntoResponse,
routing::{get, post},
};
use redis::{AsyncTypedCommands, TypedCommands, aio::MultiplexedConnection};
use reqwest::StatusCode;
use serde::Deserialize;
use serde_json::{Value, json};
use uuid::Uuid;
use crate::tx::{
helpers::*,
patcher::apply_ops,
types::{PatchOp, Reservation, ReserveReq, ReserveRes, TxCommitReq, TxCommitRes},
};
/// POST /{id}/tx-reserve — reserve a short-lived (30 s) commit slot for
/// `doc_id`.
///
/// Returns the reservation id, the document's current version (the base the
/// client must commit against) and the reservation expiry timestamp.
async fn reserve_tx(
    State(state): State<AppState>,
    Path(doc_id): Path<String>,
    Json(req): Json<ReserveReq>,
) -> impl IntoResponse {
    let rsv_id = Uuid::new_v4().to_string();
    let ttl_secs: i64 = 30; // ttl reserve 30s
    let expires_at = now_unix() + ttl_secs;
    let mut rcli = state.redis_cli.clone();
    // Current document version; a missing key or unparsable value means a
    // brand-new doc (0). Previously `r.parse().ok().unwrap()` panicked on a
    // non-numeric stored value.
    let ver_key = doc_ver_key(&doc_id);
    let base_version: u64 = rcli
        .get(ver_key)
        .ok()
        .flatten()
        .and_then(|r| r.parse().ok())
        .unwrap_or(0);
    let rsv = Reservation {
        reservation_id: rsv_id.clone(),
        doc_id: doc_id.clone(),
        author: req.author,
        base_version,
        expires_at,
    };
    let key = rsv_key(&doc_id, &rsv_id);
    // Plain-data struct; serialization cannot realistically fail.
    let payload = serde_json::to_string(&rsv).expect("serialize reservation");
    // Report a redis failure as 500 instead of panicking the handler.
    if let Err(e) = rcli.set_ex(key, payload, ttl_secs as u64) {
        return (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("cannot store reservation: {e}"),
        )
            .into_response();
    }
    (
        StatusCode::OK,
        Json(ReserveRes {
            reservation_id: rsv_id,
            base_version,
            expires_at,
        }),
    )
        .into_response()
}
/// POST /{id}/tx-commit — commit a reserved transaction against `doc_id`.
///
/// Flow: validate ids → idempotency check by tx_id → validate the reservation
/// (existence, ownership, base version, expiry) → optimistic version check →
/// append the tx event to the doc's stream → bump the version → record the
/// tx_id → periodic snapshot every 50 versions.
async fn commit_tx(
    State(state): State<AppState>,
    Path(doc_id): Path<String>,
    Json(req): Json<TxCommitReq>,
) -> impl IntoResponse {
    if req.tx_id.is_empty() || req.reservation_id.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            "missing tx_id or reservation_id ".into_response(),
        );
    }
    let mut con = match state.redis_cli.get_multiplexed_async_connection().await {
        Ok(conn) => conn,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("{}", e.to_string()).into_response(),
            );
        }
    };
    // lock commit
    // NOTE(review): the lock is never explicitly released on any path below
    // (including success) — presumably wait_lock sets a TTL so it expires on
    // its own; confirm in tx::helpers.
    let lock_key = commit_lock_key(&doc_id);
    if let Err(_) = wait_lock(&mut con, &lock_key, 1200, 20).await {
        return (StatusCode::CONFLICT, "doc busy, retry".into_response());
    }
    // Idempotency: if this tx_id was already committed, return its version.
    let txid_k = txid_key(&doc_id, &req.tx_id);
    if let Ok(Some(v)) = con.get(&txid_k).await {
        if let Ok(committed_version) = v.parse::<u64>() {
            let res = TxCommitRes {
                doc_id,
                tx_id: req.tx_id,
                committed_version,
            };
            return (StatusCode::OK, Json(res).into_response());
        }
    }
    // The reservation must still exist (it expires via TTL).
    let rsv_k = rsv_key(&doc_id, &req.reservation_id);
    let rsv_raw: Option<String> = con.get(&rsv_k).await.unwrap_or(None);
    let rsv_raw = match rsv_raw {
        Some(x) => x,
        None => {
            return (
                StatusCode::CONFLICT,
                "reservation missing/expired".into_response(),
            );
        }
    };
    let rsv: Reservation = match serde_json::from_str(&rsv_raw) {
        Ok(x) => x,
        Err(_) => return (StatusCode::CONFLICT, "bad reservation".into_response()),
    };
    // The reservation must match the caller's claim exactly.
    if rsv.doc_id != doc_id || rsv.author != req.author || rsv.base_version != req.base_version {
        return (StatusCode::CONFLICT, "reservation mismatch".into_response());
    }
    if now_unix() > rsv.expires_at {
        return (StatusCode::CONFLICT, "reservation expired".into_response());
    }
    // Optimistic concurrency: the doc must still be at the reserved base.
    let curr_ver: u64 = con
        .get(doc_ver_key(&doc_id))
        .await
        .ok()
        .flatten()
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    if req.base_version != curr_ver {
        let body = json!({
            "error": "version_conflict",
            "server_version": curr_ver,
        });
        return (StatusCode::CONFLICT, Json(body).into_response());
    }
    let next_ver = curr_ver + 1;
    let stream_key = doc_tx_stream_key(&doc_id);
    let tx_event = json!({
        "doc_id": doc_id,
        "tx_id": req.tx_id,
        "version": next_ver,
        "base_version": req.base_version,
        "author": req.author,
        "ts": now_unix(),
        "ops": req.ops
    });
    let tx_event_str = tx_event.to_string();
    // Append the event to the doc's stream, then publish the new version,
    // record the tx_id for idempotency and consume the reservation.
    let _: String = redis::cmd("XADD")
        .arg(&stream_key)
        .arg("*")
        .arg("data")
        .arg(&tx_event_str)
        .query_async(&mut con)
        .await
        .unwrap();
    let _: () = con
        .set(doc_ver_key(&doc_id), next_ver.to_string())
        .await
        .unwrap();
    let _: () = con.set(&txid_k, next_ver.to_string()).await.unwrap();
    let _: usize = con.del(&rsv_k).await.unwrap();
    // Persist a snapshot every 50 versions to bound replay cost.
    let snapshot_every: u64 = 50;
    if next_ver % snapshot_every == 0 {
        if let Ok(state) = build_state_at(&mut con, &doc_id, next_ver).await {
            let _: () = con
                .set(doc_snap_key(&doc_id), state.to_string())
                .await
                .unwrap();
            let _: () = con
                .set(doc_snapver_key(&doc_id), next_ver.to_string())
                .await
                .unwrap();
        }
    }
    (
        StatusCode::OK,
        Json(TxCommitRes {
            doc_id,
            tx_id: req.tx_id,
            committed_version: next_ver,
        })
        .into_response(),
    )
}
async fn build_state_at(
con: &mut MultiplexedConnection,
doc_id: &str,
target_version: u64,
) -> Result<Value, String> {
let snap_ver: u64 = con
.get(doc_snap_key(&doc_id))
.await
.ok()
.flatten()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let snap_json: Value = if snap_ver > 0 {
let raw: Option<String> = con.get(doc_snap_key(&doc_id)).await.unwrap_or(None);
raw.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_else(|| json!({}))
} else {
json!({})
};
let stream_key = doc_tx_stream_key(doc_id);
let start_id = "0-0";
let entries: Vec<(String, Vec<(String, String)>)> = redis::cmd("XRANGE")
.arg(&stream_key)
.arg(start_id)
.arg("+")
.query_async(con)
.await
.map_err(|e| e.to_string())?;
let mut ops_list: Vec<PatchOp> = Vec::new();
for (_id, fields) in entries {
let mut data_opts = None;
for (k, v) in fields {
if k == "data" {
data_opts = Some(v);
break;
}
}
let data = match data_opts {
Some(x) => x,
None => continue,
};
let ev: Value = serde_json::from_str(&data).map_err(|e| e.to_string())?;
let ver = ev.get("version").and_then(|v| v.as_u64()).unwrap_or(0);
if ver == 0 || ver > target_version {
continue;
}
if ver <= snap_ver {
continue;
}
let ops: Vec<PatchOp> =
serde_json::from_value(ev["ops"].clone()).map_err(|e| e.to_string())?;
ops_list.extend(ops);
}
apply_ops(snap_json, &ops_list)
}
/// Query params for the /{id}/state endpoint.
#[derive(Deserialize)]
struct StateQuery {
    // Optional version to materialize; capped at the current version by the handler.
    at_version: Option<u64>,
}
/// GET handler: materializes and returns the document state, optionally at a
/// historical version (`?at_version=N`, clamped to the committed version).
async fn get_state(
    State(state): State<AppState>,
    Path(doc_id): Path<String>,
    Query(q): Query<StateQuery>,
) -> impl IntoResponse {
    let client = state.redis_cli.clone();
    let mut con = match client.get_multiplexed_async_connection().await {
        Ok(c) => c,
        Err(_) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                "cannot get connection".into_response(),
            );
        }
    };
    // Current committed version; a missing or unparsable key counts as 0.
    let cur_ver: u64 = con
        .get(doc_ver_key(&doc_id))
        .await
        .ok()
        .flatten()
        .and_then(|s: String| s.parse().ok())
        .unwrap_or(0);
    // Never rebuild past what has actually been committed.
    let target = cur_ver.min(q.at_version.unwrap_or(cur_ver));
    match build_state_at(&mut con, &doc_id, target).await {
        Ok(doc_state) => (
            StatusCode::OK,
            Json(json!({
                "doc_id": doc_id,
                "version": target,
                "state": doc_state
            }))
            .into_response(),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(json!({
                "error": e
            }))
            .into_response(),
        ),
    }
}
// pub async fn create_tx_patcher_route() -> Router<AppState> {
// Router::new()
// .route("/{id}/tx-reserve", post(reserve_tx))
// .route("/{id}/tx-commit", post(commit_tx))
// .route("/{id}/state", get(get_state))
// }

294
src/tx/helpers.rs Normal file
View file

@ -0,0 +1,294 @@
use std::{
clone,
collections::HashMap,
sync::{Arc, Mutex},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use git2::Repository;
use json_patch::{Patch, diff};
use log::info;
use redis::{RedisResult, aio::MultiplexedConnection};
use serde::Deserialize;
use tokio::{sync::mpsc::Sender, time::sleep};
use uuid::Uuid;
use crate::app::CacheJob;
/// Current wall-clock time as whole seconds since the Unix epoch.
pub fn now_unix() -> i64 {
    let elapsed = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    elapsed.as_secs() as i64
}
/// Redis key storing the current committed version of document `doc`.
pub fn doc_ver_key(doc: &str) -> String {
    ["doc:", doc, ":ver"].concat()
}
/// Redis stream key holding the transaction log of document `doc`.
pub fn doc_tx_stream_key(doc: &str) -> String {
    ["doc:", doc, ":tx"].concat()
}
/// Redis key storing the JSON snapshot body of document `doc`.
pub fn doc_snap_key(doc: &str) -> String {
    ["doc:", doc, ":snap"].concat()
}
/// Redis key storing the version the stored snapshot corresponds to.
pub fn doc_snapver_key(doc: &str) -> String {
    ["doc:", doc, ":snapver"].concat()
}
/// Redis key for reservation `rsv` taken on document `doc`.
pub fn rsv_key(doc: &str, rsv: &str) -> String {
    format!("doc:{}:rsv:{}", doc, rsv)
}
/// Redis key recording that transaction `tx_id` was committed on `doc`
/// (used for idempotent commit handling).
pub fn txid_key(doc: &str, tx_id: &str) -> String {
    format!("doc:{}:txid:{}", doc, tx_id)
}
/// Redis key used as the per-document commit lock.
pub fn commit_lock_key(doc: &str) -> String {
    ["doc:", doc, ":lock"].concat()
}
/// Makes a single non-blocking attempt to take the lock at `key`.
///
/// Returns `Ok(true)` when the lock was acquired, `Ok(false)` when it is
/// already held; the lock auto-expires after `ttl_ms` milliseconds.
///
/// NOTE(review): the random token is stored but never returned, so a release
/// cannot be ownership-checked against it — confirm whether that is intended.
pub async fn acquire_lock(
    con: &mut MultiplexedConnection,
    key: &str,
    ttl_ms: u64,
) -> RedisResult<bool> {
    let token = Uuid::new_v4().to_string();
    // SET key token NX PX ttl_ms -> replies "OK" only when the key was free.
    let reply: Option<String> = redis::cmd("SET")
        .arg(key)
        .arg(token)
        .arg("NX")
        .arg("PX")
        .arg(ttl_ms)
        .query_async(con)
        .await?;
    Ok(reply.is_some())
}
/// Retries [`acquire_lock`] up to `attempts` times, sleeping 30 ms between
/// tries, and errors with an Io-kind "lock busy" error when all fail.
pub async fn wait_lock(
    con: &mut MultiplexedConnection,
    key: &str,
    ttl_ms: u64,
    attempts: u32,
) -> RedisResult<()> {
    let mut remaining = attempts;
    while remaining > 0 {
        if acquire_lock(con, key, ttl_ms).await? {
            return Ok(());
        }
        remaining -= 1;
        sleep(Duration::from_millis(30)).await;
    }
    Err(redis::RedisError::from((redis::ErrorKind::Io, "lock busy")))
}
/// Computes a JSON Patch (RFC 6902) that transforms `base` into `alt`;
/// thin wrapper over `json_patch::diff`.
pub fn build_tx_from_diff(base: serde_json::Value, alt: serde_json::Value) -> Patch {
    diff(&base, &alt)
}
/// Resolves `path` (a git revparse spec such as `"master:dir/file"`) in the
/// shared bare repository.
///
/// Returns the blob content for a file spec, or a comma-joined listing of
/// entry names for a tree spec.
///
/// # Errors
/// Returns `"FileNotFound"` when the spec resolves to neither a blob nor a
/// tree — and also when the repository mutex is currently held elsewhere
/// (`try_lock` is non-blocking).
fn get_file_from_repo(
    repo: Arc<Mutex<Repository>>,
    path: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    if let Ok(repo_m) = repo.try_lock() {
        let obj = repo_m.revparse_single(path)?;
        if let Some(blob) = obj.as_blob() {
            // Git blob bytes are not guaranteed to be UTF-8. The previous
            // `unsafe { str::from_utf8_unchecked(...) }` is undefined
            // behavior on invalid UTF-8; a lossy conversion substitutes
            // U+FFFD instead and needs no unsafe.
            return Ok(String::from_utf8_lossy(blob.content()).into_owned());
        } else if let Some(tree) = obj.as_tree() {
            let dir_list = tree
                .iter()
                .map(|x| x.name().unwrap_or("").to_string())
                .collect::<Vec<String>>();
            return Ok(dir_list.join(","));
        }
    }
    Err("FileNotFound".into())
}
/// Cold-boot pre-build: walks every top-level folder of the bare repo's
/// `master` branch, caches each folder's `version` file and base recipe via
/// the `CacheJob` channel, and writes an `stx_*.json` patch file to
/// `./recipe_tx/<folder>/` for every non-base `coffeethai02*.json` recipe
/// (the diff of that recipe against the base).
///
/// # Errors
/// Fails when the repo cannot be opened, the root listing fails, a folder
/// with a `version` entry yields no version content, the base recipe is not
/// valid JSON, or a patch file cannot be written.
pub async fn setup_prebuild_tx_cache(
    tx: Sender<CacheJob>,
    git_repo: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // import tx files
    let repo = match Repository::open_bare(git_repo) {
        Ok(repo) => repo,
        Err(_) => return Err("cannot open repo".into()),
    };
    let repo_m = Arc::new(Mutex::new(repo));
    // "master:" resolves to the root tree; get_file_from_repo returns a
    // comma-joined listing for trees.
    let root_path = String::from("master:");
    let all_folders = match get_file_from_repo(repo_m.clone(), root_path.as_str()) {
        Ok(af) => af,
        Err(e) => return Err(e),
    };
    let folders = all_folders
        .split(",")
        .map(|x| x.to_string())
        .collect::<Vec<String>>();
    for folder in folders {
        let current_path = format!("master:{folder}");
        println!("trying {folder}...");
        // test get version
        //
        // NOTE: version may miss if there is file with `versionXXXX`
        //
        //
        // ",version" is a substring probe against the comma-joined listing,
        // so a sibling entry like "xversionXXXX" could also match it.
        if let Ok(current_files) = get_file_from_repo(repo_m.clone(), current_path.as_str())
            && current_files.contains(",version")
        {
            let expected_version_path = format!("{current_path}/version");
            println!("\tmay have version --> {expected_version_path}, \ncurrent: {current_files}");
            // get version content
            let version_str =
                if let Ok(v) = get_file_from_repo(repo_m.clone(), expected_version_path.as_str()) {
                    v
                } else {
                    "".to_string()
                };
            // let version = version_str.parse::<u64>().unwrap_or(0);
            if version_str.is_empty() {
                // unexpected
                return Err("cannot get version".into());
            }
            // Cache the raw version string; a send failure is tolerated
            // (only success is logged).
            if tx
                .send(CacheJob {
                    rel_path: expected_version_path.clone(),
                    file_data: Some(serde_json::json!(version_str)),
                })
                .await
                .is_ok()
            {
                info!("cache ok {current_path}/version");
            }
            println!("try get {folder}:{version_str}");
            let files = current_files
                .split(",")
                .map(|x| x.to_string())
                .collect::<Vec<String>>();
            // The base recipe is the "coffeethai02_<version>…*.json" entry;
            // assumes `version_str` has no trailing whitespace from the blob
            // — TODO confirm.
            let base_file: String = match files.iter().find(|x| {
                x.ends_with(".json") && x.starts_with(&format!("coffeethai02_{version_str}"))
            }) {
                Some(f) => f.to_string(),
                None => "".to_string(),
            };
            println!("{folder}: {base_file}");
            if base_file.is_empty() {
                println!("base file empty skip");
                continue;
            }
            let base_recipe_str = if let Ok(r) =
                get_file_from_repo(repo_m.clone(), &format!("{current_path}/{base_file}"))
            {
                r.to_string()
            } else {
                "".to_string()
            };
            if base_recipe_str.is_empty() {
                println!("empty recipe skip!");
                continue;
            }
            let base_recipe: serde_json::Value = match serde_json::from_str(&base_recipe_str) {
                Ok(r) => r,
                Err(e) => return Err(e.into()),
            };
            // Collects "base" plus every other parseable coffeethai02 recipe
            // in this folder, keyed by filename.
            let mut current_recipes = HashMap::new();
            current_recipes.insert("base".to_string(), base_recipe);
            for cfile in files {
                if cfile.ne(&base_file)
                    && cfile.ends_with(".json")
                    && cfile.starts_with("coffeethai02")
                {
                    //
                    // fetch file content
                    // read into value
                    // do diff
                    let current_fetch_path = format!("{current_path}/{cfile}");
                    let current_file_str =
                        if let Ok(cr) = get_file_from_repo(repo_m.clone(), &current_fetch_path) {
                            cr.to_string()
                        } else {
                            "".to_string()
                        };
                    if !current_file_str.is_empty() {
                        // into value
                        let alt_recipe: serde_json::Value =
                            match serde_json::from_str(&current_file_str) {
                                Ok(crr) => crr,
                                Err(e) => {
                                    // Unparseable alternates are skipped, not fatal.
                                    println!("get error while alt {current_fetch_path}: {e}");
                                    continue;
                                }
                            };
                        current_recipes.insert(cfile, alt_recipe);
                    }
                }
            }
            // process diffing
            //
            // Write one stx_<suffix>.json patch per non-base recipe:
            // the JSON Patch turning the base recipe into that recipe.
            let base_recipe_clone = current_recipes.get("base").unwrap().clone();
            for (key, value) in current_recipes.iter() {
                if key != "base" {
                    let patch_diff = diff(&base_recipe_clone, value);
                    let tx_name = format!(
                        "stx_{}",
                        key.replace("coffeethai02_", "").replace(".json", "")
                    );
                    let base_path_out = format!("./recipe_tx/{folder}");
                    if !std::path::Path::new(&base_path_out).exists() {
                        std::fs::create_dir_all(&base_path_out)?;
                    }
                    let file_out = format!("{base_path_out}/{tx_name}.json");
                    std::fs::write(file_out, serde_json::to_string_pretty(&patch_diff)?)?;
                }
            }
            // let _ = tx
            //     .send(serde_json::json!({
            //         "topic": "generate_stx",
            //         "target": folder,
            //         "base_version": version_str,
            //         "full_base_filename": base_file,
            //         "value": base_recipe_clone
            //     }))
            //     .await;
            // Cache the parsed base recipe itself; again best-effort.
            if tx
                .send(CacheJob {
                    rel_path: format!("{current_path}/{base_file}"),
                    file_data: Some(base_recipe_clone),
                })
                .await
                .is_ok()
            {
                info!("cache ok {current_path}/{base_file}");
            }
        }
    }
    Ok(())
}

4
src/tx/mod.rs Normal file
View file

@ -0,0 +1,4 @@
// pub mod handler;
pub mod helpers;
pub mod patcher;
pub mod types;

119
src/tx/patcher.rs Normal file
View file

@ -0,0 +1,119 @@
use serde_json::Value;
use crate::tx::types::PatchOp;
/// Walks a JSON pointer (`RFC 6901`) down to the *parent* of its final
/// segment, returning a mutable reference to that parent together with the
/// decoded final token.
///
/// The empty pointer `""` yields `(root, "")`. Escape sequences are decoded
/// per the RFC: `~1` -> `/` first, then `~0` -> `~`.
///
/// # Errors
/// Returns a string error when the pointer does not start with `/`, an
/// intermediate key/index is missing or malformed, or traversal hits a
/// non-container value.
pub fn json_pointer_parent<'a>(
    root: &'a mut Value,
    path: &str,
) -> Result<(&'a mut Value, String), String> {
    if path.is_empty() {
        return Ok((root, "".into()));
    }
    if !path.starts_with('/') {
        return Err("path must be JSON pointer starting with '/'".into());
    }
    let tokens: Vec<String> = path
        .split('/')
        .skip(1)
        .map(|t| t.replace("~1", "/").replace("~0", "~"))
        .collect();
    let leaf = tokens.last().unwrap().clone();
    let mut node = root;
    // Descend through every token except the last; the caller operates on
    // the parent container using the returned leaf token.
    for token in &tokens[..tokens.len() - 1] {
        node = match node {
            Value::Object(map) => map
                .get_mut(token)
                .ok_or_else(|| format!("missing object key {token}"))?,
            Value::Array(arr) => {
                let idx: usize = token.parse().map_err(|_| format!("bad array idx {token}"))?;
                arr.get_mut(idx)
                    .ok_or_else(|| format!("array index out of bounds {idx}"))?
            }
            _ => return Err("cannot traverse non-container".into()),
        };
    }
    Ok((node, leaf))
}
/// Applies a sequence of patch operations to `state` and returns the result.
///
/// Supports `replace`, `add`, and `remove` with JSON-pointer paths. A
/// `replace` with path `""` (or, non-standard but kept for compatibility,
/// `"/"`) swaps out the whole document. For arrays, `add` accepts `"-"` or
/// an index up to and including the length (append); `replace`/`remove`
/// require an existing index.
///
/// # Errors
/// Returns a descriptive string when a path cannot be resolved, an index is
/// malformed or out of bounds, or a replace/remove target is missing.
pub fn apply_ops(mut state: Value, ops: &[PatchOp]) -> Result<Value, String> {
    for op in ops {
        match op {
            PatchOp::Replace { path, value } => {
                if path == "" || path == "/" {
                    state = value.clone();
                    continue;
                }
                let (parent, last) = json_pointer_parent(&mut state, path)?;
                match parent {
                    Value::Object(map) => {
                        if !map.contains_key(&last) {
                            return Err(format!("replace target missing key {last}"));
                        }
                        map.insert(last, value.clone());
                    }
                    Value::Array(arr) => {
                        let idx: usize = last
                            .parse()
                            .map_err(|_| format!("bad array index {last}"))?;
                        if idx >= arr.len() {
                            return Err(format!("replace index out of bounds {idx}"));
                        }
                        arr[idx] = value.clone();
                    }
                    _ => return Err("replace parent not container".into()),
                }
            }
            PatchOp::Add { path, value } => {
                let (parent, last) = json_pointer_parent(&mut state, path)?;
                match parent {
                    Value::Object(map) => {
                        map.insert(last, value.clone());
                    }
                    Value::Array(arr) => {
                        if last == "-" {
                            // "-" appends past the last element (RFC 6902).
                            arr.push(value.clone());
                        } else {
                            let idx: usize = last
                                .parse()
                                .map_err(|_| format!("bad array index {last}"))?;
                            // idx == len is a legal insert position (append),
                            // so only idx > len is rejected here.
                            if idx > arr.len() {
                                return Err(format!("add index out of bounds {idx}"));
                            }
                            arr.insert(idx, value.clone());
                        }
                    }
                    _ => return Err("add parent not container".into()),
                }
            }
            PatchOp::Remove { path } => {
                let (parent, last) = json_pointer_parent(&mut state, path)?;
                match parent {
                    Value::Object(map) => {
                        map.remove(&last)
                            .ok_or_else(|| format!("remove missing key {last}"))?;
                    }
                    Value::Array(arr) => {
                        let idx: usize = last
                            .parse()
                            .map_err(|_| format!("bad array index {last}"))?;
                        // Was `idx > arr.len()`, which let idx == len slip
                        // through to Vec::remove and panic; removal requires
                        // idx < len, so reject idx >= len.
                        if idx >= arr.len() {
                            return Err(format!("remove index out of bounds {idx}"));
                        }
                        arr.remove(idx);
                    }
                    _ => return Err("remove parent not container".into()),
                }
            }
        }
    }
    Ok(state)
}

51
src/tx/types.rs Normal file
View file

@ -0,0 +1,51 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
/// Request body for reserving a commit slot on a document.
#[derive(Debug, Serialize, Deserialize)]
pub struct ReserveReq {
    pub author: String,
    /// Optional hint of the JSON-pointer paths the client intends to touch.
    /// Made `pub` for consistency: it was the only private field among these
    /// otherwise fully-public DTOs, which blocked construction/inspection
    /// outside this module (backward-compatible widening).
    pub hint_paths: Option<Vec<String>>,
}
/// Response for a successful reservation.
#[derive(Debug, Serialize, Deserialize)]
pub struct ReserveRes {
    pub reservation_id: String,
    // Document version the reservation was taken against.
    pub base_version: u64,
    // Expiry timestamp — presumably Unix seconds (see `now_unix` in
    // helpers); confirm against the reservation writer.
    pub expires_at: i64,
}
/// Request body for committing a transaction under an existing reservation.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TxCommitReq {
    pub reservation_id: String,
    // Client-chosen transaction id; the commit path stores it under the
    // txid key, suggesting idempotent-commit handling.
    pub tx_id: String,
    // Version the client based its ops on.
    pub base_version: u64,
    pub author: String,
    // Patch operations to apply on top of base_version.
    pub ops: Vec<PatchOp>,
}
/// Response for a successful commit.
#[derive(Debug, Serialize, Deserialize)]
pub struct TxCommitRes {
    pub doc_id: String,
    pub tx_id: String,
    // New committed version of the document after applying this tx.
    pub committed_version: u64,
}
/// A single patch operation, externally tagged by the `"op"` field on the
/// wire (mirrors the RFC 6902 shape for these three operations; `path` is a
/// JSON pointer).
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "op")]
pub enum PatchOp {
    #[serde(rename = "replace")]
    Replace { path: String, value: Value },
    #[serde(rename = "add")]
    Add { path: String, value: Value },
    #[serde(rename = "remove")]
    Remove { path: String },
}
/// Server-side record of an active reservation (stored under the rsv key).
#[derive(Debug, Serialize, Deserialize)]
pub struct Reservation {
    pub reservation_id: String,
    pub doc_id: String,
    pub author: String,
    // Document version at reservation time.
    pub base_version: u64,
    // Expiry timestamp — presumably Unix seconds (see `now_unix` in
    // helpers); confirm against the reservation writer.
    pub expires_at: i64,
}