import { get, writable } from 'svelte/store';
import { addNotification, notiStore } from '../stores/noti';
import {
	currentRecipeVersionsSelector,
	lastRequestSheetPrice,
	materialFromServerQuery,
	priceRecipeData,
	recipeData,
	recipeDataError,
	recipeLoading,
	recipeOverviewData,
	recipeStreamMeta,
	streamingRawData,
	streamingRawMeta,
	toppingGroupFromServerQuery,
	toppingListFromServerQuery
} from '../stores/recipeStore';
import {
	handleSheetStreamStart,
	handleSheetStreamChunk,
	handleSheetStreamEnd,
	handleSheetStreamError,
	handleCatalogsResponse,
	handleListMenuResponse,
	sheetCatalogsLoading,
	handleRawStreamHeader,
	handleRawStreamChunk,
	handleRawStreamEnd
} from '../stores/sheetStore';
import {
	handleGenLayoutBatchStart,
	handleGenLayoutFile,
	handleGenLayoutBatchEnd,
	handleGenLayoutError
} from '../stores/genLayoutStore';
import { buildOverviewFromServer } from '$lib/data/recipeService';
import { auth } from '../client/firebase';
import { type RecipeVersion } from '$lib/models/recipe_version.model';
import { goto } from '$app/navigation';
import { socketAlreadySendHeartbeat, socketConnectionOfflineCount } from '../stores/websocketStore';
import type { RecipePrice } from '$lib/models/price.model';
import { sendCommandRequest, sendMessage } from './ws_messageSender';
import { auth as authStore } from '../stores/auth';
import { v4 as uuidv4 } from 'uuid';
import { handleSheetResponseFromNoti } from './sheetNotiHandler';

/** Every payload received on a `chat` message, in arrival order. */
export const messages = writable<any[]>([]);

/** Envelope of a decoded WebSocket frame: a routing `type` plus its payload. */
type WSMessage = { type: string; payload: any };

// MAXIMUM LIMIT = 1814355

/**
 * Dispatch table keyed by `WSMessage.type`.
 *
 * Each entry receives the message payload as-is. Payloads come straight from
 * `JSON.parse`, so every field read below may be missing.
 */
