Signed-off-by: Pakin <pakin.t@forth.co.th>
This commit is contained in:
Pakin 2026-02-27 14:09:21 +07:00
commit 551f4ec3ab
14 changed files with 5227 additions and 0 deletions

4
.dockerignore Normal file
View file

@ -0,0 +1,4 @@
*.json
*.txt
.env
target

4
.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
*.json
.env
target
*.txt

3220
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

24
Cargo.toml Normal file
View file

@ -0,0 +1,24 @@
[package]
name = "server-mark2-dev"
version = "0.1.0"
edition = "2024"
[dependencies]
async-compression = { version = "0.4.39", features = ["tokio", "brotli"] }
axum = { version = "0.8.8", features = ["ws"] }
axum-streams = { version = "0.24.0", features = ["json"] }
celes = "2.6.0"
chrono = "0.4.43"
crossbeam-queue = "0.3.12"
dotenv = "0.15.0"
futures = "0.3.32"
libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.git", version = "0.1.1" }
rayon = "1.11.0"
redis = { version = "1.0.2", features = ["tokio-comp"] }
reqwest = "0.13.1"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.149"
tokio = { version = "1.49.0", features = ["full"] }
tokio-cron-scheduler = "0.15.1"
tokio-stream = "0.1.18"
uuid = { version = "1.20.0", features = ["v4"] }

29
Dockerfile Normal file
View file

@ -0,0 +1,29 @@
# Build stage: compile the release binary with a cached dependency layer.
FROM rust:latest AS builder
RUN apt-get update && apt-get install -y \
    pkg-config \
    libssl-dev
WORKDIR /app
# Limit parallelism to keep peak memory low on small build hosts.
ENV CARGO_BUILD_JOBS=1
# Build dependencies against a dummy main so this layer stays cached until
# Cargo.toml / Cargo.lock change.
# NOTE(review): after the real `COPY src`, cargo may reuse the dummy-build
# fingerprint; consider `touch src/main.rs` or removing the cached binary —
# verify the final image contains the real binary.
COPY Cargo.toml Cargo.lock ./
RUN mkdir src && \
    echo "fn main(){}" > src/main.rs && \
    cargo build --release -j 1 && \
    rm -rf src
COPY src ./src
RUN cargo build --release -j 1
# Runtime stage: minimal distroless image (libc + openssl only, no shell).
FROM gcr.io/distroless/cc-debian12
COPY --from=builder /app/target/release/server-mark2-dev /usr/local/bin/
EXPOSE 36579
# Use the absolute path: distroless has no shell and a minimal PATH.
CMD [ "/usr/local/bin/server-mark2-dev" ]

345
src/cold_start.rs Normal file
View file

