feat: 0.0.1-dev

- expose shared config get/update endpoint
- interceptor for reporting changes from user
- task[recipe]: optimize recipe send flow, add material action (create/update, modify[not test])
- add secured session, in addition to auth message, this is required to use for newer client (expect ^0.0.2 for client)
- disable plugin mode
- optimize ram/cpu usages (reduce from 300MB to ~80MB)

Signed-off-by: Pakin <pakin.t@forth.co.th>
This commit is contained in:
Pakin 2026-06-16 10:47:00 +07:00
parent aa008ccd53
commit 317fcd4203
22 changed files with 3443 additions and 423 deletions

1075
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -8,25 +8,37 @@ async-compression = { version = "0.4.39", features = ["tokio", "brotli"] }
axum = { version = "0.8.8", features = ["ws"] } axum = { version = "0.8.8", features = ["ws"] }
axum-streams = { version = "0.24.0", features = ["json"] } axum-streams = { version = "0.24.0", features = ["json"] }
celes = "2.6.0" celes = "2.6.0"
chrono = "0.4.43" chrono = { version = "0.4.43", features = ["serde"] }
crossbeam-queue = "0.3.12" crossbeam-queue = "0.3.12"
dotenv = "0.15.0" dotenv = "0.15.0"
env_logger = "0.11.9" env_logger = "0.11.9"
futures = "0.3.32" futures = "0.3.32"
futures-util = { version = "0.3.32", optional = true }
libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.git", version = "0.1.1" } libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.git", version = "0.1.1" }
log = "0.4.29" log = "0.4.29"
openssl = { version = "0.10.80", features = ["vendored"] } openssl = { version = "0.10.80", features = ["vendored"] }
rayon = "1.11.0" rayon = "1.11.0"
redis = { version = "1.0.2", features = ["tls-rustls-webpki-roots", "tokio-comp", "tokio-rustls-comp"] } redis = { version = "1.0.2", features = ["tls-rustls-webpki-roots", "tokio-comp", "tokio-rustls-comp"] }
reqwest = { version = "0.13.1", features = ["multipart", "rustls"] } reqwest = { version = "0.13.1", features = ["multipart", "rustls", "json"] }
tracing = "0.1.41"
serde = { version = "1.0.228", features = ["derive"] } serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.149" serde_json = "1.0.149"
sqlx = { version = "0.8.6", features = ["runtime-tokio", "tls-rustls", "sqlite"] } sqlx = { version = "0.8.6", features = ["runtime-tokio", "tls-rustls", "sqlite"] }
tokio = { version = "1.49.0", features = ["full"] } tokio = { version = "1.49.0", features = ["full"] }
tokio-cron-scheduler = "0.15.1" tokio-cron-scheduler = "0.15.1"
tokio-stream = "0.1.18" tokio-stream = "0.1.18"
tokio-tungstenite = { version = "0.29.0", features = ["connect"], optional = true }
tokio-util = "0.7.18" tokio-util = "0.7.18"
uuid = { version = "1.20.0", features = ["v4"] } uuid = { version = "1.20.0", features = ["v4"] }
wasmtime = { version = "44.0.1", features = ["async"] } wasmtime = { version = "44.0.1", features = ["async"] }
wasmtime-wasi = "44.0.1" wasmtime-wasi = "44.0.1"
wasmtime-wasi-http = "44.0.1" wasmtime-wasi-http = "44.0.1"
tokio-postgres = "0.7.17"
pprof = { version = "0.15.0", features = ["flamegraph", "prost-codec"] }
prost = "0.14.4"
jsonwebtoken = { version = "10.4.0", features = ["rsa", "rust_crypto"] }
aes-gcm = "0.10.3"
base64 = "0.22.1"
p256 = { version = "0.13.2", features = ["ecdh"] }
rand_core = { version = "=0.6.4", features = ["getrandom"] }
rand = "0.10.1"

View file

@ -17,6 +17,7 @@ FROM chef AS builder
# Capture Docker's target platform variables # Capture Docker's target platform variables
ARG TARGETPLATFORM ARG TARGETPLATFORM
ARG TARGETARCH ARG TARGETARCH
ARG TARGETVARIANT
# Install host tools needed for compilation (including cmake and clang for aws-lc-sys) # Install host tools needed for compilation (including cmake and clang for aws-lc-sys)
RUN apt-get update && apt-get install -y \ RUN apt-get update && apt-get install -y \
@ -43,6 +44,9 @@ RUN apt-get update && \
echo "CXX=/usr/bin/x86_64-linux-gnu-g++" >> /env_config; \ echo "CXX=/usr/bin/x86_64-linux-gnu-g++" >> /env_config; \
echo "CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER=/usr/bin/x86_64-linux-gnu-gcc" >> /env_config; \ echo "CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_LINKER=/usr/bin/x86_64-linux-gnu-gcc" >> /env_config; \
echo "OPENSSL_DIR=/usr" >> /env_config; \ echo "OPENSSL_DIR=/usr" >> /env_config; \
if [ "$TARGETVARIANT" = "v3" ]; then \
echo "export RUSTFLAGS='-C target-cpu=x86-64-v3'" >> /env_config; \
fi \
elif [ "$TARGETARCH" = "arm64" ]; then \ elif [ "$TARGETARCH" = "arm64" ]; then \
apt-get install -y gcc-aarch64-linux-gnu g++-aarch64-linux-gnu libssl-dev; \ apt-get install -y gcc-aarch64-linux-gnu g++-aarch64-linux-gnu libssl-dev; \
echo "TARGET_TRIPLE=aarch64-unknown-linux-gnu" >> /env_config; \ echo "TARGET_TRIPLE=aarch64-unknown-linux-gnu" >> /env_config; \
@ -87,7 +91,17 @@ RUN . /env_config && \
# ----------------------------------------------------------------------- # -----------------------------------------------------------------------
# Stage 3: Minimal Runtime # Stage 3: Minimal Runtime
# ----------------------------------------------------------------------- # -----------------------------------------------------------------------
FROM debian:bookworm-slim AS runtime # We dynamically anchor the platform to a base architecture to stop Docker
# from panicking over micro-variants like v3 during runtime container setup.
FROM --platform=$TARGETPLATFORM debian:bookworm-slim AS runtime-base
# Trick Buildx: force the runner to resolve back to basic broad platforms
# so it can execute native container processes like apt-get without errors.
FROM --platform=linux/amd64 debian:bookworm-slim AS runtime-amd64
FROM --platform=linux/arm64 debian:bookworm-slim AS runtime-arm64
# Select the clean runtime environment matching your target layout
FROM runtime-$TARGETARCH AS final-runtime
WORKDIR /app WORKDIR /app
# Install runtime dependencies if needed (like ca-certificates or openssl) # Install runtime dependencies if needed (like ca-certificates or openssl)
@ -98,6 +112,10 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
COPY --from=builder /server-mark2-dev /usr/local/bin/server-mark2-dev COPY --from=builder /server-mark2-dev /usr/local/bin/server-mark2-dev
COPY --from=builder /app/.env /usr/local/bin/.env COPY --from=builder /app/.env /usr/local/bin/.env
COPY --from=builder /app/.env /app/.env
COPY --from=builder /app/sheet-api.json /usr/local/bin/sheet-api.json COPY --from=builder /app/sheet-api.json /usr/local/bin/sheet-api.json
COPY --from=builder /app/sheet-api.json /app/sheet-api.json
COPY --from=builder /app/shared-configures.json /usr/local/bin/shared-configures.json
COPY --from=builder /app/shared-configures.json /app/shared-configures.json
COPY --from=builder /app/plugins /usr/local/bin/plugins COPY --from=builder /app/plugins /usr/local/bin/plugins
CMD ["server-mark2-dev"] CMD ["server-mark2-dev"]

1
build.sh Executable file
View file

@ -0,0 +1 @@
docker build --pull --platform linux/amd64,linux/arm64 -t pakin-inspiron-15-3530.tail110d9.ts.net/pakin/server-m2 .

1
push.sh Executable file
View file

@ -0,0 +1 @@
docker push pakin-inspiron-15-3530.tail110d9.ts.net/pakin/server-m2:latest

View file

