# taobin_sheet/main.py
#
# 1196 lines
# 40 KiB
# Python

import time
import re
import os
import json
import redis
import gspread
import requests
import traceback
import threading
import uuid
import math
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from dotenv import load_dotenv
from google.oauth2 import service_account
# --- Runtime configuration, read from environment variables at import time ---
# NOTE(review): `load_dotenv` is imported above but never called in the visible
# code, so these values come from the process environment only -- confirm.
REDIS_HOST = os.getenv("REDIS_HOST")
REDIS_PORT = os.getenv("REDIS_PORT")
FRONTEND_NOTIFY_URL = os.getenv("FRONTEND_NOTIFY_URL")
LOG_SERVER_URL = os.getenv("LOG_SERVER_URL")
SERVICE_NAME = os.getenv("SERVICE_NAME")
# Redis pub/sub channel names, all prefixed with the service name.
ENTER_CHANNEL = f"{SERVICE_NAME}/enter"
HEARTBEAT_CHANNEL = f"{SERVICE_NAME}/heartbeat"
EXIT_CHANNEL = f"{SERVICE_NAME}/exit"
GET_CATALOG_CHANNEL = f"{SERVICE_NAME}/catalogs"
UPDATE_MENU_CHANNEL = f"{SERVICE_NAME}/update/menu"
DELETE_MENU_CHANNEL = f"{SERVICE_NAME}/delete/menu"
ADD_CATALOG_CHANNEL = f"{SERVICE_NAME}/add/catalog"
ADD_MENU_CHANNEL = f"{SERVICE_NAME}/add/menu"
# Grist set up ...
GRIST_URL = os.getenv("GRIST_URL")
GRIST_API_KEY = os.getenv("GRIST_API_KEY")
# Shared Redis client; the pub/sub listener is created from it.
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
# Matches four dash-separated alphanumeric groups of lengths 2-2-2-4.
ID_PATTERN = r'[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{4}'
# Per-country configuration, keyed by lower-case country code.
#   spreadsheet_id : Google spreadsheet key.
#   sheets         : worksheet names; callers rely on list position and on the
#                    names containing "new-layout" / "name-desc-v2".
#   grist_doc_id   : Grist document ids; position 0 is read as New-Layout,
#                    1 as New-Layout-V2 and 2 as Name-desc-v2 by the code below.
# "tha_premium" has no "grist_doc_id" entry.
COUNTRY_MAPPING = {
    "tha": {
        "spreadsheet_id": os.getenv("SPREAD_SHEET_ID_THA"),
        "sheets": [
            os.getenv("SHEET_NEW_LAYOUT_THA"),
            os.getenv("SHEET_NEW_LAYOUT_V2_THA"),
            os.getenv("SHEET_NAME_DESC_V2_THA")
        ],
        "grist_doc_id": [
            os.getenv("DOC_ID_NEW_LAYOUT_THA"),
            os.getenv("DOC_ID_NEW_LAYOUT_V2_THA"),
            os.getenv("DOC_ID_NAME_DESC_V2_THA")
        ]
    },
    "tha_premium": {
        "spreadsheet_id": os.getenv("SPREAD_SHEET_ID_THA_PREMIUM"),
        "sheets": [
            os.getenv("SHEET_NEW_LAYOUT_THA_PREMIUM")
        ]
    }
}
def get_gspread_client():
    """Build an authorized gspread client from service-account env variables.

    Reads ``GOOGLE_SERVICE_ACCOUNT_EMAIL`` and ``GOOGLE_PRIVATE_KEY`` from the
    environment. Literal ``\\n`` sequences in the key are converted to real
    newlines before the credentials are created.

    Returns:
        The client returned by ``gspread.authorize``.

    Raises:
        RuntimeError: if either environment variable is missing or empty.
    """
    scopes = [
        'https://www.googleapis.com/auth/spreadsheets',
        'https://www.googleapis.com/auth/drive'
    ]
    service_account_email = os.getenv("GOOGLE_SERVICE_ACCOUNT_EMAIL")
    raw_private_key = os.getenv("GOOGLE_PRIVATE_KEY")
    # Fail with a clear message instead of AttributeError on None.replace().
    if not service_account_email or not raw_private_key:
        raise RuntimeError(
            "GOOGLE_SERVICE_ACCOUNT_EMAIL and GOOGLE_PRIVATE_KEY must be set"
        )
    credentials_info = {
        "type": "service_account",
        "client_email": service_account_email,
        "private_key": raw_private_key.replace("\\n", "\n"),
        "token_uri": "https://oauth2.googleapis.com/token",
    }
    creds = service_account.Credentials.from_service_account_info(
        credentials_info,
        scopes=scopes
    )
    return gspread.authorize(creds)
class LockManager:
    """In-memory, time-limited room locks.

    A "room" is a (country, catalog) pair. A user may hold at most one room
    and a room at most one user. A lock expires ``timeout`` seconds after its
    last acquire/keep-alive. All public methods take an internal mutex because
    the manager is shared between the Redis listener thread and the HTTP
    handlers.
    """

    def __init__(self):
        # { "tha:page_catalog_group_coffee.skt": {"user_id": "A", "timestamp": 12345} }
        self.locks: dict[str, dict] = {}
        self.timeout = 30
        self._mutex = threading.Lock()

    def get_room_key(self, country: str, catalog: str) -> str:
        """Return the dictionary key used for a (country, catalog) room."""
        return f"{country}:{catalog}"

    def _is_live(self, lock, now: float) -> bool:
        """Return True when ``lock`` exists and has not timed out at ``now``."""
        return lock is not None and now - lock["timestamp"] < self.timeout

    def acquire(self, country: str, catalog: str, user_id: str):
        """Try to lock a room for ``user_id``.

        Returns:
            ``(True, user_id)`` on success, otherwise ``(False, reason)``.
        """
        now = time.time()
        room_key = self.get_room_key(country, catalog)
        with self._mutex:
            # (1 person, 1 room)
            for key, data in list(self.locks.items()):
                if not data or data["user_id"] != user_id or key == room_key:
                    continue
                if self._is_live(data, now):
                    return False, "You are already in another room."
                # Drop the user's expired lock so stale keys do not pile up.
                self.locks.pop(key, None)
            # Check if the requested room is occupied by someone else.
            current_lock = self.locks.get(room_key)
            if (
                current_lock
                and current_lock["user_id"] != user_id
                and self._is_live(current_lock, now)
            ):
                return False, f"Room locked by {current_lock['user_id']}"
            self.locks[room_key] = {"user_id": user_id, "timestamp": now}
            return True, user_id

    def keep_alive(self, country: str, catalog: str, user_id: str):
        """Renew the caller's lock; return False if it is missing or expired."""
        now = time.time()
        room_key = self.get_room_key(country, catalog)
        with self._mutex:
            lock = self.locks.get(room_key)
            if lock and lock["user_id"] == user_id and self._is_live(lock, now):
                lock["timestamp"] = now
                return True
            return False

    def release(self, country: str, catalog: str, user_id: str):
        """Release the room if ``user_id`` holds it; return whether it did."""
        room_key = self.get_room_key(country, catalog)
        with self._mutex:
            lock = self.locks.get(room_key)
            if lock and lock["user_id"] == user_id:
                self.locks.pop(room_key, None)
                return True
            return False

    def is_authorized(self, country: str, catalog: str, user_id: str):
        """Return True when ``user_id`` holds a non-expired lock on the room."""
        now = time.time()
        with self._mutex:
            lock = self.locks.get(self.get_room_key(country, catalog))
            return bool(
                lock and lock["user_id"] == user_id and self._is_live(lock, now)
            )

    def get_lock_info(self, country: str, catalog: str):
        """Return ``{"is_locked": bool, "locked_by": user_id or None}``."""
        now = time.time()
        with self._mutex:
            lock = self.locks.get(self.get_room_key(country, catalog))
            if lock and self._is_live(lock, now):
                return {"is_locked": True, "locked_by": lock["user_id"]}
            return {"is_locked": False, "locked_by": None}


# Process-wide lock registry shared by the Redis listener and the HTTP routes.
lock_manager = LockManager()
# -- Redis handler --
def redis_message_handler():
    """Listen on the service's Redis channels and dispatch each command.

    ``pubsub.listen()`` blocks, so this is meant to run in a background
    thread. Each message is parsed as JSON and read as
    ``{"payload": {"srv_name", "user_info": {"uid"}, "values": {...}}}``.
    Messages for another service, or without a user id / country, are
    skipped. An error while handling one message is logged and the loop
    continues with the next message.
    """
    pubsub = r.pubsub()
    pubsub.subscribe(
        ENTER_CHANNEL,
        HEARTBEAT_CHANNEL,
        EXIT_CHANNEL,
        GET_CATALOG_CHANNEL,
        UPDATE_MENU_CHANNEL,
        DELETE_MENU_CHANNEL,
        ADD_CATALOG_CHANNEL,
        ADD_MENU_CHANNEL,
    )
    print(f"[*] Redis Listener started on service: {SERVICE_NAME}")

    for message in pubsub.listen():
        if message['type'] != 'message':
            continue
        try:
            data = json.loads(message['data'])
            payload = data.get("payload", {})
            values = payload.get("values", {})
            user_info = payload.get("user_info", {})
            country = values.get("country", "").strip().lower()
            catalog = values.get("catalog", "")
            catalog_name = values.get("catalog_name", "")
            content = values.get("content", [])
            srv_name = payload.get("srv_name", "")
            user_id = user_info.get("uid")

            if srv_name != SERVICE_NAME:
                continue

            channel = message['channel']
            missing_msg = (
                f"[{SERVICE_NAME}] Missing required parameters | "
                f"Channel: {channel} | User: {user_id}"
            )
            if not (user_id and country):
                print(missing_msg)
                continue

            if channel == ENTER_CHANNEL:
                if not catalog:
                    print(missing_msg)
                    continue
                success, msg = lock_manager.acquire(country, catalog, user_id)
                if success:
                    # Stream the catalog in the background so the listener
                    # keeps consuming messages.
                    threading.Thread(
                        target=process_and_stream_sheet_data,
                        args=(country, catalog, user_id),
                        daemon=True
                    ).start()
                print(f"[{SERVICE_NAME}] Enter Room: {catalog} | User: {user_id} | Success: {success}")

            elif channel == HEARTBEAT_CHANNEL:
                if not catalog:
                    print(missing_msg)
                    continue
                alive = lock_manager.keep_alive(country, catalog, user_id)
                if not alive:
                    print(f"[{SERVICE_NAME}] Heartbeat Failed: {catalog} | User: {user_id}")

            elif channel == EXIT_CHANNEL:
                if not catalog:
                    print(missing_msg)
                    continue
                lock_manager.release(country, catalog, user_id)
                print(f"[{SERVICE_NAME}] Exit Room (Force/Command): {catalog} | User: {user_id}")

            elif channel == GET_CATALOG_CHANNEL:
                try:
                    if not FRONTEND_NOTIFY_URL:
                        print(f"{SERVICE_NAME} Warning: FRONTEND_NOTIFY_URL is not set.")
                        continue
                    result = get_catalogs(country)
                    notify_payload = {
                        "type": "notify",
                        "payload": {
                            "from": SERVICE_NAME,
                            "level": "content",
                            "to": user_id,
                            "content": result
                        }
                    }
                    requests.post(FRONTEND_NOTIFY_URL, json=notify_payload, timeout=5)
                    print(notify_payload)
                    print(f"[{SERVICE_NAME}] Get catalog success | User: {user_id}")
                except Exception as e:
                    print(f"[{SERVICE_NAME}] Get catalog error: {e}")

            elif channel == ADD_MENU_CHANNEL:
                if not (content and catalog):
                    print(missing_msg)
                    continue
                if not lock_manager.is_authorized(country, catalog, user_id):
                    print(f"[{SERVICE_NAME}] Add menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room")
                    continue
                try:
                    handle_add_menu(country, catalog, content)
                    print(f"[{SERVICE_NAME}] Add menu success: {catalog} | User: {user_id}")
                except Exception as e:
                    print(f"[{SERVICE_NAME}] Add menu error: {e}")

            elif channel == ADD_CATALOG_CHANNEL:
                # Both values are required. The previous check tested a
                # 2-tuple, which is always truthy, so it never rejected.
                if not (catalog_name and catalog):
                    print(missing_msg)
                    continue
                try:
                    handle_add_catalog(country, catalog_name, catalog)
                except Exception as e:
                    print(f"[{SERVICE_NAME}] Add catalog error: {e}")

            elif channel == UPDATE_MENU_CHANNEL:
                if not (content and catalog):
                    print(missing_msg)
                    continue
                if not lock_manager.is_authorized(country, catalog, user_id):
                    print(f"[{SERVICE_NAME}] Updated catalog failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room")
                    continue
                try:
                    update_sheets(country, catalog, content)
                    print(f"[{SERVICE_NAME}] Update success: {catalog} | User: {user_id}")
                except Exception as e:
                    print(f"[{SERVICE_NAME}] Update error: {e}")

            elif channel == DELETE_MENU_CHANNEL:
                if not (content and catalog):
                    print(missing_msg)
                    continue
                if not lock_manager.is_authorized(country, catalog, user_id):
                    print(f"[{SERVICE_NAME}] Delete menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room")
                    continue
                try:
                    handle_delete_menu(country, catalog, content)
                except Exception as e:
                    print(f"[{SERVICE_NAME}] Delete menu error: {e}")
        except Exception as e:
            print(f"[Redis Error] Error processing message: {e}")
            traceback.print_exc()
