import time import re import os import json import redis import gspread import requests import traceback import threading import uuid import math from fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel from dotenv import load_dotenv from google.oauth2 import service_account REDIS_HOST = os.getenv("REDIS_HOST") REDIS_PORT = os.getenv("REDIS_PORT") FRONTEND_NOTIFY_URL = os.getenv("FRONTEND_NOTIFY_URL") LOG_SERVER_URL = os.getenv("LOG_SERVER_URL") SERVICE_NAME = os.getenv("SERVICE_NAME") ENTER_CHANNEL = f"{SERVICE_NAME}/enter" HEARTBEAT_CHANNEL = f"{SERVICE_NAME}/heartbeat" EXIT_CHANNEL = f"{SERVICE_NAME}/exit" GET_CATALOG_CHANNEL = f"{SERVICE_NAME}/catalogs" UPDATE_MENU_CHANNEL = f"{SERVICE_NAME}/update/menu" DELETE_MENU_CHANNEL = f"{SERVICE_NAME}/delete/menu" ADD_CATALOG_CHANNEL = f"{SERVICE_NAME}/add/catalog" ADD_MENU_CHANNEL = f"{SERVICE_NAME}/add/menu" r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) ID_PATTERN = r'[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{4}' COUNTRY_MAPPING = { "tha": { "spreadsheet_id": os.getenv("SPREAD_SHEET_ID_THA"), "sheets": [ os.getenv("SHEET_NEW_LAYOUT_THA"), os.getenv("SHEET_NEW_LAYOUT_V2_THA"), os.getenv("SHEET_NAME_DESC_V2_THA") ] }, "tha_premium": { "spreadsheet_id": os.getenv("SPREAD_SHEET_ID_THA_PREMIUM"), "sheets": [ os.getenv("SHEET_NEW_LAYOUT_THA_PREMIUM") ] } } def get_gspread_client(): SCOPES = [ 'https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive' ] creds = None service_account_email = os.getenv("GOOGLE_SERVICE_ACCOUNT_EMAIL") private_key = os.getenv("GOOGLE_PRIVATE_KEY").replace("\\n", "\n") credentials_info = { "type": "service_account", "client_email": service_account_email, "private_key": private_key, "token_uri": "https://oauth2.googleapis.com/token", } creds = service_account.Credentials.from_service_account_info( credentials_info, scopes=SCOPES ) return gspread.authorize(creds) class LockManager: def __init__(self): # { "tha:page_catalog_group_coffee.skt": {"user_id": "A", "timestamp": 12345} } self.locks: Dict[str, dict] = {} self.timeout = 30 def get_room_key(self, country: str, catalog: str) -> str: return f"{country}:{catalog}" def acquire(self, country: str, catalog: str, user_id: str): now = time.time() room_key = self.get_room_key(country, catalog) # (1 person, 1 room) for key, data in list(self.locks.items()): if data and data["user_id"] == user_id and key != room_key: if now - data["timestamp"] < self.timeout: return False, "You are already in another room." else: self.locks[key] = None # Check if the room you're requesting is already occupied by someone else. current_lock = self.locks.get(room_key) if current_lock and current_lock["user_id"] != user_id: if now - current_lock["timestamp"] < self.timeout: return False, f"Room locked by {current_lock['user_id']}" # queue self.locks[room_key] = {"user_id": user_id, "timestamp": now} return True, user_id def keep_alive(self, country: str, catalog: str, user_id: str): now = time.time() room_key = self.get_room_key(country, catalog) lock = self.locks.get(room_key) # Renew if lock and lock["user_id"] == user_id: if now - lock["timestamp"] < self.timeout: self.locks[room_key]["timestamp"] = now return True return False def release(self, country: str, catalog: str, user_id: str): room_key = self.get_room_key(country, catalog) lock = self.locks.get(room_key) # Clear if lock and lock["user_id"] == user_id: self.locks[room_key] = None return True return False def is_authorized(self, country: str, catalog: str, user_id: str): now = time.time() lock = self.locks.get(self.get_room_key(country, catalog)) if lock and lock["user_id"] == user_id: if now - lock["timestamp"] < self.timeout: return True return False def get_lock_info(self, country: str, catalog: str): now = time.time() lock = self.locks.get(self.get_room_key(country, catalog)) if lock and (now - lock["timestamp"] < self.timeout): return {"is_locked": True, "locked_by": lock["user_id"]} return {"is_locked": False, "locked_by": None} lock_manager = LockManager() # -- Redis handler -- def redis_message_handler(): pubsub = r.pubsub() pubsub.subscribe(ENTER_CHANNEL, HEARTBEAT_CHANNEL, EXIT_CHANNEL, GET_CATALOG_CHANNEL, UPDATE_MENU_CHANNEL, DELETE_MENU_CHANNEL, ADD_CATALOG_CHANNEL, ADD_MENU_CHANNEL) print(f"[*] Redis Listener started on service: {SERVICE_NAME}") for message in pubsub.listen(): if message['type'] != 'message': continue try: data = json.loads(message['data']) payload = data.get("payload", {}) values = payload.get("values", {}) user_info = payload.get("user_info", {}) country = values.get("country", "").strip().lower() catalog = values.get("catalog", "") catalog_name = values.get("catalog_name", "") content = values.get("content", []) srv_name = payload.get("srv_name", "") user_id = user_info.get("uid") if srv_name != SERVICE_NAME: continue channel = message['channel'] if not (user_id and country): print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue if channel == ENTER_CHANNEL: if not catalog: print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue success, msg = lock_manager.acquire(country, catalog, user_id) if success: threading.Thread( target=process_and_stream_sheet_data, args=(country, catalog, user_id), daemon=True ).start() print(f"[{SERVICE_NAME}] Enter Room: {catalog} | User: {user_id} | Success: {success}") elif channel == HEARTBEAT_CHANNEL: if not catalog: print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue alive = lock_manager.keep_alive(country, catalog, user_id) if not alive: print(f"[{SERVICE_NAME}] Heartbeat Failed: {catalog} | User: {user_id}") elif channel == EXIT_CHANNEL: if not catalog: print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue lock_manager.release(country, catalog, user_id) print(f"[{SERVICE_NAME}] Exit Room (Force/Command): {catalog} | User: {user_id}") elif channel == GET_CATALOG_CHANNEL: try: if not FRONTEND_NOTIFY_URL: print("Warning: FRONTEND_NOTIFY_URL is not set.") continue result = get_catalogs(country) payload = { "type": "notify", "payload": { "from": SERVICE_NAME, "level": "content", "to": user_id, "content": result } } requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) print(payload) print(f"[{SERVICE_NAME}] Get catalog success | User: {user_id}") except Exception as e: print(f"[{SERVICE_NAME}] Get catalog error: {e}") elif channel == ADD_MENU_CHANNEL: if not (content and catalog): print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue if not lock_manager.is_authorized(country, catalog, user_id): print(f"[{SERVICE_NAME}] Add menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room") continue try: handle_add_menu(country, catalog, content) print(f"[{SERVICE_NAME}] Add menu success: {catalog} | User: {user_id}") except Exception as e: print(f"[{SERVICE_NAME}] Add menu error: {e}") elif channel == ADD_CATALOG_CHANNEL: if not (catalog_name, catalog): print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue try: handle_add_catalog(country, catalog_name, catalog) except Exception as e: print(f"[{SERVICE_NAME}] Add catalog error: {e}") elif channel == UPDATE_MENU_CHANNEL: if not (content and catalog): print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue if not lock_manager.is_authorized(country, catalog, user_id): # raise HTTPException(status_code=403, detail="Unauthorized access to this room") print(f"[{SERVICE_NAME}] Updated catalog failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room") continue try: update_sheets(country, content) print(f"[{SERVICE_NAME}] Update success: {catalog} | User: {user_id}") except Exception as e: print(f"[{SERVICE_NAME}] Update error: {e}") elif channel == DELETE_MENU_CHANNEL: if not (content and catalog): print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue if not lock_manager.is_authorized(country, catalog, user_id): print(f"[{SERVICE_NAME}] Delete menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room") continue try: handle_delete_menu(country, catalog, content) except Exception as e: print(f"[{SERVICE_NAME}] Delete menu error: {e}") except Exception as e: print(f"[Redis Error] Error processing message: {e}") traceback.print_exc() def send_stream_notification(msg_type: str, content: any, batch_id: str, current_chunk: int, total_chunks: int, user_id: str): """ msg_type: "start", "chunk", "end", "error" """ if not FRONTEND_NOTIFY_URL: print("Warning: FRONTEND_NOTIFY_URL is not set.") return payload = { "type": "notify", "payload": { "from": SERVICE_NAME, "level": "content", "msg": msg_type, "batch_id": batch_id, "current_chunk": current_chunk, "total_chunks": total_chunks, "to": user_id, "content": content } } try: # requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) print(f"[Stream] Sent {msg_type} chunk {current_chunk}/{total_chunks} for batch {batch_id}") print(payload) except Exception as e: print(f"[Stream Error] Failed to send {msg_type} for batch {batch_id}: {e}") def process_and_stream_sheet_data(country: str, catalog: str, user_id: str): batch_id = str(uuid.uuid4()) send_stream_notification("start", {"message": f"Start fetching catalog: {catalog}"}, batch_id, 0, 0, user_id) try: config = COUNTRY_MAPPING.get(country) client = get_gspread_client() spreadsheet = client.open_by_key(config["spreadsheet_id"]) try: nl_name = next(s for s in config["sheets"] if "new-layout" in s.lower() and "v2" not in s.lower()) nv2_name = next(s for s in config["sheets"] if "new-layout-v2" in s.lower()) nd_name = next(s for s in config["sheets"] if "name-desc-v2" in s.lower()) except StopIteration: raise HTTPException(status_code=404, detail="Sheet mapping configuration error") nl_all = spreadsheet.worksheet(nl_name).get_all_values() nv2_all = spreadsheet.worksheet(nv2_name).get_all_values() nd_all = spreadsheet.worksheet(nd_name).get_all_values() # range for new-layout-v2 nv2_start, nv2_end = find_catalog_range(nv2_all, catalog) if nv2_start is None: nv2_start, nv2_end = 0, len(nv2_all) - 1 final_result = [] is_in_block = False for i, nl_row in enumerate(nl_all): col_a = nl_row[0].strip() if len(nl_row) > 0 else "" if col_a in ["IGNORE", "-"]: col_a = "" if not is_in_block: if catalog in col_a: is_in_block = True continue else: if col_a and catalog not in col_a: break name_th = nl_row[2].strip() if len(nl_row) > 2 else "" name_en = nl_row[3].strip() if len(nl_row) > 3 else "" if not name_th and not name_en: continue menu_item = { "new_layout": {"row_index": i + 1, "values": nl_row}, "new_layout_v2": [], "name_desc_v2": [] } # Search Targets (G-K, H-L, I-M) # G(6)-K(10), H(7)-L(11), I(8)-M(12) pairs = [ (nl_row[6] if len(nl_row) > 6 else "-", nl_row[10] if len(nl_row) > 10 else "-"), (nl_row[7] if len(nl_row) > 7 else "-", nl_row[11] if len(nl_row) > 11 else "-"), (nl_row[8] if len(nl_row) > 8 else "-", nl_row[12] if len(nl_row) > 12 else "-") ] search_targets = [] individual_codes = [] for p1, p2 in pairs: c1 = p1.strip() if p1.strip() and p1.strip() != "-" else "-" c2 = p2.strip() if p2.strip() and p2.strip() != "-" else "-" # Format: {code1},{code2} if c1 != "-" or c2 != "-": search_targets.append(f"{c1},{c2}") # keep to find in name-desc-v2 if c1 != "-": individual_codes.append(c1) if c2 != "-": individual_codes.append(c2) # find in new-layout-v2 (check I, J, K | Index 8, 9, 10) for j in range(nv2_start, nv2_end + 1): nv2_row = nv2_all[j] if len(nv2_row) > 10: # range max is column K # I(8), J(9), K(10) v2_vals = [nv2_row[8].strip(), nv2_row[9].strip(), nv2_row[10].strip()] # Check target have or not in I, J, K match_v2 = any(t in v2_vals for t in search_targets) if match_v2: for sub_idx in range(3): # 3 row (Name, Desc, Img) curr_j = j + sub_idx if curr_j < len(nv2_all): row_data = nv2_all[curr_j] row_info = { "row_index": curr_j + 1, "cells": [] } # E-H (index 4-7) with only 2 row if sub_idx < 2: for c_idx in range(4, 8): if c_idx < len(row_data): row_info["cells"].append({ "value": row_data[c_idx], "coord": get_coord(curr_j, c_idx) }) menu_item["new_layout_v2"].append(row_info) break # find in name-desc-v2 for code in set(individual_codes): targets = [f"MENU.{code}.name", f"MENU.{code}.desc"] for nd_idx, nd_row in enumerate(nd_all): if len(nd_row) > 0 and nd_row[0].strip() in targets: nd_info = {"key": nd_row[0], "row_index": nd_idx+1, "cells": []} for c_idx in range(4, 8): if c_idx < len(nd_row): nd_info["cells"].append({ "value": nd_row[c_idx], "coord": get_coord(nd_idx, c_idx) }) menu_item["name_desc_v2"].append(nd_info) final_result.append(menu_item) # ----------------------------------------------------------------- # init chunk CHUNK_SIZE = 10 total_items = len(final_result) total_chunks = math.ceil(total_items / CHUNK_SIZE) for i in range(total_chunks): start_idx = i * CHUNK_SIZE end_idx = start_idx + CHUNK_SIZE chunk_data = final_result[start_idx:end_idx] send_stream_notification("chunk", chunk_data, batch_id, i + 1, total_chunks, user_id) send_stream_notification("end", {"message": "All data sent successfully"}, batch_id, total_chunks, total_chunks, user_id) except Exception as e: traceback.print_exc() send_stream_notification("error", {"error_detail": str(e)}, batch_id, 0, 0, user_id) def get_coord(r, c): return {"row": r + 1, "col": c + 1} def find_catalog_range(all_rows, catalog: str): start = None end = None for i, row in enumerate(all_rows): col_a = row[0].strip() if len(row) > 0 else "" if start is None: if catalog in col_a: start = i end = i else: if col_a and catalog not in col_a: break end = i return start, end # get catalog def get_catalogs(country: str): config = COUNTRY_MAPPING.get(country) if not config or not config["sheets"]: print(f"[{SERVICE_NAME}] Country or sheets not found") try: # First sheet in Map target_sheet_name = config["sheets"][0] client = get_gspread_client() spreadsheet = client.open_by_key(config["spreadsheet_id"]) worksheet = spreadsheet.worksheet(target_sheet_name) # Get A column col_a = worksheet.col_values(1) catalogs = [] # Skip Row 1 (Index 0 in List) for row_idx in range(1, len(col_a)): val = col_a[row_idx].strip() if not val or val in ["-", "IGNORE"]: continue # Specify file=... # format: Name=Test,file=page_catalog_group_recommend.skt match = re.search(r'file=([^,]+)', val) if match: catalog_name = match.group(1).strip() lock_info = lock_manager.get_lock_info(country, catalog_name) catalogs.append({ "catalog": catalog_name, "row_index": row_idx + 1, # Index in Google Sheet "status": "locked" if lock_info["is_locked"] else "free", "locked_by": lock_info["locked_by"] }) return {"status": "success", "country": country, "catalogs": catalogs} except Exception as e: traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) """ ADD_MENU_CHANNEL """ def find_catalog_block(col_a, catalog: str): start = None end = None pattern = re.compile(rf"file={re.escape(catalog)}") for i, val in enumerate(col_a): val = val.strip() if pattern.search(val): if start is None: start = i end = i else: if start is not None: break return start, end def handle_add_menu(country: str, catalog: str, content: list): config = COUNTRY_MAPPING.get(country) client = get_gspread_client() sheet = client.open_by_key(config["spreadsheet_id"]) nl_name = next(s for s in config["sheets"] if "new-layout" in s.lower() and "v2" not in s.lower()) worksheet = sheet.worksheet(nl_name) col_a = worksheet.col_values(1) start, end = find_catalog_block(col_a, catalog) if start is None: return insert_at = end + 2 worksheet.insert_rows([[]] * len(content), row=insert_at) batch_requests = [] for i, row in enumerate(content): row_index = insert_at + i cells = row.get("cells", []) if not cells: continue end_col = col_to_letter(len(cells)) batch_requests.append({ "range": f"A{row_index}:{end_col}{row_index}", "values": [cells] }) if batch_requests: worksheet.batch_update(batch_requests, value_input_option="USER_ENTERED") """ ADD_CATALOG_CHANNEL """ def handle_add_catalog(country: str, catalog_name: str, catalog: str): config = COUNTRY_MAPPING.get(country) client = get_gspread_client() sheet = client.open_by_key(config["spreadsheet_id"]) nl_name = next(s for s in config["sheets"] if "new-layout" in s.lower() and "v2" not in s.lower()) ws = sheet.worksheet(nl_name) all_values = ws.get_all_values() if any(catalog in cell for row in all_values for cell in row): print(f"[{SERVICE_NAME}] catalog is already exists") return # find final row last_row = 0 for i, row in enumerate(all_values, start=1): if any(cell.strip() for cell in row): last_row = i insert_row = last_row + 2 value = f"Name={catalog_name},file={catalog}" ws.update(f"A{insert_row}", [[value]]) print(f"[{SERVICE_NAME}] added at row {insert_row}") """ UPDATE_MENU_CHANNEL """ # update sheet def update_sheets(country: str, content: list): config = COUNTRY_MAPPING.get(country) client = get_gspread_client() spreadsheet = client.open_by_key(config["spreadsheet_id"]) requests_map = build_batch_requests(content) for sheet_name in config["sheets"]: sheet_key = None if "new-layout-v2" in sheet_name.lower(): sheet_key = "new_layout_v2" elif "name-desc-v2" in sheet_name.lower(): sheet_key = "name_desc_v2" else: sheet_key = "new_layout" batch_data = requests_map.get(sheet_key, []) if not batch_data: continue worksheet = spreadsheet.worksheet(sheet_name) worksheet.batch_update(batch_data, value_input_option="USER_ENTERED") # payload -> batch requests def build_batch_requests(content): requests_map = { "new_layout": {}, "new_layout_v2": {}, "name_desc_v2": {} } for item in content: for sheet_key in requests_map.keys(): rows = item.get(sheet_key, []) if sheet_key == "new_layout": rows = [rows] if rows else [] for row in rows: cells = row.get("cells", []) if not cells: continue row_i = cells[0]["coord"]["row"] if row_i not in requests_map[sheet_key]: requests_map[sheet_key][row_i] = {} for cell in cells: col_i = cell["coord"]["col"] val = cell["value"] requests_map[sheet_key][row_i][col_i] = val # convert → batch format final_map = {k: [] for k in requests_map} for sheet_key, rows in requests_map.items(): for row_i, cols in rows.items(): sorted_cols = sorted(cols.items()) chunks = split_into_chunks(sorted_cols) for chunk in chunks: start_col = chunk[0][0] end_col = chunk[-1][0] values = [v for _, v in chunk] final_map[sheet_key].append({ "range": f"{col_to_letter(start_col)}{row_i}:{col_to_letter(end_col)}{row_i}", "values": [values] }) return final_map # column to A1 spreadsheet format def col_to_letter(col: int) -> str: result = "" while col > 0: col, rem = divmod(col - 1, 26) result = chr(65 + rem) + result return result def split_into_chunks(sorted_cols): chunks = [] current = [sorted_cols[0]] for prev, curr in zip(sorted_cols, sorted_cols[1:]): if curr[0] == prev[0] + 1: current.append(curr) else: chunks.append(current) current = [curr] chunks.append(current) return chunks """ DELETE_MENU_CHANNEL """ def handle_delete_menu(country: str, catalog: str, content: list): config = COUNTRY_MAPPING.get(country) client = get_gspread_client() sheet = client.open_by_key(config["spreadsheet_id"]) nl_name = next(s for s in config["sheets"] if "new-layout" in s.lower() and "v2" not in s.lower()) worksheet = sheet.worksheet(nl_name) col_a = worksheet.col_values(1) # find catalog start range pattern = re.compile(rf"file={re.escape(catalog)}") start = None end = None for i, val in enumerate(col_a): val = val.strip() if start is None: if pattern.search(val): start = i end = i else: if "file=" in val: break end = i if start is None: return rows = [item["row_index"] for item in content if "row_index" in item] # specifically in block print("start, end:", start, end) print("rows before filter:", rows) rows = [ r for r in rows if (start + 1) <= r <= (end + 1) ] if not rows: return ranges = group_consecutive_rows(rows) requests = [] for start_r, end_r in reversed(ranges): requests.append({ "deleteDimension": { "range": { "sheetId": worksheet.id, "dimension": "ROWS", "startIndex": start_r - 1, "endIndex": end_r } } }) sheet.batch_update({ "requests": requests }) def group_consecutive_rows(rows): ranges = [] rows = sorted(rows) start = rows[0] prev = rows[0] for r in rows[1:]: if r == prev + 1: prev = r else: ranges.append((start, prev)) start = r prev = r ranges.append((start, prev)) return ranges app = FastAPI() class CatalogPayloadValues(BaseModel): country: str class CatalogPayload(BaseModel): user_info: dict = {} srv_name: str values: CatalogPayloadValues class CatalogRequest(BaseModel): type: str payload: CatalogPayload class RoomRequest(BaseModel): country: str catalog: str user_id: str @app.on_event("startup") def startup_event(): thread = threading.Thread(target=redis_message_handler, daemon=True) thread.start() # @app.post("/catalogs") # def get_catalogs(req: CatalogRequest): # payload = req.payload # if payload.srv_name != SERVICE_NAME: # raise HTTPException(status_code=400, detail="Invalid service name") # country = payload.values.country.strip().lower() # config = COUNTRY_MAPPING.get(country) # if not config or not config["sheets"]: # raise HTTPException(status_code=404, detail="Country or sheets not found") # try: # # First sheet in Map # target_sheet_name = config["sheets"][0] # client = get_gspread_client() # spreadsheet = client.open_by_key(config["spreadsheet_id"]) # worksheet = spreadsheet.worksheet(target_sheet_name) # # Get A column # col_a = worksheet.col_values(1) # catalogs = [] # # Skip Row 1 (Index 0 in List) # for row_idx in range(1, len(col_a)): # val = col_a[row_idx].strip() # if not val or val in ["-", "IGNORE"]: # continue # # Specify file=... # # format: Name=Test,file=page_catalog_group_recommend.skt # match = re.search(r'file=([^,]+)', val) # if match: # catalog_name = match.group(1).strip() # lock_info = lock_manager.get_lock_info(country, catalog_name) # catalogs.append({ # "catalog": catalog_name, # "row_index": row_idx + 1, # Index in Google Sheet # "status": "locked" if lock_info["is_locked"] else "free", # "locked_by": lock_info["locked_by"] # }) # return {"status": "success", "country": country, "catalogs": catalogs} # except Exception as e: # traceback.print_exc() # raise HTTPException(status_code=500, detail=str(e)) @app.get("/catalog/data/{country}/{catalog}/{user_id}") def get_catalog_data(country: str, catalog: str, user_id: str, background_tasks: BackgroundTasks): country = country.strip().lower() if not lock_manager.is_authorized(country, catalog, user_id): raise HTTPException(status_code=403, detail="Unauthorized access to this room") background_tasks.add_task(process_and_stream_sheet_data, country, catalog, user_id) # return {"status": "success", "country": country, "catalog": catalog, "data": final_result} return { "status": "processing", "country": country, "catalog": catalog, "message": "Data stream started. Please listen to frontend notifications." } @app.post("/api/room/enter") def enter_room(req: RoomRequest): country = req.country.strip().lower() success, message_or_user = lock_manager.acquire(country, req.catalog, req.user_id) if success: threading.Thread( target=process_and_stream_sheet_data, args=(country, req.catalog, req.user_id), daemon=True ).start() return {"status": "success", "message": f"Successfully entry catalog: {req.catalog}", "timeout": lock_manager.timeout} else: raise HTTPException(status_code=403, detail=message_or_user) @app.post("/api/room/heartbeat") def heartbeat(req: RoomRequest): country = req.country.strip().lower() if lock_manager.keep_alive(country, req.catalog, req.user_id): return {"status": "alive"} else: raise HTTPException(status_code=401, detail="Timeout permission denied") @app.post("/api/room/exit") def exit_room(req: RoomRequest): country = req.country.strip().lower() if lock_manager.release(country, req.catalog, req.user_id): return {"status": "success", "message": "Successfully exited"} else: return {"status": "success", "message": "You no longer have permission"}