# taobin_sheet/main.py
#
# (File-viewer metadata captured with the paste: 1810 lines, 68 KiB, Python)

import time
import re
import os
import json
import redis
import gspread
import requests
import traceback
import threading
import uuid
import math
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from dotenv import load_dotenv
from google.oauth2 import service_account
# Load a local .env file (if one exists) BEFORE the first os.getenv() below.
# load_dotenv() was imported but never called, so values kept in .env were
# silently ignored. By default it does not override variables that are already
# present in the process environment.
load_dotenv()

# --- Redis connection / outbound endpoints --------------------------------
REDIS_HOST = os.getenv("REDIS_HOST")
REDIS_PORT = os.getenv("REDIS_PORT")
FRONTEND_NOTIFY_URL = os.getenv("FRONTEND_NOTIFY_URL")
LOG_SERVER_URL = os.getenv("LOG_SERVER_URL")
SERVICE_NAME = os.getenv("SERVICE_NAME")

# --- Pub/sub channel names, all namespaced by SERVICE_NAME ----------------
ENTER_CHANNEL = f"{SERVICE_NAME}/enter"
HEARTBEAT_CHANNEL = f"{SERVICE_NAME}/heartbeat"
EXIT_CHANNEL = f"{SERVICE_NAME}/exit"
GET_CATALOG_CHANNEL = f"{SERVICE_NAME}/catalogs"
GET_MENU_CATALOG_CHANNEL = f"{SERVICE_NAME}/catalog/menu"
UPDATE_MENU_CHANNEL = f"{SERVICE_NAME}/update/menu"
DELETE_MENU_CHANNEL = f"{SERVICE_NAME}/delete/menu"
ADD_CATALOG_CHANNEL = f"{SERVICE_NAME}/add/catalog"
ADD_MENU_CHANNEL = f"{SERVICE_NAME}/add/menu"
SWAP_MENU_CHANNEL = f"{SERVICE_NAME}/swap/menu"

# --- Grist ------------------------------------------------------------------
GRIST_URL = os.getenv("GRIST_URL")
GRIST_API_KEY = os.getenv("GRIST_API_KEY")

# Shared Redis client; decode_responses=True makes pub/sub payloads arrive as str.
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
# ID_PATTERN = r'[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{4}'
def _country_entry(env_suffix, layouts):
    """Build one COUNTRY_MAPPING entry from environment variables.

    Environment variable names follow a fixed pattern, where <S> is
    ``env_suffix`` and <L> is the upper-snake form of the layout name:

    * ``SPREAD_SHEET_ID_<S>``  -> "spreadsheet_id"
    * ``SHEET_<L>_<S>``        -> "sheets"[layout]
    * ``DOC_ID_<L>_<S>``       -> "grist_doc_id"[layout]
    """
    env_part = {
        "new-layout": "NEW_LAYOUT",
        "new-layout-v2": "NEW_LAYOUT_V2",
        "name-desc-v2": "NAME_DESC_V2",
    }
    return {
        "spreadsheet_id": os.getenv(f"SPREAD_SHEET_ID_{env_suffix}"),
        "sheets": {
            layout: os.getenv(f"SHEET_{env_part[layout]}_{env_suffix}")
            for layout in layouts
        },
        "grist_doc_id": {
            layout: os.getenv(f"DOC_ID_{env_part[layout]}_{env_suffix}")
            for layout in layouts
        },
    }


# Per-country sheet / Grist document configuration.
# Only "tha" carries the legacy "new-layout" document; "tha-no-layout" reuses
# the THA environment variables but omits that legacy layout.
COUNTRY_MAPPING = {
    "tha": _country_entry("THA", ("new-layout", "new-layout-v2", "name-desc-v2")),
    "tha-no-layout": _country_entry("THA", ("new-layout-v2", "name-desc-v2")),
    "hkg": _country_entry("HKG", ("new-layout-v2", "name-desc-v2")),
    "sgp": _country_entry("SGP", ("new-layout-v2", "name-desc-v2")),
}
def get_gspread_client():
    """Return a gspread client authorised with a Google service account.

    Credentials are read from the environment:

    * ``GOOGLE_SERVICE_ACCOUNT_EMAIL`` - the service account e-mail.
    * ``GOOGLE_PRIVATE_KEY`` - the PEM private key; literal ``\\n`` sequences
      are converted to real newlines.

    Raises:
        RuntimeError: if either variable is unset or empty. Previously an
            unset key failed with an opaque ``AttributeError`` on ``None``.
    """
    scopes = [
        'https://www.googleapis.com/auth/spreadsheets',
        'https://www.googleapis.com/auth/drive'
    ]
    service_account_email = os.getenv("GOOGLE_SERVICE_ACCOUNT_EMAIL")
    raw_private_key = os.getenv("GOOGLE_PRIVATE_KEY")
    if not service_account_email or not raw_private_key:
        raise RuntimeError(
            "GOOGLE_SERVICE_ACCOUNT_EMAIL and GOOGLE_PRIVATE_KEY must both be set"
        )

    credentials_info = {
        "type": "service_account",
        "client_email": service_account_email,
        "private_key": raw_private_key.replace("\\n", "\n"),
        "token_uri": "https://oauth2.googleapis.com/token",
    }
    creds = service_account.Credentials.from_service_account_info(
        credentials_info,
        scopes=scopes
    )
    return gspread.authorize(creds)
class LockManager:
    """In-memory registry of which user currently holds which catalog "room".

    A room is identified by ``"<country>:<catalog>"``. A lock is considered
    live while less than ``timeout`` seconds have passed since its last
    timestamp; holders keep it alive through ``keep_alive``. A user may hold
    at most one live room at a time.

    State lives only in ``self.locks`` of this instance, and no locking is
    performed around it.
    """

    def __init__(self):
        # { "tha:page_catalog_group_coffee.skt": {"user_id": "A", "timestamp": 12345} }
        # Released or expired rooms are kept with a value of None.
        # (The original annotation used ``Dict``, which is never imported.)
        self.locks: dict[str, dict | None] = {}
        self.timeout = 30  # seconds without a heartbeat before a lock expires

    def get_room_key(self, country: str, catalog: str) -> str:
        """Return the dictionary key used for the (country, catalog) room."""
        return f"{country}:{catalog}"

    def _is_live(self, lock, now: float) -> bool:
        """Return True if ``lock`` exists and has not timed out at ``now``."""
        return bool(lock) and (now - lock["timestamp"] < self.timeout)

    def acquire(self, country: str, catalog: str, user_id: str):
        """Try to take the room for ``user_id``.

        Returns:
            ``(True, user_id)`` on success, otherwise ``(False, reason)``.
        """
        now = time.time()
        room_key = self.get_room_key(country, catalog)

        # One person, one room: refuse while the user holds another live room
        # and clear any expired room of theirs that is found on the way.
        for key, data in list(self.locks.items()):
            if data and data["user_id"] == user_id and key != room_key:
                if self._is_live(data, now):
                    return False, "You are already in another room."
                self.locks[key] = None

        # Refuse if somebody else holds a live lock on the requested room.
        current_lock = self.locks.get(room_key)
        if current_lock and current_lock["user_id"] != user_id:
            if self._is_live(current_lock, now):
                return False, f"Room locked by {current_lock['user_id']}"

        self.locks[room_key] = {"user_id": user_id, "timestamp": now}
        return True, user_id

    def keep_alive(self, country: str, catalog: str, user_id: str):
        """Refresh the lock timestamp; return False if the user holds no live lock."""
        now = time.time()
        room_key = self.get_room_key(country, catalog)
        lock = self.locks.get(room_key)
        if lock and lock["user_id"] == user_id and self._is_live(lock, now):
            lock["timestamp"] = now
            return True
        return False

    def release(self, country: str, catalog: str, user_id: str):
        """Clear the room if ``user_id`` owns it (live or expired); return whether it was cleared."""
        room_key = self.get_room_key(country, catalog)
        lock = self.locks.get(room_key)
        if lock and lock["user_id"] == user_id:
            self.locks[room_key] = None
            return True
        return False

    def is_authorized(self, country: str, catalog: str, user_id: str):
        """Return True only if ``user_id`` holds a live lock on the room."""
        now = time.time()
        lock = self.locks.get(self.get_room_key(country, catalog))
        return bool(lock) and lock["user_id"] == user_id and self._is_live(lock, now)

    def get_lock_info(self, country: str, catalog: str):
        """Return ``{"is_locked": bool, "locked_by": user_id | None}`` for the room."""
        now = time.time()
        lock = self.locks.get(self.get_room_key(country, catalog))
        if self._is_live(lock, now):
            return {"is_locked": True, "locked_by": lock["user_id"]}
        return {"is_locked": False, "locked_by": None}


# Module-wide singleton used by the handlers in this file.
lock_manager = LockManager()
# -- Redis handler --
def redis_message_handler():
    """Blocking Redis pub/sub loop that routes service messages to handlers.

    Subscribes to every ``<SERVICE_NAME>/...`` channel and, for each message,
    decodes the JSON body and reads ``payload.values`` (country, catalog,
    catalog_name, content), ``payload.user_info.uid`` and ``payload.srv_name``.
    Messages addressed to another service, or lacking a user id or country,
    are dropped. Every mutating channel except add-catalog requires the
    sender to hold the room lock for the catalog.

    Errors raised while handling one message are logged and never stop the
    loop. The function only returns if ``pubsub.listen()`` ends.
    """
    pubsub = r.pubsub()
    pubsub.subscribe(
        ENTER_CHANNEL,
        HEARTBEAT_CHANNEL,
        EXIT_CHANNEL,
        GET_CATALOG_CHANNEL,
        GET_MENU_CATALOG_CHANNEL,
        UPDATE_MENU_CHANNEL,
        DELETE_MENU_CHANNEL,
        ADD_CATALOG_CHANNEL,
        ADD_MENU_CHANNEL,
        SWAP_MENU_CHANNEL,
    )
    print(f"[*] Redis Listener started on service: {SERVICE_NAME}")

    for message in pubsub.listen():
        # Skip subscribe/unsubscribe confirmations.
        if message['type'] != 'message':
            continue
        try:
            data = json.loads(message['data'])
            # ``or {}`` / ``or ""`` also cover keys that are present but null,
            # which previously raised AttributeError on None.
            payload = data.get("payload") or {}
            values = payload.get("values") or {}
            user_info = payload.get("user_info") or {}
            country = (values.get("country") or "").strip().lower()
            catalog = values.get("catalog", "")
            catalog_name = values.get("catalog_name", "")
            content = values.get("content", [])
            srv_name = payload.get("srv_name", "")
            user_id = user_info.get("uid")

            if srv_name != SERVICE_NAME:
                continue

            channel = message['channel']
            if not (user_id and country):
                print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}")
                continue

            if channel == ENTER_CHANNEL:
                if not catalog:
                    print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}")
                    continue
                success, msg = lock_manager.acquire(country, catalog, user_id)
                if success:
                    # Stream the catalog in the background so the listener
                    # keeps consuming messages (heartbeats in particular).
                    threading.Thread(
                        target=process_and_stream_sheet_data,
                        args=(country, catalog, user_id),
                        daemon=True
                    ).start()
                print(f"[{SERVICE_NAME}] Enter Room: {catalog} | User: {user_id} | Success: {success}")

            elif channel == HEARTBEAT_CHANNEL:
                if not catalog:
                    print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}")
                    continue
                alive = lock_manager.keep_alive(country, catalog, user_id)
                if not alive:
                    print(f"[{SERVICE_NAME}] Heartbeat Failed: {catalog} | User: {user_id}")

            elif channel == EXIT_CHANNEL:
                if not catalog:
                    print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}")
                    continue
                lock_manager.release(country, catalog, user_id)
                print(f"[{SERVICE_NAME}] Exit Room (Force/Command): {catalog} | User: {user_id}")

            elif channel == GET_CATALOG_CHANNEL:
                try:
                    if not FRONTEND_NOTIFY_URL:
                        print(f"{SERVICE_NAME} Warning: FRONTEND_NOTIFY_URL is not set.")
                        continue
                    result = get_catalogs(country)
                    # Separate name so the incoming ``payload`` is not shadowed.
                    notify_payload = {
                        "type": "notify",
                        "payload": {
                            "from": SERVICE_NAME,
                            "level": "content",
                            "to": user_id,
                            "content": result
                        }
                    }
                    requests.post(FRONTEND_NOTIFY_URL, json=notify_payload, timeout=5)
                    print(notify_payload)
                    print(f"[{SERVICE_NAME}] Get catalog success | User: {user_id}")
                except Exception as e:
                    print(f"[{SERVICE_NAME}] Get catalog error: {e}")

            elif channel == GET_MENU_CATALOG_CHANNEL:
                if not catalog:
                    print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}")
                    continue
                if not lock_manager.is_authorized(country, catalog, user_id):
                    print(f"[{SERVICE_NAME}] get catalog menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room")
                    continue
                threading.Thread(
                    target=process_and_stream_sheet_data,
                    args=(country, catalog, user_id),
                    daemon=True
                ).start()

            elif channel == ADD_MENU_CHANNEL:
                if not (content and catalog):
                    print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}")
                    continue
                if not lock_manager.is_authorized(country, catalog, user_id):
                    print(f"[{SERVICE_NAME}] Add menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room")
                    continue
                try:
                    handle_add_menu(country, catalog, content)
                    print(f"[{SERVICE_NAME}] Add menu success: {catalog} | User: {user_id}")
                except Exception as e:
                    print(f"[{SERVICE_NAME}] Add menu error: {e}")

            elif channel == ADD_CATALOG_CHANNEL:
                # BUG FIX: the original tested ``not (catalog_name, catalog)``.
                # A two-element tuple is always truthy, so the check never
                # rejected anything.
                if not (catalog_name and catalog):
                    print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}")
                    continue
                # NOTE(review): no room-lock check here, unlike the other
                # mutating channels - presumably because a new catalog has no
                # room yet; confirm.
                try:
                    handle_add_catalog(country, catalog_name, catalog)
                except Exception as e:
                    print(f"[{SERVICE_NAME}] Add catalog error: {e}")

            elif channel == UPDATE_MENU_CHANNEL:
                if not (content and catalog):
                    print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}")
                    continue
                if not lock_manager.is_authorized(country, catalog, user_id):
                    print(f"[{SERVICE_NAME}] Updated catalog failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room")
                    continue
                try:
                    update_sheets(country, catalog, content)
                    print(f"[{SERVICE_NAME}] Update success: {catalog} | User: {user_id}")
                except Exception as e:
                    print(f"[{SERVICE_NAME}] Update error: {e}")

            elif channel == DELETE_MENU_CHANNEL:
                if not (content and catalog):
                    print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}")
                    continue
                if not lock_manager.is_authorized(country, catalog, user_id):
                    print(f"[{SERVICE_NAME}] Delete menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room")
                    continue
                try:
                    handle_delete_menu(country, catalog, content)
                except Exception as e:
                    print(f"[{SERVICE_NAME}] Delete menu error: {e}")

            elif channel == SWAP_MENU_CHANNEL:
                if not (content and catalog):
                    print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel}")
                    continue
                if not lock_manager.is_authorized(country, catalog, user_id):
                    print(f"[{SERVICE_NAME}] Swap menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room")
                    continue
                try:
                    handle_swap_menu(country, catalog, content)
                    print(f"[{SERVICE_NAME}] Swap data success: {catalog}")
                except Exception as e:
                    print(f"[{SERVICE_NAME}] Swap data error: {e}")
                    traceback.print_exc()

        except Exception as e:
            # Top-level boundary: one bad message must not kill the listener.
            print(f"[Redis Error] Error processing message: {e}")
            traceback.print_exc()
def find_grist_table_id(doc_id, catalog_suffix):
    """Return the id of the Grist table in ``doc_id`` that belongs to a catalog.

    A table matches when the text ``file=<catalog_suffix>`` occurs either in
    ``reconstruct_table_name(table_id)`` or in the table's ``fields.label``.
    The first match in listing order wins.

    Returns:
        The table id, or ``None`` if nothing matched, the listing request did
        not return HTTP 200, or any exception occurred (it is logged).
    """
    # NOTE(review): this is a substring test, so "file=abc" also matches a
    # table whose name contains "file=abcd" - confirm that catalog names
    # cannot be prefixes of one another.
    needle = f"file={catalog_suffix}"
    url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables"
    headers = {"Authorization": f"Bearer {GRIST_API_KEY}"}
    try:
        # A timeout keeps a stalled Grist server from blocking the caller
        # forever; requests has no default timeout.
        resp = requests.get(url, headers=headers, timeout=30)
        if resp.status_code != 200:
            print(f"[{SERVICE_NAME}] Grist table lookup failed for doc {doc_id}: HTTP {resp.status_code}")
            return None
        for table in resp.json().get("tables", []):
            t_id = table.get("id", "")
            if needle in reconstruct_table_name(t_id):
                return t_id
            # ``or ""`` guards against an explicit null label, which would
            # otherwise abort the scan with a TypeError.
            label = (table.get("fields") or {}).get("label") or ""
            if needle in label:
                return t_id
        return None
    except Exception as e:
        print(f"Error: {e}")
        return None
def send_stream_notification(msg_type: str, content: object, batch_id: str, current_chunk: int, total_chunks: int, total_items: int, user_id: str):
    """POST one message of a chunked stream to the frontend notify endpoint.

    Args:
        msg_type: "start", "chunk", "end" or "error".
        content: JSON-serialisable body of the message.
        batch_id: identifier shared by all messages of one stream.
        current_chunk: 1-based index of this chunk (0 for start/error).
        total_chunks: number of chunks in the stream.
        total_items: number of items across all chunks.
        user_id: recipient, sent as ``payload.to``.

    Does nothing except warn when ``FRONTEND_NOTIFY_URL`` is unset. Delivery
    failures are logged and never raised.
    """
    if not FRONTEND_NOTIFY_URL:
        print("Warning: FRONTEND_NOTIFY_URL is not set.")
        return
    payload = {
        "type": "notify",
        "payload": {
            "from": SERVICE_NAME,
            "level": "content",
            "msg": msg_type,
            "batch_id": batch_id,
            "current_chunk": current_chunk,
            "total_chunks": total_chunks,
            "total_items": total_items,
            "to": user_id,
            "content": content
        }
    }
    try:
        # FIX: this POST was commented out, so the function logged "Sent ..."
        # without delivering anything and the try/except guarded only prints.
        # NOTE(review): re-enabled on the assumption that the comment-out was
        # a debugging leftover - confirm before deploying.
        requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5)
        print(f"[Stream] Sent {msg_type} chunk {current_chunk}/{total_chunks} for batch {batch_id}")
        print(payload)
    except Exception as e:
        print(f"[Stream Error] Failed to send {msg_type} for batch {batch_id}: {e}")