def find_grist_table_id(doc_id, catalog_suffix):
    """Find the Grist table id in ``doc_id`` that belongs to a catalog file.

    A table matches when ``file=<catalog_suffix>`` appears either in the name
    rebuilt from its id by ``reconstruct_table_name`` or in its ``label``
    field.

    Returns:
        The matching table id, or ``None`` when nothing matches or the request
        fails (failures are logged, not raised).
    """
    url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables"
    headers = {"Authorization": f"Bearer {GRIST_API_KEY}"}
    needle = f"file={catalog_suffix}"
    try:
        # A timeout keeps a stalled Grist server from blocking the caller forever.
        resp = requests.get(url, headers=headers, timeout=30)
        if resp.status_code != 200:
            print(f"[{SERVICE_NAME}] Failed to list Grist tables for doc {doc_id}: HTTP {resp.status_code}")
            return None
        for t in resp.json().get("tables", []):
            t_id = t.get("id", "")
            if needle in reconstruct_table_name(t_id):
                return t_id
            # Tolerate a missing / null "fields" or "label".
            label = (t.get("fields") or {}).get("label") or ""
            if needle in label:
                return t_id
    except Exception as e:
        print(f"Error: {e}")
    return None
def send_stream_notification(msg_type: str, content: object, batch_id: str, current_chunk: int, total_chunks: int, user_id: str):
    """POST one streaming notification to the frontend notify endpoint.

    Args:
        msg_type: "start", "chunk", "end" or "error".
        content: JSON-serialisable body of the notification.
        batch_id: identifier shared by every message of one stream.
        current_chunk: index of this chunk within the stream.
        total_chunks: number of chunks in the stream.
        user_id: recipient, sent as the ``to`` field.

    Does nothing except log a warning when ``FRONTEND_NOTIFY_URL`` is unset.
    Delivery errors are logged and swallowed.
    """
    if not FRONTEND_NOTIFY_URL:
        print("Warning: FRONTEND_NOTIFY_URL is not set.")
        return
    payload = {
        "type": "notify",
        "payload": {
            "from": SERVICE_NAME,
            "level": "content",
            "msg": msg_type,
            "batch_id": batch_id,
            "current_chunk": current_chunk,
            "total_chunks": total_chunks,
            "to": user_id,
            "content": content
        }
    }
    try:
        # The POST was previously commented out, so nothing was delivered
        # even though the log line below reported the chunk as sent.
        requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5)
        print(f"[Stream] Sent {msg_type} chunk {current_chunk}/{total_chunks} for batch {batch_id}")
        print(payload)
    except Exception as e:
        print(f"[Stream Error] Failed to send {msg_type} for batch {batch_id}: {e}")
def _collect_search_codes(nl_row):
    """Return ``(search_targets, individual_codes)`` for one New-Layout row.

    Columns 6/7/8 are paired with columns 10/11/12; blank or missing cells
    count as "-". A pair becomes a ``"c1,c2"`` target when either side is set.
    """
    def cell(idx):
        return nl_row[idx] if len(nl_row) > idx else "-"

    search_targets = []
    individual_codes = []
    for left_idx, right_idx in ((6, 10), (7, 11), (8, 12)):
        c1 = cell(left_idx).strip() or "-"
        c2 = cell(right_idx).strip() or "-"
        if c1 != "-" or c2 != "-":
            search_targets.append(f"{c1},{c2}")
        individual_codes.extend(c for c in (c1, c2) if c != "-")
    return search_targets, individual_codes


def _collect_v2_rows(nv2_data, search_targets):
    """Return up to three rows starting at the first V2 row matching a target.

    A row matches when any of its columns 8-10 equals a search target. Only
    the first two rows of the group (name, desc) carry cells from columns 4-7;
    the third row is returned with an empty cell list.
    """
    if not search_targets:
        return []
    targets = set(search_targets)
    for j, nv2_row in enumerate(nv2_data):
        if len(nv2_row) <= 10:
            continue
        if not any(nv2_row[k].strip() in targets for k in (8, 9, 10)):
            continue
        group = []
        for sub_idx in range(3):  # 3 row (Name, Desc, Img)
            curr_j = j + sub_idx
            if curr_j >= len(nv2_data):
                break
            row_data = nv2_data[curr_j]
            row_info = {"row_index": curr_j + 1, "cells": []}
            if sub_idx < 2:
                row_info["cells"] = [
                    {"value": row_data[c_idx], "coord": get_coord(curr_j, c_idx)}
                    for c_idx in range(4, min(8, len(row_data)))
                ]
            group.append(row_info)
        return group
    return []


def _index_name_desc(nd_all):
    """Map each stripped first-column key of ``nd_all`` to its row indexes."""
    index = {}
    for nd_idx, nd_row in enumerate(nd_all):
        if nd_row:
            index.setdefault(nd_row[0].strip(), []).append(nd_idx)
    return index


def _collect_name_desc_rows(nd_all, nd_index, individual_codes):
    """Return the ``MENU.<code>.name`` / ``.desc`` rows for each distinct code."""
    found = []
    # dict.fromkeys de-duplicates while keeping first-seen order.
    for code in dict.fromkeys(individual_codes):
        hits = sorted(
            nd_index.get(f"MENU.{code}.name", [])
            + nd_index.get(f"MENU.{code}.desc", [])
        )
        for nd_idx in hits:
            nd_row = nd_all[nd_idx]
            found.append({
                "key": nd_row[0],
                "row_index": nd_idx + 1,
                "cells": [
                    {"value": nd_row[c_idx], "coord": get_coord(nd_idx, c_idx)}
                    for c_idx in range(4, min(8, len(nd_row)))
                ],
            })
    return found


def process_and_stream_sheet_data(country: str, catalog: str, user_id: str):
    """Load a catalog from Grist and stream it to the user in chunks.

    Sends a "start" notification, then joins every named New-Layout row with
    its New-Layout-V2 group and its Name-desc-v2 rows, sends the result in
    chunks of 10 items and finishes with an "end" notification. Any failure is
    reported to the user as a single "error" notification instead of raising.
    """
    batch_id = str(uuid.uuid4())
    send_stream_notification("start", {"message": f"Start fetching catalog: {catalog}"}, batch_id, 0, 0, user_id)
    try:
        config = COUNTRY_MAPPING.get(country)
        if not config:
            raise ValueError(f"Country '{country}' is not configured")
        grist_docs = config.get("grist_doc_id", [])
        if len(grist_docs) < 3:
            raise ValueError(f"Grist documents are not configured for country '{country}'")
        doc_nl, doc_nv2, doc_nd = grist_docs[:3]

        full_table_nl = find_grist_table_id(doc_nl, catalog)
        if not full_table_nl:
            raise Exception(f"Table for catalog {catalog} not found in New-Layout")
        full_table_nv2 = find_grist_table_id(doc_nv2, catalog)

        nl_data = fetch_grist_table_data(doc_nl, full_table_nl)
        # Skip the request when the V2 table was not found.
        nv2_data = fetch_grist_table_data(doc_nv2, full_table_nv2) if full_table_nv2 else []
        nd_all = fetch_grist_table_data(doc_nd, "Name_desc_v2")
        nd_index = _index_name_desc(nd_all)

        final_result = []
        for i, nl_row in enumerate(nl_data):
            name_th = nl_row[2].strip() if len(nl_row) > 2 else ""
            name_en = nl_row[3].strip() if len(nl_row) > 3 else ""
            if not name_th and not name_en:
                continue
            search_targets, individual_codes = _collect_search_codes(nl_row)
            final_result.append({
                "new_layout": {"row_index": i + 1, "values": nl_row},
                "new_layout_v2": _collect_v2_rows(nv2_data, search_targets),
                "name_desc_v2": _collect_name_desc_rows(nd_all, nd_index, individual_codes),
            })

        # --- Stream Chunk ---
        chunk_size = 10
        total_items = len(final_result)
        # An empty catalog still sends one (empty) chunk.
        total_chunks = math.ceil(total_items / chunk_size) if total_items > 0 else 1
        for i in range(total_chunks):
            start_idx = i * chunk_size
            chunk_data = final_result[start_idx:start_idx + chunk_size]
            send_stream_notification("chunk", chunk_data, batch_id, i + 1, total_chunks, user_id)
        send_stream_notification("end", {"message": "All data sent successfully"}, batch_id, total_chunks, total_chunks, user_id)
    except Exception as e:
        traceback.print_exc()
        send_stream_notification("error", {"error_detail": str(e)}, batch_id, 0, 0, user_id)
def get_coord(r, c):
    """Turn zero-based row/column indexes into a one-based coordinate dict."""
    one_based_row = r + 1
    one_based_col = c + 1
    return {"row": one_based_row, "col": one_based_col}
# get catalog
def get_catalogs(country: str):
    """List the catalogs of a country from its first Grist document.

    Every table except Grist-internal ones and ``name_desc_v2`` is reported
    with its catalog file name (the ``file=`` part of the rebuilt table name,
    or the table id when there is none) and its current lock status.

    Returns:
        ``{"status": "success", "country": ..., "catalogs": [...]}``.

    Raises:
        HTTPException: 404 when the country has no Grist config, 500 on any
            error while talking to Grist.
    """
    config = COUNTRY_MAPPING.get(country)
    if not config or not config.get("grist_doc_id"):
        print(f"[{SERVICE_NAME}] Country or Grist doc not found")
        raise HTTPException(status_code=404, detail="Grist config not found")
    try:
        # First sheet in Map
        doc_nl_id = config["grist_doc_id"][0]
        url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_nl_id}/tables"
        headers = {"Authorization": f"Bearer {GRIST_API_KEY}"}
        resp = requests.get(url, headers=headers, timeout=30)
        catalogs = []
        if resp.status_code != 200:
            # Still answers with an empty list, but no longer silently.
            print(f"[{SERVICE_NAME}] Failed to list Grist tables for doc {doc_nl_id}: HTTP {resp.status_code}")
        else:
            tables = resp.json().get("tables", [])
            for i, t in enumerate(tables):
                t_id = t["id"]
                if t_id.startswith("Grist") or t_id.lower() == "name_desc_v2":
                    continue
                reconstructed_name = reconstruct_table_name(t_id)
                match = re.search(r'file=([^,]+)', reconstructed_name)
                clean_catalog = match.group(1).strip() if match else t_id
                lock_info = lock_manager.get_lock_info(country, clean_catalog)
                catalogs.append({
                    "catalog": clean_catalog,
                    "row_index": i,
                    "status": "locked" if lock_info["is_locked"] else "free",
                    "locked_by": lock_info["locked_by"]
                })
        return {"status": "success", "country": country, "catalogs": catalogs}
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
"""
ADD_MENU_CHANNEL
"""
def find_catalog_block(col_a, catalog: str):
    """Locate the first run of consecutive cells mentioning ``file=<catalog>``.

    Returns ``(first_index, last_index)`` of that run as zero-based positions
    in ``col_a``, or ``(None, None)`` when no cell matches.
    """
    marker = re.compile(rf"file={re.escape(catalog)}")
    first = None
    last = None
    for idx, raw in enumerate(col_a):
        if marker.search(raw.strip()):
            if first is None:
                first = idx
            last = idx
        elif first is not None:
            # The run has ended; stop at the first non-matching cell.
            break
    return first, last
def handle_add_menu(country: str, catalog: str, content: list):
    """Insert new menu rows into the New-Layout Google sheet of a catalog.

    Blank rows are inserted directly below the catalog block found by
    ``find_catalog_block`` and each item's ``cells`` list is written from
    column A. Items without cells leave their inserted row blank.

    Args:
        country: key of ``COUNTRY_MAPPING``.
        catalog: catalog file name searched for as ``file=<catalog>``.
        content: list of dicts; each ``"cells"`` value is written as one row
            (presumably a flat list of cell values -- verify against caller).

    Raises:
        ValueError: when the country or its New-Layout sheet is not configured.
    """
    config = COUNTRY_MAPPING.get(country)
    if not config:
        raise ValueError(f"Country '{country}' is not configured")
    # Sheet names come from the environment and may be unset (None).
    nl_name = next(
        (
            s for s in config.get("sheets", [])
            if s and "new-layout" in s.lower() and "v2" not in s.lower()
        ),
        None,
    )
    if nl_name is None:
        raise ValueError(f"No New-Layout sheet configured for country '{country}'")
    if not content:
        return

    client = get_gspread_client()
    sheet = client.open_by_key(config["spreadsheet_id"])
    worksheet = sheet.worksheet(nl_name)
    col_a = worksheet.col_values(1)
    start, end = find_catalog_block(col_a, catalog)
    if start is None:
        return

    # `end` is zero-based, sheet rows are one-based: end + 2 is the row
    # immediately after the block.
    insert_at = end + 2
    worksheet.insert_rows([[]] * len(content), row=insert_at)

    batch_requests = []
    for i, row in enumerate(content):
        cells = row.get("cells", [])
        if not cells:
            continue
        row_index = insert_at + i
        end_col = col_to_letter(len(cells))
        batch_requests.append({
            "range": f"A{row_index}:{end_col}{row_index}",
            "values": [cells]
        })
    if batch_requests:
        worksheet.batch_update(batch_requests, value_input_option="USER_ENTERED")
"""
ADD_CATALOG_CHANNEL
"""
def handle_add_catalog(country: str, catalog_name: str, catalog: str):
    """Create an empty catalog table in the New-Layout and New-Layout-V2 docs.

    The table id is ``Name=<catalog_name>,file=<catalog>`` with every
    character outside ``[a-zA-Z0-9_]`` replaced by ``_``. Each table gets 15
    columns with ids and labels ``A`` to ``O``. Creation failures are logged.

    Raises:
        ValueError: when the country has fewer than two Grist docs configured.
    """
    config = COUNTRY_MAPPING.get(country)
    if not config:
        raise ValueError(f"Country '{country}' is not configured")
    grist_docs = config.get("grist_doc_id", [])
    if len(grist_docs) < 2:
        raise ValueError(f"Grist documents are not configured for country '{country}'")

    table_label = f"Name={catalog_name},file={catalog}"
    table_id = re.sub(r'[^a-zA-Z0-9_]', '_', table_label)
    table_id = table_id[0].upper() + table_id[1:]
    headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"}
    columns = [{"id": chr(65 + i), "fields": {"label": chr(65 + i)}} for i in range(15)]
    payload = {
        "tables": [{
            "id": table_id,
            "columns": columns
        }]
    }

    for doc_id in grist_docs[:2]:
        url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables"
        resp = requests.post(url, headers=headers, json=payload, timeout=30)
        if resp.status_code == 200:
            print(f"[{SERVICE_NAME}] Created table {table_id} in doc {doc_id}")
        else:
            print(f"[{SERVICE_NAME}] Failed to create table: {resp.text}")
"""
UPDATE_MENU_CHANNEL
"""
# update sheet
def update_sheets(country: str, catalog: str, content: list):
    """Write edited cells back to the Grist tables of a catalog.

    Each item of ``content`` may carry ``new_layout`` (one row dict),
    ``new_layout_v2`` and ``name_desc_v2`` (lists of row dicts). For every row
    with cells, the row of the first cell's coordinate is used as the Grist
    record id and each cell's column number is mapped to a column letter.
    NOTE(review): this assumes Grist record ids equal the one-based row
    positions sent to the frontend -- confirm.

    Failed PATCH requests are logged, not raised.

    Raises:
        ValueError: when the country or its Grist docs are not configured.
        Exception: when the catalog table is not found in the New-Layout doc.
    """
    config = COUNTRY_MAPPING.get(country)
    if not config:
        raise ValueError(f"Country '{country}' is not configured")
    grist_docs = config.get("grist_doc_id", [])
    if len(grist_docs) < 3:
        raise ValueError(f"Grist documents are not configured for country '{country}'")
    doc_map = {
        "new_layout": grist_docs[0],
        "new_layout_v2": grist_docs[1],
        "name_desc_v2": grist_docs[2]
    }
    full_table_name = find_grist_table_id(grist_docs[0], catalog)
    if not full_table_name:
        raise Exception(f"Table for catalog {catalog} not found in New-Layout")
    # Resolved on first use: the V2 table is looked up in its own document,
    # as the reader does, falling back to the New-Layout id.
    v2_table_name = None
    headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"}

    # {"id": row_index, "fields": {"A": val, "B": val}}
    for item in content:
        for sheet_key in ["new_layout", "new_layout_v2", "name_desc_v2"]:
            rows = item.get(sheet_key, [])
            if sheet_key == "new_layout":
                rows = [rows] if rows else []
            if not rows:
                continue

            doc_id = doc_map[sheet_key]
            if sheet_key == "name_desc_v2":
                target_table = "Name_desc_v2"
            elif sheet_key == "new_layout_v2":
                if v2_table_name is None:
                    v2_table_name = find_grist_table_id(doc_id, catalog) or full_table_name
                target_table = v2_table_name
            else:
                target_table = full_table_name

            records_to_update = []
            for row in rows:
                cells = row.get("cells", [])
                if not cells:
                    continue
                fields = {
                    col_to_letter(cell["coord"]["col"]): str(cell["value"])  # column A, B, C...
                    for cell in cells
                }
                records_to_update.append({
                    "id": cells[0]["coord"]["row"],
                    "fields": fields
                })

            if records_to_update:
                update_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{target_table}/records"
                resp = requests.patch(update_url, headers=headers, json={"records": records_to_update}, timeout=30)
                if resp.status_code != 200:
                    print(f"[{SERVICE_NAME}] Grist update failed for {target_table}: {resp.text}")
# column to A1 spreadsheet format
def col_to_letter(col: int) -> str:
    """Convert a one-based column number to spreadsheet letters (1 -> "A").

    Returns an empty string for values below 1.
    """
    letters = []
    remaining = col
    while remaining > 0:
        remaining, offset = divmod(remaining - 1, 26)
        letters.append(chr(ord("A") + offset))
    # Digits were produced least-significant first.
    return "".join(reversed(letters))