@ -1,11 +1,18 @@
use crate::websocket::{core::*, helper::read_sheet_config, model::*}; use crate::websocket::{
core::*,
helper::{read_shared_configures, read_sheet_config},
model::*,
session::refresh_jwk_cache,
};
use axum::body::Body;
use axum::{ use axum::{
Router, Router,
extract::DefaultBodyLimit, extract::DefaultBodyLimit,
routing::{get, post}, routing::{get, post},
serve::ListenerExt, serve::ListenerExt,
}; };
use log::{error, info}; use log::{error, info, warn};
use pprof::ProfilerGuardBuilder;
use redis::TypedCommands; use redis::TypedCommands;
use reqwest::{StatusCode, multipart}; use reqwest::{StatusCode, multipart};
use std::{ use std::{
@ -15,6 +22,7 @@ use std::{
time::Duration, time::Duration,
}; };
use tokio::sync::{Mutex, mpsc::Sender}; use tokio::sync::{Mutex, mpsc::Sender};
use tokio_postgres::NoTls;
#[derive(Clone)] #[derive(Clone)]
pub struct Hub { pub struct Hub {
@ -27,8 +35,8 @@ pub struct DevConfig {
pub api_domain: String, pub api_domain: String,
pub api_recipe_service: String, pub api_recipe_service: String,
pub api_redis_url: String, pub api_redis_url: String,
pub api_resolver: String,
pub api_sheet_endpoints: Arc<Mutex<Vec<String>>>, pub api_sheet_endpoints: Arc<Mutex<Vec<String>>>,
pub shared_configures: Arc<RwLock<serde_json::Value>>,
pub allowed_origins: Vec<String>, pub allowed_origins: Vec<String>,
} }
@ -38,16 +46,16 @@ impl DevConfig {
domain: String, domain: String,
rp_service: String, rp_service: String,
api_redis_url: String, api_redis_url: String,
api_resolver: String,
api_sheet_endpoints: Arc<Mutex<Vec<String>>>, api_sheet_endpoints: Arc<Mutex<Vec<String>>>,
shared_configures: Arc<RwLock<serde_json::Value>>,
) -> DevConfig { ) -> DevConfig {
DevConfig { DevConfig {
api_key: key, api_key: key,
api_domain: domain, api_domain: domain,
api_recipe_service: rp_service, api_recipe_service: rp_service,
api_redis_url, api_redis_url,
api_resolver,
api_sheet_endpoints, api_sheet_endpoints,
shared_configures,
allowed_origins: Vec::new(), allowed_origins: Vec::new(),
} }
} }
@ -81,10 +89,6 @@ impl DevConfig {
("X-API-Key".to_string(), self.api_key.clone()) ("X-API-Key".to_string(), self.api_key.clone())
} }
pub fn get_yuki_resolver(&self) -> String {
format!("{}/resolve", self.api_resolver)
}
pub fn check_sheet_endpoints(&self, service_endpoint: &str) -> bool { pub fn check_sheet_endpoints(&self, service_endpoint: &str) -> bool {
self.api_sheet_endpoints self.api_sheet_endpoints
.try_lock() .try_lock()
@ -95,42 +99,178 @@ impl DevConfig {
pub fn load_sheet_endpoints_runtime(&self, new_config: Vec<String>) { pub fn load_sheet_endpoints_runtime(&self, new_config: Vec<String>) {
*self.api_sheet_endpoints.try_lock().unwrap() = new_config; *self.api_sheet_endpoints.try_lock().unwrap() = new_config;
} }
pub fn get_shared_config_by_type(&self, type_name: String) -> serde_json::Value {
let c = {
let lock = self.shared_configures.read().unwrap();
lock.as_object().cloned().unwrap_or_default()
};
if c.contains_key(&type_name) {
c.get(&type_name).cloned().unwrap()
} else {
serde_json::Value::Null
}
}
pub fn load_shared_config_runtime(
&self,
update_payload: serde_json::Value,
) -> Result<Vec<String>, Box<dyn std::error::Error>> {
if let Some(kv) = &update_payload.as_object() {
let new_update_keys: Vec<Arc<&String>> = kv.keys().map(Arc::new).collect();
let mut cs = {
let mut write_perm = self.shared_configures.write().unwrap();
write_perm.as_object_mut().cloned().unwrap()
};
let mut result_update = Vec::new();
// overwrite
for new_up_key in new_update_keys.to_owned() {
// new config
if let Some(cfg_val) = update_payload
.as_object()
.unwrap()
.get(&new_up_key.to_string())
{
let old_config = cs.insert(new_up_key.to_string(), cfg_val.to_owned());
info!("[config] updating {new_up_key}: from {old_config:?} to {cfg_val:?}");
result_update.push(new_up_key.to_string());
}
}
{
let mut write_perm = self.shared_configures.write().unwrap();
*write_perm = serde_json::Value::Object(cs);
info!("[config] successfully update!");
}
Ok(result_update)
} else {
Err(format!("unexpected type format").into())
}
}
/// helper function for getting country code
pub fn get_country_config_from_short_name(&self, short_name: &str) -> Option<i64> {
let new_short_name_static = short_name.to_string();
let short_name_arc = Arc::new(&new_short_name_static);
// expect country setting
match self.get_shared_config_by_type("country".to_string()) {
serde_json::Value::Object(m) => {
let keys: Vec<Arc<&String>> = m.keys().map(Arc::new).collect();
if keys.contains(&short_name_arc)
&& let Some(ccfg) = m.get(short_name_arc.as_str())
&& let Some(prefix) = ccfg.as_object().unwrap().get("prefix")
&& let Some(prefix_i) = prefix.as_i64()
{
return Some(prefix_i);
} else {
return None;
}
}
_ => {
// not found
return None;
}
}
}
} }
pub struct AppState { pub struct AppState {
pub dev_config: DevConfig, pub dev_config: DevConfig,
pub redis_cli: redis::Client, pub redis_cli: redis::Client,
pub postgres_cli: Arc<Mutex<tokio_postgres::Client>>,
pub system_tx: tokio::sync::broadcast::Sender<serde_json::Value>, pub system_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
// saved client uid:client uuid // saved client uid:client uuid
pub connectors_mapping: Arc<RwLock<Hub>>, pub connectors_mapping: Arc<RwLock<Hub>>,
pub interceptor: Arc<Option<crate::websocket::interceptor::client::InterceptorClient>>,
pub http_client: reqwest::Client,
pub debug: bool,
/// Google public keys for decode Firebase JWT
pub jwk_encoding_keys: RwLock<Vec<jsonwebtoken::DecodingKey>>,
pub firebase_project_id: String,
} }
impl AppState { impl AppState {
pub fn get_cfg(&self) -> DevConfig { // pub fn get_cfg(&self) -> DevConfig {
self.dev_config.clone() // self.dev_config.clone()
} // }
pub async fn new( pub async fn new(
dev_config: DevConfig, dev_config: DevConfig,
redis_cli: redis::Client, redis_cli: redis::Client,
postgres_cli: tokio_postgres::Client,
system_tx: tokio::sync::broadcast::Sender<serde_json::Value>, system_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
mut system_rx: tokio::sync::broadcast::Receiver<serde_json::Value>, mut system_rx: tokio::sync::broadcast::Receiver<serde_json::Value>,
) -> Arc<AppState> { ) -> Arc<AppState> {
let redis_cli_clone = redis_cli.clone(); let redis_cli_clone = redis_cli.clone();
let tx_new = system_tx.clone(); let tx_new = system_tx.clone();
let mut interceptor = crate::websocket::interceptor::create_interceptor_client(&dev_config);
if let Some(ref mut ic) = interceptor {
ic.start();
info!("Interceptor initialized and started");
} else {
info!("Interceptor disabled or not configured");
}
let interceptor = Arc::new(interceptor);
// Create shared HTTP client with connection pool limits
let http_pool_max_idle = env::var("HTTP_POOL_MAX_IDLE")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(4);
let http_pool_idle_timeout = env::var("HTTP_POOL_IDLE_TIMEOUT")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(30);
let http_client = reqwest::Client::builder()
.pool_max_idle_per_host(http_pool_max_idle)
.pool_idle_timeout(Duration::from_secs(http_pool_idle_timeout))
.timeout(Duration::from_secs(60))
.build()
.expect("Failed to create HTTP client");
let debug = env::var("DEBUG")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(false);
let firebase_project_id = env::var("FIREBASE_PROJECT_ID").expect("Project id not found");
let result = Arc::new(AppState { let result = Arc::new(AppState {
dev_config: dev_config.clone(), dev_config: dev_config.clone(),
redis_cli, redis_cli,
postgres_cli: Arc::new(Mutex::new(postgres_cli)),
system_tx, system_tx,
connectors_mapping: Arc::new(RwLock::new(Hub { connectors_mapping: Arc::new(RwLock::new(Hub {
clients: HashMap::new(), clients: HashMap::new(),
})), })),
interceptor,
http_client,
debug,
firebase_project_id,
jwk_encoding_keys: RwLock::new(Vec::new()),
}); });
// backup job
let dev_config_backup = dev_config.clone();
// NOTE: removed backup process, let each app handled by themselves // NOTE: removed backup process, let each app handled by themselves
// Background task for refresh Google's keys daily
let self_clone = result.clone();
tokio::spawn(async move {
loop {
if let Err(e) = refresh_jwk_cache(Arc::clone(&self_clone)).await {
error!("Failed tp updating background JWKS keys: {e:?}");
}
tokio::time::sleep(Duration::from_secs(86400)).await;
}
});
tokio::spawn(async move { tokio::spawn(async move {
let mut lredis = redis_cli_clone.clone(); let mut lredis = redis_cli_clone.clone();
let current_queue: crossbeam_queue::ArrayQueue<CommandRequestPayload> = let current_queue: crossbeam_queue::ArrayQueue<CommandRequestPayload> =
@ -238,15 +378,173 @@ impl AppState {
} }
} }
async fn pprof_profile() -> axum::response::Response {
pprof_profile_internal(10).await
}
async fn pprof_profile_internal(seconds: u64) -> axum::response::Response {
let mut guard = ProfilerGuardBuilder::default()
.frequency(1000)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.unwrap();
info!("Starting CPU profile for {} seconds...", seconds);
tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
info!("CPU profile collection complete");
let report = match guard.report().build() {
Ok(r) => {
info!("Report built successfully, samples: {}", r.data.len());
r
}
Err(e) => {
error!("Failed to build report: {:?}", e);
return axum::response::Response::builder()
.status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
.body(axum::body::Body::from(format!(
"Failed to build report: {:?}",
e
)))
.unwrap();
}
};
// Use flamegraph to generate SVG (simpler and works reliably)
let mut buf = Vec::new();
if let Err(e) = report.flamegraph(&mut buf) {
error!("Failed to generate flamegraph: {:?}", e);
return axum::response::Response::builder()
.status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
.body(axum::body::Body::from(format!(
"Failed to generate flamegraph: {:?}",
e
)))
.unwrap();
}
info!("flamegraph SVG size: {} bytes", buf.len());
axum::response::Response::builder()
.header(axum::http::header::CONTENT_TYPE, "image/svg+xml")
.body(axum::body::Body::from(buf))
.unwrap()
}
async fn pprof_profile_with_duration(
axum::extract::Path(seconds): axum::extract::Path<u64>,
) -> axum::response::Response {
// Clamp duration between 1 and 60 seconds
let duration = seconds.clamp(1, 60);
pprof_profile_internal(duration).await
}
async fn pprof_heap() -> impl axum::response::IntoResponse {
// Heap profiling requires jemalloc or similar allocator
(
[(axum::http::header::CONTENT_TYPE, "text/plain")],
"Heap profiling requires jemalloc allocator. Use /debug/pprof/profile for CPU profiling."
.to_string(),
)
}
async fn pprof_growth() -> impl axum::response::IntoResponse {
(
[(axum::http::header::CONTENT_TYPE, "text/plain")],
"Heap growth profiling requires jemalloc allocator. Use /debug/pprof/profile for CPU profiling.".to_string(),
)
}
async fn pprof_cmdline() -> impl axum::response::IntoResponse {
let args: Vec<String> = std::env::args().collect();
let buf = args.join("\0");
([(axum::http::header::CONTENT_TYPE, "text/plain")], buf)
}
async fn pprof_symbol() -> impl axum::response::IntoResponse {
(
[(axum::http::header::CONTENT_TYPE, "text/plain")],
"Symbol endpoint - use with pprof tool".to_string(),
)
}
async fn pprof_trace() -> axum::response::Response {
let mut guard = ProfilerGuardBuilder::default()
.frequency(1000)
.build()
.unwrap();
info!("Starting CPU trace for 5 seconds...");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
info!("CPU trace collection complete");
let report = match guard.report().build() {
Ok(r) => {
info!("Trace report built successfully, samples: {}", r.data.len());
r
}
Err(e) => {
error!("Failed to build trace report: {:?}", e);
return axum::response::Response::builder()
.status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
.body(axum::body::Body::from(format!(
"Failed to build trace report: {:?}",
e
)))
.unwrap();
}
};
// Use flamegraph to generate SVG (simpler and works reliably)
let mut buf = Vec::new();
if let Err(e) = report.flamegraph(&mut buf) {
error!("Failed to generate trace flamegraph: {:?}", e);
return axum::response::Response::builder()
.status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
.body(axum::body::Body::from(format!(
"Failed to generate trace flamegraph: {:?}",
e
)))
.unwrap();
}
info!("trace flamegraph SVG size: {} bytes", buf.len());
axum::response::Response::builder()
.header(axum::http::header::CONTENT_TYPE, "image/svg+xml")
.body(axum::body::Body::from(buf))
.unwrap()
}
async fn pprof_allocs() -> impl axum::response::IntoResponse {
(
[(axum::http::header::CONTENT_TYPE, "text/plain")],
"Allocation profiling requires jemalloc allocator. Use /debug/pprof/profile for CPU profiling.".to_string(),
)
}
async fn pprof_mutex() -> impl axum::response::IntoResponse {
(
[(axum::http::header::CONTENT_TYPE, "text/plain")],
"Mutex profiling not available in this build.".to_string(),
)
}
async fn pprof_block() -> impl axum::response::IntoResponse {
(
[(axum::http::header::CONTENT_TYPE, "text/plain")],
"Block profiling not available in this build.".to_string(),
)
}
pub async fn invoke_checkout_request( pub async fn invoke_checkout_request(
http_client: &reqwest::Client,
config: DevConfig, config: DevConfig,
path: String, path: String,
) -> Result<String, Box<dyn std::error::Error>> { ) -> Result<String, Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let req_path = config.get_file_from_recipe_repo(path); let req_path = config.get_file_from_recipe_repo(path);
// println!("dbg: {req_path}"); let res = http_client.get(req_path).send().await?;
let res = client.get(req_path).send().await?;
match res.text().await { match res.text().await {
Ok(raw) => Ok(raw), Ok(raw) => Ok(raw),
@ -256,17 +554,13 @@ pub async fn invoke_checkout_request(
/// Invoke git pull, may takes sometime /// Invoke git pull, may takes sometime
pub async fn invoke_pull_sync_request( pub async fn invoke_pull_sync_request(
http_client: &reqwest::Client,
config: DevConfig, config: DevConfig,
) -> Result<String, Box<dyn std::error::Error>> { ) -> Result<String, Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let req_path = config.get_pull_recipe_repo(); let req_path = config.get_pull_recipe_repo();
// println!("dbg: {req_path}"); let res = http_client.get(req_path).send().await?;
let res = client.get(req_path).send().await?;
if res.status() != StatusCode::OK { if res.status() != StatusCode::OK {
// pull fail
error!( error!(
"invoke pull fail: [{}] {:?}", "invoke pull fail: [{}] {:?}",
res.status(), res.status(),
@ -283,21 +577,21 @@ pub async fn invoke_pull_sync_request(
/// Invoke sending from server to server for committing /// Invoke sending from server to server for committing
pub async fn invoke_commit_request( pub async fn invoke_commit_request(
http_client: &reqwest::Client,
config: DevConfig, config: DevConfig,
payload: CommitPayload, payload: CommitPayload,
) -> Result<(), Box<dyn std::error::Error>> { tx: Sender<TxControlMessage>,
let client = reqwest::Client::builder() uid: String,
.timeout(Duration::from_secs(60)) ) -> Result<String, Box<dyn std::error::Error>> {
.build()?;
let commit_path = config.get_post_file_to_recipe_repo(); let commit_path = config.get_post_file_to_recipe_repo();
let filename = payload.path.split("/").last().unwrap_or("temp").to_string(); let filename = payload.path.split("/").last().unwrap_or("temp").to_string();
info!("committing {}", filename); info!("committing {}", filename);
let form = multipart::Form::new() let form = multipart::Form::new()
.text("message", payload.message) .text("message", payload.message.clone())
.text("signature_username", payload.signature_username) .text("signature_username", payload.signature_username.clone())
.text("signature_email", payload.signature_email) .text("signature_email", payload.signature_email.clone())
.text("path", payload.path) .text("path", payload.path)
.part( .part(
"file", "file",
@ -306,25 +600,60 @@ pub async fn invoke_commit_request(
.mime_str("application/octet-stream") .mime_str("application/octet-stream")
.unwrap(), .unwrap(),
); );
let response = client.post(commit_path).multipart(form).send().await?; let response = http_client.post(commit_path).multipart(form).send().await?;
info!( info!("commit status: {}", response.status());
"commit status: {}, {:?}",
response.status(),
response.text().await
);
Ok(()) let status = response.status();
let body = response.text().await;
// if status == StatusCode::OK
// && let Ok(txt) = body
// && let Ok(res) = serde_json::from_str::<serde_json::Value>(&txt)
// {
// info!("commit success")
// } else {
// warn!("status: {status}, response: {body:?}");
// }
match body {
Ok(b) if status == StatusCode::OK => {
if let Ok(res) = serde_json::from_str::<serde_json::Value>(&b)
&& let Some(cid) = res.get("result")
&& let Some(cid_str) = cid.as_str()
{
//
info!("response commit id: {cid_str}");
if let Err(e) = tx
.send(TxControlMessage::Payload(serde_json::json!({
"type": "save_recipe",
"payload": {
"to": uid,
"user": payload.signature_username,
"email": payload.signature_email,
"summary": payload.message
}
})))
.await
{}
} else {
error!("failed to create json from body on commit response\n{b:#?}");
}
}
other => {
error!("status not ok, {:?}", other);
}
}
Ok("empty".to_string())
} }
/// Invoke sending from server to server for committing case multiple files /// Invoke sending from server to server for committing case multiple files
pub async fn invoke_commit_multiple_files_request( pub async fn invoke_commit_multiple_files_request(
http_client: &reqwest::Client,
config: DevConfig, config: DevConfig,
payloads: Vec<CommitPayload>, payloads: Vec<CommitPayload>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(60))
.build()?;
let commit_path = config.get_post_file_to_recipe_repo(); let commit_path = config.get_post_file_to_recipe_repo();
let mut form = multipart::Form::new() let mut form = multipart::Form::new()
.text("message", payloads.first().unwrap().message.clone()) .text("message", payloads.first().unwrap().message.clone())
@ -355,23 +684,21 @@ pub async fn invoke_commit_multiple_files_request(
); );
} }
let response = client.post(commit_path).multipart(form).send().await?; let response = http_client.post(commit_path).multipart(form).send().await?;
info!("commit status: {}", response.status()); info!("commit status: {}", response.status());
Ok(()) Ok(())
} }
pub async fn invoke_push_request(config: DevConfig) -> Result<String, Box<dyn std::error::Error>> { pub async fn invoke_push_request(
let client = reqwest::Client::new(); http_client: &reqwest::Client,
config: DevConfig,
) -> Result<String, Box<dyn std::error::Error>> {
let req_path = config.get_push_recipe_repo(); let req_path = config.get_push_recipe_repo();
// println!("dbg: {req_path}"); let res = http_client.get(req_path).send().await?;
let res = client.get(req_path).send().await?;
if res.status() != StatusCode::OK { if res.status() != StatusCode::OK {
// pull fail
error!( error!(
"invoke push fail: [{}] {:?}", "invoke push fail: [{}] {:?}",
res.status(), res.status(),
@ -400,48 +727,101 @@ pub async fn initialize() -> Result<(), Box<dyn std::error::Error>> {
let api_redis = env::var("DEV_API_REDIS").unwrap_or("0.0.0.0".to_string()); let api_redis = env::var("DEV_API_REDIS").unwrap_or("0.0.0.0".to_string());
let api_redis_port = env::var("DEV_API_REDIS_PORT").unwrap_or("6379".to_string()); let api_redis_port = env::var("DEV_API_REDIS_PORT").unwrap_or("6379".to_string());
let api_resolver = env::var("RESOLVER_SERVICE_URL").expect("no available resolver"); // No need for resolver
// let api_resolver = env::var("RESOLVER_SERVICE_URL").expect("no available resolver");
let allowed_origins = env::var("ALLOWED_ORIGINS").expect("allowed origin not provided"); let allowed_origins = env::var("ALLOWED_ORIGINS").expect("allowed origin not provided");
let postgres_connection_config = env::var("POSTGRES_CONN")
.expect("postgres connection not provided")
.replace('\"', "")
.replace('"', "");
// read up sheet config // read up sheet config
// //
let sheet_endpoint_config = read_sheet_config()?; let sheet_endpoint_config = read_sheet_config()?;
let shared_configures = read_shared_configures()?;
let mut dev_cfg = crate::app::DevConfig::new( let mut dev_cfg = crate::app::DevConfig::new(
api_key, api_key,
api_domain, api_domain,
api_recipe_service, api_recipe_service,
format!("redis://{api_redis}:{api_redis_port}"), format!("redis://{api_redis}:{api_redis_port}"),
api_resolver,
Arc::new(Mutex::new(sheet_endpoint_config)), Arc::new(Mutex::new(sheet_endpoint_config)),
Arc::new(RwLock::new(shared_configures)),
); );
dev_cfg = dev_cfg.with_allowed_origins(&allowed_origins).clone(); dev_cfg = dev_cfg.with_allowed_origins(&allowed_origins).clone();
// test_send(dev_cfg).await?;
//
let redis_cli = redis::Client::open(dev_cfg.api_redis_url.clone())?; let redis_cli = redis::Client::open(dev_cfg.api_redis_url.clone())?;
let (sys_tx, sys_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16); let (mut client, connection) =
tokio_postgres::connect(&postgres_connection_config, NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
error!("connection postgres error: {e}");
}
});
let app_state = AppState::new(dev_cfg.clone(), redis_cli, sys_tx, sys_rx).await; info!("[SETUP] create material table ...");
if let Err(error_create_material_table) = client.batch_execute(CREATE_MATERIAL_TABLE).await {
error!("[SETUP] error while creating material table: {error_create_material_table}");
}
// Reduced broadcast channel capacity from 16 to 4
let (sys_tx, sys_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(4);
let app_state = AppState::new(dev_cfg.clone(), redis_cli, client, sys_tx, sys_rx).await;
let rp_router = create_recipe_repo_router().await; let rp_router = create_recipe_repo_router().await;
// let doc_router = create_tx_patcher_route().await; // let doc_router = create_tx_patcher_route().await;
let app = Router::new() let mut app = Router::new()
// .route("/sessionLogin", post(session_login)) // .route("/sessionLogin", post(session_login))
.route( .route(
"/syscb", "/syscb",
post(crate::websocket::handler::post_from_other_system), post(crate::websocket::handler::post_from_other_system),
) )
.route("/users", get(crate::websocket::handler::get_online_users)) .route("/users", get(crate::websocket::handler::get_online_users))
.route(
"/interceptor/health",
get(crate::websocket::handler::interceptor_health),
)
.route("/load-config", post(crate::websocket::handler::post_config)) .route("/load-config", post(crate::websocket::handler::post_config))
.route(
"/config/{key}",
get(crate::websocket::handler::get_shared_config),
)
// for shared ref
.route(
"/config",
post(crate::websocket::handler::update_shared_config),
)
// .route("/regas", post(request_api_session_key)) // .route("/regas", post(request_api_session_key))
.nest("/recipe", rp_router) .nest("/recipe", rp_router)
// .nest("/docs", doc_router) // .nest("/docs", doc_router)
.layer(DefaultBodyLimit::max(100 * 1024 * 1024)) .layer(DefaultBodyLimit::max(100 * 1024 * 1024))
.with_state(app_state); .with_state(app_state.clone());
// Conditionally add debug profiling endpoints
if app_state.debug {
app = app
.route("/debug/pprof/profile", get(pprof_profile))
.route(
"/debug/pprof/profile/{seconds}",
get(pprof_profile_with_duration),
)
.route("/debug/pprof/heap", get(pprof_heap))
.route("/debug/pprof/growth", get(pprof_growth))
.route("/debug/pprof/cmdline", get(pprof_cmdline))
.route("/debug/pprof/symbol", get(pprof_symbol))
.route("/debug/pprof/trace", get(pprof_trace))
.route("/debug/pprof/allocs", get(pprof_allocs))
.route("/debug/pprof/mutex", get(pprof_mutex))
.route("/debug/pprof/block", get(pprof_block));
info!("Debug profiling endpoints enabled");
} else {
info!("Debug profiling endpoints disabled (set DEBUG=true to enable)");
}
// feature: no delay, full throttle // feature: no delay, full throttle
let nodelay_listener = || async { let nodelay_listener = || async {

View file

@ -1,7 +1,6 @@
// mod cold_start;
mod app; mod app;
mod stream; mod stream;
// mod tx; mod summary;
mod websocket; mod websocket;
// features // features
@ -9,16 +8,34 @@ mod websocket;
// - store in redis // - store in redis
// - cron job fetch update // - cron job fetch update
#[tokio::main] fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
env_logger::builder() env_logger::builder()
.filter_level(log::LevelFilter::Info) .filter_level(log::LevelFilter::Info)
.init(); .init();
// send req to repo service
app::initialize().await?; // Configure tokio runtime with limited worker threads from env
let worker_threads = std::env::var("TOKIO_WORKER_THREADS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(4);
Ok(()) let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(worker_threads)
.enable_all()
.build()?;
// Configure Rayon thread pool from env
let rayon_threads = std::env::var("RAYON_NUM_THREADS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(4);
rayon::ThreadPoolBuilder::new()
.num_threads(rayon_threads)
.build_global()?;
runtime.block_on(async {
app::initialize().await
})
} }

View file

@ -184,9 +184,22 @@ where
impl<T> StreamDataExtra<T> impl<T> StreamDataExtra<T>
where where
T: Serialize + Clone, T: Serialize,
{ {
pub fn new(exid: &str, extp: &str, data: Vec<T>, to: String) -> Self { pub fn new(exid: &str, extp: &str, data: Vec<T>, to: String) -> Self {
Self {
exid: exid.to_string(),
extp: extp.to_string(),
payload: data,
to,
}
}
/// Create from slice - avoids Clone bound and intermediate allocation
pub fn from_slice(exid: &str, extp: &str, data: &[T], to: String) -> Self
where
T: Clone,
{
Self { Self {
exid: exid.to_string(), exid: exid.to_string(),
extp: extp.to_string(), extp: extp.to_string(),

22
src/summary.rs Normal file
View file

@ -0,0 +1,22 @@
pub fn get_summarized_text(text: &str, payload: Option<serde_json::Value>) -> String {
let mut result = String::new();
if text.eq("notify")
&& let Some(payload) = payload.clone()
&& let Some(summary_text) = payload.get("summary")
{
result = summary_text.as_str().unwrap_or_default().to_string();
} else if text.ne("notify") {
match text {
"save_recipe"
if let Some(payload) = payload.clone()
&& let Some(summary_text) = payload.get("summary") =>
{
result = summary_text.as_str().unwrap_or_default().to_string();
}
_ => {}
}
}
result
}

View file

@ -1,4 +1,4 @@
use std::time::Duration; use std::{pin::Pin, time::Duration};
use serde::Deserialize; use serde::Deserialize;
@ -16,6 +16,77 @@ pub const LAST_CHANGE_DATE_FORMAT: &str = "%v %T";
/// CONFIG: websocket size limit /// CONFIG: websocket size limit
pub const WEBSOCKET_MAX_BYTES: usize = 2 * 1024 * 1024; pub const WEBSOCKET_MAX_BYTES: usize = 2 * 1024 * 1024;
/// CONFIG: shared configures known name for every services
pub const SHARED_CONFIGURES_FILE: &str = "shared-configures.json";
pub const CREATE_MATERIAL_TABLE: &str = "CREATE TABLE IF NOT EXISTS material_setting (
-- Primary key
id INTEGER PRIMARY KEY,
-- Basic identification
id_alternate INTEGER NOT NULL DEFAULT 0,
is_use BOOLEAN NOT NULL DEFAULT true,
material_name VARCHAR(255) NOT NULL,
material_other_name VARCHAR(255),
path_other_name VARCHAR(255),
-- Channel type (mutually exclusive in practice)
bean_channel BOOLEAN NOT NULL DEFAULT false,
syrup_channel BOOLEAN NOT NULL DEFAULT false,
powder_channel BOOLEAN NOT NULL DEFAULT false,
fresh_syrup_channel BOOLEAN NOT NULL DEFAULT false,
frozen_fruit_channel BOOLEAN NOT NULL DEFAULT false,
ice_scream_bingsu_channel BOOLEAN NOT NULL DEFAULT false,
soda_channel BOOLEAN NOT NULL DEFAULT false,
leaves_channel BOOLEAN NOT NULL DEFAULT false,
item_channel BOOLEAN NOT NULL DEFAULT false,
is_equipment BOOLEAN NOT NULL DEFAULT false,
-- Canister/Container configuration
canister_type VARCHAR(100) NOT NULL, -- 'BeanType', 'Bag In Box', 'PowderType', 'Tank', 'Machine', '1,Valve'
-- Operational parameters
alarm_id_when_offline INTEGER NOT NULL DEFAULT 0,
drain_timer INTEGER NOT NULL DEFAULT 0,
low_to_offline INTEGER NOT NULL DEFAULT 0,
material_status INTEGER NOT NULL DEFAULT 0, -- 0=normal, 2=?
schedule_drain_type INTEGER NOT NULL DEFAULT 0,
pay_retry_max_count INTEGER NOT NULL DEFAULT 0,
-- Refill units (mutually exclusive)
refill_unit_gram BOOLEAN NOT NULL DEFAULT false,
refill_unit_milliliters BOOLEAN NOT NULL DEFAULT false,
refill_unit_pcs BOOLEAN NOT NULL DEFAULT false,
-- Feed mode (for syrups)
feed_mode VARCHAR(50), -- 'mode=1', 'mode=2'
-- Material parameters (optional, for specific types)
material_parameter TEXT,
-- Unit tracking
raw_material_unit VARCHAR(255), -- 'refill=$bag,sum=#gram,rec=$gram' etc.
-- Error messages (localized, fixed 8 slots)
str_text_show_error TEXT[], -- array of 8 strings
-- Timestamps
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Indexes for common queries
CREATE INDEX IF NOT EXISTS idx_material_setting_channel_type ON material_setting
(bean_channel, syrup_channel, powder_channel, fresh_syrup_channel, frozen_fruit_channel, ice_scream_bingsu_channel)
WHERE is_use = true;
CREATE INDEX IF NOT EXISTS idx_material_setting_name ON material_setting (material_name) WHERE is_use = true;
CREATE INDEX IF NOT EXISTS idx_material_setting_canister_type ON material_setting (canister_type);";
pub const SHARED_CONFIG_CHANNEL_NAME: &str = "shared_config/update";
pub const GOOGLE_PUBLIC_ENDPOINT: &str =
"https://www.googleapis.com/robot/v1/metadata/x509/securetoken@system.gserviceaccount.com";
#[derive(Clone)] #[derive(Clone)]
pub enum TxControlMessage { pub enum TxControlMessage {
Payload(serde_json::Value), Payload(serde_json::Value),

View file

@ -1,27 +1,36 @@
use axum::{ use axum::{
Json, Json,
body::Bytes, body::Bytes,
extract::{Request, State, WebSocketUpgrade, ws::WebSocket}, extract::{
Path, Request, State, WebSocketUpgrade,
ws::{Message, WebSocket},
},
response::IntoResponse, response::IntoResponse,
}; };
use futures::StreamExt; use futures::{SinkExt, StreamExt};
use log::{error, info, warn}; use log::{error, info, warn};
use redis::TypedCommands; use redis::TypedCommands;
use std::{ use std::{
fs::File, fs::File,
io::BufWriter, io::BufWriter,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
time::Duration,
}; };
use tokio::{ use tokio::{
sync::{Mutex, mpsc}, sync::{Mutex, mpsc},
time::Instant, time::{Instant, timeout},
}; };
use uuid::Uuid; use uuid::Uuid;
use super::{core::*, model::*}; use super::{core::*, model::*};
use crate::{ use crate::{
app::{AppState, Hub}, app::{AppState, Hub},
websocket::helper::read_sheet_config, websocket::{
helper::read_sheet_config,
session::{
HandshakeAck, HandshakePayload, SecureSession, execute_dh_handshake, verify_token,
},
},
}; };
pub async fn post_from_other_system( pub async fn post_from_other_system(
@ -160,6 +169,61 @@ pub async fn post_config(
return (axum::http::StatusCode::OK, "load config success").into_response(); return (axum::http::StatusCode::OK, "load config success").into_response();
} }
/// Endpoint for service calling to get configures
pub async fn get_shared_config(
State(state): State<Arc<AppState>>,
Path(key): Path<String>,
) -> impl IntoResponse {
let result = state.dev_config.get_shared_config_by_type(key);
if result.is_null() {
return (
axum::http::StatusCode::NOT_FOUND,
serde_json::json!({}).to_string(),
)
.into_response();
}
// do return value of requested config
return (axum::http::StatusCode::OK, result.to_string()).into_response();
}
/// Endpoint for updating config on runtime (Only for shared configurations)
pub async fn update_shared_config(
State(state): State<Arc<AppState>>,
Json(payload): Json<serde_json::Value>,
) -> impl IntoResponse {
let new_key_updates = match state.dev_config.load_shared_config_runtime(payload) {
Ok(keys) => keys,
Err(e) => {
error!("config update fail, {e}");
return (axum::http::StatusCode::BAD_REQUEST, "unexpected request").into_response();
}
};
// Broadcast to channel
let mut rcl = state.redis_cli.clone();
match rcl.publish(
SHARED_CONFIG_CHANNEL_NAME,
format!("{new_key_updates:?}").to_string(),
) {
Ok(_) => {
info!("broadcast success");
}
Err(e) => {
error!("broadcast fail: {e}");
return (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
"fail to broadcast",
)
.into_response();
}
}
return (axum::http::StatusCode::OK, "success").into_response();
}
#[deprecated]
pub async fn request_api_session_key( pub async fn request_api_session_key(
State(state): State<Arc<AppState>>, State(state): State<Arc<AppState>>,
Json(msg): Json<ApiSessionRequest>, Json(msg): Json<ApiSessionRequest>,
@ -193,6 +257,32 @@ pub async fn get_online_users(State(state): State<Arc<AppState>>) -> impl IntoRe
.into_response() .into_response()
} }
pub async fn interceptor_health(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let interceptor_status = match &*state.interceptor {
Some(ic) => serde_json::json!({
"enabled": true,
"endpoint": ic.config.endpoint,
"async_mode": ic.config.async_mode,
"batch_size": ic.config.batch_size,
"timeout_ms": ic.config.timeout_ms,
}),
None => serde_json::json!({
"enabled": false,
"reason": "not configured or disabled"
}),
};
(
axum::http::StatusCode::OK,
serde_json::json!({
"interceptor": interceptor_status,
"timestamp": chrono::Utc::now().to_rfc3339(),
})
.to_string(),
)
.into_response()
}
/// Main websocket handler /// Main websocket handler
pub async fn websocket_handler( pub async fn websocket_handler(
State(state): State<Arc<AppState>>, State(state): State<Arc<AppState>>,
@ -201,6 +291,7 @@ pub async fn websocket_handler(
) -> impl IntoResponse { ) -> impl IntoResponse {
let state_clone = Arc::clone(&state); let state_clone = Arc::clone(&state);
let hub_clone = Arc::clone(&state_clone.connectors_mapping); let hub_clone = Arc::clone(&state_clone.connectors_mapping);
let interceptor_clone = Arc::clone(&state_clone.interceptor);
let origin = req let origin = req
.headers() .headers()
@ -222,15 +313,20 @@ pub async fn websocket_handler(
ws.max_frame_size(WEBSOCKET_MAX_BYTES) ws.max_frame_size(WEBSOCKET_MAX_BYTES)
.max_message_size(WEBSOCKET_MAX_BYTES) .max_message_size(WEBSOCKET_MAX_BYTES)
.on_failed_upgrade(|error| println!("Error upgrading websocket: {}", error)) .on_failed_upgrade(|error| println!("Error upgrading websocket: {}", error))
.on_upgrade(async |s| handle_socket(s, state_clone, hub_clone).await.unwrap_or(())) .on_upgrade(async |s| {
handle_socket(s, state_clone, hub_clone, interceptor_clone)
.await
.unwrap_or(())
})
} }
async fn handle_socket( async fn handle_socket(
socket: WebSocket, socket: WebSocket,
state: Arc<AppState>, state: Arc<AppState>,
hub: Arc<RwLock<Hub>>, hub: Arc<RwLock<Hub>>,
interceptor: Arc<Option<crate::websocket::interceptor::client::InterceptorClient>>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
let (sender, receiver) = socket.split(); let (mut sender, mut receiver) = socket.split();
// internal channel // internal channel
let (tx, rx) = mpsc::channel::<TxControlMessage>(2); let (tx, rx) = mpsc::channel::<TxControlMessage>(2);
@ -255,11 +351,6 @@ async fn handle_socket(
let temp_session = user.lock().await.to_string(); let temp_session = user.lock().await.to_string();
info!("{} connected", temp_session); info!("{} connected", temp_session);
{
let mut h = hub.write().unwrap();
h.clients.insert(temp_session.clone(), tx.clone());
}
// NOTE: disable from cause system tx could directly send to client rx // NOTE: disable from cause system tx could directly send to client rx
// without sending to system rx. // without sending to system rx.
// let user_sys_rx = state.system_tx.subscribe(); // let user_sys_rx = state.system_tx.subscribe();
@ -271,8 +362,85 @@ async fn handle_socket(
let hub_for_write = hub.clone(); let hub_for_write = hub.clone();
let hub_for_read = hub.clone(); let hub_for_read = hub.clone();
let interceptor_for_write = interceptor.clone();
let interceptor_for_read = interceptor.clone();
let sender = tokio::spawn(super::rw::write(sender, rx, user.clone(), hub_for_write)); // New 2s auth & key exchange gate
let state_clone = state.clone();
let auth_result = timeout(Duration::from_secs(2), async {
if let Some(Ok(Message::Text(text))) = receiver.next().await {
let handshake: HandshakePayload = serde_json::from_str(&text)?;
info!("handshake ok!");
// Offline JWT validation using memory cache
let uid = verify_token(&handshake.token, state_clone).await?;
info!("uid: {uid}");
// Execute Ephemeral Elliptic Curve DF Key Exchange
let (server_pub_b64, cipher) = execute_dh_handshake(&handshake.client_public_key)?;
// confirm payload
let ack_payload = serde_json::to_string(&HandshakeAck {
status: "authenticated".to_string(),
server_public_key: server_pub_b64,
})?;
// info!("ack sending ... {ack_payload}");
sender.send(Message::Text(ack_payload.into())).await?;
return Ok(SecureSession {
uid,
cipher,
key_established_at: Instant::now(),
});
}
Err(Box::<dyn std::error::Error + Send + Sync>::from(
"No initial handshake received",
))
})
.await;
// Evaluate handshake state
let session = match auth_result {
Ok(Ok(valid_session)) => Arc::new(valid_session),
_ => {
warn!("Connection dropped: Handshake timeout or authentication failed");
let _ = sender
.send(Message::Close(Some(axum::extract::ws::CloseFrame {
code: 4001,
reason: std::borrow::Cow::Borrowed("Unauthorized Handshake Failure")
.to_string()
.into(),
})))
.await;
return Ok(());
}
};
let valid_uid = session.clone().uid.to_string();
if !session.uid.is_empty() {
// already has uid
//
{
let mut ulock = user.lock().await;
*ulock = valid_uid.clone();
}
info!("update user uid");
}
{
let mut h = hub.write().unwrap();
h.clients.insert(valid_uid, tx.clone());
}
let sender = tokio::spawn(super::rw::write(
sender,
rx,
user.clone(),
hub_for_write,
interceptor_for_write,
session.clone(),
));
let reader = tokio::spawn(super::rw::read( let reader = tokio::spawn(super::rw::read(
state, state,
receiver, receiver,
@ -280,6 +448,8 @@ async fn handle_socket(
reader_last_seen, reader_last_seen,
user.clone(), user.clone(),
hub_for_read, hub_for_read,
interceptor_for_read,
session.clone(),
)); ));
// let callback_to_client = super::rw::recv_sys_msg_send_back_client(tx.clone(), user_sys_rx); // let callback_to_client = super::rw::recv_sys_msg_send_back_client(tx.clone(), user_sys_rx);

View file

@ -1,7 +1,7 @@
use std::path::PathBuf; use std::path::PathBuf;
use std::{cmp::Ordering, collections::HashMap, fs::File, io::BufReader}; use std::{cmp::Ordering, collections::HashMap, fs::File, io::BufReader};
use crate::websocket::core::{LAST_CHANGE_DATE_FORMAT, safe_deserialize}; use crate::websocket::core::{LAST_CHANGE_DATE_FORMAT, SHARED_CONFIGURES_FILE, safe_deserialize};
use super::model::*; use super::model::*;
use axum::extract::ws::{CloseFrame, Message, WebSocket}; use axum::extract::ws::{CloseFrame, Message, WebSocket};
@ -129,6 +129,21 @@ pub fn read_sheet_config() -> Result<Vec<String>, Box<dyn std::error::Error>> {
Ok(res) Ok(res)
} }
pub fn read_shared_configures() -> Result<serde_json::Value, Box<dyn std::error::Error>> {
let expected_path = format!("./{}", SHARED_CONFIGURES_FILE);
let config_file = match File::open(expected_path) {
Ok(f) => f,
Err(_) => {
return Ok(serde_json::json!({}));
}
};
let mut buf = BufReader::new(config_file);
let val: serde_json::Value = serde_json::from_reader(&mut buf)?;
Ok(val)
}
pub fn parse_date_from_string(date: &str, fmt: Option<&str>) -> Option<NaiveDateTime> { pub fn parse_date_from_string(date: &str, fmt: Option<&str>) -> Option<NaiveDateTime> {
let fmt = match fmt { let fmt = match fmt {
Some(fm) => fm, Some(fm) => fm,

View file

@ -0,0 +1,229 @@
use crate::websocket::interceptor::config::InterceptorConfig;
use chrono::{DateTime, Utc};
use reqwest::{
Client, ClientBuilder,
header::{HeaderMap, HeaderName, HeaderValue},
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InterceptedEvent {
pub direction: Direction,
pub user_id: String,
pub action_type: String,
pub payload: Value,
#[serde(with = "chrono::serde::ts_seconds")]
pub timestamp: DateTime<Utc>,
pub connection_id: String,
pub summary: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Direction {
Incoming,
Outgoing,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct InterceptorResponse {
pub success: bool,
pub message: Option<String>,
}
pub struct InterceptorClient {
pub config: InterceptorConfig,
http_client: Client,
sender: Option<mpsc::Sender<InterceptedEvent>>,
worker_handle: Option<JoinHandle<()>>,
}
impl InterceptorClient {
pub fn new(
config: InterceptorConfig,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let mut headers = HeaderMap::new();
for (k, v) in &config.headers {
if let (Ok(name), Ok(value)) = (
HeaderName::from_bytes(k.as_bytes()),
HeaderValue::from_str(v),
) {
headers.insert(name, value);
}
}
let http_client = ClientBuilder::new()
.timeout(config.timeout())
.default_headers(headers)
.build()?;
Ok(Self {
config,
http_client,
sender: None,
worker_handle: None,
})
}
pub fn start(&mut self) {
if !self.config.async_mode {
return;
}
let (tx, mut rx) = mpsc::channel::<InterceptedEvent>(self.config.batch_size * 2);
let config = self.config.clone();
let http_client = self.http_client.clone();
let handle = tokio::spawn(async move {
let mut batch = Vec::new();
let mut interval = tokio::time::interval(config.batch_timeout());
loop {
tokio::select! {
Some(event) = rx.recv() => {
batch.push(event);
if batch.len() >= config.batch_size {
Self::send_batch(&http_client, &config, &mut batch).await;
}
}
_ = interval.tick() => {
if !batch.is_empty() {
Self::send_batch(&http_client, &config, &mut batch).await;
}
}
else => break,
}
}
if !batch.is_empty() {
Self::send_batch(&http_client, &config, &mut batch).await;
}
});
self.sender = Some(tx);
self.worker_handle = Some(handle);
}
async fn send_batch(
http_client: &Client,
config: &InterceptorConfig,
batch: &mut Vec<InterceptedEvent>,
) {
if batch.is_empty() {
return;
}
let payload = serde_json::json!({
"events": batch,
"count": batch.len(),
"timestamp": chrono::Utc::now().to_rfc3339(),
});
for attempt in 0..=config.retry_count {
match http_client
.post(&config.endpoint)
.json(&payload)
.send()
.await
{
Ok(response) => {
if response.status().is_success() {
debug!("Interceptor: sent batch of {} events", batch.len());
batch.clear();
return;
} else {
warn!(
"Interceptor: failed to send batch (attempt {}/{}), status: {}",
attempt + 1,
config.retry_count + 1,
response.status()
);
}
}
Err(e) => {
warn!(
"Interceptor: error sending batch (attempt {}/{}): {}",
attempt + 1,
config.retry_count + 1,
e
);
}
}
if attempt < config.retry_count {
tokio::time::sleep(Duration::from_millis(100 * (attempt + 1) as u64)).await;
}
}
error!(
"Interceptor: failed to send batch after {} retries, dropping {} events",
config.retry_count,
batch.len()
);
batch.clear();
}
pub async fn send_async(&self, event: InterceptedEvent) {
if !self.config.async_mode {
return;
}
if let Some(sender) = &self.sender {
info!("sender found");
if let Err(e) = sender.send(event).await {
if matches!(e, mpsc::error::SendError(_)) {
warn!("Interceptor: async queue full, dropping event");
} else {
error!("Interceptor Error: {e}");
}
}
}
}
pub async fn send_sync(
&self,
event: InterceptedEvent,
) -> Result<InterceptorResponse, Box<dyn std::error::Error + Send + Sync>> {
let payload = serde_json::json!({
"events": [event],
"count": 1,
"timestamp": chrono::Utc::now().to_rfc3339(),
});
let response = self
.http_client
.post(&self.config.endpoint)
.json(&payload)
.send()
.await?;
if response.status().is_success() {
Ok(InterceptorResponse {
success: true,
message: Some("delivered".to_string()),
})
} else {
let status = response.status();
let text = response.text().await.unwrap_or_default();
Err(format!("HTTP {}: {}", status, text).into())
}
}
pub fn should_intercept(&self, action_type: &str) -> bool {
self.config.should_intercept(action_type)
}
pub async fn shutdown(&mut self) {
if let Some(sender) = self.sender.take() {
drop(sender);
}
if let Some(handle) = self.worker_handle.take() {
let _ = handle.await;
}
}
}

View file

@ -0,0 +1,67 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InterceptorConfig {
pub enabled: bool,
pub endpoint: String,
pub timeout_ms: u64,
pub retry_count: u32,
pub async_mode: bool,
pub batch_size: usize,
pub batch_timeout_ms: u64,
pub headers: HashMap<String, String>,
pub filter: InterceptorFilter,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InterceptorFilter {
pub include_types: Vec<String>,
pub exclude_types: Vec<String>,
}
impl InterceptorConfig {
pub fn timeout(&self) -> Duration {
Duration::from_millis(self.timeout_ms)
}
pub fn batch_timeout(&self) -> Duration {
Duration::from_millis(self.batch_timeout_ms)
}
pub fn should_intercept(&self, action_type: &str) -> bool {
if !self.enabled {
return false;
}
if self.filter.exclude_types.contains(&action_type.to_string()) {
return false;
}
if self.filter.include_types.is_empty() {
return true;
}
self.filter.include_types.contains(&action_type.to_string())
}
}
impl Default for InterceptorConfig {
fn default() -> Self {
Self {
enabled: false,
endpoint: "".to_string(),
timeout_ms: 5000,
retry_count: 3,
async_mode: true,
batch_size: 10,
batch_timeout_ms: 1000,
headers: HashMap::new(),
filter: InterceptorFilter {
include_types: vec![],
exclude_types: vec!["heartbeat".to_string(), "auth".to_string()],
},
}
}
}

View file

@ -0,0 +1,30 @@
pub mod client;
pub mod config;
use self::client::InterceptorClient;
use self::config::InterceptorConfig;
use crate::app::DevConfig;
pub fn create_interceptor_client(dev_config: &DevConfig) -> Option<InterceptorClient> {
let interceptor_config: InterceptorConfig = {
let lock = dev_config.shared_configures.read().unwrap();
lock.get("interceptor")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default()
};
if !interceptor_config.enabled || interceptor_config.endpoint.is_empty() {
return None;
}
let mut resolved_headers = std::collections::HashMap::new();
for (k, v) in &interceptor_config.headers {
let resolved = v.replace("{{INTERCEPTOR_API_KEY}}", &dev_config.api_key);
resolved_headers.insert(k.clone(), resolved);
}
let mut config = interceptor_config.clone();
config.headers = resolved_headers;
InterceptorClient::new(config).ok()
}

View file

@ -1,7 +1,9 @@
pub mod core; pub mod core;
pub mod handler; pub mod handler;
pub mod helper; pub mod helper;
pub mod interceptor;
pub mod model; pub mod model;
pub mod plugins; pub mod plugins;
mod rw; mod rw;
pub mod session;
mod tasks; mod tasks;

View file

@ -39,6 +39,8 @@ pub struct WebsocketMessageRequest {
#[serde(rename = "type")] #[serde(rename = "type")]
pub type_w: String, pub type_w: String,
pub payload: Option<serde_json::Value>, pub payload: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub action_id: Option<String>,
} }
/// Recipe request payload struct /// Recipe request payload struct
@ -150,6 +152,7 @@ impl From<CommitPayload> for WebsocketMessageRequest {
"commit": value, "commit": value,
"plugin": "example-js" "plugin": "example-js"
})), })),
action_id: None,
} }
} }
} }
@ -164,3 +167,143 @@ pub struct RequestMenuListPayload {
/// box id /// box id
pub boxid: String, pub boxid: String,
} }
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum AvailableMaterialAction {
#[serde(rename = "create")]
Create,
#[serde(rename = "modify")]
Modify,
/// Compute available material id
#[serde(rename = "new_id")]
GetNewMaterialId,
/// Query material in database
#[serde(rename = "query")]
QueryParam,
/// Force to update materials in db
#[serde(rename = "update")]
Update,
}
/// For interact with materials
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RequestMaterialActionPayload {
/// User info expect at least id, token, name
pub user_info: serde_json::Value,
/// target country to get recipe, version will always use latest
pub country: String,
/// Available action, reject unexpected scope
pub action: AvailableMaterialAction,
/// Payload data required for some action
pub data: Option<serde_json::Value>,
}
#[allow(non_snake_case)]
fn BlankString() -> Option<String> {
Some("".to_string())
}
#[allow(non_snake_case)]
fn BlankBool() -> Option<bool> {
Some(false)
}
#[allow(non_snake_case)]
fn BlankOtherStrShowTextError() -> Option<Vec<String>> {
Some(vec![
"".to_string(),
"".to_string(),
"".to_string(),
"".to_string(),
"".to_string(),
"".to_string(),
"".to_string(),
"".to_string(),
])
}
/// Request material creation, this will check if material is creatable on criteria
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CreateMaterial(pub libtbr::models::recipe::MaterialSetting);
/// Request material edit, the material must existed in database. Otherwise, return fail in tx.
/// Modifying `id` field is prohibited, user must request remove first then insert new.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ModifyMaterial {
#[serde(rename = "AlarmIDWhenOffline")]
pub alarm_id_when_offline: Option<serde_json::Value>,
#[serde(rename = "BeanChannel")]
pub bean_channel: bool,
#[serde(default = "BlankString", rename = "CanisterType")]
pub canister_type: Option<String>,
#[serde(rename = "DrainTimer")]
pub drain_timer: Option<serde_json::Value>,
#[serde(default = "BlankBool", rename = "IceScreamBingsuChannel")]
pub ice_scream_bingsu_channel: Option<bool>,
#[serde(rename = "IsEquipment")]
pub is_equipment: Option<bool>,
#[serde(rename = "LeavesChannel")]
pub leaves_channel: Option<bool>,
#[serde(rename = "LowToOffline")]
pub low_to_offline: Option<serde_json::Value>,
#[serde(default = "BlankString", rename = "MaterialDescription")]
pub material_description: Option<String>,
#[serde(rename = "MaterialStatus")]
pub material_status: Option<serde_json::Value>,
#[serde(rename = "PowderChannel")]
pub powder_channel: Option<bool>,
#[serde(rename = "RefillUnitGram")]
pub refill_unit_gram: Option<bool>,
#[serde(rename = "RefillUnitMilliliters")]
pub refill_unit_milliliters: Option<bool>,
#[serde(rename = "RefillUnitPCS")]
pub refill_unit_pcs: Option<bool>,
#[serde(rename = "ScheduleDrainType")]
pub schedule_drain_type: Option<serde_json::Value>,
#[serde(rename = "SodaChannel")]
pub soda_channel: Option<bool>,
#[serde(default = "BlankOtherStrShowTextError", rename = "StrTextShowError")]
pub str_text_show_error: Option<Vec<String>>,
#[serde(rename = "SyrupChannel")]
pub syrup_channel: Option<bool>,
pub id: i32,
#[serde(rename = "idAlternate")]
pub id_alternate: Option<serde_json::Value>,
#[serde(rename = "isUse")]
pub is_use: Option<bool>,
#[serde(default = "BlankString", rename = "materialOtherName")]
pub material_other_name: Option<String>,
#[serde(default = "BlankString", rename = "materialName")]
pub material_name: Option<String>,
#[serde(default = "BlankString", rename = "pathOtherName")]
pub path_other_name: Option<String>,
pub pay_rettry_max_count: Option<serde_json::Value>,
#[serde(default = "BlankString", rename = "RawMaterialUnit")]
pub raw_material_unit: Option<String>,
#[serde(default = "BlankString", rename = "MaterialParameter")]
pub material_parameter: Option<String>,
#[serde(flatten)]
pub extra: std::collections::HashMap<String, serde_json::Value>,
}
impl CreateMaterial {
pub fn check_valid(&mut self, country_code: Option<String>) -> bool {
let mat_type = self.0.get_definition_type(country_code);
return (self.0.BeanChannel
&& matches!(mat_type, libtbr::models::recipe::MaterialType::Bean))
|| (self.0.PowderChannel
&& matches!(mat_type, libtbr::models::recipe::MaterialType::Powder))
|| (self.0.SyrupChannel
&& matches!(mat_type, libtbr::models::recipe::MaterialType::Syrup))
|| (self.0.SodaChannel
&& matches!(mat_type, libtbr::models::recipe::MaterialType::Soda))
|| (self.0.IsEquipment
&& matches!(
mat_type,
libtbr::models::recipe::MaterialType::Lid
| libtbr::models::recipe::MaterialType::Cup
| libtbr::models::recipe::MaterialType::Straw
));
}
}

View file

@ -1,8 +1,15 @@
use super::{core::*, helper::*, model::*}; use super::{core::*, helper::*, model::*};
use crate::{ use crate::{
app::*, app::*,
websocket::{plugins::call_plugin_if_existed, tasks}, summary::get_summarized_text,
websocket::{
interceptor::client::{Direction, InterceptedEvent, InterceptorClient},
plugins::call_plugin_if_existed,
session::{EncryptedFrame, SecureSession, decrypt_message, encrypt_server_message},
tasks,
},
}; };
use serde_json::Value;
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
@ -24,7 +31,6 @@ use tokio::{
task::JoinHandle, task::JoinHandle,
time::Instant, time::Instant,
}; };
use wasmtime::{Config, Engine};
pub async fn read( pub async fn read(
// redis: redis::Client, // redis: redis::Client,
@ -34,6 +40,8 @@ pub async fn read(
last_seen: Arc<Mutex<Instant>>, // cmd_atom: crossbeam_queue::ArrayQueue<CommandRequestPayload>, last_seen: Arc<Mutex<Instant>>, // cmd_atom: crossbeam_queue::ArrayQueue<CommandRequestPayload>,
uid: Arc<Mutex<String>>, uid: Arc<Mutex<String>>,
hub: Arc<RwLock<Hub>>, hub: Arc<RwLock<Hub>>,
interceptor: Arc<Option<InterceptorClient>>,
session: Arc<SecureSession>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let redis = state.redis_cli.clone(); let redis = state.redis_cli.clone();
let config = state.dev_config.clone(); let config = state.dev_config.clone();
@ -43,159 +51,232 @@ pub async fn read(
// Plugins // Plugins
// //
let engine = Engine::new(Config::new().wasm_component_model(true)).unwrap(); // let engine = Engine::new(Config::new().wasm_component_model(true)).unwrap();
while let Some(Ok(msg)) = receiver.next().await { while let Some(Ok(msg)) = receiver.next().await {
match msg { match msg {
Message::Text(t) => { Message::Text(t) => {
let mut req: WebsocketMessageRequest = serde_json::from_str(t.as_str())?; if let Ok(frame) = serde_json::from_str::<EncryptedFrame>(&t) {
match decrypt_message(&session.cipher, &frame) {
Ok(plain_bytes) => {
let plain_text = String::from_utf8_lossy(&plain_bytes);
let req: WebsocketMessageRequest = serde_json::from_str(&plain_text)?;
req = call_plugin_if_existed(req, engine.clone()).await; // req = call_plugin_if_existed(req, engine.clone()).await;
// info!("get msg: {}", req.type_w); if let Some(ic) = interceptor.as_ref()
match req.type_w.as_str() { && ic.should_intercept(&req.type_w)
"recipe" if req.payload.is_some() => { {
if tasks::recipe::handle_recipe_request( info!("intercept message ...");
config.clone(),
redis.clone(),
tx.clone(),
req,
uid_clone.clone(),
)
.await
.is_err()
{
continue;
}
}
"recipe_versions" if req.payload.is_some() => {
if tasks::recipe::handle_recipe_versions_list_request(
config.clone(),
redis.clone(),
tx.clone(),
req,
uid_clone.clone(),
)
.await
.is_err()
{
continue;
}
}
"price" if req.payload.is_some() => {
if tasks::price::handle_price_request(
config.clone(),
redis.clone(),
tx.clone(),
req,
uid_clone.clone(),
)
.await
.is_err()
{
continue;
}
}
"command" if req.payload.is_some() => {
if tasks::command::handle_command_request(state.clone(), tx.clone(), req)
.await
.is_err()
{
continue;
}
}
"heartbeat" => {
let new_updated_time = Instant::now();
let uidd = uid.lock().await.clone();
*last_seen.lock().await = new_updated_time;
info!("{}: active", uidd.to_string()); let uidd_clone = uid.lock().await.clone();
// send back response to keep alive let event = InterceptedEvent {
// user can now know if server is active or not direction: Direction::Incoming,
let _ = tx user_id: uidd_clone.clone(),
.send_timeout( action_type: req.type_w.clone(),
TxControlMessage::Payload(serde_json::json!({ payload: req.payload.clone().unwrap_or_default(),
"type": "heartbeat", timestamp: chrono::Utc::now(),
"payload": { connection_id: uidd_clone.clone(),
"active": true, summary: None,
"refresh_time": format!("{:?}", new_updated_time), };
"to": uidd ic.send_async(event).await;
} }
})),
Duration::from_secs(3), // info!("get msg: {}", req.type_w);
) let http_client = state.http_client.clone();
.await; match req.type_w.as_str() {
} "recipe" if req.payload.is_some() => {
"sheet" if req.payload.is_some() => { if tasks::recipe::handle_recipe_request(
if tasks::sheet::handle_sheet_request( &http_client,
config.clone(), config.clone(),
redis.clone(), redis.clone(),
tx.clone(), tx.clone(),
req, req,
uid_clone.clone(), uid_clone.clone(),
) )
.await .await
.is_err() .is_err()
{ {
continue; continue;
} }
}
"log_report" if let Some(log_payload) = req.payload => {
let log_report_payload: LogReportPayload =
match safe_deserialize(&log_payload) {
Ok(lreq) => lreq,
Err(e) => {
error!("error deserialize body log request: {e:?} ---> Skip");
continue;
} }
}; "recipe_versions" if req.payload.is_some() => {
// generate timestamp if tasks::recipe::handle_recipe_versions_list_request(
// &http_client,
let now = Instant::now(); config.clone(),
} redis.clone(),
"save_recipe" if req.payload.is_some() => { tx.clone(),
if tasks::recipe::handle_recipe_save_change_request( req,
config.clone(), uid_clone.clone(),
redis.clone(), )
tx.clone(), .await
req, .is_err()
uid_clone.clone(), {
) continue;
.await }
.is_err() }
{ "price" if req.payload.is_some() => {
continue; if tasks::price::handle_price_request(
} &http_client,
} config.clone(),
"auth" if req.payload.is_some() => { redis.clone(),
tasks::auth::handle_auth_request( tx.clone(),
state.clone(), req,
tx.clone(), uid_clone.clone(),
req, )
hub.clone(), .await
uid_clone.clone(), .is_err()
) {
.await?; continue;
} }
}
"command" if req.payload.is_some() => {
if tasks::command::handle_command_request(
state.clone(),
tx.clone(),
req,
)
.await
.is_err()
{
continue;
}
}
"heartbeat" => {
let new_updated_time = Instant::now();
let uidd = uid.lock().await.clone();
*last_seen.lock().await = new_updated_time;
"list_menu" if req.payload.is_some() => { // info!("{}: active", uidd.to_string());
if tasks::recipe::handle_request_list_menu_recipe(
config.clone(), // send back response to keep alive
redis.clone(), // user can now know if server is active or not
tx.clone(), let _ = tx
req, .send_timeout(
uid_clone.clone(), TxControlMessage::Payload(serde_json::json!({
) "type": "heartbeat",
.await "payload": {
.is_err() "active": true,
{ "refresh_time": format!("{:?}", new_updated_time),
continue; "to": uidd
}
})),
Duration::from_secs(3),
)
.await;
}
"sheet" if req.payload.is_some() => {
if tasks::sheet::handle_sheet_request(
config.clone(),
redis.clone(),
tx.clone(),
req,
uid_clone.clone(),
)
.await
.is_err()
{
continue;
}
}
"log_report" if let Some(log_payload) = req.payload => {
let log_report_payload: LogReportPayload =
match safe_deserialize(&log_payload) {
Ok(lreq) => lreq,
Err(e) => {
error!(
"error deserialize body log request: {e:?} ---> Skip"
);
continue;
}
};
// generate timestamp
//
let now = Instant::now();
}
"save_recipe" if req.payload.is_some() => {
if tasks::recipe::handle_recipe_save_change_request(
&http_client,
config.clone(),
redis.clone(),
tx.clone(),
req,
uid_clone.clone(),
)
.await
.is_err()
{
continue;
}
}
"auth" if req.payload.is_some() => {
tasks::auth::handle_auth_request(
state.clone(),
tx.clone(),
req,
hub.clone(),
uid_clone.clone(),
)
.await?;
}
"list_menu" if req.payload.is_some() => {
if tasks::recipe::handle_request_list_menu_recipe(
&http_client,
config.clone(),
redis.clone(),
tx.clone(),
req,
uid_clone.clone(),
)
.await
.is_err()
{
continue;
}
}
"material" if req.payload.is_some() => {
if tasks::recipe::handle_request_material_action(
&http_client,
config.clone(),
redis.clone(),
state.postgres_cli.clone(),
tx.clone(),
req,
uid_clone.clone(),
)
.await
.is_err()
{
continue;
}
}
_ => {
let uidd = uid.lock().await.clone();
// not implemented
let _ = tx
.send_timeout(
TxControlMessage::Payload(serde_json::json!({
"type": "notify",
"payload": {
"to": uidd,
"error": "not implemented or missing params"
}
})),
Duration::from_secs(3),
)
.await;
}
}
}
Err(_) => {
error!("Decryption failed for data sent from UID: {}", session.uid);
} }
} }
_ => { } else {
// not implemented warn!("unexpected encrypted frame: {t:?}");
}
} }
} }
Message::Ping(_) => { Message::Ping(_) => {
@ -248,6 +329,8 @@ pub async fn write(
mut rx: Receiver<TxControlMessage>, mut rx: Receiver<TxControlMessage>,
uid: Arc<Mutex<String>>, uid: Arc<Mutex<String>>,
hub: Arc<RwLock<Hub>>, hub: Arc<RwLock<Hub>>,
interceptor: Arc<Option<InterceptorClient>>,
session: Arc<SecureSession>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// only allow each stream type for 1 request // only allow each stream type for 1 request
let pending_stream_requests = Arc::new(RwLock::new(HashMap::new())); let pending_stream_requests = Arc::new(RwLock::new(HashMap::new()));
@ -279,7 +362,8 @@ pub async fn write(
&& let Some(recv_ident) = res_payload_val.get("to") && let Some(recv_ident) = res_payload_val.get("to")
&& let Some(recv_ident_str) = recv_ident.as_str() && let Some(recv_ident_str) = recv_ident.as_str()
&& (current_uid.to_string().eq(recv_ident_str) && (current_uid.to_string().eq(recv_ident_str)
|| recv_ident_str.to_string().eq("*")) || recv_ident_str.to_string().eq("*")
|| recv_ident_str.to_string().eq("devws"))
{ {
let payload_size = res.to_string().len(); let payload_size = res.to_string().len();
@ -381,7 +465,14 @@ pub async fn write(
} }
}); });
let _ = sender.send(header.to_string().into()).await?; // let _ = sender.send(header.to_string().into()).await?;
let _ = send_encrypted_message(
&mut sender,
session.clone(),
header.to_string(),
SenderMode::Send,
)
.await?;
for (idx, raw_payload) in split.iter().enumerate() { for (idx, raw_payload) in split.iter().enumerate() {
let raw_chunk_payload = serde_json::json!({ let raw_chunk_payload = serde_json::json!({
@ -393,7 +484,15 @@ pub async fn write(
"request_id": stream_chunk_id "request_id": stream_chunk_id
} }
}); });
let _ = sender.feed(raw_chunk_payload.to_string().into()).await; // let _ = sender.feed(raw_chunk_payload.to_string().into()).await;
let _ = send_encrypted_message(
&mut sender,
session.clone(),
raw_chunk_payload.to_string(),
SenderMode::Feed,
)
.await?;
} }
if let Err(e) = sender.flush().await { if let Err(e) = sender.flush().await {
@ -411,11 +510,51 @@ pub async fn write(
); );
} }
let _ = sender.send(footer.to_string().into()).await; // let _ = sender.send(footer.to_string().into()).await;
let _ = send_encrypted_message(
&mut sender,
session.clone(),
footer.to_string(),
SenderMode::Send,
)
.await?;
continue; continue;
} else { } else {
if let Err(e) = sender.send(res.to_string().into()).await { if let Some(ic) = interceptor.as_ref() {
let action_type = res_n
.get("type")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
if ic.should_intercept(action_type) {
let summarized_message = get_summarized_text(
action_type,
Some(Value::Object(res_payload_val.clone())),
);
let event = InterceptedEvent {
direction: Direction::Outgoing,
user_id: current_uid.to_string(),
action_type: action_type.to_string(),
payload: Value::Object(res_payload_val.clone()),
timestamp: chrono::Utc::now(),
connection_id: current_uid.to_string(),
summary: Some(summarized_message),
};
ic.send_async(event).await;
}
}
// if let Err(e) = sender.send(res.to_string().into()).await {
// error!("[write] send payload fail; len={payload_size}, reason: {e}");
// }
if let Err(e) = send_encrypted_message(
&mut sender,
session.clone(),
res.to_string(),
SenderMode::Send,
)
.await
{
error!("[write] send payload fail; len={payload_size}, reason: {e}"); error!("[write] send payload fail; len={payload_size}, reason: {e}");
} }
} }
@ -466,31 +605,30 @@ pub async fn write(
Ok(()) Ok(())
} }
pub async fn recv_sys_msg_send_back_client( enum SenderMode {
tx: Sender<TxControlMessage>, Feed,
mut system_rx: tokio::sync::broadcast::Receiver<serde_json::Value>, Send,
) -> JoinHandle<()> { }
let tx_to_client = tx.clone();
tokio::spawn(async move { async fn send_encrypted_message(
loop { sender: &mut SplitSink<WebSocket, Message>,
match system_rx.recv().await { session: Arc<SecureSession>,
Ok(s_msg) => { message: String,
if convert_sys_msg_command(&s_msg).is_some() mode: SenderMode,
&& let Some(err) = tx_to_client ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.send(TxControlMessage::Payload(s_msg)) match encrypt_server_message(session.cipher.clone(), &message) {
.await Ok(enc_json) => match mode {
.err() SenderMode::Feed => {
{ let _ = sender.feed(enc_json.into()).await;
error!("[SYS] failed to send back to client: {err}"); }
} SenderMode::Send => {
} let _ = sender.send(enc_json.into()).await;
Err(_) => { }
// maybe channel closed },
break; Err(e) => {
} error!("Failed to encrypt out message payload context: {e:?}");
} }
} }
info!("[sysrx-cli] ending client system rx"); Ok(())
})
} }

161
src/websocket/session.rs Normal file
View file

@ -0,0 +1,161 @@
use std::sync::Arc;
use aes_gcm::{Aes256Gcm, aead::AeadMut};
use log::info;
use p256::elliptic_curve::sec1::{FromEncodedPoint, ToEncodedPoint};
use rand::Rng;
use tokio::time::Instant;
use crate::websocket::core::GOOGLE_PUBLIC_ENDPOINT;
pub(crate) struct SecureSession {
pub uid: String,
pub cipher: Aes256Gcm,
pub key_established_at: Instant,
}
#[derive(serde::Deserialize)]
pub(crate) struct HandshakePayload {
pub token: String,
pub client_public_key: String, // BASE 64
}
#[derive(serde::Serialize)]
pub(crate) struct HandshakeAck {
pub status: String,
pub server_public_key: String,
}
#[derive(serde::Deserialize, serde::Serialize)]
pub(crate) struct EncryptedFrame {
pub iv: String, // Initialized vector per message
pub ciphertext: String, // Encrypted application message
}
#[derive(serde::Deserialize)]
pub(crate) struct FirebaseJwtClaims {
aud: String, // Audience (Expect Firebase project id),
sub: String, // Subject (Firebase user uid),
exp: u64, // Expiration timestamp
}
pub(crate) async fn refresh_jwk_cache(
state: Arc<crate::app::AppState>,
) -> Result<(), Box<dyn std::error::Error>> {
let response: serde_json::Value = reqwest::get(GOOGLE_PUBLIC_ENDPOINT).await?.json().await?;
let mut new_keys = Vec::new();
if let Some(obj) = response.as_object() {
for (_, cert_pem) in obj {
if let Some(pem_str) = cert_pem.as_str() {
if let Ok(key) = jsonwebtoken::DecodingKey::from_rsa_pem(pem_str.as_bytes()) {
new_keys.push(key);
}
}
}
}
{
let mut cache = state.jwk_encoding_keys.write().unwrap();
*cache = new_keys;
info!("Google Jwk Identity cache updated!");
}
Ok(())
}
pub(crate) async fn verify_token(
token: &str,
state: Arc<crate::app::AppState>,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let mut validation = jsonwebtoken::Validation::new(jsonwebtoken::Algorithm::RS256);
validation.set_audience(&[&state.firebase_project_id]);
validation.validate_exp = true;
let keys = { state.jwk_encoding_keys.read().unwrap() };
for key in keys.iter() {
if let Ok(token_data) = jsonwebtoken::decode::<FirebaseJwtClaims>(token, key, &validation) {
return Ok(token_data.claims.sub);
}
}
Err(Box::from(
"Invalid Firebase Token signature or metadata mismatch",
))
}
pub(crate) fn execute_dh_handshake(
client_pub_b64: &str,
) -> Result<(String, aes_gcm::Aes256Gcm), Box<dyn std::error::Error + Send + Sync>> {
use aes_gcm::KeyInit;
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use p256::{EncodedPoint, PublicKey, ecdh::EphemeralSecret};
use rand_core::OsRng;
// Step: decode client public key
// info!("client_pub_b64: {client_pub_b64}");
let client_bytes = BASE64.decode(client_pub_b64)?;
let encoded_point = EncodedPoint::from_bytes(&client_bytes)?;
let client_public = PublicKey::from_encoded_point(&encoded_point).unwrap();
// Generate server ephemeral keypair
let server_secret = EphemeralSecret::random(&mut OsRng);
let server_public = PublicKey::from(&server_secret);
// Compute symmetric shared secret
let shared_secret = server_secret.diffie_hellman(&client_public);
let secret_bytes = shared_secret.raw_secret_bytes();
// Instantiate AES-256 GCM Core Cipher block natively using derived 32-byte hash block
let cipher = aes_gcm::Aes256Gcm::new_from_slice(&secret_bytes)
.map_err(|_| "failed allocating cipher payload context init")?;
let server_pub_bytes = server_public.to_encoded_point(false);
let server_public_b64 = BASE64.encode(server_pub_bytes.as_bytes());
Ok((server_public_b64, cipher))
}
pub(crate) fn decrypt_message(
cipher: &aes_gcm::Aes256Gcm,
frame: &EncryptedFrame,
) -> Result<Vec<u8>, Box<dyn std::error::Error + Send + Sync>> {
use aes_gcm::aead::Aead;
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
let iv_bytes = BASE64.decode(&frame.iv)?;
let ciphertext_bytes = BASE64.decode(&frame.ciphertext)?;
let nonce = aes_gcm::Nonce::from_slice(&iv_bytes);
let decrypted = cipher
.decrypt(nonce, ciphertext_bytes.as_slice())
.map_err(|_| "Decryption routine validation assertion failed")?;
Ok(decrypted)
}
pub(crate) fn encrypt_server_message(
mut cipher: aes_gcm::Aes256Gcm,
plain_text: &str,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
let mut iv_bytes = [0u8; 12];
rand::rng().fill_bytes(&mut iv_bytes);
let nonce = aes_gcm::Nonce::from_slice(&iv_bytes);
let ciphertext_bytes = cipher
.encrypt(nonce, plain_text.as_bytes())
.map_err(|_| "Encryption execution routine failed")?;
let frame = EncryptedFrame {
iv: BASE64.encode(iv_bytes),
ciphertext: BASE64.encode(ciphertext_bytes),
};
let json_output = serde_json::to_string(&frame)?;
Ok(json_output)
}

View file

@ -52,23 +52,5 @@ pub async fn handle_auth_request(
} }
} }
// TODO
// - Queue requests
// - Send if service available
// if let Some(_) = state.system_tx.send(p).err() {
// info!("failed to send command request");
// let _ = tx
// .send(TxControlMessage::Payload(serde_json::json!({
// "type": "notify",
// "payload": {
// "from": "system_tx",
// "level": "error",
// "msg": "send request fail",
// "to": ""
// }
// })))
// .await;
// }
Ok(()) Ok(())
} }

View file

@ -125,6 +125,7 @@ impl RecipePrice {
/// Get main price profile of country /// Get main price profile of country
pub async fn handle_price_request( pub async fn handle_price_request(
http_client: &reqwest::Client,
config: DevConfig, config: DevConfig,
redis: redis::Client, redis: redis::Client,
tx: Sender<TxControlMessage>, tx: Sender<TxControlMessage>,
@ -148,7 +149,7 @@ pub async fn handle_price_request(
let price_action = price_param.action; let price_action = price_param.action;
let price_content = let price_content =
match invoke_checkout_request(config.clone(), price_file_format.clone()).await { match invoke_checkout_request(http_client, config.clone(), price_file_format.clone()).await {
Ok(pc) => pc, Ok(pc) => pc,
Err(e) => return Err(format!("Cannot find price of expected country: {e:?}").into()), Err(e) => return Err(format!("Cannot find price of expected country: {e:?}").into()),
}; };
@ -314,7 +315,14 @@ pub async fn handle_price_request(
// return Err("Fail to sync repo, backing up ...".into()); // return Err("Fail to sync repo, backing up ...".into());
// } // }
let _ = invoke_commit_request(config.clone(), commit_payload.clone()).await; let _ = invoke_commit_request(
http_client,
config.clone(),
commit_payload.clone(),
tx,
uidd.clone().to_string(),
)
.await;
// if invoke_push_request(config.clone()).await.is_err() { // if invoke_push_request(config.clone()).await.is_err() {
// let _ = commit_payload.dump_backup(); // let _ = commit_payload.dump_backup();

View file

@ -2,10 +2,8 @@ use crate::app::*;
use crate::stream::model::{ use crate::stream::model::{
IntoStreamMessage, StreamDataChunk, StreamDataEnd, StreamDataExtra, StreamDataStart, IntoStreamMessage, StreamDataChunk, StreamDataEnd, StreamDataExtra, StreamDataStart,
}; };
use crate::websocket::plugins::call_plugin_if_existed;
use crate::websocket::{core::*, helper::*, model::*}; use crate::websocket::{core::*, helper::*, model::*};
use std::collections::HashMap;
use std::{fs::File, io::Read, path::PathBuf, sync::Arc}; use std::{fs::File, io::Read, path::PathBuf, sync::Arc};
use async_compression::tokio::bufread::BrotliDecoder; use async_compression::tokio::bufread::BrotliDecoder;
@ -16,9 +14,8 @@ use futures::{
stream::{SplitSink, SplitStream}, stream::{SplitSink, SplitStream},
}; };
use libtbr::models::recipe::{MaterialSetting, Recipe, Recipe01}; use libtbr::models::recipe::{MaterialSetting, Recipe, Recipe01};
use log::{error, info, warn}; use log::{debug, error, info, warn};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use redis::{self, TypedCommands}; use redis::{self, TypedCommands};
use tokio::{ use tokio::{
@ -29,7 +26,6 @@ use tokio::{
}, },
time::Instant, time::Instant,
}; };
use wasmtime::{Config, Engine};
const NO_MERGE_FLAG: i32 = 1000; const NO_MERGE_FLAG: i32 = 1000;
const MERGE_DONE_FLAG: i32 = 0; const MERGE_DONE_FLAG: i32 = 0;
@ -71,24 +67,26 @@ pub fn get_key_cache(country: String, version: String, is_patch: bool, retry_cnt
} }
async fn get_latest_recipe_from_git( async fn get_latest_recipe_from_git(
http_client: &reqwest::Client,
config: &DevConfig, config: &DevConfig,
country: &str, country: &str,
) -> Result<Recipe, Box<dyn std::error::Error>> { ) -> Result<Recipe, Box<dyn std::error::Error>> {
let latest_key = format!("{country}/version"); let latest_key = format!("{country}/version");
let latest_version = match invoke_checkout_request(config.clone(), latest_key).await { let latest_version =
Ok(version) => version, match invoke_checkout_request(http_client, config.clone(), latest_key).await {
Err(e) => { Ok(version) => version,
println!("Error on checkout: {e}"); Err(e) => {
"".to_string() println!("Error on checkout: {e}");
} "".to_string()
}; }
};
let mut recipe_result: Option<Recipe> = None; let mut recipe_result: Option<Recipe> = None;
let init_key = 3; let init_key = 3;
for i in init_key..6 { for i in init_key..6 {
let r1_key = get_key_cache(country.to_string(), latest_version.clone(), false, i); let r1_key = get_key_cache(country.to_string(), latest_version.clone(), false, i);
let content = match invoke_checkout_request(config.clone(), r1_key).await { let content = match invoke_checkout_request(http_client, config.clone(), r1_key).await {
Ok(file_content) => file_content, Ok(file_content) => file_content,
Err(e) => { Err(e) => {
println!("Error on checkout: {e}"); println!("Error on checkout: {e}");
@ -111,18 +109,20 @@ async fn get_latest_recipe_from_git(
} }
async fn get_latest_recipe_saved_machine_from_git( async fn get_latest_recipe_saved_machine_from_git(
http_client: &reqwest::Client,
config: &DevConfig, config: &DevConfig,
country: &str, country: &str,
boxid: &str, boxid: &str,
) -> Result<Recipe, Box<dyn std::error::Error>> { ) -> Result<Recipe, Box<dyn std::error::Error>> {
let latest_key = format!("{country}/coffeethai02_{country}_{boxid}_temp.json"); let latest_key = format!("{country}/coffeethai02_{country}_{boxid}_temp.json");
let content = match invoke_checkout_request(config.clone(), latest_key.clone()).await { let content =
Ok(content) => content, match invoke_checkout_request(http_client, config.clone(), latest_key.clone()).await {
Err(e) => { Ok(content) => content,
println!("Error on checkout: {e}"); Err(e) => {
"".to_string() println!("Error on checkout: {e}");
} "".to_string()
}; }
};
info!( info!(
"[get-latest] {} -> content ready: {}", "[get-latest] {} -> content ready: {}",
latest_key, latest_key,
@ -143,22 +143,25 @@ pub async fn throttle_send_recipe(
uid: Arc<Mutex<String>>, uid: Arc<Mutex<String>>,
) { ) {
info!("Starting throttle"); info!("Starting throttle");
let r01s: Vec<Recipe01> = recipe
// Use Arc<Recipe01> to avoid cloning - single allocation, shared ownership
let r01s: Vec<Arc<Recipe01>> = recipe
.Recipe01 .Recipe01
.par_iter() .iter()
.flat_map(|x| { .flat_map(|x| {
let mut v = Vec::new(); let mut v = Vec::new();
v.push(x.clone()); v.push(Arc::new(x.clone()));
if let Some(sub) = x.clone().SubMenu { if let Some(sub) = &x.SubMenu {
v.extend(sub); v.extend(sub.iter().map(|s| Arc::new(s.clone())));
} }
v v
}) })
.collect(); .collect();
let matset: Vec<MaterialSetting> = recipe.MaterialSetting.clone(); // Use reference to MaterialSetting instead of clone
let matset = &recipe.MaterialSetting;
// test stream start model // test stream start model
let ss = StreamDataStart::new( let ss = StreamDataStart::new(
@ -175,7 +178,7 @@ pub async fn throttle_send_recipe(
println!("ERR: send tx error, {err:?}"); println!("ERR: send tx error, {err:?}");
} }
// split send // split send - use Arc pointers, zero-copy
let uidd = uid.try_lock().unwrap().to_string(); let uidd = uid.try_lock().unwrap().to_string();
for (index, chunk) in r01s.chunks(CHUNK_SIZE).enumerate() { for (index, chunk) in r01s.chunks(CHUNK_SIZE).enumerate() {
@ -192,7 +195,7 @@ pub async fn throttle_send_recipe(
for (index, chunk) in matset.chunks(CHUNK_SIZE).enumerate() { for (index, chunk) in matset.chunks(CHUNK_SIZE).enumerate() {
let curr_ch_id = format!("{mat_exid}_{index}"); let curr_ch_id = format!("{mat_exid}_{index}");
let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec(), uidd.clone()); let extra_matset = StreamDataExtra::from_slice(&curr_ch_id, &extp, chunk, uidd.clone());
if let Some(err) = tx if let Some(err) = tx
.send(TxControlMessage::Payload(extra_matset.as_msg())) .send(TxControlMessage::Payload(extra_matset.as_msg()))
@ -206,7 +209,7 @@ pub async fn throttle_send_recipe(
let extl = "topplist"; let extl = "topplist";
for (index, chunk) in recipe.Topping.ToppingList.chunks(CHUNK_SIZE).enumerate() { for (index, chunk) in recipe.Topping.ToppingList.chunks(CHUNK_SIZE).enumerate() {
let curr_ch_id = format!("{mat_exid}_tl{index}"); let curr_ch_id = format!("{mat_exid}_tl{index}");
let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec(), uidd.clone()); let extra_topplist = StreamDataExtra::from_slice(&curr_ch_id, &extl, chunk, uidd.clone());
if let Some(err) = tx if let Some(err) = tx
.send(TxControlMessage::Payload(extra_topplist.as_msg())) .send(TxControlMessage::Payload(extra_topplist.as_msg()))
.await .await
@ -219,7 +222,7 @@ pub async fn throttle_send_recipe(
let extg = "toppgrp"; let extg = "toppgrp";
for (index, chunk) in recipe.Topping.ToppingGroup.chunks(CHUNK_SIZE).enumerate() { for (index, chunk) in recipe.Topping.ToppingGroup.chunks(CHUNK_SIZE).enumerate() {
let curr_ch_id = format!("{mat_exid}_tg{index}"); let curr_ch_id = format!("{mat_exid}_tg{index}");
let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec(), uidd.clone()); let extra_toppgrp = StreamDataExtra::from_slice(&curr_ch_id, &extg, chunk, uidd.clone());
if let Some(err) = tx if let Some(err) = tx
.send(TxControlMessage::Payload(extra_toppgrp.as_msg())) .send(TxControlMessage::Payload(extra_toppgrp.as_msg()))
.await .await
@ -248,8 +251,41 @@ pub async fn throttle_send_recipe(
} }
} }
/// Helper: fetch machine recipe saved on server with retry.
/// If machine recipe is not found, do get latest instead.
async fn get_machine_recipe_with_retry<F, Fut>(
http_client: &reqwest::Client,
config: &DevConfig,
country: &str,
box_id: &str,
callback_case_not_found: Option<F>,
) -> Option<Recipe>
where
F: Fn(&reqwest::Client, &DevConfig, String) -> Fut,
Fut: Future<Output = Option<Recipe>>,
{
let mut result_pre: Option<Recipe> =
match get_latest_recipe_saved_machine_from_git(http_client, &config, &country, &box_id)
.await
{
Ok(saved) => Some(saved),
Err(_) => {
error!("[get_save] previous save not found ...");
None
}
};
if result_pre.is_none()
&& let Some(cb) = callback_case_not_found
{
result_pre = cb(http_client, config, country.to_string()).await;
}
result_pre
}
// TODO: split cases into sub function // TODO: split cases into sub function
pub async fn handle_recipe_request( pub async fn handle_recipe_request(
http_client: &reqwest::Client,
config: DevConfig, config: DevConfig,
redis: redis::Client, redis: redis::Client,
tx: Sender<TxControlMessage>, tx: Sender<TxControlMessage>,
@ -287,13 +323,14 @@ pub async fn handle_recipe_request(
if latest_version.is_empty() { if latest_version.is_empty() {
// cannot get actual version, try get from git // cannot get actual version, try get from git
latest_version = match invoke_checkout_request(config.clone(), latest_key).await { latest_version =
Ok(version) => version, match invoke_checkout_request(http_client, config.clone(), latest_key).await {
Err(e) => { Ok(version) => version,
println!("Error on checkout: {e}"); Err(e) => {
"".to_string() println!("Error on checkout: {e}");
} "".to_string()
}; }
};
} }
// detect if use different version // detect if use different version
@ -415,13 +452,14 @@ pub async fn handle_recipe_request(
} }
} else { } else {
// retry get from git // retry get from git
let content = match invoke_checkout_request(config.clone(), r1_key).await { let content =
Ok(file_content) => file_content, match invoke_checkout_request(http_client, config.clone(), r1_key).await {
Err(e) => { Ok(file_content) => file_content,
println!("Error on checkout: {e}"); Err(e) => {
"".to_string() println!("Error on checkout: {e}");
} "".to_string()
}; }
};
info!("content ready: {}", content.len()); info!("content ready: {}", content.len());
let recipe = serde_json::from_str::<Recipe>(&content); let recipe = serde_json::from_str::<Recipe>(&content);
@ -447,8 +485,9 @@ pub async fn handle_recipe_request(
} }
pub async fn handle_recipe_versions_list_request( pub async fn handle_recipe_versions_list_request(
http_client: &reqwest::Client,
config: DevConfig, config: DevConfig,
redis: redis::Client, _redis: redis::Client,
tx: Sender<TxControlMessage>, tx: Sender<TxControlMessage>,
req: WebsocketMessageRequest, req: WebsocketMessageRequest,
uid_clone: Arc<Mutex<String>>, uid_clone: Arc<Mutex<String>>,
@ -458,10 +497,11 @@ pub async fn handle_recipe_versions_list_request(
let version_list = format!("{country}", country = recipe_param.country); let version_list = format!("{country}", country = recipe_param.country);
let country_versions_str = match invoke_checkout_request(config.clone(), version_list).await { let country_versions_str =
Ok(vs) => vs, match invoke_checkout_request(http_client, config.clone(), version_list).await {
Err(e) => return Err(format!("Cannot find versions of expected country: {e:?}").into()), Ok(vs) => vs,
}; Err(e) => return Err(format!("Cannot find versions of expected country: {e:?}").into()),
};
// extract version as list // extract version as list
let files: Vec<String> = country_versions_str let files: Vec<String> = country_versions_str
@ -491,6 +531,7 @@ pub async fn handle_recipe_versions_list_request(
} }
pub async fn handle_recipe_save_change_request( pub async fn handle_recipe_save_change_request(
http_client: &reqwest::Client,
config: DevConfig, config: DevConfig,
redis: redis::Client, redis: redis::Client,
tx: Sender<TxControlMessage>, tx: Sender<TxControlMessage>,
@ -526,7 +567,9 @@ pub async fn handle_recipe_save_change_request(
// try get saved machine recipe // try get saved machine recipe
let mut result_pre: Option<Recipe> = let mut result_pre: Option<Recipe> =
match get_latest_recipe_saved_machine_from_git(&config, &country, &box_id).await { match get_latest_recipe_saved_machine_from_git(http_client, &config, &country, &box_id)
.await
{
Ok(saved) => Some(saved), Ok(saved) => Some(saved),
Err(_) => { Err(_) => {
error!("[save_recipe] previous save not found ..."); error!("[save_recipe] previous save not found ...");
@ -535,7 +578,7 @@ pub async fn handle_recipe_save_change_request(
}; };
if result_pre.is_none() { if result_pre.is_none() {
result_pre = match get_latest_recipe_from_git(&config, &country).await { result_pre = match get_latest_recipe_from_git(http_client, &config, &country).await {
Ok(r) => Some(r), Ok(r) => Some(r),
Err(e) => { Err(e) => {
return Err(format!("{e}").into()); return Err(format!("{e}").into());
@ -616,7 +659,10 @@ pub async fn handle_recipe_save_change_request(
message: commit_message, message: commit_message,
}; };
if let Err(commit_error) = invoke_commit_request(config, commit_payload.clone()).await { let uidd_clone = uid_clone.lock().await.to_string();
if let Err(commit_error) =
invoke_commit_request(http_client, config, commit_payload.clone(), tx, uidd_clone).await
{
error!("failed to commit: {commit_error}"); error!("failed to commit: {commit_error}");
let _ = commit_payload.dump_backup(); let _ = commit_payload.dump_backup();
return Err(format!("{commit_error}").into()); return Err(format!("{commit_error}").into());
@ -634,8 +680,9 @@ pub async fn handle_recipe_save_change_request(
} }
pub async fn handle_request_list_menu_recipe( pub async fn handle_request_list_menu_recipe(
http_client: &reqwest::Client,
config: DevConfig, config: DevConfig,
redis: redis::Client, _redis: redis::Client,
tx: Sender<TxControlMessage>, tx: Sender<TxControlMessage>,
req: WebsocketMessageRequest, req: WebsocketMessageRequest,
uid_clone: Arc<Mutex<String>>, uid_clone: Arc<Mutex<String>>,
@ -646,20 +693,23 @@ pub async fn handle_request_list_menu_recipe(
let latest_key = format!("{country}/version", country = req_menu_list.country); let latest_key = format!("{country}/version", country = req_menu_list.country);
let latest_version = match invoke_checkout_request(config.clone(), latest_key).await { let latest_version =
Ok(version) => version, match invoke_checkout_request(http_client, config.clone(), latest_key).await {
Err(e) => { Ok(version) => version,
println!("Error on checkout: {e}"); Err(e) => {
"".to_string() println!("Error on checkout: {e}");
} "".to_string()
}; }
};
let country = req_menu_list.clone().country; let country = req_menu_list.clone().country;
let box_id = req_menu_list.clone().boxid; let box_id = req_menu_list.clone().boxid;
// merge from already saved recipe // merge from already saved recipe
let result_previous_on_same_boxid: Option<Recipe> = let result_previous_on_same_boxid: Option<Recipe> =
match get_latest_recipe_saved_machine_from_git(&config, &country, &box_id).await { match get_latest_recipe_saved_machine_from_git(http_client, &config, &country, &box_id)
.await
{
Ok(saved) => Some(saved), Ok(saved) => Some(saved),
Err(e) => { Err(e) => {
error!("[list-menu-restore] previous save not found, {e}"); error!("[list-menu-restore] previous save not found, {e}");
@ -678,13 +728,14 @@ pub async fn handle_request_list_menu_recipe(
i, i,
); );
let content = match invoke_checkout_request(config.clone(), r1_key.clone()).await { let content =
Ok(file_content) => file_content, match invoke_checkout_request(http_client, config.clone(), r1_key.clone()).await {
Err(e) => { Ok(file_content) => file_content,
println!("Error on checkout: {e}"); Err(e) => {
"".to_string() println!("Error on checkout: {e}");
} "".to_string()
}; }
};
info!("[list-menu] {r1_key} -> content ready: {}", content.len()); info!("[list-menu] {r1_key} -> content ready: {}", content.len());
let recipe = serde_json::from_str::<Recipe>(&content); let recipe = serde_json::from_str::<Recipe>(&content);
@ -784,3 +835,571 @@ fn handle_case_found_existed_recipe(
Ok(result.clone()) Ok(result.clone())
} }
async fn modify_material(
client: &tokio_postgres::Client,
payload: &ModifyMaterial,
_tx: &Sender<TxControlMessage>,
_uidd: &str,
) -> Result<u64, String> {
let mut set_clauses = Vec::new();
let mut param_values: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
let mut param_index = 1;
if payload.alarm_id_when_offline.is_some() {
set_clauses.push(format!("alarm_id_when_offline = ${}", param_index));
let val = payload.alarm_id_when_offline.as_ref().map(|v| v.as_i64().unwrap_or(0) as i32).unwrap_or(0);
param_values.push(Box::new(val));
param_index += 1;
}
if payload.bean_channel {
set_clauses.push(format!("bean_channel = ${}", param_index));
param_values.push(Box::new(payload.bean_channel));
param_index += 1;
}
if let Some(canister_type) = &payload.canister_type {
if !canister_type.is_empty() {
set_clauses.push(format!("canister_type = ${}", param_index));
param_values.push(Box::new(canister_type.clone()));
param_index += 1;
}
}
if payload.drain_timer.is_some() {
set_clauses.push(format!("drain_timer = ${}", param_index));
let val = payload.drain_timer.as_ref().map(|v| v.as_i64().unwrap_or(0) as i32).unwrap_or(0);
param_values.push(Box::new(val));
param_index += 1;
}
if let Some(val) = payload.ice_scream_bingsu_channel {
set_clauses.push(format!("ice_scream_bingsu_channel = ${}", param_index));
param_values.push(Box::new(val));
param_index += 1;
}
if let Some(val) = payload.is_equipment {
set_clauses.push(format!("is_equipment = ${}", param_index));
param_values.push(Box::new(val));
param_index += 1;
}
if let Some(val) = payload.leaves_channel {
set_clauses.push(format!("leaves_channel = ${}", param_index));
param_values.push(Box::new(val));
param_index += 1;
}
if payload.low_to_offline.is_some() {
set_clauses.push(format!("low_to_offline = ${}", param_index));
let val = payload.low_to_offline.as_ref().map(|v| v.as_i64().unwrap_or(0) as i32).unwrap_or(0);
param_values.push(Box::new(val));
param_index += 1;
}
if payload.material_status.is_some() {
set_clauses.push(format!("material_status = ${}", param_index));
let val = payload.material_status.as_ref().map(|v| v.as_i64().unwrap_or(0) as i32).unwrap_or(0);
param_values.push(Box::new(val));
param_index += 1;
}
if let Some(val) = payload.powder_channel {
set_clauses.push(format!("powder_channel = ${}", param_index));
param_values.push(Box::new(val));
param_index += 1;
}
if let Some(val) = payload.refill_unit_gram {
set_clauses.push(format!("refill_unit_gram = ${}", param_index));
param_values.push(Box::new(val));
param_index += 1;
}
if let Some(val) = payload.refill_unit_milliliters {
set_clauses.push(format!("refill_unit_milliliters = ${}", param_index));
param_values.push(Box::new(val));
param_index += 1;
}
if let Some(val) = payload.refill_unit_pcs {
set_clauses.push(format!("refill_unit_pcs = ${}", param_index));
param_values.push(Box::new(val));
param_index += 1;
}
if payload.schedule_drain_type.is_some() {
set_clauses.push(format!("schedule_drain_type = ${}", param_index));
let val = payload.schedule_drain_type.as_ref().map(|v| v.as_i64().unwrap_or(0) as i32).unwrap_or(0);
param_values.push(Box::new(val));
param_index += 1;
}
if let Some(val) = payload.soda_channel {
set_clauses.push(format!("soda_channel = ${}", param_index));
param_values.push(Box::new(val));
param_index += 1;
}
if let Some(str_text_show_error) = &payload.str_text_show_error {
set_clauses.push(format!("str_text_show_error = ${}", param_index));
param_values.push(Box::new(str_text_show_error.clone()));
param_index += 1;
}
if let Some(val) = payload.syrup_channel {
set_clauses.push(format!("syrup_channel = ${}", param_index));
param_values.push(Box::new(val));
param_index += 1;
}
if payload.id_alternate.is_some() {
set_clauses.push(format!("id_alternate = ${}", param_index));
let val = payload.id_alternate.as_ref().map(|v| v.as_i64().unwrap_or(0) as i32).unwrap_or(0);
param_values.push(Box::new(val));
param_index += 1;
}
if let Some(val) = payload.is_use {
set_clauses.push(format!("is_use = ${}", param_index));
param_values.push(Box::new(val));
param_index += 1;
}
if let Some(material_other_name) = &payload.material_other_name {
if !material_other_name.is_empty() {
set_clauses.push(format!("material_other_name = ${}", param_index));
param_values.push(Box::new(material_other_name.clone()));
param_index += 1;
}
}
if let Some(material_name) = &payload.material_name {
if !material_name.is_empty() {
set_clauses.push(format!("material_name = ${}", param_index));
param_values.push(Box::new(material_name.clone()));
param_index += 1;
}
}
if let Some(path_other_name) = &payload.path_other_name {
if !path_other_name.is_empty() {
set_clauses.push(format!("path_other_name = ${}", param_index));
param_values.push(Box::new(path_other_name.clone()));
param_index += 1;
}
}
if payload.pay_rettry_max_count.is_some() {
set_clauses.push(format!("pay_retry_max_count = ${}", param_index));
let val = payload.pay_rettry_max_count.as_ref().map(|v| v.as_i64().unwrap_or(0) as i32).unwrap_or(0);
param_values.push(Box::new(val));
param_index += 1;
}
if let Some(raw_material_unit) = &payload.raw_material_unit {
if !raw_material_unit.is_empty() {
set_clauses.push(format!("raw_material_unit = ${}", param_index));
param_values.push(Box::new(raw_material_unit.clone()));
param_index += 1;
}
}
if let Some(material_parameter) = &payload.material_parameter {
if !material_parameter.is_empty() {
set_clauses.push(format!("material_parameter = ${}", param_index));
param_values.push(Box::new(material_parameter.clone()));
param_index += 1;
}
}
if set_clauses.is_empty() {
return Err("At least one field besides id must be provided for modification".to_string());
}
set_clauses.push("updated_at = NOW()".to_string());
let sql = format!(
"UPDATE material_setting SET {} WHERE id = ${} RETURNING id",
set_clauses.join(", "),
param_index
);
param_values.push(Box::new(payload.id));
let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = param_values.iter().map(|b| b.as_ref() as _).collect();
match client.execute(&sql, &params).await {
Ok(rows) => Ok(rows),
Err(e) => Err(format!("Database error: {}", e)),
}
}
pub async fn handle_request_material_action(
http_client: &reqwest::Client,
config: DevConfig,
_redis: redis::Client,
postgres_cli: Arc<Mutex<tokio_postgres::Client>>,
tx: Sender<TxControlMessage>,
req: WebsocketMessageRequest,
uid_clone: Arc<Mutex<String>>,
) -> WebsocketMessageResult {
// suppose guard value passed
let p = req.payload.unwrap();
let req_material_action: RequestMaterialActionPayload = safe_deserialize(&p)?;
let (country, mut box_id) = if req_material_action.country.contains("_") {
// send with box id
let spl: Vec<String> = req_material_action
.country
.split("_")
.map(|x| x.to_string())
.collect();
(spl[0].clone(), spl[1].clone())
} else {
(req_material_action.country, "".to_string())
};
let country_prefix = config.get_country_config_from_short_name(&country);
if box_id.is_empty() {
box_id = String::from("unknown");
}
let expected_file_path = format!("{country}/coffeethai02_{country}_{box_id}_temp.json");
//
let uidd = uid_clone.lock().await.to_string();
let result_pre = get_machine_recipe_with_retry(
&http_client,
&config,
&country,
&box_id,
Some(
|client: &reqwest::Client, config: &DevConfig, cnt: String| {
// NOTE: must do clone to extend lifetime for this scope
let client_clone = client.clone();
let config_clone = config.clone();
Box::pin(async move {
get_latest_recipe_from_git(&client_clone, &config_clone, &cnt)
.await
.ok()
})
},
),
)
.await;
if result_pre.is_none() {
let _ = tx
.send(TxControlMessage::Payload(serde_json::json!({
"type": "notify",
"payload": {
"to": uidd,
"error": "fail to interact with recipe, try again later"
}
})))
.await;
return Err(format!("cannot fetch recipe").into());
} else {
match req_material_action.action {
AvailableMaterialAction::Create
if let Some(d) = req_material_action.data.to_owned()
&& let Ok(mut create_payload) = serde_json::from_value::<CreateMaterial>(d) =>
{
if let Some(cp) = country_prefix
&& create_payload.check_valid(Some(cp.to_string()))
{
let mut result = result_pre.unwrap();
let display_name = req_material_action
.user_info
.get("displayName")
.unwrap_or_default()
.as_str()
.unwrap_or(&"unknown".to_string())
.to_string();
let email = req_material_action
.user_info
.get("email")
.unwrap_or_default()
.as_str()
.unwrap_or(&"unknown".to_string())
.to_string();
let current_mat = result.list_material_settings();
let request_mat_id_string = create_payload.0.id.as_i64().unwrap().to_string();
if current_mat.contains(&request_mat_id_string) {
warn!(
"[create_material] unexpect new requested material: {request_mat_id_string} but already existed"
);
let _ = tx
.send(TxControlMessage::Payload(serde_json::json!({
"type": "notify",
"payload": {
"to": uidd,
"error": "reject by incorrect material action, material already existed, this should be modified instead."
}
})))
.await;
} else {
info!("[create_material] new requested material: {request_mat_id_string}");
result.MaterialSetting.push(create_payload.0.clone());
// save to git prepare process
let serial_recipe =
match serde_json::to_string_pretty(&serde_json::json!(result)) {
Ok(s) => s,
Err(e) => {
error!("failed to serialize recipe: {e}");
return Err(format!("{e}").into());
}
};
let commit_message = format!(
"add material {}({})",
create_payload.0.materialOtherName.unwrap_or_default(),
create_payload.0.id.as_i64().unwrap()
);
let commit_payload = CommitPayload {
file_bytes: serial_recipe.as_bytes().to_vec(),
path: expected_file_path.clone(),
signature_username: display_name,
signature_email: email,
message: commit_message,
};
if let Err(commit_error) = invoke_commit_request(
http_client,
config,
commit_payload.clone(),
tx,
uidd,
)
.await
{
error!("failed to commit: {commit_error}");
let _ = commit_payload.dump_backup();
return Err(format!("{commit_error}").into());
}
}
} else {
let _ = tx
.send(TxControlMessage::Payload(serde_json::json!({
"type": "notify",
"payload": {
"to": uidd,
"error": "reject by invalid material range or unavailable country"
}
})))
.await;
}
}
AvailableMaterialAction::Modify => {
if let Some(d) = req_material_action.data.to_owned()
&& let Ok(modify_payload) = serde_json::from_value::<ModifyMaterial>(d)
{
let client = postgres_cli.lock().await;
let tx_result = match modify_material(&client, &modify_payload, &tx, &uidd).await {
Ok(rows_updated) => {
info!("[modify_material] updated {} row(s) for id={}", rows_updated, modify_payload.id);
serde_json::json!({
"type": "notify",
"payload": {
"to": uidd,
"msg": format!("Material updated successfully, {} row(s) affected", rows_updated),
"updated_rows": rows_updated,
"material_id": modify_payload.id
}
})
}
Err(e) => {
error!("[modify_material] failed for id={}: {}", modify_payload.id, e);
serde_json::json!({
"type": "notify",
"payload": {
"to": uidd,
"error": e,
"material_id": modify_payload.id
}
})
}
};
let _ = tx.send(TxControlMessage::Payload(tx_result)).await;
}
}
AvailableMaterialAction::GetNewMaterialId => {
// TODO: get available material id, user must provide the type they want
}
AvailableMaterialAction::Update => {
let result = result_pre.unwrap();
let client = postgres_cli.lock().await;
if country.eq("mys") {
// ignore this, as malaysia use same id with tha
info!("[update_material] skip update, as malaysia use same id with tha");
let _ = tx
.send(TxControlMessage::Payload(serde_json::json!({
"type": "notify",
"payload": {
"to": uidd,
"msg": "Material update skipped"
}
})))
.await;
return Ok(());
}
for matset in result.MaterialSetting {
let id = matset.id.as_i64().unwrap_or(0) as i32;
let id_alternate = matset.idAlternate.as_i64().unwrap_or(0) as i32;
let is_use = matset.isUse;
let material_name = matset.materialName.clone().unwrap_or_default();
let material_other_name = matset.materialOtherName.clone();
let path_other_name = matset.pathOtherName.clone();
let bean_channel = matset.BeanChannel;
let syrup_channel = matset.SyrupChannel;
let powder_channel = matset.PowderChannel;
let ice_scream_bingsu_channel = matset.IceScreamBingsuChannel.unwrap_or(false);
let soda_channel = matset.SodaChannel;
let leaves_channel = matset.LeavesChannel;
let is_equipment = matset.IsEquipment;
let canister_type = matset.CanisterType.clone().unwrap_or_default();
let alarm_id_when_offline =
matset.AlarmIDWhenOffline.as_i64().unwrap_or(0) as i32;
let drain_timer = matset.DrainTimer.as_i64().unwrap_or(0) as i32;
let low_to_offline = matset.LowToOffline.as_i64().unwrap_or(0) as i32;
let material_status = matset.MaterialStatus.as_i64().unwrap_or(0) as i32;
let schedule_drain_type = matset.ScheduleDrainType.as_i64().unwrap_or(0) as i32;
let pay_retry_max_count =
matset.pay_rettry_max_count.as_i64().unwrap_or(0) as i32;
let refill_unit_gram = matset.RefillUnitGram;
let refill_unit_milliliters = matset.RefillUnitMilliliters;
let refill_unit_pcs = matset.RefillUnitPCS;
let material_parameter = matset.MaterialParameter.clone();
let raw_material_unit = matset.RawMaterialUnit.clone();
let str_text_show_error = matset.StrTextShowError.clone().unwrap_or_default();
// Fields not in MaterialSetting but in table schema - use defaults
let fresh_syrup_channel = false;
let frozen_fruit_channel = false;
let item_channel = false;
let feed_mode: Option<String> = None;
let upsert_sql = r#"INSERT INTO material_setting (id, id_alternate, is_use, material_name, material_other_name, path_other_name, bean_channel, syrup_channel, powder_channel, fresh_syrup_channel, frozen_fruit_channel, ice_scream_bingsu_channel, soda_channel,leaves_channel, item_channel, is_equipment, canister_type,alarm_id_when_offline, drain_timer, low_to_offline, material_status,schedule_drain_type,pay_retry_max_count,refill_unit_gram,refill_unit_milliliters, refill_unit_pcs, feed_mode, material_parameter,raw_material_unit, str_text_show_error) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17,$18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30) ON CONFLICT (id) DO UPDATE SET
id_alternate = EXCLUDED.id_alternate,
is_use = EXCLUDED.is_use,
material_name = EXCLUDED.material_name,
material_other_name = EXCLUDED.material_other_name,
path_other_name = EXCLUDED.path_other_name,
bean_channel = EXCLUDED.bean_channel,
syrup_channel = EXCLUDED.syrup_channel,
powder_channel = EXCLUDED.powder_channel,
fresh_syrup_channel = EXCLUDED.fresh_syrup_channel,
frozen_fruit_channel = EXCLUDED.frozen_fruit_channel,
ice_scream_bingsu_channel = EXCLUDED.ice_scream_bingsu_channel,
soda_channel = EXCLUDED.soda_channel,
leaves_channel = EXCLUDED.leaves_channel,
item_channel = EXCLUDED.item_channel,
is_equipment = EXCLUDED.is_equipment,
canister_type = EXCLUDED.canister_type,
alarm_id_when_offline = EXCLUDED.alarm_id_when_offline,
drain_timer = EXCLUDED.drain_timer,
low_to_offline = EXCLUDED.low_to_offline,
material_status = EXCLUDED.material_status,
schedule_drain_type = EXCLUDED.schedule_drain_type,
pay_retry_max_count = EXCLUDED.pay_retry_max_count,
refill_unit_gram = EXCLUDED.refill_unit_gram,
refill_unit_milliliters = EXCLUDED.refill_unit_milliliters,
refill_unit_pcs = EXCLUDED.refill_unit_pcs,
feed_mode = EXCLUDED.feed_mode,
material_parameter = EXCLUDED.material_parameter,
raw_material_unit = EXCLUDED.raw_material_unit,
str_text_show_error = EXCLUDED.str_text_show_error,
updated_at = NOW()
"#;
if let Err(e) = client
.execute(
upsert_sql,
&[
&id,
&id_alternate,
&is_use,
&material_name,
&material_other_name,
&path_other_name,
&bean_channel,
&syrup_channel,
&powder_channel,
&fresh_syrup_channel,
&frozen_fruit_channel,
&ice_scream_bingsu_channel,
&soda_channel,
&leaves_channel,
&item_channel,
&is_equipment,
&canister_type,
&alarm_id_when_offline,
&drain_timer,
&low_to_offline,
&material_status,
&schedule_drain_type,
&pay_retry_max_count,
&refill_unit_gram,
&refill_unit_milliliters,
&refill_unit_pcs,
&feed_mode,
&material_parameter,
&raw_material_unit,
&str_text_show_error,
],
)
.await
{
error!(
"[update_material] failed to upsert material {}: {}",
material_name, e
);
let _ = tx
.send(TxControlMessage::Payload(serde_json::json!({
"type": "notify",
"payload": {
"to": uidd,
"error": format!("Failed to update material {}: {}", material_name, e)
}
})))
.await;
} else {
info!(
"[update_material] upserted material: {} (id={})",
material_name, id
);
}
}
let _ = tx
.send(TxControlMessage::Payload(serde_json::json!({
"type": "notify",
"payload": {
"to": uidd,
"msg": "Material update completed"
}
})))
.await;
}
_ => {}
}
}
Ok(())
}