feat: heartbeat check with log, sheet api config

Signed-off-by: Pakin <pakin.t@forth.co.th>
This commit is contained in:
Pakin 2026-05-09 11:02:02 +07:00
parent fa62d9d83f
commit d048dc2437
8 changed files with 751 additions and 21 deletions

View file

@ -1,4 +1,4 @@
use crate::websocket::{core::*, model::*};
use crate::websocket::{core::*, helper::read_sheet_config, model::*};
use axum::{
Router,
routing::{get, post},
@ -32,6 +32,7 @@ pub struct DevConfig {
pub api_recipe_service: String,
pub api_redis_url: String,
pub api_resolver: String,
pub api_sheet_endpoints: Arc<Mutex<Vec<String>>>,
}
impl DevConfig {
@ -41,6 +42,7 @@ impl DevConfig {
rp_service: String,
api_redis_url: String,
api_resolver: String,
api_sheet_endpoints: Arc<Mutex<Vec<String>>>,
) -> DevConfig {
DevConfig {
api_key: key,
@ -48,6 +50,7 @@ impl DevConfig {
api_recipe_service: rp_service,
api_redis_url,
api_resolver,
api_sheet_endpoints,
}
}
@ -78,6 +81,17 @@ impl DevConfig {
pub fn get_yuki_resolver(&self) -> String {
format!("{}/resolve", self.api_resolver)
}
pub fn check_sheet_endpoints(&self, service_endpoint: &str) -> bool {
self.api_sheet_endpoints
.try_lock()
.unwrap()
.contains(&service_endpoint.to_string())
}
pub fn load_sheet_endpoints_runtime(&self, new_config: Vec<String>) {
*self.api_sheet_endpoints.try_lock().unwrap() = new_config;
}
}
pub struct AppState {
@ -425,12 +439,17 @@ pub async fn initialize() -> Result<(), Box<dyn std::error::Error>> {
let api_resolver = env::var("RESOLVER_SERVICE_URL").expect("no available resolver");
// read up sheet config
//
let sheet_endpoint_config = read_sheet_config()?;
let dev_cfg = crate::app::DevConfig::new(
api_key,
api_domain,
api_recipe_service,
format!("redis://{api_redis}:{api_redis_port}"),
api_resolver,
Arc::new(Mutex::new(sheet_endpoint_config)),
);
// test_send(dev_cfg).await?;
@ -450,6 +469,7 @@ pub async fn initialize() -> Result<(), Box<dyn std::error::Error>> {
"/syscb",
post(crate::websocket::handler::post_from_other_system),
)
.route("/load-config", post(crate::websocket::handler::post_config))
// .route("/regas", post(request_api_session_key))
.nest("/recipe", rp_router)
// .nest("/docs", doc_router)

View file

@ -6,12 +6,15 @@ use axum::{
use futures::StreamExt;
use log::{info, warn};
use redis::TypedCommands;
use std::sync::Arc;
use std::{fs::File, io::BufWriter, sync::Arc};
use tokio::{sync::Mutex, sync::mpsc, time::Instant};
use uuid::Uuid;
use super::{core::*, model::*};
use crate::app::{AppState, Hub};
use crate::{
app::{AppState, Hub},
websocket::helper::read_sheet_config,
};
pub async fn post_from_other_system(
State(state): State<Arc<AppState>>,
@ -33,6 +36,44 @@ pub async fn post_from_other_system(
}
}
pub async fn post_config(
State(state): State<Arc<AppState>>,
Json(payload): Json<serde_json::Value>,
) -> impl IntoResponse {
// get config path
let path = payload.get("filename").unwrap().as_str().unwrap();
let file = match File::create(path) {
Ok(f) => f,
Err(_) => {
return (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
"file cannot create",
)
.into_response();
}
};
let buf = BufWriter::new(file);
match serde_json::to_writer(buf, &payload) {
Ok(_) => {}
Err(e) => {
return (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
format!("file cannot write, {}", e),
)
.into_response();
}
}
if path.eq("sheet-api.json")
&& let Ok(new_config) = read_sheet_config()
{
state.dev_config.load_sheet_endpoints_runtime(new_config);
}
return (axum::http::StatusCode::OK, "load config success").into_response();
}
pub async fn request_api_session_key(
State(state): State<Arc<AppState>>,
Json(msg): Json<ApiSessionRequest>,

View file

@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, fs::File, io::BufReader};
use super::model::*;
use axum::extract::ws::{CloseFrame, Message, WebSocket};
@ -84,3 +84,23 @@ pub fn get_extra_parameters(s: String) -> HashMap<String, String> {
result
}
pub fn read_sheet_config() -> Result<Vec<String>, Box<dyn std::error::Error>> {
let mut res = Vec::new();
let config_file = File::open("./sheet-api.json")?;
let mut buf = BufReader::new(config_file);
let val: serde_json::Value = serde_json::from_reader(&mut buf)?;
if let Some(eobj) = val.get("endpoints")
&& let Some(endpoint_array) = eobj.as_array().clone()
{
res = endpoint_array
.iter()
.map(|x| x.as_str().unwrap_or_default().to_string())
.collect::<Vec<String>>();
}
Ok(res)
}

View file

@ -1,6 +1,6 @@
use super::{core::*, helper::*, model::*};
use crate::{app::*, websocket::tasks};
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use axum::extract::ws::{Message, WebSocket};
use futures::{
@ -88,10 +88,30 @@ pub async fn read(
.await?;
}
"heartbeat" => {
*last_seen.lock().await = Instant::now();
let new_updated_time = Instant::now();
let uidd = uid.try_lock().unwrap().clone();
*last_seen.lock().await = new_updated_time;
info!("{}: active", uidd.to_string());
// send back response to keep alive
// user can now know if server is active or not
let _ = tx
.send_timeout(
TxControlMessage::Payload(serde_json::json!({
"type": "heartbeat",
"payload": {
"active": true,
"refresh_time": format!("{:?}", new_updated_time),
"to": uidd
}
})),
Duration::from_secs(3),
)
.await;
}
"sheet" if req.payload.is_some() => {
if tasks::sheet::handle_sheet_request(redis.clone(), req)
if tasks::sheet::handle_sheet_request(config.clone(), redis.clone(), req)
.await
.is_err()
{

View file

@ -72,6 +72,7 @@ pub async fn throttle_send_recipe(
version: String,
uid: Arc<Mutex<String>>,
) {
info!("Starting throttle");
let r01s: Vec<Recipe01> = recipe
.Recipe01
.par_iter()
@ -351,6 +352,7 @@ pub async fn handle_recipe_request(
"".to_string()
}
};
info!("content ready: {}", content.len());
let recipe = serde_json::from_str::<Recipe>(&content);
if let Ok(rp) = recipe {

View file

@ -1,9 +1,13 @@
use crate::websocket::{core::*, model::*};
use crate::{
app::DevConfig,
websocket::{core::*, model::*},
};
use log::{error, info};
use redis::TypedCommands;
/// Handle request of sheet type from websocket (read)
pub async fn handle_sheet_request(
config: DevConfig,
redis: redis::Client,
req: WebsocketMessageRequest,
) -> WebsocketMessageResult {
@ -33,13 +37,10 @@ pub async fn handle_sheet_request(
.unwrap_or_default();
// TODO: will be changed to config from yaml file
let ch_target = if let Some(pm) = parameters.as_str() {
match pm {
"get_all_catalogs" => "catalogs",
"get_catalog" | "enter" => "enter",
"heartbeat" => "heartbeat",
_ => pm,
}
let ch_target = if let Some(pm) = parameters.as_str()
&& config.check_sheet_endpoints(pm)
{
pm
} else {
"junk"
};