"""
DELETE_MENU_CHANNEL
"""
def handle_delete_menu(country: str, catalog: str, content: list):
    """Delete menu rows of one catalog from the New-Layout Google sheet.

    The catalog block starts at the first column-A cell containing
    ``file=<catalog>`` and extends until the next cell containing ``file=``.
    Only ``row_index`` values (one-based sheet rows) inside that block are
    deleted; everything else in ``content`` is ignored.

    Raises:
        ValueError: when the country or its New-Layout sheet is not configured.
    """
    config = COUNTRY_MAPPING.get(country)
    if not config:
        raise ValueError(f"Country '{country}' is not configured")
    nl_name = next(
        (
            s for s in config.get("sheets", [])
            if s and "new-layout" in s.lower() and "v2" not in s.lower()
        ),
        None,
    )
    if nl_name is None:
        raise ValueError(f"No New-Layout sheet configured for country '{country}'")

    client = get_gspread_client()
    sheet = client.open_by_key(config["spreadsheet_id"])
    worksheet = sheet.worksheet(nl_name)
    col_a = worksheet.col_values(1)

    # find catalog start range
    pattern = re.compile(rf"file={re.escape(catalog)}")
    start = None
    end = None
    for i, val in enumerate(col_a):
        val = val.strip()
        if start is None:
            if pattern.search(val):
                start = i
                end = i
        else:
            if "file=" in val:
                break
            end = i
    if start is None:
        return

    # De-duplicate: a repeated row_index would otherwise yield overlapping
    # ranges and delete rows that were never requested.
    rows = sorted({item["row_index"] for item in content if "row_index" in item})
    # specifically in block
    print("start, end:", start, end)
    print("rows before filter:", rows)
    rows = [
        row_number for row_number in rows
        if (start + 1) <= row_number <= (end + 1)
    ]
    if not rows:
        return

    ranges = group_consecutive_rows(rows)
    # Delete bottom-up so earlier deletions do not shift later row numbers.
    delete_requests = []
    for start_r, end_r in reversed(ranges):
        delete_requests.append({
            "deleteDimension": {
                "range": {
                    "sheetId": worksheet.id,
                    "dimension": "ROWS",
                    "startIndex": start_r - 1,
                    "endIndex": end_r
                }
            }
        })
    sheet.batch_update({
        "requests": delete_requests
    })
def group_consecutive_rows(rows):
    """Collapse row numbers into sorted, inclusive ``(start, end)`` ranges.

    Duplicates are ignored and an empty input yields an empty list, so the
    returned ranges never overlap.

    >>> group_consecutive_rows([5, 1, 2, 3, 3])
    [(1, 3), (5, 5)]
    """
    ordered = sorted(set(rows))
    if not ordered:
        return []
    ranges = []
    start = prev = ordered[0]
    for current in ordered[1:]:
        if current != prev + 1:
            # Gap found: close the running range and open a new one.
            ranges.append((start, prev))
            start = current
        prev = current
    ranges.append((start, prev))
    return ranges
# FastAPI application instance; the route decorators below register on it.
app = FastAPI()
class CatalogPayloadValues(BaseModel):
    """``values`` section of a catalog request: the country to list."""
    country: str
class CatalogPayload(BaseModel):
    """``payload`` section of a catalog request."""
    user_info: dict = {}
    # Compared against SERVICE_NAME by the /catalogs endpoint.
    srv_name: str
    values: CatalogPayloadValues
class CatalogRequest(BaseModel):
    """Request body of ``POST /catalogs``."""
    type: str
    payload: CatalogPayload
class RoomRequest(BaseModel):
    """Request body of the ``/api/room/*`` endpoints: the room and the caller."""
    country: str
    catalog: str
    user_id: str
@app.on_event("startup")
def startup_event():
    """Start the Redis listener in a daemon thread when the app starts."""
    listener = threading.Thread(target=redis_message_handler, daemon=True)
    listener.start()
@app.post("/catalogs")
def get_api_catalogs(req: CatalogRequest):
    """List the catalogs found in column A of a country's first Google sheet.

    Rejects requests for another service (400) or an unknown country (404).
    Each cell containing ``file=<name>`` below the header row becomes one
    catalog entry with its sheet row and current lock status. Any error while
    reading the sheet is returned as a 500.
    """
    body = req.payload
    if body.srv_name != SERVICE_NAME:
        raise HTTPException(status_code=400, detail="Invalid service name")

    country = body.values.country.strip().lower()
    config = COUNTRY_MAPPING.get(country)
    if not config or not config["sheets"]:
        raise HTTPException(status_code=404, detail="Country or sheets not found")

    try:
        # The first configured sheet holds the catalog headers.
        first_sheet_name = config["sheets"][0]
        spreadsheet = get_gspread_client().open_by_key(config["spreadsheet_id"])
        first_column = spreadsheet.worksheet(first_sheet_name).col_values(1)

        catalogs = []
        # Row 1 is skipped, so the first candidate is sheet row 2.
        for sheet_row, raw in enumerate(first_column[1:], start=2):
            val = raw.strip()
            if not val or val in ["-", "IGNORE"]:
                continue
            # Expected format: Name=Test,file=page_catalog_group_recommend.skt
            match = re.search(r'file=([^,]+)', val)
            if not match:
                continue
            catalog_name = match.group(1).strip()
            lock_info = lock_manager.get_lock_info(country, catalog_name)
            catalogs.append({
                "catalog": catalog_name,
                "row_index": sheet_row,
                "status": "locked" if lock_info["is_locked"] else "free",
                "locked_by": lock_info["locked_by"]
            })
        return {"status": "success", "country": country, "catalogs": catalogs}
    except Exception as exc:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(exc))
@app.get("/catalog/data/{country}/{catalog}/{user_id}")
def get_catalog_data(country: str, catalog: str, user_id: str, background_tasks: BackgroundTasks):
    """Schedule streaming of a catalog to a user who holds its room lock.

    Responds 403 when the user does not hold a valid lock; otherwise queues
    ``process_and_stream_sheet_data`` as a background task and returns
    immediately with a "processing" status.
    """
    country = country.strip().lower()
    authorized = lock_manager.is_authorized(country, catalog, user_id)
    if not authorized:
        raise HTTPException(status_code=403, detail="Unauthorized access to this room")

    background_tasks.add_task(process_and_stream_sheet_data, country, catalog, user_id)
    response = {
        "status": "processing",
        "country": country,
        "catalog": catalog,
        "message": "Data stream started. Please listen to frontend notifications."
    }
    return response
@app.post("/api/room/enter")
def enter_room(req: RoomRequest):
    """Lock a room for the caller and start streaming its catalog data.

    Responds 403 with the lock manager's reason when the room cannot be
    acquired.
    """
    country = req.country.strip().lower()
    acquired, message_or_user = lock_manager.acquire(country, req.catalog, req.user_id)
    if not acquired:
        raise HTTPException(status_code=403, detail=message_or_user)

    streamer = threading.Thread(
        target=process_and_stream_sheet_data,
        args=(country, req.catalog, req.user_id),
        daemon=True
    )
    streamer.start()
    return {
        "status": "success",
        "message": f"Successfully entry catalog: {req.catalog}",
        "timeout": lock_manager.timeout
    }