def process_and_stream_sheet_data(country: str, catalog: str, user_id: str):
    """Read one catalog from Grist and stream its menus to ``user_id`` in chunks.

    Sends a "start" notification, then builds one item per menu found in the
    catalog's new-layout-v2 table, then sends the items as "chunk" messages of
    up to 10 items followed by "end". Any exception is printed and reported
    to the user as a single "error" notification.

    Each item has the shape::

        {"new_layout_v2": [{"row_index": id, "cells": [{"value", "coord"}]}, ...],
         "name_desc_v2":  [{"key", "row_index", "cells": [...]}, ...]}

    The code indexes each record's "fields" by integer position, so
    ``fetch_grist_table_data`` is assumed to return rows as lists - TODO confirm.
    """
    batch_id = str(uuid.uuid4())
    send_stream_notification("start", {"message": f"Start fetching catalog: {catalog}"}, batch_id, 0, 0, 0, user_id)
    try:
        config = COUNTRY_MAPPING.get(country)
        if not config:
            # Previously surfaced as "'NoneType' object has no attribute 'get'".
            raise Exception(f"Country {country} config not found")
        grist_docs = config.get("grist_doc_id", {})
        doc_nv2 = grist_docs.get("new-layout-v2")
        doc_nd = grist_docs.get("name-desc-v2")

        full_table_nv2 = find_grist_table_id(doc_nv2, catalog)
        if not full_table_nv2:
            raise Exception(f"Table for catalog {catalog} not found in New-Layout-V2")

        # Fetch the data only once the catalog table is known to exist.
        nv2_data = fetch_grist_table_data(doc_nv2, full_table_nv2)
        nd_all = fetch_grist_table_data(doc_nd, "Name_desc_v2")

        # Index name-desc rows by their stripped key once, instead of
        # rescanning the whole table for every product code of every menu.
        nd_by_key = {}
        for nd_obj in nd_all or []:
            nd_row = nd_obj["fields"]
            if len(nd_row) > 0 and isinstance(nd_row[0], str):
                nd_by_key.setdefault(nd_row[0].strip(), []).append(nd_obj)

        final_result = []
        # NV2 layout: name row, desc row, img row, blank row (repeating),
        # i.e. 4 rows = 1 menu.
        i = 0
        while i < len(nv2_data):
            nv2_row = nv2_data[i]["fields"]
            # Resynchronise one row at a time until a "name" row is found.
            if len(nv2_row) <= 1 or nv2_row[1] != "name":
                i += 1
                continue
            name_en = nv2_row[2].strip() if len(nv2_row) > 2 else ""
            name_th = nv2_row[3].strip() if len(nv2_row) > 3 else ""
            if not name_th and not name_en:
                # Unnamed block: skip all four of its rows.
                i += 4
                continue

            menu_item = {
                "new_layout_v2": [],
                "name_desc_v2": []
            }
            individual_codes = []
            for sub_idx in range(4):
                curr_i = i + sub_idx
                if curr_i >= len(nv2_data):
                    continue
                row_id = nv2_data[curr_i]["id"]
                row_data = nv2_data[curr_i]["fields"]
                row_info = {"row_index": row_id, "cells": []}
                # Cells are exported for name/desc/img only.
                # NOTE(review): the 4th (blank) row is still appended, with an
                # empty cell list - inferred from the flattened source; confirm.
                if sub_idx < 3:
                    for c_idx, value in enumerate(row_data):
                        row_info["cells"].append({
                            "value": value,
                            "coord": {"row": row_id, "col": c_idx + 1}
                        })
                menu_item["new_layout_v2"].append(row_info)

                # All product codes live on the name row: col I, J, K = index 8, 9, 10.
                if sub_idx == 0 and len(row_data) > 10:
                    for code_idx in [8, 9, 10]:
                        raw_value = str(row_data[code_idx]).strip()
                        if not raw_value or raw_value == "-":
                            continue
                        codes = [c.strip() for c in raw_value.split(",") if c.strip() and c.strip() != "-"]
                        individual_codes.extend(codes)

            # Attach the name-desc-v2 rows of every distinct product code.
            # dict.fromkeys() de-duplicates while keeping first-seen order, so
            # the output no longer depends on set hash ordering.
            for code in dict.fromkeys(individual_codes):
                for suffix in ("name", "desc"):
                    for nd_obj in nd_by_key.get(f"MENU.{code}.{suffix}", []):
                        nd_row = nd_obj["fields"]
                        nd_id = nd_obj["id"]
                        nd_info = {"key": nd_row[0], "row_index": nd_id, "cells": []}
                        # Only columns E..H (index 4..7) are exported.
                        for c_idx in range(4, 8):
                            if c_idx < len(nd_row):
                                nd_info["cells"].append({
                                    "value": nd_row[c_idx],
                                    "coord": {"row": nd_id, "col": c_idx + 1}
                                })
                        menu_item["name_desc_v2"].append(nd_info)

            final_result.append(menu_item)
            i += 4

        # --- Stream Chunk ---
        CHUNK_SIZE = 10
        total_items = len(final_result)
        # An empty catalog still produces one (empty) chunk.
        total_chunks = math.ceil(total_items / CHUNK_SIZE) if total_items > 0 else 1
        for chunk_no in range(total_chunks):
            start_idx = chunk_no * CHUNK_SIZE
            chunk_data = final_result[start_idx:start_idx + CHUNK_SIZE]
            send_stream_notification("chunk", chunk_data, batch_id, chunk_no + 1, total_chunks, total_items, user_id)
        send_stream_notification("end", {"message": "All data sent successfully"}, batch_id, total_chunks, total_chunks, total_items, user_id)
    except Exception as e:
        traceback.print_exc()
        send_stream_notification("error", {"error_detail": str(e)}, batch_id, 0, 0, 0, user_id)
# get catalog
def get_catalogs(country: str):
    """List the catalogs of ``country`` together with their room-lock status.

    Tables are read from the country's "new-layout" Grist document, or from
    "new-layout-v2" when the former is not configured. Tables whose id starts
    with "Grist" or equals "name_desc_v2" (case-insensitive) are skipped. The
    catalog name is the ``file=...`` part of the reconstructed table name,
    falling back to the table id.

    Returns:
        ``{"status": "success", "country": ..., "catalogs": [...]}``, or
        ``None`` when no layout document id is configured. ``catalogs`` is
        empty when Grist does not answer HTTP 200 (a warning is logged).

    Raises:
        HTTPException: 404 if the country has no Grist configuration, 500 for
            any other failure.
    """
    config = COUNTRY_MAPPING.get(country)
    if not config or not config.get("grist_doc_id"):
        print(f"[{SERVICE_NAME}] Country or Grist doc not found")
        raise HTTPException(status_code=404, detail="Grist config not found")
    try:
        grist_docs = config["grist_doc_id"]
        doc_id = grist_docs.get("new-layout") or grist_docs.get("new-layout-v2")
        if not doc_id:
            print(f"[{SERVICE_NAME}] grist_doc_id [layout] is empty in this country | {country}")
            return None

        url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables"
        headers = {"Authorization": f"Bearer {GRIST_API_KEY}"}
        # requests has no default timeout; bound the wait on Grist.
        resp = requests.get(url, headers=headers, timeout=30)

        catalogs = []
        if resp.status_code == 200:
            tables = resp.json().get("tables", [])
            # row_index is the position in the full table listing, counting
            # the skipped tables as well.
            for i, t in enumerate(tables):
                t_id = t["id"]
                if t_id.startswith("Grist") or t_id.lower() == "name_desc_v2":
                    continue
                reconstructed_name = reconstruct_table_name(t_id)
                match = re.search(r'file=([^,]+)', reconstructed_name)
                clean_catalog = match.group(1).strip() if match else t_id
                lock_info = lock_manager.get_lock_info(country, clean_catalog)
                catalogs.append({
                    "catalog": clean_catalog,
                    "row_index": i,
                    "status": "locked" if lock_info["is_locked"] else "free",
                    "locked_by": lock_info["locked_by"]
                })
        else:
            # Previously silent: callers could not tell "no catalogs" from
            # "Grist refused the request".
            print(f"[{SERVICE_NAME}] Grist table listing failed for doc {doc_id}: HTTP {resp.status_code}")
        return {"status": "success", "country": country, "catalogs": catalogs}
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e)) from e
"""
ADD_MENU_CHANNEL
"""
def get_val(arr, idx, default="-"):
    """Return ``arr[idx]``, or ``default`` when it is unavailable or blank.

    ``default`` is returned when ``arr`` is empty/None, when ``idx`` is past
    the end, or when the element equals the empty string. Any other element
    (including ``None``) is returned unchanged.
    """
    if arr and idx < len(arr):
        item = arr[idx]
        if item != "":
            return item
    return default
# List to Grist format {"fields": {"A": ..., "B": ...}}
def to_grist_record(row_list):
    """Convert a row given as a list into a Grist record ``{"fields": {...}}``.

    Values are keyed by spreadsheet-style column letters by position:
    A..Z, then AA, AB, ... The original ``chr(65 + i)`` produced "[", "\\",
    and so on from the 27th column onwards.
    """
    def column_id(index):
        # 0-based position -> bijective base-26 letters (0 -> A, 26 -> AA).
        letters = ""
        n = index + 1
        while n > 0:
            n, rem = divmod(n - 1, 26)
            letters = chr(65 + rem) + letters
        return letters

    return {"fields": {column_id(i): val for i, val in enumerate(row_list)}}
