2026-04-28 16:43:22 +07:00
|
|
|
use crate::websocket::{core::*, model::*};
|
|
|
|
|
use axum::{
|
|
|
|
|
Router,
|
|
|
|
|
routing::{get, post},
|
|
|
|
|
serve::ListenerExt,
|
|
|
|
|
};
|
2026-05-05 17:03:33 +07:00
|
|
|
use log::{error, info, warn};
|
2026-04-28 16:43:22 +07:00
|
|
|
use redis::TypedCommands;
|
2026-05-05 17:03:33 +07:00
|
|
|
use reqwest::{StatusCode, multipart};
|
2026-04-28 16:43:22 +07:00
|
|
|
use std::{
|
|
|
|
|
collections::{HashMap, VecDeque},
|
|
|
|
|
env,
|
2026-05-05 17:03:33 +07:00
|
|
|
fs::{self, File},
|
|
|
|
|
io::BufReader,
|
2026-04-28 16:43:22 +07:00
|
|
|
sync::Arc,
|
2026-05-05 17:03:33 +07:00
|
|
|
time::Duration,
|
|
|
|
|
};
|
|
|
|
|
use tokio::{
|
|
|
|
|
fs::read_dir,
|
|
|
|
|
sync::{Mutex, mpsc::Sender},
|
2026-04-28 16:43:22 +07:00
|
|
|
};
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
pub struct Hub {
|
|
|
|
|
pub clients: HashMap<String, Sender<TxControlMessage>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Endpoints and credentials of the external services used by this module.
///
/// All values are stored verbatim; the accessor methods only concatenate them.
#[derive(Clone)]
pub struct DevConfig {
    /// Value paired with the `X-API-Key` header name by [`DevConfig::get_api_header`].
    pub api_key: String,
    /// First part of the recipe service URL.
    pub api_domain: String,
    /// Second part of the recipe service URL, appended to `api_domain` without a separator.
    pub api_recipe_service: String,
    /// Redis connection URL.
    pub api_redis_url: String,
    /// Base URL of the resolver service.
    pub api_resolver: String,
}

impl DevConfig {
    /// Creates a configuration from its five components.
    ///
    /// `key`, `domain` and `rp_service` are stored as `api_key`, `api_domain`
    /// and `api_recipe_service` respectively; the remaining two arguments keep
    /// their names.
    pub fn new(
        key: String,
        domain: String,
        rp_service: String,
        api_redis_url: String,
        api_resolver: String,
    ) -> DevConfig {
        Self {
            api_redis_url,
            api_resolver,
            api_recipe_service: rp_service,
            api_domain: domain,
            api_key: key,
        }
    }

    /// Base URL of the recipe service: `api_domain` followed directly by
    /// `api_recipe_service`.
    pub fn get_recipe_url(&self) -> String {
        let mut url = String::with_capacity(self.api_domain.len() + self.api_recipe_service.len());
        url.push_str(&self.api_domain);
        url.push_str(&self.api_recipe_service);
        url
    }

    /// URL of the `checkout` endpoint for `path`.
    ///
    /// `path` is inserted into the query string as-is (no percent-encoding).
    pub fn get_file_from_recipe_repo(&self, path: String) -> String {
        let base = self.get_recipe_url();
        format!("{base}/checkout?path={path}")
    }

    /// URL of the `commit` endpoint.
    pub fn get_post_file_to_recipe_repo(&self) -> String {
        self.get_recipe_url() + "/commit"
    }

    /// URL of the `pull` endpoint.
    pub fn get_pull_recipe_repo(&self) -> String {
        self.get_recipe_url() + "/pull"
    }

    /// URL of the `push` endpoint.
    pub fn get_push_recipe_repo(&self) -> String {
        self.get_recipe_url() + "/push"
    }

    /// Header name/value pair carrying the API key: `("X-API-Key", api_key)`.
    pub fn get_api_header(&self) -> (String, String) {
        (String::from("X-API-Key"), self.api_key.clone())
    }

