Compare commits

...

21 commits

Author SHA1 Message Date
Pakin
2466472235 feat: add replay feature for recovery
- fix bug commit not update index

Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-05-19 12:10:31 +07:00
Pakin
a69ef7b927 disable cache from redis
- reason from value not updated

Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-05-18 09:56:23 +07:00
Pakin
b70a35135c update usage of commit
Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-05-15 14:38:27 +07:00
Pakin
c8f820e238 add usages
Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-05-15 14:27:18 +07:00
Pakin
caa0833ea2 fix: fail git clone by case empty folder
Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-05-08 15:43:13 +07:00
Pakin
166b7079ca feat: add multi files support for commit
Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-05-06 10:00:06 +07:00
Pakin
ae9d9fa66b feat: commit, push, pull
- test ok but must call pull first before do commit or push

Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-05-05 16:55:09 +07:00
Pakin
bca1c911d3 add commit logging 2026-04-28 11:43:03 +07:00
Pakin
5bb2a6c192 change: change get from json cache file to multipart
Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-04-27 15:47:11 +07:00
Pakin
d19dab7561 feat: pull handler
- test simple pull ok

Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-04-10 10:11:53 +07:00
Pakin
bb3e55eecb feat: enable full clone
Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-03-24 10:21:02 +07:00
Pakin
febf91d417 disable cold boot caching & optimizing
- caused by too much ram usage
- optimize memory usage on checkout code

Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-02-25 15:11:29 +07:00
Pakin
6fe3357efe feat: add support for image file in checkout
Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-02-17 15:17:04 +07:00
Pakin
a2da030a99 add tx, cold boot 2026-02-17 14:23:31 +07:00
Pakin
03263815e6 feat: fetch from cache if existed
Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-02-02 14:12:16 +07:00
Pakin
3043f30012 feat: add commit, push handler
- add new feature commit changes and push to remote

Signed-off-by: Pakin <pakin.t@forth.co.th>
2026-01-22 17:20:03 +07:00
Pakin
2dd165b451 fix: cannot get file at root 2026-01-09 11:36:50 +07:00
Pakin
59d0dd7ab4 feature: enable get root files
- allow get files/dirs at root repo
2026-01-09 11:25:33 +07:00
Pakin
4e3b561f61 fix: missing token 2026-01-09 09:46:51 +07:00
Pakin
9f4fb6c274 update dockerfile 2026-01-08 14:40:58 +07:00
Pakin
90856717e4 update: repo & deps 2026-01-08 14:15:27 +07:00
15 changed files with 4055 additions and 1105 deletions

5
.dockerignore Normal file
View file

@ -0,0 +1,5 @@
/target
.tbcfg
*.txt
*.log

2
.gitignore vendored
View file

@ -1,5 +1,5 @@
/target
.tbcfg
.tbcfg*
*.txt
*.log

2575
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -4,19 +4,33 @@ version = "0.1.0"
edition = "2024"
[dependencies]
async-compression = { version = "0.4.39", features = ["tokio", "brotli"] }
axum = "0.8.7"
axum-macros = "0.5.0"
axum-extra = { version = "0.12.6", features = ["multipart"] }
bb8 = "0.9.1"
bb8-redis = "0.26.0"
brotli = "8.0.2"
bytes = "1.11.1"
env_logger = "0.11.8"
futures-util = "0.3.31"
git2 = { version = "0.20.3", features = ["https", "ssh"] }
image = "0.25.9"
json-patch = "4.1.0"
libgit2-sys = { version = "0.18.3", features = ["ssh"] }
libtbr = { git = "https://gitlab.forthrd.io/Pakin/libtbr.git", version = "0.1.1" }
libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.git", version = "0.1.1" }
log = "0.4.29"
prost = "0.14.1"
redis = { version = "1.0.2", features = ["tokio-comp"] }
reqwest = { version = "0.12.25", features = ["json"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = { version = "1.0.145", features = ["preserve_order"] }
tokio = { version = "1.48.0", features = ["full"] }
tokio-util = { version = "0.7.18", features = ["io"] }
tonic = { version = "0.14.2", features = ["transport"] }
tonic-prost = "0.14.2"
uuid = { version = "1.20.0", features = ["v4"] }
tokio-postgres = "0.7.17"
[build-dependencies]
tonic-prost-build = "0.14.2"

43
Dockerfile Normal file
View file

@ -0,0 +1,43 @@
FROM rustlang/rust:nightly-slim AS builder
WORKDIR /app
# Install build dependencies
RUN apt update && apt install -y \
musl-tools \
musl-dev \
build-essential \
libssl-dev \
curl \
pkg-config \
ca-certificates \
protobuf-compiler \
zlib1g
# Copy dependency files first for better caching
COPY Cargo.toml Cargo.lock ./
COPY ./src ./src
COPY build.rs ./
COPY tbm-proto ./tbm-proto
# Download config
RUN curl -X GET https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/tbm-git-repo-service/releases/download/config/.tbcfg -o .tbcfg
# Build the application
ENV RUSTFLAGS="-C target-feature=+crt-static"
RUN cargo build --release
# Builder
FROM gcr.io/distroless/cc
# Copy the binary
COPY --from=builder /app/target/release/tbm-git-repo-service /
COPY --from=builder /app/.tbcfg /
# Create data directory
EXPOSE 36583
# Environment defaults
CMD ["./tbm-git-repo-service"]

183
README.md Normal file
View file

@ -0,0 +1,183 @@
# tbm-git-service
Git repo as a service with operational endpoints
---
## Endpoints
### Checkout file
Get file content or list of file in directory from HEAD
```
GET .../checkout?path=<path_to_file>
```
Examples
Get list of files in directory
```bash
curl -X GET http://localhost:36593/checkout\?path\=inter
aus,common,dev,gbr,gbr_premium,hkg,ltu,mys,rou,sgp,tha,tha_premium,uae_dubai,usa,whatthecup
```
Get file
```bash
curl -X GET http://localhost:36593/checkout\?path\=inter/mys/xml/page_catalog_group_other.lxml
<Popup>
<Cache> "Enable" </Cache>
<Width> 1080 </Width>
<Height> 1920 </Height>
;<Background> "0xeae6e1" </Background>
<Volume> SoundVolume </Volume>
<EventOpen>
; On open
```
Error
```bash
curl -X GET http://localhost:36593/checkout\?path\=inter2
File not found
```
---
### Fetch
Fetching & update index of current repo
```
GET .../fetch
```
Expected result
```
{"result": "fetch success"}
```
Error
```
{"error": "..."}
```
---
### Commit
Commit changes of file i.e. editing, adding, etc.
```
POST .../commit
```
Body must be multipart
- Single file
```
path: string
file: text/binary is acceptable
signature_username: string
signature_email: stirng
message: string
```
- Multiple file
```
signature_username: string
signature_email: stirng
message: string
fileX1: text/binary is acceptable
pathX1: string
fileX2: text/binary is acceptable
pathX2: string
...
```
Expected result
```
{"result": "commit hash id"}
```
Error
```
{"error": "..."}
```
Example
```curl
curl --request POST \
--url http://localhost:36583/commit \
--header 'content-type: multipart/form-data' \
--form path=mys/version.dev \
--form 'signature_username=git api' \
--form signature_email=supra.m2.dev@forth.co.th \
--form 'message=test commit' \
--form file=@./mys.version.test
```
NOTE: file & path in multiple file must have the same name after
---
### Push
Push local commits to remote
```
GET .../push
```
Expected result
```
{"result": "push completed"}
```
Error
```
{"error": "..."}
```
---
### Pull
Pull commits from remote. This operation always does git reset hard first before pull for reason of no conflicts.
```
GET .../pull
```
Expected result
```
{"result": "pull completed"}
```
Error
```
{"error": "..."}
```
NOTE: Commit did lost but could be checkout later
---
### Recovery
WIP...

1373
src/app.rs

File diff suppressed because it is too large Load diff

View file

@ -1,16 +1,21 @@
use std::{cell::RefCell, collections::HashMap, io::{self, Write}, path::{Path, PathBuf}};
use std::{
cell::RefCell,
collections::HashMap,
io::{self, Write},
path::{Path, PathBuf},
};
use git2::{build::RepoBuilder, Cred, FetchOptions, Progress, RemoteCallbacks};
use log::{info};
use git2::{Cred, FetchOptions, Progress, RemoteCallbacks, build::RepoBuilder};
use log::info;
use crate::gcm;
struct GitState {
progress: Option<Progress<'static>>,
total: usize,
current: usize,
path: Option<PathBuf>,
newline: bool
progress: Option<Progress<'static>>,
total: usize,
current: usize,
path: Option<PathBuf>,
newline: bool,
}
fn print(state: &mut GitState) {
@ -29,10 +34,11 @@ fn print(state: &mut GitState) {
state.newline = true;
}
info!("Resolving deltas {}/{}",
stats.indexed_deltas(),
stats.total_deltas());
info!(
"Resolving deltas {}/{}",
stats.indexed_deltas(),
stats.total_deltas()
);
} else {
info!(
"net {:3}% ({:4} kb, {:5}/{:5}) / idx {:3}% ({:5}/{:5}) \
@ -58,40 +64,38 @@ fn print(state: &mut GitState) {
}
pub fn setup_git_repo(config: HashMap<String, String>) -> gcm::StandardResult {
let state = RefCell::new(GitState {
progress: None,
total: 0,
current: 0,
path: None,
newline: true
});
let state = RefCell::new(GitState {
progress: None,
total: 0,
current: 0,
path: None,
newline: true,
});
let mut cb = RemoteCallbacks::new();
cb.transfer_progress(|stats| {
let mut state = state.borrow_mut();
state.progress = Some(stats.to_owned());
print(&mut state);
true
});
let mut cb = RemoteCallbacks::new();
cb.transfer_progress(|stats| {
let mut state = state.borrow_mut();
state.progress = Some(stats.to_owned());
print(&mut state);
true
});
cb.credentials(|_,_,_| {
Cred::userpass_plaintext(
config.get("GIT_REPO_USERNAME").unwrap_or(&"".to_string()),
config.get("GIT_REPO_PASSWORD").unwrap_or(&"".to_string())
)
});
cb.credentials(|_, _, _| {
Cred::userpass_plaintext(
config.get("GIT_REPO_USERNAME").unwrap_or(&"".to_string()),
config.get("GIT_REPO_PASSWORD").unwrap_or(&"".to_string()),
)
});
let mut fo = FetchOptions::new();
fo.remote_callbacks(cb);
fo.depth(1);
let mut fo = FetchOptions::new();
fo.remote_callbacks(cb);
RepoBuilder::new()
.bare(true)
.fetch_options(fo)
.clone(
config.get("GIT_REPO_REMOTE").unwrap_or(&"".to_string()),
Path::new(config.get("GIT_REPO_LOCAL_DEST").unwrap()).into()
let _ = RepoBuilder::new().bare(false).fetch_options(fo).clone(
config.get("GIT_REPO_REMOTE").unwrap_or(&"".to_string()),
Path::new(config.get("GIT_REPO_LOCAL_DEST").unwrap()).into(),
)?;
Ok(())
}
println!("clone completed !");
Ok(())
}

View file

@ -1,56 +1,67 @@
use std::{fs::File};
use std::fs::File;
use env_logger::Builder;
use libtbr::recipe_functions::common;
use log::info;
use crate::{git::setup_git_repo, reg::{heartbeat_loop, registry::registry_client::RegistryClient}};
use crate::{
git::setup_git_repo,
// reg::{heartbeat_loop, registry::registry_client::RegistryClient},
};
mod app;
mod gcm;
mod git;
mod reg;
mod tx;
// mod reg;
fn setup_log(config: gcm::Configure) -> gcm::StandardResult {
let logfile = File::create(config.get("LOG_NAME").unwrap_or(&"run.log".to_string()))?;
// NOTE: disable logging file, use send to log service instead
// let logfile = File::create(config.get("LOG_NAME").unwrap_or(&"run.log".to_string()))?;
Builder::from_env(env_logger::Env::default().default_filter_or("debug"))
.target(env_logger::Target::Pipe(Box::new(logfile))).init();
Builder::from_env(env_logger::Env::default().default_filter_or("debug"))
// .target(env_logger::Target::Pipe(Box::new(logfile)))
.init();
Ok(())
Ok(())
}
async fn setup_registry(config: gcm::Configure) -> gcm::StandardResult {
let cfg_clone = config.clone();
let register_server_address = cfg_clone.get("REGISTRY_SERVER").expect("not found registry server");
let name = cfg_clone.get("SERVICE_NAME").expect("missing service name");
let url = cfg_clone.get("SERVICE_URL").expect("missing service url");
let mut client = RegistryClient::connect(register_server_address.clone()).await?;
reg::register_service(&mut client, name, url).await;
tokio::spawn(heartbeat_loop(name.to_string(), url.to_string()));
Ok(())
}
#[tokio::main]
async fn main() -> gcm::StandardResult {
let config: gcm::Configure = common::get_config();
setup_log(config.clone())?;
match std::fs::read_dir(config.get("GIT_REPO_LOCAL_DEST").expect("not exist")) {
Ok(_) => {
info!("GIT_REPO `{dest}` already setup", dest = config.get("GIT_REPO_LOCAL_DEST").unwrap())
},
Err(_) => {
setup_git_repo(config.clone())?;
}
Ok(_) => {
info!(
"GIT_REPO `{dest}` existed, checking if has git",
dest = config.get("GIT_REPO_LOCAL_DEST").unwrap()
);
match std::fs::read_dir(format!(
"{}/.git",
config.get("GIT_REPO_LOCAL_DEST").expect("not exist")
)) {
Ok(_) => {
info!("GIT REPO already set up!")
}
Err(_) => {
setup_git_repo(config.clone())?;
}
}
}
Err(_) => {
setup_git_repo(config.clone())?;
}
}
setup_registry(config.clone()).await?;
info!(
"APP VERSION: {}",
config
.get("CONFIG_VERSION")
.expect("config version not defined")
);
info!("RUNNING: {:?}", config.get("PUBLIC_PORT"));
app::run(config.clone()).await?;
Ok(())

View file

@ -1,5 +1,6 @@
use std::time::Duration;
use log::{error, info};
use tokio::time::sleep;
use tonic::transport::Channel;
@ -17,7 +18,11 @@ pub async fn register_service(client: &mut RegistryClient<Channel>, name: &str,
token: std::env::var("REGISTRY_TOKEN").unwrap_or_default(),
});
let _ = client.register(req).await;
let resp = client.register(req).await;
match resp {
Ok(r) => info!("{:?}", r),
Err(e) => error!("error register: {:?}", e)
}
}
pub async fn heartbeat_loop(name: String, url: String) {

323
src/tx/handler.rs Normal file
View file

@ -0,0 +1,323 @@
use axum::{
Json, Router,
extract::{Path, Query, State},
response::IntoResponse,
routing::{get, post},
};
use redis::{AsyncTypedCommands, TypedCommands, aio::MultiplexedConnection};
use reqwest::StatusCode;
use serde::Deserialize;
use serde_json::{Value, json};
use uuid::Uuid;
use crate::tx::{
helpers::*,
patcher::apply_ops,
types::{PatchOp, Reservation, ReserveReq, ReserveRes, TxCommitReq, TxCommitRes},
};
async fn reserve_tx(
State(state): State<AppState>,
Path(doc_id): Path<String>,
Json(req): Json<ReserveReq>,
) -> impl IntoResponse {
let rsv_id = Uuid::new_v4().to_string();
let ttl_secs: i64 = 30; // ttl reserve 30s
let expires_at = now_unix() + ttl_secs;
let mut rcli = state.redis_cli.clone();
// check version
let ver_key = doc_ver_key(&doc_id);
let base_version: u64 = match rcli.get(ver_key) {
Ok(res) => {
if let Some(r) = res {
r.parse().ok().unwrap()
} else {
0
}
}
Err(_) => 0,
};
let rsv = Reservation {
reservation_id: rsv_id.clone(),
doc_id: doc_id.clone(),
author: req.author,
base_version,
expires_at,
};
let key = rsv_key(&doc_id, &rsv_id);
let payload = serde_json::to_string(&rsv).unwrap();
let _: () = rcli.set_ex(key, payload, ttl_secs as u64).unwrap();
(
StatusCode::OK,
Json(ReserveRes {
reservation_id: rsv_id,
base_version,
expires_at,
}),
)
}
async fn commit_tx(
State(state): State<AppState>,
Path(doc_id): Path<String>,
Json(req): Json<TxCommitReq>,
) -> impl IntoResponse {
if req.tx_id.is_empty() || req.reservation_id.is_empty() {
return (
StatusCode::BAD_REQUEST,
"missing tx_id or reservation_id ".into_response(),
);
}
let mut con = match state.redis_cli.get_multiplexed_async_connection().await {
Ok(conn) => conn,
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("{}", e.to_string()).into_response(),
);
}
};
// lock commit
let lock_key = commit_lock_key(&doc_id);
if let Err(_) = wait_lock(&mut con, &lock_key, 1200, 20).await {
return (StatusCode::CONFLICT, "doc busy, retry".into_response());
}
let txid_k = txid_key(&doc_id, &req.tx_id);
if let Ok(Some(v)) = con.get(&txid_k).await {
if let Ok(committed_version) = v.parse::<u64>() {
let res = TxCommitRes {
doc_id,
tx_id: req.tx_id,
committed_version,
};
return (StatusCode::OK, Json(res).into_response());
}
}
let rsv_k = rsv_key(&doc_id, &req.reservation_id);
let rsv_raw: Option<String> = con.get(&rsv_k).await.unwrap_or(None);
let rsv_raw = match rsv_raw {
Some(x) => x,
None => {
return (
StatusCode::CONFLICT,
"reservation missing/expired".into_response(),
);
}
};
let rsv: Reservation = match serde_json::from_str(&rsv_raw) {
Ok(x) => x,
Err(_) => return (StatusCode::CONFLICT, "bad reservation".into_response()),
};
if rsv.doc_id != doc_id || rsv.author != req.author || rsv.base_version != req.base_version {
return (StatusCode::CONFLICT, "reservation mismatch".into_response());
}
if now_unix() > rsv.expires_at {
return (StatusCode::CONFLICT, "reservation expired".into_response());
}
let curr_ver: u64 = con
.get(doc_ver_key(&doc_id))
.await
.ok()
.flatten()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
if req.base_version != curr_ver {
let body = json!({
"error": "version_conflict",
"server_version": curr_ver,
});
return (StatusCode::CONFLICT, Json(body).into_response());
}
let next_ver = curr_ver + 1;
let stream_key = doc_tx_stream_key(&doc_id);
let tx_event = json!({
"doc_id": doc_id,
"tx_id": req.tx_id,
"version": next_ver,
"base_version": req.base_version,
"author": req.author,
"ts": now_unix(),
"ops": req.ops
});
let tx_event_str = tx_event.to_string();
let _: String = redis::cmd("XADD")
.arg(&stream_key)
.arg("*")
.arg("data")
.arg(&tx_event_str)
.query_async(&mut con)
.await
.unwrap();
let _: () = con
.set(doc_ver_key(&doc_id), next_ver.to_string())
.await
.unwrap();
let _: () = con.set(&txid_k, next_ver.to_string()).await.unwrap();
let _: usize = con.del(&rsv_k).await.unwrap();
let snapshot_every: u64 = 50;
if next_ver % snapshot_every == 0 {
if let Ok(state) = build_state_at(&mut con, &doc_id, next_ver).await {
let _: () = con
.set(doc_snap_key(&doc_id), state.to_string())
.await
.unwrap();
let _: () = con
.set(doc_snapver_key(&doc_id), next_ver.to_string())
.await
.unwrap();
}
}
(
StatusCode::OK,
Json(TxCommitRes {
doc_id,
tx_id: req.tx_id,
committed_version: next_ver,
})
.into_response(),
)
}
async fn build_state_at(
con: &mut MultiplexedConnection,
doc_id: &str,
target_version: u64,
) -> Result<Value, String> {
let snap_ver: u64 = con
.get(doc_snap_key(&doc_id))
.await
.ok()
.flatten()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let snap_json: Value = if snap_ver > 0 {
let raw: Option<String> = con.get(doc_snap_key(&doc_id)).await.unwrap_or(None);
raw.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_else(|| json!({}))
} else {
json!({})
};
let stream_key = doc_tx_stream_key(doc_id);
let start_id = "0-0";
let entries: Vec<(String, Vec<(String, String)>)> = redis::cmd("XRANGE")
.arg(&stream_key)
.arg(start_id)
.arg("+")
.query_async(con)
.await
.map_err(|e| e.to_string())?;
let mut ops_list: Vec<PatchOp> = Vec::new();
for (_id, fields) in entries {
let mut data_opts = None;
for (k, v) in fields {
if k == "data" {
data_opts = Some(v);
break;
}
}
let data = match data_opts {
Some(x) => x,
None => continue,
};
let ev: Value = serde_json::from_str(&data).map_err(|e| e.to_string())?;
let ver = ev.get("version").and_then(|v| v.as_u64()).unwrap_or(0);
if ver == 0 || ver > target_version {
continue;
}
if ver <= snap_ver {
continue;
}
let ops: Vec<PatchOp> =
serde_json::from_value(ev["ops"].clone()).map_err(|e| e.to_string())?;
ops_list.extend(ops);
}
apply_ops(snap_json, &ops_list)
}
#[derive(Deserialize)]
struct StateQuery {
at_version: Option<u64>,
}
async fn get_state(
State(state): State<AppState>,
Path(doc_id): Path<String>,
Query(q): Query<StateQuery>,
) -> impl IntoResponse {
let rcli = state.redis_cli.clone();
let mut con = match rcli.get_multiplexed_async_connection().await {
Ok(sc) => sc,
Err(_) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
"cannot get connection".into_response(),
);
}
};
let cur_ver: u64 = con
.get(doc_ver_key(&doc_id))
.await
.ok()
.flatten()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let target = q.at_version.unwrap_or(cur_ver).min(cur_ver);
match build_state_at(&mut con, &doc_id, target).await {
Ok(state) => (
StatusCode::OK,
Json(json!({
"doc_id": doc_id,
"version": target,
"state": state
}))
.into_response(),
),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({
"error": e
}))
.into_response(),
),
}
}
// pub async fn create_tx_patcher_route() -> Router<AppState> {
// Router::new()
// .route("/{id}/tx-reserve", post(reserve_tx))
// .route("/{id}/tx-commit", post(commit_tx))
// .route("/{id}/state", get(get_state))
// }