@ -0,0 +1,345 @@
use std::{collections::HashMap, fs::File};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use celes::Country;
use redis::TypedCommands;
use crate::AppState;
/// `.info` document describing a country's vending deployment.
/// Field names mirror the external JSON schema (hence the PascalCase names);
/// unknown keys survive a round-trip via the flattened `extra` map.
#[allow(non_snake_case)]
#[derive(Serialize, Deserialize, Clone)]
pub struct CountryInfo {
    // Set by `new` to a `taobin_project/logo/<code>_plate.png` asset path.
    image: String,
    Brand: String,
    Country: String,
    VendingClass: String,
    Machinecompatible: String,
    MateriallistProfile: Vec<CountryInfoProfileDetail>,
    // Captures JSON keys not modeled above so re-serialization is lossless.
    #[serde(flatten)]
    extra: HashMap<String, serde_json::Value>,
}
/// One material-list profile entry inside a `CountryInfo` document:
/// a settings JSON file, its preview image, and a free-text description.
#[derive(Serialize, Deserialize, Clone)]
pub struct CountryInfoProfileDetail {
    json: String,
    img: String,
    desc: String,
    // Preserves unmodeled keys across deserialize/serialize.
    #[serde(flatten)]
    extra: HashMap<String, serde_json::Value>,
}
impl CountryInfo {
    /// Builds the default info document for a country.
    ///
    /// `country_code` is expected to be an ISO 3166-1 alpha-3 code; the
    /// special value "dubai" maps to the United Arab Emirates and any other
    /// unknown code falls back to Thailand. `brand` defaults to empty.
    pub fn new(country_code: String, brand: Option<String>) -> CountryInfo {
        // No `.clone()` needed: `from_alpha3` only borrows the code.
        let country = match Country::from_alpha3(&country_code) {
            Ok(c) => c,
            Err(_) => {
                if country_code.eq("dubai") {
                    Country::the_united_arab_emirates()
                } else {
                    Country::thailand()
                }
            }
        };
        CountryInfo {
            image: format!("taobin_project/logo/{country_code}_plate.png"),
            // `unwrap_or_default` avoids allocating "" eagerly.
            Brand: brand.unwrap_or_default(),
            Country: country.long_name.to_string(),
            VendingClass: String::from("coffeethai02"),
            // NOTE(review): "GEN32" looks like a typo for "GEN3" — confirm
            // with the schema owner before changing this runtime string.
            Machinecompatible: String::from("GEN2 and GEN32"),
            MateriallistProfile: vec![CountryInfoProfileDetail {
                json: String::from("vending_setting_and_profile_v1.json"),
                img: String::from("vending_setting_and_profile_v1.png"),
                desc: String::new(),
                extra: HashMap::default(),
            }],
            extra: HashMap::default(),
        }
    }
}
/// Fetches the root listing of the recipe repo, caches it in redis under
/// `root_repo`, and kicks off the follow-up discovery pipeline
/// (`setup_after_get_root` -> `get_all_file_path_of_legit_country`) on a
/// background task. Returns the raw listing, or `Null` on any failure.
async fn get_root_files(state: AppState) -> Result<Value, Box<dyn std::error::Error>> {
    let api_header = state.get_cfg().get_api_header();
    let mut ret_result = serde_json::Value::Null;
    let client = reqwest::Client::new();
    // NOTE(review): the repo address is hard-coded here rather than taken
    // from `DevConfig::get_file_from_recipe_repo` — confirm this is intended.
    let res = client
        .get("http://localhost:36584/checkout?path=")
        .header(api_header.0, api_header.1)
        .send()
        .await;
    match res {
        Ok(res) => {
            // Only JSON responses are treated as a valid listing.
            if let Some(ct) = res.headers().get("content-type")
                && ct.eq("application/json")
            {
                let raw = res.text().await;
                if let Ok(raw) = raw {
                    let result: serde_json::Value =
                        serde_json::from_str(&raw).unwrap_or(serde_json::Value::Null);
                    // Cache the raw listing for other consumers.
                    let mut redis_client = state.clone().redis_cli;
                    let _ = redis_client.set("root_repo", result.to_string());
                    ret_result = result.clone();
                    println!("setup next");
                    // Discovery continues in the background; this function
                    // returns the listing immediately.
                    tokio::spawn(async move {
                        let s1 = setup_after_get_root(state.clone(), result.clone())
                            .await
                            .ok();
                        println!("checkpoint 1: {}", s1.is_some());
                        if let Some((country_with_version, country_mapping)) = s1 {
                            println!("entries: {}", country_with_version.len());
                            let _ = get_all_file_path_of_legit_country(
                                state.clone(),
                                country_with_version,
                                country_mapping,
                            )
                            .await;
                        }
                    });
                }
            }
        }
        Err(e) => {
            println!("Error on root fetch: {e}");
        }
    }
    Ok(ret_result)
}
/// Probes each top-level folder of the repo listing for a `<folder>/version`
/// file. Folders answering with JSON are treated as legitimate countries;
/// each version is cached in redis under `<folder>.version`.
///
/// Returns the legit folders plus a folder -> version map.
async fn setup_after_get_root(
    state: AppState,
    roots: serde_json::Value,
) -> Result<(Vec<String>, HashMap<String, String>), Box<dyn std::error::Error>> {
    let mut legit_country_with_version = Vec::new();
    let mut country_version_mapping = HashMap::new();
    if let Some(map) = roots.as_object()
        && let Some(res) = map.get("result")
    {
        let fds: Vec<String> = res
            .as_array()
            .unwrap_or(&Vec::new())
            .iter()
            .map(|x| x.as_str().unwrap_or("").to_string())
            .collect();
        println!("pre_loop: {fds:?}");
        let api_header = state.get_cfg().get_api_header();
        // Reuse one HTTP client for every probe instead of building a new
        // one (connection pool and all) per folder.
        let client = reqwest::Client::new();
        for fd in fds {
            println!("checking {fd}");
            let res = client
                .get(format!("http://localhost:36584/checkout?path={fd}/version"))
                .header(api_header.0.clone(), api_header.1.clone())
                .send()
                .await;
            if let Ok(r) = res
                && let Some(ct) = r.headers().get("content-type")
                && r.status().eq(&reqwest::StatusCode::OK)
                && ct.eq("application/json")
                && let Ok(txt) = r.text().await
            {
                println!("{fd}.version = {txt}");
                // Never unwrap network data: a malformed body now yields an
                // empty map instead of panicking the whole cold start.
                let vres: HashMap<String, String> =
                    serde_json::from_str(&txt).unwrap_or_default();
                let vv = vres
                    .get("result")
                    .map(|x| x.to_string())
                    .unwrap_or_default();
                // Persist the version for this country.
                let mut rcli = state.clone().redis_cli;
                let _ = rcli.set(format!("{fd}.version"), vv.clone());
                legit_country_with_version.push(fd.clone());
                country_version_mapping.insert(fd, vv);
            }
        }
    }
    Ok((legit_country_with_version, country_version_mapping))
}
/// For each legit country folder: caches its file listing in redis (key =
/// country name), publishes the listing on `recipe_files_by_country`,
/// ensures a `.info` document exists (generated or fetched), and pre-caches
/// the first recipe file whose name contains the country's latest version.
async fn get_all_file_path_of_legit_country(
    state: AppState,
    legit_countries: Vec<String>,
    country_mapping: HashMap<String, String>,
) -> Result<(), Box<dyn std::error::Error>> {
    let api_header = state.get_cfg().get_api_header();
    // save all entries of each country
    for country in legit_countries {
        let client = reqwest::Client::new();
        let res = client
            .get(format!("http://localhost:36584/checkout?path={country}"))
            .header(api_header.clone().0, api_header.clone().1)
            .send()
            .await;
        if let Ok(r) = res
            && let Some(ct) = r.headers().get("content-type")
            && r.status().eq(&reqwest::StatusCode::OK)
            && ct.eq("application/json")
            && let Ok(txt) = r.text().await
        {
            // get version of country & persist save
            let mut rcli = state.clone().redis_cli;
            let _ = rcli.set(country.clone(), txt.clone());
            // generate all file paths
            println!("{country} ready!");
            // Listing body is expected as `{"result": [<file names>...]}`.
            let files: HashMap<String, Vec<String>> =
                serde_json::from_str(&txt.clone()).unwrap_or(HashMap::new());
            // stream content
            let _ = rcli.publish(
                "recipe_files_by_country",
                json!({country.clone() : files}).to_string(),
            );
            if let Some(fl) = files.get("result") {
                let has_info = fl.contains(&".info.json".to_string());
                println!("{country} has info: {has_info}");
                // read version
                let current_latest_version = country_mapping
                    .get(&country)
                    .map(|x| x.to_string())
                    .unwrap_or("unknown".to_string());
                // Files whose names mention the latest version (substring match).
                let latest_version_file: Vec<String> = fl
                    .iter()
                    .filter(|x| x.contains(&current_latest_version))
                    .map(|x| x.to_string())
                    .collect();
                if !has_info {
                    // generate info for country
                    let _ = generate_country_info_default(state.clone(), country.clone()).await;
                } else {
                    let _ = fetch_country_info(state.clone(), country.clone()).await;
                }
                // do fetch latest version into redis
                if let Some(single) = latest_version_file.first() {
                    let res_c = client
                        .get(format!(
                            "http://localhost:36584/checkout?path={}/{single}",
                            country.clone()
                        ))
                        .header(api_header.clone().0, api_header.clone().1)
                        .send()
                        .await;
                    if let Ok(latest_raw) = res_c
                        && let Ok(latest_raw_txt) = latest_raw.text().await
                    {
                        println!("cached {single}");
                        let _ = rcli.set(
                            format!("{}/{}", country.clone(), single.clone()),
                            latest_raw_txt,
                        );
                    }
                }
            }
        }
    }
    Ok(())
}
/// Creates and persists a default `.info` document for a country that has no
/// `.info.json` in the repo: stored in redis under `<cc>.info` and mirrored
/// to a local `.info.<cc>.json` file.
async fn generate_country_info_default(
    state: AppState,
    cc: String,
) -> Result<(), Box<dyn std::error::Error>> {
    // Brand is decided per market; everything else falls back to Taobin.
    let country_info = match cc.as_str() {
        "sgp" | "dubai" => CountryInfo::new(cc.clone(), Some("WhatTheCup".to_string())),
        "gbr" | "aus" | "hkg" | "rou" | "lva" | "est" | "etu" => {
            CountryInfo::new(cc.clone(), Some("Flying Turtle".to_string()))
        }
        _ => CountryInfo::new(cc.clone(), Some("Taobin".to_string())),
    };
    // save country info
    let mut rcli = state.clone().redis_cli;
    let _ = rcli.set(format!("{cc}.info"), serde_json::to_string(&country_info)?);
    // Round-trip through `Value` on purpose so the on-disk file keeps the
    // same (map-sorted) key order as previous revisions produced.
    let json = serde_json::to_string(&country_info)?;
    let json2: serde_json::Value = serde_json::from_str(&json)?;
    // Propagate I/O errors instead of panicking — the fn already returns Result.
    let writer = File::create(format!(".info.{cc}.json"))?;
    let _ = serde_json::to_writer_pretty(writer, &json2);
    Ok(())
}
/// Fetches `<cc>/.info.json` from the recipe repo; on success caches it in
/// redis under `<cc>.info` and mirrors it to a local `.info.<cc>.json` file.
/// A body that fails to parse falls back to a default `CountryInfo`.
async fn fetch_country_info(state: AppState, cc: String) -> Result<(), Box<dyn std::error::Error>> {
    let api_header = state.get_cfg().get_api_header();
    let client = reqwest::Client::new();
    let res = client
        .get(
            state
                .get_cfg()
                .get_file_from_recipe_repo(format!("{cc}/.info.json")),
        )
        .header(api_header.0, api_header.1)
        .send()
        .await;
    if let Ok(r) = res
        && let Some(ct) = r.headers().get("content-type")
        && r.status().eq(&reqwest::StatusCode::OK)
        && ct.eq("application/json")
        && let Ok(txt) = r.text().await
    {
        let mut rcli = state.clone().redis_cli;
        // `unwrap_or_else`: only build the fallback document when parsing
        // actually fails (the old `unwrap_or` constructed it every time).
        let info: CountryInfo =
            serde_json::from_str(&txt).unwrap_or_else(|_| CountryInfo::new(cc.clone(), None));
        let _ = rcli.set(format!("{cc}.info"), serde_json::to_string(&info)?);
        // Round-trip through `Value` to keep the file's map-sorted key order.
        let json = serde_json::to_string(&info)?;
        let json2: serde_json::Value = serde_json::from_str(&json)?;
        // Propagate I/O errors instead of panicking.
        let writer = File::create(format!(".info.{cc}.json"))?;
        let _ = serde_json::to_writer_pretty(writer, &json2);
    }
    Ok(())
}
/// Entry point for the cold-start warm-up: runs the root-repo fetch on a
/// background task and waits for that task to finish.
pub async fn cold_start_process(state: AppState) -> Result<(), Box<dyn std::error::Error>> {
    println!("starting cold process");
    let owned_state = state.clone();
    let worker = tokio::spawn(async move {
        match get_root_files(owned_state).await {
            Ok(res) => println!("cold start ok, {}", res),
            Err(e) => println!("cold start error: {e}"),
        }
    });
    let _ = worker.await;
    Ok(())
}

852
src/main.rs Normal file
View file

@ -0,0 +1,852 @@
use std::{
collections::VecDeque, env, fs::File, io::Read, path::PathBuf, sync::Arc, time::Duration,
};
use async_compression::tokio::bufread::BrotliDecoder;
use axum::{
Json, Router,
extract::{
State, WebSocketUpgrade,
ws::{CloseFrame, Message, WebSocket},
},
response::IntoResponse,
routing::{get, post},
};
use futures::{
SinkExt, StreamExt,
stream::{SplitSink, SplitStream},
};
use libtbr::models::recipe::{MaterialSetting, Recipe, Recipe01};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use redis::{TypedCommands, cmd};
use serde::{Deserialize, Serialize};
use tokio::{
io::{AsyncReadExt, BufReader},
sync::{
Mutex,
mpsc::{self, Receiver, Sender},
},
};
use uuid::Uuid;
use crate::{
stream::model::{
IntoStreamMessage, StreamDataChunk, StreamDataEnd, StreamDataExtra, StreamDataStart,
},
tx::handler::create_tx_patcher_route,
};
// mod cold_start;
mod stream;
mod tx;
// features
// - get result from recipe_repo
// - store in redis
// - cron job fetch update
/// Runtime configuration for the dev server, built from env vars in `main`.
#[derive(Clone)]
pub struct DevConfig {
    api_key: String,
    api_domain: String,
    api_recipe_service: String,
    api_redis_url: String,
}
impl DevConfig {
    /// Bundles the raw configuration values into a `DevConfig`.
    pub fn new(
        key: String,
        domain: String,
        rp_service: String,
        api_redis_url: String,
    ) -> DevConfig {
        DevConfig {
            api_key: key,
            api_domain: domain,
            api_recipe_service: rp_service,
            api_redis_url,
        }
    }
    /// Base URL of the recipe repo service: domain followed by service path.
    pub fn get_recipe_url(&self) -> String {
        let mut url = self.api_domain.clone();
        url.push_str(&self.api_recipe_service);
        url
    }
    /// Checkout URL for a single file inside the recipe repo.
    pub fn get_file_from_recipe_repo(&self, path: String) -> String {
        format!("{base}/checkout?path={path}", base = self.get_recipe_url())
    }
    /// Header name/value pair used to authenticate against the repo service.
    pub fn get_api_header(&self) -> (String, String) {
        (String::from("X-API-Key"), self.api_key.clone())
    }
}
// async fn test_send(dev_cfg: DevConfig) -> Result<(), Box<dyn std::error::Error>> {
// let api_header = dev_cfg.get_api_header();
// let client = reqwest::Client::new();
// let res = client.get(dev_cfg.get_file_from_recipe_repo(String::new())).header(api_header.0, api_header.1).send().await?;
// println!("headers: {:?}", res.headers());
// if let Some(ct) = res.headers().get("content-type") && ct.eq("application/json"){
// let raw = res.text().await?;
// let result: serde_json::Value = serde_json::from_str(&raw)?;
// println!("raw response: {result:?}");
// //redis-cli -h 100.120.136.127 -p 6379
// let mut redis_client = redis::Client::open("redis://100.120.136.127:6379")?;
// let _ = redis_client.set("root_repo", result.to_string());
// }
// Ok(())
// }
/// Router for recipe-related endpoints; nested under `/recipe` in `main`.
/// Currently only the websocket endpoint is live.
pub async fn create_recipe_repo_router() -> Router<Arc<AppState>> {
    Router::new()
        // .route("/", get(get_root_files))
        .route("/ws", get(websocket_handler))
        // .route("/edit", post())
        // .route("/{country}/", method_router)
}
/// Envelope for messages posted by sibling services to `/syscb` and fanned
/// out on the internal broadcast channel.
#[derive(Debug, Serialize, Deserialize)]
struct SysMessage {
    // Serialized as "type" ("type" is a reserved word in Rust).
    #[serde(rename = "type")]
    stype: String,
    payload: serde_json::Value,
}
/// HTTP callback for other services: re-serializes the message and fans it
/// out on the internal broadcast channel.
async fn post_from_other_system(
    State(state): State<Arc<AppState>>,
    Json(msg): Json<SysMessage>,
) -> impl IntoResponse {
    let sys_payload = match serde_json::to_value(&msg) {
        Ok(value) => value,
        Err(_) => {
            return (
                axum::http::StatusCode::INTERNAL_SERVER_ERROR,
                "cannot create payload",
            )
                .into_response();
        }
    };
    if state.system_tx.send(sys_payload).is_ok() {
        (axum::http::StatusCode::OK, "").into_response()
    } else {
        (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "send fail").into_response()
    }
}
/// Upgrades the HTTP request to a websocket and hands it to `handle_socket`.
async fn websocket_handler(
    State(state): State<Arc<AppState>>,
    ws: WebSocketUpgrade,
) -> impl IntoResponse {
    let shared = Arc::clone(&state);
    ws.on_failed_upgrade(|error| println!("Error upgrading websocket: {}", error))
        .on_upgrade(async |s| handle_socket(s, shared).await.unwrap_or(()))
}
/// Splits the websocket into sender/receiver halves and spawns the two
/// pump tasks: `write` (internal channel -> socket) and `read`
/// (socket -> dispatch). Returns immediately; the tasks own the halves.
async fn handle_socket(
    socket: WebSocket,
    state: Arc<AppState>,
) -> Result<(), Box<dyn std::error::Error>> {
    let (sender, receiver) = socket.split();
    // Internal channel `read` uses to push frames toward `write`.
    let (tx, rx) = mpsc::channel::<serde_json::Value>(2);
    // Every connection gets its own subscription to the system broadcast.
    let user_sys_rx = state.system_tx.subscribe();
    tokio::spawn(write(sender, rx));
    tokio::spawn(read(state, receiver, tx.clone(), user_sys_rx));
    Ok(())
}
/// Best-effort close frame; the send error is deliberately ignored because
/// the peer may already be gone.
async fn send_close_message(mut socket: WebSocket, code: u16, reason: &str) {
    _ = socket
        .send(Message::Close(Some(CloseFrame {
            code,
            reason: reason.into(),
        })))
        .await;
}
/// Reads a string value from redis, mapping both "key missing" and
/// connection errors into descriptive `Err` strings.
async fn fetch_content_from_redis(redis: redis::Client, key: &str) -> Result<String, String> {
    let mut rcli = redis.clone();
    match rcli.get(key) {
        Ok(Some(value)) => Ok(value),
        Ok(None) => Err(format!("result error from key: {key}")),
        Err(e) => Err(format!("redis get failed: {e}")),
    }
}
/// Reads a raw byte value from redis via a blocking connection.
///
/// Failures are now reported as `Err` instead of being swallowed into
/// `Ok(vec![])`: the signature already promises `Result`, every caller
/// handles `Err`, and the old empty-vec path made downstream brotli/JSON
/// decoding fail in confusing ways.
async fn fetch_content_from_redis_byte(redis: redis::Client, key: &str) -> Result<Vec<u8>, String> {
    let mut conn = redis
        .get_connection()
        .map_err(|e| format!("get connection fail, {e}"))?;
    cmd("GET")
        .arg(key)
        .query::<Vec<u8>>(&mut conn)
        .map_err(|e| format!("get fail, {e}"))
}
/// Top-level frame sent by websocket clients: a `"type"` discriminator
/// ("recipe", "command", ...) plus an optional payload.
#[derive(Serialize, Deserialize)]
struct WebsocketMessageRequest {
    // Serialized as "type" ("type" is a reserved word in Rust).
    #[serde(rename = "type")]
    type_w: String,
    payload: Option<serde_json::Value>,
}
/// Payload of a websocket `"recipe"` request.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct RecipeRequestPayload {
    /// Token used to validate that the request is acceptable
    auth: String,
    /// Only grab a partial file; content will be sent with a json patch
    partial: bool,
    /// Country of recipe
    country: String,
    /// Version of recipe; -1 means "latest" (see `is_req_patch`)
    version: i64,
    /// Extended infos, required parameters or unimplemented fields in the current struct. Expected pattern `<key1>=<val1>,<key2>=<val2>,...`
    parameters: String,
}
/// Payload of a websocket `"command"` request, queued by the `AppState`
/// worker and dispatched to `<srv_name>/job` over redis pub/sub.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct CommandRequestPayload {
    /// User info expect at least id, token, name
    user_info: serde_json::Value,
    /// Target service
    srv_name: String,
    /// Values
    values: serde_json::Value,
}
/// Attempts to interpret a broadcast value as a `CommandRequestPayload`;
/// `None` means the value is some other kind of system message.
fn convert_ack_command(cmd_req: &serde_json::Value) -> Option<CommandRequestPayload> {
    // `.ok()` replaces the hand-rolled Ok->Some / Err->None match.
    serde_json::from_value(cmd_req.clone()).ok()
}
// /// Result to send to client
// #[derive(Serialize, Deserialize)]
// struct RecipeOverview {
// productCode: String,
// name: Option<String>,
// description: Option<String>,
// tags: String,
// status: MenuStatus,
// }
// #[derive(Serialize, Deserialize)]
// enum MenuStatus {
// #[serde(rename = "ready")]
// Ready,
// #[serde(rename = "obsolete")]
// Obsolete,
// #[serde(rename = "drafted")]
// Drafted,
// Status(String), // additional status
// }
/// Socket -> server pump. Consumes client frames, serves `"recipe"` requests
/// by streaming the recipe in chunks over `tx`, and forwards `"command"`
/// requests onto the system broadcast channel. Also spawns a relay that
/// mirrors non-command system broadcasts back to this client.
async fn read(
    // redis: redis::Client,
    mut state: Arc<AppState>,
    mut receiver: SplitStream<WebSocket>,
    tx: Sender<serde_json::Value>,
    mut system_rx: tokio::sync::broadcast::Receiver<serde_json::Value>,
    // cmd_atom: crossbeam_queue::ArrayQueue<CommandRequestPayload>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let redis = state.redis_cli.clone();
    let tx_to_client = tx.clone();
    tokio::spawn(async move {
        // Send back to client from services
        // (command payloads are handled by the AppState queue worker, so
        // only non-command broadcasts are relayed here).
        while let Ok(s_msg) = system_rx.recv().await {
            if convert_ack_command(&s_msg).is_none()
                && let Some(err) = tx_to_client.send(s_msg).await.err()
            {
                println!("[SYS] failed to send back to client: {err}");
            }
        }
    });
    // NOTE(review): the `?` operators below end this task (and the whole
    // connection) on the first malformed frame or parse failure — confirm
    // that hard-fail policy is intended.
    while let Some(Ok(msg)) = receiver.next().await {
        match msg {
            Message::Text(t) => {
                let req: WebsocketMessageRequest = serde_json::from_str(t.as_str())?;
                match req.type_w.as_str() {
                    "recipe" if req.payload.is_some() => {
                        // guard expect value
                        let p = req.payload.unwrap();
                        let recipe_param: RecipeRequestPayload = serde_json::from_value(p)?;
                        // get actual version
                        //
                        let latest_key =
                            format!("master:{country}/version", country = recipe_param.country);
                        println!("latest key: {latest_key}");
                        // The cached version blob is brotli-compressed; decode
                        // and strip quotes. Any failure degrades to "".
                        let latest_version =
                            match fetch_content_from_redis_byte(redis.clone(), &latest_key).await {
                                Ok(x) => {
                                    // decode brotli
                                    let mut sbuf = String::new();
                                    let mut decoder = BrotliDecoder::new(x.as_slice());
                                    match decoder.read_to_string(&mut sbuf).await {
                                        Ok(_) => sbuf.replace('"', ""),
                                        Err(e) => {
                                            println!("decode fail: {e}");
                                            "".to_string()
                                        }
                                    }
                                }
                                Err(e) => {
                                    println!("get latest fail: {e}");
                                    "".to_string()
                                }
                            };
                        // Local file name depends on whether this is a patch
                        // (`stx_*`) or a full (`result.*`) request.
                        let req_file = if is_req_patch(&recipe_param) {
                            format!(
                                "stx_{country}_{version}.json",
                                country = recipe_param.country,
                                version = latest_version
                            )
                        } else {
                            format!(
                                "result.{country}.{version}.json",
                                country = recipe_param.country,
                                version = latest_version
                            )
                        };
                        let mut retry_cnt = 0;
                        let sid = Uuid::new_v4();
                        println!("init req: {req_file}");
                        match get_local_file(req_file) {
                            // Fast path: the recipe is already exported locally.
                            Ok(mut f) => {
                                let mut file_content = String::new();
                                f.read_to_string(&mut file_content)?;
                                // split send
                                let recipe: Recipe = serde_json::from_str(&file_content)?;
                                // concat all recipes including subs
                                let r01s: Vec<Recipe01> = recipe
                                    .Recipe01
                                    .par_iter()
                                    .flat_map(|x| {
                                        let mut v = Vec::new();
                                        v.push(x.clone());
                                        if let Some(sub) = x.clone().SubMenu {
                                            v.extend(sub);
                                        }
                                        v
                                    })
                                    .collect();
                                let matset: Vec<MaterialSetting> = recipe.MaterialSetting.clone();
                                // Stream protocol: start -> chunks -> extras -> end.
                                let ss = StreamDataStart::new(
                                    r01s.len(),
                                    100,
                                    Some("local".to_string()),
                                );
                                // Shadows the outer `sid`: the stream id comes
                                // from the start frame.
                                let sid = ss.get_id();
                                if let Some(err) = tx.send(ss.as_msg()).await.err() {
                                    println!("ERR: send tx error, {err:?}");
                                }
                                for (index, chunk) in r01s.chunks(100).enumerate() {
                                    let sda =
                                        StreamDataChunk::new(&sid, index * 100, chunk.to_vec());
                                    // no validate
                                    if let Some(err) = tx.send(sda.as_msg()).await.err() {
                                        println!("ERR: send tx error, {err:?}");
                                    }
                                }
                                let mat_exid = sid.clone();
                                let extp = "matset";
                                for (index, chunk) in matset.chunks(100).enumerate() {
                                    let curr_ch_id = format!("{mat_exid}_{index}");
                                    let extra_matset =
                                        StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec());
                                    if let Some(err) = tx.send(extra_matset.as_msg()).await.err() {
                                        println!("ERR: send tx extra error: {err:?}");
                                    }
                                }
                                let end_msg = StreamDataEnd::new(&sid);
                                if let Some(err) = tx.send(end_msg.as_msg()).await.err() {
                                    println!("ERR: send tx error, {err:?}");
                                }
                            }
                            // Slow path: probe redis cache keys (see
                            // `get_key_cache`) over up to 4 attempts.
                            Err(_) => {
                                // savable sid means it could find data but not guarantee if sendable
                                let mut success_sid = String::new();
                                // concurrent fetch
                                for i in 1..5 {
                                    retry_cnt = i;
                                    // retry #1: get from redis
                                    let r1_key = get_key_cache(
                                        recipe_param.country.clone(),
                                        latest_version.to_string(),
                                        is_req_patch(&recipe_param),
                                        retry_cnt,
                                    );
                                    println!("curr key: {r1_key}");
                                    // NOTE(review): attempts 3-4 build checkout
                                    // keys but only `retry_cnt < 3` is handled —
                                    // confirm the checkout fallback is TODO.
                                    if retry_cnt < 3 {
                                        match fetch_content_from_redis_byte(redis.clone(), &r1_key)
                                            .await
                                        {
                                            Ok(res) => {
                                                let buf = BufReader::new(res.as_slice());
                                                let mut sbuf = String::new();
                                                let mut decoder = BrotliDecoder::new(buf);
                                                if let Ok(_) =
                                                    decoder.read_to_string(&mut sbuf).await
                                                {
                                                    let recipe: Recipe =
                                                        serde_json::from_str(&sbuf)?;
                                                    let r01s: Vec<Recipe01> = recipe
                                                        .Recipe01
                                                        .par_iter()
                                                        .flat_map(|x| {
                                                            let mut v = Vec::new();
                                                            v.push(x.clone());
                                                            if let Some(sub) = x.clone().SubMenu {
                                                                v.extend(sub);
                                                            }
                                                            v
                                                        })
                                                        .collect();
                                                    let matset: Vec<MaterialSetting> =
                                                        recipe.MaterialSetting.clone();
                                                    // test stream start model
                                                    let ss = StreamDataStart::new(
                                                        r01s.len(),
                                                        100,
                                                        Some("redis".to_string()),
                                                    );
                                                    let sid = ss.get_id();
                                                    success_sid = sid.clone();
                                                    if let Some(err) =
                                                        tx.send(ss.as_msg()).await.err()
                                                    {
                                                        println!("ERR: send tx error, {err:?}");
                                                    }
                                                    // split send
                                                    for (index, chunk) in
                                                        r01s.chunks(100).enumerate()
                                                    {
                                                        let sda = StreamDataChunk::new(
                                                            &sid,
                                                            index * 100,
                                                            chunk.to_vec(),
                                                        );
                                                        // no validate
                                                        if let Some(err) =
                                                            tx.send(sda.as_msg()).await.err()
                                                        {
                                                            println!("ERR: send tx error, {err:?}");
                                                        }
                                                    }
                                                    let mat_exid = sid.clone();
                                                    let extp = "matset";
                                                    for (index, chunk) in
                                                        matset.chunks(100).enumerate()
                                                    {
                                                        let curr_ch_id =
                                                            format!("{mat_exid}_{index}");
                                                        let extra_matset = StreamDataExtra::new(
                                                            &curr_ch_id,
                                                            &extp,
                                                            chunk.to_vec(),
                                                        );
                                                        if let Some(err) = tx
                                                            .send(extra_matset.as_msg())
                                                            .await
                                                            .err()
                                                        {
                                                            println!(
                                                                "ERR: send tx extra error: {err:?}"
                                                            );
                                                        }
                                                    }
                                                    // Export to disk so the next
                                                    // request hits the fast path.
                                                    let rp_clone = recipe.clone();
                                                    tokio::task::spawn(async move {
                                                        rp_clone.export_to_json_file(Some(
                                                            format!(
                                                                "result.{country}.{version}.json",
                                                                country = recipe_param.country,
                                                                version = latest_version
                                                            ),
                                                        ));
                                                    });
                                                    break;
                                                }
                                            }
                                            Err(_) => {}
                                        }
                                    }
                                }
                                // Sent even when every retry failed (empty sid).
                                let end_msg = StreamDataEnd::new(&success_sid);
                                if let Some(err) = tx.send(end_msg.as_msg()).await.err() {
                                    println!("ERR: send tx error, {err:?}");
                                }
                            }
                        }
                    }
                    "command" if req.payload.is_some() => {
                        // do command send to other services
                        // // guard expect value
                        let p = req.payload.unwrap();
                        // TODO
                        // - Queue requests
                        // - Send if service available
                        if let Some(_) = state.system_tx.send(p).err() {
                            let _ = tx
                                .send(serde_json::json!({
                                    "type": "notify",
                                    "payload": {
                                        "from": "system_tx",
                                        "level": "error",
                                        "msg": "send request fail",
                                        "to": ""
                                    }
                                }))
                                .await;
                        }
                    }
                    _ => {
                        // not implemented
                    }
                }
            }
            _ => {
                // unhanled, ignore
            }
        }
    }
    Ok(())
}
/// Channel -> socket pump: serializes queued JSON frames to the client,
/// pacing sends 100 ms apart so slow clients are not flooded.
async fn write(
    mut sender: SplitSink<WebSocket, Message>,
    mut rx: Receiver<serde_json::Value>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    while let Some(frame) = rx.recv().await {
        // Send errors are ignored: a dropped client simply ends the pump
        // when the channel closes.
        let _ = sender.send(frame.to_string().into()).await;
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    Ok(())
}
/// A request is a "patch" request when the client asked for partial content
/// and pinned a concrete version (anything but the -1 "latest" sentinel).
fn is_req_patch(param: &RecipeRequestPayload) -> bool {
    param.partial && param.version != -1
}
/// Opens `filename` relative to the current working directory.
fn get_local_file(filename: String) -> Result<File, std::io::Error> {
    let path = PathBuf::from(filename);
    File::open(path)
}
/// Builds the redis cache key to probe for a recipe file.
///
/// Patch requests always use the `stx_` naming scheme; otherwise the key
/// shape depends on the retry attempt: attempts 1-2 probe `master:` keys,
/// attempts 3-4 probe checkout-style paths, anything else yields "".
fn get_key_cache(country: String, version: String, is_patch: bool, retry_cnt: i32) -> String {
    if is_patch {
        return format!("stx_{country}_{version}.json");
    }
    match retry_cnt {
        1 => format!("master:{country}/coffeethai02_{version}_{country}.json"),
        2 => format!("master:{country}/coffeethai02_{version}.json"),
        // do checkout
        3 => format!("{country}/coffeethai02_{country}_{version}.json"),
        // do checkout
        4 => format!("{country}/coffeethai02_{version}.json"),
        _ => String::new(),
    }
}
/// Shared application state handed to every axum handler.
pub struct AppState {
    // Environment-derived configuration (API key, service URLs, redis URL).
    dev_config: DevConfig,
    redis_cli: redis::Client,
    // Broadcast bus connecting websocket clients, `/syscb` and the queue worker.
    system_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
}
impl AppState {
    /// Returns a clone of the dev configuration.
    pub fn get_cfg(&self) -> DevConfig {
        self.dev_config.clone()
    }
    /// Builds the shared state and spawns the command-queue worker.
    ///
    /// The worker listens on the system broadcast channel for
    /// `CommandRequestPayload` messages, keeps at most one command in flight
    /// (`current_queue`, capacity 1) plus up to 10 pending ones, and
    /// publishes a command to `<srv_name>/job` over redis pub/sub once the
    /// `<srv_name>/status` key reports "ok".
    pub async fn new(
        dev_config: DevConfig,
        redis_cli: redis::Client,
        system_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
        mut system_rx: tokio::sync::broadcast::Receiver<serde_json::Value>,
    ) -> Arc<AppState> {
        let redis_cli_clone = redis_cli.clone();
        let tx_new = system_tx.clone();
        let result = Arc::new(AppState {
            dev_config,
            redis_cli,
            system_tx,
        });
        tokio::spawn(async move {
            let mut lredis = redis_cli_clone.clone();
            // Single-slot "in flight" queue; overflow goes to `pending_command`.
            let current_queue: crossbeam_queue::ArrayQueue<CommandRequestPayload> =
                crossbeam_queue::ArrayQueue::new(1);
            let mut pending_command: VecDeque<CommandRequestPayload> = VecDeque::new();
            // Redis key polled to know when the target service can take a job.
            let mut check_available_path = String::new();
            loop {
                if let Ok(rmsg) = system_rx.recv().await {
                    // add queue process
                    let command_req: CommandRequestPayload = match serde_json::from_value(rmsg) {
                        Ok(cmd) => cmd,
                        // Reject non-command broadcasts but keep the worker
                        // alive: the previous `return` permanently killed the
                        // queue on the first notify/relay message.
                        Err(_) => continue,
                    };
                    if let Err(fail_payload) = current_queue.push(command_req.clone()) {
                        if pending_command.len() < 10 {
                            pending_command.push_back(fail_payload)
                        } else {
                            // Queue saturated: notify the originating user.
                            // `.cloned()` is required — `Option<&Value>` has no
                            // `unwrap_or_default` (no `Default` for `&Value`).
                            let user_name = fail_payload
                                .user_info
                                .get("name")
                                .cloned()
                                .unwrap_or_default();
                            let _ = tx_new.send(serde_json::json!({
                                "type": "notify",
                                "payload": {
                                    "from": "system_tx",
                                    "msg": "request queue full, try again later",
                                    "level": "ERR",
                                    "to": user_name,
                                }
                            }));
                        }
                    } else {
                        // set check to latest push to queue ok
                        check_available_path = format!("{}/status", command_req.srv_name);
                    }
                }
                // send process
                if let Ok(Some(status)) = lredis.get(&check_available_path) {
                    match status.as_str() {
                        "ok" | "OK" | "Ok" => {
                            if current_queue.is_full()
                                && let Some(cmd) = current_queue.pop()
                            {
                                // Dispatch the in-flight command to the service
                                // job channel.
                                let channel = format!("{}/job", cmd.srv_name);
                                let _ = lredis.publish(
                                    channel,
                                    serde_json::to_string(&cmd).unwrap_or("{}".to_string()),
                                );
                                // queue next
                                if let Some(next_cmd) = pending_command.pop_front() {
                                    check_available_path = format!("{}/status", next_cmd.srv_name);
                                    // ignore result
                                    let _ = current_queue.push(next_cmd);
                                } else {
                                    check_available_path = String::new();
                                }
                            } else if current_queue.is_empty() {
                                check_available_path = String::new();
                            }
                        }
                        _ => {}
                    }
                } else if current_queue.is_empty()
                    && let Some(next_cmd) = pending_command.pop_front()
                {
                    // case empty queue, fetch next
                    check_available_path = format!("{}/status", next_cmd.srv_name);
                    // ignore result
                    let _ = current_queue.push(next_cmd);
                }
            }
        });
        result
    }
}
/// Boots the dev server: loads env config, connects redis, wires the system
/// broadcast channel + state worker, and serves the axum app on
/// `0.0.0.0:<PORT>`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Hello, world!");
    dotenv::dotenv().ok();
    // send req to repo service
    // PORT/redis settings have defaults; the API settings are mandatory and
    // abort startup deliberately (fail fast) when missing.
    let server_port = env::var("PORT").unwrap_or("36579".to_string());
    let api_key = env::var("DEV_API_KEY").expect("no api key");
    let api_domain = env::var("DEV_API_DOMAIN").expect("no domain");
    let api_recipe_service = env::var("DEV_API_RECIPE_SERVICE").expect("no service");
    let api_redis = env::var("DEV_API_REDIS").unwrap_or("0.0.0.0".to_string());
    let api_redis_port = env::var("DEV_API_REDIS_PORT").unwrap_or("6379".to_string());
    let dev_cfg = DevConfig::new(
        api_key,
        api_domain,
        api_recipe_service,
        format!("redis://{api_redis}:{api_redis_port}"),
    );
    // test_send(dev_cfg).await?;
    //
    let redis_cli = redis::Client::open(dev_cfg.api_redis_url.clone())?;
    // Broadcast bus shared by websocket clients, /syscb and the queue worker.
    let (sys_tx, mut sys_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
    let app_state = AppState::new(dev_cfg, redis_cli, sys_tx, sys_rx).await;
    let rp_router = create_recipe_repo_router().await;
    let doc_router = create_tx_patcher_route().await;
    let app = Router::new()
        // .route("/sessionLogin", post(session_login))
        .route("/syscb", post(post_from_other_system))
        .nest("/recipe", rp_router)
        .nest("/docs", doc_router)
        .with_state(app_state);
    let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{server_port}")).await?;
    axum::serve(listener, app).await?;
    Ok(())
}

1
src/stream/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod model;

179
src/stream/model.rs Normal file
View file

@ -0,0 +1,179 @@
use rayon::iter::Either;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// Common shape of every stream frame: a tagged JSON envelope plus the id
/// the client uses to correlate frames belonging to one stream.
pub trait IntoStreamMessage {
    /// Value of the envelope's "type" field.
    const MSG_NAME: &str;
    /// Wraps `self` into `{"type": MSG_NAME, "payload": ...}`.
    fn build(&self) -> serde_json::Value;
    /// Id the client correlates frames with.
    fn get_id(&self) -> String;
}
/// Pre-flight metadata for streaming
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StreamDataStart {
    /// Uuid v4; the client must map later frames to this stream id
    pub stream_id: String,
    /// Total amount of items known up front
    pub total_size: usize,
    /// Number of items carried per subsequent chunk frame
    pub chunk_size: usize,
    /// Referer for checking steps while debugging
    #[serde(rename = "ref")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream_ref: Option<String>,
}
impl IntoStreamMessage for StreamDataStart {
    const MSG_NAME: &str = "stream_data_start";
    /// Envelope: `{"type": "stream_data_start", "payload": {...}}`.
    fn build(&self) -> serde_json::Value {
        let payload = self.clone();
        serde_json::json!({ "type": Self::MSG_NAME, "payload": payload })
    }
    fn get_id(&self) -> String {
        self.stream_id.clone()
    }
}
impl StreamDataStart {
pub fn new(total_size: usize, chunk_size: usize, stream_ref: Option<String>) -> Self {
Self {
stream_id: Uuid::new_v4().to_string(),
total_size,
chunk_size,
stream_ref,
}
}
pub fn as_msg(&self) -> serde_json::Value {
self.build()
}
}
/// Data split into chunks not exceeding the expected size
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StreamDataChunk<T> {
    /// Uuid v4; the client must map this chunk to its stream id
    pub stream_id: String,
    /// Actual index of the first item in this chunk within the full data
    pub start_idx: usize,
    /// Chunked data, split into N items per chunk
    pub data: Vec<T>,
}
impl<T> IntoStreamMessage for StreamDataChunk<T>
where
    T: Serialize,
{
    const MSG_NAME: &str = "stream_data_chunk";
    /// Envelope: `{"type": "stream_data_chunk", "payload": {...}}`.
    fn build(&self) -> serde_json::Value {
        serde_json::json!({ "type": Self::MSG_NAME, "payload": self })
    }
    fn get_id(&self) -> String {
        self.stream_id.clone()
    }
}
impl<T> StreamDataChunk<T>
where
T: Serialize,
{
pub fn new(sid: &str, start_idx: usize, data: Vec<T>) -> Self {
Self {
stream_id: sid.to_string(),
start_idx,
data,
}
}
pub fn as_msg(&self) -> serde_json::Value {
self.build()
}
}
/// Close message for signaling end of streaming
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StreamDataEnd {
    /// Uuid v4; the client must map this frame to its stream id
    pub stream_id: String,
}
impl IntoStreamMessage for StreamDataEnd {
    const MSG_NAME: &str = "stream_data_end";

    /// Wrap the end message in the common `{type, payload}` envelope.
    fn build(&self) -> serde_json::Value {
        let payload = self.clone();
        serde_json::json!({
            "type": Self::MSG_NAME,
            "payload": payload
        })
    }

    /// Stream id being closed.
    fn get_id(&self) -> String {
        self.stream_id.to_owned()
    }
}
impl StreamDataEnd {
    /// Create an end message for stream `sid`.
    pub fn new(sid: &str) -> Self {
        let stream_id = sid.to_string();
        Self { stream_id }
    }

    /// Convenience wrapper over [`IntoStreamMessage::build`].
    pub fn as_msg(&self) -> serde_json::Value {
        self.build()
    }
}
/// Extra data sent alongside previously sent chunks;
/// must be sent before the end message.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StreamDataExtra<T> {
    /// UUID v4; the client must map this message back to its stream via this id.
    pub exid: String,
    /// Type tag describing what kind of extra payload this carries.
    pub extp: String,
    /// The extra items themselves.
    pub payload: Vec<T>,
}
impl<T> IntoStreamMessage for StreamDataExtra<T>
where
    T: Serialize,
{
    const MSG_NAME: &str = "stream_data_extra";

    /// Wrap the extra-data message in the common `{type, payload}` envelope.
    fn build(&self) -> serde_json::Value {
        serde_json::json!({
            "type": Self::MSG_NAME,
            "payload": self
        })
    }

    /// Stream id this extra payload belongs to.
    fn get_id(&self) -> String {
        self.exid.to_owned()
    }
}
impl<T> StreamDataExtra<T>
where
    T: Serialize,
{
    /// Build an extra-data message for stream `exid` with type tag `extp`.
    ///
    /// Takes ownership of `data` directly. The previous implementation called
    /// `data.to_vec()`, which deep-cloned a `Vec` the function already owned
    /// (and forced an unnecessary `T: Clone` bound, now dropped — a
    /// backward-compatible relaxation).
    pub fn new(exid: &str, extp: &str, data: Vec<T>) -> Self {
        Self {
            exid: exid.to_string(),
            extp: extp.to_string(),
            payload: data,
        }
    }

    /// Convenience wrapper over [`IntoStreamMessage::build`].
    pub fn as_msg(&self) -> serde_json::Value {
        self.build()
    }
}

328
src/tx/handler.rs Normal file
View file

@ -0,0 +1,328 @@
use std::sync::Arc;
use axum::{
Json, Router,
extract::{Path, Query, State},
response::IntoResponse,
routing::{get, post},
};
use redis::{AsyncTypedCommands, TypedCommands, aio::MultiplexedConnection};
use reqwest::StatusCode;
use serde::Deserialize;
use serde_json::{Value, json};
use uuid::Uuid;
use crate::{
AppState,
tx::{
helpers::*,
patcher::apply_ops,
types::{PatchOp, Reservation, ReserveReq, ReserveRes, TxCommitReq, TxCommitRes},
},
};
/// Issue a short-lived (30s) reservation on `doc_id`, recording the document
/// version the client will base its ops on.
///
/// Fixes vs. the previous version:
/// - a non-numeric value stored under the version key no longer panics
///   (`parse().ok().unwrap()`); it now falls back to version 0;
/// - a Redis error while storing the reservation returns 500 instead of
///   panicking the handler task.
///
/// NOTE(review): this uses the blocking `TypedCommands` API inside an async
/// handler; consider the async connection used by the other handlers.
async fn reserve_tx(
    State(state): State<Arc<AppState>>,
    Path(doc_id): Path<String>,
    Json(req): Json<ReserveReq>,
) -> impl IntoResponse {
    let rsv_id = Uuid::new_v4().to_string();
    let ttl_secs: i64 = 30; // reservation TTL: 30s
    let expires_at = now_unix() + ttl_secs;
    let mut rcli = state.redis_cli.clone();
    // Current committed version; missing or unparsable values mean a fresh doc.
    let ver_key = doc_ver_key(&doc_id);
    let base_version: u64 = rcli
        .get(ver_key)
        .ok()
        .flatten()
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    let rsv = Reservation {
        reservation_id: rsv_id.clone(),
        doc_id: doc_id.clone(),
        author: req.author,
        base_version,
        expires_at,
    };
    let key = rsv_key(&doc_id, &rsv_id);
    // Serializing a plain data struct cannot fail.
    let payload = serde_json::to_string(&rsv).unwrap();
    // Store the reservation with the same TTL we report to the client.
    if let Err(e) = rcli.set_ex(key, payload, ttl_secs as u64) {
        return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
    }
    (
        StatusCode::OK,
        Json(ReserveRes {
            reservation_id: rsv_id,
            base_version,
            expires_at,
        }),
    )
        .into_response()
}
/// Commit a batch of patch ops to `doc_id` under a previously issued
/// reservation, with optimistic version checking.
///
/// Sequence: validate ids -> take per-doc commit lock -> idempotency check on
/// tx_id -> validate reservation -> compare base_version with the server's
/// current version -> XADD the tx event -> bump version, record tx_id, drop
/// the reservation -> write a snapshot every 50 versions.
async fn commit_tx(
    State(state): State<Arc<AppState>>,
    Path(doc_id): Path<String>,
    Json(req): Json<TxCommitReq>,
) -> impl IntoResponse {
    // Reject obviously malformed requests before touching Redis.
    if req.tx_id.is_empty() || req.reservation_id.is_empty() {
        return (
            StatusCode::BAD_REQUEST,
            "missing tx_id or reservation_id ".into_response(),
        );
    }
    let mut con = match state.redis_cli.get_multiplexed_async_connection().await {
        Ok(conn) => conn,
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("{}", e.to_string()).into_response(),
            );
        }
    };
    // Serialize commits per document: up to 20 lock attempts (30ms apart in
    // wait_lock) on a lock with a 1200ms TTL.
    // NOTE(review): no path below releases this lock explicitly — success and
    // every early return rely on the 1200ms TTL expiring. Confirm this is
    // intentional; it effectively rate-limits commits per doc.
    let lock_key = commit_lock_key(&doc_id);
    if let Err(_) = wait_lock(&mut con, &lock_key, 1200, 20).await {
        return (StatusCode::CONFLICT, "doc busy, retry".into_response());
    }
    // Idempotency: a tx_id committed before returns its recorded version as-is.
    let txid_k = txid_key(&doc_id, &req.tx_id);
    if let Ok(Some(v)) = con.get(&txid_k).await {
        if let Ok(committed_version) = v.parse::<u64>() {
            let res = TxCommitRes {
                doc_id,
                tx_id: req.tx_id,
                committed_version,
            };
            return (StatusCode::OK, Json(res).into_response());
        }
    }
    // The reservation must still exist (it carries a 30s TTL from reserve time).
    let rsv_k = rsv_key(&doc_id, &req.reservation_id);
    let rsv_raw: Option<String> = con.get(&rsv_k).await.unwrap_or(None);
    let rsv_raw = match rsv_raw {
        Some(x) => x,
        None => {
            return (
                StatusCode::CONFLICT,
                "reservation missing/expired".into_response(),
            );
        }
    };
    let rsv: Reservation = match serde_json::from_str(&rsv_raw) {
        Ok(x) => x,
        Err(_) => return (StatusCode::CONFLICT, "bad reservation".into_response()),
    };
    // The reservation must match the caller and the version it was issued for.
    if rsv.doc_id != doc_id || rsv.author != req.author || rsv.base_version != req.base_version {
        return (StatusCode::CONFLICT, "reservation mismatch".into_response());
    }
    if now_unix() > rsv.expires_at {
        return (StatusCode::CONFLICT, "reservation expired".into_response());
    }
    // Optimistic concurrency: the commit only applies on top of the exact
    // version the client based its ops on.
    let curr_ver: u64 = con
        .get(doc_ver_key(&doc_id))
        .await
        .ok()
        .flatten()
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    if req.base_version != curr_ver {
        let body = json!({
            "error": "version_conflict",
            "server_version": curr_ver,
        });
        return (StatusCode::CONFLICT, Json(body).into_response());
    }
    let next_ver = curr_ver + 1;
    // Append the tx event to the doc's stream — the durable log that
    // build_state_at replays.
    let stream_key = doc_tx_stream_key(&doc_id);
    let tx_event = json!({
        "doc_id": doc_id,
        "tx_id": req.tx_id,
        "version": next_ver,
        "base_version": req.base_version,
        "author": req.author,
        "ts": now_unix(),
        "ops": req.ops
    });
    let tx_event_str = tx_event.to_string();
    // NOTE(review): the unwraps below panic on Redis errors mid-commit,
    // potentially leaving the stream and the version/txid keys out of sync.
    let _: String = redis::cmd("XADD")
        .arg(&stream_key)
        .arg("*")
        .arg("data")
        .arg(&tx_event_str)
        .query_async(&mut con)
        .await
        .unwrap();
    let _: () = con
        .set(doc_ver_key(&doc_id), next_ver.to_string())
        .await
        .unwrap();
    let _: () = con.set(&txid_k, next_ver.to_string()).await.unwrap();
    let _: usize = con.del(&rsv_k).await.unwrap();
    // Materialize a snapshot every 50 commits to bound replay cost.
    let snapshot_every: u64 = 50;
    if next_ver % snapshot_every == 0 {
        if let Ok(state) = build_state_at(&mut con, &doc_id, next_ver).await {
            let _: () = con
                .set(doc_snap_key(&doc_id), state.to_string())
                .await
                .unwrap();
            let _: () = con
                .set(doc_snapver_key(&doc_id), next_ver.to_string())
                .await
                .unwrap();
        }
    }
    (
        StatusCode::OK,
        Json(TxCommitRes {
            doc_id,
            tx_id: req.tx_id,
            committed_version: next_ver,
        })
        .into_response(),
    )
}
async fn build_state_at(
con: &mut MultiplexedConnection,
doc_id: &str,
target_version: u64,
) -> Result<Value, String> {
let snap_ver: u64 = con
.get(doc_snap_key(&doc_id))
.await
.ok()
.flatten()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let snap_json: Value = if snap_ver > 0 {
let raw: Option<String> = con.get(doc_snap_key(&doc_id)).await.unwrap_or(None);
raw.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or_else(|| json!({}))
} else {
json!({})
};
let stream_key = doc_tx_stream_key(doc_id);
let start_id = "0-0";
let entries: Vec<(String, Vec<(String, String)>)> = redis::cmd("XRANGE")
.arg(&stream_key)
.arg(start_id)
.arg("+")
.query_async(con)
.await
.map_err(|e| e.to_string())?;
let mut ops_list: Vec<PatchOp> = Vec::new();
for (_id, fields) in entries {
let mut data_opts = None;
for (k, v) in fields {
if k == "data" {
data_opts = Some(v);
break;
}
}
let data = match data_opts {
Some(x) => x,
None => continue,
};
let ev: Value = serde_json::from_str(&data).map_err(|e| e.to_string())?;
let ver = ev.get("version").and_then(|v| v.as_u64()).unwrap_or(0);
if ver == 0 || ver > target_version {
continue;
}
if ver <= snap_ver {
continue;
}
let ops: Vec<PatchOp> =
serde_json::from_value(ev["ops"].clone()).map_err(|e| e.to_string())?;
ops_list.extend(ops);
}
apply_ops(snap_json, &ops_list)
}
/// Query parameters for `get_state`.
#[derive(Deserialize)]
struct StateQuery {
    // Version to rebuild the document at; defaults to (and is capped by)
    // the current server version.
    at_version: Option<u64>,
}
async fn get_state(
State(state): State<Arc<AppState>>,
Path(doc_id): Path<String>,
Query(q): Query<StateQuery>,
) -> impl IntoResponse {
let rcli = state.redis_cli.clone();
let mut con = match rcli.get_multiplexed_async_connection().await {
Ok(sc) => sc,
Err(_) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
"cannot get connection".into_response(),
);
}
};
let cur_ver: u64 = con
.get(doc_ver_key(&doc_id))
.await
.ok()
.flatten()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let target = q.at_version.unwrap_or(cur_ver).min(cur_ver);
match build_state_at(&mut con, &doc_id, target).await {
Ok(state) => (
StatusCode::OK,
Json(json!({
"doc_id": doc_id,
"version": target,
"state": state
}))
.into_response(),
),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({
"error": e
}))
.into_response(),
),
}
}
/// Build the router for the transactional patch API:
/// reserve, commit, and state-readback endpoints keyed by document id.
pub async fn create_tx_patcher_route() -> Router<Arc<AppState>> {
    let router = Router::new();
    router
        .route("/{id}/state", get(get_state))
        .route("/{id}/tx-reserve", post(reserve_tx))
        .route("/{id}/tx-commit", post(commit_tx))
}

67
src/tx/helpers.rs Normal file
View file

@ -0,0 +1,67 @@
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use redis::{RedisResult, aio::MultiplexedConnection};
use tokio::time::sleep;
use uuid::Uuid;
/// Current wall-clock time as whole seconds since the UNIX epoch.
pub fn now_unix() -> i64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the UNIX epoch");
    elapsed.as_secs() as i64
}
/// Key holding the current committed version of `doc`.
pub fn doc_ver_key(doc: &str) -> String {
    format!("doc:{}:ver", doc)
}

/// Key of the stream logging `doc`'s committed tx events.
pub fn doc_tx_stream_key(doc: &str) -> String {
    format!("doc:{}:tx", doc)
}

/// Key holding `doc`'s latest JSON snapshot.
pub fn doc_snap_key(doc: &str) -> String {
    format!("doc:{}:snap", doc)
}

/// Key holding the version number the snapshot was taken at.
pub fn doc_snapver_key(doc: &str) -> String {
    format!("doc:{}:snapver", doc)
}

/// Key of reservation `rsv` on `doc`.
pub fn rsv_key(doc: &str, rsv: &str) -> String {
    format!("doc:{}:rsv:{}", doc, rsv)
}

/// Key recording the committed version of transaction `tx_id` on `doc`.
pub fn txid_key(doc: &str, tx_id: &str) -> String {
    format!("doc:{}:txid:{}", doc, tx_id)
}

