diff --git a/src/stream/model.rs b/src/stream/model.rs index f8519e8..7199725 100644 --- a/src/stream/model.rs +++ b/src/stream/model.rs @@ -26,6 +26,13 @@ impl IntoStreamMessage for StreamDataStart { const MSG_NAME: &str = "stream_data_start"; fn build(&self) -> serde_json::Value { + let mut payload = serde_json::to_value(self).unwrap(); + + payload.as_object_mut().unwrap().insert( + "to".to_string(), + serde_json::json!(self.stream_ref.clone().unwrap_or_default()), + ); + serde_json::json!({ "type": StreamDataStart::MSG_NAME, "payload": self.clone() @@ -61,6 +68,7 @@ pub struct StreamDataChunk { pub start_idx: usize, /// Chunked data which splited into N items per chunk pub data: Vec, + #[serde(rename = "to")] uid: String, } @@ -142,6 +150,7 @@ pub struct StreamDataExtra { pub exid: String, pub extp: String, pub payload: Vec, + pub to: String, } impl IntoStreamMessage for StreamDataExtra @@ -166,11 +175,12 @@ impl StreamDataExtra where T: Serialize + Clone, { - pub fn new(exid: &str, extp: &str, data: Vec) -> Self { + pub fn new(exid: &str, extp: &str, data: Vec, to: String) -> Self { Self { exid: exid.to_string(), extp: extp.to_string(), payload: data.to_vec(), + to, } } diff --git a/src/websocket/rw.rs b/src/websocket/rw.rs index cedfbb2..565d7a4 100644 --- a/src/websocket/rw.rs +++ b/src/websocket/rw.rs @@ -61,7 +61,7 @@ pub async fn read( req, uid_clone.clone(), ) - .await? + .await?; } "command" if req.payload.is_some() => { tasks::command::handle_command_request(state.clone(), tx.clone(), req) diff --git a/src/websocket/tasks/recipe.rs b/src/websocket/tasks/recipe.rs index 539c331..90f56e6 100644 --- a/src/websocket/tasks/recipe.rs +++ b/src/websocket/tasks/recipe.rs @@ -118,7 +118,7 @@ pub async fn throttle_send_recipe( for (index, chunk) in matset.chunks(CHUNK_SIZE).enumerate() { let curr_ch_id = format!("{mat_exid}_{index}"); - let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec()); + let extra_matset = StreamDataExtra::new(&curr_ch_id, &extp, chunk.to_vec(), uidd.clone()); if let Some(err) = tx .send(TxControlMessage::Payload(extra_matset.as_msg())) @@ -132,7 +132,7 @@ pub async fn throttle_send_recipe( let extl = "topplist"; for (index, chunk) in recipe.Topping.ToppingList.chunks(CHUNK_SIZE).enumerate() { let curr_ch_id = format!("{mat_exid}_tl{index}"); - let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec()); + let extra_topplist = StreamDataExtra::new(&curr_ch_id, &extl, chunk.to_vec(), uidd.clone()); if let Some(err) = tx .send(TxControlMessage::Payload(extra_topplist.as_msg())) .await @@ -145,7 +145,7 @@ pub async fn throttle_send_recipe( let extg = "toppgrp"; for (index, chunk) in recipe.Topping.ToppingGroup.chunks(CHUNK_SIZE).enumerate() { let curr_ch_id = format!("{mat_exid}_tg{index}"); - let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec()); + let extra_toppgrp = StreamDataExtra::new(&curr_ch_id, &extg, chunk.to_vec(), uidd.clone()); if let Some(err) = tx .send(TxControlMessage::Payload(extra_toppgrp.as_msg())) .await