@app.post("/api/room/heartbeat")
def heartbeat(req: RoomRequest):
    """Renew the caller's room lock; respond 401 when it cannot be renewed."""
    country = req.country.strip().lower()
    renewed = lock_manager.keep_alive(country, req.catalog, req.user_id)
    if not renewed:
        raise HTTPException(status_code=401, detail="Timeout permission denied")
    return {"status": "alive"}
@app.post("/api/room/exit")
def exit_room(req: RoomRequest):
    """Release the caller's room lock; always answers with status "success"."""
    country = req.country.strip().lower()
    released = lock_manager.release(country, req.catalog, req.user_id)
    message = "Successfully exited" if released else "You no longer have permission"
    return {"status": "success", "message": message}
@app.get("/grist/pull/sheet/{country}")
def grist_pull_sheet_api(country: str):
    """Copy a country's Google sheets into Grist.

    The country is trimmed and lower-cased like in the other endpoints, so it
    matches the lower-case keys of ``COUNTRY_MAPPING``. Any error is returned
    as a 500 with the error text.
    """
    try:
        sync_sheets_to_grist(country.strip().lower())
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/grist/push/data/sheet/{country}")
def grist_push_data_sheet_api(country: str):
    """Copy a country's Grist tables back into its Google sheets.

    The country is trimmed and lower-cased like in the other endpoints, so it
    matches the lower-case keys of ``COUNTRY_MAPPING``. Any error is returned
    as a 500 with the error text.
    """
    try:
        sync_grist_to_sheets(country.strip().lower())
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
"""
Grist function endpoint
[ Grist sync sheet to grist ]
"""
def get_column_letter(n):
    """Convert a zero-based column index to letters: 0 -> A, 25 -> Z, 26 -> AA."""
    letters = []
    remaining = n + 1  # switch to one-based before the base-26 conversion
    while remaining > 0:
        remaining, offset = divmod(remaining - 1, 26)
        letters.insert(0, chr(65 + offset))
    return "".join(letters)
def sync_sheets_to_grist(country_key):
    """Upload each configured Google sheet of a country into its Grist doc.

    Sheets and Grist docs are paired by list position. A sheet is skipped
    when its name or its doc id is unset, or when it is empty. "new-layout"
    sheets are split into one table per ``Name=`` header row; the
    "name-desc-v2" sheet is uploaded as a single table. Errors on one sheet
    are logged and do not stop the others.
    """
    if country_key not in COUNTRY_MAPPING:
        print(f"Error: Country '{country_key}' not found.")
        return
    config = COUNTRY_MAPPING[country_key]
    spreadsheet_id = config["spreadsheet_id"]
    sheet_names = config["sheets"]
    grist_doc_ids = config.get("grist_doc_id", [])

    gc = get_gspread_client()
    spreadsheet = gc.open_by_key(spreadsheet_id)

    for index, sheet_name in enumerate(sheet_names):
        if not sheet_name or index >= len(grist_doc_ids):
            continue
        doc_id = grist_doc_ids[index]
        # Doc ids come from the environment and may be unset.
        if not doc_id:
            continue
        try:
            worksheet = spreadsheet.worksheet(sheet_name)
            all_values = worksheet.get_all_values()
            if not all_values:
                continue
            header = all_values[0]
            data_rows = all_values[1:]
            lowered_name = sheet_name.lower()
            if "new-layout" in lowered_name:
                process_new_layout_sheet(doc_id, header, data_rows)
            elif "name-desc-v2" in lowered_name:
                upload_to_grist_self_hosted(doc_id, "name-desc-v2", header, data_rows)
        except Exception as e:
            print(f"[{SERVICE_NAME}] Error processing sheet {sheet_name}: {e}")
def process_new_layout_sheet(doc_id, header, data_rows):
    """Split New-Layout rows into per-catalog groups and upload each group.

    A row whose first cell starts with ``Name=`` opens a new group named after
    that cell; following rows belong to it. Rows before the first header are
    dropped and groups without rows are not uploaded.
    """
    grouped = {}
    active_name = None
    for record in data_rows:
        first_cell = record[0] if record else ""
        if first_cell.startswith("Name="):
            active_name = first_cell
            grouped[active_name] = []
            continue
        if active_name:
            grouped[active_name].append(record)

    for name, members in grouped.items():
        if not members:
            continue
        upload_to_grist_self_hosted(doc_id, name, header, members)
def upload_to_grist_self_hosted(doc_id, table_name, header, rows):
    """Replace the content of one Grist table with ``rows``.

    The table id is ``table_name`` with characters outside ``[a-zA-Z0-9_]``
    replaced by ``_`` and the first letter upper-cased (prefixed with ``t_``
    when it starts with a digit). If reading the table's records succeeds,
    all existing records are deleted; otherwise the table is created with one
    column per header cell, using ids ``A``, ``B``, ... Rows are then added in
    chunks of 500. Errors are logged, never raised.
    """
    clean_table_id = re.sub(r'[^a-zA-Z0-9_]', '_', table_name)
    if not clean_table_id:
        print(f"[{SERVICE_NAME}] Skipping upload: empty table name")
        return
    clean_table_id = clean_table_id[0].upper() + clean_table_id[1:]
    chunk_size = 500
    request_timeout = 30
    if clean_table_id[0].isdigit():
        clean_table_id = f"t_{clean_table_id}"

    base_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables"
    records_url = f"{base_url}/{clean_table_id}/records"
    headers = {
        "Authorization": f"Bearer {GRIST_API_KEY}",
        "Content-Type": "application/json"
    }
    column_ids = [get_column_letter(i) for i in range(len(header))]
    column_defs = [
        {"id": col_id, "fields": {"label": h_text.strip() or col_id}}
        for col_id, h_text in zip(column_ids, header)
    ]

    try:
        check_resp = requests.get(records_url, headers=headers, timeout=request_timeout)
        if check_resp.status_code == 200:
            row_ids = [rec["id"] for rec in check_resp.json().get("records", [])]
            for i in range(0, len(row_ids), chunk_size):
                del_resp = requests.post(
                    f"{records_url}/delete",
                    headers=headers,
                    json=row_ids[i:i + chunk_size],
                    timeout=request_timeout,
                )
                if del_resp.status_code != 200:
                    print(f"[{SERVICE_NAME}] Error clearing {clean_table_id}: {del_resp.text}")
        else:
            print(f"[{SERVICE_NAME}] Creating table {clean_table_id} with indexed columns...")
            create_resp = requests.post(
                base_url,
                headers=headers,
                json={"tables": [{"id": clean_table_id, "columns": column_defs}]},
                timeout=request_timeout,
            )
            if create_resp.status_code == 200:
                time.sleep(1)
            else:
                print(f"[{SERVICE_NAME}] Create failed: {create_resp.text}")
                return

        records_to_add = []
        for row in rows:
            # zip drops cells beyond the known columns.
            fields = dict(zip(column_ids, row))
            if fields:
                records_to_add.append({"fields": fields})

        if records_to_add:
            synced = 0
            for i in range(0, len(records_to_add), chunk_size):
                chunk = records_to_add[i:i + chunk_size]
                res = requests.post(
                    records_url,
                    headers=headers,
                    json={"records": chunk},
                    timeout=request_timeout,
                )
                if res.status_code != 200:
                    print(f"Error adding data to {clean_table_id}: {res.text}")
                else:
                    synced += len(chunk)
            # Count only rows from accepted chunks.
            print(f"[{SERVICE_NAME}] Synced {synced} rows to {clean_table_id}")
    except Exception as e:
        print(f"[{SERVICE_NAME}] Error on {clean_table_id}: {e}")