/// Key of the per-document commit lock.
pub fn commit_lock_key(doc: &str) -> String {
    format!("doc:{}:lock", doc)
}
/// Try once to take the lock at `key` with a `ttl_ms` expiry.
///
/// Uses `SET key token NX PX ttl_ms`: the reply is non-nil ("OK") only when
/// the key did not exist, so `Ok(true)` means the lock was acquired.
/// The random token is stored but never read back — there is no
/// token-checked release; the lock is only ever freed by TTL expiry.
pub async fn acquire_lock(
    con: &mut MultiplexedConnection,
    key: &str,
    ttl_ms: u64,
) -> RedisResult<bool> {
    let token = Uuid::new_v4().to_string();
    let ok: Option<String> = redis::cmd("SET")
        .arg(key)
        .arg(token)
        .arg("NX")
        .arg("PX")
        .arg(ttl_ms)
        .query_async(con)
        .await?;
    Ok(ok.is_some())
}
/// Spin on `acquire_lock` up to `attempts` times, sleeping 30ms after each
/// failed try.
///
/// # Errors
/// Redis errors from `acquire_lock` propagate immediately; exhausting all
/// attempts yields an Io-kind "lock busy" error.
pub async fn wait_lock(
    con: &mut MultiplexedConnection,
    key: &str,
    ttl_ms: u64,
    attempts: u32,
) -> RedisResult<()> {
    let mut remaining = attempts;
    while remaining > 0 {
        if acquire_lock(con, key, ttl_ms).await? {
            return Ok(());
        }
        sleep(Duration::from_millis(30)).await;
        remaining -= 1;
    }
    Err(redis::RedisError::from((redis::ErrorKind::Io, "lock busy")))
}

4
src/tx/mod.rs Normal file
View file

@ -0,0 +1,4 @@
/// Axum HTTP handlers for the reserve/commit/state endpoints.
pub mod handler;
/// Redis key builders, clock, and lock helpers.
pub mod helpers;
/// JSON-pointer resolution and patch-op application.
pub mod patcher;
/// Request/response and persistence types for the tx API.
pub mod types;

119
src/tx/patcher.rs Normal file
View file

@ -0,0 +1,119 @@
use serde_json::Value;
use crate::tx::types::PatchOp;
/// Resolve JSON pointer `path` against `root` and return a mutable reference
/// to the *parent* container along with the final (unescaped) token.
///
/// The empty path `""` (root pointer) returns `root` itself and an empty
/// token. Tokens are unescaped per RFC 6901: `~1` -> `/` first, then
/// `~0` -> `~` (this order is required so `~01` decodes to `~1`).
///
/// # Errors
/// A non-empty path must start with `/`; every intermediate token must
/// resolve to an existing object key or an in-bounds array index on a
/// container value.
pub fn json_pointer_parent<'a>(
    root: &'a mut Value,
    path: &str,
) -> Result<(&'a mut Value, String), String> {
    if !path.starts_with('/') && path != "" {
        return Err("path must be JSON pointer starting with '/'".into());
    }
    // Split into reference tokens (dropping the leading empty segment
    // produced by the initial '/') and unescape each one.
    let part: Vec<String> = path
        .split('/')
        .skip(1)
        .map(|x| x.replace("~1", "/").replace("~0", "~"))
        .collect();
    if path.is_empty() {
        return Ok((root, "".into()));
    }
    // `part` is non-empty here: a non-empty pointer has at least one token.
    let last = part.last().unwrap().clone();
    // Walk every token except the last to reach the parent container.
    let mut cur = root;
    for key in &part[..part.len() - 1] {
        match cur {
            Value::Object(map) => {
                cur = map
                    .get_mut(key)
                    .ok_or_else(|| format!("missing object key {key}"))?;
            }
            Value::Array(arr) => {
                let idx: usize = key.parse().map_err(|_| format!("bad array idx {key}"))?;
                cur = arr
                    .get_mut(idx)
                    .ok_or_else(|| format!("array index out of bounds {idx}"))?;
            }
            // Scalars (string/number/bool/null) cannot be descended into.
            _ => return Err("cannot traverse non-container".into()),
        }
    }
    Ok((cur, last))
}
/// Apply `ops` in order to `state`, returning the resulting document.
///
/// Fails fast: the first op that cannot be applied aborts with a descriptive
/// error and the partially patched state is discarded.
///
/// Fix: the Remove arm previously checked `idx > arr.len()`, which let
/// `idx == arr.len()` through into `Vec::remove` and panicked; removal
/// requires an existing element, so the check is now `idx >= arr.len()`.
pub fn apply_ops(mut state: Value, ops: &[PatchOp]) -> Result<Value, String> {
    for op in ops {
        match op {
            PatchOp::Replace { path, value } => {
                // Root pointer replaces the whole document.
                // (Note: "/" is treated as root here, although RFC 6901 reads
                // it as the key "" — kept for compatibility with callers.)
                if path == "" || path == "/" {
                    state = value.clone();
                    continue;
                }
                let (parent, last) = json_pointer_parent(&mut state, path)?;
                match parent {
                    Value::Object(map) => {
                        // Replace requires the target to already exist.
                        if !map.contains_key(&last) {
                            return Err(format!("replace target missing key {last}"));
                        }
                        map.insert(last, value.clone());
                    }
                    Value::Array(arr) => {
                        let idx: usize = last
                            .parse()
                            .map_err(|_| format!("bad array index {last}"))?;
                        if idx >= arr.len() {
                            return Err(format!("replace index out of bounds {idx}"));
                        }
                        arr[idx] = value.clone();
                    }
                    _ => return Err("replace parent not container".into()),
                }
            }
            PatchOp::Add { path, value } => {
                let (parent, last) = json_pointer_parent(&mut state, path)?;
                match parent {
                    Value::Object(map) => {
                        // Object add inserts or overwrites the key.
                        map.insert(last, value.clone());
                    }
                    Value::Array(arr) => {
                        if last == "-" {
                            // "-" is the append-to-end token.
                            arr.push(value.clone());
                        } else {
                            let idx: usize = last
                                .parse()
                                .map_err(|_| format!("bad array index {last}"))?;
                            // idx == len is a legal insert position (append),
                            // so only idx > len is out of bounds here.
                            if idx > arr.len() {
                                return Err(format!("add index out of bounds {idx}"));
                            }
                            arr.insert(idx, value.clone());
                        }
                    }
                    _ => return Err("add parent not container".into()),
                }
            }
            PatchOp::Remove { path } => {
                let (parent, last) = json_pointer_parent(&mut state, path)?;
                match parent {
                    Value::Object(map) => {
                        map.remove(&last)
                            .ok_or_else(|| format!("remove missing key {last}"))?;
                    }
                    Value::Array(arr) => {
                        let idx: usize = last
                            .parse()
                            .map_err(|_| format!("bad array index {last}"))?;
                        // FIXED: was `idx > arr.len()`, which panicked in
                        // Vec::remove when idx == len.
                        if idx >= arr.len() {
                            return Err(format!("remove index out of bounds {idx}"));
                        }
                        arr.remove(idx);
                    }
                    _ => return Err("remove parent not container".into()),
                }
            }
        }
    }
    Ok(state)
}

51
src/tx/types.rs Normal file
View file

@ -0,0 +1,51 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Serialize, Deserialize)]
pub struct ReserveReq {
pub author: String,
hint_paths: Option<Vec<String>>,
}
/// Response body for `tx-reserve`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ReserveRes {
    /// Id the client must present when committing.
    pub reservation_id: String,
    /// Document version the reservation was issued against.
    pub base_version: u64,
    /// Unix-seconds timestamp after which the reservation is invalid.
    pub expires_at: i64,
}
/// Request body for `tx-commit`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TxCommitReq {
    /// Reservation obtained from `tx-reserve`; must match author/base_version.
    pub reservation_id: String,
    /// Client-chosen id; committing the same tx_id twice is idempotent.
    pub tx_id: String,
    /// Version the ops were computed against; must equal the server's current.
    pub base_version: u64,
    /// Author committing the transaction.
    pub author: String,
    /// Patch operations to apply, in order.
    pub ops: Vec<PatchOp>,
}
/// Response body for a successful `tx-commit`.
#[derive(Debug, Serialize, Deserialize)]
pub struct TxCommitRes {
    /// Document the transaction was committed to.
    pub doc_id: String,
    /// Echo of the committed transaction id.
    pub tx_id: String,
    /// New document version produced by this commit.
    pub committed_version: u64,
}
/// A single JSON-Patch-style operation, internally tagged by the `op` field
/// ("replace" / "add" / "remove" — `rename_all` yields the same tag values
/// as the previous per-variant renames).
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "op", rename_all = "lowercase")]
pub enum PatchOp {
    /// Replace an existing value at `path`.
    Replace { path: String, value: Value },
    /// Insert a value at `path` (object key, array index, or `-` to append).
    Add { path: String, value: Value },
    /// Delete the value at `path`.
    Remove { path: String },
}
/// Reservation record persisted in Redis (JSON under the reservation key)
/// between `tx-reserve` and `tx-commit`.
#[derive(Debug, Serialize, Deserialize)]
pub struct Reservation {
    /// Id issued to the client at reserve time.
    pub reservation_id: String,
    /// Document this reservation applies to.
    pub doc_id: String,
    /// Author who took the reservation; must match at commit time.
    pub author: String,
    /// Document version when the reservation was issued.
    pub base_version: u64,
    /// Unix-seconds expiry; commits after this instant are rejected.
    pub expires_at: i64,
}