fix: message not send

- add target send

Signed-off-by: Pakin <pakin.t@forth.co.th>
This commit is contained in:
Pakin 2026-04-28 17:18:03 +07:00
parent da956d39a7
commit 295f69a34c
3 changed files with 15 additions and 5 deletions

View file

@ -26,6 +26,13 @@ impl IntoStreamMessage for StreamDataStart {
const MSG_NAME: &str = "stream_data_start"; const MSG_NAME: &str = "stream_data_start";
fn build(&self) -> serde_json::Value { fn build(&self) -> serde_json::Value {
let mut payload = serde_json::to_value(self).unwrap();
payload.as_object_mut().unwrap().insert(
"to".to_string(),
serde_json::json!(self.stream_ref.clone().unwrap_or_default()),
);
serde_json::json!({ serde_json::json!({
"type": StreamDataStart::MSG_NAME, "type": StreamDataStart::MSG_NAME,
"payload": self.clone() "payload": self.clone()
@ -61,6 +68,7 @@ pub struct StreamDataChunk<T> {
pub start_idx: usize, pub start_idx: usize,
/// Chunked data which splited into N items per chunk /// Chunked data which splited into N items per chunk
pub data: Vec<T>, pub data: Vec<T>,
#[serde(rename = "to")]
uid: String, uid: String,
} }
@ -142,6 +150,7 @@ pub struct StreamDataExtra<T> {
pub exid: String, pub exid: String,
pub extp: String, pub extp: String,
pub payload: Vec<T>, pub payload: Vec<T>,
pub to: String,
} }
impl<T> IntoStreamMessage for StreamDataExtra<T> impl<T> IntoStreamMessage for StreamDataExtra<T>
@ -166,11 +175,12 @@ impl<T> StreamDataExtra<T>
where where
T: Serialize + Clone, T: Serialize + Clone,
{ {
pub fn new(exid: &str, extp: &str, data: Vec<T>) -> Self { pub fn new(exid: &str, extp: &str, data: Vec<T>, to: String) -> Self {
Self { Self {
exid: exid.to_string(), exid: exid.to_string(),
extp: extp.to_string(), extp: extp.to_string(),
payload: data.to_vec(), payload: data.to_vec(),
to,
} }
} }

View file

@ -61,7 +61,7 @@ pub async fn read(
req, req,
uid_clone.clone(), uid_clone.clone(),
) )
.await? .await?;
} }
"command" if req.payload.is_some() => { "command" if req.payload.is_some() => {
tasks::command::handle_command_request(state.clone(), tx.clone(), req) tasks::command::handle_command_request(state.clone(), tx.clone(), req)

View file

@ -118,7 +118,7 @@ pub async fn throttle_send_recipe(
for (index, chunk) in matset.chunks(CHUNK_SIZE).enumerate() { for (index, chunk) in matset.chunks(CHUNK_SIZE).enumerate() {
let curr_ch_id = format!("{mat_exid}_{index}"); let curr_ch_id = format!("{mat_exid}_{index}");
let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec()); let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec(), uidd.clone());
if let Some(err) = tx if let Some(err) = tx
.send(TxControlMessage::Payload(extra_matset.as_msg())) .send(TxControlMessage::Payload(extra_matset.as_msg()))
@ -132,7 +132,7 @@ pub async fn throttle_send_recipe(
let extl = "topplist"; let extl = "topplist";
for (index, chunk) in recipe.Topping.ToppingList.chunks(CHUNK_SIZE).enumerate() { for (index, chunk) in recipe.Topping.ToppingList.chunks(CHUNK_SIZE).enumerate() {
let curr_ch_id = format!("{mat_exid}_tl{index}"); let curr_ch_id = format!("{mat_exid}_tl{index}");
let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec()); let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec(), uidd.clone());
if let Some(err) = tx if let Some(err) = tx
.send(TxControlMessage::Payload(extra_topplist.as_msg())) .send(TxControlMessage::Payload(extra_topplist.as_msg()))
.await .await
@ -145,7 +145,7 @@ pub async fn throttle_send_recipe(
let extg = "toppgrp"; let extg = "toppgrp";
for (index, chunk) in recipe.Topping.ToppingGroup.chunks(CHUNK_SIZE).enumerate() { for (index, chunk) in recipe.Topping.ToppingGroup.chunks(CHUNK_SIZE).enumerate() {
let curr_ch_id = format!("{mat_exid}_tg{index}"); let curr_ch_id = format!("{mat_exid}_tg{index}");
let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec()); let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec(), uidd.clone());
if let Some(err) = tx if let Some(err) = tx
.send(TxControlMessage::Payload(extra_toppgrp.as_msg())) .send(TxControlMessage::Payload(extra_toppgrp.as_msg()))
.await .await