def handle_add_menu(country: str, catalog: str, content: list):
    """Append the menus in ``content`` to the catalog's Grist tables, skipping duplicates.

    Each item of ``content`` provides ``cells`` (a flat new-layout row) and an
    optional ``payload`` holding ``lang_name`` / ``lang_desc`` lists. For
    every item the function may add:

    * new-layout (only for countries configured with it): ``cells`` as-is,
      unless a row with the same six product codes (index 6, 7, 8, 10, 11,
      12) already exists;
    * new-layout-v2: a 4-row block (name, desc, img, blank), unless a name
      row with the same three combined code cells exists;
    * name-desc-v2: a ``MENU.<code>.name`` / ``MENU.<code>.desc`` pair per
      product code whose name key does not exist yet.

    Returns ``None``. A missing configuration or table, and any Grist error,
    is logged rather than raised.
    """
    config = COUNTRY_MAPPING.get(country)
    if not config:
        print(f"[{SERVICE_NAME}] Country {country} config not found")
        return
    grist_docs = config.get("grist_doc_id", {})
    has_nl = "new-layout" in grist_docs
    doc_nl = grist_docs.get("new-layout")
    doc_nv2 = grist_docs.get("new-layout-v2")
    doc_nd = grist_docs.get("name-desc-v2")
    nl_table_id = find_grist_table_id(doc_nl, catalog) if has_nl else None
    nv2_table_id = find_grist_table_id(doc_nv2, catalog)
    nd_table_id = "Name_desc_v2"
    if has_nl and not nl_table_id:
        print(f"[{SERVICE_NAME}] Table for {catalog} not found in doc NL")
        return
    if not nv2_table_id:
        print(f"[{SERVICE_NAME}] Table for {catalog} not found in doc NV2")
        return

    nl_code_columns = [6, 7, 8, 10, 11, 12]

    def norm_val(v):
        # Blank cells are represented as "-" when codes are compared.
        return v if v not in [None, ""] else "-"

    # --- Snapshot of what already exists, used for de-duplication ---------
    nd_existing_data = fetch_grist_table_data(doc_nd, nd_table_id)
    existing_keys = set()
    for row_obj in nd_existing_data or []:
        row = row_obj["fields"]
        # ``row and ...`` guards empty rows, which used to raise IndexError.
        if row and row[0]:
            existing_keys.add(row[0])

    # for THA
    existing_nl_codes = set()
    if has_nl and nl_table_id:
        nl_data = fetch_grist_table_data(doc_nl, nl_table_id)
        for row_obj in nl_data or []:
            row = row_obj["fields"]
            # FIX: normalise stored values exactly like the incoming tuple.
            # Before, a stored "" or None never equalled the incoming "-",
            # so a menu with any blank code was added again on every request.
            codes = tuple(
                norm_val(row[i]) if i < len(row) else "-"
                for i in nl_code_columns
            )
            existing_nl_codes.add(codes)

    nv2_data = fetch_grist_table_data(doc_nv2, nv2_table_id)
    existing_nv2_codes = set()
    for row_obj in nv2_data or []:
        row = row_obj["fields"]
        is_name = row[1] if len(row) > 1 else ""
        if is_name == "name":
            codes = (norm_val(row[8] if len(row) > 8 else "-"),
                     norm_val(row[9] if len(row) > 9 else "-"),
                     norm_val(row[10] if len(row) > 10 else "-"))
            existing_nv2_codes.add(codes)

    # --- Build the records to insert -------------------------------------
    nl_records = []
    nv2_records = []
    nd_records = []
    for item in content:
        cells = item.get("cells", [])
        if not cells:
            continue
        payload_lang = item.get("payload", {})
        lang_name = payload_lang.get("lang_name", ["", "", "", ""])
        lang_desc = payload_lang.get("lang_desc", ["", "", "", ""])

        # ==========================================
        # New-Layout (THA)
        # ==========================================
        if has_nl:
            current_nl_tuple = tuple(norm_val(get_val(cells, i)) for i in nl_code_columns)
            if current_nl_tuple not in existing_nl_codes:
                nl_records.append(to_grist_record(cells))
                # Also de-duplicates within this request.
                existing_nl_codes.add(current_nl_tuple)

        # ==========================================
        # New-Layout-V2 (All country)
        # ==========================================
        # Each V2 code cell combines two new-layout columns as "<a>,<b>".
        hot_code = f"{get_val(cells, 6)},{get_val(cells, 10)}"
        cold_code = f"{get_val(cells, 7)},{get_val(cells, 11)}"
        blend_code = f"{get_val(cells, 8)},{get_val(cells, 12)}"
        current_nv2_tuple = (hot_code, cold_code, blend_code)
        if current_nv2_tuple not in existing_nv2_codes:
            name_row = ["", "name", get_val(cells, 3), get_val(cells, 2), get_val(lang_name, 0), get_val(lang_name, 1), get_val(lang_name, 2), get_val(lang_name, 3),
                        hot_code, cold_code, blend_code, "", "", "", "", "", "", "",
                        get_val(cells, 14), get_val(cells, 15), get_val(cells, 16), get_val(cells, 17), get_val(cells, 18)]
            desc_row = ["", "desc", get_val(cells, 5), get_val(cells, 4), get_val(lang_desc, 0), get_val(lang_desc, 1), get_val(lang_desc, 2), get_val(lang_desc, 3),
                        "||||||||||||||||||||||||||", "||||||||||||||||||||||||||", "||||||||||||||||||||||||||", "", "", "", "", "", "", "",
                        "-", "-", "-", "-", "-"]
            img_row = ["", "img", get_val(cells, 9), "-", "-", "-", "-", "-", get_val(cells, 13),
                       "||||||||||||||||||||||||||", "||||||||||||||||||||||||||", "", "", "", "", "", "", "",
                       "-", "-", "-", "-", "-"]
            blank_row = [""] * 23
            nv2_records.extend([to_grist_record(name_row), to_grist_record(desc_row), to_grist_record(img_row), to_grist_record(blank_row)])
            existing_nv2_codes.add(current_nv2_tuple)

        # ==========================================
        # Name-desc-V2 (All country)
        # ==========================================
        for p_idx in nl_code_columns:
            code = get_val(cells, p_idx)
            if code == "-" or not code:
                continue
            menu_name_key = f"MENU.{code}.name"
            if menu_name_key in existing_keys:
                continue

            # The third dash-separated segment of the code selects the drink type.
            parts = code.split('-')
            drink_type = "UNKNOWN"
            drink_type_th = ""
            if len(parts) >= 3:
                type_id = parts[2]
                if type_id == "01":
                    drink_type = "HOT"
                    drink_type_th = "ร้อน"
                elif type_id == "02":
                    drink_type = "ICED"
                    drink_type_th = "เย็น"
                elif type_id == "03":
                    drink_type = "SMOOTHIE"
                    drink_type_th = "ปั่น"

            name_en = get_val(cells, 3, "")
            name_th = get_val(cells, 2, "")
            # English: "<name> SMOOTHIE" but "HOT <name>" / "ICED <name>".
            if drink_type == "SMOOTHIE":
                prefix = f"{name_en} {drink_type}".strip()
            else:
                prefix = f"{drink_type} {name_en}".strip()
            prefix_th = f"{name_th} {drink_type_th}".strip() if drink_type_th else name_th

            nd_name_row = [menu_name_key, get_val(cells, 9), prefix, prefix_th, get_val(lang_name, 0), get_val(lang_name, 1), get_val(lang_name, 2), get_val(lang_name, 3)]
            nd_desc_row = [f"MENU.{code}.desc", "-", get_val(cells, 5), get_val(cells, 4), get_val(lang_desc, 0), get_val(lang_desc, 1), get_val(lang_desc, 2), get_val(lang_desc, 3)]
            nd_records.extend([to_grist_record(nd_name_row), to_grist_record(nd_desc_row)])
            existing_keys.add(menu_name_key)

    # --- Write to Grist ----------------------------------------------------
    headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"}

    def add_records_to_grist(doc_id, table_id, records):
        # Best-effort insert: failures are logged, never raised.
        if not records or not doc_id or not table_id:
            return
        url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{table_id}/records"
        try:
            # Bounded wait: requests has no default timeout.
            resp = requests.post(url, headers=headers, json={"records": records}, timeout=30)
            if resp.status_code != 200:
                print(f"Error adding to {table_id}: {resp.text}")
        except Exception as e:
            print(f"Request failed for {table_id}: {e}")

    # NL (THA only)
    if has_nl and nl_records:
        add_records_to_grist(doc_nl, nl_table_id, nl_records)
    # NV2
    if nv2_table_id and nv2_records:
        add_records_to_grist(doc_nv2, nv2_table_id, nv2_records)
    # ND_V2
    if nd_records:
        add_records_to_grist(doc_nd, nd_table_id, nd_records)
"""
ADD_CATALOG_CHANNEL
"""
def handle_add_catalog(country: str, catalog_name: str, catalog: str):
    """Create an empty catalog table in the country's layout Grist documents.

    The table id is derived from ``"Name=<catalog_name>,file=<catalog>"`` by
    replacing every character outside ``[a-zA-Z0-9_]`` with ``_``. The table
    is created with 15 columns, A..O, in the "new-layout" and "new-layout-v2"
    documents that are configured with a non-empty id.

    Returns ``None``. An unknown country is logged and ignored; a non-200
    answer from Grist is logged; a request exception propagates to the caller.
    """
    config = COUNTRY_MAPPING.get(country)
    if not config:
        # Previously an AttributeError on None.
        print(f"[{SERVICE_NAME}] Country {country} config not found")
        return
    grist_docs = config.get("grist_doc_id", {})
    # Skip layouts whose doc id is unset, rather than POSTing to /docs/None/...
    docs_to_create = [
        grist_docs[layout]
        for layout in ("new-layout", "new-layout-v2")
        if grist_docs.get(layout)
    ]

    table_label = f"Name={catalog_name},file={catalog}"
    table_id = re.sub(r'[^a-zA-Z0-9_]', '_', table_label)
    table_id = table_id[0].upper() + table_id[1:]
    headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"}

    # NOTE(review): only 15 columns (A..O) are created here, while
    # handle_add_menu builds 23-column rows for new-layout-v2 - confirm the
    # remaining columns are added elsewhere.
    columns = [{"id": chr(65 + i), "fields": {"label": chr(65 + i)}} for i in range(15)]

    def create_table_in_doc(doc_id):
        url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables"
        payload = {
            "tables": [{
                "id": table_id,
                "columns": columns
            }]
        }
        # Bounded wait: requests has no default timeout.
        resp = requests.post(url, headers=headers, json=payload, timeout=30)
        if resp.status_code == 200:
            print(f"[{SERVICE_NAME}] Created table {table_id} in doc {doc_id}")
        else:
            print(f"[{SERVICE_NAME}] Failed to create table: {resp.text}")

    for doc_id in docs_to_create:
        create_table_in_doc(doc_id)
