Compare commits
3 commits
da956d39a7
...
0f857445a4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0f857445a4 | ||
|
|
ab84060ab5 | ||
|
|
295f69a34c |
11 changed files with 717 additions and 15 deletions
18
Cargo.lock
generated
18
Cargo.lock
generated
|
|
@ -1567,6 +1567,16 @@ version = "0.3.17"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
|
||||
|
||||
[[package]]
|
||||
name = "mime_guess"
|
||||
version = "2.0.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
|
||||
dependencies = [
|
||||
"mime",
|
||||
"unicase",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.8.9"
|
||||
|
|
@ -2074,6 +2084,7 @@ dependencies = [
|
|||
"bytes",
|
||||
"encoding_rs",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"h2",
|
||||
"http",
|
||||
"http-body",
|
||||
|
|
@ -2084,6 +2095,7 @@ dependencies = [
|
|||
"js-sys",
|
||||
"log",
|
||||
"mime",
|
||||
"mime_guess",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"quinn",
|
||||
|
|
@ -2827,6 +2839,12 @@ version = "1.20.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de"
|
||||
|
||||
[[package]]
|
||||
name = "unicase"
|
||||
version = "2.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.24"
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ libtbr = { git = "https://pakin-inspiron-15-3530.tail110d9.ts.net/pakin/libtbr.g
|
|||
log = "0.4.29"
|
||||
rayon = "1.11.0"
|
||||
redis = { version = "1.0.2", features = ["tokio-comp"] }
|
||||
reqwest = "0.13.1"
|
||||
reqwest = { version = "0.13.1", features = ["multipart"] }
|
||||
serde = { version = "1.0.228", features = ["derive"] }
|
||||
serde_json = "1.0.149"
|
||||
tokio = { version = "1.49.0", features = ["full"] }
|
||||
|
|
|
|||
167
src/app.rs
167
src/app.rs
|
|
@ -4,14 +4,21 @@ use axum::{
|
|||
routing::{get, post},
|
||||
serve::ListenerExt,
|
||||
};
|
||||
use log::{error, info};
|
||||
use log::{error, info, warn};
|
||||
use redis::TypedCommands;
|
||||
use reqwest::{StatusCode, multipart};
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
env,
|
||||
fs::{self, File},
|
||||
io::BufReader,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::{
|
||||
fs::read_dir,
|
||||
sync::{Mutex, mpsc::Sender},
|
||||
};
|
||||
use tokio::sync::{Mutex, mpsc::Sender};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Hub {
|
||||
|
|
@ -52,6 +59,18 @@ impl DevConfig {
|
|||
format!("{}/checkout?path={}", self.get_recipe_url(), path)
|
||||
}
|
||||
|
||||
pub fn get_post_file_to_recipe_repo(&self) -> String {
|
||||
format!("{}/commit", self.get_recipe_url())
|
||||
}
|
||||
|
||||
pub fn get_pull_recipe_repo(&self) -> String {
|
||||
format!("{}/pull", self.get_recipe_url())
|
||||
}
|
||||
|
||||
pub fn get_push_recipe_repo(&self) -> String {
|
||||
format!("{}/push", self.get_recipe_url())
|
||||
}
|
||||
|
||||
pub fn get_api_header(&self) -> (String, String) {
|
||||
("X-API-Key".to_string(), self.api_key.clone())
|
||||
}
|
||||
|
|
@ -83,7 +102,7 @@ impl AppState {
|
|||
let redis_cli_clone = redis_cli.clone();
|
||||
let tx_new = system_tx.clone();
|
||||
let result = Arc::new(AppState {
|
||||
dev_config,
|
||||
dev_config: dev_config.clone(),
|
||||
redis_cli,
|
||||
system_tx,
|
||||
connectors_mapping: Arc::new(Mutex::new(Hub {
|
||||
|
|
@ -91,6 +110,72 @@ impl AppState {
|
|||
})),
|
||||
});
|
||||
|
||||
// backup job
|
||||
let dev_config_backup = dev_config.clone();
|
||||
tokio::spawn(async move {
|
||||
let m_cfg = dev_config_backup.clone();
|
||||
|
||||
loop {
|
||||
// auto sync
|
||||
if invoke_pull_sync_request(m_cfg.clone()).await.is_err() {
|
||||
warn!("pulling repo unhealthy, retry again in 5 minutes");
|
||||
continue;
|
||||
}
|
||||
|
||||
match read_dir(".").await {
|
||||
Ok(mut d) => {
|
||||
while let Ok(Some(entry)) = d.next_entry().await {
|
||||
let ent_path = entry.path();
|
||||
|
||||
if let Some(filename) = ent_path.file_name()
|
||||
&& let Some(filename_str) = filename.to_str()
|
||||
&& filename_str.starts_with("gtx")
|
||||
&& filename_str.ends_with(".json")
|
||||
{
|
||||
// read file
|
||||
//
|
||||
let f = match File::open(ent_path.clone()) {
|
||||
Ok(f) => f,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
let buf = BufReader::new(f);
|
||||
|
||||
let commit_from_backup: CommitPayload =
|
||||
match serde_json::from_reader(buf) {
|
||||
Ok(cm) => cm,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
if invoke_pull_sync_request(m_cfg.clone()).await.is_err() {
|
||||
warn!("pulling repo unhealthy, retry again in 5 minutes");
|
||||
continue;
|
||||
}
|
||||
|
||||
let _ =
|
||||
invoke_commit_request(m_cfg.clone(), commit_from_backup).await;
|
||||
|
||||
if invoke_push_request(m_cfg.clone()).await.is_ok() {
|
||||
// push success
|
||||
info!("push backup success");
|
||||
if fs::remove_file(ent_path.clone()).is_ok() {
|
||||
info!("clean backup");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {}
|
||||
}
|
||||
|
||||
info!("[backup] idle");
|
||||
|
||||
tokio::time::sleep(Duration::from_mins(5)).await;
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut lredis = redis_cli_clone.clone();
|
||||
let current_queue: crossbeam_queue::ArrayQueue<CommandRequestPayload> =
|
||||
|
|
@ -212,6 +297,82 @@ pub async fn invoke_checkout_request(
|
|||
}
|
||||
}
|
||||
|
||||
/// Invoke git pull, may takes sometime
|
||||
pub async fn invoke_pull_sync_request(
|
||||
config: DevConfig,
|
||||
) -> Result<String, Box<dyn std::error::Error>> {
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let req_path = config.get_pull_recipe_repo();
|
||||
// println!("dbg: {req_path}");
|
||||
let res = client.get(req_path).send().await?;
|
||||
|
||||
if res.status() != StatusCode::OK {
|
||||
// pull fail
|
||||
|
||||
error!(
|
||||
"invoke pull fail: [{}] {:?}",
|
||||
res.status(),
|
||||
res.text().await
|
||||
);
|
||||
return Err("pull fail".into());
|
||||
}
|
||||
|
||||
match res.text().await {
|
||||
Ok(raw) => Ok(raw),
|
||||
Err(e) => Err(format!("{e}").into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Invoke sending from server to server for committing
|
||||
pub async fn invoke_commit_request(
|
||||
config: DevConfig,
|
||||
payload: CommitPayload,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let client = reqwest::Client::new();
|
||||
let commit_path = config.get_post_file_to_recipe_repo();
|
||||
let form = multipart::Form::new()
|
||||
.text("message", payload.message)
|
||||
.text("signature_username", payload.signature_username)
|
||||
.text("signature_email", payload.signature_email)
|
||||
.text("path", payload.path)
|
||||
.part(
|
||||
"file",
|
||||
multipart::Part::bytes(payload.file_bytes)
|
||||
.mime_str("application/octet-stream")
|
||||
.unwrap(),
|
||||
);
|
||||
let response = client.post(commit_path).multipart(form).send().await?;
|
||||
|
||||
info!("commit status: {}", response.status());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn invoke_push_request(config: DevConfig) -> Result<String, Box<dyn std::error::Error>> {
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let req_path = config.get_push_recipe_repo();
|
||||
// println!("dbg: {req_path}");
|
||||
let res = client.get(req_path).send().await?;
|
||||
|
||||
if res.status() != StatusCode::OK {
|
||||
// pull fail
|
||||
|
||||
error!(
|
||||
"invoke push fail: [{}] {:?}",
|
||||
res.status(),
|
||||
res.text().await
|
||||
);
|
||||
return Err("push fail".into());
|
||||
}
|
||||
|
||||
match res.text().await {
|
||||
Ok(raw) => Ok(raw),
|
||||
Err(e) => Err(format!("{e}").into()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_recipe_repo_router() -> Router<Arc<AppState>> {
|
||||
Router::new().route("/ws", get(crate::websocket::handler::websocket_handler))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,15 +20,24 @@ pub struct StreamDataStart {
|
|||
#[serde(rename = "ref")]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub stream_ref: Option<String>,
|
||||
/// extra data, information
|
||||
pub metadata: String,
|
||||
}
|
||||
|
||||
impl IntoStreamMessage for StreamDataStart {
|
||||
const MSG_NAME: &str = "stream_data_start";
|
||||
|
||||
fn build(&self) -> serde_json::Value {
|
||||
let mut payload = serde_json::to_value(self).unwrap();
|
||||
|
||||
payload.as_object_mut().unwrap().insert(
|
||||
"to".to_string(),
|
||||
serde_json::json!(self.stream_ref.clone().unwrap_or_default()),
|
||||
);
|
||||
|
||||
serde_json::json!({
|
||||
"type": StreamDataStart::MSG_NAME,
|
||||
"payload": self.clone()
|
||||
"payload": payload.clone()
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -38,12 +47,18 @@ impl IntoStreamMessage for StreamDataStart {
|
|||
}
|
||||
|
||||
impl StreamDataStart {
|
||||
pub fn new(total_size: usize, chunk_size: usize, stream_ref: Option<String>) -> Self {
|
||||
pub fn new(
|
||||
total_size: usize,
|
||||
chunk_size: usize,
|
||||
stream_ref: Option<String>,
|
||||
metadata: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
stream_id: Uuid::new_v4().to_string(),
|
||||
total_size,
|
||||
chunk_size,
|
||||
stream_ref,
|
||||
metadata,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -61,6 +76,7 @@ pub struct StreamDataChunk<T> {
|
|||
pub start_idx: usize,
|
||||
/// Chunked data which splited into N items per chunk
|
||||
pub data: Vec<T>,
|
||||
#[serde(rename = "to")]
|
||||
uid: String,
|
||||
}
|
||||
|
||||
|
|
@ -105,6 +121,8 @@ where
|
|||
pub struct StreamDataEnd {
|
||||
/// Uuid v4, client must mapping later values with this stream id
|
||||
pub stream_id: String,
|
||||
/// endpoint user
|
||||
pub to: String,
|
||||
}
|
||||
|
||||
impl IntoStreamMessage for StreamDataEnd {
|
||||
|
|
@ -123,9 +141,10 @@ impl IntoStreamMessage for StreamDataEnd {
|
|||
}
|
||||
|
||||
impl StreamDataEnd {
|
||||
pub fn new(sid: &str) -> Self {
|
||||
pub fn new(sid: &str, to: String) -> Self {
|
||||
Self {
|
||||
stream_id: sid.to_string(),
|
||||
to,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -142,6 +161,7 @@ pub struct StreamDataExtra<T> {
|
|||
pub exid: String,
|
||||
pub extp: String,
|
||||
pub payload: Vec<T>,
|
||||
pub to: String,
|
||||
}
|
||||
|
||||
impl<T> IntoStreamMessage for StreamDataExtra<T>
|
||||
|
|
@ -166,11 +186,12 @@ impl<T> StreamDataExtra<T>
|
|||
where
|
||||
T: Serialize + Clone,
|
||||
{
|
||||
pub fn new(exid: &str, extp: &str, data: Vec<T>) -> Self {
|
||||
pub fn new(exid: &str, extp: &str, data: Vec<T>, to: String) -> Self {
|
||||
Self {
|
||||
exid: exid.to_string(),
|
||||
extp: extp.to_string(),
|
||||
payload: data.to_vec(),
|
||||
to,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use super::model::*;
|
||||
use axum::extract::ws::{CloseFrame, Message, WebSocket};
|
||||
use redis::{TypedCommands, cmd};
|
||||
|
|
@ -64,3 +66,21 @@ pub fn convert_sys_msg_command(msg: &serde_json::Value) -> Option<SysMessage> {
|
|||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_extra_parameters(s: String) -> HashMap<String, String> {
|
||||
let mut result = HashMap::new();
|
||||
|
||||
let plist: Vec<String> = s.split(",").map(|x| x.to_string()).collect();
|
||||
|
||||
for pl in plist {
|
||||
let sm: Vec<String> = pl.split("=").map(|x| x.to_string()).collect();
|
||||
|
||||
if sm.len() != 2 {
|
||||
continue;
|
||||
}
|
||||
|
||||
result.insert(sm[0].to_string(), sm[1].to_string());
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,21 @@
|
|||
use std::fs::File;
|
||||
use std::io::{Read, Write};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub trait Backup: Send + Sync + Serialize {
|
||||
fn dump_backup(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let tx = format!("gtx-{}.json", uuid::Uuid::new_v4().to_string());
|
||||
let json = serde_json::to_string(&self).unwrap();
|
||||
let json2: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
|
||||
let writer = File::create(format!("./{tx}")).unwrap();
|
||||
let _ = serde_json::to_writer(writer, &json2);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// system message to send back to client, this may be called from other services
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct SysMessage {
|
||||
|
|
@ -40,6 +56,27 @@ pub struct RecipeRequestPayload {
|
|||
pub parameters: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub enum PriceRequestAction {
|
||||
View(String),
|
||||
Edit(String),
|
||||
}
|
||||
|
||||
/// Price request payload struct
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct PriceRequestPayload {
|
||||
/// Allowed interactions of price request
|
||||
pub action: PriceRequestAction,
|
||||
/// Country of recipe
|
||||
pub country: String,
|
||||
/// Filename to override default get, expect path
|
||||
pub override_file: Option<String>,
|
||||
/// Extended infos, required parameters or unimplemented fields in the current struct. Expected pattern `<key1>=<val1>,<key2>=<val2>,...`
|
||||
pub parameters: Option<String>,
|
||||
/// User info expect at least id, token, name
|
||||
pub user_info: serde_json::Value,
|
||||
}
|
||||
|
||||
/// Command request for external services
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct CommandRequestPayload {
|
||||
|
|
@ -84,3 +121,21 @@ pub struct AuthUserField {
|
|||
pub email: String,
|
||||
pub permissions: String,
|
||||
}
|
||||
|
||||
/// For sending to recipe repo, saving/committing value/file to git
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct CommitPayload {
|
||||
/// file/content as bytes
|
||||
pub file_bytes: Vec<u8>,
|
||||
/// path to commit file
|
||||
pub path: String,
|
||||
/// signature username part (committer)
|
||||
pub signature_username: String,
|
||||
/// signature email part (committer)
|
||||
pub signature_email: String,
|
||||
/// commit message
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
// use default backup method
|
||||
impl Backup for CommitPayload {}
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ pub async fn read(
|
|||
Message::Text(t) => {
|
||||
let req: WebsocketMessageRequest = serde_json::from_str(t.as_str())?;
|
||||
|
||||
info!("get msg: {}", req.type_w);
|
||||
// info!("get msg: {}", req.type_w);
|
||||
match req.type_w.as_str() {
|
||||
"recipe" if req.payload.is_some() => {
|
||||
tasks::recipe::handle_recipe_request(
|
||||
|
|
@ -61,7 +61,27 @@ pub async fn read(
|
|||
req,
|
||||
uid_clone.clone(),
|
||||
)
|
||||
.await?
|
||||
.await?;
|
||||
}
|
||||
"recipe_versions" if req.payload.is_some() => {
|
||||
tasks::recipe::handle_recipe_versions_list_request(
|
||||
config.clone(),
|
||||
redis.clone(),
|
||||
tx.clone(),
|
||||
req,
|
||||
uid_clone.clone(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
"price" if req.payload.is_some() => {
|
||||
tasks::price::handle_price_request(
|
||||
config.clone(),
|
||||
redis.clone(),
|
||||
tx.clone(),
|
||||
req,
|
||||
uid_clone.clone(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
"command" if req.payload.is_some() => {
|
||||
tasks::command::handle_command_request(state.clone(), tx.clone(), req)
|
||||
|
|
@ -201,6 +221,8 @@ pub async fn write(
|
|||
}
|
||||
|
||||
let _ = sender.send(res.to_string().into()).await;
|
||||
} else {
|
||||
warn!("failed to send message, as the receiver not detected: {res:?}");
|
||||
}
|
||||
}
|
||||
TxControlMessage::CloseExist => {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
pub mod auth;
|
||||
pub mod command;
|
||||
pub mod price;
|
||||
pub mod recipe;
|
||||
pub mod sheet;
|
||||
pub mod watchdog;
|
||||
|
|
|
|||
341
src/websocket/tasks/price.rs
Normal file
341
src/websocket/tasks/price.rs
Normal file
|
|
@ -0,0 +1,341 @@
|
|||
use crate::app::*;
|
||||
use crate::stream::model::{
|
||||
IntoStreamMessage, StreamDataChunk, StreamDataEnd, StreamDataExtra, StreamDataStart,
|
||||
};
|
||||
use crate::websocket::{core::*, helper::*, model::*};
|
||||
|
||||
use log::{debug, error, info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::{fs::File, io::Read, sync::Arc};
|
||||
use tokio::sync::{Mutex, mpsc::Sender};
|
||||
|
||||
use crate::websocket::core::WebsocketMessageResult;
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
struct MenuPrice {
|
||||
ProductCode: String,
|
||||
NewPrice: serde_json::Value,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
StringParam: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
Discount: Option<serde_json::Value>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
Percent: Option<serde_json::Value>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
roundup: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct RecipePrice {
|
||||
#[serde(flatten)]
|
||||
metadata: serde_json::Value,
|
||||
content: Vec<MenuPrice>,
|
||||
}
|
||||
|
||||
impl RecipePrice {
|
||||
pub const HIDE_PARAM: &str = "hide=true";
|
||||
|
||||
pub fn import(path: String) -> RecipePrice {
|
||||
debug!("try import {path}");
|
||||
let mut file = File::open(path).expect("file not found");
|
||||
let mut data = String::new();
|
||||
|
||||
file.read_to_string(&mut data).unwrap();
|
||||
|
||||
let res: Result<RecipePrice, _> = serde_json::from_str(&data);
|
||||
match res {
|
||||
Ok(rp) => rp,
|
||||
Err(e) => {
|
||||
error!("error while deserialize price: {e}");
|
||||
RecipePrice {
|
||||
content: Vec::new(),
|
||||
metadata: serde_json::Value::Null,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn import_from_raw_string(raw: String) -> RecipePrice {
|
||||
let res: Result<RecipePrice, _> = serde_json::from_str(&raw);
|
||||
match res {
|
||||
Ok(rp) => rp,
|
||||
Err(e) => {
|
||||
error!("error while deserialize price: {e}");
|
||||
RecipePrice {
|
||||
content: Vec::new(),
|
||||
metadata: serde_json::Value::Null,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// only for getting data without modify
|
||||
pub fn find_by_pd(&self, pd: &str) -> Option<MenuPrice> {
|
||||
self.content
|
||||
.iter()
|
||||
.find(|x| x.ProductCode.eq(pd))
|
||||
.map(|x| x.clone())
|
||||
}
|
||||
|
||||
pub fn modify_price_by_pd(&mut self, pd: &str, price: serde_json::Value) {
|
||||
if let Some(mp) = self.content.iter_mut().find(|x| x.ProductCode.eq(pd)) {
|
||||
mp.NewPrice = price;
|
||||
}
|
||||
}
|
||||
|
||||
fn modify_string_param_by_pd(&mut self, pd: &str, param: &str) {
|
||||
if let Some(mp) = self.content.iter_mut().find(|x| x.ProductCode.eq(pd)) {
|
||||
mp.StringParam = Some(param.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_visibility_by_pd(&mut self, pd: &str, show: bool) {
|
||||
if !show {
|
||||
self.modify_string_param_by_pd(pd, RecipePrice::HIDE_PARAM);
|
||||
} else {
|
||||
self.modify_string_param_by_pd(pd, "");
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: disable write to file
|
||||
// pub fn export_to_json_file(self, outpath: Option<String>) {
|
||||
// let json = serde_json::to_string(&self).unwrap();
|
||||
// let json2: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
|
||||
// if let Some(outpath) = outpath {
|
||||
// let writer = File::create(outpath).unwrap();
|
||||
// let _ = serde_json::to_writer_pretty(writer, &json2);
|
||||
// } else {
|
||||
// println!("Default save to (execute)/price.json");
|
||||
// let writer = File::create("price.json").unwrap();
|
||||
// let _ = serde_json::to_writer_pretty(writer, &json2);
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
/// Get list of price
|
||||
// pub async fn handle_price_list_request(
|
||||
// config: DevConfig,
|
||||
// redis: redis::Client,
|
||||
// tx: Sender<TxControlMessage>,
|
||||
// req: WebsocketMessageRequest,
|
||||
// uid_clone: Arc<Mutex<String>>,
|
||||
// ) -> WebsocketMessageResult {
|
||||
|
||||
// }
|
||||
|
||||
/// Get main price profile of country
|
||||
pub async fn handle_price_request(
|
||||
config: DevConfig,
|
||||
redis: redis::Client,
|
||||
tx: Sender<TxControlMessage>,
|
||||
req: WebsocketMessageRequest,
|
||||
uid_clone: Arc<Mutex<String>>,
|
||||
) -> WebsocketMessageResult {
|
||||
let p = req.payload.unwrap();
|
||||
|
||||
let price_param: PriceRequestPayload = serde_json::from_value(p)?;
|
||||
|
||||
let mut price_file_format = format!(
|
||||
"{}/profile_{}_master.json",
|
||||
price_param.country,
|
||||
price_param.country.to_uppercase()
|
||||
);
|
||||
|
||||
if let Some(override_file) = price_param.override_file {
|
||||
price_file_format = override_file;
|
||||
}
|
||||
|
||||
let price_action = price_param.action;
|
||||
|
||||
let price_content =
|
||||
match invoke_checkout_request(config.clone(), price_file_format.clone()).await {
|
||||
Ok(pc) => pc,
|
||||
Err(e) => return Err(format!("Cannot find price of expected country: {e:?}").into()),
|
||||
};
|
||||
|
||||
info!("price content len: {}", price_content.len());
|
||||
|
||||
let mut rpp = RecipePrice::import_from_raw_string(price_content);
|
||||
let _uid = uid_clone.clone();
|
||||
let uidd = _uid.try_lock().unwrap();
|
||||
|
||||
info!("price action: {price_action:?}");
|
||||
|
||||
let user_info = price_param.user_info.clone();
|
||||
|
||||
match price_action {
|
||||
PriceRequestAction::View(view_opt) => {
|
||||
let viewing_options: HashMap<String, String> = get_extra_parameters(view_opt);
|
||||
// sa=all
|
||||
// sa=get,pd=...
|
||||
// sa=query,list=1|2|3
|
||||
// sa=query,where=contain,kw=...
|
||||
let sub_action = viewing_options.get("sa");
|
||||
let pd = viewing_options.get("pd");
|
||||
|
||||
if let Some(sa) = sub_action {
|
||||
let mut result = Vec::new();
|
||||
let action_done = match sa.as_str() {
|
||||
"all" => {
|
||||
result = rpp.content;
|
||||
|
||||
true
|
||||
}
|
||||
"get"
|
||||
if let Some(pd) = pd
|
||||
&& let Some(mp) = rpp.find_by_pd(pd) =>
|
||||
{
|
||||
result.push(mp);
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if action_done {
|
||||
let _ = tx
|
||||
.send(TxControlMessage::Payload(serde_json::json!({
|
||||
"type": "price",
|
||||
"payload": {
|
||||
"req_action": sa,
|
||||
"status": "ok",
|
||||
"content": result,
|
||||
"to": uidd.to_string()
|
||||
}
|
||||
})))
|
||||
.await;
|
||||
} else {
|
||||
let _ = tx
|
||||
.send(TxControlMessage::Payload(serde_json::json!({
|
||||
"type": "price",
|
||||
"payload": {
|
||||
"req_action": sa,
|
||||
"status": "fail",
|
||||
"to": uidd.to_string()
|
||||
}
|
||||
})))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
PriceRequestAction::Edit(edit_opt) => {
|
||||
let editing_options: HashMap<String, String> = get_extra_parameters(edit_opt);
|
||||
// sa=change,pd=...,to=...
|
||||
// sa=hide,pd=...
|
||||
// sa=disable,pd=... disable = hide
|
||||
// sa=show,pd=...
|
||||
// sa=toggle,pd=...,state=show|hide
|
||||
let sub_action = editing_options.get("sa");
|
||||
let pd = editing_options.get("pd");
|
||||
let to = editing_options.get("to");
|
||||
let state = editing_options.get("state");
|
||||
|
||||
if let Some(sa) = sub_action {
|
||||
let mut action_message = String::new();
|
||||
let action_done = match sa.as_str() {
|
||||
"change"
|
||||
if let Some(pd) = pd
|
||||
&& let Some(to) = to
|
||||
&& let Some(mp) = rpp.find_by_pd(pd.as_str()) =>
|
||||
{
|
||||
info!(
|
||||
"[CHANGE] price of {pd} from {} to {to}",
|
||||
mp.NewPrice.as_i64().unwrap()
|
||||
);
|
||||
action_message = format!(
|
||||
"[CHANGE] price of {pd} from {} to {to}",
|
||||
mp.NewPrice.as_i64().unwrap()
|
||||
);
|
||||
|
||||
let price_int = to.parse::<i64>()?;
|
||||
|
||||
rpp.modify_price_by_pd(pd, serde_json::json!(price_int));
|
||||
true
|
||||
}
|
||||
"toggle"
|
||||
if let Some(pd) = pd
|
||||
&& let Some(state) = state =>
|
||||
{
|
||||
info!("[TOGGLE] {pd} to {state}");
|
||||
action_message = format!("[TOGGLE] {pd} to {state}");
|
||||
|
||||
match state.as_str() {
|
||||
"show" => rpp.set_visibility_by_pd(pd.as_str(), true),
|
||||
"hide" | "disable" => rpp.set_visibility_by_pd(pd.as_str(), false),
|
||||
_ => {
|
||||
warn!("unknown state toggle");
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if action_done {
|
||||
let _ = tx
|
||||
.send(TxControlMessage::Payload(serde_json::json!({
|
||||
"type": "price",
|
||||
"payload": {
|
||||
"req_action": sa,
|
||||
"status": "ok",
|
||||
"to": uidd.to_string()
|
||||
}
|
||||
})))
|
||||
.await;
|
||||
|
||||
// send save
|
||||
//
|
||||
|
||||
let all_prices_str = serde_json::to_string_pretty(&rpp)?;
|
||||
let commit_payload = CommitPayload {
|
||||
file_bytes: all_prices_str.as_bytes().to_vec(),
|
||||
path: price_file_format.clone(),
|
||||
signature_username: user_info
|
||||
.get("displayName")
|
||||
.unwrap_or_default()
|
||||
.as_str()
|
||||
.unwrap_or(&"unknown".to_string())
|
||||
.to_string(),
|
||||
signature_email: user_info
|
||||
.get("email")
|
||||
.unwrap_or_default()
|
||||
.as_str()
|
||||
.unwrap_or(&"unknown".to_string())
|
||||
.to_string(),
|
||||
message: action_message,
|
||||
};
|
||||
|
||||
if invoke_pull_sync_request(config.clone()).await.is_err() {
|
||||
// backup
|
||||
let _ = commit_payload.dump_backup();
|
||||
return Err("Fail to sync repo, backing up ...".into());
|
||||
}
|
||||
|
||||
let _ = invoke_commit_request(config.clone(), commit_payload.clone()).await;
|
||||
|
||||
if invoke_push_request(config.clone()).await.is_err() {
|
||||
let _ = commit_payload.dump_backup();
|
||||
return Err("Fail to push repo, backing up ...".into());
|
||||
}
|
||||
|
||||
// push to git
|
||||
} else {
|
||||
let _ = tx
|
||||
.send(TxControlMessage::Payload(serde_json::json!({
|
||||
"type": "price",
|
||||
"payload": {
|
||||
"req_action": sa,
|
||||
"status": "fail",
|
||||
"to": uidd.to_string()
|
||||
}
|
||||
})))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -4,6 +4,7 @@ use crate::stream::model::{
|
|||
};
|
||||
use crate::websocket::{core::*, helper::*, model::*};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::{fs::File, io::Read, path::PathBuf, sync::Arc};
|
||||
|
||||
use async_compression::tokio::bufread::BrotliDecoder;
|
||||
|
|
@ -92,6 +93,7 @@ pub async fn throttle_send_recipe(
|
|||
r01s.len(),
|
||||
CHUNK_SIZE,
|
||||
Some(uid.try_lock().unwrap().to_string()),
|
||||
format!("version={version},country={country}").to_string(),
|
||||
);
|
||||
|
||||
let sid = ss.get_id();
|
||||
|
|
@ -118,7 +120,7 @@ pub async fn throttle_send_recipe(
|
|||
for (index, chunk) in matset.chunks(CHUNK_SIZE).enumerate() {
|
||||
let curr_ch_id = format!("{mat_exid}_{index}");
|
||||
|
||||
let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec());
|
||||
let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec(), uidd.clone());
|
||||
|
||||
if let Some(err) = tx
|
||||
.send(TxControlMessage::Payload(extra_matset.as_msg()))
|
||||
|
|
@ -132,7 +134,7 @@ pub async fn throttle_send_recipe(
|
|||
let extl = "topplist";
|
||||
for (index, chunk) in recipe.Topping.ToppingList.chunks(CHUNK_SIZE).enumerate() {
|
||||
let curr_ch_id = format!("{mat_exid}_tl{index}");
|
||||
let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec());
|
||||
let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec(), uidd.clone());
|
||||
if let Some(err) = tx
|
||||
.send(TxControlMessage::Payload(extra_topplist.as_msg()))
|
||||
.await
|
||||
|
|
@ -145,7 +147,7 @@ pub async fn throttle_send_recipe(
|
|||
let extg = "toppgrp";
|
||||
for (index, chunk) in recipe.Topping.ToppingGroup.chunks(CHUNK_SIZE).enumerate() {
|
||||
let curr_ch_id = format!("{mat_exid}_tg{index}");
|
||||
let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec());
|
||||
let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec(), uidd.clone());
|
||||
if let Some(err) = tx
|
||||
.send(TxControlMessage::Payload(extra_toppgrp.as_msg()))
|
||||
.await
|
||||
|
|
@ -163,7 +165,7 @@ pub async fn throttle_send_recipe(
|
|||
info!("sending {sid}");
|
||||
|
||||
// return sid;
|
||||
let end_msg = StreamDataEnd::new(&sid);
|
||||
let end_msg = StreamDataEnd::new(&sid, uidd.clone());
|
||||
|
||||
if let Some(err) = tx
|
||||
.send(TxControlMessage::Payload(end_msg.as_msg()))
|
||||
|
|
@ -222,6 +224,23 @@ pub async fn handle_recipe_request(
|
|||
};
|
||||
}
|
||||
|
||||
// detect if use different version
|
||||
// parameter: use_legacy_version=true,version=888
|
||||
if let Some(extra_param) = recipe_param.clone().parameters {
|
||||
let pmap = get_extra_parameters(extra_param);
|
||||
|
||||
latest_version = if pmap.contains_key("use_legacy_version")
|
||||
&& let Some(legacy_cfg) = pmap.get("use_legacy_version")
|
||||
&& legacy_cfg.eq("true")
|
||||
{
|
||||
pmap.get("version").unwrap_or(&latest_version).to_string()
|
||||
} else {
|
||||
latest_version
|
||||
};
|
||||
|
||||
info!("after param in recipe: {latest_version}");
|
||||
}
|
||||
|
||||
let req_file = if is_req_patch(&recipe_param) {
|
||||
format!(
|
||||
"stx_{country}_{version}.json",
|
||||
|
|
@ -353,3 +372,47 @@ pub async fn handle_recipe_request(
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn handle_recipe_versions_list_request(
|
||||
config: DevConfig,
|
||||
redis: redis::Client,
|
||||
tx: Sender<TxControlMessage>,
|
||||
req: WebsocketMessageRequest,
|
||||
uid_clone: Arc<Mutex<String>>,
|
||||
) -> WebsocketMessageResult {
|
||||
let p = req.payload.unwrap();
|
||||
let recipe_param: RecipeRequestPayload = serde_json::from_value(p)?;
|
||||
|
||||
let version_list = format!("{country}", country = recipe_param.country);
|
||||
|
||||
let country_versions_str = match invoke_checkout_request(config.clone(), version_list).await {
|
||||
Ok(vs) => vs,
|
||||
Err(e) => return Err(format!("Cannot find versions of expected country: {e:?}").into()),
|
||||
};
|
||||
|
||||
// extract version as list
|
||||
let files: Vec<String> = country_versions_str
|
||||
.split(",")
|
||||
.map(|x| x.to_string())
|
||||
.collect();
|
||||
|
||||
let result: Vec<String> = files
|
||||
.iter()
|
||||
.filter(|x| x.starts_with("coffeethai02") && x.ends_with(".json"))
|
||||
.map(|x| x.replace("coffeethai02_", "").replace(".json", ""))
|
||||
.collect();
|
||||
|
||||
let uidd = uid_clone.clone().try_lock().unwrap().to_string();
|
||||
|
||||
let _ = tx
|
||||
.send(TxControlMessage::Payload(serde_json::json!({
|
||||
"type": "version_selectors",
|
||||
"payload": {
|
||||
"versions": result,
|
||||
"to": uidd
|
||||
}
|
||||
})))
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ pub async fn get_watchdog_task(
|
|||
let h = hub.try_lock().unwrap();
|
||||
let curr_user = user.try_lock().unwrap().to_string();
|
||||
|
||||
info!("{}: checking invalid ...", curr_user);
|
||||
// info!("{}: checking invalid ...", curr_user);
|
||||
|
||||
if h.clients.contains_key(&curr_user) && curr_user.starts_with("temp") {
|
||||
warn!("detect unauthorized -- {}", curr_user);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue