From 4ea9aa77e59734811de4b2990bbb4d13f986f264 Mon Sep 17 00:00:00 2001 From: Ittipat Lusuk Date: Tue, 12 May 2026 14:29:38 +0700 Subject: [PATCH] [sheet-service] update redis subscribe channel /price, /add/price, /update/price, /delete/price --- README.md | 28 +++++ main.py | 307 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 325 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 6043a31..a569f9d 100644 --- a/README.md +++ b/README.md @@ -70,4 +70,32 @@ PUBLISH sheet-service/delete/menu '{"type": "sheet","payload": {"user_info": {"u ``` PUBLISH sheet-service/swap/menu '{"type": "sheet","payload": {"user_info": {"uid": "xxxxxxxxxxxxxxxxxxxxxxxxxx"},"srv_name": "sheet-service","values": {"country": "tha","catalog":"page_catalog_group_coffee.skt","content": [{ "target_id": 1, "source_id": 9 },{ "target_id": 5, "source_id": 1 },{ "target_id": 9, "source_id": 5 }]}}}' +``` + +## PUBLISH sheet-service/price +### Get price payload by productCode + +``` +PUBLISH sheet-service/price '{"type": "sheet","payload": {"user_info": {"uid": "xxxxxxxxxxxxxxxxxxxxxxxxxx"},"srv_name": "sheet-service","values": {"country": "tha","content": [{"product_code": "12-99-02-99"}, {"product_code": "12-01-01-0009"}]}}}' +``` + +## PUBLISH sheet-service/add/price +### Add price row + +``` +PUBLISH sheet-service/add/price '{"type": "sheet","payload": {"user_info": {"uid": "xxxxxxxxxxxxxxxxxxxxxxxxxx"},"srv_name": "sheet-service","values": {"country": "tha","content": [{"cells":["12-99-02-99", "Pepsi Ume Cookie Smoothie", "เต่าทรงพลังสตรอเบอรีโซดา", "", "25", ""]},{"cells":["12-01-01-0009", "HOT TAIWANESE TEACAFÉ LATTE", "", "", "", "40"]}]}}}' +``` + +## PUBLISH sheet-service/update/price +### Update data in price rows + +``` +PUBLISH sheet-service/update/price '{"type": "sheet","payload": {"user_info": {"uid": "xxxxxxxxxxxxxxxxxxxxxxxxxx"},"srv_name": "sheet-service","values": {"country": "tha","content": [{"row_index": 1474,"cells": [{"value": "test update","coord": {"row": 1474,"col": 2}}, {"value": "zx","coord": {"row": 1474,"col": 3}}]},{"row_index": 1475,"cells": [{"value": "not taiwan","coord": {"row": 1475,"col": 2}}]}]}}}' +``` + +## PUBLISH sheet-service/delete/price +### Delete price row + +``` +PUBLISH sheet-service/delete/price '{"type": "sheet","payload": {"user_info": {"uid": "xxxxxxxxxxxxxxxxxxxxxxxxxx"},"srv_name": "sheet-service","values": {"country": "tha","content": [{"target_id":1474}, {"target_id":1475}]}}}' ``` \ No newline at end of file diff --git a/main.py b/main.py index 66b9716..adbaac5 100644 --- a/main.py +++ b/main.py @@ -33,6 +33,11 @@ ADD_CATALOG_CHANNEL = f"{SERVICE_NAME}/add/catalog" ADD_MENU_CHANNEL = f"{SERVICE_NAME}/add/menu" SWAP_MENU_CHANNEL = f"{SERVICE_NAME}/swap/menu" +GET_PRICE_CHANNEL = f"{SERVICE_NAME}/price" +ADD_PRICE_CHANNEL = f"{SERVICE_NAME}/add/price" +UPDATE_PRICE_CHANNEL = f"{SERVICE_NAME}/update/price" +DELETE_PRICE_CHANNEL = f"{SERVICE_NAME}/delete/price" + # Grist set up ... GRIST_URL = os.getenv("GRIST_URL") GRIST_API_KEY = os.getenv("GRIST_API_KEY") @@ -47,23 +52,23 @@ COUNTRY_MAPPING = { "sheets": { "new-layout": os.getenv("SHEET_NEW_LAYOUT_THA"), "new-layout-v2": os.getenv("SHEET_NEW_LAYOUT_V2_THA"), - "name-desc-v2": os.getenv("SHEET_NAME_DESC_V2_THA") + "name-desc-v2": os.getenv("SHEET_NAME_DESC_V2_THA"), + "price": os.getenv("SHEET_PRICE_THA") }, "grist_doc_id": { "new-layout": os.getenv("DOC_ID_NEW_LAYOUT_THA"), "new-layout-v2": os.getenv("DOC_ID_NEW_LAYOUT_V2_THA"), - "name-desc-v2": os.getenv("DOC_ID_NAME_DESC_V2_THA") + "name-desc-v2": os.getenv("DOC_ID_NAME_DESC_V2_THA"), + "price": os.getenv("DOC_ID_PRICE_THA") } }, - "tha-no-layout": { - "spreadsheet_id": os.getenv("SPREAD_SHEET_ID_THA"), + "ltu": { + "spreadsheet_id": os.getenv("SPREAD_SHEET_ID_LTU"), "sheets": { - "new-layout-v2": os.getenv("SHEET_NEW_LAYOUT_V2_THA"), - "name-desc-v2": os.getenv("SHEET_NAME_DESC_V2_THA") + "price": os.getenv("SHEET_PRICE_LTU") }, "grist_doc_id": { - "new-layout-v2": os.getenv("DOC_ID_NEW_LAYOUT_V2_THA"), - "name-desc-v2": os.getenv("DOC_ID_NAME_DESC_V2_THA") + "price": os.getenv("DOC_ID_PRICE_LTU") } }, "hkg": { @@ -197,7 +202,11 @@ def redis_message_handler(): DELETE_MENU_CHANNEL, ADD_CATALOG_CHANNEL, ADD_MENU_CHANNEL, - SWAP_MENU_CHANNEL) + SWAP_MENU_CHANNEL, + GET_PRICE_CHANNEL, + ADD_PRICE_CHANNEL, + UPDATE_PRICE_CHANNEL, + DELETE_PRICE_CHANNEL) print(f"[*] Redis Listener started on service: {SERVICE_NAME}") @@ -376,6 +385,42 @@ def redis_message_handler(): print(f"[{SERVICE_NAME}] Swap data error: {e}") traceback.print_exc() + elif channel == GET_PRICE_CHANNEL: + if not (content): + print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel}") + continue + + try: + handle_get_price(country, user_id, content) + print(f"[{SERVICE_NAME}] Get price menu success: {catalog} | User: {user_id}") + except Exception as e: + print(f"[{SERVICE_NAME}] Get price menu error: {e}") + + elif channel == ADD_PRICE_CHANNEL: + if not (content): + print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel}") + continue + + try: + handle_add_price(country, content) + print(f"[{SERVICE_NAME}] Add price menu success: {catalog} | User: {user_id}") + except Exception as e: + print(f"[{SERVICE_NAME}] Add price menu error: {e}") + + elif channel == UPDATE_PRICE_CHANNEL: + try: + handle_update_price(country, content) + print(f"[{SERVICE_NAME}] Update price menu success: {catalog} | User: {user_id}") + except Exception as e: + print(f"[{SERVICE_NAME}] Update price menu error: {e}") + + elif channel == DELETE_PRICE_CHANNEL: + try: + handle_delete_price(country, content) + print(f"[{SERVICE_NAME}] Delete price menu success: {catalog} | User: {user_id}") + except Exception as e: + print(f"[{SERVICE_NAME}] Delete price menu error: {e}") + except Exception as e: print(f"[Redis Error] Error processing message: {e}") traceback.print_exc() @@ -429,7 +474,7 @@ def send_stream_notification(msg_type: str, content: any, batch_id: str, current } try: - # requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) + requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) print(f"[Stream] Sent {msg_type} chunk {current_chunk}/{total_chunks} for batch {batch_id}") print(payload) except Exception as e: @@ -1389,6 +1434,238 @@ def find_nv2_block_by_targets(nv2_data, search_targets): return [nv2_data[i+j]["id"] for j in range(4) if (i+j) < len(nv2_data)] return [] +""" + +PRICE-SHEET-MANAGEMENT + +""" + +def handle_get_price(country: str, user_id: str, content: list): + + try: + config = COUNTRY_MAPPING.get(country) + if not config: + raise Exception(f"Country {country} not found") + + grist_docs = config.get("grist_doc_id", {}) + doc_price = grist_docs.get("price") + + if not doc_price: + raise Exception(f"Price doc not found for country {country}") + + price_data = fetch_grist_table_data(doc_price, "Price") + + headers_grist = {"Authorization": f"Bearer {GRIST_API_KEY}"} + meta_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/columns" + resp_meta = requests.get(meta_url, headers=headers_grist) + + header = [] + if resp_meta.status_code == 200: + columns = resp_meta.json().get("columns", []) + # id (A, B, C, ...) + sorted_cols = sorted(columns, key=lambda c: col_to_index(c.get("id", "A"))) + header = [c.get("fields", {}).get("label", "") for c in sorted_cols] + + result = [] + + for item in content: + product_code = item.get("product_code", "").strip() + if not product_code: + continue + + matched_rows = [] + for row_obj in price_data: + row_id = row_obj["id"] + row = row_obj["fields"] + + # column A (index 0) + if len(row) > 0 and str(row[0]).strip() == product_code: + cells = [] + for c_idx, val in enumerate(row): + cells.append({ + "value": val, + "coord": { + "row": row_id, + "col": c_idx + 1 + } + }) + + matched_rows.append({ + "row_index": row_id, + "cells": cells + }) + + result.append({ + "key": product_code, + "header": header, + "payload": matched_rows + }) + + if FRONTEND_NOTIFY_URL: + payload = { + "type": "notify", + "payload": { + "from": SERVICE_NAME, + "level": "content", + "to": user_id, + "content": result + } + } + requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) + print(f"[{SERVICE_NAME}] Price data sent to user: {user_id} | Products: {len(result)}") + print(result) + + return result + + except Exception as e: + print(f"[{SERVICE_NAME}] Get price error: {e}") + traceback.print_exc() + + if FRONTEND_NOTIFY_URL: + payload = { + "type": "notify", + "payload": { + "from": SERVICE_NAME, + "level": "content", + "to": user_id, + "content": {"error": str(e)} + } + } + requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) + + raise + + +def handle_add_price(country: str, content: list): + + config = COUNTRY_MAPPING.get(country) + if not config: + print(f"[{SERVICE_NAME}] Country {country} config not found") + return + + grist_docs = config.get("grist_doc_id", {}) + doc_price = grist_docs.get("price") + + if not doc_price: + print(f"[{SERVICE_NAME}] Price doc not found for country {country}") + return + + records = [] + for item in content: + cells = item.get("cells", []) + if cells: + records.append(to_grist_record(cells)) + + if not records: + print(f"[{SERVICE_NAME}] No price records to add") + return + + headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"} + url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/records" + + try: + resp = requests.post(url, headers=headers, json={"records": records}) + if resp.status_code == 200: + print(f"[{SERVICE_NAME}] Added {len(records)} price records successfully") + else: + print(f"[{SERVICE_NAME}] Error adding price records: {resp.text}") + except Exception as e: + print(f"[{SERVICE_NAME}] Request failed for add price: {e}") + + +def handle_update_price(country: str, content: list): + """ + Update price to Grist + content: [{"row_index": 361, "cells": [{"value": "x", "coord": {...}}, ...]}, ...] + """ + config = COUNTRY_MAPPING.get(country) + if not config: + print(f"[{SERVICE_NAME}] Country {country} config not found") + return + + grist_docs = config.get("grist_doc_id", {}) + doc_price = grist_docs.get("price") + + if not doc_price: + print(f"[{SERVICE_NAME}] Price doc not found for country {country}") + return + + records_to_update = [] + + for item in content: + row_i = item.get("row_index") + cells = item.get("cells", []) + + if not row_i or not cells: + continue + + fields = {} + for cell in cells: + col_letter = col_to_letter(cell["coord"]["col"]) + fields[col_letter] = str(cell["value"]) + + records_to_update.append({ + "id": row_i, + "fields": fields + }) + + if not records_to_update: + print(f"[{SERVICE_NAME}] No price records to update") + return + + headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"} + update_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/records" + + # Group by field keys (Grist PATCH requirement) + grouped_updates = {} + for record in records_to_update: + field_keys = tuple(sorted(record["fields"].keys())) + if field_keys not in grouped_updates: + grouped_updates[field_keys] = [] + grouped_updates[field_keys].append(record) + + try: + for field_keys, batch in grouped_updates.items(): + resp = requests.patch(update_url, headers=headers, json={"records": batch}) + if resp.status_code == 200: + print(f"[{SERVICE_NAME}] Updated {len(batch)} price records successfully") + else: + print(f"[{SERVICE_NAME}] Price update failed: {resp.text}") + except Exception as e: + print(f"[{SERVICE_NAME}] Request failed for update price: {e}") + + +def handle_delete_price(country: str, content: list): + """ + content: [{"target_id": 389}, {"target_id": 393}] + """ + config = COUNTRY_MAPPING.get(country) + if not config: + print(f"[{SERVICE_NAME}] Country {country} config not found") + return + + grist_docs = config.get("grist_doc_id", {}) + doc_price = grist_docs.get("price") + + if not doc_price: + print(f"[{SERVICE_NAME}] Price doc not found for country {country}") + return + + target_ids = [int(item.get("target_id")) for item in content if item.get("target_id")] + + if not target_ids: + print(f"[{SERVICE_NAME}] Payload to delete price is empty") + return + + headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"} + del_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/records/delete" + + try: + resp = requests.post(del_url, headers=headers, json=target_ids) + print(f"[{SERVICE_NAME}] Deleted Price IDs: {target_ids} | Status: {resp.status_code}") + except Exception as e: + print(f"[{SERVICE_NAME}] Delete price error: {e}") + app = FastAPI() class CatalogPayloadValues(BaseModel): @@ -1659,6 +1936,8 @@ def sync_sheets_to_grist(country_key): process_new_layout_sheet(doc_id, header, data_rows) elif sheet_key == "name-desc-v2": upload_to_grist_self_hosted(doc_id, "name-desc-v2", header, data_rows) + elif sheet_key == "price": + upload_to_grist_self_hosted(doc_id, "price", header, data_rows) except Exception as e: print(f"[{SERVICE_NAME}] Error processing sheet {sheet_name}: {e}") @@ -1919,6 +2198,14 @@ def sync_grist_to_sheets(country_key): if sheet_key == "name-desc-v2": rows = fetch_grist_table_data(doc_id, "Name_desc_v2") + if rows: + sheet_rows = [r["fields"] for r in rows] + worksheet.batch_clear(["A2:ZZ"]) + worksheet.update(values=sheet_rows, range_name="A2") + print(f"[{SERVICE_NAME}] Updated {len(rows)} rows to sheet '{sheet_name}'.") + elif sheet_key == "price": + rows = fetch_grist_table_data(doc_id, "Price") + if rows: sheet_rows = [r["fields"] for r in rows] worksheet.batch_clear(["A2:ZZ"])