change: add check if sys message instead
- change from confusing logging but it actually works Signed-off-by: Pakin <pakin.t@forth.co.th>
This commit is contained in:
parent
ab71c8bb03
commit
fecdb94841
1 changed files with 14 additions and 4 deletions
18
src/main.rs
18
src/main.rs
|
|
@ -320,6 +320,13 @@ fn convert_ack_command(cmd_req: &serde_json::Value) -> Option<CommandRequestPayl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn convert_sys_msg_command(msg: &serde_json::Value) -> Option<SysMessage> {
|
||||||
|
match serde_json::from_value(msg.clone()) {
|
||||||
|
Ok(req) => Some(req),
|
||||||
|
Err(_) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn read(
|
async fn read(
|
||||||
// redis: redis::Client,
|
// redis: redis::Client,
|
||||||
mut state: Arc<AppState>,
|
mut state: Arc<AppState>,
|
||||||
|
|
@ -335,7 +342,7 @@ async fn read(
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// Send back to client from services
|
// Send back to client from services
|
||||||
while let Ok(s_msg) = system_rx.recv().await {
|
while let Ok(s_msg) = system_rx.recv().await {
|
||||||
if convert_ack_command(&s_msg).is_none()
|
if convert_sys_msg_command(&s_msg).is_some()
|
||||||
&& let Some(err) = tx_to_client.send(s_msg).await.err()
|
&& let Some(err) = tx_to_client.send(s_msg).await.err()
|
||||||
{
|
{
|
||||||
println!("[SYS] failed to send back to client: {err}");
|
println!("[SYS] failed to send back to client: {err}");
|
||||||
|
|
@ -835,16 +842,19 @@ impl AppState {
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Ok(rmsg) = system_rx.recv().await {
|
if let Ok(rmsg) = system_rx.recv().await {
|
||||||
info!("receive msg: {rmsg:?}");
|
let sys_msg = convert_sys_msg_command(&rmsg);
|
||||||
|
|
||||||
// add queue process
|
// add queue process
|
||||||
let command_req: CommandRequestPayload = match serde_json::from_value(rmsg) {
|
let command_req: CommandRequestPayload = match serde_json::from_value(rmsg) {
|
||||||
Ok(cmd) => cmd,
|
Ok(cmd) => cmd,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("error deserialize: {e:?} ---> Skip");
|
if sys_msg.is_none() {
|
||||||
|
// maybe error
|
||||||
|
error!("error deserialize: {e:?} ---> Skip");
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
} // reject
|
} // reject
|
||||||
};
|
};
|
||||||
|
|
||||||
info!("get cmd: {}", command_req.srv_name);
|
info!("get cmd: {}", command_req.srv_name);
|
||||||
|
|
||||||
if let Err(fail_payload) = current_queue.push(command_req.clone()) {
|
if let Err(fail_payload) = current_queue.push(command_req.clone()) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue