taobin_sheet/main.py
2026-04-02 12:53:23 +07:00

487 lines
No EOL
18 KiB
Python

import time
import re
import os
import json
import redis
import gspread
import requests
import traceback
import threading
import uuid
import math
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from dotenv import load_dotenv
from google.oauth2 import service_account
REDIS_HOST = os.getenv("REDIS_HOST")
REDIS_PORT = os.getenv("REDIS_PORT")
FRONTEND_NOTIFY_URL = os.getenv("FRONTEND_NOTIFY_URL")
LOG_SERVER_URL = os.getenv("LOG_SERVER_URL")
SERVICE_NAME = os.getenv("SERVICE_NAME")
ENTER_CHANNEL = f"{SERVICE_NAME}/enter"
HEARTBEAT_CHANNEL = f"{SERVICE_NAME}/heartbeat"
EXIT_CHANNEL = f"{SERVICE_NAME}/exit"
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
ID_PATTERN = r'[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{4}'
COUNTRY_MAPPING = {
"tha": {
"spreadsheet_id": os.getenv("SPREAD_SHEET_ID_THA"),
"sheets": [
os.getenv("SHEET_NEW_LAYOUT_THA"),
os.getenv("SHEET_NEW_LAYOUT_V2_THA"),
os.getenv("SHEET_NAME_DESC_V2_THA")
]
},
"tha_premium": {
"spreadsheet_id": os.getenv("SPREAD_SHEET_ID_THA_PREMIUM"),
"sheets": [
os.getenv("SHEET_NEW_LAYOUT_THA_PREMIUM")
]
}
}
def get_gspread_client():
SCOPES = [
'https://www.googleapis.com/auth/spreadsheets',
'https://www.googleapis.com/auth/drive'
]
creds = None
service_account_email = os.getenv("GOOGLE_SERVICE_ACCOUNT_EMAIL")
private_key = os.getenv("GOOGLE_PRIVATE_KEY").replace("\\n", "\n")
credentials_info = {
"type": "service_account",
"client_email": service_account_email,
"private_key": private_key,
"token_uri": "https://oauth2.googleapis.com/token",
}
creds = service_account.Credentials.from_service_account_info(
credentials_info,
scopes=SCOPES
)
return gspread.authorize(creds)
class LockManager:
def __init__(self):
# { "tha:page_catalog_group_coffee.skt": {"user_id": "A", "timestamp": 12345} }
self.locks: Dict[str, dict] = {}
self.timeout = 30
def get_room_key(self, country: str, catalog: str) -> str:
return f"{country}:{catalog}"
def acquire(self, country: str, catalog: str, user_id: str):
now = time.time()
room_key = self.get_room_key(country, catalog)
# (1 person, 1 room)
for key, data in list(self.locks.items()):
if data and data["user_id"] == user_id and key != room_key:
if now - data["timestamp"] < self.timeout:
return False, "You are already in another room."
else:
self.locks[key] = None
# Check if the room you're requesting is already occupied by someone else.
current_lock = self.locks.get(room_key)
if current_lock and current_lock["user_id"] != user_id:
if now - current_lock["timestamp"] < self.timeout:
return False, f"Room locked by {current_lock['user_id']}"
# queue
self.locks[room_key] = {"user_id": user_id, "timestamp": now}
return True, user_id
def keep_alive(self, country: str, catalog: str, user_id: str):
now = time.time()
room_key = self.get_room_key(country, catalog)
lock = self.locks.get(room_key)
# Renew
if lock and lock["user_id"] == user_id:
if now - lock["timestamp"] < self.timeout:
self.locks[room_key]["timestamp"] = now
return True
return False
def release(self, country: str, catalog: str, user_id: str):
room_key = self.get_room_key(country, catalog)
lock = self.locks.get(room_key)
# Clear
if lock and lock["user_id"] == user_id:
self.locks[room_key] = None
return True
return False
def is_authorized(self, country: str, catalog: str, user_id: str):
now = time.time()
lock = self.locks.get(self.get_room_key(country, catalog))
if lock and lock["user_id"] == user_id:
if now - lock["timestamp"] < self.timeout:
return True
return False
def get_lock_info(self, country: str, catalog: str):
now = time.time()
lock = self.locks.get(self.get_room_key(country, catalog))
if lock and (now - lock["timestamp"] < self.timeout):
return {"is_locked": True, "locked_by": lock["user_id"]}
return {"is_locked": False, "locked_by": None}
lock_manager = LockManager()
# -- Redis handler --
def redis_message_handler():
pubsub = r.pubsub()
pubsub.subscribe(ENTER_CHANNEL, HEARTBEAT_CHANNEL, EXIT_CHANNEL)
print(f"[*] Redis Listener started on channels: {ENTER_CHANNEL}, {HEARTBEAT_CHANNEL}, {EXIT_CHANNEL}")
for message in pubsub.listen():
if message['type'] != 'message':
continue
try:
data = json.loads(message['data'])
payload = data.get("payload", {})
values = payload.get("values", {})
user_info = payload.get("user_info", {})
country = values.get("country", "").strip().lower()
catalog = values.get("catalog", "")
srv_name = payload.get("srv_name", "")
user_id = user_info.get("user_id") or user_info.get("id")
if srv_name != SERVICE_NAME:
continue
channel = message['channel']
if not (user_id and country and catalog):
print(f"[Redis] Missing required parameters | Channel: {channel} | User: {user_id}")
continue
if channel == ENTER_CHANNEL:
success, msg = lock_manager.acquire(country, catalog, user_id)
if success:
threading.Thread(
target=process_and_stream_sheet_data,
args=(country, catalog, user_id),
daemon=True
).start()
print(f"[Redis] Enter Room: {catalog} | User: {user_id} | Success: {success}")
elif channel == HEARTBEAT_CHANNEL:
alive = lock_manager.keep_alive(country, catalog, user_id)
if not alive:
print(f"[Redis] Heartbeat Failed: {catalog} | User: {user_id}")
elif channel == EXIT_CHANNEL:
lock_manager.release(country, catalog, user_id)
print(f"[Redis] Exit Room (Force/Command): {catalog} | User: {user_id}")
except Exception as e:
print(f"[Redis Error] Error processing message: {e}")
traceback.print_exc()
def send_stream_notification(msg_type: str, content: any, batch_id: str, current_chunk: int, total_chunks: int, user_id: str):
"""
msg_type: "start", "chunk", "end", "error"
"""
if not FRONTEND_NOTIFY_URL:
print("Warning: FRONTEND_NOTIFY_URL is not set.")
return
payload = {
"type": "notify",
"payload": {
"from": SERVICE_NAME,
"level": "content",
"msg": msg_type,
"batch_id": batch_id,
"current_chunk": current_chunk,
"total_chunks": total_chunks,
"to": user_id,
"content": content
}
}
try:
# requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5)
print(f"[Stream] Sent {msg_type} chunk {current_chunk}/{total_chunks} for batch {batch_id}")
print(payload)
except Exception as e:
print(f"[Stream Error] Failed to send {msg_type} for batch {batch_id}: {e}")
def process_and_stream_sheet_data(country: str, catalog: str, user_id: str):
batch_id = str(uuid.uuid4())
send_stream_notification("start", {"message": f"Start fetching catalog: {catalog}"}, batch_id, 0, 0, user_id)
try:
config = COUNTRY_MAPPING.get(country)
client = get_gspread_client()
spreadsheet = client.open_by_key(config["spreadsheet_id"])
try:
nl_name = next(s for s in config["sheets"] if "new-layout" in s.lower() and "v2" not in s.lower())
nv2_name = next(s for s in config["sheets"] if "new-layout-v2" in s.lower())
nd_name = next(s for s in config["sheets"] if "name-desc-v2" in s.lower())
except StopIteration:
raise HTTPException(status_code=404, detail="Sheet mapping configuration error")
nl_all = spreadsheet.worksheet(nl_name).get_all_values()
nv2_all = spreadsheet.worksheet(nv2_name).get_all_values()
nd_all = spreadsheet.worksheet(nd_name).get_all_values()
final_result = []
is_in_block = False
for i, nl_row in enumerate(nl_all):
col_a = nl_row[0].strip() if len(nl_row) > 0 else ""
if col_a in ["IGNORE", "-"]: col_a = ""
if not is_in_block:
if catalog in col_a: is_in_block = True
continue
else:
if col_a and catalog not in col_a: break
name_th = nl_row[2].strip() if len(nl_row) > 2 else ""
name_en = nl_row[3].strip() if len(nl_row) > 3 else ""
if not name_th and not name_en:
continue
menu_item = {
"new_layout": {"row_index": i + 1, "values": nl_row},
"new_layout_v2": [],
"name_desc_v2": []
}
# Search Targets (G-K, H-L, I-M)
# G(6)-K(10), H(7)-L(11), I(8)-M(12)
pairs = [
(nl_row[6] if len(nl_row) > 6 else "-", nl_row[10] if len(nl_row) > 10 else "-"),
(nl_row[7] if len(nl_row) > 7 else "-", nl_row[11] if len(nl_row) > 11 else "-"),
(nl_row[8] if len(nl_row) > 8 else "-", nl_row[12] if len(nl_row) > 12 else "-")
]
search_targets = []
individual_codes = []
for p1, p2 in pairs:
c1 = p1.strip() if p1.strip() and p1.strip() != "-" else "-"
c2 = p2.strip() if p2.strip() and p2.strip() != "-" else "-"
# Format: {code1},{code2}
if c1 != "-" or c2 != "-":
search_targets.append(f"{c1},{c2}")
# keep to find in name-desc-v2
if c1 != "-": individual_codes.append(c1)
if c2 != "-": individual_codes.append(c2)
# find in new-layout-v2 (check I, J, K | Index 8, 9, 10)
for j, nv2_row in enumerate(nv2_all):
if len(nv2_row) > 10: # range max is column K
# I(8), J(9), K(10)
v2_vals = [nv2_row[8].strip(), nv2_row[9].strip(), nv2_row[10].strip()]
# Check target have or not in I, J, K
match_v2 = any(t in v2_vals for t in search_targets)
if match_v2:
for sub_idx in range(3): # 3 row (Name, Desc, Img)
curr_j = j + sub_idx
if curr_j < len(nv2_all):
row_data = nv2_all[curr_j]
row_info = {
"row_index": curr_j + 1,
"type": row_data[0] if len(row_data) > 0 else "",
"cells": []
}
# E-H (index 4-7) with only 2 row
if sub_idx < 2:
for c_idx in range(4, 8):
if c_idx < len(row_data):
row_info["cells"].append({
"value": row_data[c_idx],
"coord": get_coord(curr_j, c_idx)
})
menu_item["new_layout_v2"].append(row_info)
break
# find in name-desc-v2
for code in set(individual_codes):
targets = [f"MENU.{code}.name", f"MENU.{code}.desc"]
for nd_idx, nd_row in enumerate(nd_all):
if len(nd_row) > 0 and nd_row[0].strip() in targets:
nd_info = {"key": nd_row[0], "row_index": nd_idx+1, "cells": []}
for c_idx in range(4, 8):
if c_idx < len(nd_row):
nd_info["cells"].append({
"value": nd_row[c_idx],
"coord": get_coord(nd_idx, c_idx)
})
menu_item["name_desc_v2"].append(nd_info)
final_result.append(menu_item)
# -----------------------------------------------------------------
# init chunk
CHUNK_SIZE = 10
total_items = len(final_result)
total_chunks = math.ceil(total_items / CHUNK_SIZE)
for i in range(total_chunks):
start_idx = i * CHUNK_SIZE
end_idx = start_idx + CHUNK_SIZE
chunk_data = final_result[start_idx:end_idx]
send_stream_notification("chunk", chunk_data, batch_id, i + 1, total_chunks, user_id)
send_stream_notification("end", {"message": "All data sent successfully"}, batch_id, total_chunks, total_chunks, user_id)
except Exception as e:
traceback.print_exc()
send_stream_notification("error", {"error_detail": str(e)}, batch_id, 0, 0, user_id)
app = FastAPI()
class CatalogPayloadValues(BaseModel):
country: str
class CatalogPayload(BaseModel):
user_info: dict = {}
srv_name: str
values: CatalogPayloadValues
class CatalogRequest(BaseModel):
type: str
payload: CatalogPayload
class RoomRequest(BaseModel):
country: str
catalog: str
user_id: str
@app.on_event("startup")
def startup_event():
thread = threading.Thread(target=redis_message_handler, daemon=True)
thread.start()
@app.post("/catalogs")
def get_catalogs(req: CatalogRequest):
payload = req.payload
if payload.srv_name != SERVICE_NAME:
raise HTTPException(status_code=400, detail="Invalid service name")
country = payload.values.country.strip().lower()
config = COUNTRY_MAPPING.get(country)
if not config or not config["sheets"]:
raise HTTPException(status_code=404, detail="Country or sheets not found")
try:
# First sheet in Map
target_sheet_name = config["sheets"][0]
client = get_gspread_client()
spreadsheet = client.open_by_key(config["spreadsheet_id"])
worksheet = spreadsheet.worksheet(target_sheet_name)
# Get A column
col_a = worksheet.col_values(1)
catalogs = []
# Skip Row 1 (Index 0 in List)
for row_idx in range(1, len(col_a)):
val = col_a[row_idx].strip()
if not val or val in ["-", "IGNORE"]:
continue
# Specify file=...
# format: Name=Test,file=page_catalog_group_recommend.skt
match = re.search(r'file=([^,]+)', val)
if match:
catalog_name = match.group(1).strip()
lock_info = lock_manager.get_lock_info(country, catalog_name)
catalogs.append({
"catalog": catalog_name,
"row_index": row_idx + 1, # Index in Google Sheet
"status": "locked" if lock_info["is_locked"] else "free",
"locked_by": lock_info["locked_by"]
})
return {"status": "success", "country": country, "catalogs": catalogs}
except Exception as e:
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
def get_coord(r, c):
return {"row": r + 1, "col": c + 1}
@app.get("/catalog/data/{country}/{catalog}/{user_id}")
def get_catalog_data(country: str, catalog: str, user_id: str, background_tasks: BackgroundTasks):
country = country.strip().lower()
if not lock_manager.is_authorized(country, catalog, user_id):
raise HTTPException(status_code=403, detail="Unauthorized access to this room")
background_tasks.add_task(process_and_stream_sheet_data, country, catalog, user_id)
# return {"status": "success", "country": country, "catalog": catalog, "data": final_result}
return {
"status": "processing",
"country": country,
"catalog": catalog,
"message": "Data stream started. Please listen to frontend notifications."
}
@app.post("/api/room/enter")
def enter_room(req: RoomRequest):
country = req.country.strip().lower()
success, message_or_user = lock_manager.acquire(country, req.catalog, req.user_id)
if success:
threading.Thread(
target=process_and_stream_sheet_data,
args=(country, req.catalog, req.user_id),
daemon=True
).start()
return {"status": "success", "message": f"Successfully entry catalog: {req.catalog}", "timeout": lock_manager.timeout}
else:
raise HTTPException(status_code=403, detail=message_or_user)
# @app.post("/api/room/heartbeat")
# def heartbeat(req: RoomRequest):
# country = req.country.strip().lower()
# if lock_manager.keep_alive(country, req.catalog, req.user_id):
# return {"status": "alive"}
# else:
# raise HTTPException(status_code=401, detail="Timeout permission denied")
@app.post("/api/room/exit")
def exit_room(req: RoomRequest):
country = req.country.strip().lower()
if lock_manager.release(country, req.catalog, req.user_id):
return {"status": "success", "message": "Successfully exited"}
else:
return {"status": "success", "message": "You no longer have permission"}