From be1cd2876a9420d56ca57f6ff398731de056b9eb Mon Sep 17 00:00:00 2001 From: Ittipat Lusuk Date: Thu, 2 Apr 2026 12:53:23 +0700 Subject: [PATCH] taobin-sheet --- Dockerfile | 14 ++ docker-compose.yml | 25 +++ main.py | 487 +++++++++++++++++++++++++++++++++++++++++++++ nginx.conf | 9 + requirements.txt | 10 + 5 files changed, 545 insertions(+) create mode 100644 Dockerfile create mode 100644 docker-compose.yml create mode 100644 main.py create mode 100644 nginx.conf create mode 100644 requirements.txt diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1dd336a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.11-slim + +WORKDIR /app + +ENV TZ=Asia/Bangkok + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/* + +CMD ["python", "main.py"] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..4344540 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,25 @@ +version: "3.9" + +services: + app: + build: . + container_name: taobin-sheet-container + ports: + - "8124:8124" + environment: + - PYTHONUNBUFFERED=1 + env_file: + - .env + # volumes: + # - ./tsv_data:/app/tsv_data + restart: always + command: uvicorn main:app --host 0.0.0.0 --port 8124 + + nginx: + image: nginx:latest + container_name: nginx-image-container + volumes: + - ~/repo/taobin_project:/taobin_project + - ./nginx.conf:/etc/nginx/conf.d/default.conf + ports: + - "8080:80" \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..f2818db --- /dev/null +++ b/main.py @@ -0,0 +1,487 @@ +import time +import re +import os +import json +import redis +import gspread +import requests +import traceback +import threading +import uuid +import math +from fastapi import FastAPI, HTTPException, BackgroundTasks +from pydantic import BaseModel +from dotenv import load_dotenv +from google.oauth2 import service_account + +REDIS_HOST = os.getenv("REDIS_HOST") +REDIS_PORT = os.getenv("REDIS_PORT") +FRONTEND_NOTIFY_URL = os.getenv("FRONTEND_NOTIFY_URL") +LOG_SERVER_URL = os.getenv("LOG_SERVER_URL") + +SERVICE_NAME = os.getenv("SERVICE_NAME") + +ENTER_CHANNEL = f"{SERVICE_NAME}/enter" +HEARTBEAT_CHANNEL = f"{SERVICE_NAME}/heartbeat" +EXIT_CHANNEL = f"{SERVICE_NAME}/exit" + +r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) + +ID_PATTERN = r'[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{4}' + +COUNTRY_MAPPING = { + "tha": { + "spreadsheet_id": os.getenv("SPREAD_SHEET_ID_THA"), + "sheets": [ + os.getenv("SHEET_NEW_LAYOUT_THA"), + os.getenv("SHEET_NEW_LAYOUT_V2_THA"), + os.getenv("SHEET_NAME_DESC_V2_THA") + ] + }, + "tha_premium": { + "spreadsheet_id": os.getenv("SPREAD_SHEET_ID_THA_PREMIUM"), + "sheets": [ + os.getenv("SHEET_NEW_LAYOUT_THA_PREMIUM") + ] + } +} + +def get_gspread_client(): + SCOPES = [ + 'https://www.googleapis.com/auth/spreadsheets', + 'https://www.googleapis.com/auth/drive' + ] + + creds = None + + service_account_email = os.getenv("GOOGLE_SERVICE_ACCOUNT_EMAIL") + private_key = os.getenv("GOOGLE_PRIVATE_KEY").replace("\\n", "\n") + + credentials_info = { + "type": "service_account", + "client_email": service_account_email, + "private_key": private_key, + "token_uri": "https://oauth2.googleapis.com/token", + } + + creds = service_account.Credentials.from_service_account_info( + credentials_info, + scopes=SCOPES + ) + + return gspread.authorize(creds) + + +class LockManager: + def __init__(self): + # { "tha:page_catalog_group_coffee.skt": {"user_id": "A", "timestamp": 12345} } + self.locks: Dict[str, dict] = {} + self.timeout = 30 + + def get_room_key(self, country: str, catalog: str) -> str: + return f"{country}:{catalog}" + + def acquire(self, country: str, catalog: str, user_id: str): + now = time.time() + room_key = self.get_room_key(country, catalog) + + # (1 person, 1 room) + for key, data in list(self.locks.items()): + if data and data["user_id"] == user_id and key != room_key: + if now - data["timestamp"] < self.timeout: + return False, "You are already in another room." + else: + self.locks[key] = None + + # Check if the room you're requesting is already occupied by someone else. + current_lock = self.locks.get(room_key) + if current_lock and current_lock["user_id"] != user_id: + if now - current_lock["timestamp"] < self.timeout: + return False, f"Room locked by {current_lock['user_id']}" + + # queue + self.locks[room_key] = {"user_id": user_id, "timestamp": now} + return True, user_id + + def keep_alive(self, country: str, catalog: str, user_id: str): + now = time.time() + room_key = self.get_room_key(country, catalog) + lock = self.locks.get(room_key) + + # Renew + if lock and lock["user_id"] == user_id: + if now - lock["timestamp"] < self.timeout: + self.locks[room_key]["timestamp"] = now + return True + return False + + def release(self, country: str, catalog: str, user_id: str): + room_key = self.get_room_key(country, catalog) + lock = self.locks.get(room_key) + + # Clear + if lock and lock["user_id"] == user_id: + self.locks[room_key] = None + return True + return False + + def is_authorized(self, country: str, catalog: str, user_id: str): + now = time.time() + lock = self.locks.get(self.get_room_key(country, catalog)) + if lock and lock["user_id"] == user_id: + if now - lock["timestamp"] < self.timeout: + return True + return False + + def get_lock_info(self, country: str, catalog: str): + now = time.time() + lock = self.locks.get(self.get_room_key(country, catalog)) + if lock and (now - lock["timestamp"] < self.timeout): + return {"is_locked": True, "locked_by": lock["user_id"]} + return {"is_locked": False, "locked_by": None} + +lock_manager = LockManager() + +# -- Redis handler -- +def redis_message_handler(): + pubsub = r.pubsub() + pubsub.subscribe(ENTER_CHANNEL, HEARTBEAT_CHANNEL, EXIT_CHANNEL) + + print(f"[*] Redis Listener started on channels: {ENTER_CHANNEL}, {HEARTBEAT_CHANNEL}, {EXIT_CHANNEL}") + + for message in pubsub.listen(): + if message['type'] != 'message': + continue + + try: + data = json.loads(message['data']) + + payload = data.get("payload", {}) + values = payload.get("values", {}) + user_info = payload.get("user_info", {}) + + country = values.get("country", "").strip().lower() + catalog = values.get("catalog", "") + srv_name = payload.get("srv_name", "") + + user_id = user_info.get("user_id") or user_info.get("id") + + if srv_name != SERVICE_NAME: + continue + + channel = message['channel'] + + if not (user_id and country and catalog): + print(f"[Redis] Missing required parameters | Channel: {channel} | User: {user_id}") + continue + + if channel == ENTER_CHANNEL: + success, msg = lock_manager.acquire(country, catalog, user_id) + + if success: + threading.Thread( + target=process_and_stream_sheet_data, + args=(country, catalog, user_id), + daemon=True + ).start() + + print(f"[Redis] Enter Room: {catalog} | User: {user_id} | Success: {success}") + + elif channel == HEARTBEAT_CHANNEL: + alive = lock_manager.keep_alive(country, catalog, user_id) + if not alive: + print(f"[Redis] Heartbeat Failed: {catalog} | User: {user_id}") + + elif channel == EXIT_CHANNEL: + lock_manager.release(country, catalog, user_id) + print(f"[Redis] Exit Room (Force/Command): {catalog} | User: {user_id}") + + except Exception as e: + print(f"[Redis Error] Error processing message: {e}") + traceback.print_exc() + +def send_stream_notification(msg_type: str, content: any, batch_id: str, current_chunk: int, total_chunks: int, user_id: str): + """ + msg_type: "start", "chunk", "end", "error" + """ + if not FRONTEND_NOTIFY_URL: + print("Warning: FRONTEND_NOTIFY_URL is not set.") + return + + payload = { + "type": "notify", + "payload": { + "from": SERVICE_NAME, + "level": "content", + "msg": msg_type, + "batch_id": batch_id, + "current_chunk": current_chunk, + "total_chunks": total_chunks, + "to": user_id, + "content": content + } + } + + try: + # requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) + print(f"[Stream] Sent {msg_type} chunk {current_chunk}/{total_chunks} for batch {batch_id}") + print(payload) + except Exception as e: + print(f"[Stream Error] Failed to send {msg_type} for batch {batch_id}: {e}") + +def process_and_stream_sheet_data(country: str, catalog: str, user_id: str): + batch_id = str(uuid.uuid4()) + + send_stream_notification("start", {"message": f"Start fetching catalog: {catalog}"}, batch_id, 0, 0, user_id) + + try: + config = COUNTRY_MAPPING.get(country) + client = get_gspread_client() + spreadsheet = client.open_by_key(config["spreadsheet_id"]) + + try: + nl_name = next(s for s in config["sheets"] if "new-layout" in s.lower() and "v2" not in s.lower()) + nv2_name = next(s for s in config["sheets"] if "new-layout-v2" in s.lower()) + nd_name = next(s for s in config["sheets"] if "name-desc-v2" in s.lower()) + except StopIteration: + raise HTTPException(status_code=404, detail="Sheet mapping configuration error") + + nl_all = spreadsheet.worksheet(nl_name).get_all_values() + nv2_all = spreadsheet.worksheet(nv2_name).get_all_values() + nd_all = spreadsheet.worksheet(nd_name).get_all_values() + + final_result = [] + is_in_block = False + + for i, nl_row in enumerate(nl_all): + col_a = nl_row[0].strip() if len(nl_row) > 0 else "" + if col_a in ["IGNORE", "-"]: col_a = "" + + if not is_in_block: + if catalog in col_a: is_in_block = True + continue + else: + if col_a and catalog not in col_a: break + + name_th = nl_row[2].strip() if len(nl_row) > 2 else "" + name_en = nl_row[3].strip() if len(nl_row) > 3 else "" + + if not name_th and not name_en: + continue + + menu_item = { + "new_layout": {"row_index": i + 1, "values": nl_row}, + "new_layout_v2": [], + "name_desc_v2": [] + } + + # Search Targets (G-K, H-L, I-M) + # G(6)-K(10), H(7)-L(11), I(8)-M(12) + pairs = [ + (nl_row[6] if len(nl_row) > 6 else "-", nl_row[10] if len(nl_row) > 10 else "-"), + (nl_row[7] if len(nl_row) > 7 else "-", nl_row[11] if len(nl_row) > 11 else "-"), + (nl_row[8] if len(nl_row) > 8 else "-", nl_row[12] if len(nl_row) > 12 else "-") + ] + + search_targets = [] + individual_codes = [] + for p1, p2 in pairs: + c1 = p1.strip() if p1.strip() and p1.strip() != "-" else "-" + c2 = p2.strip() if p2.strip() and p2.strip() != "-" else "-" + + # Format: {code1},{code2} + if c1 != "-" or c2 != "-": + search_targets.append(f"{c1},{c2}") + + # keep to find in name-desc-v2 + if c1 != "-": individual_codes.append(c1) + if c2 != "-": individual_codes.append(c2) + + # find in new-layout-v2 (check I, J, K | Index 8, 9, 10) + for j, nv2_row in enumerate(nv2_all): + if len(nv2_row) > 10: # range max is column K + # I(8), J(9), K(10) + v2_vals = [nv2_row[8].strip(), nv2_row[9].strip(), nv2_row[10].strip()] + + # Check target have or not in I, J, K + match_v2 = any(t in v2_vals for t in search_targets) + + if match_v2: + for sub_idx in range(3): # 3 row (Name, Desc, Img) + curr_j = j + sub_idx + if curr_j < len(nv2_all): + row_data = nv2_all[curr_j] + row_info = { + "row_index": curr_j + 1, + "type": row_data[0] if len(row_data) > 0 else "", + "cells": [] + } + # E-H (index 4-7) with only 2 row + if sub_idx < 2: + for c_idx in range(4, 8): + if c_idx < len(row_data): + row_info["cells"].append({ + "value": row_data[c_idx], + "coord": get_coord(curr_j, c_idx) + }) + menu_item["new_layout_v2"].append(row_info) + break + + # find in name-desc-v2 + for code in set(individual_codes): + targets = [f"MENU.{code}.name", f"MENU.{code}.desc"] + for nd_idx, nd_row in enumerate(nd_all): + if len(nd_row) > 0 and nd_row[0].strip() in targets: + nd_info = {"key": nd_row[0], "row_index": nd_idx+1, "cells": []} + for c_idx in range(4, 8): + if c_idx < len(nd_row): + nd_info["cells"].append({ + "value": nd_row[c_idx], + "coord": get_coord(nd_idx, c_idx) + }) + menu_item["name_desc_v2"].append(nd_info) + + final_result.append(menu_item) + # ----------------------------------------------------------------- + + # init chunk + CHUNK_SIZE = 10 + total_items = len(final_result) + total_chunks = math.ceil(total_items / CHUNK_SIZE) + + for i in range(total_chunks): + start_idx = i * CHUNK_SIZE + end_idx = start_idx + CHUNK_SIZE + chunk_data = final_result[start_idx:end_idx] + + send_stream_notification("chunk", chunk_data, batch_id, i + 1, total_chunks, user_id) + + send_stream_notification("end", {"message": "All data sent successfully"}, batch_id, total_chunks, total_chunks, user_id) + + except Exception as e: + traceback.print_exc() + send_stream_notification("error", {"error_detail": str(e)}, batch_id, 0, 0, user_id) + +app = FastAPI() + +class CatalogPayloadValues(BaseModel): + country: str + +class CatalogPayload(BaseModel): + user_info: dict = {} + srv_name: str + values: CatalogPayloadValues + +class CatalogRequest(BaseModel): + type: str + payload: CatalogPayload + +class RoomRequest(BaseModel): + country: str + catalog: str + user_id: str + +@app.on_event("startup") +def startup_event(): + thread = threading.Thread(target=redis_message_handler, daemon=True) + thread.start() + +@app.post("/catalogs") +def get_catalogs(req: CatalogRequest): + payload = req.payload + if payload.srv_name != SERVICE_NAME: + raise HTTPException(status_code=400, detail="Invalid service name") + + country = payload.values.country.strip().lower() + config = COUNTRY_MAPPING.get(country) + + if not config or not config["sheets"]: + raise HTTPException(status_code=404, detail="Country or sheets not found") + + try: + # First sheet in Map + target_sheet_name = config["sheets"][0] + client = get_gspread_client() + spreadsheet = client.open_by_key(config["spreadsheet_id"]) + worksheet = spreadsheet.worksheet(target_sheet_name) + + # Get A column + col_a = worksheet.col_values(1) + + catalogs = [] + # Skip Row 1 (Index 0 in List) + for row_idx in range(1, len(col_a)): + val = col_a[row_idx].strip() + if not val or val in ["-", "IGNORE"]: + continue + + # Specify file=... + # format: Name=Test,file=page_catalog_group_recommend.skt + match = re.search(r'file=([^,]+)', val) + if match: + catalog_name = match.group(1).strip() + lock_info = lock_manager.get_lock_info(country, catalog_name) + catalogs.append({ + "catalog": catalog_name, + "row_index": row_idx + 1, # Index in Google Sheet + "status": "locked" if lock_info["is_locked"] else "free", + "locked_by": lock_info["locked_by"] + }) + + return {"status": "success", "country": country, "catalogs": catalogs} + + except Exception as e: + traceback.print_exc() + raise HTTPException(status_code=500, detail=str(e)) + +def get_coord(r, c): + return {"row": r + 1, "col": c + 1} + +@app.get("/catalog/data/{country}/{catalog}/{user_id}") +def get_catalog_data(country: str, catalog: str, user_id: str, background_tasks: BackgroundTasks): + country = country.strip().lower() + if not lock_manager.is_authorized(country, catalog, user_id): + raise HTTPException(status_code=403, detail="Unauthorized access to this room") + + background_tasks.add_task(process_and_stream_sheet_data, country, catalog, user_id) + + + # return {"status": "success", "country": country, "catalog": catalog, "data": final_result} + return { + "status": "processing", + "country": country, + "catalog": catalog, + "message": "Data stream started. Please listen to frontend notifications." + } + +@app.post("/api/room/enter") +def enter_room(req: RoomRequest): + country = req.country.strip().lower() + success, message_or_user = lock_manager.acquire(country, req.catalog, req.user_id) + + if success: + threading.Thread( + target=process_and_stream_sheet_data, + args=(country, req.catalog, req.user_id), + daemon=True + ).start() + + return {"status": "success", "message": f"Successfully entry catalog: {req.catalog}", "timeout": lock_manager.timeout} + else: + raise HTTPException(status_code=403, detail=message_or_user) + +# @app.post("/api/room/heartbeat") +# def heartbeat(req: RoomRequest): +# country = req.country.strip().lower() +# if lock_manager.keep_alive(country, req.catalog, req.user_id): +# return {"status": "alive"} +# else: +# raise HTTPException(status_code=401, detail="Timeout permission denied") + +@app.post("/api/room/exit") +def exit_room(req: RoomRequest): + country = req.country.strip().lower() + if lock_manager.release(country, req.catalog, req.user_id): + return {"status": "success", "message": "Successfully exited"} + else: + return {"status": "success", "message": "You no longer have permission"} \ No newline at end of file diff --git a/nginx.conf b/nginx.conf new file mode 100644 index 0000000..d0db620 --- /dev/null +++ b/nginx.conf @@ -0,0 +1,9 @@ +server { + + listen 80; + + location /static/ { + alias /taobin_project/; + } + +} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d231bfe --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +google-auth +google-auth-oauthlib +google-api-python-client +dotenv +redis +requests +gspread +fastapi +uvicorn +pydantic \ No newline at end of file