    /// URL of the resolver's `resolve` endpoint.
    pub fn get_yuki_resolver(&self) -> String {
        [self.api_resolver.as_str(), "/resolve"].concat()
    }
}
|
|
|
|
|
|
|
|
|
|
pub struct AppState {
|
|
|
|
|
pub dev_config: DevConfig,
|
|
|
|
|
pub redis_cli: redis::Client,
|
|
|
|
|
pub system_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
|
|
|
|
|
// saved client uid:client uuid
|
|
|
|
|
pub connectors_mapping: Arc<Mutex<Hub>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl AppState {
|
|
|
|
|
pub fn get_cfg(&self) -> DevConfig {
|
|
|
|
|
self.dev_config.clone()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn new(
|
|
|
|
|
dev_config: DevConfig,
|
|
|
|
|
redis_cli: redis::Client,
|
|
|
|
|
system_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
|
|
|
|
|
mut system_rx: tokio::sync::broadcast::Receiver<serde_json::Value>,
|
|
|
|
|
) -> Arc<AppState> {
|
|
|
|
|
let redis_cli_clone = redis_cli.clone();
|
|
|
|
|
let tx_new = system_tx.clone();
|
|
|
|
|
let result = Arc::new(AppState {
|
2026-05-05 17:03:33 +07:00
|
|
|
dev_config: dev_config.clone(),
|
2026-04-28 16:43:22 +07:00
|
|
|
redis_cli,
|
|
|
|
|
system_tx,
|
|
|
|
|
connectors_mapping: Arc::new(Mutex::new(Hub {
|
|
|
|
|
clients: HashMap::new(),
|
|
|
|
|
})),
|
|
|
|
|
});
|
|
|
|
|
|
2026-05-05 17:03:33 +07:00
|
|
|
// backup job
|
|
|
|
|
let dev_config_backup = dev_config.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
let m_cfg = dev_config_backup.clone();
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
// auto sync
|
|
|
|
|
if invoke_pull_sync_request(m_cfg.clone()).await.is_err() {
|
|
|
|
|
warn!("pulling repo unhealthy, retry again in 5 minutes");
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
match read_dir(".").await {
|
|
|
|
|
Ok(mut d) => {
|
|
|
|
|
while let Ok(Some(entry)) = d.next_entry().await {
|
|
|
|
|
let ent_path = entry.path();
|
|
|
|
|
|
|
|
|
|
if let Some(filename) = ent_path.file_name()
|
|
|
|
|
&& let Some(filename_str) = filename.to_str()
|
|
|
|
|
&& filename_str.starts_with("gtx")
|
|
|
|
|
&& filename_str.ends_with(".json")
|
|
|
|
|
{
|
|
|
|
|
// read file
|
|
|
|
|
//
|
|
|
|
|
let f = match File::open(ent_path.clone()) {
|
|
|
|
|
Ok(f) => f,
|
|
|
|
|
Err(_) => continue,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let buf = BufReader::new(f);
|
|
|
|
|
|
|
|
|
|
let commit_from_backup: CommitPayload =
|
|
|
|
|
match serde_json::from_reader(buf) {
|
|
|
|
|
Ok(cm) => cm,
|
|
|
|
|
Err(_) => continue,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if invoke_pull_sync_request(m_cfg.clone()).await.is_err() {
|
|
|
|
|
warn!("pulling repo unhealthy, retry again in 5 minutes");
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let _ =
|
|
|
|
|
invoke_commit_request(m_cfg.clone(), commit_from_backup).await;
|
|
|
|
|
|
|
|
|
|
if invoke_push_request(m_cfg.clone()).await.is_ok() {
|
|
|
|
|
// push success
|
|
|
|
|
info!("push backup success");
|
|
|
|
|
if fs::remove_file(ent_path.clone()).is_ok() {
|
|
|
|
|
info!("clean backup");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Err(_) => {}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
info!("[backup] idle");
|
|
|
|
|
|
|
|
|
|
tokio::time::sleep(Duration::from_mins(5)).await;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
2026-04-28 16:43:22 +07:00
|
|
|
tokio::spawn(async move {
|
|
|
|
|
let mut lredis = redis_cli_clone.clone();
|
|
|
|
|
let current_queue: crossbeam_queue::ArrayQueue<CommandRequestPayload> =
|
|
|
|
|
crossbeam_queue::ArrayQueue::new(1);
|
|
|
|
|
let mut pending_command: VecDeque<CommandRequestPayload> = VecDeque::new();
|
|
|
|
|
let mut check_available_path = String::new();
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
if let Ok(rmsg) = system_rx.recv().await {
|
|
|
|
|
let sys_msg = crate::websocket::helper::convert_sys_msg_command(&rmsg);
|
|
|
|
|
|
|
|
|
|
// add queue process
|
|
|
|
|
let command_req: CommandRequestPayload = match serde_json::from_value(rmsg) {
|
|
|
|
|
Ok(cmd) => cmd,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
if sys_msg.is_none() {
|
|
|
|
|
// maybe error
|
|
|
|
|
error!("error deserialize: {e:?} ---> Skip");
|
|
|
|
|
}
|
|
|
|
|
continue;
|
|
|
|
|
} // reject
|
|
|
|
|
};
|
|
|
|
|
info!("get cmd: {}", command_req.srv_name);
|
|
|
|
|
|
|
|
|
|
if let Err(fail_payload) = current_queue.push(command_req.clone()) {
|
|
|
|
|
if pending_command.len() < 10 {
|
|
|
|
|
pending_command.push_back(fail_payload)
|
|
|
|
|
} else {
|
|
|
|
|
let user_name = fail_payload
|
|
|
|
|
.user_info
|
|
|
|
|
.get("displayName")
|
|
|
|
|
.unwrap_or_default();
|
|
|
|
|
|
|
|
|
|
let _ = tx_new.send(serde_json::json!({
|
|
|
|
|
"type": "notify",
|
|
|
|
|
"payload": {
|
|
|
|
|
"from": "system_tx",
|
|
|
|
|
"msg": "request queue full, try again later",
|
|
|
|
|
"level": "ERR",
|
|
|
|
|
"to": user_name,
|
|
|
|
|
}
|
|
|
|
|
}));
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// set check to latest push to queue ok
|
|
|
|
|
check_available_path = format!("{}/status", command_req.srv_name);
|
|
|
|
|
info!("checking {check_available_path}");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// send process
|
|
|
|
|
if let Ok(Some(status)) = lredis.get(&check_available_path) {
|
|
|
|
|
info!("status: {status}");
|
|
|
|
|
match status.as_str() {
|
|
|
|
|
"ok" | "OK" | "Ok" => {
|
|
|
|
|
info!("queue: {}", current_queue.len());
|
|
|
|
|
//
|
|
|
|
|
if current_queue.is_full()
|
|
|
|
|
&& let Some(cmd) = current_queue.pop()
|
|
|
|
|
{
|
|
|
|
|
// get one
|
|
|
|
|
let channel = format!("{}/job", cmd.srv_name);
|
|
|
|
|
info!("channel job: {channel}");
|
|
|
|
|
info!("job: {cmd:?}");
|
|
|
|
|
|
|
|
|
|
let prep = serde_json::json!({
|
|
|
|
|
"type": "command",
|
|
|
|
|
"payload": cmd
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let result_pub = lredis.publish(
|
|
|
|
|
channel,
|
|
|
|
|
serde_json::to_string(&prep).unwrap_or("{}".to_string()),
|
|
|
|
|
);
|
|
|
|
|
info!("published: {result_pub:?}");
|
|
|
|
|
|
|
|
|
|
// queue next
|
|
|
|
|
if let Some(next_cmd) = pending_command.pop_front() {
|
|
|
|
|
check_available_path = format!("{}/status", next_cmd.srv_name);
|
|
|
|
|
// ignore result
|
|
|
|
|
let _ = current_queue.push(next_cmd);
|
|
|
|
|
} else {
|
|
|
|
|
check_available_path = String::new();
|
|
|
|
|
}
|
|
|
|
|
} else if current_queue.is_empty() {
|
|
|
|
|
check_available_path = String::new();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_ => {}
|
|
|
|
|
}
|
|
|
|
|
} else if current_queue.is_empty()
|
|
|
|
|
&& let Some(next_cmd) = pending_command.pop_front()
|
|
|
|
|
{
|
|
|
|
|
// case empty queue, fetch next
|
|
|
|
|
check_available_path = format!("{}/status", next_cmd.srv_name);
|
|
|
|
|
// ignore result
|
|
|
|
|
let _ = current_queue.push(next_cmd);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
result
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn invoke_checkout_request(
|
|
|
|
|
config: DevConfig,
|
|
|
|
|
path: String,
|
|
|
|
|
) -> Result<String, Box<dyn std::error::Error>> {
|
|
|
|
|
let client = reqwest::Client::new();
|
|
|
|
|
|
|
|
|
|
let req_path = config.get_file_from_recipe_repo(path);
|
|
|
|
|
// println!("dbg: {req_path}");
|
|
|
|
|
let res = client.get(req_path).send().await?;
|
|
|
|
|
|
|
|
|
|
match res.text().await {
|
|
|
|
|
Ok(raw) => Ok(raw),
|
|
|
|
|
Err(e) => Err(format!("{e}").into()),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-05 17:03:33 +07:00
|
|
|
/// Invoke git pull, may takes sometime
|
|
|
|
|
pub async fn invoke_pull_sync_request(
|
|
|
|
|
config: DevConfig,
|
|
|
|
|
) -> Result<String, Box<dyn std::error::Error>> {
|
|
|
|
|
let client = reqwest::Client::new();
|
|
|
|
|
|
|
|
|
|
let req_path = config.get_pull_recipe_repo();
|
|
|
|
|
// println!("dbg: {req_path}");
|
|
|
|
|
let res = client.get(req_path).send().await?;
|
|
|
|
|
|
|
|
|
|
if res.status() != StatusCode::OK {
|
|
|
|
|
// pull fail
|
|
|
|
|
|
|
|
|
|
error!(
|
|
|
|
|
"invoke pull fail: [{}] {:?}",
|
|
|
|
|
res.status(),
|
|
|
|
|
res.text().await
|
|
|
|
|
);
|
|
|
|
|
return Err("pull fail".into());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
match res.text().await {
|
|
|
|
|
Ok(raw) => Ok(raw),
|
|
|
|
|
Err(e) => Err(format!("{e}").into()),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Invoke sending from server to server for committing
|
|
|
|
|
pub async fn invoke_commit_request(
|
|
|
|
|
config: DevConfig,
|
|
|
|
|
payload: CommitPayload,
|
|
|
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
|
|
|
let client = reqwest::Client::new();
|
|
|
|
|
let commit_path = config.get_post_file_to_recipe_repo();
|
|
|
|
|
let form = multipart::Form::new()
|
|
|
|
|
.text("message", payload.message)
|
|
|
|
|
.text("signature_username", payload.signature_username)
|
|
|
|
|
.text("signature_email", payload.signature_email)
|
|
|
|
|
.text("path", payload.path)
|
|
|
|
|
.part(
|
|
|
|
|
"file",
|
|
|
|
|
multipart::Part::bytes(payload.file_bytes)
|
|
|
|
|
.mime_str("application/octet-stream")
|
|
|
|
|
.unwrap(),
|
|
|
|
|
);
|
|
|
|
|
let response = client.post(commit_path).multipart(form).send().await?;
|
|
|
|
|
|
|
|
|
|
info!("commit status: {}", response.status());
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-08 14:01:31 +07:00
|
|
|
/// Invoke sending from server to server for committing case multiple files
|
|
|
|
|
pub async fn invoke_commit_multiple_files_request(
|
|
|
|
|
config: DevConfig,
|
|
|
|
|
payloads: Vec<CommitPayload>,
|
|
|
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
|
|
|
let client = reqwest::Client::new();
|
|
|
|
|
let commit_path = config.get_post_file_to_recipe_repo();
|
|
|
|
|
let mut form = multipart::Form::new()
|
|
|
|
|
.text("message", payloads.first().unwrap().message.clone())
|
|
|
|
|
.text(
|
|
|
|
|
"signature_username",
|
|
|
|
|
payloads.first().unwrap().signature_username.clone(),
|
|
|
|
|
)
|
|
|
|
|
.text(
|
|
|
|
|
"signature_email",
|
|
|
|
|
payloads.first().unwrap().signature_email.clone(),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
for (index, payload) in payloads.iter().enumerate() {
|
|
|
|
|
form = form
|
|
|
|
|
.text(format!("path{index}"), payload.path.clone())
|
|
|
|
|
.part(
|
|
|
|
|
format!("file{index}"),
|
|
|
|
|
multipart::Part::bytes(payload.file_bytes.clone())
|
|
|
|
|
.mime_str("application/octet-stream")
|
|
|
|
|
.unwrap(),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let response = client.post(commit_path).multipart(form).send().await?;
|
|
|
|
|
|
|
|
|
|
info!("commit status: {}", response.status());
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-05 17:03:33 +07:00
|
|
|
pub async fn invoke_push_request(config: DevConfig) -> Result<String, Box<dyn std::error::Error>> {
|
|
|
|
|
let client = reqwest::Client::new();
|
|
|
|
|
|
|
|
|
|
let req_path = config.get_push_recipe_repo();
|
|
|
|
|
// println!("dbg: {req_path}");
|
|
|
|
|
let res = client.get(req_path).send().await?;
|
|
|
|
|
|
|
|
|
|
if res.status() != StatusCode::OK {
|
|
|
|
|
// pull fail
|
|
|
|
|
|
|
|
|
|
error!(
|
|
|
|
|
"invoke push fail: [{}] {:?}",
|
|
|
|
|
res.status(),
|
|
|
|
|
res.text().await
|
|
|
|
|
);
|
|
|
|
|
return Err("push fail".into());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
match res.text().await {
|
|
|
|
|
Ok(raw) => Ok(raw),
|
|
|
|
|
Err(e) => Err(format!("{e}").into()),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-28 16:43:22 +07:00
|
|
|
pub async fn create_recipe_repo_router() -> Router<Arc<AppState>> {
|
|
|
|
|
Router::new().route("/ws", get(crate::websocket::handler::websocket_handler))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn initialize() -> Result<(), Box<dyn std::error::Error>> {
|
|
|
|
|
let server_port = env::var("PORT").unwrap_or("36579".to_string());
|
|
|
|
|
|
|
|
|
|
let api_key = env::var("DEV_API_KEY").expect("no api key");
|
|
|
|
|
let api_domain = env::var("DEV_API_DOMAIN").expect("no domain");
|
|
|
|
|
let api_recipe_service = env::var("DEV_API_RECIPE_SERVICE").expect("no service");
|
|
|
|
|
|
|
|
|
|
let api_redis = env::var("DEV_API_REDIS").unwrap_or("0.0.0.0".to_string());
|
|
|
|
|
let api_redis_port = env::var("DEV_API_REDIS_PORT").unwrap_or("6379".to_string());
|
|
|
|
|
|
|
|
|
|
let api_resolver = env::var("RESOLVER_SERVICE_URL").expect("no available resolver");
|
|
|
|
|
|
|
|
|
|
let dev_cfg = crate::app::DevConfig::new(
|
|
|
|
|
api_key,
|
|
|
|
|
api_domain,
|
|
|
|
|
api_recipe_service,
|
|
|
|
|
format!("redis://{api_redis}:{api_redis_port}"),
|
|
|
|
|
api_resolver,
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// test_send(dev_cfg).await?;
|
|
|
|
|
//
|
|
|
|
|
let redis_cli = redis::Client::open(dev_cfg.api_redis_url.clone())?;
|
|
|
|
|
|
|
|
|
|
let (sys_tx, sys_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
|
|
|
|
|
|
|
|
|
|
let app_state = AppState::new(dev_cfg, redis_cli, sys_tx, sys_rx).await;
|
|
|
|
|
|
|
|
|
|
let rp_router = create_recipe_repo_router().await;
|
|
|
|
|
// let doc_router = create_tx_patcher_route().await;
|
|
|
|
|
|
|
|
|
|
let app = Router::new()
|
|
|
|
|
// .route("/sessionLogin", post(session_login))
|
|
|
|
|
.route(
|
|
|
|
|
"/syscb",
|
|
|
|
|
post(crate::websocket::handler::post_from_other_system),
|
|
|
|
|
)
|
|
|
|
|
// .route("/regas", post(request_api_session_key))
|
|
|
|
|
.nest("/recipe", rp_router)
|
|
|
|
|
// .nest("/docs", doc_router)
|
|
|
|
|
.with_state(app_state);
|
|
|
|
|
|
|
|
|
|
// feature: no delay, full throttle
|
|
|
|
|
let nodelay_listener = || async {
|
|
|
|
|
tokio::net::TcpListener::bind(format!("0.0.0.0:{server_port}"))
|
|
|
|
|
.await
|
|
|
|
|
.unwrap()
|
|
|
|
|
.tap_io(|tcp_stream| {
|
|
|
|
|
if let Err(err) = tcp_stream.set_nodelay(true) {
|
|
|
|
|
error!("failed to set TCP_NODELAY on incoming connection: {err:#?}");
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
axum::serve(nodelay_listener().await, app).await?;
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|