const handlers: Record<string, (payload: any) => void> = {
	/** Appends the chat payload to the `messages` store. */
	chat: (p) => messages.update((m) => [...m, p]),

	ping: () => console.log('ping from server'),

	/** Notifies the user when the server reports a truthy `result`. */
	recipeResponse: (p) => {
		if (p.result) {
			addNotification('INFO:Start fetch recipe!');
		}
	},

	/**
	 * Starts a recipe stream: records the stream meta and clears the recipe,
	 * overview, material and topping stores.
	 */
	stream_data_start: (p) => {
		const streamId = p.stream_id;
		if (!streamId) {
			return;
		}

		addNotification('INFO:Start streaming data');

		// `metadata` is split on ',' and each part on '='; the value of the first
		// part is used as the version and the second as the country. A missing
		// metadata string yields empty values instead of throwing.
		const metaList: string[] = typeof p.metadata === 'string' ? p.metadata.split(',') : [];
		const version = metaList[0]?.split('=')[1] ?? '';
		const country = metaList[1]?.split('=')[1] ?? '';

		// recipeLoading.set(true);
		recipeStreamMeta.set({
			id: streamId,
			total_size: p.total_size,
			chunk_size: p.chunk_size,
			progress: 0,
			version,
			country
		});
		recipeData.set([]);
		recipeOverviewData.set([]);
		materialFromServerQuery.set([]);
		toppingListFromServerQuery.set([]);
		toppingGroupFromServerQuery.set([]);
	},

	/** Stops the loading state, stores the error and notifies after 2 seconds. */
	stream_data_error: (p) => {
		recipeLoading.set(false);
		recipeDataError.set(p);
		setTimeout(() => {
			addNotification(`ERROR:${p.error}`);
		}, 2000);
	},

	/**
	 * Appends one chunk to `recipeData` and updates the progress percentage.
	 * Chunks whose `stream_id` differs from the active stream are ignored.
	 */
	stream_data_chunk: (p) => {
		const currentMeta = get(recipeStreamMeta);
		if (!currentMeta || currentMeta.id !== p.stream_id) {
			return;
		}

		// Progress is estimated from the end offset of this chunk, capped at 100.
		const chunkEnd = p.start_idx + currentMeta.chunk_size;
		let percent = (chunkEnd / currentMeta.total_size) * 100;
		if (percent > 100) {
			percent = 100;
		}

		// The store's array is extended in place and then re-set so that
		// subscribers are notified.
		const currentData = get(recipeData);
		for (const row of p.data ?? []) {
			currentData.push(row);
		}
		recipeData.set(currentData);
		recipeStreamMeta.set({ ...currentMeta, progress: percent });

		if (percent === 100) {
			addNotification(`INFO:Current progress ${percent}%`);
		}

		// build overview
		buildOverviewFromServer();
	},

	/**
	 * Finishes a recipe stream: rebuilds the overview and sends the follow-up
	 * `price` request for the streamed country.
	 */
	stream_data_end: () => {
		recipeLoading.set(false);

		// build overview for recipe from server
		buildOverviewFromServer();

		const currentMeta = get(recipeStreamMeta);
		const currentUser = get(authStore);
		const userInfo: any = currentUser
			? {
					displayName: currentUser.displayName,
					email: currentUser.email,
					uid: currentUser.uid
				}
			: undefined;

		// send next chain message
		sendMessage({
			type: 'price',
			payload: {
				action: { View: 'sa=all' },
				country: currentMeta?.country ?? '',
				parameters: '',
				user_info: userInfo
			}
		});
	},

	/**
	 * Extra data attached to a stream. `extp` selects the target store and
	 * `payload` holds the items to append to it. Unknown `extp` values are ignored.
	 */
	stream_data_extra: (p) => {
		const items = p.payload ?? [];

		switch (p.extp) {
			case 'matset': {
				// payload has chunks of material setting
				const materials = get(materialFromServerQuery) || [];
				for (const material of items) {
					materials.push(material);
				}
				materialFromServerQuery.set(materials);
				break;
			}
			case 'topplist': {
				const toppingList = get(toppingListFromServerQuery) || [];
				for (const topping of items) {
					toppingList.push(topping);
				}
				toppingListFromServerQuery.set(toppingList);
				break;
			}
			case 'toppgrp': {
				const toppingGroups = get(toppingGroupFromServerQuery) || [];
				for (const group of items) {
					toppingGroups.push(group);
				}
				toppingGroupFromServerQuery.set(toppingGroups);
				break;
			}
			default:
				break;
		}
	},

	// Intentionally a no-op: patch updates are accepted but not processed.
	stream_patch_update: () => {},

	/**
	 * Routes a notification by its `from` service.
	 *
	 * `list-menu` and `sheet-service` content are matched against the current
	 * user's uid. The default path matches `to` against the current user's
	 * displayName, and broadcasts when `to` is absent.
	 */
	notify: (p) => {
		const from = p.from;
		const level = p.level ?? 'INFO';
		const msg = p.msg;
		const target = p.to;

		// Handle list-menu response
		if (from === 'list-menu') {
			const currentUid = auth.currentUser?.uid;
			if (target && currentUid && target === currentUid && p.value) {
				handleListMenuResponse({ codes: p.value });
			}
			return;
		}

		// Handle gen-service responses
		if (from === 'gen-service') {
			switch (level) {
				case 'batch_start':
					handleGenLayoutBatchStart({
						batch_id: p.batch_id,
						total_files: p.total_files,
						total_size_bytes: p.total_size_bytes
					});
					addNotification(`INFO:Gen Layout started (${p.total_files} files)`);
					break;
				case 'file':
					handleGenLayoutFile({
						batch_id: p.batch_id,
						file_index: p.file_index,
						total_files: p.total_files,
						file: p.file,
						content: p.content,
						is_chunked: p.is_chunked,
						part_index: p.part_index,
						total_parts: p.total_parts,
						is_last_part: p.is_last_part
					});
					break;
				case 'batch_end':
					handleGenLayoutBatchEnd({ batch_id: p.batch_id, total_files: p.total_files });
					addNotification('INFO:Gen Layout complete');
					break;
				case 'ERROR':
					handleGenLayoutError(msg);
					addNotification(`ERR:Gen Layout error: ${msg}`);
					break;
				default:
					console.log('[GenService] Received:', level, msg);
			}
			return;
		}

		if (from === 'sheet-service' && level === 'content') {
			const currentUid = auth.currentUser?.uid;
			if (target && currentUid && target === currentUid) {
				// A content message without `msg` that carries catalogs is a catalog listing.
				if (!msg && p.content?.catalogs) {
					handleCatalogsResponse(p.content);
					addNotification(`INFO:Loaded ${p.content.catalogs?.length || 0} catalogs`);
					return;
				}

				// Handle streaming messages (with msg field)
				switch (msg) {
					case 'start':
						handleSheetStreamStart(p);
						addNotification('INFO:Sheet data streaming started');
						break;
					case 'chunk':
						handleSheetStreamChunk(p);
						break;
					case 'end':
						handleSheetStreamEnd(p);
						addNotification('INFO:Sheet data streaming complete');
						break;
					case 'error':
						handleSheetStreamError(p);
						addNotification(`ERR:Sheet streaming error: ${p.content?.error_detail}`);
						break;
					default:
						// Handle other content notifications from sheet-service
						console.log('[Sheet] Received content:', p.content);
				}
			}
			return;
		}

		// Default notification handling
		if (target) {
			const currentUsername = auth.currentUser?.displayName;
			if (currentUsername && currentUsername === target) {
				addNotification(`${level}:${msg}`);
			}
		} else {
			// broadcast to all
			addNotification(`${level}:${msg}`);
		}
	},

	/** Navigates to the brew tool when the brew service requests a refresh. */
	ui_action: (p) => {
		if (p.action === 'refreshNow' && p.from === 'brew') {
			goto('/tools/brew');
		}
	},

	/**
	 * Replaces the version selector list. The display name is the part of each
	 * version string before the first '_'. An empty or missing list is ignored.
	 */
	version_selectors: (p) => {
		const versions: string[] = p.versions ?? [];
		if (versions.length === 0) {
			return;
		}

		currentRecipeVersionsSelector.set([]);
		const result: RecipeVersion[] = [];
		for (const versionName of versions) {
			result.push({
				display_version: versionName.split('_')[0],
				actual_version_name: versionName
			});
		}
		currentRecipeVersionsSelector.set(result);
	},

	/**
	 * Stores the received prices, then requests the sheet prices for the same
	 * product codes in stream mode. The stream is reassembled by `raw_stream`.
	 */
	price: (p) => {
		const content: RecipePrice[] = p.content ?? [];
		console.log('get price length: ', content.length);

		const currentPrice = get(priceRecipeData);
		const lastRequest = get(lastRequestSheetPrice);
		const currentMeta = get(recipeStreamMeta);
		const productCodes: { product_code: RecipePrice['ProductCode'] }[] = [];

		// The per-country map falls back to the key 'unknown', while the request
		// itself (below) falls back to an empty country string.
		const countryKey = currentMeta?.country ?? 'unknown';
		lastRequest[countryKey] = {};

		for (const c of content) {
			currentPrice[c.ProductCode] = c.NewPrice + (c.StringParam ? `,${c.StringParam}` : '');
			lastRequest[countryKey][c.ProductCode] = '';
			productCodes.push({ product_code: c.ProductCode });
		}
		priceRecipeData.set(currentPrice);
		console.log('check length', productCodes.length);

		// The command request runs in stream mode: seed an empty buffer under a
		// fresh request id and remember which country the request belongs to.
		const requestId = uuidv4();
		lastRequest[requestId] = currentMeta?.country ?? '';

		const streamingBuffers = get(streamingRawData);
		streamingBuffers[requestId] = '';
		streamingRawData.set(streamingBuffers);

		sendCommandRequest('sheet', {
			country: currentMeta?.country ?? '',
			content: productCodes,
			param: 'price',
			stream: true,
			request_id: requestId
		});

		lastRequestSheetPrice.set(lastRequest);
	},

	/**
	 * Single entry point for every `raw_stream*` message.
	 *
	 * `handleIncomingMessages` rewrites `raw_stream_<x>` to type `raw_stream`
	 * with `payload.sub_type = '<x>'`. This table previously declared
	 * `raw_stream` twice, so the later entry silently replaced the accumulator
	 * below and streamed price data was never reassembled. Both behaviours now
	 * live in this one entry.
	 */
	raw_stream: (p) => {
		// Header format carrying its own `subtype` in the payload:
		// { subtype: 'price', request_id, header?, country? }
		if (p.subtype) {
			handleRawStreamHeader(p.subtype, p);
		}

		const requestId = p.request_id;
		const buffers = get(streamingRawData);

		switch (p.sub_type) {
			case 'price':
				streamingRawMeta.set({
					id: requestId,
					total_size: p.total_chunks,
					chunk_size: p.size_per_chunk,
					progress: 0
				});
				break;
			case 'chunk_price':
				streamingRawMeta.set({
					id: requestId,
					total_size: p.total_chunks,
					chunk_size: p.size_per_chunk,
					progress: p.idx
				});
				// Default the buffer to '' so a chunk for an unseeded request id
				// does not produce the string "undefined...".
				buffers[requestId] = (buffers[requestId] ?? '') + (p.raw ?? '');
				streamingRawData.set(buffers);
				break;
			case 'end_price': {
				const country = get(lastRequestSheetPrice)[requestId];
				try {
					// The accumulated text is expected to be one JSON document with a
					// `payload` object; anything else throws and is logged below.
					const parsed = JSON.parse(buffers[requestId]);
					const refFromRaw = parsed.payload.ref ?? '';
					const fromService = parsed.payload.from ?? '';
					if (fromService === 'sheet-service') {
						handleSheetResponseFromNoti(parsed.payload, refFromRaw, country);
						delete buffers[requestId];
						streamingRawData.set(buffers);
					}
				} catch (e) {
					console.log(`end price process error: ${e}`);
				}
				break;
			}
			default:
				break;
		}
	},

	/** Resets the offline and pending-heartbeat counters. */
	heartbeat: () => {
		socketConnectionOfflineCount.set(0);
		socketAlreadySendHeartbeat.set(0);
		console.log('heartbeat reset offline count');
	},

	// NOTE(review): the three entries below are never reached through
	// `handleIncomingMessages`, because every type starting with 'raw_stream' is
	// rewritten to 'raw_stream' before lookup. Confirm whether the sheetStore
	// handlers should replace the accumulator in `raw_stream`, or remove them.
	raw_stream_price: (p) => {
		// Header for price stream
		handleRawStreamHeader('price', p);
	},
	raw_stream_chunk_price: (p) => {
		// Chunk for price stream
		handleRawStreamChunk('price', p);
	},
	raw_stream_end_price: (p) => {
		// End for price stream
		handleRawStreamEnd('price', p);
	}
};