296
src/tx/helpers.rs Normal file
View file

@ -0,0 +1,296 @@
use std::{
clone,
collections::HashMap,
sync::{Arc, Mutex},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use git2::Repository;
use json_patch::{Patch, diff};
use log::info;
use redis::{RedisResult, aio::MultiplexedConnection};
use serde::Deserialize;
use tokio::{sync::mpsc::Sender, time::sleep};
use uuid::Uuid;
use crate::app::CacheJob;
pub fn now_unix() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64
}
pub fn doc_ver_key(doc: &str) -> String {
format!("doc:{doc}:ver")
}
pub fn doc_tx_stream_key(doc: &str) -> String {
format!("doc:{doc}:tx")
}
pub fn doc_snap_key(doc: &str) -> String {
format!("doc:{doc}:snap")
}
pub fn doc_snapver_key(doc: &str) -> String {
format!("doc:{doc}:snapver")
}
pub fn rsv_key(doc: &str, rsv: &str) -> String {
format!("doc:{doc}:rsv:{rsv}")
}
pub fn txid_key(doc: &str, tx_id: &str) -> String {
format!("doc:{doc}:txid:{tx_id}")
}
pub fn commit_lock_key(doc: &str) -> String {
format!("doc:{doc}:lock")
}
pub async fn acquire_lock(
con: &mut MultiplexedConnection,
key: &str,
ttl_ms: u64,
) -> RedisResult<bool> {
let token = Uuid::new_v4().to_string();
let ok: Option<String> = redis::cmd("SET")
.arg(key)
.arg(token)
.arg("NX")
.arg("PX")
.arg(ttl_ms)
.query_async(con)
.await?;
Ok(ok.is_some())
}
pub async fn wait_lock(
con: &mut MultiplexedConnection,
key: &str,
ttl_ms: u64,
attempts: u32,
) -> RedisResult<()> {
for _ in 0..attempts {
if acquire_lock(con, key, ttl_ms).await? {
return Ok(());
}
sleep(Duration::from_millis(30)).await;
}
Err(redis::RedisError::from((redis::ErrorKind::Io, "lock busy")))
}
pub fn build_tx_from_diff(base: serde_json::Value, alt: serde_json::Value) -> Patch {
diff(&base, &alt)
}
fn get_file_from_repo(
repo: Arc<Mutex<Repository>>,
path: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
if let Ok(repo_m) = repo.try_lock() {
let obj = repo_m.revparse_single(path)?;
if let Some(blob) = obj.as_blob() {
let content = unsafe { str::from_utf8_unchecked(blob.content()) };
return Ok(content.to_string());
} else if let Some(tree) = obj.as_tree() {
let dir_list = tree
.iter()
.map(|x| x.name().unwrap_or("").to_string())
.collect::<Vec<String>>();
return Ok(dir_list.join(","));
}
}
Err("FileNotFound".into())
}
pub async fn setup_prebuild_tx_cache(
tx: Sender<CacheJob>,
git_repo: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// import tx files
let repo = match Repository::open(git_repo) {
Ok(repo) => repo,
Err(_) => return Err("cannot open repo".into()),
};
let repo_m = Arc::new(Mutex::new(repo));
let root_path = String::from("master:");
let all_folders = match get_file_from_repo(repo_m.clone(), root_path.as_str()) {
Ok(af) => af,
Err(e) => return Err(e),
};
let folders = all_folders
.split(",")
.map(|x| x.to_string())
.collect::<Vec<String>>();
for folder in folders {
let current_path = format!("master:{folder}");
println!("trying {folder}...");
// test get version
//
// NOTE: version may miss if there is file with `versionXXXX`
//
//
if let Ok(current_files) = get_file_from_repo(repo_m.clone(), current_path.as_str())
&& current_files.contains(",version")
{
let expected_version_path = format!("{current_path}/version");
println!("\tmay have version --> {expected_version_path}, \ncurrent: {current_files}");
// get version content
let version_str =
if let Ok(v) = get_file_from_repo(repo_m.clone(), expected_version_path.as_str()) {
v
} else {
"".to_string()
};
// let version = version_str.parse::<u64>().unwrap_or(0);
if version_str.is_empty() {
// unexpected
return Err("cannot get version".into());
}
if tx
.send(CacheJob {
rel_path: expected_version_path.clone(),
file_data: Some(serde_json::to_vec(&version_str)?),
})
.await
.is_ok()
{
info!("cache ok {current_path}/version");
}
println!("try get {folder}:{version_str}");
let files = current_files
.split(",")
.map(|x| x.to_string())
.collect::<Vec<String>>();
let base_file: String = match files.iter().find(|x| {
x.ends_with(".json") && x.starts_with(&format!("coffeethai02_{version_str}"))
}) {
Some(f) => f.to_string(),
None => "".to_string(),
};
println!("{folder}: {base_file}");
if base_file.is_empty() {
println!("base file empty skip");
continue;
}
let base_recipe_str = if let Ok(r) =
get_file_from_repo(repo_m.clone(), &format!("{current_path}/{base_file}"))
{
r.to_string()
} else {
"".to_string()
};
if base_recipe_str.is_empty() {
println!("empty recipe skip!");
continue;
}
let base_recipe: serde_json::Value = match serde_json::from_str(&base_recipe_str) {
Ok(r) => r,
Err(e) => return Err(e.into()),
};
let mut current_recipes = HashMap::new();
current_recipes.insert("base".to_string(), base_recipe);
for cfile in files {
if cfile.ne(&base_file)
&& cfile.ends_with(".json")
&& cfile.starts_with("coffeethai02")
{
//
// fetch file content
// read into value
// do diff
let current_fetch_path = format!("{current_path}/{cfile}");
let current_file_str =
if let Ok(cr) = get_file_from_repo(repo_m.clone(), &current_fetch_path) {
cr.to_string()
} else {
"".to_string()
};
if !current_file_str.is_empty() {
// into value
let alt_recipe: serde_json::Value =
match serde_json::from_str(&current_file_str) {
Ok(crr) => crr,
Err(e) => {
println!("get error while alt {current_fetch_path}: {e}");
continue;
}
};
current_recipes.insert(cfile, alt_recipe);
}
}
}
// process diffing
//
let base_recipe_ref = current_recipes.get("base").unwrap();
for (key, value) in current_recipes.iter() {
if key != "base" {
let patch_diff = diff(&base_recipe_ref, value);
let tx_name = format!(
"stx_{}",
key.replace("coffeethai02_", "").replace(".json", "")
);
let base_path_out = format!("./recipe_tx/{folder}");
if !std::path::Path::new(&base_path_out).exists() {
std::fs::create_dir_all(&base_path_out)?;
}
let file_out = format!("{base_path_out}/{tx_name}.json");
std::fs::write(file_out, serde_json::to_string_pretty(&patch_diff)?)?;
}
}
// let _ = tx
// .send(serde_json::json!({
// "topic": "generate_stx",
// "target": folder,
// "base_version": version_str,
// "full_base_filename": base_file,
// "value": base_recipe_clone
// }))
// .await;
let json_bytes = serde_json::to_vec(base_recipe_ref)?;
if tx
.send(CacheJob {
rel_path: format!("{current_path}/{base_file}"),
file_data: Some(json_bytes),
})
.await
.is_ok()
{
info!("cache ok {current_path}/{base_file}");
}
}
}
Ok(())
}