"""
UPDATE_MENU_CHANNEL
"""
def update_sheets(country: str, catalog: str, content: list):
    """Apply cell edits from ``content`` to the catalog's Grist tables.

    Each item of ``content`` may carry, per sheet key:

    * ``"new_layout"``    - a single row object;
    * ``"new_layout_v2"`` - a list of row objects;
    * ``"name_desc_v2"``  - a list of row objects (written to "Name_desc_v2").

    A row object holds ``cells``, each with ``value`` and
    ``coord = {"row": <record id>, "col": <1-based column>}``. The record id
    is taken from the first cell; values are written as strings. When the
    country has a "new-layout" document, new-layout-v2 edits are then
    mirrored to it through ``sync_nv2_to_nl``.

    Raises:
        Exception: if the country is not configured or no table matches the
            catalog. Failed PATCH requests are only logged.
    """
    config = COUNTRY_MAPPING.get(country)
    if not config:
        # Previously surfaced as "'NoneType' object has no attribute 'get'".
        raise Exception(f"Country {country} config not found")
    grist_docs = config.get("grist_doc_id", {})
    has_nl = "new-layout" in grist_docs

    # Payload sheet key -> Grist doc id, for the layouts this country has.
    doc_map = {}
    for sheet_key, layout in (
        ("new_layout", "new-layout"),
        ("new_layout_v2", "new-layout-v2"),
        ("name_desc_v2", "name-desc-v2"),
    ):
        if layout in grist_docs:
            doc_map[sheet_key] = grist_docs[layout]

    # The table id found in the first layout doc that has the catalog is used
    # for both layout docs.
    # NOTE(review): this assumes both docs use the same table id for a
    # catalog - confirm.
    full_table_name = None
    for key in ["new_layout", "new_layout_v2"]:
        if key in doc_map:
            full_table_name = find_grist_table_id(doc_map[key], catalog)
            if full_table_name:
                break
    if not full_table_name:
        raise Exception(f"Table for catalog {catalog} not found")

    headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"}
    nv2_updates_for_nl = []

    for item in content:
        for sheet_key, doc_id in doc_map.items():
            rows = item.get(sheet_key, [])
            if sheet_key == "new_layout":
                # new_layout carries one row object, not a list.
                rows = [rows] if rows else []
            if not rows:
                continue
            target_table = "Name_desc_v2" if sheet_key == "name_desc_v2" else full_table_name

            records_to_update = []
            for row in rows:
                cells = row.get("cells", [])
                if not cells:
                    continue
                row_i = cells[0]["coord"]["row"]
                # NOTE(review): str() turns a null value into the text "None"
                # - confirm the frontend never sends null.
                fields = {
                    col_to_letter(cell["coord"]["col"]): str(cell["value"])
                    for cell in cells
                }
                records_to_update.append({
                    "id": row_i,
                    "fields": fields
                })
                # keep nv2 data to sync in new-layout [If is tha]
                if has_nl and sheet_key == "new_layout_v2":
                    nv2_updates_for_nl.append({
                        "row_id": row_i,
                        "fields": fields,
                        "cells": cells
                    })

            if records_to_update:
                # One PATCH per distinct set of columns, so every record in a
                # request carries the same fields.
                grouped_updates = {}
                for record in records_to_update:
                    field_keys = tuple(sorted(record["fields"].keys()))
                    grouped_updates.setdefault(field_keys, []).append(record)
                update_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{target_table}/records"
                for batch in grouped_updates.values():
                    # Bounded wait: requests has no default timeout.
                    resp = requests.patch(update_url, headers=headers, json={"records": batch}, timeout=30)
                    if resp.status_code != 200:
                        print(f"[{SERVICE_NAME}] Grist update failed for {target_table} | Doc_id : {doc_id} | Status : {resp.text}")

    # === SYNC: new-layout-v2 → new-layout (THA) ===
    if has_nl and nv2_updates_for_nl:
        sync_nv2_to_nl(country, catalog, nv2_updates_for_nl, headers)
def sync_nv2_to_nl(country: str, catalog: str, nv2_updates: list, headers: dict):
    """Mirror edits made to new-layout-v2 rows onto the matching new-layout row (THA).

    Args:
        country: key into ``COUNTRY_MAPPING``; must define both layout docs.
        catalog: catalog whose tables are synchronised.
        nv2_updates: dicts with ``row_id`` (NV2 record id) and ``fields``
            (``{column_letter: value}``) describing what was edited.
        headers: HTTP headers to send with the Grist PATCH requests.

    A 4-row NV2 block (starting at its "name" row) is linked to the single NL
    row whose six product codes (G, H, I, K, L, M) equal the codes split out
    of the NV2 name row (I, J, K). Updates for NV2 rows without such a link
    are logged and skipped.

    NOTE(review): NV2 is re-read here, and ``update_sheets`` calls this after
    it has already patched NV2, so an edit that changes product codes is
    matched using the new codes - confirm that is intended.
    """
    grist_docs = COUNTRY_MAPPING.get(country).get("grist_doc_id", {})
    doc_nl = grist_docs["new-layout"]
    doc_nv2 = grist_docs["new-layout-v2"]
    nl_table_id = find_grist_table_id(doc_nl, catalog)
    nv2_table_id = find_grist_table_id(doc_nv2, catalog)
    if not nl_table_id or not nv2_table_id:
        print(f"[{SERVICE_NAME}] Tables not found for sync")
        return
    nv2_all_data = fetch_grist_table_data(doc_nv2, nv2_table_id)
    nl_all_data = fetch_grist_table_data(doc_nl, nl_table_id)

    def cell_text(row, idx):
        # Stripped text of a cell, or "-" when the row is too short.
        return str(row[idx]).strip() if len(row) > idx else "-"

    def code_pair(raw):
        # "a,b" -> ("a", "b"); blanks, "-" and missing entries become "-".
        parts = [p.strip() for p in raw.split(",") if p.strip() and p.strip() != "-"]
        first = parts[0] if len(parts) > 0 else "-"
        second = parts[1] if len(parts) > 1 else "-"
        return first, second

    # === NL codes map ===
    # NL tuple: (G, H, I, K, L, M) = (index 6, 7, 8, 10, 11, 12) -> NL record id
    nl_codes_map = {}
    for nl_obj in nl_all_data:
        nl_row = nl_obj["fields"]
        code_key = tuple(cell_text(nl_row, col) for col in (6, 7, 8, 10, 11, 12))
        nl_codes_map[code_key] = nl_obj["id"]

    # === NV2 → NL mapping (every row of a block points at one NL row) ===
    nv2_to_nl_map = {}
    for pos, name_obj in enumerate(nv2_all_data):
        name_row = name_obj["fields"]
        if not (len(name_row) > 1 and str(name_row[1]).strip() == "name"):
            continue
        # productCodes on the NV2 name row: I(8), J(9), K(10), comma-separated
        col_g, col_k = code_pair(cell_text(name_row, 8))
        col_h, col_l = code_pair(cell_text(name_row, 9))
        col_i, col_m = code_pair(cell_text(name_row, 10))
        nl_id = nl_codes_map.get((col_g, col_h, col_i, col_k, col_l, col_m))
        if not nl_id:
            continue
        for block_obj in nv2_all_data[pos:pos + 4]:
            block_row = block_obj["fields"]
            nv2_to_nl_map[block_obj["id"]] = {
                "nl_id": nl_id,
                "row_type": str(block_row[1]).strip() if len(block_row) > 1 else ""
            }

    # Column translation tables, NV2 column -> NL column(s), per row type.
    name_swapped = (("C", "D"), ("D", "C"))          # C→D (name_en), D→C (name_th)
    name_split = (("I", ("G", "K")),                # I→G,K
                  ("J", ("H", "L")),                # J→H,L
                  ("K", ("I", "M")))                # K→I,M
    name_tail = (("S", "O"), ("T", "P"), ("U", "Q"), ("V", "R"), ("W", "S"))
    desc_direct = (("C", "F"), ("D", "E"))          # C→F (desc_en), D→E (desc_th)
    img_direct = (("C", "J"), ("I", "N"))           # C→J (img), I→N

    def copy_direct(pairs, source, target):
        for src_col, dst_col in pairs:
            if src_col in source:
                target[dst_col] = source[src_col]

    nl_records_to_update = []
    for update in nv2_updates:
        nv2_row_id = update["row_id"]
        update_fields = update["fields"]  # {col_letter: value}
        mapping = nv2_to_nl_map.get(nv2_row_id)
        if not mapping:
            print(f"[{SERVICE_NAME}] NL row not found for NV2 row {nv2_row_id}")
            continue

        row_type = mapping["row_type"]
        nl_fields = {}
        if row_type == "name":
            copy_direct(name_swapped, update_fields, nl_fields)
            for src_col, (first_col, second_col) in name_split:
                if src_col not in update_fields:
                    continue
                pieces = [p.strip() for p in str(update_fields[src_col]).split(",") if p.strip()]
                # Only the parts that are present are written.
                if len(pieces) > 0:
                    nl_fields[first_col] = pieces[0]
                if len(pieces) > 1:
                    nl_fields[second_col] = pieces[1]
            copy_direct(name_tail, update_fields, nl_fields)
        elif row_type == "desc":
            copy_direct(desc_direct, update_fields, nl_fields)
        elif row_type == "img":
            copy_direct(img_direct, update_fields, nl_fields)

        if nl_fields:
            nl_records_to_update.append({
                "id": mapping["nl_id"],
                "fields": nl_fields
            })

    # === Update to NL ===
    if not nl_records_to_update:
        return
    update_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_nl}/tables/{nl_table_id}/records"
    # Group by field keys (Grist requirement)
    grouped_updates = {}
    for record in nl_records_to_update:
        field_keys = tuple(sorted(record["fields"].keys()))
        grouped_updates.setdefault(field_keys, []).append(record)
    for batch in grouped_updates.values():
        resp = requests.patch(update_url, headers=headers, json={"records": batch})
        if resp.status_code != 200:
            print(f"[{SERVICE_NAME}] NL sync update failed: {resp.text}")
        else:
            print(f"[{SERVICE_NAME}] Synced {len(batch)} NL records")
# column to A1 spreadsheet format
def col_to_letter(col: int) -> str:
    """Convert a 1-based column number to spreadsheet letters.

    1 -> "A", 26 -> "Z", 27 -> "AA". Numbers below 1 give an empty string.
    """
    letters = []
    remaining = col
    while remaining > 0:
        # Bijective base 26: shift by one so that 26 maps to "Z", not "A0".
        remaining, offset = divmod(remaining - 1, 26)
        letters.append(chr(ord("A") + offset))
    # Least-significant letter was produced first.
    return "".join(reversed(letters))
"""
DELETE_MENU_CHANNEL
"""
def _delete_menu_code_pair(row, index):
    """Split the comma-separated product codes in ``row[index]`` into (first, second).

    Positions are preserved: "-,C" yields ("-", "C"), matching the "c1,c2"
    format produced by generate_search_targets. A missing column or a missing
    part is reported as "-".
    """
    raw = str(row[index]).strip() if len(row) > index else "-"
    parts = [p.strip() for p in raw.split(",") if p.strip()]
    first = parts[0] if len(parts) > 0 else "-"
    second = parts[1] if len(parts) > 1 else "-"
    return first, second


def _delete_menu_nl_key(row):
    """Build the (G, H, I, K, L, M) product-code key of a new-layout row."""
    return tuple(
        str(row[idx]).strip() if len(row) > idx else "-"
        for idx in (6, 7, 8, 10, 11, 12)
    )


