277 lines
9.6 KiB
Rust
277 lines
9.6 KiB
Rust
|
|
use crate::websocket::{core::*, model::*};
|
||
|
|
use axum::{
|
||
|
|
Router,
|
||
|
|
routing::{get, post},
|
||
|
|
serve::ListenerExt,
|
||
|
|
};
|
||
|
|
use log::{error, info};
|
||
|
|
use redis::TypedCommands;
|
||
|
|
use std::{
|
||
|
|
collections::{HashMap, VecDeque},
|
||
|
|
env,
|
||
|
|
sync::Arc,
|
||
|
|
};
|
||
|
|
use tokio::sync::{Mutex, mpsc::Sender};
|
||
|
|
|
||
|
|
#[derive(Clone)]
|
||
|
|
pub struct Hub {
|
||
|
|
pub clients: HashMap<String, Sender<TxControlMessage>>,
|
||
|
|
}
|
||
|
|
|
||
|
|
#[derive(Clone)]
|
||
|
|
pub struct DevConfig {
|
||
|
|
pub api_key: String,
|
||
|
|
pub api_domain: String,
|
||
|
|
pub api_recipe_service: String,
|
||
|
|
pub api_redis_url: String,
|
||
|
|
pub api_resolver: String,
|
||
|
|
}
|
||
|
|
|
||
|
|
impl DevConfig {
|
||
|
|
pub fn new(
|
||
|
|
key: String,
|
||
|
|
domain: String,
|
||
|
|
rp_service: String,
|
||
|
|
api_redis_url: String,
|
||
|
|
api_resolver: String,
|
||
|
|
) -> DevConfig {
|
||
|
|
DevConfig {
|
||
|
|
api_key: key,
|
||
|
|
api_domain: domain,
|
||
|
|
api_recipe_service: rp_service,
|
||
|
|
api_redis_url,
|
||
|
|
api_resolver,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
pub fn get_recipe_url(&self) -> String {
|
||
|
|
format!("{}{}", self.api_domain, self.api_recipe_service)
|
||
|
|
}
|
||
|
|
|
||
|
|
pub fn get_file_from_recipe_repo(&self, path: String) -> String {
|
||
|
|
format!("{}/checkout?path={}", self.get_recipe_url(), path)
|
||
|
|
}
|
||
|
|
|
||
|
|
pub fn get_api_header(&self) -> (String, String) {
|
||
|
|
("X-API-Key".to_string(), self.api_key.clone())
|
||
|
|
}
|
||
|
|
|
||
|
|
pub fn get_yuki_resolver(&self) -> String {
|
||
|
|
format!("{}/resolve", self.api_resolver)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
pub struct AppState {
|
||
|
|
pub dev_config: DevConfig,
|
||
|
|
pub redis_cli: redis::Client,
|
||
|
|
pub system_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
|
||
|
|
// saved client uid:client uuid
|
||
|
|
pub connectors_mapping: Arc<Mutex<Hub>>,
|
||
|
|
}
|
||
|
|
|
||
|
|
impl AppState {
|
||
|
|
pub fn get_cfg(&self) -> DevConfig {
|
||
|
|
self.dev_config.clone()
|
||
|
|
}
|
||
|
|
|
||
|
|
pub async fn new(
|
||
|
|
dev_config: DevConfig,
|
||
|
|
redis_cli: redis::Client,
|
||
|
|
system_tx: tokio::sync::broadcast::Sender<serde_json::Value>,
|
||
|
|
mut system_rx: tokio::sync::broadcast::Receiver<serde_json::Value>,
|
||
|
|
) -> Arc<AppState> {
|
||
|
|
let redis_cli_clone = redis_cli.clone();
|
||
|
|
let tx_new = system_tx.clone();
|
||
|
|
let result = Arc::new(AppState {
|
||
|
|
dev_config,
|
||
|
|
redis_cli,
|
||
|
|
system_tx,
|
||
|
|
connectors_mapping: Arc::new(Mutex::new(Hub {
|
||
|
|
clients: HashMap::new(),
|
||
|
|
})),
|
||
|
|
});
|
||
|
|
|
||
|
|
tokio::spawn(async move {
|
||
|
|
let mut lredis = redis_cli_clone.clone();
|
||
|
|
let current_queue: crossbeam_queue::ArrayQueue<CommandRequestPayload> =
|
||
|
|
crossbeam_queue::ArrayQueue::new(1);
|
||
|
|
let mut pending_command: VecDeque<CommandRequestPayload> = VecDeque::new();
|
||
|
|
let mut check_available_path = String::new();
|
||
|
|
|
||
|
|
loop {
|
||
|
|
if let Ok(rmsg) = system_rx.recv().await {
|
||
|
|
let sys_msg = crate::websocket::helper::convert_sys_msg_command(&rmsg);
|
||
|
|
|
||
|
|
// add queue process
|
||
|
|
let command_req: CommandRequestPayload = match serde_json::from_value(rmsg) {
|
||
|
|
Ok(cmd) => cmd,
|
||
|
|
Err(e) => {
|
||
|
|
if sys_msg.is_none() {
|
||
|
|
// maybe error
|
||
|
|
error!("error deserialize: {e:?} ---> Skip");
|
||
|
|
}
|
||
|
|
continue;
|
||
|
|
} // reject
|
||
|
|
};
|
||
|
|
info!("get cmd: {}", command_req.srv_name);
|
||
|
|
|
||
|
|
if let Err(fail_payload) = current_queue.push(command_req.clone()) {
|
||
|
|
if pending_command.len() < 10 {
|
||
|
|
pending_command.push_back(fail_payload)
|
||
|
|
} else {
|
||
|
|
let user_name = fail_payload
|
||
|
|
.user_info
|
||
|
|
.get("displayName")
|
||
|
|
.unwrap_or_default();
|
||
|
|
|
||
|
|
let _ = tx_new.send(serde_json::json!({
|
||
|
|
"type": "notify",
|
||
|
|
"payload": {
|
||
|
|
"from": "system_tx",
|
||
|
|
"msg": "request queue full, try again later",
|
||
|
|
"level": "ERR",
|
||
|
|
"to": user_name,
|
||
|
|
}
|
||
|
|
}));
|
||
|
|
}
|
||
|
|
} else {
|
||
|
|
// set check to latest push to queue ok
|
||
|
|
check_available_path = format!("{}/status", command_req.srv_name);
|
||
|
|
info!("checking {check_available_path}");
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// send process
|
||
|
|
if let Ok(Some(status)) = lredis.get(&check_available_path) {
|
||
|
|
info!("status: {status}");
|
||
|
|
match status.as_str() {
|
||
|
|
"ok" | "OK" | "Ok" => {
|
||
|
|
info!("queue: {}", current_queue.len());
|
||
|
|
//
|
||
|
|
if current_queue.is_full()
|
||
|
|
&& let Some(cmd) = current_queue.pop()
|
||
|
|
{
|
||
|
|
// get one
|
||
|
|
let channel = format!("{}/job", cmd.srv_name);
|
||
|
|
info!("channel job: {channel}");
|
||
|
|
info!("job: {cmd:?}");
|
||
|
|
|
||
|
|
let prep = serde_json::json!({
|
||
|
|
"type": "command",
|
||
|
|
"payload": cmd
|
||
|
|
});
|
||
|
|
|
||
|
|
let result_pub = lredis.publish(
|
||
|
|
channel,
|
||
|
|
serde_json::to_string(&prep).unwrap_or("{}".to_string()),
|
||
|
|
);
|
||
|
|
info!("published: {result_pub:?}");
|
||
|
|
|
||
|
|
// queue next
|
||
|
|
if let Some(next_cmd) = pending_command.pop_front() {
|
||
|
|
check_available_path = format!("{}/status", next_cmd.srv_name);
|
||
|
|
// ignore result
|
||
|
|
let _ = current_queue.push(next_cmd);
|
||
|
|
} else {
|
||
|
|
check_available_path = String::new();
|
||
|
|
}
|
||
|
|
} else if current_queue.is_empty() {
|
||
|
|
check_available_path = String::new();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
_ => {}
|
||
|
|
}
|
||
|
|
} else if current_queue.is_empty()
|
||
|
|
&& let Some(next_cmd) = pending_command.pop_front()
|
||
|
|
{
|
||
|
|
// case empty queue, fetch next
|
||
|
|
check_available_path = format!("{}/status", next_cmd.srv_name);
|
||
|
|
// ignore result
|
||
|
|
let _ = current_queue.push(next_cmd);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
});
|
||
|
|
|
||
|
|
result
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
pub async fn invoke_checkout_request(
|
||
|
|
config: DevConfig,
|
||
|
|
path: String,
|
||
|
|
) -> Result<String, Box<dyn std::error::Error>> {
|
||
|
|
let client = reqwest::Client::new();
|
||
|
|
|
||
|
|
let req_path = config.get_file_from_recipe_repo(path);
|
||
|
|
// println!("dbg: {req_path}");
|
||
|
|
let res = client.get(req_path).send().await?;
|
||
|
|
|
||
|
|
match res.text().await {
|
||
|
|
Ok(raw) => Ok(raw),
|
||
|
|
Err(e) => Err(format!("{e}").into()),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
pub async fn create_recipe_repo_router() -> Router<Arc<AppState>> {
|
||
|
|
Router::new().route("/ws", get(crate::websocket::handler::websocket_handler))
|
||
|
|
}
|
||
|
|
|
||
|
|
pub async fn initialize() -> Result<(), Box<dyn std::error::Error>> {
|
||
|
|
let server_port = env::var("PORT").unwrap_or("36579".to_string());
|
||
|
|
|
||
|
|
let api_key = env::var("DEV_API_KEY").expect("no api key");
|
||
|
|
let api_domain = env::var("DEV_API_DOMAIN").expect("no domain");
|
||
|
|
let api_recipe_service = env::var("DEV_API_RECIPE_SERVICE").expect("no service");
|
||
|
|
|
||
|
|
let api_redis = env::var("DEV_API_REDIS").unwrap_or("0.0.0.0".to_string());
|
||
|
|
let api_redis_port = env::var("DEV_API_REDIS_PORT").unwrap_or("6379".to_string());
|
||
|
|
|
||
|
|
let api_resolver = env::var("RESOLVER_SERVICE_URL").expect("no available resolver");
|
||
|
|
|
||
|
|
let dev_cfg = crate::app::DevConfig::new(
|
||
|
|
api_key,
|
||
|
|
api_domain,
|
||
|
|
api_recipe_service,
|
||
|
|
format!("redis://{api_redis}:{api_redis_port}"),
|
||
|
|
api_resolver,
|
||
|
|
);
|
||
|
|
|
||
|
|
// test_send(dev_cfg).await?;
|
||
|
|
//
|
||
|
|
let redis_cli = redis::Client::open(dev_cfg.api_redis_url.clone())?;
|
||
|
|
|
||
|
|
let (sys_tx, sys_rx) = tokio::sync::broadcast::channel::<serde_json::Value>(16);
|
||
|
|
|
||
|
|
let app_state = AppState::new(dev_cfg, redis_cli, sys_tx, sys_rx).await;
|
||
|
|
|
||
|
|
let rp_router = create_recipe_repo_router().await;
|
||
|
|
// let doc_router = create_tx_patcher_route().await;
|
||
|
|
|
||
|
|
let app = Router::new()
|
||
|
|
// .route("/sessionLogin", post(session_login))
|
||
|
|
.route(
|
||
|
|
"/syscb",
|
||
|
|
post(crate::websocket::handler::post_from_other_system),
|
||
|
|
)
|
||
|
|
// .route("/regas", post(request_api_session_key))
|
||
|
|
.nest("/recipe", rp_router)
|
||
|
|
// .nest("/docs", doc_router)
|
||
|
|
.with_state(app_state);
|
||
|
|
|
||
|
|
// feature: no delay, full throttle
|
||
|
|
let nodelay_listener = || async {
|
||
|
|
tokio::net::TcpListener::bind(format!("0.0.0.0:{server_port}"))
|
||
|
|
.await
|
||
|
|
.unwrap()
|
||
|
|
.tap_io(|tcp_stream| {
|
||
|
|
if let Err(err) = tcp_stream.set_nodelay(true) {
|
||
|
|
error!("failed to set TCP_NODELAY on incoming connection: {err:#?}");
|
||
|
|
}
|
||
|
|
})
|
||
|
|
};
|
||
|
|
|
||
|
|
axum::serve(nodelay_listener().await, app).await?;
|
||
|
|
|
||
|
|
Ok(())
|
||
|
|
}
|