4
src/tx/mod.rs Normal file
View file

@ -0,0 +1,4 @@
// pub mod handler;
pub mod helpers;
pub mod patcher;
pub mod types;

119
src/tx/patcher.rs Normal file
View file

@ -0,0 +1,119 @@
use serde_json::Value;
use crate::tx::types::PatchOp;
pub fn json_pointer_parent<'a>(
root: &'a mut Value,
path: &str,
) -> Result<(&'a mut Value, String), String> {
if !path.starts_with('/') && path != "" {
return Err("path must be JSON pointer starting with '/'".into());
}
let part: Vec<String> = path
.split('/')
.skip(1)
.map(|x| x.replace("~1", "/").replace("~0", "~"))
.collect();
if path.is_empty() {
return Ok((root, "".into()));
}
let last = part.last().unwrap().clone();
let mut cur = root;
for key in &part[..part.len() - 1] {
match cur {
Value::Object(map) => {
cur = map
.get_mut(key)
.ok_or_else(|| format!("missing object key {key}"))?;
}
Value::Array(arr) => {
let idx: usize = key.parse().map_err(|_| format!("bad array idx {key}"))?;
cur = arr
.get_mut(idx)
.ok_or_else(|| format!("array index out of bounds {idx}"))?;
}
_ => return Err("cannot traverse non-container".into()),
}
}
Ok((cur, last))
}
pub fn apply_ops(mut state: Value, ops: &[PatchOp]) -> Result<Value, String> {
for op in ops {
match op {
PatchOp::Replace { path, value } => {
if path == "" || path == "/" {
state = value.clone();
continue;
}
let (parent, last) = json_pointer_parent(&mut state, path)?;
match parent {
Value::Object(map) => {
if !map.contains_key(&last) {
return Err(format!("replace target missing key {last}"));
}
map.insert(last, value.clone());
}
Value::Array(arr) => {
let idx: usize = last
.parse()
.map_err(|_| format!("bad array index {last}"))?;
if idx >= arr.len() {
return Err(format!("replace index out of bounds {idx}"));
}
arr[idx] = value.clone();
}
_ => return Err("replace parent not container".into()),
}
}
PatchOp::Add { path, value } => {
let (parent, last) = json_pointer_parent(&mut state, path)?;
match parent {
Value::Object(map) => {
map.insert(last, value.clone());
}
Value::Array(arr) => {
if last == "-" {
arr.push(value.clone());
} else {
let idx: usize = last
.parse()
.map_err(|_| format!("bad array index {last}"))?;
if idx > arr.len() {
return Err(format!("add index out of bounds {idx}"));
}
arr.insert(idx, value.clone());
}
}
_ => return Err("add parent not container".into()),
}
}
PatchOp::Remove { path } => {
let (parent, last) = json_pointer_parent(&mut state, path)?;
match parent {
Value::Object(map) => {
map.remove(&last)
.ok_or_else(|| format!("remove missing key {last}"))?;
}
Value::Array(arr) => {
let idx: usize = last
.parse()
.map_err(|_| format!("bad array index {last}"))?;
if idx > arr.len() {
return Err(format!("remove index out of bounds {idx}"));
}
arr.remove(idx);
}
_ => return Err("remove parent not container".into()),
}
}
}
}
Ok(state)
}

51
src/tx/types.rs Normal file
View file

@ -0,0 +1,51 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Serialize, Deserialize)]
pub struct ReserveReq {
pub author: String,
hint_paths: Option<Vec<String>>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ReserveRes {
pub reservation_id: String,
pub base_version: u64,
pub expires_at: i64,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TxCommitReq {
pub reservation_id: String,
pub tx_id: String,
pub base_version: u64,
pub author: String,
pub ops: Vec<PatchOp>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TxCommitRes {
pub doc_id: String,
pub tx_id: String,
pub committed_version: u64,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "op")]
pub enum PatchOp {
#[serde(rename = "replace")]
Replace { path: String, value: Value },
#[serde(rename = "add")]
Add { path: String, value: Value },
#[serde(rename = "remove")]
Remove { path: String },
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Reservation {
pub reservation_id: String,
pub doc_id: String,
pub author: String,
pub base_version: u64,
pub expires_at: i64,
}