def handle_delete_menu(country: str, catalog: str, content: list):
    """Delete menu blocks from new-layout-v2 (NV2) and their rows in new-layout (NL).

    Args:
        country: Key into COUNTRY_MAPPING.
        catalog: Catalog name used to resolve the Grist table ids.
        content: List of dicts; each ``target_id`` is the Grist record id of
            an NV2 "name" row whose 4-row block should be removed.

    Nothing is returned; problems are printed. Errors raised while talking to
    Grist are caught and logged.
    """
    config = COUNTRY_MAPPING.get(country)
    if not config:
        print(f"[{SERVICE_NAME}] Country {country} config not found")
        return
    grist_docs = config.get("grist_doc_id", {})
    has_nl = "new-layout" in grist_docs
    doc_nl = grist_docs.get("new-layout")
    doc_nv2 = grist_docs.get("new-layout-v2")
    nl_table_id = find_grist_table_id(doc_nl, catalog) if has_nl else None
    nv2_table_id = find_grist_table_id(doc_nv2, catalog)
    if not nv2_table_id:
        print(f"[{SERVICE_NAME}] NV2 Table not found for catalog: {catalog}")
        return

    # Tolerate malformed payload entries instead of crashing the handler.
    target_ids = set()
    for item in content or []:
        raw_id = item.get("target_id") if isinstance(item, dict) else None
        if not raw_id:
            continue
        try:
            target_ids.add(int(raw_id))
        except (TypeError, ValueError):
            print(f"[{SERVICE_NAME}] Skipping invalid target_id: {raw_id!r}")
    if not target_ids:
        print(f"[{SERVICE_NAME}] Payload to delete is empty.")
        return

    headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"}
    try:
        nv2_all_data = fetch_grist_table_data(doc_nv2, nv2_table_id)
        # Product-code keys (G, H, I, K, L, M) of the NL rows to remove.
        nl_search_keys = set()
        nv2_ids_to_delete = []
        seen_nv2_ids = set()
        for pos, row_obj in enumerate(nv2_all_data):
            row = row_obj["fields"]
            if not (len(row) > 1 and row[1] == "name" and row_obj["id"] in target_ids):
                continue
            # NV2 col I/J/K (index 8/9/10) each hold "first,second" codes that
            # map to NL col G+K, H+L and I+M respectively.
            col_g, col_k = _delete_menu_code_pair(row, 8)
            col_h, col_l = _delete_menu_code_pair(row, 9)
            col_i, col_m = _delete_menu_code_pair(row, 10)
            key = (col_g, col_h, col_i, col_k, col_l, col_m)
            if any(code != "-" for code in key):
                nl_search_keys.add(key)
            else:
                # A key made only of placeholders would match every NL row
                # without product codes and delete unrelated rows.
                print(f"[{SERVICE_NAME}] NV2 row {row_obj['id']} has no product codes; NL lookup skipped")
            # The NV2 block is 4 rows: name, desc, img, blank.
            for block_row in nv2_all_data[pos:pos + 4]:
                record_to_del = block_row["id"]
                if record_to_del not in seen_nv2_ids:
                    seen_nv2_ids.add(record_to_del)
                    nv2_ids_to_delete.append(record_to_del)

        # === Delete in new-layout (THA) ===
        if has_nl and nl_search_keys:
            if not nl_table_id:
                # Without a table id the request would hit ".../tables/None/...".
                print(f"[{SERVICE_NAME}] NL Table not found for catalog: {catalog}")
            else:
                nl_all_data = fetch_grist_table_data(doc_nl, nl_table_id)
                nl_ids_to_delete = [
                    row_obj["id"]
                    for row_obj in nl_all_data
                    if _delete_menu_nl_key(row_obj["fields"]) in nl_search_keys
                ]
                if nl_ids_to_delete:
                    del_nl_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_nl}/tables/{nl_table_id}/records/delete"
                    resp_nl = requests.post(del_nl_url, headers=headers, json=nl_ids_to_delete)
                    print(f"[{SERVICE_NAME}] Deleted NL IDs: {nl_ids_to_delete} | Status: {resp_nl.status_code}")

        # === Delete in new-layout-v2 (All country) ===
        if nv2_ids_to_delete:
            del_nv2_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_nv2}/tables/{nv2_table_id}/records/delete"
            resp_v2 = requests.post(del_nv2_url, headers=headers, json=nv2_ids_to_delete)
            print(f"[{SERVICE_NAME}] Deleted NV2 IDs: {nv2_ids_to_delete} | Status: {resp_v2.status_code}")
    except Exception as e:
        print(f"[{SERVICE_NAME}] Delete menu error: {e}")
        traceback.print_exc()
"""
SWAP_MENU_CHANNEL
"""
def format_for_grist(data):
    """Turn a positional row into a Grist ``fields`` dict keyed by column letter.

    A list ``[v0, v1, ...]`` becomes ``{"A": v0, "B": v1, ...}``; any other
    value is returned unchanged.
    """
    if not isinstance(data, list):
        return data
    return {col_to_letter(position): value for position, value in enumerate(data, start=1)}
def _swap_menu_code_pair(row, index):
    """Split the comma-separated product codes in ``row[index]`` into (first, second).

    Positions are preserved: "-,C" yields ("-", "C"), matching the "c1,c2"
    format produced by generate_search_targets. A missing column or a missing
    part is reported as "-".
    """
    raw = str(row[index]).strip() if len(row) > index else "-"
    parts = [p.strip() for p in raw.split(",") if p.strip()]
    first = parts[0] if len(parts) > 0 else "-"
    second = parts[1] if len(parts) > 1 else "-"
    return first, second


def _swap_menu_nl_key(row):
    """Build the (G, H, I, K, L, M) product-code key of a new-layout row."""
    return tuple(
        str(row[idx]).strip() if len(row) > idx else "-"
        for idx in (6, 7, 8, 10, 11, 12)
    )


def handle_swap_menu(country: str, catalog: str, content: list):
    """Copy menu data between rows of new-layout-v2 (NV2) and, if configured, new-layout (NL).

    Args:
        country: Key into COUNTRY_MAPPING.
        catalog: Catalog name used to resolve the Grist table ids.
        content: List of dicts with ``source_id`` and ``target_id`` (record
            ids of NV2 "name" rows). For each pair the target block receives
            the source block's original values, so a full swap needs both
            (a, b) and (b, a) in the payload.

    All source values are snapshotted before any update is built, so pairs do
    not see each other's writes. Results are printed; nothing is returned.
    """
    config = COUNTRY_MAPPING.get(country)
    if not config:
        # Same guard as handle_delete_menu; avoids AttributeError on None.
        print(f"[{SERVICE_NAME}] Country {country} config not found")
        return
    grist_docs = config.get("grist_doc_id", {})
    has_nl = "new-layout" in grist_docs
    doc_nl = grist_docs.get("new-layout")
    doc_nv2 = grist_docs.get("new-layout-v2")
    nl_table = find_grist_table_id(doc_nl, catalog) if has_nl else None
    nv2_table = find_grist_table_id(doc_nv2, catalog)
    if not nv2_table:
        print(f"[{SERVICE_NAME}] Swap failed: NV2 Table not found")
        return

    nl_all = fetch_grist_table_data(doc_nl, nl_table) if has_nl and nl_table else []
    nv2_all = fetch_grist_table_data(doc_nv2, nv2_table)
    nl_map = {row["id"]: row["fields"] for row in nl_all}
    nv2_map = {row["id"]: row["fields"] for row in nv2_all}

    # Map every row id of an NV2 block to the NL row sharing its product codes.
    nv2_to_nl_map = {}
    if has_nl:
        nl_codes_map = {_swap_menu_nl_key(row_obj["fields"]): row_obj["id"] for row_obj in nl_all}
        for pos, row_obj in enumerate(nv2_all):
            row = row_obj["fields"]
            if not (len(row) > 1 and str(row[1]).strip() == "name"):
                continue
            # NV2 col I/J/K (index 8/9/10) map to NL col G+K, H+L and I+M.
            col_g, col_k = _swap_menu_code_pair(row, 8)
            col_h, col_l = _swap_menu_code_pair(row, 9)
            col_i, col_m = _swap_menu_code_pair(row, 10)
            nl_id = nl_codes_map.get((col_g, col_h, col_i, col_k, col_l, col_m))
            if nl_id:
                # The NV2 block is 4 rows: name, desc, img, blank.
                for block_row in nv2_all[pos:pos + 4]:
                    nv2_to_nl_map[block_row["id"]] = nl_id

    # Pass 1: snapshot the original source values, keyed by their destination.
    nv2_original = {}  # {target_id: [block_fields]}
    nl_original = {}   # {nl_target_id: nl_fields}
    for pair in content:
        id_src = pair.get("source_id")
        id_tgt = pair.get("target_id")
        if not id_src or not id_tgt:
            continue
        block_src = find_nv2_block_by_id(nv2_all, id_src)
        if block_src:
            nv2_original[id_tgt] = [list(nv2_map[sid]) for sid in block_src]
        if has_nl:
            nl_src_id = nv2_to_nl_map.get(id_src)
            if nl_src_id:
                nl_original[nv2_to_nl_map.get(id_tgt)] = list(nl_map[nl_src_id])

    # Pass 2: build the update records from the snapshots.
    nv2_records_to_update = []
    nl_records_to_update = []
    for pair in content:
        id_src = pair.get("source_id")
        id_tgt = pair.get("target_id")
        if not id_src or not id_tgt:
            continue
        block_tgt = find_nv2_block_by_id(nv2_all, id_tgt)
        # === Swap NV2 ===
        original_block = nv2_original.get(id_tgt, [])
        for id_v2_tgt, fields_src in zip(block_tgt, original_block):
            nv2_records_to_update.append({
                "id": id_v2_tgt,
                "fields": format_for_grist(fields_src),
            })
        # === Swap NL (THA) ===
        if has_nl:
            nl_tgt_id = nv2_to_nl_map.get(id_tgt)
            nl_src_original = nl_original.get(nl_tgt_id)
            if nl_tgt_id and nl_src_original:
                nl_records_to_update.append({
                    "id": nl_tgt_id,
                    "fields": format_for_grist(nl_src_original),
                })

    headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"}
    # Strip a trailing slash like every other Grist call in this file, so a
    # GRIST_URL ending in "/" does not produce ".../​/api/..." style URLs.
    base_url = GRIST_URL.rstrip('/')
    if has_nl and nl_records_to_update:
        resp_v1 = requests.patch(
            f"{base_url}/api/docs/{doc_nl}/tables/{nl_table}/records",
            headers=headers,
            json={"records": nl_records_to_update},
        )
        print(f"[{SERVICE_NAME}] Swap New-layout Result: {resp_v1.status_code}")
    if nv2_records_to_update:
        resp_v2 = requests.patch(
            f"{base_url}/api/docs/{doc_nv2}/tables/{nv2_table}/records",
            headers=headers,
            json={"records": nv2_records_to_update},
        )
        print(f"[{SERVICE_NAME}] Swap New-layout-v2 Result: {resp_v2.status_code}")
def find_nv2_block_by_id(nv2_data, name_row_id):
    """Return the record ids of the block starting at the row with ``name_row_id``.

    The block is the matching row plus up to three rows after it (fewer at the
    end of the data). An empty list is returned when no row has that id.
    """
    start = next(
        (pos for pos, record in enumerate(nv2_data) if record["id"] == name_row_id),
        None,
    )
    if start is None:
        return []
    return [record["id"] for record in nv2_data[start:start + 4]]
def generate_search_targets(nl_row):
    """Build the "first,second" code strings of a new-layout row in new-layout-v2 format.

    Columns are paired by index as (6, 10), (7, 11) and (8, 12). A missing or
    blank cell becomes "-"; a pair whose both halves are "-" is left out.
    """
    def cell(index):
        value = nl_row[index] if len(nl_row) > index else "-"
        return str(value).strip() or "-"

    targets = []
    for first_idx, second_idx in ((6, 10), (7, 11), (8, 12)):
        first, second = cell(first_idx), cell(second_idx)
        if (first, second) == ("-", "-"):
            continue
        targets.append(f"{first},{second}")
    return targets
def find_nv2_block_by_targets(nv2_data, search_targets):
    """Return the ids of the first new-layout-v2 block matching any search target.

    A row matches when the stripped text of column I, J or K (index 8, 9, 10)
    equals one of ``search_targets``; rows with 10 or fewer columns are
    ignored. The result is the matching row's id plus up to three following
    ids, or an empty list when nothing matches.
    """
    wanted = set(search_targets)
    for start, record in enumerate(nv2_data):
        cells = record["fields"]
        if len(cells) <= 10:
            continue
        codes = {str(cells[col]).strip() for col in (8, 9, 10)}
        if not wanted.isdisjoint(codes):
            return [rec["id"] for rec in nv2_data[start:start + 4]]
    return []
# FastAPI application instance; the route and startup decorators in this file register on it.
app = FastAPI()
class CatalogPayloadValues(BaseModel):
    """``values`` object nested inside a catalog-list request payload."""
    # Country key; get_api_catalogs strips and lower-cases it before the COUNTRY_MAPPING lookup.
    country: str
class CatalogPayload(BaseModel):
    """``payload`` object of a catalog-list request."""
    # Caller information; not read by get_api_catalogs. Defaults to an empty dict.
    user_info: dict = {}
    # Must equal SERVICE_NAME, otherwise get_api_catalogs answers with HTTP 400.
    srv_name: str
    # Request parameters (currently only the country).
    values: CatalogPayloadValues
class CatalogRequest(BaseModel):
    """Request body accepted by the POST /catalogs endpoint."""
    # Message type; not inspected by get_api_catalogs.
    type: str
    # Service name, user info and request values.
    payload: CatalogPayload
class RoomRequest(BaseModel):
    """Request body shared by the /api/room/enter, /heartbeat and /exit endpoints."""
    # Country key; the room endpoints strip and lower-case it before use.
    country: str
    # Catalog whose lock is acquired, refreshed or released via lock_manager.
    catalog: str
    # User on whose behalf the lock operation is made.
    user_id: str
@app.on_event("startup")
def startup_event():
    """Start ``redis_message_handler`` in a daemon thread when the app starts."""
    threading.Thread(target=redis_message_handler, daemon=True).start()
@app.post("/catalogs")
def get_api_catalogs(req: CatalogRequest):
    """List the catalogs of a country together with their lock status.

    The Grist document is the country's "new-layout" doc, falling back to
    "new-layout-v2". Tables whose id starts with "Grist" and the
    name_desc_v2 table are skipped.

    Raises:
        HTTPException: 400 for a wrong service name, 404 when the country has
            no Grist config, 500 for any error while building the list.
    """
    body = req.payload
    if body.srv_name != SERVICE_NAME:
        raise HTTPException(status_code=400, detail="Invalid service name")

    country = body.values.country.strip().lower()
    config = COUNTRY_MAPPING.get(country)
    if not (config and config.get("grist_doc_id")):
        raise HTTPException(status_code=404, detail="Country or Grist config not found")

    try:
        doc_ids = config["grist_doc_id"]
        doc_id = doc_ids.get("new-layout") or doc_ids.get("new-layout-v2")
        if not doc_id:
            print(f"[{SERVICE_NAME}] grist_doc_id [layout] is empty in this country | {country}")
            return None

        response = requests.get(
            f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables",
            headers={"Authorization": f"Bearer {GRIST_API_KEY}"},
        )
        # A non-200 answer yields an empty catalog list rather than an error.
        tables = response.json().get("tables", []) if response.status_code == 200 else []

        catalogs = []
        for position, table in enumerate(tables):
            table_id = table["id"]
            if table_id.startswith("Grist") or table_id.lower() == "name_desc_v2":
                continue
            # The catalog name is the "file=" part of the reconstructed table
            # name; fall back to the raw table id when it is absent.
            found = re.search(r'file=([^,]+)', reconstruct_table_name(table_id))
            clean_catalog = found.group(1).strip() if found else table_id
            lock_info = lock_manager.get_lock_info(country, clean_catalog)
            catalogs.append({
                "catalog": clean_catalog,
                "row_index": position,
                "status": "locked" if lock_info["is_locked"] else "free",
                "locked_by": lock_info["locked_by"],
            })
        return {"status": "success", "country": country, "catalogs": catalogs}
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/catalog/data/{country}/{catalog}/{user_id}")
def get_catalog_data(country: str, catalog: str, user_id: str, background_tasks: BackgroundTasks):
    """Schedule streaming of a catalog's data and acknowledge immediately.

    The data itself is produced by ``process_and_stream_sheet_data`` as a
    background task; this endpoint only returns a "processing" receipt.

    Raises:
        HTTPException: 403 when ``lock_manager`` does not authorize the user
            for this country/catalog.
    """
    country = country.strip().lower()
    authorized = lock_manager.is_authorized(country, catalog, user_id)
    if not authorized:
        raise HTTPException(status_code=403, detail="Unauthorized access to this room")

    background_tasks.add_task(process_and_stream_sheet_data, country, catalog, user_id)
    receipt = {
        "status": "processing",
        "country": country,
        "catalog": catalog,
        "message": "Data stream started. Please listen to frontend notifications.",
    }
    return receipt
@app.post("/api/room/enter")
def enter_room(req: RoomRequest):
    """Acquire the catalog lock for a user and start streaming its data.

    On success ``process_and_stream_sheet_data`` is started in a daemon thread
    and the lock timeout is returned.

    Raises:
        HTTPException: 403 carrying the second value returned by
            ``lock_manager.acquire`` when the lock is not granted.
    """
    country = req.country.strip().lower()
    acquired, detail = lock_manager.acquire(country, req.catalog, req.user_id)
    if not acquired:
        raise HTTPException(status_code=403, detail=detail)

    worker = threading.Thread(
        target=process_and_stream_sheet_data,
        args=(country, req.catalog, req.user_id),
        daemon=True,
    )
    worker.start()
    return {
        "status": "success",
        "message": f"Successfully entry catalog: {req.catalog}",
        "timeout": lock_manager.timeout,
    }
@app.post("/api/room/heartbeat")
def heartbeat(req: RoomRequest):
    """Refresh a user's catalog lock through ``lock_manager.keep_alive``.

    Raises:
        HTTPException: 401 when ``keep_alive`` reports failure.
    """
    country = req.country.strip().lower()
    still_held = lock_manager.keep_alive(country, req.catalog, req.user_id)
    if not still_held:
        raise HTTPException(status_code=401, detail="Timeout permission denied")
    return {"status": "alive"}
@app.post("/api/room/exit")
def exit_room(req: RoomRequest):
    """Release a user's catalog lock.

    The status is "success" either way; only the message says whether
    ``lock_manager.release`` actually released something.
    """
    country = req.country.strip().lower()
    released = lock_manager.release(country, req.catalog, req.user_id)
    message = "Successfully exited" if released else "You no longer have permission"
    return {"status": "success", "message": message}
@app.get("/grist/pull/sheet/{country}")
def grist_pull_sheet_api(country: str):
    """Run ``sync_sheets_to_grist`` for ``country`` (Google Sheets -> Grist).

    Returns nothing on success.

    Raises:
        HTTPException: 500 with the error text when the sync raises.
    """
    try:
        sync_sheets_to_grist(country)
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(exc))
    return None
@app.get("/grist/push/data/sheet/{country}")
def grist_push_data_sheet_api(country: str):
    """Run ``sync_grist_to_sheets`` for ``country`` (Grist -> Google Sheets).

    Returns nothing on success.

    Raises:
        HTTPException: 500 with the error text when the sync raises.
    """
    try:
        sync_grist_to_sheets(country)
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(exc))
    return None
"""
Grist function endpoint
[ Grist sync sheet to grist ]
"""
def get_column_letter(n):
    """Return the column letters for a 0-based index: 0 -> A, 25 -> Z, 26 -> AA.

    Negative indexes give an empty string.
    """
    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    remaining = n + 1  # switch to 1-based numbering
    pieces = []
    while remaining > 0:
        remaining, digit = divmod(remaining - 1, 26)
        pieces.insert(0, alphabet[digit])
    return "".join(pieces)
def sync_sheets_to_grist(country_key):
    """Copy a country's Google Sheets worksheets into their Grist documents.

    Worksheets without a configured name or Grist doc id are skipped. Layout
    sheets go through ``process_new_layout_sheet``; the name-desc sheet is
    uploaded as one table. A failure on one sheet is printed and the remaining
    sheets are still processed.
    """
    if country_key not in COUNTRY_MAPPING:
        print(f"Error: Country '{country_key}' not found.")
        return

    config = COUNTRY_MAPPING[country_key]
    spreadsheet_id = config["spreadsheet_id"]
    sheets = config["sheets"]  # a dict (sheet key -> worksheet name), not a list
    grist_doc_ids = config.get("grist_doc_id", {})
    spreadsheet = get_gspread_client().open_by_key(spreadsheet_id)

    layout_keys = ("new-layout-v2", "new-layout")
    for sheet_key, sheet_name in sheets.items():
        if not sheet_name:
            continue
        doc_id = grist_doc_ids.get(sheet_key)
        if not doc_id:
            continue
        try:
            all_values = spreadsheet.worksheet(sheet_name).get_all_values()
            if not all_values:
                continue
            # First row is the header; everything after it is data.
            header, data_rows = all_values[0], all_values[1:]
            if sheet_key in layout_keys:
                process_new_layout_sheet(doc_id, header, data_rows)
            elif sheet_key == "name-desc-v2":
                upload_to_grist_self_hosted(doc_id, "name-desc-v2", header, data_rows)
        except Exception as exc:
            print(f"[{SERVICE_NAME}] Error processing sheet {sheet_name}: {exc}")
def process_new_layout_sheet(doc_id, header, data_rows):
    """Split layout-sheet rows into per-table groups and upload each group.

    A row whose first cell starts with "Name=" opens a new group named after
    that cell; the rows after it belong to the group. Rows before the first
    marker are dropped, a repeated marker restarts its group, and empty groups
    are not uploaded.
    """
    grouped = {}
    active_table = None
    for row in data_rows:
        first_cell = row[0] if len(row) > 0 else ""
        if first_cell.startswith("Name="):
            active_table = first_cell
            grouped[active_table] = []
            continue
        if active_table:
            grouped[active_table].append(row)

    for table_name, members in grouped.items():
        if not members:
            continue
        upload_to_grist_self_hosted(doc_id, table_name, header, members)
def upload_to_grist_self_hosted(doc_id, table_name, header, rows):
    """Replace the contents of one Grist table with ``rows``.

    Args:
        doc_id: Grist document id.
        table_name: Source name; characters outside [a-zA-Z0-9_] become "_",
            the first letter is upper-cased, and a leading digit gets a "t_"
            prefix to form the table id.
        header: Header cells; column ids are A, B, C, ... and the header text
            (or the letter when blank) is the column label.
        rows: Data rows; cells beyond the header width are dropped.

    If the table's records can be read, they are deleted first; otherwise the
    table is created. Failures are printed, nothing is raised or returned.
    """
    clean_table_id = re.sub(r'[^a-zA-Z0-9_]', '_', table_name)
    if not clean_table_id:
        # Indexing [0] below would raise IndexError on an empty name.
        print("Error: empty table name, nothing uploaded to Grist.")
        return
    clean_table_id = clean_table_id[0].upper() + clean_table_id[1:]
    chunk_size = 500
    if clean_table_id[0].isdigit():
        clean_table_id = f"t_{clean_table_id}"
    base_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables"
    headers = {
        "Authorization": f"Bearer {GRIST_API_KEY}",
        "Content-Type": "application/json"
    }
    column_defs = []
    for i, h_text in enumerate(header):
        col_letter = get_column_letter(i)
        column_defs.append({
            "id": col_letter,
            "fields": {"label": h_text.strip() or col_letter}
        })
    column_ids = [col["id"] for col in column_defs]
    try:
        check_resp = requests.get(f"{base_url}/{clean_table_id}/records", headers=headers)
        if check_resp.status_code == 200:
            # Table exists: clear it in chunks before re-filling.
            row_ids = [rec["id"] for rec in check_resp.json().get("records", [])]
            for i in range(0, len(row_ids), chunk_size):
                del_resp = requests.post(f"{base_url}/{clean_table_id}/records/delete",
                                         headers=headers, json=row_ids[i:i+chunk_size])
                if del_resp.status_code != 200:
                    # Surface the failure; leftover rows would otherwise
                    # silently coexist with the freshly inserted ones.
                    print(f"[{SERVICE_NAME}] Failed to clear rows in {clean_table_id}: {del_resp.text}")
        else:
            print(f"[{SERVICE_NAME}] Creating table {clean_table_id} with indexed columns...")
            create_resp = requests.post(base_url, headers=headers, json={
                "tables": [{"id": clean_table_id, "columns": column_defs}]
            })
            if create_resp.status_code == 200:
                # Pause briefly before inserting into the new table.
                time.sleep(1)
            else:
                print(f"[{SERVICE_NAME}] Create failed: {create_resp.text}")
                return

        records_to_add = []
        for row in rows:
            # zip stops at the shorter side, dropping cells without a column.
            fields = dict(zip(column_ids, row))
            if fields:
                records_to_add.append({"fields": fields})

        if records_to_add:
            synced_count = 0
            for i in range(0, len(records_to_add), chunk_size):
                chunk = records_to_add[i:i+chunk_size]
                res = requests.post(f"{base_url}/{clean_table_id}/records",
                                    headers=headers, json={"records": chunk})
                if res.status_code != 200:
                    print(f"Error adding data to {clean_table_id}: {res.text}")
                else:
                    synced_count += len(chunk)
            # Report only rows Grist accepted, not rows merely attempted.
            print(f"[{SERVICE_NAME}] Synced {synced_count} rows to {clean_table_id}")
    except Exception as e:
        print(f"[{SERVICE_NAME}] Error on {clean_table_id}: {e}")
"""
Grist [ Sync grist to google sheet ]
"""
def col_to_index(col_str):
    """Convert column letters to a 0-based index: 'A' -> 0, 'B' -> 1, 'AA' -> 26.

    An empty string gives -1.
    """
    # Bijective base-26: the rightmost letter has weight 26**0.
    one_based = sum(
        (ord(letter) - ord('A') + 1) * 26 ** power
        for power, letter in enumerate(reversed(col_str))
    )
    return one_based - 1
def reconstruct_table_name(t_id):
    """Rebuild the sheet-style table name from a Grist table id.

    A leading "Name_" becomes "Name=", the first "_file_" becomes ",file=",
    and a trailing "_skt" becomes ".skt". Ids without these parts are returned
    unchanged.
    """
    prefix = "Name_"
    name = "Name=" + t_id[len(prefix):] if t_id.startswith(prefix) else t_id

    head, separator, tail = name.partition("_file_")
    if separator:
        name = f"{head},file={tail}"

    suffix = "_skt"
    if name.endswith(suffix):
        name = name[:-len(suffix)] + ".skt"
    return name
def get_all_grist_tables(doc_id):
    """Return the table ids of a Grist document, skipping ids that start with "Grist".

    An empty list is returned when the request does not answer with HTTP 200.
    """
    response = requests.get(
        f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables",
        headers={"Authorization": f"Bearer {GRIST_API_KEY}"},
    )
    if response.status_code != 200:
        return []
    return [
        table["id"]
        for table in response.json().get("tables", [])
        if not table["id"].startswith("Grist")
    ]
def fetch_grist_table_data(doc_id, table_id):
    """Fetch a Grist table's records as positional rows.

    Returns a list of ``{"id": record_id, "fields": [...]}`` dicts. Each
    ``fields`` list is indexed by column letter (A -> 0, B -> 1, ...): values
    are converted to ``str``, ``None`` becomes "", and only field names made
    of capital letters are used. A failed request is printed and gives an
    empty list.
    """
    response = requests.get(
        f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{table_id}/records",
        headers={"Authorization": f"Bearer {GRIST_API_KEY}"},
    )
    if response.status_code != 200:
        print(f"Failed to fetch table {table_id}: {response.text}")
        return []

    parsed_rows = []
    for record in response.json().get("records", []):
        # Keep only letter-named columns, paired with their 0-based position.
        positioned = [
            (col_to_index(name), value)
            for name, value in record.get("fields", {}).items()
            if re.match(r'^[A-Z]+$', name)
        ]
        width = max((position for position, _ in positioned), default=-1) + 1
        row = [""] * width
        for position, value in positioned:
            row[position] = "" if value is None else str(value)
        parsed_rows.append({"id": record.get("id"), "fields": row})
    return parsed_rows
def sync_grist_to_sheets(country_key):
    """Write a country's Grist data back into its Google Sheets worksheets.

    Worksheets without a configured name or Grist doc id are skipped. Row 1 of
    each worksheet is kept: the range "A2:ZZ" is cleared and data is written
    from A2. For layout sheets every table is written as a single-cell row
    holding its reconstructed name, followed by the table's rows. A failure on
    one sheet is printed and the remaining sheets are still processed.
    """
    if country_key not in COUNTRY_MAPPING:
        print(f"Error: Country '{country_key}' not found.")
        return

    config = COUNTRY_MAPPING[country_key]
    spreadsheet_id = config["spreadsheet_id"]
    sheets = config["sheets"]
    grist_doc_ids = config.get("grist_doc_id", {})
    spreadsheet = get_gspread_client().open_by_key(spreadsheet_id)

    for sheet_key, sheet_name in sheets.items():
        if not sheet_name:
            continue
        doc_id = grist_doc_ids.get(sheet_key)
        if not doc_id:
            continue
        try:
            worksheet = spreadsheet.worksheet(sheet_name)

            if sheet_key == "name-desc-v2":
                records = fetch_grist_table_data(doc_id, "Name_desc_v2")
                if records:
                    sheet_rows = [record["fields"] for record in records]
                    worksheet.batch_clear(["A2:ZZ"])
                    worksheet.update(values=sheet_rows, range_name="A2")
                    print(f"[{SERVICE_NAME}] Updated {len(records)} rows to sheet '{sheet_name}'.")

            elif sheet_key in ("new-layout", "new-layout-v2"):
                print(f"Fetching multiple tables from doc: {doc_id}...")
                combined_rows = []
                for table_id in get_all_grist_tables(doc_id):
                    if table_id.lower() == "name_desc_v2":
                        continue
                    records = fetch_grist_table_data(doc_id, table_id)
                    if not records:
                        continue
                    # Marker row with the table name, then the table's data.
                    combined_rows.append([reconstruct_table_name(table_id)])
                    combined_rows.extend([record["fields"] for record in records])

                if not combined_rows:
                    print(f"[{SERVICE_NAME}] No data found for sheet '{sheet_name}'.")
                else:
                    worksheet.batch_clear(["A2:ZZ"])
                    worksheet.update(values=combined_rows, range_name="A2")
                    print(f"[{SERVICE_NAME}] Updated {len(combined_rows)} lines to sheet '{sheet_name}'.")
        except Exception as exc:
            print(f"[{SERVICE_NAME}] Error syncing to sheet {sheet_name}: {exc}")