fix: skip command instead if fail struct
- add logging - skip current command if struct mismatch and cannot deserialize Signed-off-by: Pakin <pakin.t@forth.co.th>
This commit is contained in:
parent
8cdba50c40
commit
b1d9cde9c8
1 changed files with 25 additions and 4 deletions
29
src/main.rs
29
src/main.rs
|
|
@ -17,7 +17,7 @@ use futures::{
|
||||||
stream::{SplitSink, SplitStream},
|
stream::{SplitSink, SplitStream},
|
||||||
};
|
};
|
||||||
use libtbr::models::recipe::{MaterialSetting, Recipe, Recipe01};
|
use libtbr::models::recipe::{MaterialSetting, Recipe, Recipe01};
|
||||||
use log::info;
|
use log::{error, info};
|
||||||
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
||||||
use redis::{TypedCommands, cmd};
|
use redis::{TypedCommands, cmd};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
@ -422,10 +422,13 @@ async fn read(
|
||||||
// // guard expect value
|
// // guard expect value
|
||||||
let p = req.payload.unwrap();
|
let p = req.payload.unwrap();
|
||||||
|
|
||||||
|
info!("get command request");
|
||||||
|
|
||||||
// TODO
|
// TODO
|
||||||
// - Queue requests
|
// - Queue requests
|
||||||
// - Send if service available
|
// - Send if service available
|
||||||
if let Some(_) = state.system_tx.send(p).err() {
|
if let Some(_) = state.system_tx.send(p).err() {
|
||||||
|
info!("failed to send command request");
|
||||||
let _ = tx
|
let _ = tx
|
||||||
.send(serde_json::json!({
|
.send(serde_json::json!({
|
||||||
"type": "notify",
|
"type": "notify",
|
||||||
|
|
@ -622,12 +625,18 @@ impl AppState {
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Ok(rmsg) = system_rx.recv().await {
|
if let Ok(rmsg) = system_rx.recv().await {
|
||||||
|
info!("receive msg: {rmsg:?}");
|
||||||
// add queue process
|
// add queue process
|
||||||
let command_req: CommandRequestPayload = match serde_json::from_value(rmsg) {
|
let command_req: CommandRequestPayload = match serde_json::from_value(rmsg) {
|
||||||
Ok(cmd) => cmd,
|
Ok(cmd) => cmd,
|
||||||
Err(_) => return, // reject
|
Err(e) => {
|
||||||
|
error!("error deserialize: {e:?} ---> Skip");
|
||||||
|
continue;
|
||||||
|
} // reject
|
||||||
};
|
};
|
||||||
|
|
||||||
|
info!("get cmd: {}", command_req.srv_name);
|
||||||
|
|
||||||
if let Err(fail_payload) = current_queue.push(command_req.clone()) {
|
if let Err(fail_payload) = current_queue.push(command_req.clone()) {
|
||||||
if pending_command.len() < 10 {
|
if pending_command.len() < 10 {
|
||||||
pending_command.push_back(fail_payload)
|
pending_command.push_back(fail_payload)
|
||||||
|
|
@ -647,23 +656,35 @@ impl AppState {
|
||||||
} else {
|
} else {
|
||||||
// set check to latest push to queue ok
|
// set check to latest push to queue ok
|
||||||
check_available_path = format!("{}/status", command_req.srv_name);
|
check_available_path = format!("{}/status", command_req.srv_name);
|
||||||
|
info!("checking {check_available_path}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// send process
|
// send process
|
||||||
if let Ok(Some(status)) = lredis.get(&check_available_path) {
|
if let Ok(Some(status)) = lredis.get(&check_available_path) {
|
||||||
|
info!("status: {status}");
|
||||||
match status.as_str() {
|
match status.as_str() {
|
||||||
"ok" | "OK" | "Ok" => {
|
"ok" | "OK" | "Ok" => {
|
||||||
|
info!("queue: {}", current_queue.len());
|
||||||
//
|
//
|
||||||
if current_queue.is_full()
|
if current_queue.is_full()
|
||||||
&& let Some(cmd) = current_queue.pop()
|
&& let Some(cmd) = current_queue.pop()
|
||||||
{
|
{
|
||||||
// get one
|
// get one
|
||||||
let channel = format!("{}/job", cmd.srv_name);
|
let channel = format!("{}/job", cmd.srv_name);
|
||||||
let _ = lredis.publish(
|
info!("channel job: {channel}");
|
||||||
|
info!("job: {cmd:?}");
|
||||||
|
|
||||||
|
let prep = serde_json::json!({
|
||||||
|
"type": "command",
|
||||||
|
"payload": cmd
|
||||||
|
});
|
||||||
|
|
||||||
|
let result_pub = lredis.publish(
|
||||||
channel,
|
channel,
|
||||||
serde_json::to_string(&cmd).unwrap_or("{}".to_string()),
|
serde_json::to_string(&prep).unwrap_or("{}".to_string()),
|
||||||
);
|
);
|
||||||
|
info!("published: {result_pub:?}");
|
||||||
|
|
||||||
// queue next
|
// queue next
|
||||||
if let Some(next_cmd) = pending_command.pop_front() {
|
if let Some(next_cmd) = pending_command.pop_front() {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue