Compare commits
No commits in common. "master" and "config" have entirely different histories.
15 changed files with 1097 additions and 4047 deletions
|
|
@ -1,5 +0,0 @@
|
||||||
/target
|
|
||||||
.tbcfg
|
|
||||||
*.txt
|
|
||||||
*.log
|
|
||||||
|
|
||||||
2
.gitignore
vendored
2
.gitignore
vendored
|
|
@ -1,5 +1,5 @@
|
||||||
/target
|
/target
|
||||||
.tbcfg*
|
.tbcfg
|
||||||
*.txt
|
*.txt
|
||||||
*.log
|
*.log
|
||||||
|
|
||||||
|
|
|
||||||
2567
Cargo.lock
generated
2567
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
16
Cargo.toml
16
Cargo.toml
|
|
@ -4,33 +4,19 @@ version = "0.1.0"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
async-compression = { version = "0.4.39", features = ["tokio", "brotli"] }
|
|
||||||
axum = "0.8.7"
|
axum = "0.8.7"
|
||||||
axum-macros = "0.5.0"
|
|
||||||
axum-extra = { version = "0.12.6", features = ["multipart"] }
|
|
||||||
bb8 = "0.9.1"
|
|
||||||
bb8-redis = "0.26.0"
|
|
||||||
brotli = "8.0.2"
|
|
||||||
bytes = "1.11.1"
|
|
||||||
env_logger = "0.11.8"
|
env_logger = "0.11.8"
|
||||||
futures-util = "0.3.31"
|
|
||||||
git2 = { version = "0.20.3", features = ["https", "ssh"] }
|
git2 = { version = "0.20.3", features = ["https", "ssh"] }
|
||||||
image = "0.25.9"
|
|
||||||
json-patch = "4.1.0"
|
|
||||||
libgit2-sys = { version = "0.18.3", features = ["ssh"] }
|
libgit2-sys = { version = "0.18.3", features = ["ssh"] }
|
||||||
libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.git", version = "0.1.1" }
|
libtbr = { git = "https://gitlab.forthrd.io/Pakin/libtbr.git", version = "0.1.1" }
|
||||||
log = "0.4.29"
|
log = "0.4.29"
|
||||||
prost = "0.14.1"
|
prost = "0.14.1"
|
||||||
redis = { version = "1.0.2", features = ["tokio-comp"] }
|
|
||||||
reqwest = { version = "0.12.25", features = ["json"] }
|
reqwest = { version = "0.12.25", features = ["json"] }
|
||||||
serde = { version = "1.0.228", features = ["derive"] }
|
serde = { version = "1.0.228", features = ["derive"] }
|
||||||
serde_json = { version = "1.0.145", features = ["preserve_order"] }
|
serde_json = { version = "1.0.145", features = ["preserve_order"] }
|
||||||
tokio = { version = "1.48.0", features = ["full"] }
|
tokio = { version = "1.48.0", features = ["full"] }
|
||||||
tokio-util = { version = "0.7.18", features = ["io"] }
|
|
||||||
tonic = { version = "0.14.2", features = ["transport"] }
|
tonic = { version = "0.14.2", features = ["transport"] }
|
||||||
tonic-prost = "0.14.2"
|
tonic-prost = "0.14.2"
|
||||||
uuid = { version = "1.20.0", features = ["v4"] }
|
|
||||||
tokio-postgres = "0.7.17"
|
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
tonic-prost-build = "0.14.2"
|
tonic-prost-build = "0.14.2"
|
||||||
|
|
|
||||||
43
Dockerfile
43
Dockerfile
|
|
@ -1,43 +0,0 @@
|
||||||
FROM rustlang/rust:nightly-slim AS builder
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
# Install build dependencies
|
|
||||||
RUN apt update && apt install -y \
|
|
||||||
musl-tools \
|
|
||||||
musl-dev \
|
|
||||||
build-essential \
|
|
||||||
libssl-dev \
|
|
||||||
curl \
|
|
||||||
pkg-config \
|
|
||||||
ca-certificates \
|
|
||||||
protobuf-compiler \
|
|
||||||
zlib1g
|
|
||||||
|
|
||||||
# Copy dependency files first for better caching
|
|
||||||
COPY Cargo.toml Cargo.lock ./
|
|
||||||
COPY ./src ./src
|
|
||||||
COPY build.rs ./
|
|
||||||
COPY tbm-proto ./tbm-proto
|
|
||||||
|
|
||||||
# Download config
|
|
||||||
RUN curl -X GET https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/tbm-git-repo-service/releases/download/config/.tbcfg -o .tbcfg
|
|
||||||
|
|
||||||
# Build the application
|
|
||||||
ENV RUSTFLAGS="-C target-feature=+crt-static"
|
|
||||||
RUN cargo build --release
|
|
||||||
|
|
||||||
# Builder
|
|
||||||
FROM gcr.io/distroless/cc
|
|
||||||
|
|
||||||
# Copy the binary
|
|
||||||
COPY --from=builder /app/target/release/tbm-git-repo-service /
|
|
||||||
COPY --from=builder /app/.tbcfg /
|
|
||||||
|
|
||||||
# Create data directory
|
|
||||||
|
|
||||||
EXPOSE 36583
|
|
||||||
|
|
||||||
# Environment defaults
|
|
||||||
|
|
||||||
CMD ["./tbm-git-repo-service"]
|
|
||||||
183
README.md
183
README.md
|
|
@ -1,183 +0,0 @@
|
||||||
# tbm-git-service
|
|
||||||
|
|
||||||
Git repo as a service with operational endpoints
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
## Endpoints
|
|
||||||
|
|
||||||
### Checkout file
|
|
||||||
|
|
||||||
Get file content or list of file in directory from HEAD
|
|
||||||
|
|
||||||
```
|
|
||||||
GET .../checkout?path=<path_to_file>
|
|
||||||
```
|
|
||||||
|
|
||||||
Examples
|
|
||||||
|
|
||||||
Get list of files in directory
|
|
||||||
|
|
||||||
```bash
|
|
||||||
curl -X GET http://localhost:36593/checkout\?path\=inter
|
|
||||||
aus,common,dev,gbr,gbr_premium,hkg,ltu,mys,rou,sgp,tha,tha_premium,uae_dubai,usa,whatthecup
|
|
||||||
```
|
|
||||||
|
|
||||||
Get file
|
|
||||||
|
|
||||||
```bash
|
|
||||||
curl -X GET http://localhost:36593/checkout\?path\=inter/mys/xml/page_catalog_group_other.lxml
|
|
||||||
|
|
||||||
<Popup>
|
|
||||||
<Cache> "Enable" </Cache>
|
|
||||||
<Width> 1080 </Width>
|
|
||||||
<Height> 1920 </Height>
|
|
||||||
;<Background> "0xeae6e1" </Background>
|
|
||||||
<Volume> SoundVolume </Volume>
|
|
||||||
<EventOpen>
|
|
||||||
; On open
|
|
||||||
|
|
||||||
```
|
|
||||||
|
|
||||||
Error
|
|
||||||
|
|
||||||
```bash
|
|
||||||
curl -X GET http://localhost:36593/checkout\?path\=inter2
|
|
||||||
File not found
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### Fetch
|
|
||||||
|
|
||||||
Fetching & update index of current repo
|
|
||||||
|
|
||||||
```
|
|
||||||
GET .../fetch
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected result
|
|
||||||
|
|
||||||
```
|
|
||||||
{"result": "fetch success"}
|
|
||||||
```
|
|
||||||
|
|
||||||
Error
|
|
||||||
|
|
||||||
```
|
|
||||||
{"error": "..."}
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### Commit
|
|
||||||
|
|
||||||
Commit changes of file i.e. editing, adding, etc.
|
|
||||||
|
|
||||||
```
|
|
||||||
POST .../commit
|
|
||||||
```
|
|
||||||
|
|
||||||
Body must be multipart
|
|
||||||
|
|
||||||
- Single file
|
|
||||||
|
|
||||||
```
|
|
||||||
path: string
|
|
||||||
file: text/binary is acceptable
|
|
||||||
signature_username: string
|
|
||||||
signature_email: stirng
|
|
||||||
message: string
|
|
||||||
```
|
|
||||||
|
|
||||||
- Multiple file
|
|
||||||
|
|
||||||
```
|
|
||||||
signature_username: string
|
|
||||||
signature_email: stirng
|
|
||||||
message: string
|
|
||||||
fileX1: text/binary is acceptable
|
|
||||||
pathX1: string
|
|
||||||
fileX2: text/binary is acceptable
|
|
||||||
pathX2: string
|
|
||||||
...
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected result
|
|
||||||
|
|
||||||
```
|
|
||||||
{"result": "commit hash id"}
|
|
||||||
```
|
|
||||||
|
|
||||||
Error
|
|
||||||
|
|
||||||
```
|
|
||||||
{"error": "..."}
|
|
||||||
```
|
|
||||||
|
|
||||||
Example
|
|
||||||
|
|
||||||
```curl
|
|
||||||
curl --request POST \
|
|
||||||
--url http://localhost:36583/commit \
|
|
||||||
--header 'content-type: multipart/form-data' \
|
|
||||||
--form path=mys/version.dev \
|
|
||||||
--form 'signature_username=git api' \
|
|
||||||
--form signature_email=supra.m2.dev@forth.co.th \
|
|
||||||
--form 'message=test commit' \
|
|
||||||
--form file=@./mys.version.test
|
|
||||||
```
|
|
||||||
|
|
||||||
NOTE: file & path in multiple file must have the same name after
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### Push
|
|
||||||
|
|
||||||
Push local commits to remote
|
|
||||||
|
|
||||||
```
|
|
||||||
GET .../push
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected result
|
|
||||||
|
|
||||||
```
|
|
||||||
{"result": "push completed"}
|
|
||||||
```
|
|
||||||
|
|
||||||
Error
|
|
||||||
|
|
||||||
```
|
|
||||||
{"error": "..."}
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### Pull
|
|
||||||
|
|
||||||
Pull commits from remote. This operation always does git reset hard first before pull for reason of no conflicts.
|
|
||||||
|
|
||||||
```
|
|
||||||
GET .../pull
|
|
||||||
```
|
|
||||||
|
|
||||||
Expected result
|
|
||||||
|
|
||||||
```
|
|
||||||
{"result": "pull completed"}
|
|
||||||
```
|
|
||||||
|
|
||||||
Error
|
|
||||||
|
|
||||||
```
|
|
||||||
{"error": "..."}
|
|
||||||
```
|
|
||||||
|
|
||||||
NOTE: Commit did lost but could be checkout later
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### Recovery
|
|
||||||
|
|
||||||
WIP...
|
|
||||||
1365
src/app.rs
1365
src/app.rs
File diff suppressed because it is too large
Load diff
90
src/git.rs
90
src/git.rs
|
|
@ -1,21 +1,16 @@
|
||||||
use std::{
|
use std::{cell::RefCell, collections::HashMap, io::{self, Write}, path::{Path, PathBuf}};
|
||||||
cell::RefCell,
|
|
||||||
collections::HashMap,
|
|
||||||
io::{self, Write},
|
|
||||||
path::{Path, PathBuf},
|
|
||||||
};
|
|
||||||
|
|
||||||
use git2::{Cred, FetchOptions, Progress, RemoteCallbacks, build::RepoBuilder};
|
use git2::{build::RepoBuilder, Cred, FetchOptions, Progress, RemoteCallbacks};
|
||||||
use log::info;
|
use log::{info};
|
||||||
|
|
||||||
use crate::gcm;
|
use crate::gcm;
|
||||||
|
|
||||||
struct GitState {
|
struct GitState {
|
||||||
progress: Option<Progress<'static>>,
|
progress: Option<Progress<'static>>,
|
||||||
total: usize,
|
total: usize,
|
||||||
current: usize,
|
current: usize,
|
||||||
path: Option<PathBuf>,
|
path: Option<PathBuf>,
|
||||||
newline: bool,
|
newline: bool
|
||||||
}
|
}
|
||||||
|
|
||||||
fn print(state: &mut GitState) {
|
fn print(state: &mut GitState) {
|
||||||
|
|
@ -34,11 +29,10 @@ fn print(state: &mut GitState) {
|
||||||
state.newline = true;
|
state.newline = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(
|
info!("Resolving deltas {}/{}",
|
||||||
"Resolving deltas {}/{}",
|
stats.indexed_deltas(),
|
||||||
stats.indexed_deltas(),
|
stats.total_deltas());
|
||||||
stats.total_deltas()
|
|
||||||
);
|
|
||||||
} else {
|
} else {
|
||||||
info!(
|
info!(
|
||||||
"net {:3}% ({:4} kb, {:5}/{:5}) / idx {:3}% ({:5}/{:5}) \
|
"net {:3}% ({:4} kb, {:5}/{:5}) / idx {:3}% ({:5}/{:5}) \
|
||||||
|
|
@ -64,38 +58,40 @@ fn print(state: &mut GitState) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn setup_git_repo(config: HashMap<String, String>) -> gcm::StandardResult {
|
pub fn setup_git_repo(config: HashMap<String, String>) -> gcm::StandardResult {
|
||||||
let state = RefCell::new(GitState {
|
let state = RefCell::new(GitState {
|
||||||
progress: None,
|
progress: None,
|
||||||
total: 0,
|
total: 0,
|
||||||
current: 0,
|
current: 0,
|
||||||
path: None,
|
path: None,
|
||||||
newline: true,
|
newline: true
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut cb = RemoteCallbacks::new();
|
let mut cb = RemoteCallbacks::new();
|
||||||
cb.transfer_progress(|stats| {
|
cb.transfer_progress(|stats| {
|
||||||
let mut state = state.borrow_mut();
|
let mut state = state.borrow_mut();
|
||||||
state.progress = Some(stats.to_owned());
|
state.progress = Some(stats.to_owned());
|
||||||
print(&mut state);
|
print(&mut state);
|
||||||
true
|
true
|
||||||
});
|
});
|
||||||
|
|
||||||
cb.credentials(|_, _, _| {
|
cb.credentials(|_,_,_| {
|
||||||
Cred::userpass_plaintext(
|
Cred::userpass_plaintext(
|
||||||
config.get("GIT_REPO_USERNAME").unwrap_or(&"".to_string()),
|
config.get("GIT_REPO_USERNAME").unwrap_or(&"".to_string()),
|
||||||
config.get("GIT_REPO_PASSWORD").unwrap_or(&"".to_string()),
|
config.get("GIT_REPO_PASSWORD").unwrap_or(&"".to_string())
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut fo = FetchOptions::new();
|
let mut fo = FetchOptions::new();
|
||||||
fo.remote_callbacks(cb);
|
fo.remote_callbacks(cb);
|
||||||
|
fo.depth(1);
|
||||||
|
|
||||||
let _ = RepoBuilder::new().bare(false).fetch_options(fo).clone(
|
RepoBuilder::new()
|
||||||
config.get("GIT_REPO_REMOTE").unwrap_or(&"".to_string()),
|
.bare(true)
|
||||||
Path::new(config.get("GIT_REPO_LOCAL_DEST").unwrap()).into(),
|
.fetch_options(fo)
|
||||||
|
.clone(
|
||||||
|
config.get("GIT_REPO_REMOTE").unwrap_or(&"".to_string()),
|
||||||
|
Path::new(config.get("GIT_REPO_LOCAL_DEST").unwrap()).into()
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
println!("clone completed !");
|
Ok(())
|
||||||
|
}
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
73
src/main.rs
73
src/main.rs
|
|
@ -1,67 +1,56 @@
|
||||||
use std::fs::File;
|
use std::{fs::File};
|
||||||
|
|
||||||
use env_logger::Builder;
|
use env_logger::Builder;
|
||||||
use libtbr::recipe_functions::common;
|
use libtbr::recipe_functions::common;
|
||||||
use log::info;
|
use log::info;
|
||||||
|
|
||||||
use crate::{
|
use crate::{git::setup_git_repo, reg::{heartbeat_loop, registry::registry_client::RegistryClient}};
|
||||||
git::setup_git_repo,
|
|
||||||
// reg::{heartbeat_loop, registry::registry_client::RegistryClient},
|
|
||||||
};
|
|
||||||
|
|
||||||
mod app;
|
mod app;
|
||||||
mod gcm;
|
mod gcm;
|
||||||
mod git;
|
mod git;
|
||||||
mod tx;
|
mod reg;
|
||||||
// mod reg;
|
|
||||||
|
|
||||||
fn setup_log(config: gcm::Configure) -> gcm::StandardResult {
|
fn setup_log(config: gcm::Configure) -> gcm::StandardResult {
|
||||||
// NOTE: disable logging file, use send to log service instead
|
let logfile = File::create(config.get("LOG_NAME").unwrap_or(&"run.log".to_string()))?;
|
||||||
// let logfile = File::create(config.get("LOG_NAME").unwrap_or(&"run.log".to_string()))?;
|
|
||||||
|
|
||||||
Builder::from_env(env_logger::Env::default().default_filter_or("debug"))
|
Builder::from_env(env_logger::Env::default().default_filter_or("debug"))
|
||||||
// .target(env_logger::Target::Pipe(Box::new(logfile)))
|
.target(env_logger::Target::Pipe(Box::new(logfile))).init();
|
||||||
.init();
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn setup_registry(config: gcm::Configure) -> gcm::StandardResult {
|
||||||
|
let cfg_clone = config.clone();
|
||||||
|
let register_server_address = cfg_clone.get("REGISTRY_SERVER").expect("not found registry server");
|
||||||
|
let name = cfg_clone.get("SERVICE_NAME").expect("missing service name");
|
||||||
|
let url = cfg_clone.get("SERVICE_URL").expect("missing service url");
|
||||||
|
|
||||||
|
let mut client = RegistryClient::connect(register_server_address.clone()).await?;
|
||||||
|
reg::register_service(&mut client, name, url).await;
|
||||||
|
|
||||||
|
tokio::spawn(heartbeat_loop(name.to_string(), url.to_string()));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> gcm::StandardResult {
|
async fn main() -> gcm::StandardResult {
|
||||||
let config: gcm::Configure = common::get_config();
|
let config: gcm::Configure = common::get_config();
|
||||||
setup_log(config.clone())?;
|
setup_log(config.clone())?;
|
||||||
|
|
||||||
match std::fs::read_dir(config.get("GIT_REPO_LOCAL_DEST").expect("not exist")) {
|
match std::fs::read_dir(config.get("GIT_REPO_LOCAL_DEST").expect("not exist")) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
info!(
|
info!("GIT_REPO `{dest}` already setup", dest = config.get("GIT_REPO_LOCAL_DEST").unwrap())
|
||||||
"GIT_REPO `{dest}` existed, checking if has git",
|
},
|
||||||
dest = config.get("GIT_REPO_LOCAL_DEST").unwrap()
|
Err(_) => {
|
||||||
);
|
setup_git_repo(config.clone())?;
|
||||||
|
}
|
||||||
match std::fs::read_dir(format!(
|
|
||||||
"{}/.git",
|
|
||||||
config.get("GIT_REPO_LOCAL_DEST").expect("not exist")
|
|
||||||
)) {
|
|
||||||
Ok(_) => {
|
|
||||||
info!("GIT REPO already set up!")
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
setup_git_repo(config.clone())?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
setup_git_repo(config.clone())?;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
setup_registry(config.clone()).await?;
|
||||||
info!(
|
|
||||||
"APP VERSION: {}",
|
|
||||||
config
|
|
||||||
.get("CONFIG_VERSION")
|
|
||||||
.expect("config version not defined")
|
|
||||||
);
|
|
||||||
info!("RUNNING: {:?}", config.get("PUBLIC_PORT"));
|
|
||||||
app::run(config.clone()).await?;
|
app::run(config.clone()).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,5 @@
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use log::{error, info};
|
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
use tonic::transport::Channel;
|
use tonic::transport::Channel;
|
||||||
|
|
||||||
|
|
@ -18,11 +17,7 @@ pub async fn register_service(client: &mut RegistryClient<Channel>, name: &str,
|
||||||
token: std::env::var("REGISTRY_TOKEN").unwrap_or_default(),
|
token: std::env::var("REGISTRY_TOKEN").unwrap_or_default(),
|
||||||
});
|
});
|
||||||
|
|
||||||
let resp = client.register(req).await;
|
let _ = client.register(req).await;
|
||||||
match resp {
|
|
||||||
Ok(r) => info!("{:?}", r),
|
|
||||||
Err(e) => error!("error register: {:?}", e)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn heartbeat_loop(name: String, url: String) {
|
pub async fn heartbeat_loop(name: String, url: String) {
|
||||||
|
|
|
||||||
|
|
@ -1,323 +0,0 @@
|
||||||
use axum::{
|
|
||||||
Json, Router,
|
|
||||||
extract::{Path, Query, State},
|
|
||||||
response::IntoResponse,
|
|
||||||
routing::{get, post},
|
|
||||||
};
|
|
||||||
use redis::{AsyncTypedCommands, TypedCommands, aio::MultiplexedConnection};
|
|
||||||
use reqwest::StatusCode;
|
|
||||||
use serde::Deserialize;
|
|
||||||
use serde_json::{Value, json};
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use crate::tx::{
|
|
||||||
helpers::*,
|
|
||||||
patcher::apply_ops,
|
|
||||||
types::{PatchOp, Reservation, ReserveReq, ReserveRes, TxCommitReq, TxCommitRes},
|
|
||||||
};
|
|
||||||
|
|
||||||
async fn reserve_tx(
|
|
||||||
State(state): State<AppState>,
|
|
||||||
Path(doc_id): Path<String>,
|
|
||||||
Json(req): Json<ReserveReq>,
|
|
||||||
) -> impl IntoResponse {
|
|
||||||
let rsv_id = Uuid::new_v4().to_string();
|
|
||||||
let ttl_secs: i64 = 30; // ttl reserve 30s
|
|
||||||
let expires_at = now_unix() + ttl_secs;
|
|
||||||
|
|
||||||
let mut rcli = state.redis_cli.clone();
|
|
||||||
|
|
||||||
// check version
|
|
||||||
let ver_key = doc_ver_key(&doc_id);
|
|
||||||
let base_version: u64 = match rcli.get(ver_key) {
|
|
||||||
Ok(res) => {
|
|
||||||
if let Some(r) = res {
|
|
||||||
r.parse().ok().unwrap()
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
let rsv = Reservation {
|
|
||||||
reservation_id: rsv_id.clone(),
|
|
||||||
doc_id: doc_id.clone(),
|
|
||||||
author: req.author,
|
|
||||||
base_version,
|
|
||||||
expires_at,
|
|
||||||
};
|
|
||||||
|
|
||||||
let key = rsv_key(&doc_id, &rsv_id);
|
|
||||||
let payload = serde_json::to_string(&rsv).unwrap();
|
|
||||||
|
|
||||||
let _: () = rcli.set_ex(key, payload, ttl_secs as u64).unwrap();
|
|
||||||
|
|
||||||
(
|
|
||||||
StatusCode::OK,
|
|
||||||
Json(ReserveRes {
|
|
||||||
reservation_id: rsv_id,
|
|
||||||
base_version,
|
|
||||||
expires_at,
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn commit_tx(
|
|
||||||
State(state): State<AppState>,
|
|
||||||
Path(doc_id): Path<String>,
|
|
||||||
Json(req): Json<TxCommitReq>,
|
|
||||||
) -> impl IntoResponse {
|
|
||||||
if req.tx_id.is_empty() || req.reservation_id.is_empty() {
|
|
||||||
return (
|
|
||||||
StatusCode::BAD_REQUEST,
|
|
||||||
"missing tx_id or reservation_id ".into_response(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut con = match state.redis_cli.get_multiplexed_async_connection().await {
|
|
||||||
Ok(conn) => conn,
|
|
||||||
Err(e) => {
|
|
||||||
return (
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
format!("{}", e.to_string()).into_response(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// lock commit
|
|
||||||
let lock_key = commit_lock_key(&doc_id);
|
|
||||||
if let Err(_) = wait_lock(&mut con, &lock_key, 1200, 20).await {
|
|
||||||
return (StatusCode::CONFLICT, "doc busy, retry".into_response());
|
|
||||||
}
|
|
||||||
|
|
||||||
let txid_k = txid_key(&doc_id, &req.tx_id);
|
|
||||||
if let Ok(Some(v)) = con.get(&txid_k).await {
|
|
||||||
if let Ok(committed_version) = v.parse::<u64>() {
|
|
||||||
let res = TxCommitRes {
|
|
||||||
doc_id,
|
|
||||||
tx_id: req.tx_id,
|
|
||||||
committed_version,
|
|
||||||
};
|
|
||||||
return (StatusCode::OK, Json(res).into_response());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let rsv_k = rsv_key(&doc_id, &req.reservation_id);
|
|
||||||
let rsv_raw: Option<String> = con.get(&rsv_k).await.unwrap_or(None);
|
|
||||||
let rsv_raw = match rsv_raw {
|
|
||||||
Some(x) => x,
|
|
||||||
None => {
|
|
||||||
return (
|
|
||||||
StatusCode::CONFLICT,
|
|
||||||
"reservation missing/expired".into_response(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let rsv: Reservation = match serde_json::from_str(&rsv_raw) {
|
|
||||||
Ok(x) => x,
|
|
||||||
Err(_) => return (StatusCode::CONFLICT, "bad reservation".into_response()),
|
|
||||||
};
|
|
||||||
|
|
||||||
if rsv.doc_id != doc_id || rsv.author != req.author || rsv.base_version != req.base_version {
|
|
||||||
return (StatusCode::CONFLICT, "reservation mismatch".into_response());
|
|
||||||
}
|
|
||||||
|
|
||||||
if now_unix() > rsv.expires_at {
|
|
||||||
return (StatusCode::CONFLICT, "reservation expired".into_response());
|
|
||||||
}
|
|
||||||
|
|
||||||
let curr_ver: u64 = con
|
|
||||||
.get(doc_ver_key(&doc_id))
|
|
||||||
.await
|
|
||||||
.ok()
|
|
||||||
.flatten()
|
|
||||||
.and_then(|s| s.parse().ok())
|
|
||||||
.unwrap_or(0);
|
|
||||||
|
|
||||||
if req.base_version != curr_ver {
|
|
||||||
let body = json!({
|
|
||||||
"error": "version_conflict",
|
|
||||||
"server_version": curr_ver,
|
|
||||||
});
|
|
||||||
return (StatusCode::CONFLICT, Json(body).into_response());
|
|
||||||
}
|
|
||||||
|
|
||||||
let next_ver = curr_ver + 1;
|
|
||||||
|
|
||||||
let stream_key = doc_tx_stream_key(&doc_id);
|
|
||||||
|
|
||||||
let tx_event = json!({
|
|
||||||
"doc_id": doc_id,
|
|
||||||
"tx_id": req.tx_id,
|
|
||||||
"version": next_ver,
|
|
||||||
"base_version": req.base_version,
|
|
||||||
"author": req.author,
|
|
||||||
"ts": now_unix(),
|
|
||||||
"ops": req.ops
|
|
||||||
});
|
|
||||||
|
|
||||||
let tx_event_str = tx_event.to_string();
|
|
||||||
let _: String = redis::cmd("XADD")
|
|
||||||
.arg(&stream_key)
|
|
||||||
.arg("*")
|
|
||||||
.arg("data")
|
|
||||||
.arg(&tx_event_str)
|
|
||||||
.query_async(&mut con)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let _: () = con
|
|
||||||
.set(doc_ver_key(&doc_id), next_ver.to_string())
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let _: () = con.set(&txid_k, next_ver.to_string()).await.unwrap();
|
|
||||||
let _: usize = con.del(&rsv_k).await.unwrap();
|
|
||||||
|
|
||||||
let snapshot_every: u64 = 50;
|
|
||||||
if next_ver % snapshot_every == 0 {
|
|
||||||
if let Ok(state) = build_state_at(&mut con, &doc_id, next_ver).await {
|
|
||||||
let _: () = con
|
|
||||||
.set(doc_snap_key(&doc_id), state.to_string())
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let _: () = con
|
|
||||||
.set(doc_snapver_key(&doc_id), next_ver.to_string())
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(
|
|
||||||
StatusCode::OK,
|
|
||||||
Json(TxCommitRes {
|
|
||||||
doc_id,
|
|
||||||
tx_id: req.tx_id,
|
|
||||||
committed_version: next_ver,
|
|
||||||
})
|
|
||||||
.into_response(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn build_state_at(
|
|
||||||
con: &mut MultiplexedConnection,
|
|
||||||
doc_id: &str,
|
|
||||||
target_version: u64,
|
|
||||||
) -> Result<Value, String> {
|
|
||||||
let snap_ver: u64 = con
|
|
||||||
.get(doc_snap_key(&doc_id))
|
|
||||||
.await
|
|
||||||
.ok()
|
|
||||||
.flatten()
|
|
||||||
.and_then(|s| s.parse().ok())
|
|
||||||
.unwrap_or(0);
|
|
||||||
|
|
||||||
let snap_json: Value = if snap_ver > 0 {
|
|
||||||
let raw: Option<String> = con.get(doc_snap_key(&doc_id)).await.unwrap_or(None);
|
|
||||||
raw.and_then(|s| serde_json::from_str(&s).ok())
|
|
||||||
.unwrap_or_else(|| json!({}))
|
|
||||||
} else {
|
|
||||||
json!({})
|
|
||||||
};
|
|
||||||
|
|
||||||
let stream_key = doc_tx_stream_key(doc_id);
|
|
||||||
|
|
||||||
let start_id = "0-0";
|
|
||||||
let entries: Vec<(String, Vec<(String, String)>)> = redis::cmd("XRANGE")
|
|
||||||
.arg(&stream_key)
|
|
||||||
.arg(start_id)
|
|
||||||
.arg("+")
|
|
||||||
.query_async(con)
|
|
||||||
.await
|
|
||||||
.map_err(|e| e.to_string())?;
|
|
||||||
|
|
||||||
let mut ops_list: Vec<PatchOp> = Vec::new();
|
|
||||||
for (_id, fields) in entries {
|
|
||||||
let mut data_opts = None;
|
|
||||||
for (k, v) in fields {
|
|
||||||
if k == "data" {
|
|
||||||
data_opts = Some(v);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let data = match data_opts {
|
|
||||||
Some(x) => x,
|
|
||||||
None => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
let ev: Value = serde_json::from_str(&data).map_err(|e| e.to_string())?;
|
|
||||||
let ver = ev.get("version").and_then(|v| v.as_u64()).unwrap_or(0);
|
|
||||||
if ver == 0 || ver > target_version {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ver <= snap_ver {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let ops: Vec<PatchOp> =
|
|
||||||
serde_json::from_value(ev["ops"].clone()).map_err(|e| e.to_string())?;
|
|
||||||
ops_list.extend(ops);
|
|
||||||
}
|
|
||||||
|
|
||||||
apply_ops(snap_json, &ops_list)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct StateQuery {
|
|
||||||
at_version: Option<u64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_state(
|
|
||||||
State(state): State<AppState>,
|
|
||||||
Path(doc_id): Path<String>,
|
|
||||||
Query(q): Query<StateQuery>,
|
|
||||||
) -> impl IntoResponse {
|
|
||||||
let rcli = state.redis_cli.clone();
|
|
||||||
let mut con = match rcli.get_multiplexed_async_connection().await {
|
|
||||||
Ok(sc) => sc,
|
|
||||||
Err(_) => {
|
|
||||||
return (
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
"cannot get connection".into_response(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let cur_ver: u64 = con
|
|
||||||
.get(doc_ver_key(&doc_id))
|
|
||||||
.await
|
|
||||||
.ok()
|
|
||||||
.flatten()
|
|
||||||
.and_then(|s| s.parse().ok())
|
|
||||||
.unwrap_or(0);
|
|
||||||
|
|
||||||
let target = q.at_version.unwrap_or(cur_ver).min(cur_ver);
|
|
||||||
|
|
||||||
match build_state_at(&mut con, &doc_id, target).await {
|
|
||||||
Ok(state) => (
|
|
||||||
StatusCode::OK,
|
|
||||||
Json(json!({
|
|
||||||
"doc_id": doc_id,
|
|
||||||
"version": target,
|
|
||||||
"state": state
|
|
||||||
}))
|
|
||||||
.into_response(),
|
|
||||||
),
|
|
||||||
Err(e) => (
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
Json(json!({
|
|
||||||
"error": e
|
|
||||||
}))
|
|
||||||
.into_response(),
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// pub async fn create_tx_patcher_route() -> Router<AppState> {
|
|
||||||
// Router::new()
|
|
||||||
// .route("/{id}/tx-reserve", post(reserve_tx))
|
|
||||||
// .route("/{id}/tx-commit", post(commit_tx))
|
|
||||||
// .route("/{id}/state", get(get_state))
|
|
||||||
// }
|
|
||||||
|
|
@ -1,296 +0,0 @@
|
||||||
use std::{
|
|
||||||
clone,
|
|
||||||
collections::HashMap,
|
|
||||||
sync::{Arc, Mutex},
|
|
||||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
|
||||||
};
|
|
||||||
|
|
||||||
use git2::Repository;
|
|
||||||
use json_patch::{Patch, diff};
|
|
||||||
use log::info;
|
|
||||||
use redis::{RedisResult, aio::MultiplexedConnection};
|
|
||||||
use serde::Deserialize;
|
|
||||||
use tokio::{sync::mpsc::Sender, time::sleep};
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use crate::app::CacheJob;
|
|
||||||
|
|
||||||
pub fn now_unix() -> i64 {
|
|
||||||
SystemTime::now()
|
|
||||||
.duration_since(UNIX_EPOCH)
|
|
||||||
.unwrap()
|
|
||||||
.as_secs() as i64
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn doc_ver_key(doc: &str) -> String {
|
|
||||||
format!("doc:{doc}:ver")
|
|
||||||
}
|
|
||||||
pub fn doc_tx_stream_key(doc: &str) -> String {
|
|
||||||
format!("doc:{doc}:tx")
|
|
||||||
}
|
|
||||||
pub fn doc_snap_key(doc: &str) -> String {
|
|
||||||
format!("doc:{doc}:snap")
|
|
||||||
}
|
|
||||||
pub fn doc_snapver_key(doc: &str) -> String {
|
|
||||||
format!("doc:{doc}:snapver")
|
|
||||||
}
|
|
||||||
pub fn rsv_key(doc: &str, rsv: &str) -> String {
|
|
||||||
format!("doc:{doc}:rsv:{rsv}")
|
|
||||||
}
|
|
||||||
pub fn txid_key(doc: &str, tx_id: &str) -> String {
|
|
||||||
format!("doc:{doc}:txid:{tx_id}")
|
|
||||||
}
|
|
||||||
pub fn commit_lock_key(doc: &str) -> String {
|
|
||||||
format!("doc:{doc}:lock")
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn acquire_lock(
|
|
||||||
con: &mut MultiplexedConnection,
|
|
||||||
key: &str,
|
|
||||||
ttl_ms: u64,
|
|
||||||
) -> RedisResult<bool> {
|
|
||||||
let token = Uuid::new_v4().to_string();
|
|
||||||
let ok: Option<String> = redis::cmd("SET")
|
|
||||||
.arg(key)
|
|
||||||
.arg(token)
|
|
||||||
.arg("NX")
|
|
||||||
.arg("PX")
|
|
||||||
.arg(ttl_ms)
|
|
||||||
.query_async(con)
|
|
||||||
.await?;
|
|
||||||
Ok(ok.is_some())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn wait_lock(
|
|
||||||
con: &mut MultiplexedConnection,
|
|
||||||
key: &str,
|
|
||||||
ttl_ms: u64,
|
|
||||||
attempts: u32,
|
|
||||||
) -> RedisResult<()> {
|
|
||||||
for _ in 0..attempts {
|
|
||||||
if acquire_lock(con, key, ttl_ms).await? {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
sleep(Duration::from_millis(30)).await;
|
|
||||||
}
|
|
||||||
Err(redis::RedisError::from((redis::ErrorKind::Io, "lock busy")))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn build_tx_from_diff(base: serde_json::Value, alt: serde_json::Value) -> Patch {
|
|
||||||
diff(&base, &alt)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_file_from_repo(
|
|
||||||
repo: Arc<Mutex<Repository>>,
|
|
||||||
path: &str,
|
|
||||||
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
|
|
||||||
if let Ok(repo_m) = repo.try_lock() {
|
|
||||||
let obj = repo_m.revparse_single(path)?;
|
|
||||||
if let Some(blob) = obj.as_blob() {
|
|
||||||
let content = unsafe { str::from_utf8_unchecked(blob.content()) };
|
|
||||||
return Ok(content.to_string());
|
|
||||||
} else if let Some(tree) = obj.as_tree() {
|
|
||||||
let dir_list = tree
|
|
||||||
.iter()
|
|
||||||
.map(|x| x.name().unwrap_or("").to_string())
|
|
||||||
.collect::<Vec<String>>();
|
|
||||||
|
|
||||||
return Ok(dir_list.join(","));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err("FileNotFound".into())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn setup_prebuild_tx_cache(
|
|
||||||
tx: Sender<CacheJob>,
|
|
||||||
git_repo: &str,
|
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|
||||||
// import tx files
|
|
||||||
let repo = match Repository::open(git_repo) {
|
|
||||||
Ok(repo) => repo,
|
|
||||||
Err(_) => return Err("cannot open repo".into()),
|
|
||||||
};
|
|
||||||
|
|
||||||
let repo_m = Arc::new(Mutex::new(repo));
|
|
||||||
|
|
||||||
let root_path = String::from("master:");
|
|
||||||
let all_folders = match get_file_from_repo(repo_m.clone(), root_path.as_str()) {
|
|
||||||
Ok(af) => af,
|
|
||||||
Err(e) => return Err(e),
|
|
||||||
};
|
|
||||||
|
|
||||||
let folders = all_folders
|
|
||||||
.split(",")
|
|
||||||
.map(|x| x.to_string())
|
|
||||||
.collect::<Vec<String>>();
|
|
||||||
|
|
||||||
for folder in folders {
|
|
||||||
let current_path = format!("master:{folder}");
|
|
||||||
|
|
||||||
println!("trying {folder}...");
|
|
||||||
|
|
||||||
// test get version
|
|
||||||
//
|
|
||||||
// NOTE: version may miss if there is file with `versionXXXX`
|
|
||||||
//
|
|
||||||
//
|
|
||||||
if let Ok(current_files) = get_file_from_repo(repo_m.clone(), current_path.as_str())
|
|
||||||
&& current_files.contains(",version")
|
|
||||||
{
|
|
||||||
let expected_version_path = format!("{current_path}/version");
|
|
||||||
println!("\tmay have version --> {expected_version_path}, \ncurrent: {current_files}");
|
|
||||||
|
|
||||||
// get version content
|
|
||||||
|
|
||||||
let version_str =
|
|
||||||
if let Ok(v) = get_file_from_repo(repo_m.clone(), expected_version_path.as_str()) {
|
|
||||||
v
|
|
||||||
} else {
|
|
||||||
"".to_string()
|
|
||||||
};
|
|
||||||
// let version = version_str.parse::<u64>().unwrap_or(0);
|
|
||||||
if version_str.is_empty() {
|
|
||||||
// unexpected
|
|
||||||
return Err("cannot get version".into());
|
|
||||||
}
|
|
||||||
|
|
||||||
if tx
|
|
||||||
.send(CacheJob {
|
|
||||||
rel_path: expected_version_path.clone(),
|
|
||||||
file_data: Some(serde_json::to_vec(&version_str)?),
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.is_ok()
|
|
||||||
{
|
|
||||||
info!("cache ok {current_path}/version");
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("try get {folder}:{version_str}");
|
|
||||||
|
|
||||||
let files = current_files
|
|
||||||
.split(",")
|
|
||||||
.map(|x| x.to_string())
|
|
||||||
.collect::<Vec<String>>();
|
|
||||||
|
|
||||||
let base_file: String = match files.iter().find(|x| {
|
|
||||||
x.ends_with(".json") && x.starts_with(&format!("coffeethai02_{version_str}"))
|
|
||||||
}) {
|
|
||||||
Some(f) => f.to_string(),
|
|
||||||
None => "".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
println!("{folder}: {base_file}");
|
|
||||||
|
|
||||||
if base_file.is_empty() {
|
|
||||||
println!("base file empty skip");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let base_recipe_str = if let Ok(r) =
|
|
||||||
get_file_from_repo(repo_m.clone(), &format!("{current_path}/{base_file}"))
|
|
||||||
{
|
|
||||||
r.to_string()
|
|
||||||
} else {
|
|
||||||
"".to_string()
|
|
||||||
};
|
|
||||||
|
|
||||||
if base_recipe_str.is_empty() {
|
|
||||||
println!("empty recipe skip!");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let base_recipe: serde_json::Value = match serde_json::from_str(&base_recipe_str) {
|
|
||||||
Ok(r) => r,
|
|
||||||
Err(e) => return Err(e.into()),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut current_recipes = HashMap::new();
|
|
||||||
|
|
||||||
current_recipes.insert("base".to_string(), base_recipe);
|
|
||||||
|
|
||||||
for cfile in files {
|
|
||||||
if cfile.ne(&base_file)
|
|
||||||
&& cfile.ends_with(".json")
|
|
||||||
&& cfile.starts_with("coffeethai02")
|
|
||||||
{
|
|
||||||
//
|
|
||||||
// fetch file content
|
|
||||||
// read into value
|
|
||||||
// do diff
|
|
||||||
|
|
||||||
let current_fetch_path = format!("{current_path}/{cfile}");
|
|
||||||
let current_file_str =
|
|
||||||
if let Ok(cr) = get_file_from_repo(repo_m.clone(), ¤t_fetch_path) {
|
|
||||||
cr.to_string()
|
|
||||||
} else {
|
|
||||||
"".to_string()
|
|
||||||
};
|
|
||||||
|
|
||||||
if !current_file_str.is_empty() {
|
|
||||||
// into value
|
|
||||||
let alt_recipe: serde_json::Value =
|
|
||||||
match serde_json::from_str(¤t_file_str) {
|
|
||||||
Ok(crr) => crr,
|
|
||||||
Err(e) => {
|
|
||||||
println!("get error while alt {current_fetch_path}: {e}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
current_recipes.insert(cfile, alt_recipe);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// process diffing
|
|
||||||
//
|
|
||||||
|
|
||||||
let base_recipe_ref = current_recipes.get("base").unwrap();
|
|
||||||
|
|
||||||
for (key, value) in current_recipes.iter() {
|
|
||||||
if key != "base" {
|
|
||||||
let patch_diff = diff(&base_recipe_ref, value);
|
|
||||||
let tx_name = format!(
|
|
||||||
"stx_{}",
|
|
||||||
key.replace("coffeethai02_", "").replace(".json", "")
|
|
||||||
);
|
|
||||||
|
|
||||||
let base_path_out = format!("./recipe_tx/{folder}");
|
|
||||||
if !std::path::Path::new(&base_path_out).exists() {
|
|
||||||
std::fs::create_dir_all(&base_path_out)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let file_out = format!("{base_path_out}/{tx_name}.json");
|
|
||||||
|
|
||||||
std::fs::write(file_out, serde_json::to_string_pretty(&patch_diff)?)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// let _ = tx
|
|
||||||
// .send(serde_json::json!({
|
|
||||||
// "topic": "generate_stx",
|
|
||||||
// "target": folder,
|
|
||||||
// "base_version": version_str,
|
|
||||||
// "full_base_filename": base_file,
|
|
||||||
// "value": base_recipe_clone
|
|
||||||
// }))
|
|
||||||
// .await;
|
|
||||||
|
|
||||||
let json_bytes = serde_json::to_vec(base_recipe_ref)?;
|
|
||||||
|
|
||||||
if tx
|
|
||||||
.send(CacheJob {
|
|
||||||
rel_path: format!("{current_path}/{base_file}"),
|
|
||||||
file_data: Some(json_bytes),
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.is_ok()
|
|
||||||
{
|
|
||||||
info!("cache ok {current_path}/{base_file}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
@ -1,4 +0,0 @@
|
||||||
// pub mod handler;
|
|
||||||
pub mod helpers;
|
|
||||||
pub mod patcher;
|
|
||||||
pub mod types;
|
|
||||||
|
|
@ -1,119 +0,0 @@
|
||||||
use serde_json::Value;
|
|
||||||
|
|
||||||
use crate::tx::types::PatchOp;
|
|
||||||
|
|
||||||
pub fn json_pointer_parent<'a>(
|
|
||||||
root: &'a mut Value,
|
|
||||||
path: &str,
|
|
||||||
) -> Result<(&'a mut Value, String), String> {
|
|
||||||
if !path.starts_with('/') && path != "" {
|
|
||||||
return Err("path must be JSON pointer starting with '/'".into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let part: Vec<String> = path
|
|
||||||
.split('/')
|
|
||||||
.skip(1)
|
|
||||||
.map(|x| x.replace("~1", "/").replace("~0", "~"))
|
|
||||||
.collect();
|
|
||||||
if path.is_empty() {
|
|
||||||
return Ok((root, "".into()));
|
|
||||||
}
|
|
||||||
|
|
||||||
let last = part.last().unwrap().clone();
|
|
||||||
|
|
||||||
let mut cur = root;
|
|
||||||
|
|
||||||
for key in &part[..part.len() - 1] {
|
|
||||||
match cur {
|
|
||||||
Value::Object(map) => {
|
|
||||||
cur = map
|
|
||||||
.get_mut(key)
|
|
||||||
.ok_or_else(|| format!("missing object key {key}"))?;
|
|
||||||
}
|
|
||||||
Value::Array(arr) => {
|
|
||||||
let idx: usize = key.parse().map_err(|_| format!("bad array idx {key}"))?;
|
|
||||||
cur = arr
|
|
||||||
.get_mut(idx)
|
|
||||||
.ok_or_else(|| format!("array index out of bounds {idx}"))?;
|
|
||||||
}
|
|
||||||
_ => return Err("cannot traverse non-container".into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((cur, last))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn apply_ops(mut state: Value, ops: &[PatchOp]) -> Result<Value, String> {
|
|
||||||
for op in ops {
|
|
||||||
match op {
|
|
||||||
PatchOp::Replace { path, value } => {
|
|
||||||
if path == "" || path == "/" {
|
|
||||||
state = value.clone();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let (parent, last) = json_pointer_parent(&mut state, path)?;
|
|
||||||
match parent {
|
|
||||||
Value::Object(map) => {
|
|
||||||
if !map.contains_key(&last) {
|
|
||||||
return Err(format!("replace target missing key {last}"));
|
|
||||||
}
|
|
||||||
map.insert(last, value.clone());
|
|
||||||
}
|
|
||||||
Value::Array(arr) => {
|
|
||||||
let idx: usize = last
|
|
||||||
.parse()
|
|
||||||
.map_err(|_| format!("bad array index {last}"))?;
|
|
||||||
if idx >= arr.len() {
|
|
||||||
return Err(format!("replace index out of bounds {idx}"));
|
|
||||||
}
|
|
||||||
arr[idx] = value.clone();
|
|
||||||
}
|
|
||||||
_ => return Err("replace parent not container".into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
PatchOp::Add { path, value } => {
|
|
||||||
let (parent, last) = json_pointer_parent(&mut state, path)?;
|
|
||||||
match parent {
|
|
||||||
Value::Object(map) => {
|
|
||||||
map.insert(last, value.clone());
|
|
||||||
}
|
|
||||||
Value::Array(arr) => {
|
|
||||||
if last == "-" {
|
|
||||||
arr.push(value.clone());
|
|
||||||
} else {
|
|
||||||
let idx: usize = last
|
|
||||||
.parse()
|
|
||||||
.map_err(|_| format!("bad array index {last}"))?;
|
|
||||||
if idx > arr.len() {
|
|
||||||
return Err(format!("add index out of bounds {idx}"));
|
|
||||||
}
|
|
||||||
arr.insert(idx, value.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => return Err("add parent not container".into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
PatchOp::Remove { path } => {
|
|
||||||
let (parent, last) = json_pointer_parent(&mut state, path)?;
|
|
||||||
match parent {
|
|
||||||
Value::Object(map) => {
|
|
||||||
map.remove(&last)
|
|
||||||
.ok_or_else(|| format!("remove missing key {last}"))?;
|
|
||||||
}
|
|
||||||
Value::Array(arr) => {
|
|
||||||
let idx: usize = last
|
|
||||||
.parse()
|
|
||||||
.map_err(|_| format!("bad array index {last}"))?;
|
|
||||||
if idx > arr.len() {
|
|
||||||
return Err(format!("remove index out of bounds {idx}"));
|
|
||||||
}
|
|
||||||
arr.remove(idx);
|
|
||||||
}
|
|
||||||
_ => return Err("remove parent not container".into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(state)
|
|
||||||
}
|
|
||||||
|
|
@ -1,51 +0,0 @@
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use serde_json::Value;
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
pub struct ReserveReq {
|
|
||||||
pub author: String,
|
|
||||||
hint_paths: Option<Vec<String>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
pub struct ReserveRes {
|
|
||||||
pub reservation_id: String,
|
|
||||||
pub base_version: u64,
|
|
||||||
pub expires_at: i64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
|
||||||
pub struct TxCommitReq {
|
|
||||||
pub reservation_id: String,
|
|
||||||
pub tx_id: String,
|
|
||||||
pub base_version: u64,
|
|
||||||
pub author: String,
|
|
||||||
pub ops: Vec<PatchOp>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
pub struct TxCommitRes {
|
|
||||||
pub doc_id: String,
|
|
||||||
pub tx_id: String,
|
|
||||||
pub committed_version: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
|
||||||
#[serde(tag = "op")]
|
|
||||||
pub enum PatchOp {
|
|
||||||
#[serde(rename = "replace")]
|
|
||||||
Replace { path: String, value: Value },
|
|
||||||
#[serde(rename = "add")]
|
|
||||||
Add { path: String, value: Value },
|
|
||||||
#[serde(rename = "remove")]
|
|
||||||
Remove { path: String },
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
pub struct Reservation {
|
|
||||||
pub reservation_id: String,
|
|
||||||
pub doc_id: String,
|
|
||||||
pub author: String,
|
|
||||||
pub base_version: u64,
|
|
||||||
pub expires_at: i64,
|
|
||||||
}
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue