[sheet-service] update redis subscribe channel /price, /add/price, /update/price, /delete/price

This commit is contained in:
Ittipat Lusuk 2026-05-12 14:29:38 +07:00
parent 2612d33eb2
commit 4ea9aa77e5
2 changed files with 325 additions and 10 deletions

307
main.py
View file

@ -33,6 +33,11 @@ ADD_CATALOG_CHANNEL = f"{SERVICE_NAME}/add/catalog"
ADD_MENU_CHANNEL = f"{SERVICE_NAME}/add/menu"
SWAP_MENU_CHANNEL = f"{SERVICE_NAME}/swap/menu"
GET_PRICE_CHANNEL = f"{SERVICE_NAME}/price"
ADD_PRICE_CHANNEL = f"{SERVICE_NAME}/add/price"
UPDATE_PRICE_CHANNEL = f"{SERVICE_NAME}/update/price"
DELETE_PRICE_CHANNEL = f"{SERVICE_NAME}/delete/price"
# Grist set up ...
GRIST_URL = os.getenv("GRIST_URL")
GRIST_API_KEY = os.getenv("GRIST_API_KEY")
@ -47,23 +52,23 @@ COUNTRY_MAPPING = {
"sheets": {
"new-layout": os.getenv("SHEET_NEW_LAYOUT_THA"),
"new-layout-v2": os.getenv("SHEET_NEW_LAYOUT_V2_THA"),
"name-desc-v2": os.getenv("SHEET_NAME_DESC_V2_THA")
"name-desc-v2": os.getenv("SHEET_NAME_DESC_V2_THA"),
"price": os.getenv("SHEET_PRICE_THA")
},
"grist_doc_id": {
"new-layout": os.getenv("DOC_ID_NEW_LAYOUT_THA"),
"new-layout-v2": os.getenv("DOC_ID_NEW_LAYOUT_V2_THA"),
"name-desc-v2": os.getenv("DOC_ID_NAME_DESC_V2_THA")
"name-desc-v2": os.getenv("DOC_ID_NAME_DESC_V2_THA"),
"price": os.getenv("DOC_ID_PRICE_THA")
}
},
"tha-no-layout": {
"spreadsheet_id": os.getenv("SPREAD_SHEET_ID_THA"),
"ltu": {
"spreadsheet_id": os.getenv("SPREAD_SHEET_ID_LTU"),
"sheets": {
"new-layout-v2": os.getenv("SHEET_NEW_LAYOUT_V2_THA"),
"name-desc-v2": os.getenv("SHEET_NAME_DESC_V2_THA")
"price": os.getenv("SHEET_PRICE_LTU")
},
"grist_doc_id": {
"new-layout-v2": os.getenv("DOC_ID_NEW_LAYOUT_V2_THA"),
"name-desc-v2": os.getenv("DOC_ID_NAME_DESC_V2_THA")
"price": os.getenv("DOC_ID_PRICE_LTU")
}
},
"hkg": {
@ -197,7 +202,11 @@ def redis_message_handler():
DELETE_MENU_CHANNEL,
ADD_CATALOG_CHANNEL,
ADD_MENU_CHANNEL,
SWAP_MENU_CHANNEL)
SWAP_MENU_CHANNEL,
GET_PRICE_CHANNEL,
ADD_PRICE_CHANNEL,
UPDATE_PRICE_CHANNEL,
DELETE_PRICE_CHANNEL)
print(f"[*] Redis Listener started on service: {SERVICE_NAME}")
@ -376,6 +385,42 @@ def redis_message_handler():
print(f"[{SERVICE_NAME}] Swap data error: {e}")
traceback.print_exc()
elif channel == GET_PRICE_CHANNEL:
if not (content):
print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel}")
continue
try:
handle_get_price(country, user_id, content)
print(f"[{SERVICE_NAME}] Get price menu success: {catalog} | User: {user_id}")
except Exception as e:
print(f"[{SERVICE_NAME}] Get price menu error: {e}")
elif channel == ADD_PRICE_CHANNEL:
if not (content):
print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel}")
continue
try:
handle_add_price(country, content)
print(f"[{SERVICE_NAME}] Add price menu success: {catalog} | User: {user_id}")
except Exception as e:
print(f"[{SERVICE_NAME}] Add price menu error: {e}")
elif channel == UPDATE_PRICE_CHANNEL:
try:
handle_update_price(country, content)
print(f"[{SERVICE_NAME}] Update price menu success: {catalog} | User: {user_id}")
except Exception as e:
print(f"[{SERVICE_NAME}] Update price menu error: {e}")
elif channel == DELETE_PRICE_CHANNEL:
try:
handle_delete_price(country, content)
print(f"[{SERVICE_NAME}] Delete price menu success: {catalog} | User: {user_id}")
except Exception as e:
print(f"[{SERVICE_NAME}] Delete price menu error: {e}")
except Exception as e:
print(f"[Redis Error] Error processing message: {e}")
traceback.print_exc()
@ -429,7 +474,7 @@ def send_stream_notification(msg_type: str, content: any, batch_id: str, current
}
try:
# requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5)
requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5)
print(f"[Stream] Sent {msg_type} chunk {current_chunk}/{total_chunks} for batch {batch_id}")
print(payload)
except Exception as e:
@ -1389,6 +1434,238 @@ def find_nv2_block_by_targets(nv2_data, search_targets):
return [nv2_data[i+j]["id"] for j in range(4) if (i+j) < len(nv2_data)]
return []
"""
PRICE-SHEET-MANAGEMENT
"""
def handle_get_price(country: str, user_id: str, content: list):
try:
config = COUNTRY_MAPPING.get(country)
if not config:
raise Exception(f"Country {country} not found")
grist_docs = config.get("grist_doc_id", {})
doc_price = grist_docs.get("price")
if not doc_price:
raise Exception(f"Price doc not found for country {country}")
price_data = fetch_grist_table_data(doc_price, "Price")
headers_grist = {"Authorization": f"Bearer {GRIST_API_KEY}"}
meta_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/columns"
resp_meta = requests.get(meta_url, headers=headers_grist)
header = []
if resp_meta.status_code == 200:
columns = resp_meta.json().get("columns", [])
# id (A, B, C, ...)
sorted_cols = sorted(columns, key=lambda c: col_to_index(c.get("id", "A")))
header = [c.get("fields", {}).get("label", "") for c in sorted_cols]
result = []
for item in content:
product_code = item.get("product_code", "").strip()
if not product_code:
continue
matched_rows = []
for row_obj in price_data:
row_id = row_obj["id"]
row = row_obj["fields"]
# column A (index 0)
if len(row) > 0 and str(row[0]).strip() == product_code:
cells = []
for c_idx, val in enumerate(row):
cells.append({
"value": val,
"coord": {
"row": row_id,
"col": c_idx + 1
}
})
matched_rows.append({
"row_index": row_id,
"cells": cells
})
result.append({
"key": product_code,
"header": header,
"payload": matched_rows
})
if FRONTEND_NOTIFY_URL:
payload = {
"type": "notify",
"payload": {
"from": SERVICE_NAME,
"level": "content",
"to": user_id,
"content": result
}
}
requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5)
print(f"[{SERVICE_NAME}] Price data sent to user: {user_id} | Products: {len(result)}")
print(result)
return result
except Exception as e:
print(f"[{SERVICE_NAME}] Get price error: {e}")
traceback.print_exc()
if FRONTEND_NOTIFY_URL:
payload = {
"type": "notify",
"payload": {
"from": SERVICE_NAME,
"level": "content",
"to": user_id,
"content": {"error": str(e)}
}
}
requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5)
raise
def handle_add_price(country: str, content: list):
config = COUNTRY_MAPPING.get(country)
if not config:
print(f"[{SERVICE_NAME}] Country {country} config not found")
return
grist_docs = config.get("grist_doc_id", {})
doc_price = grist_docs.get("price")
if not doc_price:
print(f"[{SERVICE_NAME}] Price doc not found for country {country}")
return
records = []
for item in content:
cells = item.get("cells", [])
if cells:
records.append(to_grist_record(cells))
if not records:
print(f"[{SERVICE_NAME}] No price records to add")
return
headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"}
url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/records"
try:
resp = requests.post(url, headers=headers, json={"records": records})
if resp.status_code == 200:
print(f"[{SERVICE_NAME}] Added {len(records)} price records successfully")
else:
print(f"[{SERVICE_NAME}] Error adding price records: {resp.text}")
except Exception as e:
print(f"[{SERVICE_NAME}] Request failed for add price: {e}")
def handle_update_price(country: str, content: list):
"""
Update price to Grist
content: [{"row_index": 361, "cells": [{"value": "x", "coord": {...}}, ...]}, ...]
"""
config = COUNTRY_MAPPING.get(country)
if not config:
print(f"[{SERVICE_NAME}] Country {country} config not found")
return
grist_docs = config.get("grist_doc_id", {})
doc_price = grist_docs.get("price")
if not doc_price:
print(f"[{SERVICE_NAME}] Price doc not found for country {country}")
return
records_to_update = []
for item in content:
row_i = item.get("row_index")
cells = item.get("cells", [])
if not row_i or not cells:
continue
fields = {}
for cell in cells:
col_letter = col_to_letter(cell["coord"]["col"])
fields[col_letter] = str(cell["value"])
records_to_update.append({
"id": row_i,
"fields": fields
})
if not records_to_update:
print(f"[{SERVICE_NAME}] No price records to update")
return
headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"}
update_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/records"
# Group by field keys (Grist PATCH requirement)
grouped_updates = {}
for record in records_to_update:
field_keys = tuple(sorted(record["fields"].keys()))
if field_keys not in grouped_updates:
grouped_updates[field_keys] = []
grouped_updates[field_keys].append(record)
try:
for field_keys, batch in grouped_updates.items():
resp = requests.patch(update_url, headers=headers, json={"records": batch})
if resp.status_code == 200:
print(f"[{SERVICE_NAME}] Updated {len(batch)} price records successfully")
else:
print(f"[{SERVICE_NAME}] Price update failed: {resp.text}")
except Exception as e:
print(f"[{SERVICE_NAME}] Request failed for update price: {e}")
def handle_delete_price(country: str, content: list):
"""
content: [{"target_id": 389}, {"target_id": 393}]
"""
config = COUNTRY_MAPPING.get(country)
if not config:
print(f"[{SERVICE_NAME}] Country {country} config not found")
return
grist_docs = config.get("grist_doc_id", {})
doc_price = grist_docs.get("price")
if not doc_price:
print(f"[{SERVICE_NAME}] Price doc not found for country {country}")
return
target_ids = [int(item.get("target_id")) for item in content if item.get("target_id")]
if not target_ids:
print(f"[{SERVICE_NAME}] Payload to delete price is empty")
return
headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"}
del_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/records/delete"
try:
resp = requests.post(del_url, headers=headers, json=target_ids)
print(f"[{SERVICE_NAME}] Deleted Price IDs: {target_ids} | Status: {resp.status_code}")
except Exception as e:
print(f"[{SERVICE_NAME}] Delete price error: {e}")
app = FastAPI()
class CatalogPayloadValues(BaseModel):
@ -1659,6 +1936,8 @@ def sync_sheets_to_grist(country_key):
process_new_layout_sheet(doc_id, header, data_rows)
elif sheet_key == "name-desc-v2":
upload_to_grist_self_hosted(doc_id, "name-desc-v2", header, data_rows)
elif sheet_key == "price":
upload_to_grist_self_hosted(doc_id, "price", header, data_rows)
except Exception as e:
print(f"[{SERVICE_NAME}] Error processing sheet {sheet_name}: {e}")
@ -1919,6 +2198,14 @@ def sync_grist_to_sheets(country_key):
if sheet_key == "name-desc-v2":
rows = fetch_grist_table_data(doc_id, "Name_desc_v2")
if rows:
sheet_rows = [r["fields"] for r in rows]
worksheet.batch_clear(["A2:ZZ"])
worksheet.update(values=sheet_rows, range_name="A2")
print(f"[{SERVICE_NAME}] Updated {len(rows)} rows to sheet '{sheet_name}'.")
elif sheet_key == "price":
rows = fetch_grist_table_data(doc_id, "Price")
if rows:
sheet_rows = [r["fields"] for r in rows]
worksheet.batch_clear(["A2:ZZ"])