diff --git a/main.py b/main.py index f2818db..2e7f0b7 100644 --- a/main.py +++ b/main.py @@ -25,6 +25,12 @@ ENTER_CHANNEL = f"{SERVICE_NAME}/enter" HEARTBEAT_CHANNEL = f"{SERVICE_NAME}/heartbeat" EXIT_CHANNEL = f"{SERVICE_NAME}/exit" +GET_CATALOG_CHANNEL = f"{SERVICE_NAME}/catalogs" +UPDATE_MENU_CHANNEL = f"{SERVICE_NAME}/update/menu" +DELETE_MENU_CHANNEL = f"{SERVICE_NAME}/delete/menu" +ADD_CATALOG_CHANNEL = f"{SERVICE_NAME}/add/catalog" +ADD_MENU_CHANNEL = f"{SERVICE_NAME}/add/menu" + r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) ID_PATTERN = r'[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{2}-[a-zA-Z0-9]{4}' @@ -145,9 +151,16 @@ lock_manager = LockManager() # -- Redis handler -- def redis_message_handler(): pubsub = r.pubsub() - pubsub.subscribe(ENTER_CHANNEL, HEARTBEAT_CHANNEL, EXIT_CHANNEL) + pubsub.subscribe(ENTER_CHANNEL, + HEARTBEAT_CHANNEL, + EXIT_CHANNEL, + GET_CATALOG_CHANNEL, + UPDATE_MENU_CHANNEL, + DELETE_MENU_CHANNEL, + ADD_CATALOG_CHANNEL, + ADD_MENU_CHANNEL) - print(f"[*] Redis Listener started on channels: {ENTER_CHANNEL}, {HEARTBEAT_CHANNEL}, {EXIT_CHANNEL}") + print(f"[*] Redis Listener started on service: {SERVICE_NAME}") for message in pubsub.listen(): if message['type'] != 'message': @@ -162,20 +175,26 @@ def redis_message_handler(): country = values.get("country", "").strip().lower() catalog = values.get("catalog", "") + catalog_name = values.get("catalog_name", "") + content = values.get("content", []) srv_name = payload.get("srv_name", "") - user_id = user_info.get("user_id") or user_info.get("id") + user_id = user_info.get("uid") if srv_name != SERVICE_NAME: continue channel = message['channel'] - if not (user_id and country and catalog): - print(f"[Redis] Missing required parameters | Channel: {channel} | User: {user_id}") + if not (user_id and country): + print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue if channel == ENTER_CHANNEL: + if not catalog: + print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") + continue + success, msg = lock_manager.acquire(country, catalog, user_id) if success: @@ -185,16 +204,104 @@ def redis_message_handler(): daemon=True ).start() - print(f"[Redis] Enter Room: {catalog} | User: {user_id} | Success: {success}") + print(f"[{SERVICE_NAME}] Enter Room: {catalog} | User: {user_id} | Success: {success}") elif channel == HEARTBEAT_CHANNEL: + if not catalog: + print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") + continue + alive = lock_manager.keep_alive(country, catalog, user_id) if not alive: - print(f"[Redis] Heartbeat Failed: {catalog} | User: {user_id}") + print(f"[{SERVICE_NAME}] Heartbeat Failed: {catalog} | User: {user_id}") elif channel == EXIT_CHANNEL: + if not catalog: + print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") + continue + lock_manager.release(country, catalog, user_id) - print(f"[Redis] Exit Room (Force/Command): {catalog} | User: {user_id}") + print(f"[{SERVICE_NAME}] Exit Room (Force/Command): {catalog} | User: {user_id}") + + elif channel == GET_CATALOG_CHANNEL: + try: + if not FRONTEND_NOTIFY_URL: + print("Warning: FRONTEND_NOTIFY_URL is not set.") + continue + + result = get_catalogs(country) + payload = { + "type": "notify", + "payload": { + "from": SERVICE_NAME, + "level": "content", + "to": user_id, + "content": result + } + } + + requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) + print(payload) + print(f"[{SERVICE_NAME}] Get catalog success | User: {user_id}") + + + except Exception as e: + print(f"[{SERVICE_NAME}] Get catalog error: {e}") + + elif channel == ADD_MENU_CHANNEL: + if not (content and catalog): + print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") + continue + + if not lock_manager.is_authorized(country, catalog, user_id): + print(f"[{SERVICE_NAME}] Add menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room") + continue + + try: + handle_add_menu(country, catalog, content) + print(f"[{SERVICE_NAME}] Add menu success: {catalog} | User: {user_id}") + except Exception as e: + print(f"[{SERVICE_NAME}] Add menu error: {e}") + + elif channel == ADD_CATALOG_CHANNEL: + if not (catalog_name, catalog): + print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") + continue + + try: + handle_add_catalog(country, catalog_name, catalog) + except Exception as e: + print(f"[{SERVICE_NAME}] Add catalog error: {e}") + + elif channel == UPDATE_MENU_CHANNEL: + if not (content and catalog): + print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") + continue + + if not lock_manager.is_authorized(country, catalog, user_id): + # raise HTTPException(status_code=403, detail="Unauthorized access to this room") + print(f"[{SERVICE_NAME}] Updated catalog failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room") + continue + + try: + update_sheets(country, content) + print(f"[{SERVICE_NAME}] Update success: {catalog} | User: {user_id}") + except Exception as e: + print(f"[{SERVICE_NAME}] Update error: {e}") + + elif channel == DELETE_MENU_CHANNEL: + if not (content and catalog): + print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") + continue + + if not lock_manager.is_authorized(country, catalog, user_id): + print(f"[{SERVICE_NAME}] Delete menu failed: {catalog} | User: {user_id} | Message: Unauthorized access to this room") + continue + + try: + handle_delete_menu(country, catalog, content) + except Exception as e: + print(f"[{SERVICE_NAME}] Delete menu error: {e}") except Exception as e: print(f"[Redis Error] Error processing message: {e}") @@ -250,6 +357,11 @@ def process_and_stream_sheet_data(country: str, catalog: str, user_id: str): nv2_all = spreadsheet.worksheet(nv2_name).get_all_values() nd_all = spreadsheet.worksheet(nd_name).get_all_values() + # range for new-layout-v2 + nv2_start, nv2_end = find_catalog_range(nv2_all, catalog) + if nv2_start is None: + nv2_start, nv2_end = 0, len(nv2_all) - 1 + final_result = [] is_in_block = False @@ -298,7 +410,8 @@ def process_and_stream_sheet_data(country: str, catalog: str, user_id: str): if c2 != "-": individual_codes.append(c2) # find in new-layout-v2 (check I, J, K | Index 8, 9, 10) - for j, nv2_row in enumerate(nv2_all): + for j in range(nv2_start, nv2_end + 1): + nv2_row = nv2_all[j] if len(nv2_row) > 10: # range max is column K # I(8), J(9), K(10) v2_vals = [nv2_row[8].strip(), nv2_row[9].strip(), nv2_row[10].strip()] @@ -313,7 +426,6 @@ def process_and_stream_sheet_data(country: str, catalog: str, user_id: str): row_data = nv2_all[curr_j] row_info = { "row_index": curr_j + 1, - "type": row_data[0] if len(row_data) > 0 else "", "cells": [] } # E-H (index 4-7) with only 2 row @@ -362,41 +474,33 @@ def process_and_stream_sheet_data(country: str, catalog: str, user_id: str): traceback.print_exc() send_stream_notification("error", {"error_detail": str(e)}, batch_id, 0, 0, user_id) -app = FastAPI() +def get_coord(r, c): + return {"row": r + 1, "col": c + 1} -class CatalogPayloadValues(BaseModel): - country: str +def find_catalog_range(all_rows, catalog: str): + start = None + end = None -class CatalogPayload(BaseModel): - user_info: dict = {} - srv_name: str - values: CatalogPayloadValues + for i, row in enumerate(all_rows): + col_a = row[0].strip() if len(row) > 0 else "" -class CatalogRequest(BaseModel): - type: str - payload: CatalogPayload + if start is None: + if catalog in col_a: + start = i + end = i + else: + if col_a and catalog not in col_a: + break + end = i -class RoomRequest(BaseModel): - country: str - catalog: str - user_id: str + return start, end -@app.on_event("startup") -def startup_event(): - thread = threading.Thread(target=redis_message_handler, daemon=True) - thread.start() - -@app.post("/catalogs") -def get_catalogs(req: CatalogRequest): - payload = req.payload - if payload.srv_name != SERVICE_NAME: - raise HTTPException(status_code=400, detail="Invalid service name") - - country = payload.values.country.strip().lower() +# get catalog +def get_catalogs(country: str): config = COUNTRY_MAPPING.get(country) if not config or not config["sheets"]: - raise HTTPException(status_code=404, detail="Country or sheets not found") + print(f"[{SERVICE_NAME}] Country or sheets not found") try: # First sheet in Map @@ -434,8 +538,366 @@ def get_catalogs(req: CatalogRequest): traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) -def get_coord(r, c): - return {"row": r + 1, "col": c + 1} +""" + +ADD_MENU_CHANNEL + +""" +def find_catalog_block(col_a, catalog: str): + start = None + end = None + + pattern = re.compile(rf"file={re.escape(catalog)}") + + for i, val in enumerate(col_a): + val = val.strip() + + if pattern.search(val): + if start is None: + start = i + end = i + else: + if start is not None: + break + + return start, end + +def handle_add_menu(country: str, catalog: str, content: list): + config = COUNTRY_MAPPING.get(country) + client = get_gspread_client() + sheet = client.open_by_key(config["spreadsheet_id"]) + + nl_name = next(s for s in config["sheets"] if "new-layout" in s.lower() and "v2" not in s.lower()) + worksheet = sheet.worksheet(nl_name) + + col_a = worksheet.col_values(1) + start, end = find_catalog_block(col_a, catalog) + + if start is None: + return + + insert_at = end + 2 + + worksheet.insert_rows([[]] * len(content), row=insert_at) + + batch_requests = [] + + for i, row in enumerate(content): + row_index = insert_at + i + cells = row.get("cells", []) + + if not cells: + continue + + end_col = col_to_letter(len(cells)) + + batch_requests.append({ + "range": f"A{row_index}:{end_col}{row_index}", + "values": [cells] + }) + + if batch_requests: + worksheet.batch_update(batch_requests, value_input_option="USER_ENTERED") + +""" + +ADD_CATALOG_CHANNEL + +""" + +def handle_add_catalog(country: str, catalog_name: str, catalog: str): + config = COUNTRY_MAPPING.get(country) + client = get_gspread_client() + sheet = client.open_by_key(config["spreadsheet_id"]) + + nl_name = next(s for s in config["sheets"] if "new-layout" in s.lower() and "v2" not in s.lower()) + ws = sheet.worksheet(nl_name) + + all_values = ws.get_all_values() + + if any(catalog in cell for row in all_values for cell in row): + print(f"[{SERVICE_NAME}] catalog is already exists") + return + + # find final row + last_row = 0 + for i, row in enumerate(all_values, start=1): + if any(cell.strip() for cell in row): + last_row = i + + insert_row = last_row + 2 + + value = f"Name={catalog_name},file={catalog}" + + ws.update(f"A{insert_row}", [[value]]) + + print(f"[{SERVICE_NAME}] added at row {insert_row}") + +""" + +UPDATE_MENU_CHANNEL + +""" +# update sheet +def update_sheets(country: str, content: list): + config = COUNTRY_MAPPING.get(country) + client = get_gspread_client() + spreadsheet = client.open_by_key(config["spreadsheet_id"]) + + requests_map = build_batch_requests(content) + + for sheet_name in config["sheets"]: + sheet_key = None + + if "new-layout-v2" in sheet_name.lower(): + sheet_key = "new_layout_v2" + elif "name-desc-v2" in sheet_name.lower(): + sheet_key = "name_desc_v2" + else: + sheet_key = "new_layout" + + batch_data = requests_map.get(sheet_key, []) + if not batch_data: + continue + + worksheet = spreadsheet.worksheet(sheet_name) + + worksheet.batch_update(batch_data, value_input_option="USER_ENTERED") + +# payload -> batch requests +def build_batch_requests(content): + requests_map = { + "new_layout": {}, + "new_layout_v2": {}, + "name_desc_v2": {} + } + + for item in content: + for sheet_key in requests_map.keys(): + rows = item.get(sheet_key, []) + + if sheet_key == "new_layout": + rows = [rows] if rows else [] + + for row in rows: + cells = row.get("cells", []) + if not cells: + continue + + row_i = cells[0]["coord"]["row"] + + if row_i not in requests_map[sheet_key]: + requests_map[sheet_key][row_i] = {} + + for cell in cells: + col_i = cell["coord"]["col"] + val = cell["value"] + + requests_map[sheet_key][row_i][col_i] = val + + # convert → batch format + final_map = {k: [] for k in requests_map} + + for sheet_key, rows in requests_map.items(): + for row_i, cols in rows.items(): + sorted_cols = sorted(cols.items()) + + chunks = split_into_chunks(sorted_cols) + + for chunk in chunks: + start_col = chunk[0][0] + end_col = chunk[-1][0] + + values = [v for _, v in chunk] + + final_map[sheet_key].append({ + "range": f"{col_to_letter(start_col)}{row_i}:{col_to_letter(end_col)}{row_i}", + "values": [values] + }) + + return final_map + +# column to A1 spreadsheet format +def col_to_letter(col: int) -> str: + result = "" + while col > 0: + col, rem = divmod(col - 1, 26) + result = chr(65 + rem) + result + return result + +def split_into_chunks(sorted_cols): + chunks = [] + current = [sorted_cols[0]] + + for prev, curr in zip(sorted_cols, sorted_cols[1:]): + if curr[0] == prev[0] + 1: + current.append(curr) + else: + chunks.append(current) + current = [curr] + + chunks.append(current) + return chunks + +""" + +DELETE_MENU_CHANNEL + +""" +def handle_delete_menu(country: str, catalog: str, content: list): + config = COUNTRY_MAPPING.get(country) + client = get_gspread_client() + sheet = client.open_by_key(config["spreadsheet_id"]) + + nl_name = next(s for s in config["sheets"] if "new-layout" in s.lower() and "v2" not in s.lower()) + worksheet = sheet.worksheet(nl_name) + + col_a = worksheet.col_values(1) + + # find catalog start range + pattern = re.compile(rf"file={re.escape(catalog)}") + + start = None + end = None + + for i, val in enumerate(col_a): + val = val.strip() + + if start is None: + if pattern.search(val): + start = i + end = i + else: + if "file=" in val: + break + + end = i + + if start is None: + return + + rows = [item["row_index"] for item in content if "row_index" in item] + + # specifically in block + print("start, end:", start, end) + print("rows before filter:", rows) + rows = [ + r for r in rows + if (start + 1) <= r <= (end + 1) + ] + + if not rows: + return + + ranges = group_consecutive_rows(rows) + + requests = [] + + for start_r, end_r in reversed(ranges): + requests.append({ + "deleteDimension": { + "range": { + "sheetId": worksheet.id, + "dimension": "ROWS", + "startIndex": start_r - 1, + "endIndex": end_r + } + } + }) + + sheet.batch_update({ + "requests": requests + }) + +def group_consecutive_rows(rows): + ranges = [] + rows = sorted(rows) + + start = rows[0] + prev = rows[0] + + for r in rows[1:]: + if r == prev + 1: + prev = r + else: + ranges.append((start, prev)) + start = r + prev = r + + ranges.append((start, prev)) + return ranges + +app = FastAPI() + +class CatalogPayloadValues(BaseModel): + country: str + +class CatalogPayload(BaseModel): + user_info: dict = {} + srv_name: str + values: CatalogPayloadValues + +class CatalogRequest(BaseModel): + type: str + payload: CatalogPayload + +class RoomRequest(BaseModel): + country: str + catalog: str + user_id: str + +@app.on_event("startup") +def startup_event(): + thread = threading.Thread(target=redis_message_handler, daemon=True) + thread.start() + +# @app.post("/catalogs") +# def get_catalogs(req: CatalogRequest): +# payload = req.payload +# if payload.srv_name != SERVICE_NAME: +# raise HTTPException(status_code=400, detail="Invalid service name") + +# country = payload.values.country.strip().lower() +# config = COUNTRY_MAPPING.get(country) + +# if not config or not config["sheets"]: +# raise HTTPException(status_code=404, detail="Country or sheets not found") + +# try: +# # First sheet in Map +# target_sheet_name = config["sheets"][0] +# client = get_gspread_client() +# spreadsheet = client.open_by_key(config["spreadsheet_id"]) +# worksheet = spreadsheet.worksheet(target_sheet_name) + +# # Get A column +# col_a = worksheet.col_values(1) + +# catalogs = [] +# # Skip Row 1 (Index 0 in List) +# for row_idx in range(1, len(col_a)): +# val = col_a[row_idx].strip() +# if not val or val in ["-", "IGNORE"]: +# continue + +# # Specify file=... +# # format: Name=Test,file=page_catalog_group_recommend.skt +# match = re.search(r'file=([^,]+)', val) +# if match: +# catalog_name = match.group(1).strip() +# lock_info = lock_manager.get_lock_info(country, catalog_name) +# catalogs.append({ +# "catalog": catalog_name, +# "row_index": row_idx + 1, # Index in Google Sheet +# "status": "locked" if lock_info["is_locked"] else "free", +# "locked_by": lock_info["locked_by"] +# }) + +# return {"status": "success", "country": country, "catalogs": catalogs} + +# except Exception as e: +# traceback.print_exc() +# raise HTTPException(status_code=500, detail=str(e)) @app.get("/catalog/data/{country}/{catalog}/{user_id}") def get_catalog_data(country: str, catalog: str, user_id: str, background_tasks: BackgroundTasks): @@ -470,13 +932,13 @@ def enter_room(req: RoomRequest): else: raise HTTPException(status_code=403, detail=message_or_user) -# @app.post("/api/room/heartbeat") -# def heartbeat(req: RoomRequest): -# country = req.country.strip().lower() -# if lock_manager.keep_alive(country, req.catalog, req.user_id): -# return {"status": "alive"} -# else: -# raise HTTPException(status_code=401, detail="Timeout permission denied") +@app.post("/api/room/heartbeat") +def heartbeat(req: RoomRequest): + country = req.country.strip().lower() + if lock_manager.keep_alive(country, req.catalog, req.user_id): + return {"status": "alive"} + else: + raise HTTPException(status_code=401, detail="Timeout permission denied") @app.post("/api/room/exit") def exit_room(req: RoomRequest):