"""
Grist [ Sync grist to google sheet ]
"""
def col_to_index(col_str):
    """Convert column letters to a zero-based index: 'A' -> 0, 'AA' -> 26.

    An empty string yields -1.
    """
    total = 0
    for letter in col_str:
        total *= 26
        total += ord(letter) - ord('A') + 1
    return total - 1
def reconstruct_table_name(t_id):
    """Rebuild a ``Name=...,file=....skt`` style name from a table id.

    Applies three independent rewrites: a leading ``Name_`` becomes ``Name=``,
    the first ``_file_`` becomes ``,file=`` and a trailing ``_skt`` becomes
    ``.skt``. Other underscores are left unchanged.
    """
    rebuilt = t_id
    if rebuilt.startswith("Name_"):
        rebuilt = "Name=" + rebuilt[len("Name_"):]
    head, separator, tail = rebuilt.partition("_file_")
    if separator:
        rebuilt = f"{head},file={tail}"
    if rebuilt.endswith("_skt"):
        rebuilt = rebuilt[:-len("_skt")] + ".skt"
    return rebuilt
def get_all_grist_tables(doc_id):
    """Return the ids of all tables in a Grist document.

    Tables whose id starts with ``Grist`` are left out. A non-200 response is
    logged and yields an empty list.
    """
    url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables"
    headers = {"Authorization": f"Bearer {GRIST_API_KEY}"}
    resp = requests.get(url, headers=headers, timeout=30)
    if resp.status_code != 200:
        print(f"Failed to list tables for doc {doc_id}: {resp.text}")
        return []
    tables = resp.json().get("tables", [])
    return [t["id"] for t in tables if not t["id"].startswith("Grist")]
def fetch_grist_table_data(doc_id, table_id):
    """Fetch a Grist table and return its records as positional rows.

    Only fields whose id consists solely of capital letters (``A``, ``B``,
    ``AA`` ...) are used; each is placed at the list index given by
    ``col_to_index``. Values are converted with ``str`` and ``None`` becomes
    ``""``. Each row is only as long as its own highest column.

    Returns:
        A list of rows (lists of strings); empty on a non-200 response or
        when the table has no records.
    """
    url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{table_id}/records"
    headers = {"Authorization": f"Bearer {GRIST_API_KEY}"}
    resp = requests.get(url, headers=headers, timeout=30)
    if resp.status_code != 200:
        print(f"Failed to fetch table {table_id}: {resp.text}")
        return []

    records = resp.json().get("records", [])
    if not records:
        return []

    letter_column = re.compile(r'[A-Z]+')
    parsed_rows = []
    for record in records:
        # Column index -> text, for letter-named columns only.
        cells = {
            col_to_index(key): str(value) if value is not None else ""
            for key, value in record.get("fields", {}).items()
            if letter_column.fullmatch(key)
        }
        row = [""] * (max(cells, default=-1) + 1)
        for idx, text in cells.items():
            row[idx] = text
        parsed_rows.append(row)
    return parsed_rows
def sync_grist_to_sheets(country_key):
    """Write a country's Grist data back into its Google sheets.

    Sheets and Grist docs are paired by list position; pairs with an unset
    sheet name or doc id are skipped. The "name-desc-v2" sheet is refilled
    from the ``Name_desc_v2`` table. A "new-layout" sheet is refilled from
    every other table of its doc, each preceded by a row holding the rebuilt
    table name. Data is written from ``A2``; errors on one sheet are logged
    and do not stop the others.
    """
    if country_key not in COUNTRY_MAPPING:
        print(f"Error: Country '{country_key}' not found.")
        return

    config = COUNTRY_MAPPING[country_key]
    spreadsheet_id = config["spreadsheet_id"]
    sheet_names = config["sheets"]
    grist_doc_ids = config.get("grist_doc_id", [])
    spreadsheet = get_gspread_client().open_by_key(spreadsheet_id)

    # zip stops at the shorter list, so unpaired sheets are never visited.
    for sheet_name, doc_id in zip(sheet_names, grist_doc_ids):
        if not sheet_name or not doc_id:
            continue
        try:
            worksheet = spreadsheet.worksheet(sheet_name)
            lowered = sheet_name.lower()

            if "name-desc-v2" in lowered:
                # Case Name-desc-v2
                rows = fetch_grist_table_data(doc_id, "Name_desc_v2")
                if rows:
                    worksheet.batch_clear(["A2:ZZ"])
                    worksheet.update(values=rows, range_name="A2")
                    print(f"[{SERVICE_NAME}] Updated {len(rows)} rows to sheet '{sheet_name}'.")

            elif "new-layout" in lowered:
                # Case New-layout / New-layout-v2
                print(f"Fetching multiple tables from doc: {doc_id}...")
                combined = []
                for t_id in get_all_grist_tables(doc_id):
                    if t_id.lower() == "name_desc_v2":
                        continue
                    table_rows = fetch_grist_table_data(doc_id, t_id)
                    if not table_rows:
                        continue
                    combined.append([reconstruct_table_name(t_id)])
                    combined.extend(table_rows)

                if not combined:
                    print(f"[{SERVICE_NAME}] No data found in any tables for sheet '{sheet_name}'.")
                else:
                    worksheet.batch_clear(["A2:ZZ"])
                    worksheet.update(values=combined, range_name="A2")
                    print(f"[{SERVICE_NAME}] Updated {len(combined)} lines (incl. Table Names) to sheet '{sheet_name}'.")
        except Exception as exc:
            print(f"[{SERVICE_NAME}] Error syncing to sheet {sheet_name}: {exc}")