use crate::websocket::{core::*, model::*}; use axum::{ Router, routing::{get, post}, serve::ListenerExt, }; use log::{error, info, warn}; use redis::TypedCommands; use reqwest::{StatusCode, multipart}; use std::{ collections::{HashMap, VecDeque}, env, fs::{self, File}, io::BufReader, sync::Arc, time::Duration, }; use tokio::{ fs::read_dir, sync::{Mutex, mpsc::Sender}, }; #[derive(Clone)] pub struct Hub { pub clients: HashMap>, } #[derive(Clone)] pub struct DevConfig { pub api_key: String, pub api_domain: String, pub api_recipe_service: String, pub api_redis_url: String, pub api_resolver: String, } impl DevConfig { pub fn new( key: String, domain: String, rp_service: String, api_redis_url: String, api_resolver: String, ) -> DevConfig { DevConfig { api_key: key, api_domain: domain, api_recipe_service: rp_service, api_redis_url, api_resolver, } } pub fn get_recipe_url(&self) -> String { format!("{}{}", self.api_domain, self.api_recipe_service) } pub fn get_file_from_recipe_repo(&self, path: String) -> String { format!("{}/checkout?path={}", self.get_recipe_url(), path) } pub fn get_post_file_to_recipe_repo(&self) -> String { format!("{}/commit", self.get_recipe_url()) } pub fn get_pull_recipe_repo(&self) -> String { format!("{}/pull", self.get_recipe_url()) } pub fn get_push_recipe_repo(&self) -> String { format!("{}/push", self.get_recipe_url()) } pub fn get_api_header(&self) -> (String, String) { ("X-API-Key".to_string(), self.api_key.clone()) } pub fn get_yuki_resolver(&self) -> String { format!("{}/resolve", self.api_resolver) } } pub struct AppState { pub dev_config: DevConfig, pub redis_cli: redis::Client, pub system_tx: tokio::sync::broadcast::Sender, // saved client uid:client uuid pub connectors_mapping: Arc>, } impl AppState { pub fn get_cfg(&self) -> DevConfig { self.dev_config.clone() } pub async fn new( dev_config: DevConfig, redis_cli: redis::Client, system_tx: tokio::sync::broadcast::Sender, mut system_rx: tokio::sync::broadcast::Receiver, ) -> Arc { let redis_cli_clone = redis_cli.clone(); let tx_new = system_tx.clone(); let result = Arc::new(AppState { dev_config: dev_config.clone(), redis_cli, system_tx, connectors_mapping: Arc::new(Mutex::new(Hub { clients: HashMap::new(), })), }); // backup job let dev_config_backup = dev_config.clone(); tokio::spawn(async move { let m_cfg = dev_config_backup.clone(); loop { // auto sync if invoke_pull_sync_request(m_cfg.clone()).await.is_err() { warn!("pulling repo unhealthy, retry again in 5 minutes"); continue; } match read_dir(".").await { Ok(mut d) => { while let Ok(Some(entry)) = d.next_entry().await { let ent_path = entry.path(); if let Some(filename) = ent_path.file_name() && let Some(filename_str) = filename.to_str() && filename_str.starts_with("gtx") && filename_str.ends_with(".json") { // read file // let f = match File::open(ent_path.clone()) { Ok(f) => f, Err(_) => continue, }; let buf = BufReader::new(f); let commit_from_backup: CommitPayload = match serde_json::from_reader(buf) { Ok(cm) => cm, Err(_) => continue, }; if invoke_pull_sync_request(m_cfg.clone()).await.is_err() { warn!("pulling repo unhealthy, retry again in 5 minutes"); continue; } let _ = invoke_commit_request(m_cfg.clone(), commit_from_backup).await; if invoke_push_request(m_cfg.clone()).await.is_ok() { // push success info!("push backup success"); if fs::remove_file(ent_path.clone()).is_ok() { info!("clean backup"); } } } else { continue; } } } Err(_) => {} } info!("[backup] idle"); tokio::time::sleep(Duration::from_mins(5)).await; } }); tokio::spawn(async move { let mut lredis = redis_cli_clone.clone(); let current_queue: crossbeam_queue::ArrayQueue = crossbeam_queue::ArrayQueue::new(1); let mut pending_command: VecDeque = VecDeque::new(); let mut check_available_path = String::new(); loop { if let Ok(rmsg) = system_rx.recv().await { let sys_msg = crate::websocket::helper::convert_sys_msg_command(&rmsg); // add queue process let command_req: CommandRequestPayload = match serde_json::from_value(rmsg) { Ok(cmd) => cmd, Err(e) => { if sys_msg.is_none() { // maybe error error!("error deserialize: {e:?} ---> Skip"); } continue; } // reject }; info!("get cmd: {}", command_req.srv_name); if let Err(fail_payload) = current_queue.push(command_req.clone()) { if pending_command.len() < 10 { pending_command.push_back(fail_payload) } else { let user_name = fail_payload .user_info .get("displayName") .unwrap_or_default(); let _ = tx_new.send(serde_json::json!({ "type": "notify", "payload": { "from": "system_tx", "msg": "request queue full, try again later", "level": "ERR", "to": user_name, } })); } } else { // set check to latest push to queue ok check_available_path = format!("{}/status", command_req.srv_name); info!("checking {check_available_path}"); } } // send process if let Ok(Some(status)) = lredis.get(&check_available_path) { info!("status: {status}"); match status.as_str() { "ok" | "OK" | "Ok" => { info!("queue: {}", current_queue.len()); // if current_queue.is_full() && let Some(cmd) = current_queue.pop() { // get one let channel = format!("{}/job", cmd.srv_name); info!("channel job: {channel}"); info!("job: {cmd:?}"); let prep = serde_json::json!({ "type": "command", "payload": cmd }); let result_pub = lredis.publish( channel, serde_json::to_string(&prep).unwrap_or("{}".to_string()), ); info!("published: {result_pub:?}"); // queue next if let Some(next_cmd) = pending_command.pop_front() { check_available_path = format!("{}/status", next_cmd.srv_name); // ignore result let _ = current_queue.push(next_cmd); } else { check_available_path = String::new(); } } else if current_queue.is_empty() { check_available_path = String::new(); } } _ => {} } } else if current_queue.is_empty() && let Some(next_cmd) = pending_command.pop_front() { // case empty queue, fetch next check_available_path = format!("{}/status", next_cmd.srv_name); // ignore result let _ = current_queue.push(next_cmd); } } }); result } } pub async fn invoke_checkout_request( config: DevConfig, path: String, ) -> Result> { let client = reqwest::Client::new(); let req_path = config.get_file_from_recipe_repo(path); // println!("dbg: {req_path}"); let res = client.get(req_path).send().await?; match res.text().await { Ok(raw) => Ok(raw), Err(e) => Err(format!("{e}").into()), } } /// Invoke git pull, may takes sometime pub async fn invoke_pull_sync_request( config: DevConfig, ) -> Result> { let client = reqwest::Client::new(); let req_path = config.get_pull_recipe_repo(); // println!("dbg: {req_path}"); let res = client.get(req_path).send().await?; if res.status() != StatusCode::OK { // pull fail error!( "invoke pull fail: [{}] {:?}", res.status(), res.text().await ); return Err("pull fail".into()); } match res.text().await { Ok(raw) => Ok(raw), Err(e) => Err(format!("{e}").into()), } } /// Invoke sending from server to server for committing pub async fn invoke_commit_request( config: DevConfig, payload: CommitPayload, ) -> Result<(), Box> { let client = reqwest::Client::new(); let commit_path = config.get_post_file_to_recipe_repo(); let form = multipart::Form::new() .text("message", payload.message) .text("signature_username", payload.signature_username) .text("signature_email", payload.signature_email) .text("path", payload.path) .part( "file", multipart::Part::bytes(payload.file_bytes) .mime_str("application/octet-stream") .unwrap(), ); let response = client.post(commit_path).multipart(form).send().await?; info!("commit status: {}", response.status()); Ok(()) } pub async fn invoke_push_request(config: DevConfig) -> Result> { let client = reqwest::Client::new(); let req_path = config.get_push_recipe_repo(); // println!("dbg: {req_path}"); let res = client.get(req_path).send().await?; if res.status() != StatusCode::OK { // pull fail error!( "invoke push fail: [{}] {:?}", res.status(), res.text().await ); return Err("push fail".into()); } match res.text().await { Ok(raw) => Ok(raw), Err(e) => Err(format!("{e}").into()), } } pub async fn create_recipe_repo_router() -> Router> { Router::new().route("/ws", get(crate::websocket::handler::websocket_handler)) } pub async fn initialize() -> Result<(), Box> { let server_port = env::var("PORT").unwrap_or("36579".to_string()); let api_key = env::var("DEV_API_KEY").expect("no api key"); let api_domain = env::var("DEV_API_DOMAIN").expect("no domain"); let api_recipe_service = env::var("DEV_API_RECIPE_SERVICE").expect("no service"); let api_redis = env::var("DEV_API_REDIS").unwrap_or("0.0.0.0".to_string()); let api_redis_port = env::var("DEV_API_REDIS_PORT").unwrap_or("6379".to_string()); let api_resolver = env::var("RESOLVER_SERVICE_URL").expect("no available resolver"); let dev_cfg = crate::app::DevConfig::new( api_key, api_domain, api_recipe_service, format!("redis://{api_redis}:{api_redis_port}"), api_resolver, ); // test_send(dev_cfg).await?; // let redis_cli = redis::Client::open(dev_cfg.api_redis_url.clone())?; let (sys_tx, sys_rx) = tokio::sync::broadcast::channel::(16); let app_state = AppState::new(dev_cfg, redis_cli, sys_tx, sys_rx).await; let rp_router = create_recipe_repo_router().await; // let doc_router = create_tx_patcher_route().await; let app = Router::new() // .route("/sessionLogin", post(session_login)) .route( "/syscb", post(crate::websocket::handler::post_from_other_system), ) // .route("/regas", post(request_api_session_key)) .nest("/recipe", rp_router) // .nest("/docs", doc_router) .with_state(app_state); // feature: no delay, full throttle let nodelay_listener = || async { tokio::net::TcpListener::bind(format!("0.0.0.0:{server_port}")) .await .unwrap() .tap_io(|tcp_stream| { if let Err(err) = tcp_stream.set_nodelay(true) { error!("failed to set TCP_NODELAY on incoming connection: {err:#?}"); } }) }; axum::serve(nodelay_listener().await, app).await?; Ok(()) }