/**
 * Decodes one raw WebSocket frame and dispatches it to the matching handler.
 *
 * Malformed JSON and messages without a string `type` are logged and dropped.
 * A literal `null` message raises the "No response from server" notification.
 * Messages whose type has no registered handler are ignored.
 *
 * @param raw - JSON text of a `{ type, payload }` message.
 */
export function handleIncomingMessages(raw: string): void {
	let msg: WSMessage | null;
	try {
		msg = JSON.parse(raw);
	} catch (e) {
		console.error(`[WS] dropped malformed message: ${e}`);
		return;
	}

	if (msg == null) {
		// error response
		addNotification('ERR:No response from server');
		return;
	}

	if (typeof msg.type !== 'string') {
		console.error('[WS] dropped message without a string type');
		return;
	}

	// raw streaming type: fold `raw_stream_<sub>` into `raw_stream` + sub_type
	if (msg.type.startsWith('raw_stream')) {
		const subType = msg.type.replace('raw_stream_', '');
		// The sub type is written onto the payload, so it must be an object.
		if (msg.payload == null || typeof msg.payload !== 'object') {
			msg.payload = {};
		}
		msg.payload.sub_type = subType;
		msg.type = 'raw_stream';
	}

	// Own-property lookup only: a server-supplied type such as 'toString' or
	// '__proto__' must not resolve to an inherited Object.prototype member.
	if (Object.prototype.hasOwnProperty.call(handlers, msg.type)) {
		handlers[msg.type](msg.payload);
	}
}