Compare commits
No commits in common. "0f857445a430fb97ce496ca76f6db1b62193cf11" and "da956d39a7349364476bcd5bed60c9d6801880ff" have entirely different histories.
0f857445a4
...
da956d39a7
11 changed files with 15 additions and 717 deletions
18
Cargo.lock
generated
18
Cargo.lock
generated
|
|
@ -1567,16 +1567,6 @@ version = "0.3.17"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
|
||||
|
||||
[[package]]
|
||||
name = "mime_guess"
|
||||
version = "2.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
|
||||
dependencies = [
|
||||
"mime",
|
||||
"unicase",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.8.9"
|
||||
|
|
@ -2084,7 +2074,6 @@ dependencies = [
|
|||
"bytes",
|
||||
"encoding_rs",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"h2",
|
||||
"http",
|
||||
"http-body",
|
||||
|
|
@ -2095,7 +2084,6 @@ dependencies = [
|
|||
"js-sys",
|
||||
"log",
|
||||
"mime",
|
||||
"mime_guess",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"quinn",
|
||||
|
|
@ -2839,12 +2827,6 @@ version = "1.20.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de"
|
||||
|
||||
[[package]]
|
||||
name = "unicase"
|
||||
version = "2.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.24"
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.g
|
|||
log = "0.4.29"
|
||||
rayon = "1.11.0"
|
||||
redis = { version = "1.0.2", features = ["tokio-comp"] }
|
||||
reqwest = { version = "0.13.1", features = ["multipart"] }
|
||||
reqwest = "0.13.1"
|
||||
serde = { version = "1.0.228", features = ["derive"] }
|
||||
serde_json = "1.0.149"
|
||||
tokio = { version = "1.49.0", features = ["full"] }
|
||||
|
|
|
|||
167
src/app.rs
167
src/app.rs
|
|
@ -4,21 +4,14 @@ use axum::{
|
|||
routing::{get, post},
|
||||
serve::ListenerExt,
|
||||
};
|
||||
use log::{error, info, warn};
|
||||
use log::{error, info};
|
||||
use redis::TypedCommands;
|
||||
use reqwest::{StatusCode, multipart};
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
env,
|
||||
fs::{self, File},
|
||||
io::BufReader,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{
|
||||
fs::read_dir,
|
||||
sync::{Mutex, mpsc::Sender},
|
||||
};
|
||||
use tokio::sync::{Mutex, mpsc::Sender};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Hub {
|
||||
|
|
@ -59,18 +52,6 @@ impl DevConfig {
|
|||
format!("{}/checkout?path={}", self.get_recipe_url(), path)
|
||||
}
|
||||
|
||||
pub fn get_post_file_to_recipe_repo(&self) -> String {
|
||||
format!("{}/commit", self.get_recipe_url())
|
||||
}
|
||||
|
||||
pub fn get_pull_recipe_repo(&self) -> String {
|
||||
format!("{}/pull", self.get_recipe_url())
|
||||
}
|
||||
|
||||
pub fn get_push_recipe_repo(&self) -> String {
|
||||
format!("{}/push", self.get_recipe_url())
|
||||
}
|
||||
|
||||
pub fn get_api_header(&self) -> (String, String) {
|
||||
("X-API-Key".to_string(), self.api_key.clone())
|
||||
}
|
||||
|
|
@ -102,7 +83,7 @@ impl AppState {
|
|||
let redis_cli_clone = redis_cli.clone();
|
||||
let tx_new = system_tx.clone();
|
||||
let result = Arc::new(AppState {
|
||||
dev_config: dev_config.clone(),
|
||||
dev_config,
|
||||
redis_cli,
|
||||
system_tx,
|
||||
connectors_mapping: Arc::new(Mutex::new(Hub {
|
||||
|
|
@ -110,72 +91,6 @@ impl AppState {
|
|||
})),
|
||||
});
|
||||
|
||||
// backup job
|
||||
let dev_config_backup = dev_config.clone();
|
||||
tokio::spawn(async move {
|
||||
let m_cfg = dev_config_backup.clone();
|
||||
|
||||
loop {
|
||||
// auto sync
|
||||
if invoke_pull_sync_request(m_cfg.clone()).await.is_err() {
|
||||
warn!("pulling repo unhealthy, retry again in 5 minutes");
|
||||
continue;
|
||||
}
|
||||
|
||||
match read_dir(".").await {
|
||||
Ok(mut d) => {
|
||||
while let Ok(Some(entry)) = d.next_entry().await {
|
||||
let ent_path = entry.path();
|
||||
|
||||
if let Some(filename) = ent_path.file_name()
|
||||
&& let Some(filename_str) = filename.to_str()
|
||||
&& filename_str.starts_with("gtx")
|
||||
&& filename_str.ends_with(".json")
|
||||
{
|
||||
// read file
|
||||
//
|
||||
let f = match File::open(ent_path.clone()) {
|
||||
Ok(f) => f,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
let buf = BufReader::new(f);
|
||||
|
||||
let commit_from_backup: CommitPayload =
|
||||
match serde_json::from_reader(buf) {
|
||||
Ok(cm) => cm,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
if invoke_pull_sync_request(m_cfg.clone()).await.is_err() {
|
||||
warn!("pulling repo unhealthy, retry again in 5 minutes");
|
||||
continue;
|
||||
}
|
||||
|
||||
let _ =
|
||||
invoke_commit_request(m_cfg.clone(), commit_from_backup).await;
|
||||
|
||||
if invoke_push_request(m_cfg.clone()).await.is_ok() {
|
||||
// push success
|
||||
info!("push backup success");
|
||||
if fs::remove_file(ent_path.clone()).is_ok() {
|
||||
info!("clean backup");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {}
|
||||
}
|
||||
|
||||
info!("[backup] idle");
|
||||
|
||||
tokio::time::sleep(Duration::from_mins(5)).await;
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut lredis = redis_cli_clone.clone();
|
||||
let current_queue: crossbeam_queue::ArrayQueue<CommandRequestPayload> =
|
||||
|
|
@ -297,82 +212,6 @@ pub async fn invoke_checkout_request(
|
|||
}
|
||||
}
|
||||
|
||||
/// Invoke git pull, may takes sometime
|
||||
pub async fn invoke_pull_sync_request(
|
||||
config: DevConfig,
|
||||
) -> Result<String, Box<dyn std::error::Error>> {
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let req_path = config.get_pull_recipe_repo();
|
||||
// println!("dbg: {req_path}");
|
||||
let res = client.get(req_path).send().await?;
|
||||
|
||||
if res.status() != StatusCode::OK {
|
||||
// pull fail
|
||||
|
||||
error!(
|
||||
"invoke pull fail: [{}] {:?}",
|
||||
res.status(),
|
||||
res.text().await
|
||||
);
|
||||
return Err("pull fail".into());
|
||||
}
|
||||
|
||||
match res.text().await {
|
||||
Ok(raw) => Ok(raw),
|
||||
Err(e) => Err(format!("{e}").into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Invoke sending from server to server for committing
|
||||
pub async fn invoke_commit_request(
|
||||
config: DevConfig,
|
||||
payload: CommitPayload,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let client = reqwest::Client::new();
|
||||
let commit_path = config.get_post_file_to_recipe_repo();
|
||||
let form = multipart::Form::new()
|
||||
.text("message", payload.message)
|
||||
.text("signature_username", payload.signature_username)
|
||||
.text("signature_email", payload.signature_email)
|
||||
.text("path", payload.path)
|
||||
.part(
|
||||
"file",
|
||||
multipart::Part::bytes(payload.file_bytes)
|
||||
.mime_str("application/octet-stream")
|
||||
.unwrap(),
|
||||
);
|
||||
let response = client.post(commit_path).multipart(form).send().await?;
|
||||
|
||||
info!("commit status: {}", response.status());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn invoke_push_request(config: DevConfig) -> Result<String, Box<dyn std::error::Error>> {
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let req_path = config.get_push_recipe_repo();
|
||||
// println!("dbg: {req_path}");
|
||||
let res = client.get(req_path).send().await?;
|
||||
|
||||
if res.status() != StatusCode::OK {
|
||||
// pull fail
|
||||
|
||||
error!(
|
||||
"invoke push fail: [{}] {:?}",
|
||||
res.status(),
|
||||
res.text().await
|
||||
);
|
||||
return Err("push fail".into());
|
||||
}
|
||||
|
||||
match res.text().await {
|
||||
Ok(raw) => Ok(raw),
|
||||
Err(e) => Err(format!("{e}").into()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_recipe_repo_router() -> Router<Arc<AppState>> {
|
||||
Router::new().route("/ws", get(crate::websocket::handler::websocket_handler))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,24 +20,15 @@ pub struct StreamDataStart {
|
|||
#[serde(rename = "ref")]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub stream_ref: Option<String>,
|
||||
/// extra data, information
|
||||
pub metadata: String,
|
||||
}
|
||||
|
||||
impl IntoStreamMessage for StreamDataStart {
|
||||
const MSG_NAME: &str = "stream_data_start";
|
||||
|
||||
fn build(&self) -> serde_json::Value {
|
||||
let mut payload = serde_json::to_value(self).unwrap();
|
||||
|
||||
payload.as_object_mut().unwrap().insert(
|
||||
"to".to_string(),
|
||||
serde_json::json!(self.stream_ref.clone().unwrap_or_default()),
|
||||
);
|
||||
|
||||
serde_json::json!({
|
||||
"type": StreamDataStart::MSG_NAME,
|
||||
"payload": payload.clone()
|
||||
"payload": self.clone()
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -47,18 +38,12 @@ impl IntoStreamMessage for StreamDataStart {
|
|||
}
|
||||
|
||||
impl StreamDataStart {
|
||||
pub fn new(
|
||||
total_size: usize,
|
||||
chunk_size: usize,
|
||||
stream_ref: Option<String>,
|
||||
metadata: String,
|
||||
) -> Self {
|
||||
pub fn new(total_size: usize, chunk_size: usize, stream_ref: Option<String>) -> Self {
|
||||
Self {
|
||||
stream_id: Uuid::new_v4().to_string(),
|
||||
total_size,
|
||||
chunk_size,
|
||||
stream_ref,
|
||||
metadata,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -76,7 +61,6 @@ pub struct StreamDataChunk<T> {
|
|||
pub start_idx: usize,
|
||||
/// Chunked data which splited into N items per chunk
|
||||
pub data: Vec<T>,
|
||||
#[serde(rename = "to")]
|
||||
uid: String,
|
||||
}
|
||||
|
||||
|
|
@ -121,8 +105,6 @@ where
|
|||
pub struct StreamDataEnd {
|
||||
/// Uuid v4, client must mapping later values with this stream id
|
||||
pub stream_id: String,
|
||||
/// endpoint user
|
||||
pub to: String,
|
||||
}
|
||||
|
||||
impl IntoStreamMessage for StreamDataEnd {
|
||||
|
|
@ -141,10 +123,9 @@ impl IntoStreamMessage for StreamDataEnd {
|
|||
}
|
||||
|
||||
impl StreamDataEnd {
|
||||
pub fn new(sid: &str, to: String) -> Self {
|
||||
pub fn new(sid: &str) -> Self {
|
||||
Self {
|
||||
stream_id: sid.to_string(),
|
||||
to,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -161,7 +142,6 @@ pub struct StreamDataExtra<T> {
|
|||
pub exid: String,
|
||||
pub extp: String,
|
||||
pub payload: Vec<T>,
|
||||
pub to: String,
|
||||
}
|
||||
|
||||
impl<T> IntoStreamMessage for StreamDataExtra<T>
|
||||
|
|
@ -186,12 +166,11 @@ impl<T> StreamDataExtra<T>
|
|||
where
|
||||
T: Serialize + Clone,
|
||||
{
|
||||
pub fn new(exid: &str, extp: &str, data: Vec<T>, to: String) -> Self {
|
||||
pub fn new(exid: &str, extp: &str, data: Vec<T>) -> Self {
|
||||
Self {
|
||||
exid: exid.to_string(),
|
||||
extp: extp.to_string(),
|
||||
payload: data.to_vec(),
|
||||
to,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,3 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use super::model::*;
|
||||
use axum::extract::ws::{CloseFrame, Message, WebSocket};
|
||||
use redis::{TypedCommands, cmd};
|
||||
|
|
@ -66,21 +64,3 @@ pub fn convert_sys_msg_command(msg: &serde_json::Value) -> Option<SysMessage> {
|
|||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_extra_parameters(s: String) -> HashMap<String, String> {
|
||||
let mut result = HashMap::new();
|
||||
|
||||
let plist: Vec<String> = s.split(",").map(|x| x.to_string()).collect();
|
||||
|
||||
for pl in plist {
|
||||
let sm: Vec<String> = pl.split("=").map(|x| x.to_string()).collect();
|
||||
|
||||
if sm.len() != 2 {
|
||||
continue;
|
||||
}
|
||||
|
||||
result.insert(sm[0].to_string(), sm[1].to_string());
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,21 +1,5 @@
|
|||
use std::fs::File;
|
||||
use std::io::{Read, Write};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub trait Backup: Send + Sync + Serialize {
|
||||
fn dump_backup(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let tx = format!("gtx-{}.json", uuid::Uuid::new_v4().to_string());
|
||||
let json = serde_json::to_string(&self).unwrap();
|
||||
let json2: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
|
||||
let writer = File::create(format!("./{tx}")).unwrap();
|
||||
let _ = serde_json::to_writer(writer, &json2);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// system message to send back to client, this may be called from other services
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct SysMessage {
|
||||
|
|
@ -56,27 +40,6 @@ pub struct RecipeRequestPayload {
|
|||
pub parameters: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub enum PriceRequestAction {
|
||||
View(String),
|
||||
Edit(String),
|
||||
}
|
||||
|
||||
/// Price request payload struct
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct PriceRequestPayload {
|
||||
/// Allowed interactions of price request
|
||||
pub action: PriceRequestAction,
|
||||
/// Country of recipe
|
||||
pub country: String,
|
||||
/// Filename to override default get, expect path
|
||||
pub override_file: Option<String>,
|
||||
/// Extended infos, required parameters or unimplemented fields in the current struct. Expected pattern `<key1>=<val1>,<key2>=<val2>,...`
|
||||
pub parameters: Option<String>,
|
||||
/// User info expect at least id, token, name
|
||||
pub user_info: serde_json::Value,
|
||||
}
|
||||
|
||||
/// Command request for external services
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct CommandRequestPayload {
|
||||
|
|
@ -121,21 +84,3 @@ pub struct AuthUserField {
|
|||
pub email: String,
|
||||
pub permissions: String,
|
||||
}
|
||||
|
||||
/// For sending to recipe repo, saving/committing value/file to git
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct CommitPayload {
|
||||
/// file/content as bytes
|
||||
pub file_bytes: Vec<u8>,
|
||||
/// path to commit file
|
||||
pub path: String,
|
||||
/// signature username part (committer)
|
||||
pub signature_username: String,
|
||||
/// signature email part (committer)
|
||||
pub signature_email: String,
|
||||
/// commit message
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
// use default backup method
|
||||
impl Backup for CommitPayload {}
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ pub async fn read(
|
|||
Message::Text(t) => {
|
||||
let req: WebsocketMessageRequest = serde_json::from_str(t.as_str())?;
|
||||
|
||||
// info!("get msg: {}", req.type_w);
|
||||
info!("get msg: {}", req.type_w);
|
||||
match req.type_w.as_str() {
|
||||
"recipe" if req.payload.is_some() => {
|
||||
tasks::recipe::handle_recipe_request(
|
||||
|
|
@ -61,27 +61,7 @@ pub async fn read(
|
|||
req,
|
||||
uid_clone.clone(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
"recipe_versions" if req.payload.is_some() => {
|
||||
tasks::recipe::handle_recipe_versions_list_request(
|
||||
config.clone(),
|
||||
redis.clone(),
|
||||
tx.clone(),
|
||||
req,
|
||||
uid_clone.clone(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
"price" if req.payload.is_some() => {
|
||||
tasks::price::handle_price_request(
|
||||
config.clone(),
|
||||
redis.clone(),
|
||||
tx.clone(),
|
||||
req,
|
||||
uid_clone.clone(),
|
||||
)
|
||||
.await?;
|
||||
.await?
|
||||
}
|
||||
"command" if req.payload.is_some() => {
|
||||
tasks::command::handle_command_request(state.clone(), tx.clone(), req)
|
||||
|
|
@ -221,8 +201,6 @@ pub async fn write(
|
|||
}
|
||||
|
||||
let _ = sender.send(res.to_string().into()).await;
|
||||
} else {
|
||||
warn!("failed to send message, as the receiver not detected: {res:?}");
|
||||
}
|
||||
}
|
||||
TxControlMessage::CloseExist => {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
pub mod auth;
|
||||
pub mod command;
|
||||
pub mod price;
|
||||
pub mod recipe;
|
||||
pub mod sheet;
|
||||
pub mod watchdog;
|
||||
|
|
|
|||
|
|
@ -1,341 +0,0 @@
|
|||
use crate::app::*;
|
||||
use crate::stream::model::{
|
||||
IntoStreamMessage, StreamDataChunk, StreamDataEnd, StreamDataExtra, StreamDataStart,
|
||||
};
|
||||
use crate::websocket::{core::*, helper::*, model::*};
|
||||
|
||||
use log::{debug, error, info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::{fs::File, io::Read, sync::Arc};
|
||||
use tokio::sync::{Mutex, mpsc::Sender};
|
||||
|
||||
use crate::websocket::core::WebsocketMessageResult;
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
struct MenuPrice {
|
||||
ProductCode: String,
|
||||
NewPrice: serde_json::Value,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
StringParam: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
Discount: Option<serde_json::Value>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
Percent: Option<serde_json::Value>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
roundup: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct RecipePrice {
|
||||
#[serde(flatten)]
|
||||
metadata: serde_json::Value,
|
||||
content: Vec<MenuPrice>,
|
||||
}
|
||||
|
||||
impl RecipePrice {
|
||||
pub const HIDE_PARAM: &str = "hide=true";
|
||||
|
||||
pub fn import(path: String) -> RecipePrice {
|
||||
debug!("try import {path}");
|
||||
let mut file = File::open(path).expect("file not found");
|
||||
let mut data = String::new();
|
||||
|
||||
file.read_to_string(&mut data).unwrap();
|
||||
|
||||
let res: Result<RecipePrice, _> = serde_json::from_str(&data);
|
||||
match res {
|
||||
Ok(rp) => rp,
|
||||
Err(e) => {
|
||||
error!("error while deserialize price: {e}");
|
||||
RecipePrice {
|
||||
content: Vec::new(),
|
||||
metadata: serde_json::Value::Null,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn import_from_raw_string(raw: String) -> RecipePrice {
|
||||
let res: Result<RecipePrice, _> = serde_json::from_str(&raw);
|
||||
match res {
|
||||
Ok(rp) => rp,
|
||||
Err(e) => {
|
||||
error!("error while deserialize price: {e}");
|
||||
RecipePrice {
|
||||
content: Vec::new(),
|
||||
metadata: serde_json::Value::Null,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// only for getting data without modify
|
||||
pub fn find_by_pd(&self, pd: &str) -> Option<MenuPrice> {
|
||||
self.content
|
||||
.iter()
|
||||
.find(|x| x.ProductCode.eq(pd))
|
||||
.map(|x| x.clone())
|
||||
}
|
||||
|
||||
pub fn modify_price_by_pd(&mut self, pd: &str, price: serde_json::Value) {
|
||||
if let Some(mp) = self.content.iter_mut().find(|x| x.ProductCode.eq(pd)) {
|
||||
mp.NewPrice = price;
|
||||
}
|
||||
}
|
||||
|
||||
fn modify_string_param_by_pd(&mut self, pd: &str, param: &str) {
|
||||
if let Some(mp) = self.content.iter_mut().find(|x| x.ProductCode.eq(pd)) {
|
||||
mp.StringParam = Some(param.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_visibility_by_pd(&mut self, pd: &str, show: bool) {
|
||||
if !show {
|
||||
self.modify_string_param_by_pd(pd, RecipePrice::HIDE_PARAM);
|
||||
} else {
|
||||
self.modify_string_param_by_pd(pd, "");
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: disable write to file
|
||||
// pub fn export_to_json_file(self, outpath: Option<String>) {
|
||||
// let json = serde_json::to_string(&self).unwrap();
|
||||
// let json2: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
|
||||
// if let Some(outpath) = outpath {
|
||||
// let writer = File::create(outpath).unwrap();
|
||||
// let _ = serde_json::to_writer_pretty(writer, &json2);
|
||||
// } else {
|
||||
// println!("Default save to (execute)/price.json");
|
||||
// let writer = File::create("price.json").unwrap();
|
||||
// let _ = serde_json::to_writer_pretty(writer, &json2);
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
/// Get list of price
|
||||
// pub async fn handle_price_list_request(
|
||||
// config: DevConfig,
|
||||
// redis: redis::Client,
|
||||
// tx: Sender<TxControlMessage>,
|
||||
// req: WebsocketMessageRequest,
|
||||
// uid_clone: Arc<Mutex<String>>,
|
||||
// ) -> WebsocketMessageResult {
|
||||
|
||||
// }
|
||||
|
||||
/// Get main price profile of country
|
||||
pub async fn handle_price_request(
|
||||
config: DevConfig,
|
||||
redis: redis::Client,
|
||||
tx: Sender<TxControlMessage>,
|
||||
req: WebsocketMessageRequest,
|
||||
uid_clone: Arc<Mutex<String>>,
|
||||
) -> WebsocketMessageResult {
|
||||
let p = req.payload.unwrap();
|
||||
|
||||
let price_param: PriceRequestPayload = serde_json::from_value(p)?;
|
||||
|
||||
let mut price_file_format = format!(
|
||||
"{}/profile_{}_master.json",
|
||||
price_param.country,
|
||||
price_param.country.to_uppercase()
|
||||
);
|
||||
|
||||
if let Some(override_file) = price_param.override_file {
|
||||
price_file_format = override_file;
|
||||
}
|
||||
|
||||
let price_action = price_param.action;
|
||||
|
||||
let price_content =
|
||||
match invoke_checkout_request(config.clone(), price_file_format.clone()).await {
|
||||
Ok(pc) => pc,
|
||||
Err(e) => return Err(format!("Cannot find price of expected country: {e:?}").into()),
|
||||
};
|
||||
|
||||
info!("price content len: {}", price_content.len());
|
||||
|
||||
let mut rpp = RecipePrice::import_from_raw_string(price_content);
|
||||
let _uid = uid_clone.clone();
|
||||
let uidd = _uid.try_lock().unwrap();
|
||||
|
||||
info!("price action: {price_action:?}");
|
||||
|
||||
let user_info = price_param.user_info.clone();
|
||||
|
||||
match price_action {
|
||||
PriceRequestAction::View(view_opt) => {
|
||||
let viewing_options: HashMap<String, String> = get_extra_parameters(view_opt);
|
||||
// sa=all
|
||||
// sa=get,pd=...
|
||||
// sa=query,list=1|2|3
|
||||
// sa=query,where=contain,kw=...
|
||||
let sub_action = viewing_options.get("sa");
|
||||
let pd = viewing_options.get("pd");
|
||||
|
||||
if let Some(sa) = sub_action {
|
||||
let mut result = Vec::new();
|
||||
let action_done = match sa.as_str() {
|
||||
"all" => {
|
||||
result = rpp.content;
|
||||
|
||||
true
|
||||
}
|
||||
"get"
|
||||
if let Some(pd) = pd
|
||||
&& let Some(mp) = rpp.find_by_pd(pd) =>
|
||||
{
|
||||
result.push(mp);
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if action_done {
|
||||
let _ = tx
|
||||
.send(TxControlMessage::Payload(serde_json::json!({
|
||||
"type": "price",
|
||||
"payload": {
|
||||
"req_action": sa,
|
||||
"status": "ok",
|
||||
"content": result,
|
||||
"to": uidd.to_string()
|
||||
}
|
||||
})))
|
||||
.await;
|
||||
} else {
|
||||
let _ = tx
|
||||
.send(TxControlMessage::Payload(serde_json::json!({
|
||||
"type": "price",
|
||||
"payload": {
|
||||
"req_action": sa,
|
||||
"status": "fail",
|
||||
"to": uidd.to_string()
|
||||
}
|
||||
})))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
PriceRequestAction::Edit(edit_opt) => {
|
||||
let editing_options: HashMap<String, String> = get_extra_parameters(edit_opt);
|
||||
// sa=change,pd=...,to=...
|
||||
// sa=hide,pd=...
|
||||
// sa=disable,pd=... disable = hide
|
||||
// sa=show,pd=...
|
||||
// sa=toggle,pd=...,state=show|hide
|
||||
let sub_action = editing_options.get("sa");
|
||||
let pd = editing_options.get("pd");
|
||||
let to = editing_options.get("to");
|
||||
let state = editing_options.get("state");
|
||||
|
||||
if let Some(sa) = sub_action {
|
||||
let mut action_message = String::new();
|
||||
let action_done = match sa.as_str() {
|
||||
"change"
|
||||
if let Some(pd) = pd
|
||||
&& let Some(to) = to
|
||||
&& let Some(mp) = rpp.find_by_pd(pd.as_str()) =>
|
||||
{
|
||||
info!(
|
||||
"[CHANGE] price of {pd} from {} to {to}",
|
||||
mp.NewPrice.as_i64().unwrap()
|
||||
);
|
||||
action_message = format!(
|
||||
"[CHANGE] price of {pd} from {} to {to}",
|
||||
mp.NewPrice.as_i64().unwrap()
|
||||
);
|
||||
|
||||
let price_int = to.parse::<i64>()?;
|
||||
|
||||
rpp.modify_price_by_pd(pd, serde_json::json!(price_int));
|
||||
true
|
||||
}
|
||||
"toggle"
|
||||
if let Some(pd) = pd
|
||||
&& let Some(state) = state =>
|
||||
{
|
||||
info!("[TOGGLE] {pd} to {state}");
|
||||
action_message = format!("[TOGGLE] {pd} to {state}");
|
||||
|
||||
match state.as_str() {
|
||||
"show" => rpp.set_visibility_by_pd(pd.as_str(), true),
|
||||
"hide" | "disable" => rpp.set_visibility_by_pd(pd.as_str(), false),
|
||||
_ => {
|
||||
warn!("unknown state toggle");
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if action_done {
|
||||
let _ = tx
|
||||
.send(TxControlMessage::Payload(serde_json::json!({
|
||||
"type": "price",
|
||||
"payload": {
|
||||
"req_action": sa,
|
||||
"status": "ok",
|
||||
"to": uidd.to_string()
|
||||
}
|
||||
})))
|
||||
.await;
|
||||
|
||||
// send save
|
||||
//
|
||||
|
||||
let all_prices_str = serde_json::to_string_pretty(&rpp)?;
|
||||
let commit_payload = CommitPayload {
|
||||
file_bytes: all_prices_str.as_bytes().to_vec(),
|
||||
path: price_file_format.clone(),
|
||||
signature_username: user_info
|
||||
.get("displayName")
|
||||
.unwrap_or_default()
|
||||
.as_str()
|
||||
.unwrap_or(&"unknown".to_string())
|
||||
.to_string(),
|
||||
signature_email: user_info
|
||||
.get("email")
|
||||
.unwrap_or_default()
|
||||
.as_str()
|
||||
.unwrap_or(&"unknown".to_string())
|
||||
.to_string(),
|
||||
message: action_message,
|
||||
};
|
||||
|
||||
if invoke_pull_sync_request(config.clone()).await.is_err() {
|
||||
// backup
|
||||
let _ = commit_payload.dump_backup();
|
||||
return Err("Fail to sync repo, backing up ...".into());
|
||||
}
|
||||
|
||||
let _ = invoke_commit_request(config.clone(), commit_payload.clone()).await;
|
||||
|
||||
if invoke_push_request(config.clone()).await.is_err() {
|
||||
let _ = commit_payload.dump_backup();
|
||||
return Err("Fail to push repo, backing up ...".into());
|
||||
}
|
||||
|
||||
// push to git
|
||||
} else {
|
||||
let _ = tx
|
||||
.send(TxControlMessage::Payload(serde_json::json!({
|
||||
"type": "price",
|
||||
"payload": {
|
||||
"req_action": sa,
|
||||
"status": "fail",
|
||||
"to": uidd.to_string()
|
||||
}
|
||||
})))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -4,7 +4,6 @@ use crate::stream::model::{
|
|||
};
|
||||
use crate::websocket::{core::*, helper::*, model::*};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::{fs::File, io::Read, path::PathBuf, sync::Arc};
|
||||
|
||||
use async_compression::tokio::bufread::BrotliDecoder;
|
||||
|
|
@ -93,7 +92,6 @@ pub async fn throttle_send_recipe(
|
|||
r01s.len(),
|
||||
CHUNK_SIZE,
|
||||
Some(uid.try_lock().unwrap().to_string()),
|
||||
format!("version={version},country={country}").to_string(),
|
||||
);
|
||||
|
||||
let sid = ss.get_id();
|
||||
|
|
@ -120,7 +118,7 @@ pub async fn throttle_send_recipe(
|
|||
for (index, chunk) in matset.chunks(CHUNK_SIZE).enumerate() {
|
||||
let curr_ch_id = format!("{mat_exid}_{index}");
|
||||
|
||||
let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec(), uidd.clone());
|
||||
let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec());
|
||||
|
||||
if let Some(err) = tx
|
||||
.send(TxControlMessage::Payload(extra_matset.as_msg()))
|
||||
|
|
@ -134,7 +132,7 @@ pub async fn throttle_send_recipe(
|
|||
let extl = "topplist";
|
||||
for (index, chunk) in recipe.Topping.ToppingList.chunks(CHUNK_SIZE).enumerate() {
|
||||
let curr_ch_id = format!("{mat_exid}_tl{index}");
|
||||
let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec(), uidd.clone());
|
||||
let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec());
|
||||
if let Some(err) = tx
|
||||
.send(TxControlMessage::Payload(extra_topplist.as_msg()))
|
||||
.await
|
||||
|
|
@ -147,7 +145,7 @@ pub async fn throttle_send_recipe(
|
|||
let extg = "toppgrp";
|
||||
for (index, chunk) in recipe.Topping.ToppingGroup.chunks(CHUNK_SIZE).enumerate() {
|
||||
let curr_ch_id = format!("{mat_exid}_tg{index}");
|
||||
let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec(), uidd.clone());
|
||||
let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec());
|
||||
if let Some(err) = tx
|
||||
.send(TxControlMessage::Payload(extra_toppgrp.as_msg()))
|
||||
.await
|
||||
|
|
@ -165,7 +163,7 @@ pub async fn throttle_send_recipe(
|
|||
info!("sending {sid}");
|
||||
|
||||
// return sid;
|
||||
let end_msg = StreamDataEnd::new(&sid, uidd.clone());
|
||||
let end_msg = StreamDataEnd::new(&sid);
|
||||
|
||||
if let Some(err) = tx
|
||||
.send(TxControlMessage::Payload(end_msg.as_msg()))
|
||||
|
|
@ -224,23 +222,6 @@ pub async fn handle_recipe_request(
|
|||
};
|
||||
}
|
||||
|
||||
// detect if use different version
|
||||
// parameter: use_legacy_version=true,version=888
|
||||
if let Some(extra_param) = recipe_param.clone().parameters {
|
||||
let pmap = get_extra_parameters(extra_param);
|
||||
|
||||
latest_version = if pmap.contains_key("use_legacy_version")
|
||||
&& let Some(legacy_cfg) = pmap.get("use_legacy_version")
|
||||
&& legacy_cfg.eq("true")
|
||||
{
|
||||
pmap.get("version").unwrap_or(&latest_version).to_string()
|
||||
} else {
|
||||
latest_version
|
||||
};
|
||||
|
||||
info!("after param in recipe: {latest_version}");
|
||||
}
|
||||
|
||||
let req_file = if is_req_patch(&recipe_param) {
|
||||
format!(
|
||||
"stx_{country}_{version}.json",
|
||||
|
|
@ -372,47 +353,3 @@ pub async fn handle_recipe_request(
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn handle_recipe_versions_list_request(
|
||||
config: DevConfig,
|
||||
redis: redis::Client,
|
||||
tx: Sender<TxControlMessage>,
|
||||
req: WebsocketMessageRequest,
|
||||
uid_clone: Arc<Mutex<String>>,
|
||||
) -> WebsocketMessageResult {
|
||||
let p = req.payload.unwrap();
|
||||
let recipe_param: RecipeRequestPayload = serde_json::from_value(p)?;
|
||||
|
||||
let version_list = format!("{country}", country = recipe_param.country);
|
||||
|
||||
let country_versions_str = match invoke_checkout_request(config.clone(), version_list).await {
|
||||
Ok(vs) => vs,
|
||||
Err(e) => return Err(format!("Cannot find versions of expected country: {e:?}").into()),
|
||||
};
|
||||
|
||||
// extract version as list
|
||||
let files: Vec<String> = country_versions_str
|
||||
.split(",")
|
||||
.map(|x| x.to_string())
|
||||
.collect();
|
||||
|
||||
let result: Vec<String> = files
|
||||
.iter()
|
||||
.filter(|x| x.starts_with("coffeethai02") && x.ends_with(".json"))
|
||||
.map(|x| x.replace("coffeethai02_", "").replace(".json", ""))
|
||||
.collect();
|
||||
|
||||
let uidd = uid_clone.clone().try_lock().unwrap().to_string();
|
||||
|
||||
let _ = tx
|
||||
.send(TxControlMessage::Payload(serde_json::json!({
|
||||
"type": "version_selectors",
|
||||
"payload": {
|
||||
"versions": result,
|
||||
"to": uidd
|
||||
}
|
||||
})))
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ pub async fn get_watchdog_task(
|
|||
let h = hub.try_lock().unwrap();
|
||||
let curr_user = user.try_lock().unwrap().to_string();
|
||||
|
||||
// info!("{}: checking invalid ...", curr_user);
|
||||
info!("{}: checking invalid ...", curr_user);
|
||||
|
||||
if h.clients.contains_key(&curr_user) && curr_user.starts_with("temp") {
|
||||
warn!("detect unauthorized -- {}", curr_user);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue