import time import re import os import json import redis import gspread import requests import traceback import threading import uuid import math from fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel from dotenv import load_dotenv from google.oauth2 import service_account REDIS_HOST = os.getenv("REDIS_HOST") REDIS_PORT = os.getenv("REDIS_PORT") FRONTEND_NOTIFY_URL = os.getenv("FRONTEND_NOTIFY_URL") LOG_SERVER_URL = os.getenv("LOG_SERVER_URL") SERVICE_NAME = os.getenv("SERVICE_NAME") ENTER_CHANNEL = f"{SERVICE_NAME}/enter" HEARTBEAT_CHANNEL = f"{SERVICE_NAME}/heartbeat" EXIT_CHANNEL = f"{SERVICE_NAME}/exit" GET_CATALOG_CHANNEL = f"{SERVICE_NAME}/catalogs" GET_MENU_CATALOG_CHANNEL = f"{SERVICE_NAME}/catalog/menu" UPDATE_MENU_CHANNEL = f"{SERVICE_NAME}/update/menu" DELETE_MENU_CHANNEL = f"{SERVICE_NAME}/delete/menu" ADD_CATALOG_CHANNEL = f"{SERVICE_NAME}/add/catalog" ADD_MENU_CHANNEL = f"{SERVICE_NAME}/add/menu" SWAP_MENU_CHANNEL = f"{SERVICE_NAME}/swap/menu" # Grist set up ... GRIST_URL = os.getenv("GRIST_URL") GRIST_API_KEY = os.getenv("GRIST_API_KEY") r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) # ID_PATTERN = r'[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{4}' COUNTRY_MAPPING = { "tha": { "spreadsheet_id": os.getenv("SPREAD_SHEET_ID_THA"), "sheets": { "new-layout": os.getenv("SHEET_NEW_LAYOUT_THA"), "new-layout-v2": os.getenv("SHEET_NEW_LAYOUT_V2_THA"), "name-desc-v2": os.getenv("SHEET_NAME_DESC_V2_THA") }, "grist_doc_id": { "new-layout": os.getenv("DOC_ID_NEW_LAYOUT_THA"), "new-layout-v2": os.getenv("DOC_ID_NEW_LAYOUT_V2_THA"), "name-desc-v2": os.getenv("DOC_ID_NAME_DESC_V2_THA") } }, "tha-no-layout": { "spreadsheet_id": os.getenv("SPREAD_SHEET_ID_THA"), "sheets": { "new-layout-v2": os.getenv("SHEET_NEW_LAYOUT_V2_THA"), "name-desc-v2": os.getenv("SHEET_NAME_DESC_V2_THA") }, "grist_doc_id": { "new-layout-v2": os.getenv("DOC_ID_NEW_LAYOUT_V2_THA"), "name-desc-v2": os.getenv("DOC_ID_NAME_DESC_V2_THA") } }, "hkg": { "spreadsheet_id": os.getenv("SPREAD_SHEET_ID_HKG"), "sheets": { "new-layout-v2": os.getenv("SHEET_NEW_LAYOUT_V2_HKG"), "name-desc-v2": os.getenv("SHEET_NAME_DESC_V2_HKG") }, "grist_doc_id": { "new-layout-v2": os.getenv("DOC_ID_NEW_LAYOUT_V2_HKG"), "name-desc-v2": os.getenv("DOC_ID_NAME_DESC_V2_HKG") } }, "sgp": { "spreadsheet_id": os.getenv("SPREAD_SHEET_ID_SGP"), "sheets": { "new-layout-v2": os.getenv("SHEET_NEW_LAYOUT_V2_SGP"), "name-desc-v2": os.getenv("SHEET_NAME_DESC_V2_SGP") }, "grist_doc_id": { "new-layout-v2": os.getenv("DOC_ID_NEW_LAYOUT_V2_SGP"), "name-desc-v2": os.getenv("DOC_ID_NAME_DESC_V2_SGP") } } } def get_gspread_client(): SCOPES = [ 'https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive' ] creds = None service_account_email = os.getenv("GOOGLE_SERVICE_ACCOUNT_EMAIL") private_key = os.getenv("GOOGLE_PRIVATE_KEY").replace("\\n", "\n") credentials_info = { "type": "service_account", "client_email": service_account_email, "private_key": private_key, "token_uri": "https://oauth2.googleapis.com/token", } creds = service_account.Credentials.from_service_account_info( credentials_info, scopes=SCOPES ) return gspread.authorize(creds) class LockManager: def __init__(self): # { "tha:page_catalog_group_coffee.skt": {"user_id": "A", "timestamp": 12345} } self.locks: Dict[str, dict] = {} self.timeout = 30 def get_room_key(self, country: str, catalog: str) -> str: return f"{country}:{catalog}" def acquire(self, country: str, catalog: str, user_id: str): now = time.time() room_key = self.get_room_key(country, catalog) # (1 person, 1 room) for key, data in list(self.locks.items()): if data and data["user_id"] == user_id and key != room_key: if now - data["timestamp"] < self.timeout: return False, "You are already in another room." else: self.locks[key] = None # Check if the room you're requesting is already occupied by someone else. current_lock = self.locks.get(room_key) if current_lock and current_lock["user_id"] != user_id: if now - current_lock["timestamp"] < self.timeout: return False, f"Room locked by {current_lock['user_id']}" # queue self.locks[room_key] = {"user_id": user_id, "timestamp": now} return True, user_id def keep_alive(self, country: str, catalog: str, user_id: str): now = time.time() room_key = self.get_room_key(country, catalog) lock = self.locks.get(room_key) # Renew if lock and lock["user_id"] == user_id: if now - lock["timestamp"] < self.timeout: self.locks[room_key]["timestamp"] = now return True return False def release(self, country: str, catalog: str, user_id: str): room_key = self.get_room_key(country, catalog) lock = self.locks.get(room_key) # Clear if lock and lock["user_id"] == user_id: self.locks[room_key] = None return True return False def is_authorized(self, country: str, catalog: str, user_id: str): now = time.time() lock = self.locks.get(self.get_room_key(country, catalog)) if lock and lock["user_id"] == user_id: if now - lock["timestamp"] < self.timeout: return True return False def get_lock_info(self, country: str, catalog: str): now = time.time() lock = self.locks.get(self.get_room_key(country, catalog)) if lock and (now - lock["timestamp"] < self.timeout): return {"is_locked": True, "locked_by": lock["user_id"]} return {"is_locked": False, "locked_by": None} lock_manager = LockManager() # -- Redis handler -- def redis_message_handler(): pubsub = r.pubsub() pubsub.subscribe(ENTER_CHANNEL, HEARTBEAT_CHANNEL, EXIT_CHANNEL, GET_CATALOG_CHANNEL, GET_MENU_CATALOG_CHANNEL, UPDATE_MENU_CHANNEL, DELETE_MENU_CHANNEL, ADD_CATALOG_CHANNEL, ADD_MENU_CHANNEL, SWAP_MENU_CHANNEL) print(f"[*] Redis Listener started on service: {SERVICE_NAME}") for message in pubsub.listen(): if message['type'] != 'message': continue try: data = json.loads(message['data']) payload = data.get("payload", {}) values = payload.get("values", {}) user_info = payload.get("user_info", {}) country = values.get("country", "").strip().lower() catalog = values.get("catalog", "") catalog_name = values.get("catalog_name", "") content = values.get("content", []) srv_name = payload.get("srv_name", "") user_id = user_info.get("uid") if srv_name != SERVICE_NAME: continue channel = message['channel'] if not (user_id and country): print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue if channel == ENTER_CHANNEL: if not catalog: print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue success, msg = lock_manager.acquire(country, catalog, user_id) if success: threading.Thread( target=process_and_stream_sheet_data, args=(country, catalog, user_id), daemon=True ).start() print(f"[{SERVICE_NAME}] Enter Room: {catalog} | User: {user_id} | Success: {success}") elif channel == HEARTBEAT_CHANNEL: if not catalog: print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue alive = lock_manager.keep_alive(country, catalog, user_id) if not alive: print(f"[{SERVICE_NAME}] Heartbeat Failed: {catalog} | User: {user_id}") elif channel == EXIT_CHANNEL: if not catalog: print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue lock_manager.release(country, catalog, user_id) print(f"[{SERVICE_NAME}] Exit Room (Force/Command): {catalog} | User: {user_id}") elif channel == GET_CATALOG_CHANNEL: try: if not FRONTEND_NOTIFY_URL: print(f"{SERVICE_NAME} Warning: FRONTEND_NOTIFY_URL is not set.") continue result = get_catalogs(country) payload = { "type": "notify", "payload": { "from": SERVICE_NAME, "level": "content", "to": user_id, "content": result } } requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) print(payload) print(f"[{SERVICE_NAME}] Get catalog success | User: {user_id}") except Exception as e: print(f"[{SERVICE_NAME}] Get catalog error: {e}") elif channel == GET_MENU_CATALOG_CHANNEL: if not catalog: print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue if not lock_manager.is_authorized(country, catalog, user_id): print(f"[{SERVICE_NAME}] get catalog menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room") continue threading.Thread( target=process_and_stream_sheet_data, args=(country, catalog, user_id), daemon=True ).start() elif channel == ADD_MENU_CHANNEL: if not (content and catalog): print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue if not lock_manager.is_authorized(country, catalog, user_id): print(f"[{SERVICE_NAME}] Add menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room") continue try: handle_add_menu(country, catalog, content) print(f"[{SERVICE_NAME}] Add menu success: {catalog} | User: {user_id}") except Exception as e: print(f"[{SERVICE_NAME}] Add menu error: {e}") elif channel == ADD_CATALOG_CHANNEL: if not (catalog_name, catalog): print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue try: handle_add_catalog(country, catalog_name, catalog) except Exception as e: print(f"[{SERVICE_NAME}] Add catalog error: {e}") elif channel == UPDATE_MENU_CHANNEL: if not (content and catalog): print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue if not lock_manager.is_authorized(country, catalog, user_id): # raise HTTPException(status_code=403, detail="Unauthorized access to this room") print(f"[{SERVICE_NAME}] Updated catalog failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room") continue try: update_sheets(country, catalog, content) print(f"[{SERVICE_NAME}] Update success: {catalog} | User: {user_id}") except Exception as e: print(f"[{SERVICE_NAME}] Update error: {e}") elif channel == DELETE_MENU_CHANNEL: if not (content and catalog): print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue if not lock_manager.is_authorized(country, catalog, user_id): print(f"[{SERVICE_NAME}] Delete menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room") continue try: handle_delete_menu(country, catalog, content) except Exception as e: print(f"[{SERVICE_NAME}] Delete menu error: {e}") elif channel == SWAP_MENU_CHANNEL: if not (content and catalog): print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel}") continue if not lock_manager.is_authorized(country, catalog, user_id): print(f"[{SERVICE_NAME}] Swap menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room") continue try: handle_swap_menu(country, catalog, content) print(f"[{SERVICE_NAME}] Swap data success: {catalog}") except Exception as e: print(f"[{SERVICE_NAME}] Swap data error: {e}") traceback.print_exc() except Exception as e: print(f"[Redis Error] Error processing message: {e}") traceback.print_exc() def find_grist_table_id(doc_id, catalog_suffix): url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables" headers = {"Authorization": f"Bearer {GRIST_API_KEY}"} try: resp = requests.get(url, headers=headers) if resp.status_code == 200: tables = resp.json().get("tables", []) for t in tables: t_id = t.get("id", "") reconstructed = reconstruct_table_name(t_id) if f"file={catalog_suffix}" in reconstructed: return t_id label = t.get("fields", {}).get("label", "") if f"file={catalog_suffix}" in label: return t_id return None except Exception as e: print(f"Error: {e}") return None def send_stream_notification(msg_type: str, content: any, batch_id: str, current_chunk: int, total_chunks: int, total_items: int, user_id: str): """ msg_type: "start", "chunk", "end", "error" """ if not FRONTEND_NOTIFY_URL: print("Warning: FRONTEND_NOTIFY_URL is not set.") return payload = { "type": "notify", "payload": { "from": SERVICE_NAME, "level": "content", "msg": msg_type, "batch_id": batch_id, "current_chunk": current_chunk, "total_chunks": total_chunks, "total_items": total_items, "to": user_id, "content": content } } try: # requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) print(f"[Stream] Sent {msg_type} chunk {current_chunk}/{total_chunks} for batch {batch_id}") print(payload) except Exception as e: print(f"[Stream Error] Failed to send {msg_type} for batch {batch_id}: {e}") def process_and_stream_sheet_data(country: str, catalog: str, user_id: str): batch_id = str(uuid.uuid4()) send_stream_notification("start", {"message": f"Start fetching catalog: {catalog}"}, batch_id, 0, 0, 0, user_id) try: config = COUNTRY_MAPPING.get(country) grist_docs = config.get("grist_doc_id", {}) has_nl = "new-layout" in grist_docs doc_nv2 = grist_docs.get("new-layout-v2") doc_nd = grist_docs.get("name-desc-v2") full_table_nv2 = find_grist_table_id(doc_nv2, catalog) nd_all = fetch_grist_table_data(doc_nd, "Name_desc_v2") final_result = [] if not full_table_nv2: raise Exception(f"Table for catalog {catalog} not found in New-Layout-V2") nv2_data = fetch_grist_table_data(doc_nv2, full_table_nv2) # NV2 : name row, desc row, img row, blank row (loop) # 4 rows = 1 menu i = 0 while i < len(nv2_data): nv2_obj = nv2_data[i] nv2_row = nv2_obj["fields"] if len(nv2_row) <= 1 or nv2_row[1] != "name": i += 1 continue name_en = nv2_row[2].strip() if len(nv2_row) > 2 else "" name_th = nv2_row[3].strip() if len(nv2_row) > 3 else "" if not name_th and not name_en: i += 4 continue menu_item = { "new_layout_v2": [], "name_desc_v2": [] } individual_codes = [] for sub_idx in range(4): curr_i = i + sub_idx if curr_i < len(nv2_data): row_data = nv2_data[curr_i]["fields"] row_info = { "row_index": nv2_data[curr_i]["id"], "cells": [] } if sub_idx < 3: for c_idx in range(len(row_data)): if c_idx < len(row_data): row_info["cells"].append({ "value": row_data[c_idx], "coord": {"row": nv2_data[curr_i]["id"], "col": c_idx + 1} }) menu_item["new_layout_v2"].append(row_info) # all product codes (col I, J, K = index 8, 9, 10) if sub_idx == 0 and len(row_data) > 10: for code_idx in [8, 9, 10]: raw_value = str(row_data[code_idx]).strip() if not raw_value or raw_value == "-": continue codes = [c.strip() for c in raw_value.split(",") if c.strip() and c.strip() != "-"] individual_codes.extend(codes) # find in name-desc-v2 for code in set(individual_codes): targets = [f"MENU.{code}.name", f"MENU.{code}.desc"] for nd_obj in nd_all: nd_row = nd_obj["fields"] nd_id = nd_obj["id"] if len(nd_row) > 0 and nd_row[0].strip() in targets: nd_info = {"key": nd_row[0], "row_index": nd_id, "cells": []} for c_idx in range(4, 8): if c_idx < len(nd_row): nd_info["cells"].append({ "value": nd_row[c_idx], "coord": {"row": nd_id, "col": c_idx + 1} }) menu_item["name_desc_v2"].append(nd_info) final_result.append(menu_item) i += 4 # --- Stream Chunk --- CHUNK_SIZE = 10 total_items = len(final_result) total_chunks = math.ceil(total_items / CHUNK_SIZE) if total_items > 0 else 1 for i in range(total_chunks): start_idx = i * CHUNK_SIZE end_idx = start_idx + CHUNK_SIZE chunk_data = final_result[start_idx:end_idx] send_stream_notification("chunk", chunk_data, batch_id, i + 1, total_chunks, total_items, user_id) send_stream_notification("end", {"message": "All data sent successfully"}, batch_id, total_chunks, total_chunks, total_items, user_id) except Exception as e: traceback.print_exc() send_stream_notification("error", {"error_detail": str(e)}, batch_id, 0, 0, 0, user_id) # get catalog def get_catalogs(country: str): config = COUNTRY_MAPPING.get(country) if not config or not config.get("grist_doc_id"): print(f"[{SERVICE_NAME}] Country or Grist doc not found") raise HTTPException(status_code=404, detail="Grist config not found") try: grist_docs = config["grist_doc_id"] doc_id = None if grist_docs.get("new-layout"): doc_id = grist_docs["new-layout"] elif grist_docs.get("new-layout-v2"): doc_id = grist_docs["new-layout-v2"] else: print(f"[{SERVICE_NAME}] grist_doc_id [layout] is empty in this country | {country}") return None url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables" headers = {"Authorization": f"Bearer {GRIST_API_KEY}"} resp = requests.get(url, headers=headers) catalogs = [] if resp.status_code == 200: tables = resp.json().get("tables", []) for i, t in enumerate(tables): t_id = t["id"] if t_id.startswith("Grist") or t_id.lower() == "name_desc_v2": continue reconstructed_name = reconstruct_table_name(t_id) match = re.search(r'file=([^,]+)', reconstructed_name) if match: clean_catalog = match.group(1).strip() else: clean_catalog = t_id lock_info = lock_manager.get_lock_info(country, clean_catalog) catalogs.append({ "catalog": clean_catalog, "row_index": i, "status": "locked" if lock_info["is_locked"] else "free", "locked_by": lock_info["locked_by"] }) return {"status": "success", "country": country, "catalogs": catalogs} except Exception as e: traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) """ ADD_MENU_CHANNEL """ def get_val(arr, idx, default="-"): if not arr or idx >= len(arr): return default val = arr[idx] return val if val != "" else default # List to Grist format {"fields": {"A": ..., "B": ...}} def to_grist_record(row_list): # chr(65+i) change to A, B, C... return {"fields": {chr(65 + i): val for i, val in enumerate(row_list)}} def handle_add_menu(country: str, catalog: str, content: list): config = COUNTRY_MAPPING.get(country) if not config: print(f"[{SERVICE_NAME}] Country {country} config not found") return grist_docs = config.get("grist_doc_id", {}) has_nl = "new-layout" in grist_docs doc_nl = grist_docs.get("new-layout") doc_nv2 = grist_docs.get("new-layout-v2") doc_nd = grist_docs.get("name-desc-v2") nl_table_id = find_grist_table_id(doc_nl, catalog) if has_nl else None nv2_table_id = find_grist_table_id(doc_nv2, catalog) nd_table_id = "Name_desc_v2" if has_nl and not nl_table_id: print(f"[{SERVICE_NAME}] Table for {catalog} not found in doc NL") return if not nv2_table_id: print(f"[{SERVICE_NAME}] Table for {catalog} not found in doc NV2") return def norm_val(v): return v if v not in [None, ""] else "-" nd_existing_data = fetch_grist_table_data(doc_nd, nd_table_id) existing_keys = set() if nd_existing_data: for row_obj in nd_existing_data: row = row_obj["fields"] key = row[0] if key: existing_keys.add(key) # for THA existing_nl_codes = set() if has_nl and nl_table_id: nl_data = fetch_grist_table_data(doc_nl, nl_table_id) if nl_data: for row_obj in nl_data: row = row_obj["fields"] codes = tuple(row[i] if i < len(row) else "-" for i in [6, 7, 8, 10, 11, 12]) existing_nl_codes.add(codes) nv2_data = fetch_grist_table_data(doc_nv2, nv2_table_id) existing_nv2_codes = set() if nv2_data: for row_obj in nv2_data: row = row_obj["fields"] is_name = row[1] if len(row) > 1 else "" if is_name == "name": codes = (norm_val(row[8] if len(row) > 8 else "-"), norm_val(row[9] if len(row) > 9 else "-"), norm_val(row[10] if len(row) > 10 else "-")) existing_nv2_codes.add(codes) nl_records = [] nv2_records = [] nd_records = [] for item in content: cells = item.get("cells", []) if not cells: continue payload_lang = item.get("payload", {}) lang_name = payload_lang.get("lang_name", ["", "", "", ""]) lang_desc = payload_lang.get("lang_desc", ["", "", "", ""]) # ========================================== # New-Layout (THA) # ========================================== if has_nl: current_nl_tuple = tuple(norm_val(get_val(cells, i)) for i in [6,7,8,10,11,12]) if current_nl_tuple not in existing_nl_codes: nl_records.append(to_grist_record(cells)) existing_nl_codes.add(current_nl_tuple) # ========================================== # New-Layout-V2 (All country) # ========================================== hot_code = f"{get_val(cells, 6)},{get_val(cells, 10)}" cold_code = f"{get_val(cells, 7)},{get_val(cells, 11)}" blend_code = f"{get_val(cells, 8)},{get_val(cells, 12)}" current_nv2_tuple = (hot_code, cold_code, blend_code) if current_nv2_tuple not in existing_nv2_codes: name_row = ["", "name", get_val(cells, 3), get_val(cells, 2), get_val(lang_name, 0), get_val(lang_name, 1), get_val(lang_name, 2), get_val(lang_name, 3), hot_code, cold_code, blend_code, "", "", "", "", "", "", "", get_val(cells, 14), get_val(cells, 15), get_val(cells, 16), get_val(cells, 17), get_val(cells, 18)] desc_row = ["", "desc", get_val(cells, 5), get_val(cells, 4), get_val(lang_desc, 0), get_val(lang_desc, 1), get_val(lang_desc, 2), get_val(lang_desc, 3), "||||||||||||||||||||||||||", "||||||||||||||||||||||||||", "||||||||||||||||||||||||||", "", "", "", "", "", "", "", "-", "-", "-", "-", "-"] img_row = ["", "img", get_val(cells, 9), "-", "-", "-", "-", "-", get_val(cells, 13), "||||||||||||||||||||||||||", "||||||||||||||||||||||||||", "", "", "", "", "", "", "", "-", "-", "-", "-", "-"] blank_row = [""] * 23 nv2_records.extend([to_grist_record(name_row), to_grist_record(desc_row), to_grist_record(img_row), to_grist_record(blank_row)]) existing_nv2_codes.add(current_nv2_tuple) # ========================================== # Name-desc-V2 (All country) # ========================================== product_code_indices = [6, 7, 8, 10, 11, 12] for p_idx in product_code_indices: code = get_val(cells, p_idx) if code == "-" or not code: continue menu_name_key = f"MENU.{code}.name" if menu_name_key in existing_keys: continue parts = code.split('-') drink_type = "UNKNOWN" drink_type_th = "" if len(parts) >= 3: type_id = parts[2] if type_id == "01": drink_type = "HOT" drink_type_th = "ร้อน" elif type_id == "02": drink_type = "ICED" drink_type_th = "เย็น" elif type_id == "03": drink_type = "SMOOTHIE" drink_type_th = "ปั่น" name_en = get_val(cells, 3, "") name_th = get_val(cells, 2, "") if drink_type == "SMOOTHIE": prefix = f"{name_en} {drink_type}".strip() else: prefix = f"{drink_type} {name_en}".strip() prefix_th = f"{name_th} {drink_type_th}".strip() if drink_type_th else name_th nd_name_row = [menu_name_key, get_val(cells, 9), prefix, prefix_th, get_val(lang_name, 0), get_val(lang_name, 1), get_val(lang_name, 2), get_val(lang_name, 3)] nd_desc_row = [f"MENU.{code}.desc", "-", get_val(cells, 5), get_val(cells, 4), get_val(lang_desc, 0), get_val(lang_desc, 1), get_val(lang_desc, 2), get_val(lang_desc, 3)] nd_records.extend([to_grist_record(nd_name_row), to_grist_record(nd_desc_row)]) existing_keys.add(menu_name_key) headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"} def add_records_to_grist(doc_id, table_id, records): if not records or not doc_id or not table_id: return url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{table_id}/records" try: resp = requests.post(url, headers=headers, json={"records": records}) if resp.status_code != 200: print(f"Error adding to {table_id}: {resp.text}") except Exception as e: print(f"Request failed for {table_id}: {e}") # NL (เฉพาะ THA) if has_nl and nl_records: add_records_to_grist(doc_nl, nl_table_id, nl_records) # NV2 if nv2_table_id and nv2_records: add_records_to_grist(doc_nv2, nv2_table_id, nv2_records) # ND_V2 if nd_records: add_records_to_grist(doc_nd, nd_table_id, nd_records) """ ADD_CATALOG_CHANNEL """ def handle_add_catalog(country: str, catalog_name: str, catalog: str): config = COUNTRY_MAPPING.get(country) grist_docs = config.get("grist_doc_id", {}) docs_to_create = [] if "new-layout" in grist_docs: docs_to_create.append(grist_docs["new-layout"]) if "new-layout-v2" in grist_docs: docs_to_create.append(grist_docs["new-layout-v2"]) table_label = f"Name={catalog_name},file={catalog}" table_id = re.sub(r'[^a-zA-Z0-9_]', '_', table_label) table_id = table_id[0].upper() + table_id[1:] headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"} def create_table_in_doc(doc_id): url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables" columns = [{"id": chr(65+i), "fields": {"label": chr(65+i)}} for i in range(15)] payload = { "tables": [{ "id": table_id, "columns": columns }] } resp = requests.post(url, headers=headers, json=payload) if resp.status_code == 200: print(f"[{SERVICE_NAME}] Created table {table_id} in doc {doc_id}") else: print(f"[{SERVICE_NAME}] Failed to create table: {resp.text}") for doc_id in docs_to_create: create_table_in_doc(doc_id) """ UPDATE_MENU_CHANNEL """ def update_sheets(country: str, catalog: str, content: list): config = COUNTRY_MAPPING.get(country) grist_docs = config.get("grist_doc_id", {}) has_nl = "new-layout" in grist_docs doc_map = {} if "new-layout" in grist_docs: doc_map["new_layout"] = grist_docs["new-layout"] if "new-layout-v2" in grist_docs: doc_map["new_layout_v2"] = grist_docs["new-layout-v2"] if "name-desc-v2" in grist_docs: doc_map["name_desc_v2"] = grist_docs["name-desc-v2"] full_table_name = None for key in ["new_layout", "new_layout_v2"]: if key in doc_map: full_table_name = find_grist_table_id(doc_map[key], catalog) if full_table_name: break if not full_table_name: raise Exception(f"Table for catalog {catalog} not found") headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"} nv2_updates_for_nl = [] for item in content: for sheet_key in doc_map.keys(): rows = item.get(sheet_key, []) if sheet_key == "new_layout": rows = [rows] if rows else [] if not rows: continue doc_id = doc_map[sheet_key] target_table = "Name_desc_v2" if sheet_key == "name_desc_v2" else full_table_name records_to_update = [] for row in rows: cells = row.get("cells", []) if not cells: continue row_i = cells[0]["coord"]["row"] fields = {} for cell in cells: col_letter = col_to_letter(cell["coord"]["col"]) fields[col_letter] = str(cell["value"]) records_to_update.append({ "id": row_i, "fields": fields }) # keep nv2 data to sync in new-layout [If is tha] if has_nl and sheet_key == "new_layout_v2": nv2_updates_for_nl.append({ "row_id": row_i, "fields": fields, "cells": cells }) if records_to_update: grouped_updates = {} for record in records_to_update: field_keys = tuple(sorted(record["fields"].keys())) if field_keys not in grouped_updates: grouped_updates[field_keys] = [] grouped_updates[field_keys].append(record) update_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{target_table}/records" for field_keys, batch in grouped_updates.items(): resp = requests.patch(update_url, headers=headers, json={"records": batch}) if resp.status_code != 200: print(f"[{SERVICE_NAME}] Grist update failed for {target_table} | Doc_id : {doc_id} | Status : {resp.text}") # === SYNC: new-layout-v2 → new-layout (THA) === if has_nl and nv2_updates_for_nl: sync_nv2_to_nl(country, catalog, nv2_updates_for_nl, headers) def sync_nv2_to_nl(country: str, catalog: str, nv2_updates: list, headers: dict): """Sync updates จาก new-layout-v2 ไปอัปเดต new-layout สำหรับ THA""" config = COUNTRY_MAPPING.get(country) grist_docs = config.get("grist_doc_id", {}) doc_nl = grist_docs["new-layout"] doc_nv2 = grist_docs["new-layout-v2"] nl_table_id = find_grist_table_id(doc_nl, catalog) nv2_table_id = find_grist_table_id(doc_nv2, catalog) if not nl_table_id or not nv2_table_id: print(f"[{SERVICE_NAME}] Tables not found for sync") return nv2_all_data = fetch_grist_table_data(doc_nv2, nv2_table_id) nl_all_data = fetch_grist_table_data(doc_nl, nl_table_id) # === NL codes map === # NL tuple: (G, H, I, K, L, M) = (index 6, 7, 8, 10, 11, 12) nl_codes_map = {} for row_obj in nl_all_data: row = row_obj["fields"] g = str(row[6]).strip() if len(row) > 6 else "-" h = str(row[7]).strip() if len(row) > 7 else "-" i = str(row[8]).strip() if len(row) > 8 else "-" k = str(row[10]).strip() if len(row) > 10 else "-" l = str(row[11]).strip() if len(row) > 11 else "-" m = str(row[12]).strip() if len(row) > 12 else "-" nl_codes_map[(g, h, i, k, l, m)] = row_obj["id"] # === NV2 → NL mapping (any row in block to NL in one row) === nv2_to_nl_map = {} for i in range(len(nv2_all_data)): row = nv2_all_data[i]["fields"] if len(row) > 1 and str(row[1]).strip() == "name": # productCodes NV2 name row: I(8), J(9), K(10) raw_i = str(row[8]).strip() if len(row) > 8 else "-" raw_j = str(row[9]).strip() if len(row) > 9 else "-" raw_k = str(row[10]).strip() if len(row) > 10 else "-" # Split comma-separated parts_i = [p.strip() for p in raw_i.split(",") if p.strip() and p.strip() != "-"] parts_j = [p.strip() for p in raw_j.split(",") if p.strip() and p.strip() != "-"] parts_k = [p.strip() for p in raw_k.split(",") if p.strip() and p.strip() != "-"] col_g = parts_i[0] if len(parts_i) > 0 else "-" col_k = parts_i[1] if len(parts_i) > 1 else "-" col_h = parts_j[0] if len(parts_j) > 0 else "-" col_l = parts_j[1] if len(parts_j) > 1 else "-" col_i = parts_k[0] if len(parts_k) > 0 else "-" col_m = parts_k[1] if len(parts_k) > 1 else "-" # find NL row to matching codes nl_id = nl_codes_map.get((col_g, col_h, col_i, col_k, col_l, col_m)) if nl_id: for offset in range(4): if (i + offset) < len(nv2_all_data): nv2_row_obj = nv2_all_data[i + offset] nv2_id = nv2_row_obj["id"] nv2_row = nv2_row_obj["fields"] row_type = str(nv2_row[1]).strip() if len(nv2_row) > 1 else "" nv2_to_nl_map[nv2_id] = { "nl_id": nl_id, "row_type": row_type } nl_records_to_update = [] for update in nv2_updates: nv2_row_id = update["row_id"] update_fields = update["fields"] # {col_letter: value} mapping = nv2_to_nl_map.get(nv2_row_id) if not mapping: print(f"[{SERVICE_NAME}] NL row not found for NV2 row {nv2_row_id}") continue nl_target_id = mapping["nl_id"] row_type = mapping["row_type"] nl_fields = {} if row_type == "name": # C→D (name_en), D→C (name_th) if 'C' in update_fields: nl_fields['D'] = update_fields['C'] if 'D' in update_fields: nl_fields['C'] = update_fields['D'] # I→G,K if 'I' in update_fields: val = update_fields['I'] parts = [p.strip() for p in str(val).split(",") if p.strip()] if len(parts) > 0: nl_fields['G'] = parts[0] if len(parts) > 1: nl_fields['K'] = parts[1] # J→H,L if 'J' in update_fields: val = update_fields['J'] parts = [p.strip() for p in str(val).split(",") if p.strip()] if len(parts) > 0: nl_fields['H'] = parts[0] if len(parts) > 1: nl_fields['L'] = parts[1] # K→I,M if 'K' in update_fields: val = update_fields['K'] parts = [p.strip() for p in str(val).split(",") if p.strip()] if len(parts) > 0: nl_fields['I'] = parts[0] if len(parts) > 1: nl_fields['M'] = parts[1] # S→O, T→P, U→R, V→R, W→S if 'S' in update_fields: nl_fields['O'] = update_fields['S'] if 'T' in update_fields: nl_fields['P'] = update_fields['T'] if 'U' in update_fields: nl_fields['Q'] = update_fields['U'] if 'V' in update_fields: nl_fields['R'] = update_fields['V'] if 'W' in update_fields: nl_fields['S'] = update_fields['W'] elif row_type == "desc": # C→F (desc_en), D→E (desc_th) if 'C' in update_fields: nl_fields['F'] = update_fields['C'] if 'D' in update_fields: nl_fields['E'] = update_fields['D'] elif row_type == "img": # C→J (img) if 'C' in update_fields: nl_fields['J'] = update_fields['C'] # I→N if 'I' in update_fields: nl_fields['N'] = update_fields['I'] if nl_fields: nl_records_to_update.append({ "id": nl_target_id, "fields": nl_fields }) # === Update to NL === if nl_records_to_update: update_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_nl}/tables/{nl_table_id}/records" # Group by field keys (Grist requirement) grouped_updates = {} for record in nl_records_to_update: field_keys = tuple(sorted(record["fields"].keys())) if field_keys not in grouped_updates: grouped_updates[field_keys] = [] grouped_updates[field_keys].append(record) for field_keys, batch in grouped_updates.items(): resp = requests.patch(update_url, headers=headers, json={"records": batch}) if resp.status_code != 200: print(f"[{SERVICE_NAME}] NL sync update failed: {resp.text}") else: print(f"[{SERVICE_NAME}] Synced {len(batch)} NL records") # column to A1 spreadsheet format def col_to_letter(col: int) -> str: result = "" while col > 0: col, rem = divmod(col - 1, 26) result = chr(65 + rem) + result return result """ DELETE_MENU_CHANNEL """ def handle_delete_menu(country: str, catalog: str, content: list): config = COUNTRY_MAPPING.get(country) if not config: print(f"[{SERVICE_NAME}] Country {country} config not found") return grist_docs = config.get("grist_doc_id", {}) has_nl = "new-layout" in grist_docs doc_nl = grist_docs.get("new-layout") doc_nv2 = grist_docs.get("new-layout-v2") nl_table_id = find_grist_table_id(doc_nl, catalog) if has_nl else None nv2_table_id = find_grist_table_id(doc_nv2, catalog) if not nv2_table_id: print(f"[{SERVICE_NAME}] NV2 Table not found for catalog: {catalog}") return target_ids = {int(item.get("target_id")) for item in content if item.get("target_id")} if not target_ids: print(f"[{SERVICE_NAME}] Payload to delete is empty.") return headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"} try: nv2_all_data = fetch_grist_table_data(doc_nv2, nv2_table_id) # ProductCodes NV2 name rows that match with target_ids # (col_G, col_H, col_I, col_K, col_L, col_M) search_nl_codes_sets = [] nv2_ids_to_delete = [] for i in range(len(nv2_all_data)): row_obj = nv2_all_data[i] row = row_obj["fields"] if len(row) > 1 and row[1] == "name" and row_obj["id"] in target_ids: # === ProductCodes NV2 col I, J, K (index 8, 9, 10) === # comma-separated # col I (index 8) → col G (index 6) and col K (index 10) of NL raw_i = str(row[8]).strip() if len(row) > 8 else "-" parts_i = [p.strip() for p in raw_i.split(",") if p.strip() and p.strip() != "-"] col_g = parts_i[0] if len(parts_i) > 0 else "-" col_k = parts_i[1] if len(parts_i) > 1 else "-" # col J (index 9) → col H (index 7) and col L (index 11) of NL raw_j = str(row[9]).strip() if len(row) > 9 else "-" parts_j = [p.strip() for p in raw_j.split(",") if p.strip() and p.strip() != "-"] col_h = parts_j[0] if len(parts_j) > 0 else "-" col_l = parts_j[1] if len(parts_j) > 1 else "-" # col K (index 10) → col I (index 8) and col M (index 12) of NL raw_k = str(row[10]).strip() if len(row) > 10 else "-" parts_k = [p.strip() for p in raw_k.split(",") if p.strip() and p.strip() != "-"] col_i = parts_k[0] if len(parts_k) > 0 else "-" col_m = parts_k[1] if len(parts_k) > 1 else "-" # To find in new-layout search_nl_codes_sets.append((col_g, col_h, col_i, col_k, col_l, col_m)) # NV2 block IDs (4 row: name, desc, img, blank) [Delete] for offset in range(4): if (i + offset) < len(nv2_all_data): record_to_del = nv2_all_data[i + offset]["id"] if record_to_del not in nv2_ids_to_delete: nv2_ids_to_delete.append(record_to_del) # === Delete in new-layout (THA) === if has_nl and search_nl_codes_sets: nl_all_data = fetch_grist_table_data(doc_nl, nl_table_id) nl_ids_to_delete = [] for row_obj in nl_all_data: row = row_obj["fields"] # NL: col G(6), H(7), I(8), K(10), L(11), M(12) nl_g = str(row[6]).strip() if len(row) > 6 else "-" nl_h = str(row[7]).strip() if len(row) > 7 else "-" nl_i = str(row[8]).strip() if len(row) > 8 else "-" nl_k = str(row[10]).strip() if len(row) > 10 else "-" nl_l = str(row[11]).strip() if len(row) > 11 else "-" nl_m = str(row[12]).strip() if len(row) > 12 else "-" nl_tuple = (nl_g, nl_h, nl_i, nl_k, nl_l, nl_m) if nl_tuple in search_nl_codes_sets: nl_ids_to_delete.append(row_obj["id"]) if nl_ids_to_delete: del_nl_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_nl}/tables/{nl_table_id}/records/delete" resp_nl = requests.post(del_nl_url, headers=headers, json=nl_ids_to_delete) print(f"[{SERVICE_NAME}] Deleted NL IDs: {nl_ids_to_delete} | Status: {resp_nl.status_code}") # === Delete in new-layout-v2 (All country) === if nv2_ids_to_delete: del_nv2_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_nv2}/tables/{nv2_table_id}/records/delete" resp_v2 = requests.post(del_nv2_url, headers=headers, json=nv2_ids_to_delete) print(f"[{SERVICE_NAME}] Deleted NV2 IDs: {nv2_ids_to_delete} | Status: {resp_v2.status_code}") except Exception as e: print(f"[{SERVICE_NAME}] Delete menu error: {e}") traceback.print_exc() """ SWAP_MENU_CHANNEL """ def format_for_grist(data): # List to Dict if isinstance(data, list): return {col_to_letter(i + 1): val for i, val in enumerate(data)} return data def handle_swap_menu(country: str, catalog: str, content: list): config = COUNTRY_MAPPING.get(country) grist_docs = config.get("grist_doc_id", {}) has_nl = "new-layout" in grist_docs doc_nl = grist_docs.get("new-layout") doc_nv2 = grist_docs.get("new-layout-v2") nl_table = find_grist_table_id(doc_nl, catalog) if has_nl else None nv2_table = find_grist_table_id(doc_nv2, catalog) if not nv2_table: print(f"[{SERVICE_NAME}] Swap failed: NV2 Table not found") return nl_all = fetch_grist_table_data(doc_nl, nl_table) if has_nl and nl_table else [] nv2_all = fetch_grist_table_data(doc_nv2, nv2_table) nl_map = {row["id"]: row["fields"] for row in nl_all} if has_nl else {} nv2_map = {row["id"]: row["fields"] for row in nv2_all} nv2_to_nl_map = {} if has_nl: nl_codes_map = {} for row_obj in nl_all: row = row_obj["fields"] g = str(row[6]).strip() if len(row) > 6 else "-" h = str(row[7]).strip() if len(row) > 7 else "-" i = str(row[8]).strip() if len(row) > 8 else "-" k = str(row[10]).strip() if len(row) > 10 else "-" l = str(row[11]).strip() if len(row) > 11 else "-" m = str(row[12]).strip() if len(row) > 12 else "-" nl_codes_map[(g, h, i, k, l, m)] = row_obj["id"] for i in range(len(nv2_all)): row = nv2_all[i]["fields"] if len(row) > 1 and str(row[1]).strip() == "name": raw_i = str(row[8]).strip() if len(row) > 8 else "-" raw_j = str(row[9]).strip() if len(row) > 9 else "-" raw_k = str(row[10]).strip() if len(row) > 10 else "-" parts_i = [p.strip() for p in raw_i.split(",") if p.strip() and p.strip() != "-"] parts_j = [p.strip() for p in raw_j.split(",") if p.strip() and p.strip() != "-"] parts_k = [p.strip() for p in raw_k.split(",") if p.strip() and p.strip() != "-"] col_g = parts_i[0] if len(parts_i) > 0 else "-" col_k = parts_i[1] if len(parts_i) > 1 else "-" col_h = parts_j[0] if len(parts_j) > 0 else "-" col_l = parts_j[1] if len(parts_j) > 1 else "-" col_i = parts_k[0] if len(parts_k) > 0 else "-" col_m = parts_k[1] if len(parts_k) > 1 else "-" nl_id = nl_codes_map.get((col_g, col_h, col_i, col_k, col_l, col_m)) if nl_id: for offset in range(4): if (i + offset) < len(nv2_all): nv2_to_nl_map[nv2_all[i + offset]["id"]] = nl_id nv2_original = {} # {target_id: [block_fields]} nl_original = {} # {nl_target_id: nl_fields} for pair in content: id_src = pair.get("source_id") id_tgt = pair.get("target_id") if not id_src or not id_tgt: continue # NV2 block original source block_src = find_nv2_block_by_id(nv2_all, id_src) if block_src: # original fields nv2_original[id_tgt] = [list(nv2_map[sid]) for sid in block_src] # NL original source (THA) if has_nl: nl_src_id = nv2_to_nl_map.get(id_src) if nl_src_id: nl_original[nv2_to_nl_map.get(id_tgt)] = list(nl_map[nl_src_id]) nv2_records_to_update = [] nl_records_to_update = [] for pair in content: id_src = pair.get("source_id") id_tgt = pair.get("target_id") if not id_src or not id_tgt: continue block_tgt = find_nv2_block_by_id(nv2_all, id_tgt) # === Swap NV2 === original_block = nv2_original.get(id_tgt, []) for i in range(min(len(block_tgt), len(original_block))): id_v2_tgt = block_tgt[i] fields_src = original_block[i] nv2_records_to_update.append({ "id": id_v2_tgt, "fields": format_for_grist(fields_src) }) # === Swap NL (THA) === if has_nl: nl_tgt_id = nv2_to_nl_map.get(id_tgt) nl_src_original = nl_original.get(nl_tgt_id) if nl_tgt_id and nl_src_original: nl_records_to_update.append({ "id": nl_tgt_id, "fields": format_for_grist(nl_src_original) }) headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"} if has_nl and nl_records_to_update: resp_v1 = requests.patch(f"{GRIST_URL}/api/docs/{doc_nl}/tables/{nl_table}/records", headers=headers, json={"records": nl_records_to_update}) print(f"[{SERVICE_NAME}] Swap New-layout Result: {resp_v1.status_code}") if nv2_records_to_update: resp_v2 = requests.patch(f"{GRIST_URL}/api/docs/{doc_nv2}/tables/{nv2_table}/records", headers=headers, json={"records": nv2_records_to_update}) print(f"[{SERVICE_NAME}] Swap New-layout-v2 Result: {resp_v2.status_code}") def find_nv2_block_by_id(nv2_data, name_row_id): """Find block 4 rows by name row id """ for i, nv2_obj in enumerate(nv2_data): if nv2_obj["id"] == name_row_id: return [nv2_data[i+j]["id"] for j in range(4) if (i+j) < len(nv2_data)] return [] def generate_search_targets(nl_row): """ New-layout-v2 [format] """ pairs = [ (nl_row[6] if len(nl_row) > 6 else "-", nl_row[10] if len(nl_row) > 10 else "-"), (nl_row[7] if len(nl_row) > 7 else "-", nl_row[11] if len(nl_row) > 11 else "-"), (nl_row[8] if len(nl_row) > 8 else "-", nl_row[12] if len(nl_row) > 12 else "-") ] targets = [] for p1, p2 in pairs: c1 = str(p1).strip() if str(p1).strip() and str(p1).strip() != "-" else "-" c2 = str(p2).strip() if str(p2).strip() and str(p2).strip() != "-" else "-" if c1 != "-" or c2 != "-": targets.append(f"{c1},{c2}") return targets def find_nv2_block_by_targets(nv2_data, search_targets): """ find block target in new-layout-v2 """ search_set = set(search_targets) for i, nv2_obj in enumerate(nv2_data): nv2_row = nv2_obj["fields"] if len(nv2_row) > 10: # I, J, K (index 8, 9, 10) v2_vals = [str(nv2_row[8]).strip(), str(nv2_row[9]).strip(), str(nv2_row[10]).strip()] if any(v in search_set for v in v2_vals): return [nv2_data[i+j]["id"] for j in range(4) if (i+j) < len(nv2_data)] return [] app = FastAPI() class CatalogPayloadValues(BaseModel): country: str class CatalogPayload(BaseModel): user_info: dict = {} srv_name: str values: CatalogPayloadValues class CatalogRequest(BaseModel): type: str payload: CatalogPayload class RoomRequest(BaseModel): country: str catalog: str user_id: str @app.on_event("startup") def startup_event(): thread = threading.Thread(target=redis_message_handler, daemon=True) thread.start() @app.post("/catalogs") def get_api_catalogs(req: CatalogRequest): payload = req.payload if payload.srv_name != SERVICE_NAME: raise HTTPException(status_code=400, detail="Invalid service name") country = payload.values.country.strip().lower() config = COUNTRY_MAPPING.get(country) if not config or not config.get("grist_doc_id"): raise HTTPException(status_code=404, detail="Country or Grist config not found") try: grist_docs = config["grist_doc_id"] doc_id = None if grist_docs.get("new-layout"): doc_id = grist_docs["new-layout"] elif grist_docs.get("new-layout-v2"): doc_id = grist_docs["new-layout-v2"] else: print(f"[{SERVICE_NAME}] grist_doc_id [layout] is empty in this country | {country}") return None url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables" headers = {"Authorization": f"Bearer {GRIST_API_KEY}"} resp = requests.get(url, headers=headers) catalogs = [] if resp.status_code == 200: tables = resp.json().get("tables", []) for i, t in enumerate(tables): t_id = t["id"] if t_id.startswith("Grist") or t_id.lower() == "name_desc_v2": continue reconstructed_name = reconstruct_table_name(t_id) match = re.search(r'file=([^,]+)', reconstructed_name) if match: clean_catalog = match.group(1).strip() else: clean_catalog = t_id lock_info = lock_manager.get_lock_info(country, clean_catalog) catalogs.append({ "catalog": clean_catalog, "row_index": i, "status": "locked" if lock_info["is_locked"] else "free", "locked_by": lock_info["locked_by"] }) return {"status": "success", "country": country, "catalogs": catalogs} except Exception as e: traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @app.get("/catalog/data/{country}/{catalog}/{user_id}") def get_catalog_data(country: str, catalog: str, user_id: str, background_tasks: BackgroundTasks): country = country.strip().lower() if not lock_manager.is_authorized(country, catalog, user_id): raise HTTPException(status_code=403, detail="Unauthorized access to this room") background_tasks.add_task(process_and_stream_sheet_data, country, catalog, user_id) # return {"status": "success", "country": country, "catalog": catalog, "data": final_result} return { "status": "processing", "country": country, "catalog": catalog, "message": "Data stream started. Please listen to frontend notifications." } @app.post("/api/room/enter") def enter_room(req: RoomRequest): country = req.country.strip().lower() success, message_or_user = lock_manager.acquire(country, req.catalog, req.user_id) if success: threading.Thread( target=process_and_stream_sheet_data, args=(country, req.catalog, req.user_id), daemon=True ).start() return {"status": "success", "message": f"Successfully entry catalog: {req.catalog}", "timeout": lock_manager.timeout} else: raise HTTPException(status_code=403, detail=message_or_user) @app.post("/api/room/heartbeat") def heartbeat(req: RoomRequest): country = req.country.strip().lower() if lock_manager.keep_alive(country, req.catalog, req.user_id): return {"status": "alive"} else: raise HTTPException(status_code=401, detail="Timeout permission denied") @app.post("/api/room/exit") def exit_room(req: RoomRequest): country = req.country.strip().lower() if lock_manager.release(country, req.catalog, req.user_id): return {"status": "success", "message": "Successfully exited"} else: return {"status": "success", "message": "You no longer have permission"} @app.get("/grist/pull/sheet/{country}") def grist_pull_sheet_api(country: str): try: sync_sheets_to_grist(country) except Exception as e: traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @app.get("/grist/push/data/sheet/{country}") def grist_push_data_sheet_api(country: str): try: sync_grist_to_sheets(country) except Exception as e: traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) """ Grist function endpoint [ Grist sync sheet to grist ] """ def get_column_letter(n): """0 -> A, 1 -> B, 25 -> Z, 26 -> AA""" result = "" n += 1 while n > 0: n, remainder = divmod(n - 1, 26) result = chr(65 + remainder) + result return result def sync_sheets_to_grist(country_key): if country_key not in COUNTRY_MAPPING: print(f"Error: Country '{country_key}' not found.") return config = COUNTRY_MAPPING[country_key] spreadsheet_id = config["spreadsheet_id"] sheets = config["sheets"] # dict แทน list grist_doc_ids = config.get("grist_doc_id", {}) gc = get_gspread_client() spreadsheet = gc.open_by_key(spreadsheet_id) for sheet_key, sheet_name in sheets.items(): if not sheet_name: continue doc_id = grist_doc_ids.get(sheet_key) if not doc_id: continue try: worksheet = spreadsheet.worksheet(sheet_name) all_values = worksheet.get_all_values() if not all_values: continue header = all_values[0] data_rows = all_values[1:] if sheet_key in ["new-layout-v2", "new-layout"]: process_new_layout_sheet(doc_id, header, data_rows) elif sheet_key == "name-desc-v2": upload_to_grist_self_hosted(doc_id, "name-desc-v2", header, data_rows) except Exception as e: print(f"[{SERVICE_NAME}] Error processing sheet {sheet_name}: {e}") def process_new_layout_sheet(doc_id, header, data_rows): table_groups = {} current_table_name = None for row in data_rows: col_a = row[0] if len(row) > 0 else "" if col_a.startswith("Name="): current_table_name = col_a table_groups[current_table_name] = [] elif current_table_name: table_groups[current_table_name].append(row) for table_name, rows in table_groups.items(): if rows: upload_to_grist_self_hosted(doc_id, table_name, header, rows) def upload_to_grist_self_hosted(doc_id, table_name, header, rows): clean_table_id = re.sub(r'[^a-zA-Z0-9_]', '_', table_name) clean_table_id = clean_table_id[0].upper() + clean_table_id[1:] chunk_size = 500 if clean_table_id[0].isdigit(): clean_table_id = f"t_{clean_table_id}" base_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables" headers = { "Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json" } column_defs = [] for i, h_text in enumerate(header): col_letter = get_column_letter(i) column_defs.append({ "id": col_letter, "fields": {"label": h_text.strip() or col_letter} }) try: check_resp = requests.get(f"{base_url}/{clean_table_id}/records", headers=headers) if check_resp.status_code == 200: existing = check_resp.json().get("records", []) if existing: row_ids = [r["id"] for r in existing] for i in range(0, len(row_ids), chunk_size): requests.post(f"{base_url}/{clean_table_id}/records/delete", headers=headers, json=row_ids[i:i+chunk_size]) else: print(f"[{SERVICE_NAME}] Creating table {clean_table_id} with indexed columns...") create_resp = requests.post(base_url, headers=headers, json={ "tables": [{"id": clean_table_id, "columns": column_defs}] }) if create_resp.status_code == 200: time.sleep(1) else: print(f"[{SERVICE_NAME}] Create failed: {create_resp.text}") return records_to_add = [] for row in rows: fields = {} for i, val in enumerate(row): if i < len(column_defs): col_id = column_defs[i]["id"] fields[col_id] = val if fields: records_to_add.append({"fields": fields}) if records_to_add: for i in range(0, len(records_to_add), chunk_size): chunk = records_to_add[i:i+chunk_size] res = requests.post(f"{base_url}/{clean_table_id}/records", headers=headers, json={"records": chunk}) if res.status_code != 200: print(f"Error adding data to {clean_table_id}: {res.text}") print(f"[{SERVICE_NAME}] Synced {len(records_to_add)} rows to {clean_table_id}") except Exception as e: print(f"[{SERVICE_NAME}] Error on {clean_table_id}: {e}") """ Grist [ Sync grist to google sheet ] """ def col_to_index(col_str): """Change 'A', 'B', 'AA' back to Index (0, 1, 26)""" idx = 0 for char in col_str: idx = idx * 26 + (ord(char) - ord('A') + 1) return idx - 1 def reconstruct_table_name(t_id): name = t_id if name.startswith("Name_"): name = name.replace("Name_", "Name=", 1) if "_file_" in name: name = name.replace("_file_", ",file=", 1) if name.endswith("_skt"): name = name[:-4] + ".skt" return name def get_all_grist_tables(doc_id): """ Pull all table name in Document """ url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables" headers = {"Authorization": f"Bearer {GRIST_API_KEY}"} resp = requests.get(url, headers=headers) if resp.status_code == 200: tables = resp.json().get("tables", []) return [t["id"] for t in tables if not t["id"].startswith("Grist")] return [] def fetch_grist_table_data(doc_id, table_id): """ pull Record and change to {'A': 'val1', 'B': 'val2'} """ url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{table_id}/records" headers = {"Authorization": f"Bearer {GRIST_API_KEY}"} resp = requests.get(url, headers=headers) if resp.status_code != 200: print(f"Failed to fetch table {table_id}: {resp.text}") return [] records = resp.json().get("records", []) if not records: return [] parsed_rows = [] for r in records: fields = r.get("fields", {}) record_id = r.get("id") max_idx = -1 for k in fields.keys(): if re.match(r'^[A-Z]+$', k): max_idx = max(max_idx, col_to_index(k)) row = [""] * (max_idx + 1) for k, v in fields.items(): if re.match(r'^[A-Z]+$', k): row[col_to_index(k)] = str(v) if v is not None else "" parsed_rows.append({ "id": record_id, "fields": row }) return parsed_rows def sync_grist_to_sheets(country_key): if country_key not in COUNTRY_MAPPING: print(f"Error: Country '{country_key}' not found.") return config = COUNTRY_MAPPING[country_key] spreadsheet_id = config["spreadsheet_id"] sheets = config["sheets"] grist_doc_ids = config.get("grist_doc_id", {}) gc = get_gspread_client() spreadsheet = gc.open_by_key(spreadsheet_id) for sheet_key, sheet_name in sheets.items(): if not sheet_name: continue doc_id = grist_doc_ids.get(sheet_key) if not doc_id: continue try: worksheet = spreadsheet.worksheet(sheet_name) if sheet_key == "name-desc-v2": rows = fetch_grist_table_data(doc_id, "Name_desc_v2") if rows: sheet_rows = [r["fields"] for r in rows] worksheet.batch_clear(["A2:ZZ"]) worksheet.update(values=sheet_rows, range_name="A2") print(f"[{SERVICE_NAME}] Updated {len(rows)} rows to sheet '{sheet_name}'.") elif sheet_key in ["new-layout", "new-layout-v2"]: print(f"Fetching multiple tables from doc: {doc_id}...") all_tables = get_all_grist_tables(doc_id) all_new_rows = [] for t_id in all_tables: if t_id.lower() == "name_desc_v2": continue rows = fetch_grist_table_data(doc_id, t_id) if rows: original_name = reconstruct_table_name(t_id) all_new_rows.append([original_name]) sheet_rows = [r["fields"] for r in rows] all_new_rows.extend(sheet_rows) if all_new_rows: worksheet.batch_clear(["A2:ZZ"]) worksheet.update(values=all_new_rows, range_name="A2") print(f"[{SERVICE_NAME}] Updated {len(all_new_rows)} lines to sheet '{sheet_name}'.") else: print(f"[{SERVICE_NAME}] No data found for sheet '{sheet_name}'.") except Exception as e: print(f"[{SERVICE_NAME}] Error syncing to sheet {sheet_name}: {e}")