Compare commits
21 commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2466472235 | ||
|
|
a69ef7b927 | ||
|
|
b70a35135c | ||
|
|
c8f820e238 | ||
|
|
caa0833ea2 | ||
|
|
166b7079ca | ||
|
|
ae9d9fa66b | ||
|
|
bca1c911d3 | ||
|
|
5bb2a6c192 | ||
|
|
d19dab7561 | ||
|
|
bb3e55eecb | ||
|
|
febf91d417 | ||
|
|
6fe3357efe | ||
|
|
a2da030a99 | ||
|
|
03263815e6 | ||
|
|
3043f30012 | ||
|
|
2dd165b451 | ||
|
|
59d0dd7ab4 | ||
|
|
4e3b561f61 | ||
|
|
9f4fb6c274 | ||
|
|
90856717e4 |
15 changed files with 4055 additions and 1105 deletions
5
.dockerignore
Normal file
5
.dockerignore
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
/target
|
||||||
|
.tbcfg
|
||||||
|
*.txt
|
||||||
|
*.log
|
||||||
|
|
||||||
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -1,5 +1,5 @@
|
||||||
/target
|
/target
|
||||||
.tbcfg
|
.tbcfg*
|
||||||
*.txt
|
*.txt
|
||||||
*.log
|
*.log
|
||||||
|
|
||||||
|
|
|
||||||
2575
Cargo.lock
generated
2575
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
16
Cargo.toml
16
Cargo.toml
|
|
@ -4,19 +4,33 @@ version = "0.1.0"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
async-compression = { version = "0.4.39", features = ["tokio", "brotli"] }
|
||||||
axum = "0.8.7"
|
axum = "0.8.7"
|
||||||
|
axum-macros = "0.5.0"
|
||||||
|
axum-extra = { version = "0.12.6", features = ["multipart"] }
|
||||||
|
bb8 = "0.9.1"
|
||||||
|
bb8-redis = "0.26.0"
|
||||||
|
brotli = "8.0.2"
|
||||||
|
bytes = "1.11.1"
|
||||||
env_logger = "0.11.8"
|
env_logger = "0.11.8"
|
||||||
|
futures-util = "0.3.31"
|
||||||
git2 = { version = "0.20.3", features = ["https", "ssh"] }
|
git2 = { version = "0.20.3", features = ["https", "ssh"] }
|
||||||
|
image = "0.25.9"
|
||||||
|
json-patch = "4.1.0"
|
||||||
libgit2-sys = { version = "0.18.3", features = ["ssh"] }
|
libgit2-sys = { version = "0.18.3", features = ["ssh"] }
|
||||||
libtbr = { git = "https://gitlab.forthrd.io/Pakin/libtbr.git", version = "0.1.1" }
|
libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.git", version = "0.1.1" }
|
||||||
log = "0.4.29"
|
log = "0.4.29"
|
||||||
prost = "0.14.1"
|
prost = "0.14.1"
|
||||||
|
redis = { version = "1.0.2", features = ["tokio-comp"] }
|
||||||
reqwest = { version = "0.12.25", features = ["json"] }
|
reqwest = { version = "0.12.25", features = ["json"] }
|
||||||
serde = { version = "1.0.228", features = ["derive"] }
|
serde = { version = "1.0.228", features = ["derive"] }
|
||||||
serde_json = { version = "1.0.145", features = ["preserve_order"] }
|
serde_json = { version = "1.0.145", features = ["preserve_order"] }
|
||||||
tokio = { version = "1.48.0", features = ["full"] }
|
tokio = { version = "1.48.0", features = ["full"] }
|
||||||
|
tokio-util = { version = "0.7.18", features = ["io"] }
|
||||||
tonic = { version = "0.14.2", features = ["transport"] }
|
tonic = { version = "0.14.2", features = ["transport"] }
|
||||||
tonic-prost = "0.14.2"
|
tonic-prost = "0.14.2"
|
||||||
|
uuid = { version = "1.20.0", features = ["v4"] }
|
||||||
|
tokio-postgres = "0.7.17"
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
tonic-prost-build = "0.14.2"
|
tonic-prost-build = "0.14.2"
|
||||||
|
|
|
||||||
43
Dockerfile
Normal file
43
Dockerfile
Normal file
|
|
@ -0,0 +1,43 @@
|
||||||
|
FROM rustlang/rust:nightly-slim AS builder
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Install build dependencies
|
||||||
|
RUN apt update && apt install -y \
|
||||||
|
musl-tools \
|
||||||
|
musl-dev \
|
||||||
|
build-essential \
|
||||||
|
libssl-dev \
|
||||||
|
curl \
|
||||||
|
pkg-config \
|
||||||
|
ca-certificates \
|
||||||
|
protobuf-compiler \
|
||||||
|
zlib1g
|
||||||
|
|
||||||
|
# Copy dependency files first for better caching
|
||||||
|
COPY Cargo.toml Cargo.lock ./
|
||||||
|
COPY ./src ./src
|
||||||
|
COPY build.rs ./
|
||||||
|
COPY tbm-proto ./tbm-proto
|
||||||
|
|
||||||
|
# Download config
|
||||||
|
RUN curl -X GET https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/tbm-git-repo-service/releases/download/config/.tbcfg -o .tbcfg
|
||||||
|
|
||||||
|
# Build the application
|
||||||
|
ENV RUSTFLAGS="-C target-feature=+crt-static"
|
||||||
|
RUN cargo build --release
|
||||||
|
|
||||||
|
# Builder
|
||||||
|
FROM gcr.io/distroless/cc
|
||||||
|
|
||||||
|
# Copy the binary
|
||||||
|
COPY --from=builder /app/target/release/tbm-git-repo-service /
|
||||||
|
COPY --from=builder /app/.tbcfg /
|
||||||
|
|
||||||
|
# Create data directory
|
||||||
|
|
||||||
|
EXPOSE 36583
|
||||||
|
|
||||||
|
# Environment defaults
|
||||||
|
|
||||||
|
CMD ["./tbm-git-repo-service"]
|
||||||
183
README.md
Normal file
183
README.md
Normal file
|
|
@ -0,0 +1,183 @@
|
||||||
|
# tbm-git-service
|
||||||
|
|
||||||
|
Git repo as a service with operational endpoints
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Endpoints
|
||||||
|
|
||||||
|
### Checkout file
|
||||||
|
|
||||||
|
Get file content or list of file in directory from HEAD
|
||||||
|
|
||||||
|
```
|
||||||
|
GET .../checkout?path=<path_to_file>
|
||||||
|
```
|
||||||
|
|
||||||
|
Examples
|
||||||
|
|
||||||
|
Get list of files in directory
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -X GET http://localhost:36593/checkout\?path\=inter
|
||||||
|
aus,common,dev,gbr,gbr_premium,hkg,ltu,mys,rou,sgp,tha,tha_premium,uae_dubai,usa,whatthecup
|
||||||
|
```
|
||||||
|
|
||||||
|
Get file
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -X GET http://localhost:36593/checkout\?path\=inter/mys/xml/page_catalog_group_other.lxml
|
||||||
|
|
||||||
|
<Popup>
|
||||||
|
<Cache> "Enable" </Cache>
|
||||||
|
<Width> 1080 </Width>
|
||||||
|
<Height> 1920 </Height>
|
||||||
|
;<Background> "0xeae6e1" </Background>
|
||||||
|
<Volume> SoundVolume </Volume>
|
||||||
|
<EventOpen>
|
||||||
|
; On open
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
Error
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -X GET http://localhost:36593/checkout\?path\=inter2
|
||||||
|
File not found
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Fetch
|
||||||
|
|
||||||
|
Fetching & update index of current repo
|
||||||
|
|
||||||
|
```
|
||||||
|
GET .../fetch
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected result
|
||||||
|
|
||||||
|
```
|
||||||
|
{"result": "fetch success"}
|
||||||
|
```
|
||||||
|
|
||||||
|
Error
|
||||||
|
|
||||||
|
```
|
||||||
|
{"error": "..."}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Commit
|
||||||
|
|
||||||
|
Commit changes of file i.e. editing, adding, etc.
|
||||||
|
|
||||||
|
```
|
||||||
|
POST .../commit
|
||||||
|
```
|
||||||
|
|
||||||
|
Body must be multipart
|
||||||
|
|
||||||
|
- Single file
|
||||||
|
|
||||||
|
```
|
||||||
|
path: string
|
||||||
|
file: text/binary is acceptable
|
||||||
|
signature_username: string
|
||||||
|
signature_email: stirng
|
||||||
|
message: string
|
||||||
|
```
|
||||||
|
|
||||||
|
- Multiple file
|
||||||
|
|
||||||
|
```
|
||||||
|
signature_username: string
|
||||||
|
signature_email: stirng
|
||||||
|
message: string
|
||||||
|
fileX1: text/binary is acceptable
|
||||||
|
pathX1: string
|
||||||
|
fileX2: text/binary is acceptable
|
||||||
|
pathX2: string
|
||||||
|
...
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected result
|
||||||
|
|
||||||
|
```
|
||||||
|
{"result": "commit hash id"}
|
||||||
|
```
|
||||||
|
|
||||||
|
Error
|
||||||
|
|
||||||
|
```
|
||||||
|
{"error": "..."}
|
||||||
|
```
|
||||||
|
|
||||||
|
Example
|
||||||
|
|
||||||
|
```curl
|
||||||
|
curl --request POST \
|
||||||
|
--url http://localhost:36583/commit \
|
||||||
|
--header 'content-type: multipart/form-data' \
|
||||||
|
--form path=mys/version.dev \
|
||||||
|
--form 'signature_username=git api' \
|
||||||
|
--form signature_email=supra.m2.dev@forth.co.th \
|
||||||
|
--form 'message=test commit' \
|
||||||
|
--form file=@./mys.version.test
|
||||||
|
```
|
||||||
|
|
||||||
|
NOTE: file & path in multiple file must have the same name after
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Push
|
||||||
|
|
||||||
|
Push local commits to remote
|
||||||
|
|
||||||
|
```
|
||||||
|
GET .../push
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected result
|
||||||
|
|
||||||
|
```
|
||||||
|
{"result": "push completed"}
|
||||||
|
```
|
||||||
|
|
||||||
|
Error
|
||||||
|
|
||||||
|
```
|
||||||
|
{"error": "..."}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Pull
|
||||||
|
|
||||||
|
Pull commits from remote. This operation always does git reset hard first before pull for reason of no conflicts.
|
||||||
|
|
||||||
|
```
|
||||||
|
GET .../pull
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected result
|
||||||
|
|
||||||
|
```
|
||||||
|
{"result": "pull completed"}
|
||||||
|
```
|
||||||
|
|
||||||
|
Error
|
||||||
|
|
||||||
|
```
|
||||||
|
{"error": "..."}
|
||||||
|
```
|
||||||
|
|
||||||
|
NOTE: Commit did lost but could be checkout later
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Recovery
|
||||||
|
|
||||||
|
WIP...
|
||||||
1373
src/app.rs
1373
src/app.rs
File diff suppressed because it is too large
Load diff
90
src/git.rs
90
src/git.rs
|
|
@ -1,16 +1,21 @@
|
||||||
use std::{cell::RefCell, collections::HashMap, io::{self, Write}, path::{Path, PathBuf}};
|
use std::{
|
||||||
|
cell::RefCell,
|
||||||
|
collections::HashMap,
|
||||||
|
io::{self, Write},
|
||||||
|
path::{Path, PathBuf},
|
||||||
|
};
|
||||||
|
|
||||||
use git2::{build::RepoBuilder, Cred, FetchOptions, Progress, RemoteCallbacks};
|
use git2::{Cred, FetchOptions, Progress, RemoteCallbacks, build::RepoBuilder};
|
||||||
use log::{info};
|
use log::info;
|
||||||
|
|
||||||
use crate::gcm;
|
use crate::gcm;
|
||||||
|
|
||||||
struct GitState {
|
struct GitState {
|
||||||
progress: Option<Progress<'static>>,
|
progress: Option<Progress<'static>>,
|
||||||
total: usize,
|
total: usize,
|
||||||
current: usize,
|
current: usize,
|
||||||
path: Option<PathBuf>,
|
path: Option<PathBuf>,
|
||||||
newline: bool
|
newline: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn print(state: &mut GitState) {
|
fn print(state: &mut GitState) {
|
||||||
|
|
@ -29,10 +34,11 @@ fn print(state: &mut GitState) {
|
||||||
state.newline = true;
|
state.newline = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Resolving deltas {}/{}",
|
info!(
|
||||||
stats.indexed_deltas(),
|
"Resolving deltas {}/{}",
|
||||||
stats.total_deltas());
|
stats.indexed_deltas(),
|
||||||
|
stats.total_deltas()
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
info!(
|
info!(
|
||||||
"net {:3}% ({:4} kb, {:5}/{:5}) / idx {:3}% ({:5}/{:5}) \
|
"net {:3}% ({:4} kb, {:5}/{:5}) / idx {:3}% ({:5}/{:5}) \
|
||||||
|
|
@ -58,40 +64,38 @@ fn print(state: &mut GitState) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setup_git_repo(config: HashMap<String, String>) -> gcm::StandardResult {
|
pub fn setup_git_repo(config: HashMap<String, String>) -> gcm::StandardResult {
|
||||||
let state = RefCell::new(GitState {
|
let state = RefCell::new(GitState {
|
||||||
progress: None,
|
progress: None,
|
||||||
total: 0,
|
total: 0,
|
||||||
current: 0,
|
current: 0,
|
||||||
path: None,
|
path: None,
|
||||||
newline: true
|
newline: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut cb = RemoteCallbacks::new();
|
let mut cb = RemoteCallbacks::new();
|
||||||
cb.transfer_progress(|stats| {
|
cb.transfer_progress(|stats| {
|
||||||
let mut state = state.borrow_mut();
|
let mut state = state.borrow_mut();
|
||||||
state.progress = Some(stats.to_owned());
|
state.progress = Some(stats.to_owned());
|
||||||
print(&mut state);
|
print(&mut state);
|
||||||
true
|
true
|
||||||
});
|
});
|
||||||
|
|
||||||
cb.credentials(|_,_,_| {
|
cb.credentials(|_, _, _| {
|
||||||
Cred::userpass_plaintext(
|
Cred::userpass_plaintext(
|
||||||
config.get("GIT_REPO_USERNAME").unwrap_or(&"".to_string()),
|
config.get("GIT_REPO_USERNAME").unwrap_or(&"".to_string()),
|
||||||
config.get("GIT_REPO_PASSWORD").unwrap_or(&"".to_string())
|
config.get("GIT_REPO_PASSWORD").unwrap_or(&"".to_string()),
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut fo = FetchOptions::new();
|
let mut fo = FetchOptions::new();
|
||||||
fo.remote_callbacks(cb);
|
fo.remote_callbacks(cb);
|
||||||
fo.depth(1);
|
|
||||||
|
|
||||||
RepoBuilder::new()
|
let _ = RepoBuilder::new().bare(false).fetch_options(fo).clone(
|
||||||
.bare(true)
|
config.get("GIT_REPO_REMOTE").unwrap_or(&"".to_string()),
|
||||||
.fetch_options(fo)
|
Path::new(config.get("GIT_REPO_LOCAL_DEST").unwrap()).into(),
|
||||||
.clone(
|
|
||||||
config.get("GIT_REPO_REMOTE").unwrap_or(&"".to_string()),
|
|
||||||
Path::new(config.get("GIT_REPO_LOCAL_DEST").unwrap()).into()
|
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
Ok(())
|
println!("clone completed !");
|
||||||
}
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
|
||||||
73
src/main.rs
73
src/main.rs
|
|
@ -1,56 +1,67 @@
|
||||||
use std::{fs::File};
|
use std::fs::File;
|
||||||
|
|
||||||
use env_logger::Builder;
|
use env_logger::Builder;
|
||||||
use libtbr::recipe_functions::common;
|
use libtbr::recipe_functions::common;
|
||||||
use log::info;
|
use log::info;
|
||||||
|
|
||||||
use crate::{git::setup_git_repo, reg::{heartbeat_loop, registry::registry_client::RegistryClient}};
|
use crate::{
|
||||||
|
git::setup_git_repo,
|
||||||
|
// reg::{heartbeat_loop, registry::registry_client::RegistryClient},
|
||||||
|
};
|
||||||
|
|
||||||
mod app;
|
mod app;
|
||||||
mod gcm;
|
mod gcm;
|
||||||
mod git;
|
mod git;
|
||||||
mod reg;
|
mod tx;
|
||||||
|
// mod reg;
|
||||||
|
|
||||||
fn setup_log(config: gcm::Configure) -> gcm::StandardResult {
|
fn setup_log(config: gcm::Configure) -> gcm::StandardResult {
|
||||||
let logfile = File::create(config.get("LOG_NAME").unwrap_or(&"run.log".to_string()))?;
|
// NOTE: disable logging file, use send to log service instead
|
||||||
|
// let logfile = File::create(config.get("LOG_NAME").unwrap_or(&"run.log".to_string()))?;
|
||||||
|
|
||||||
Builder::from_env(env_logger::Env::default().default_filter_or("debug"))
|
Builder::from_env(env_logger::Env::default().default_filter_or("debug"))
|
||||||
.target(env_logger::Target::Pipe(Box::new(logfile))).init();
|
// .target(env_logger::Target::Pipe(Box::new(logfile)))
|
||||||
|
.init();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn setup_registry(config: gcm::Configure) -> gcm::StandardResult {
|
|
||||||
let cfg_clone = config.clone();
|
|
||||||
let register_server_address = cfg_clone.get("REGISTRY_SERVER").expect("not found registry server");
|
|
||||||
let name = cfg_clone.get("SERVICE_NAME").expect("missing service name");
|
|
||||||
let url = cfg_clone.get("SERVICE_URL").expect("missing service url");
|
|
||||||
|
|
||||||
let mut client = RegistryClient::connect(register_server_address.clone()).await?;
|
|
||||||
reg::register_service(&mut client, name, url).await;
|
|
||||||
|
|
||||||
tokio::spawn(heartbeat_loop(name.to_string(), url.to_string()));
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> gcm::StandardResult {
|
async fn main() -> gcm::StandardResult {
|
||||||
let config: gcm::Configure = common::get_config();
|
let config: gcm::Configure = common::get_config();
|
||||||
setup_log(config.clone())?;
|
setup_log(config.clone())?;
|
||||||
|
|
||||||
match std::fs::read_dir(config.get("GIT_REPO_LOCAL_DEST").expect("not exist")) {
|
match std::fs::read_dir(config.get("GIT_REPO_LOCAL_DEST").expect("not exist")) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
info!("GIT_REPO `{dest}` already setup", dest = config.get("GIT_REPO_LOCAL_DEST").unwrap())
|
info!(
|
||||||
},
|
"GIT_REPO `{dest}` existed, checking if has git",
|
||||||
Err(_) => {
|
dest = config.get("GIT_REPO_LOCAL_DEST").unwrap()
|
||||||
setup_git_repo(config.clone())?;
|
);
|
||||||
}
|
|
||||||
|
match std::fs::read_dir(format!(
|
||||||
|
"{}/.git",
|
||||||
|
config.get("GIT_REPO_LOCAL_DEST").expect("not exist")
|
||||||
|
)) {
|
||||||
|
Ok(_) => {
|
||||||
|
info!("GIT REPO already set up!")
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
setup_git_repo(config.clone())?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
setup_git_repo(config.clone())?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
setup_registry(config.clone()).await?;
|
|
||||||
|
info!(
|
||||||
|
"APP VERSION: {}",
|
||||||
|
config
|
||||||
|
.get("CONFIG_VERSION")
|
||||||
|
.expect("config version not defined")
|
||||||
|
);
|
||||||
|
info!("RUNNING: {:?}", config.get("PUBLIC_PORT"));
|
||||||
app::run(config.clone()).await?;
|
app::run(config.clone()).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use log::{error, info};
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
use tonic::transport::Channel;
|
use tonic::transport::Channel;
|
||||||
|
|
||||||
|
|
@ -17,7 +18,11 @@ pub async fn register_service(client: &mut RegistryClient<Channel>, name: &str,
|
||||||
token: std::env::var("REGISTRY_TOKEN").unwrap_or_default(),
|
token: std::env::var("REGISTRY_TOKEN").unwrap_or_default(),
|
||||||
});
|
});
|
||||||
|
|
||||||
let _ = client.register(req).await;
|
let resp = client.register(req).await;
|
||||||
|
match resp {
|
||||||
|
Ok(r) => info!("{:?}", r),
|
||||||
|
Err(e) => error!("error register: {:?}", e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn heartbeat_loop(name: String, url: String) {
|
pub async fn heartbeat_loop(name: String, url: String) {
|
||||||
|
|
|
||||||
323
src/tx/handler.rs
Normal file
323
src/tx/handler.rs
Normal file
|
|
@ -0,0 +1,323 @@
|
||||||
|
use axum::{
|
||||||
|
Json, Router,
|
||||||
|
extract::{Path, Query, State},
|
||||||
|
response::IntoResponse,
|
||||||
|
routing::{get, post},
|
||||||
|
};
|
||||||
|
use redis::{AsyncTypedCommands, TypedCommands, aio::MultiplexedConnection};
|
||||||
|
use reqwest::StatusCode;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use serde_json::{Value, json};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::tx::{
|
||||||
|
helpers::*,
|
||||||
|
patcher::apply_ops,
|
||||||
|
types::{PatchOp, Reservation, ReserveReq, ReserveRes, TxCommitReq, TxCommitRes},
|
||||||
|
};
|
||||||
|
|
||||||
|
async fn reserve_tx(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Path(doc_id): Path<String>,
|
||||||
|
Json(req): Json<ReserveReq>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let rsv_id = Uuid::new_v4().to_string();
|
||||||
|
let ttl_secs: i64 = 30; // ttl reserve 30s
|
||||||
|
let expires_at = now_unix() + ttl_secs;
|
||||||
|
|
||||||
|
let mut rcli = state.redis_cli.clone();
|
||||||
|
|
||||||
|
// check version
|
||||||
|
let ver_key = doc_ver_key(&doc_id);
|
||||||
|
let base_version: u64 = match rcli.get(ver_key) {
|
||||||
|
Ok(res) => {
|
||||||
|
if let Some(r) = res {
|
||||||
|
r.parse().ok().unwrap()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
let rsv = Reservation {
|
||||||
|
reservation_id: rsv_id.clone(),
|
||||||
|
doc_id: doc_id.clone(),
|
||||||
|
author: req.author,
|
||||||
|
base_version,
|
||||||
|
expires_at,
|
||||||
|
};
|
||||||
|
|
||||||
|
let key = rsv_key(&doc_id, &rsv_id);
|
||||||
|
let payload = serde_json::to_string(&rsv).unwrap();
|
||||||
|
|
||||||
|
let _: () = rcli.set_ex(key, payload, ttl_secs as u64).unwrap();
|
||||||
|
|
||||||
|
(
|
||||||
|
StatusCode::OK,
|
||||||
|
Json(ReserveRes {
|
||||||
|
reservation_id: rsv_id,
|
||||||
|
base_version,
|
||||||
|
expires_at,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn commit_tx(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Path(doc_id): Path<String>,
|
||||||
|
Json(req): Json<TxCommitReq>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
if req.tx_id.is_empty() || req.reservation_id.is_empty() {
|
||||||
|
return (
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
"missing tx_id or reservation_id ".into_response(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut con = match state.redis_cli.get_multiplexed_async_connection().await {
|
||||||
|
Ok(conn) => conn,
|
||||||
|
Err(e) => {
|
||||||
|
return (
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
format!("{}", e.to_string()).into_response(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// lock commit
|
||||||
|
let lock_key = commit_lock_key(&doc_id);
|
||||||
|
if let Err(_) = wait_lock(&mut con, &lock_key, 1200, 20).await {
|
||||||
|
return (StatusCode::CONFLICT, "doc busy, retry".into_response());
|
||||||
|
}
|
||||||
|
|
||||||
|
let txid_k = txid_key(&doc_id, &req.tx_id);
|
||||||
|
if let Ok(Some(v)) = con.get(&txid_k).await {
|
||||||
|
if let Ok(committed_version) = v.parse::<u64>() {
|
||||||
|
let res = TxCommitRes {
|
||||||
|
doc_id,
|
||||||
|
tx_id: req.tx_id,
|
||||||
|
committed_version,
|
||||||
|
};
|
||||||
|
return (StatusCode::OK, Json(res).into_response());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let rsv_k = rsv_key(&doc_id, &req.reservation_id);
|
||||||
|
let rsv_raw: Option<String> = con.get(&rsv_k).await.unwrap_or(None);
|
||||||
|
let rsv_raw = match rsv_raw {
|
||||||
|
Some(x) => x,
|
||||||
|
None => {
|
||||||
|
return (
|
||||||
|
StatusCode::CONFLICT,
|
||||||
|
"reservation missing/expired".into_response(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let rsv: Reservation = match serde_json::from_str(&rsv_raw) {
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(_) => return (StatusCode::CONFLICT, "bad reservation".into_response()),
|
||||||
|
};
|
||||||
|
|
||||||
|
if rsv.doc_id != doc_id || rsv.author != req.author || rsv.base_version != req.base_version {
|
||||||
|
return (StatusCode::CONFLICT, "reservation mismatch".into_response());
|
||||||
|
}
|
||||||
|
|
||||||
|
if now_unix() > rsv.expires_at {
|
||||||
|
return (StatusCode::CONFLICT, "reservation expired".into_response());
|
||||||
|
}
|
||||||
|
|
||||||
|
let curr_ver: u64 = con
|
||||||
|
.get(doc_ver_key(&doc_id))
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
if req.base_version != curr_ver {
|
||||||
|
let body = json!({
|
||||||
|
"error": "version_conflict",
|
||||||
|
"server_version": curr_ver,
|
||||||
|
});
|
||||||
|
return (StatusCode::CONFLICT, Json(body).into_response());
|
||||||
|
}
|
||||||
|
|
||||||
|
let next_ver = curr_ver + 1;
|
||||||
|
|
||||||
|
let stream_key = doc_tx_stream_key(&doc_id);
|
||||||
|
|
||||||
|
let tx_event = json!({
|
||||||
|
"doc_id": doc_id,
|
||||||
|
"tx_id": req.tx_id,
|
||||||
|
"version": next_ver,
|
||||||
|
"base_version": req.base_version,
|
||||||
|
"author": req.author,
|
||||||
|
"ts": now_unix(),
|
||||||
|
"ops": req.ops
|
||||||
|
});
|
||||||
|
|
||||||
|
let tx_event_str = tx_event.to_string();
|
||||||
|
let _: String = redis::cmd("XADD")
|
||||||
|
.arg(&stream_key)
|
||||||
|
.arg("*")
|
||||||
|
.arg("data")
|
||||||
|
.arg(&tx_event_str)
|
||||||
|
.query_async(&mut con)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let _: () = con
|
||||||
|
.set(doc_ver_key(&doc_id), next_ver.to_string())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let _: () = con.set(&txid_k, next_ver.to_string()).await.unwrap();
|
||||||
|
let _: usize = con.del(&rsv_k).await.unwrap();
|
||||||
|
|
||||||
|
let snapshot_every: u64 = 50;
|
||||||
|
if next_ver % snapshot_every == 0 {
|
||||||
|
if let Ok(state) = build_state_at(&mut con, &doc_id, next_ver).await {
|
||||||
|
let _: () = con
|
||||||
|
.set(doc_snap_key(&doc_id), state.to_string())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let _: () = con
|
||||||
|
.set(doc_snapver_key(&doc_id), next_ver.to_string())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
(
|
||||||
|
StatusCode::OK,
|
||||||
|
Json(TxCommitRes {
|
||||||
|
doc_id,
|
||||||
|
tx_id: req.tx_id,
|
||||||
|
committed_version: next_ver,
|
||||||
|
})
|
||||||
|
.into_response(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn build_state_at(
|
||||||
|
con: &mut MultiplexedConnection,
|
||||||
|
doc_id: &str,
|
||||||
|
target_version: u64,
|
||||||
|
) -> Result<Value, String> {
|
||||||
|
let snap_ver: u64 = con
|
||||||
|
.get(doc_snap_key(&doc_id))
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
let snap_json: Value = if snap_ver > 0 {
|
||||||
|
let raw: Option<String> = con.get(doc_snap_key(&doc_id)).await.unwrap_or(None);
|
||||||
|
raw.and_then(|s| serde_json::from_str(&s).ok())
|
||||||
|
.unwrap_or_else(|| json!({}))
|
||||||
|
} else {
|
||||||
|
json!({})
|
||||||
|
};
|
||||||
|
|
||||||
|
let stream_key = doc_tx_stream_key(doc_id);
|
||||||
|
|
||||||
|
let start_id = "0-0";
|
||||||
|
let entries: Vec<(String, Vec<(String, String)>)> = redis::cmd("XRANGE")
|
||||||
|
.arg(&stream_key)
|
||||||
|
.arg(start_id)
|
||||||
|
.arg("+")
|
||||||
|
.query_async(con)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
|
let mut ops_list: Vec<PatchOp> = Vec::new();
|
||||||
|
for (_id, fields) in entries {
|
||||||
|
let mut data_opts = None;
|
||||||
|
for (k, v) in fields {
|
||||||
|
if k == "data" {
|
||||||
|
data_opts = Some(v);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let data = match data_opts {
|
||||||
|
Some(x) => x,
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let ev: Value = serde_json::from_str(&data).map_err(|e| e.to_string())?;
|
||||||
|
let ver = ev.get("version").and_then(|v| v.as_u64()).unwrap_or(0);
|
||||||
|
if ver == 0 || ver > target_version {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ver <= snap_ver {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let ops: Vec<PatchOp> =
|
||||||
|
serde_json::from_value(ev["ops"].clone()).map_err(|e| e.to_string())?;
|
||||||
|
ops_list.extend(ops);
|
||||||
|
}
|
||||||
|
|
||||||
|
apply_ops(snap_json, &ops_list)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct StateQuery {
|
||||||
|
at_version: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_state(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Path(doc_id): Path<String>,
|
||||||
|
Query(q): Query<StateQuery>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let rcli = state.redis_cli.clone();
|
||||||
|
let mut con = match rcli.get_multiplexed_async_connection().await {
|
||||||
|
Ok(sc) => sc,
|
||||||
|
Err(_) => {
|
||||||
|
return (
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
"cannot get connection".into_response(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let cur_ver: u64 = con
|
||||||
|
.get(doc_ver_key(&doc_id))
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
let target = q.at_version.unwrap_or(cur_ver).min(cur_ver);
|
||||||
|
|
||||||
|
match build_state_at(&mut con, &doc_id, target).await {
|
||||||
|
Ok(state) => (
|
||||||
|
StatusCode::OK,
|
||||||
|
Json(json!({
|
||||||
|
"doc_id": doc_id,
|
||||||
|
"version": target,
|
||||||
|
"state": state
|
||||||
|
}))
|
||||||
|
.into_response(),
|
||||||
|
),
|
||||||
|
Err(e) => (
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
Json(json!({
|
||||||
|
"error": e
|
||||||
|
}))
|
||||||
|
.into_response(),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// pub async fn create_tx_patcher_route() -> Router<AppState> {
|
||||||
|
// Router::new()
|
||||||
|
// .route("/{id}/tx-reserve", post(reserve_tx))
|
||||||
|
// .route("/{id}/tx-commit", post(commit_tx))
|
||||||
|
// .route("/{id}/state", get(get_state))
|
||||||
|
// }
|
||||||
296
src/tx/helpers.rs
Normal file
296
src/tx/helpers.rs
Normal file
|
|
@ -0,0 +1,296 @@
|
||||||
|
use std::{
|
||||||
|
clone,
|
||||||
|
collections::HashMap,
|
||||||
|
sync::{Arc, Mutex},
|
||||||
|
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||||
|
};
|
||||||
|
|
||||||
|
use git2::Repository;
|
||||||
|
use json_patch::{Patch, diff};
|
||||||
|
use log::info;
|
||||||
|
use redis::{RedisResult, aio::MultiplexedConnection};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use tokio::{sync::mpsc::Sender, time::sleep};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::app::CacheJob;
|
||||||
|
|
||||||
|
pub fn now_unix() -> i64 {
|
||||||
|
SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap()
|
||||||
|
.as_secs() as i64
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn doc_ver_key(doc: &str) -> String {
|
||||||
|
format!("doc:{doc}:ver")
|
||||||
|
}
|
||||||
|
pub fn doc_tx_stream_key(doc: &str) -> String {
|
||||||
|
format!("doc:{doc}:tx")
|
||||||
|
}
|
||||||
|
pub fn doc_snap_key(doc: &str) -> String {
|
||||||
|
format!("doc:{doc}:snap")
|
||||||
|
}
|
||||||
|
pub fn doc_snapver_key(doc: &str) -> String {
|
||||||
|
format!("doc:{doc}:snapver")
|
||||||
|
}
|
||||||
|
pub fn rsv_key(doc: &str, rsv: &str) -> String {
|
||||||
|
format!("doc:{doc}:rsv:{rsv}")
|
||||||
|
}
|
||||||
|
pub fn txid_key(doc: &str, tx_id: &str) -> String {
|
||||||
|
format!("doc:{doc}:txid:{tx_id}")
|
||||||
|
}
|
||||||
|
pub fn commit_lock_key(doc: &str) -> String {
|
||||||
|
format!("doc:{doc}:lock")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn acquire_lock(
|
||||||
|
con: &mut MultiplexedConnection,
|
||||||
|
key: &str,
|
||||||
|
ttl_ms: u64,
|
||||||
|
) -> RedisResult<bool> {
|
||||||
|
let token = Uuid::new_v4().to_string();
|
||||||
|
let ok: Option<String> = redis::cmd("SET")
|
||||||
|
.arg(key)
|
||||||
|
.arg(token)
|
||||||
|
.arg("NX")
|
||||||
|
.arg("PX")
|
||||||
|
.arg(ttl_ms)
|
||||||
|
.query_async(con)
|
||||||
|
.await?;
|
||||||
|
Ok(ok.is_some())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn wait_lock(
|
||||||
|
con: &mut MultiplexedConnection,
|
||||||
|
key: &str,
|
||||||
|
ttl_ms: u64,
|
||||||
|
attempts: u32,
|
||||||
|
) -> RedisResult<()> {
|
||||||
|
for _ in 0..attempts {
|
||||||
|
if acquire_lock(con, key, ttl_ms).await? {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
sleep(Duration::from_millis(30)).await;
|
||||||
|
}
|
||||||
|
Err(redis::RedisError::from((redis::ErrorKind::Io, "lock busy")))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn build_tx_from_diff(base: serde_json::Value, alt: serde_json::Value) -> Patch {
|
||||||
|
diff(&base, &alt)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_file_from_repo(
|
||||||
|
repo: Arc<Mutex<Repository>>,
|
||||||
|
path: &str,
|
||||||
|
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
if let Ok(repo_m) = repo.try_lock() {
|
||||||
|
let obj = repo_m.revparse_single(path)?;
|
||||||
|
if let Some(blob) = obj.as_blob() {
|
||||||
|
let content = unsafe { str::from_utf8_unchecked(blob.content()) };
|
||||||
|
return Ok(content.to_string());
|
||||||
|
} else if let Some(tree) = obj.as_tree() {
|
||||||
|
let dir_list = tree
|
||||||
|
.iter()
|
||||||
|
.map(|x| x.name().unwrap_or("").to_string())
|
||||||
|
.collect::<Vec<String>>();
|
||||||
|
|
||||||
|
return Ok(dir_list.join(","));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err("FileNotFound".into())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn setup_prebuild_tx_cache(
|
||||||
|
tx: Sender<CacheJob>,
|
||||||
|
git_repo: &str,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
// import tx files
|
||||||
|
let repo = match Repository::open(git_repo) {
|
||||||
|
Ok(repo) => repo,
|
||||||
|
Err(_) => return Err("cannot open repo".into()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let repo_m = Arc::new(Mutex::new(repo));
|
||||||
|
|
||||||
|
let root_path = String::from("master:");
|
||||||
|
let all_folders = match get_file_from_repo(repo_m.clone(), root_path.as_str()) {
|
||||||
|
Ok(af) => af,
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
};
|
||||||
|
|
||||||
|
let folders = all_folders
|
||||||
|
.split(",")
|
||||||
|
.map(|x| x.to_string())
|
||||||
|
.collect::<Vec<String>>();
|
||||||
|
|
||||||
|
for folder in folders {
|
||||||
|
let current_path = format!("master:{folder}");
|
||||||
|
|
||||||
|
println!("trying {folder}...");
|
||||||
|
|
||||||
|
// test get version
|
||||||
|
//
|
||||||
|
// NOTE: version may miss if there is file with `versionXXXX`
|
||||||
|
//
|
||||||
|
//
|
||||||
|
if let Ok(current_files) = get_file_from_repo(repo_m.clone(), current_path.as_str())
|
||||||
|
&& current_files.contains(",version")
|
||||||
|
{
|
||||||
|
let expected_version_path = format!("{current_path}/version");
|
||||||
|
println!("\tmay have version --> {expected_version_path}, \ncurrent: {current_files}");
|
||||||
|
|
||||||
|
// get version content
|
||||||
|
|
||||||
|
let version_str =
|
||||||
|
if let Ok(v) = get_file_from_repo(repo_m.clone(), expected_version_path.as_str()) {
|
||||||
|
v
|
||||||
|
} else {
|
||||||
|
"".to_string()
|
||||||
|
};
|
||||||
|
// let version = version_str.parse::<u64>().unwrap_or(0);
|
||||||
|
if version_str.is_empty() {
|
||||||
|
// unexpected
|
||||||
|
return Err("cannot get version".into());
|
||||||
|
}
|
||||||
|
|
||||||
|
if tx
|
||||||
|
.send(CacheJob {
|
||||||
|
rel_path: expected_version_path.clone(),
|
||||||
|
file_data: Some(serde_json::to_vec(&version_str)?),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
info!("cache ok {current_path}/version");
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("try get {folder}:{version_str}");
|
||||||
|
|
||||||
|
let files = current_files
|
||||||
|
.split(",")
|
||||||
|
.map(|x| x.to_string())
|
||||||
|
.collect::<Vec<String>>();
|
||||||
|
|
||||||
|
let base_file: String = match files.iter().find(|x| {
|
||||||
|
x.ends_with(".json") && x.starts_with(&format!("coffeethai02_{version_str}"))
|
||||||
|
}) {
|
||||||
|
Some(f) => f.to_string(),
|
||||||
|
None => "".to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
println!("{folder}: {base_file}");
|
||||||
|
|
||||||
|
if base_file.is_empty() {
|
||||||
|
println!("base file empty skip");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let base_recipe_str = if let Ok(r) =
|
||||||
|
get_file_from_repo(repo_m.clone(), &format!("{current_path}/{base_file}"))
|
||||||
|
{
|
||||||
|
r.to_string()
|
||||||
|
} else {
|
||||||
|
"".to_string()
|
||||||
|
};
|
||||||
|
|
||||||
|
if base_recipe_str.is_empty() {
|
||||||
|
println!("empty recipe skip!");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let base_recipe: serde_json::Value = match serde_json::from_str(&base_recipe_str) {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => return Err(e.into()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut current_recipes = HashMap::new();
|
||||||
|
|
||||||
|
current_recipes.insert("base".to_string(), base_recipe);
|
||||||
|
|
||||||
|
for cfile in files {
|
||||||
|
if cfile.ne(&base_file)
|
||||||
|
&& cfile.ends_with(".json")
|
||||||
|
&& cfile.starts_with("coffeethai02")
|
||||||
|
{
|
||||||
|
//
|
||||||
|
// fetch file content
|
||||||
|
// read into value
|
||||||
|
// do diff
|
||||||
|
|
||||||
|
let current_fetch_path = format!("{current_path}/{cfile}");
|
||||||
|
let current_file_str =
|
||||||
|
if let Ok(cr) = get_file_from_repo(repo_m.clone(), ¤t_fetch_path) {
|
||||||
|
cr.to_string()
|
||||||
|
} else {
|
||||||
|
"".to_string()
|
||||||
|
};
|
||||||
|
|
||||||
|
if !current_file_str.is_empty() {
|
||||||
|
// into value
|
||||||
|
let alt_recipe: serde_json::Value =
|
||||||
|
match serde_json::from_str(¤t_file_str) {
|
||||||
|
Ok(crr) => crr,
|
||||||
|
Err(e) => {
|
||||||
|
println!("get error while alt {current_fetch_path}: {e}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
current_recipes.insert(cfile, alt_recipe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// process diffing
|
||||||
|
//
|
||||||
|
|
||||||
|
let base_recipe_ref = current_recipes.get("base").unwrap();
|
||||||
|
|
||||||
|
for (key, value) in current_recipes.iter() {
|
||||||
|
if key != "base" {
|
||||||
|
let patch_diff = diff(&base_recipe_ref, value);
|
||||||
|
let tx_name = format!(
|
||||||
|
"stx_{}",
|
||||||
|
key.replace("coffeethai02_", "").replace(".json", "")
|
||||||
|
);
|
||||||
|
|
||||||
|
let base_path_out = format!("./recipe_tx/{folder}");
|
||||||
|
if !std::path::Path::new(&base_path_out).exists() {
|
||||||
|
std::fs::create_dir_all(&base_path_out)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let file_out = format!("{base_path_out}/{tx_name}.json");
|
||||||
|
|
||||||
|
std::fs::write(file_out, serde_json::to_string_pretty(&patch_diff)?)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// let _ = tx
|
||||||
|
// .send(serde_json::json!({
|
||||||
|
// "topic": "generate_stx",
|
||||||
|
// "target": folder,
|
||||||
|
// "base_version": version_str,
|
||||||
|
// "full_base_filename": base_file,
|
||||||
|
// "value": base_recipe_clone
|
||||||
|
// }))
|
||||||
|
// .await;
|
||||||
|
|
||||||
|
let json_bytes = serde_json::to_vec(base_recipe_ref)?;
|
||||||
|
|
||||||
|
if tx
|
||||||
|
.send(CacheJob {
|
||||||
|
rel_path: format!("{current_path}/{base_file}"),
|
||||||
|
file_data: Some(json_bytes),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
info!("cache ok {current_path}/{base_file}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
4
src/tx/mod.rs
Normal file
4
src/tx/mod.rs
Normal file
|
|
@ -0,0 +1,4 @@
|
||||||
|
// pub mod handler;
|
||||||
|
pub mod helpers;
|
||||||
|
pub mod patcher;
|
||||||
|
pub mod types;
|
||||||
119
src/tx/patcher.rs
Normal file
119
src/tx/patcher.rs
Normal file
|
|
@ -0,0 +1,119 @@
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
|
use crate::tx::types::PatchOp;
|
||||||
|
|
||||||
|
pub fn json_pointer_parent<'a>(
|
||||||
|
root: &'a mut Value,
|
||||||
|
path: &str,
|
||||||
|
) -> Result<(&'a mut Value, String), String> {
|
||||||
|
if !path.starts_with('/') && path != "" {
|
||||||
|
return Err("path must be JSON pointer starting with '/'".into());
|
||||||
|
}
|
||||||
|
|
||||||
|
let part: Vec<String> = path
|
||||||
|
.split('/')
|
||||||
|
.skip(1)
|
||||||
|
.map(|x| x.replace("~1", "/").replace("~0", "~"))
|
||||||
|
.collect();
|
||||||
|
if path.is_empty() {
|
||||||
|
return Ok((root, "".into()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let last = part.last().unwrap().clone();
|
||||||
|
|
||||||
|
let mut cur = root;
|
||||||
|
|
||||||
|
for key in &part[..part.len() - 1] {
|
||||||
|
match cur {
|
||||||
|
Value::Object(map) => {
|
||||||
|
cur = map
|
||||||
|
.get_mut(key)
|
||||||
|
.ok_or_else(|| format!("missing object key {key}"))?;
|
||||||
|
}
|
||||||
|
Value::Array(arr) => {
|
||||||
|
let idx: usize = key.parse().map_err(|_| format!("bad array idx {key}"))?;
|
||||||
|
cur = arr
|
||||||
|
.get_mut(idx)
|
||||||
|
.ok_or_else(|| format!("array index out of bounds {idx}"))?;
|
||||||
|
}
|
||||||
|
_ => return Err("cannot traverse non-container".into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((cur, last))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn apply_ops(mut state: Value, ops: &[PatchOp]) -> Result<Value, String> {
|
||||||
|
for op in ops {
|
||||||
|
match op {
|
||||||
|
PatchOp::Replace { path, value } => {
|
||||||
|
if path == "" || path == "/" {
|
||||||
|
state = value.clone();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let (parent, last) = json_pointer_parent(&mut state, path)?;
|
||||||
|
match parent {
|
||||||
|
Value::Object(map) => {
|
||||||
|
if !map.contains_key(&last) {
|
||||||
|
return Err(format!("replace target missing key {last}"));
|
||||||
|
}
|
||||||
|
map.insert(last, value.clone());
|
||||||
|
}
|
||||||
|
Value::Array(arr) => {
|
||||||
|
let idx: usize = last
|
||||||
|
.parse()
|
||||||
|
.map_err(|_| format!("bad array index {last}"))?;
|
||||||
|
if idx >= arr.len() {
|
||||||
|
return Err(format!("replace index out of bounds {idx}"));
|
||||||
|
}
|
||||||
|
arr[idx] = value.clone();
|
||||||
|
}
|
||||||
|
_ => return Err("replace parent not container".into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
PatchOp::Add { path, value } => {
|
||||||
|
let (parent, last) = json_pointer_parent(&mut state, path)?;
|
||||||
|
match parent {
|
||||||
|
Value::Object(map) => {
|
||||||
|
map.insert(last, value.clone());
|
||||||
|
}
|
||||||
|
Value::Array(arr) => {
|
||||||
|
if last == "-" {
|
||||||
|
arr.push(value.clone());
|
||||||
|
} else {
|
||||||
|
let idx: usize = last
|
||||||
|
.parse()
|
||||||
|
.map_err(|_| format!("bad array index {last}"))?;
|
||||||
|
if idx > arr.len() {
|
||||||
|
return Err(format!("add index out of bounds {idx}"));
|
||||||
|
}
|
||||||
|
arr.insert(idx, value.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => return Err("add parent not container".into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
PatchOp::Remove { path } => {
|
||||||
|
let (parent, last) = json_pointer_parent(&mut state, path)?;
|
||||||
|
match parent {
|
||||||
|
Value::Object(map) => {
|
||||||
|
map.remove(&last)
|
||||||
|
.ok_or_else(|| format!("remove missing key {last}"))?;
|
||||||
|
}
|
||||||
|
Value::Array(arr) => {
|
||||||
|
let idx: usize = last
|
||||||
|
.parse()
|
||||||
|
.map_err(|_| format!("bad array index {last}"))?;
|
||||||
|
if idx > arr.len() {
|
||||||
|
return Err(format!("remove index out of bounds {idx}"));
|
||||||
|
}
|
||||||
|
arr.remove(idx);
|
||||||
|
}
|
||||||
|
_ => return Err("remove parent not container".into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(state)
|
||||||
|
}
|
||||||
51
src/tx/types.rs
Normal file
51
src/tx/types.rs
Normal file
|
|
@ -0,0 +1,51 @@
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct ReserveReq {
|
||||||
|
pub author: String,
|
||||||
|
hint_paths: Option<Vec<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct ReserveRes {
|
||||||
|
pub reservation_id: String,
|
||||||
|
pub base_version: u64,
|
||||||
|
pub expires_at: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct TxCommitReq {
|
||||||
|
pub reservation_id: String,
|
||||||
|
pub tx_id: String,
|
||||||
|
pub base_version: u64,
|
||||||
|
pub author: String,
|
||||||
|
pub ops: Vec<PatchOp>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct TxCommitRes {
|
||||||
|
pub doc_id: String,
|
||||||
|
pub tx_id: String,
|
||||||
|
pub committed_version: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
#[serde(tag = "op")]
|
||||||
|
pub enum PatchOp {
|
||||||
|
#[serde(rename = "replace")]
|
||||||
|
Replace { path: String, value: Value },
|
||||||
|
#[serde(rename = "add")]
|
||||||
|
Add { path: String, value: Value },
|
||||||
|
#[serde(rename = "remove")]
|
||||||
|
Remove { path: String },
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct Reservation {
|
||||||
|
pub reservation_id: String,
|
||||||
|
pub doc_id: String,
|
||||||
|
pub author: String,
|
||||||
|
pub base_version: u64,
|
||||||
|
pub expires_at: i64,
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue