From b1d9cde9c86b054eaf10cdb8137654ed19273447 Mon Sep 17 00:00:00 2001 From: Pakin Date: Fri, 6 Mar 2026 15:08:30 +0700 Subject: [PATCH] fix: skip command instead if fail struct - add logging - skip current command if struct mismatch and cannot deserialize Signed-off-by: Pakin --- src/main.rs | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index 2bcc4d0..b18b50f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ use futures::{ stream::{SplitSink, SplitStream}, }; use libtbr::models::recipe::{MaterialSetting, Recipe, Recipe01}; -use log::info; +use log::{error, info}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use redis::{TypedCommands, cmd}; use serde::{Deserialize, Serialize}; @@ -422,10 +422,13 @@ async fn read( // // guard expect value let p = req.payload.unwrap(); + info!("get command request"); + // TODO // - Queue requests // - Send if service available if let Some(_) = state.system_tx.send(p).err() { + info!("failed to send command request"); let _ = tx .send(serde_json::json!({ "type": "notify", @@ -622,12 +625,18 @@ impl AppState { loop { if let Ok(rmsg) = system_rx.recv().await { + info!("receive msg: {rmsg:?}"); // add queue process let command_req: CommandRequestPayload = match serde_json::from_value(rmsg) { Ok(cmd) => cmd, - Err(_) => return, // reject + Err(e) => { + error!("error deserialize: {e:?} ---> Skip"); + continue; + } // reject }; + info!("get cmd: {}", command_req.srv_name); + if let Err(fail_payload) = current_queue.push(command_req.clone()) { if pending_command.len() < 10 { pending_command.push_back(fail_payload) @@ -647,23 +656,35 @@ impl AppState { } else { // set check to latest push to queue ok check_available_path = format!("{}/status", command_req.srv_name); + info!("checking {check_available_path}"); } } // send process if let Ok(Some(status)) = lredis.get(&check_available_path) { + info!("status: {status}"); match status.as_str() { "ok" | "OK" | "Ok" => { + info!("queue: {}", current_queue.len()); // if current_queue.is_full() && let Some(cmd) = current_queue.pop() { // get one let channel = format!("{}/job", cmd.srv_name); - let _ = lredis.publish( + info!("channel job: {channel}"); + info!("job: {cmd:?}"); + + let prep = serde_json::json!({ + "type": "command", + "payload": cmd + }); + + let result_pub = lredis.publish( channel, - serde_json::to_string(&cmd).unwrap_or("{}".to_string()), + serde_json::to_string(&prep).unwrap_or("{}".to_string()), ); + info!("published: {result_pub:?}"); // queue next if let Some(next_cmd) = pending_command.pop_front() {