From 0296cb57f07f83998b7084a0235322966c9e80fa Mon Sep 17 00:00:00 2001 From: Ittipat Lusuk Date: Mon, 25 May 2026 13:54:53 +0700 Subject: [PATCH] [sheet-service] New thread timeout, Retry api requests, Redis public commit channel --- main.py | 602 ++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 407 insertions(+), 195 deletions(-) diff --git a/main.py b/main.py index fdba83f..5e56a7e 100644 --- a/main.py +++ b/main.py @@ -129,96 +129,112 @@ class LockManager: # { "tha:page_catalog_group_coffee.skt": {"user_id": "A", "timestamp": 12345} } self.locks: Dict[str, dict] = {} self.timeout = 30 + self._mutex = threading.Lock() def get_room_key(self, country: str, catalog: str) -> str: return f"{country}:{catalog}" - def acquire(self, country: str, catalog: str, user_id: str): - now = time.time() - room_key = self.get_room_key(country, catalog) - - # (1 person, 1 room) - for key, data in list(self.locks.items()): - if data and data["user_id"] == user_id and key != room_key: - if now - data["timestamp"] < self.timeout: - return False, "You are already in another room." - else: - self.locks[key] = None + def acquire(self, country: str, catalog: str, user_id: str, payload: dict = None): + with self._mutex: + now = time.time() + room_key = self.get_room_key(country, catalog) + + # (1 person, 1 room) + for key, data in list(self.locks.items()): + if data and data["user_id"] == user_id and key != room_key: + if now - data["timestamp"] < self.timeout: + return False, "You are already in another room." + else: + self.locks.pop(key, None) - # Check if the room you're requesting is already occupied by someone else. - current_lock = self.locks.get(room_key) - if current_lock and current_lock["user_id"] != user_id: - if now - current_lock["timestamp"] < self.timeout: - return False, f"Room locked by {current_lock['user_id']}" - - # queue - self.locks[room_key] = {"user_id": user_id, "timestamp": now} - return True, user_id + # Check if the room you're requesting is already occupied by someone else. + current_lock = self.locks.get(room_key) + if current_lock and current_lock["user_id"] != user_id: + if now - current_lock["timestamp"] < self.timeout: + return False, f"Room locked by {current_lock['user_id']}" + + # queue + self.locks[room_key] = {"user_id": user_id, "timestamp": now, "payload": payload or {}} + return True, user_id def keep_alive(self, country: str, catalog: str, user_id: str): - now = time.time() - room_key = self.get_room_key(country, catalog) - lock = self.locks.get(room_key) - - # Renew - if lock and lock["user_id"] == user_id: - if now - lock["timestamp"] < self.timeout: - self.locks[room_key]["timestamp"] = now - return True - return False + with self._mutex: + now = time.time() + room_key = self.get_room_key(country, catalog) + lock = self.locks.get(room_key) + + # Renew + if lock and lock["user_id"] == user_id: + if now - lock["timestamp"] < self.timeout: + self.locks[room_key]["timestamp"] = now + return True + return False def release(self, country: str, catalog: str, user_id: str): - room_key = self.get_room_key(country, catalog) - lock = self.locks.get(room_key) - - # Clear - if lock and lock["user_id"] == user_id: - self.locks[room_key] = None - return True - return False + with self._mutex: + room_key = self.get_room_key(country, catalog) + lock = self.locks.get(room_key) + + # Clear + if lock and lock["user_id"] == user_id: + del self.locks[room_key] + return True + return False def is_authorized(self, country: str, catalog: str, user_id: str): - now = time.time() - lock = self.locks.get(self.get_room_key(country, catalog)) - if lock and lock["user_id"] == user_id: - if now - lock["timestamp"] < self.timeout: - return True - return False + with self._mutex: + now = time.time() + lock = self.locks.get(self.get_room_key(country, catalog)) + if lock and lock["user_id"] == user_id: + if now - lock["timestamp"] < self.timeout: + return True + return False def get_lock_info(self, country: str, catalog: str): - now = time.time() - lock = self.locks.get(self.get_room_key(country, catalog)) - if lock and (now - lock["timestamp"] < self.timeout): - return {"is_locked": True, "locked_by": lock["user_id"]} - return {"is_locked": False, "locked_by": None} + with self._mutex: + now = time.time() + lock = self.locks.get(self.get_room_key(country, catalog)) + if lock and (now - lock["timestamp"] < self.timeout): + return {"is_locked": True, "locked_by": lock["user_id"]} + return {"is_locked": False, "locked_by": None} lock_manager = LockManager() def timeout_sweeper_task(): print(f"[*] Timeout Sweeper started on service: {SERVICE_NAME}") while True: - time.sleep(5) - now = time.time() - - for room_key, data in list(lock_manager.locks.items()): - if data is not None: - if now - data["timestamp"] >= lock_manager.timeout: - user_id = data["user_id"] - - try: - country, catalog = room_key.split(":", 1) - except ValueError: - continue - - lock_manager.locks[room_key] = None - - # recovery Channel - recovery_payload = { - "type": "recovery", - "payload": payload - } - r.publish(RECOVERY_CHANNEL, json.dumps(recovery_payload)) - print(f"[{SERVICE_NAME}] User {user_id} timed out from {catalog}. Sent to recovery.") + try: + time.sleep(5) + now = time.time() + + for room_key, data in list(lock_manager.locks.items()): + if data is not None: + if now - data["timestamp"] >= lock_manager.timeout: + user_id = data["user_id"] + + try: + country, catalog = room_key.split(":", 1) + except ValueError: + continue + + lock = lock_manager.locks.pop(room_key, None) + + if lock: + try: + # recovery Channel + recovery_payload = { + "type": "recovery", + "payload": lock.get("payload", {}) + } + r.publish(RECOVERY_CHANNEL, json.dumps(recovery_payload)) + print(f"[{SERVICE_NAME}] User {user_id} timed out from {catalog}. Sent to recovery.") + except Exception as e: + print(f"[{SERVICE_NAME}] Recovery publish failed: {e}") + traceback.print_exc() + + except Exception as e: + print(f"[Sweeper Error] {e}") + traceback.print_exc() # -- Redis handler -- def redis_message_handler(): @@ -273,7 +289,7 @@ def redis_message_handler(): print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}") continue - success, msg = lock_manager.acquire(country, catalog, user_id) + success, msg = lock_manager.acquire(country, catalog, user_id, payload) if success: threading.Thread( @@ -304,14 +320,17 @@ def redis_message_handler(): lock_manager.release(country, catalog, user_id) print(f"[{SERVICE_NAME}] Exit Room (Force/Command): {catalog} | User: {user_id}") - # public recovery files - recovery_payload = { - "type": "recovery", - "payload": payload, - } + try: + # public recovery files + recovery_payload = { + "type": "recovery", + "payload": payload, + } - r.publish(RECOVERY_CHANNEL, json.dumps(recovery_payload)) - print(f"[{SERVICE_NAME}] Sent user {user_id} to recovery channel (Exit).") + r.publish(RECOVERY_CHANNEL, json.dumps(recovery_payload)) + print(f"[{SERVICE_NAME}] Sent user {user_id} to recovery channel (Exit).") + except Exception as e: + print(f"[{SERVICE_NAME}] Failed to sent user {user_id} to recovery channel (Exit).") elif channel == GET_CATALOG_CHANNEL: try: @@ -330,9 +349,17 @@ def redis_message_handler(): } } - requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) + res = request_with_retry( + "POST", + FRONTEND_NOTIFY_URL, + headers=None, + json=payload, + ) + + if res.status_code == 200: + print(f"[{SERVICE_NAME}] Get catalog success | User: {user_id}") + print(payload) - print(f"[{SERVICE_NAME}] Get catalog success | User: {user_id}") except Exception as e: @@ -447,6 +474,10 @@ def redis_message_handler(): print(f"[{SERVICE_NAME}] Add price menu error: {e}") elif channel == UPDATE_PRICE_CHANNEL: + if not (content): + print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel}") + continue + try: handle_update_price(country, content) print(f"[{SERVICE_NAME}] Update price menu success: {catalog} | User: {user_id}") @@ -454,6 +485,10 @@ def redis_message_handler(): print(f"[{SERVICE_NAME}] Update price menu error: {e}") elif channel == DELETE_PRICE_CHANNEL: + if not (content): + print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel}") + continue + try: handle_delete_price(country, content) print(f"[{SERVICE_NAME}] Delete price menu success: {catalog} | User: {user_id}") @@ -470,7 +505,11 @@ def find_grist_table_id(doc_id, catalog_suffix): headers = {"Authorization": f"Bearer {GRIST_API_KEY}"} try: - resp = requests.get(url, headers=headers) + resp = request_with_retry( + "GET", + url, + headers=headers + ) if resp.status_code == 200: tables = resp.json().get("tables", []) for t in tables: @@ -513,8 +552,16 @@ def send_stream_notification(msg_type: str, content: any, batch_id: str, current } try: - requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) - print(f"[Stream] Sent {msg_type} chunk {current_chunk}/{total_chunks} for batch {batch_id}") + res = request_with_retry( + "POST", + FRONTEND_NOTIFY_URL, + headers=None, + json=payload, + ) + + if res.status_code == 200: + print(f"[Stream] Sent {msg_type} chunk {current_chunk}/{total_chunks} for batch {batch_id}") + print(payload) except Exception as e: print(f"[Stream Error] Failed to send {msg_type} for batch {batch_id}: {e}") @@ -651,7 +698,11 @@ def get_catalogs(country: str): url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables" headers = {"Authorization": f"Bearer {GRIST_API_KEY}"} - resp = requests.get(url, headers=headers) + resp = request_with_retry( + "GET", + url, + headers=headers + ) catalogs = [] if resp.status_code == 200: @@ -860,7 +911,12 @@ def handle_add_menu(country: str, catalog: str, content: list): return url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{table_id}/records" try: - resp = requests.post(url, headers=headers, json={"records": records}) + resp = request_with_retry( + "POST", + url, + headers=headers, + json={"records": records} + ) if resp.status_code != 200: print(f"Error adding to {table_id}: {resp.text}") except Exception as e: @@ -910,7 +966,12 @@ def handle_add_catalog(country: str, catalog_name: str, catalog: str): "columns": columns }] } - resp = requests.post(url, headers=headers, json=payload) + resp = request_with_retry( + "POST", + url, + headers=headers, + json=payload + ) if resp.status_code == 200: print(f"[{SERVICE_NAME}] Created table {table_id} in doc {doc_id}") else: @@ -1003,7 +1064,12 @@ def update_sheets(country: str, catalog: str, content: list): update_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{target_table}/records" for field_keys, batch in grouped_updates.items(): - resp = requests.patch(update_url, headers=headers, json={"records": batch}) + resp = request_with_retry( + "PATCH", + update_url, + headers=headers, + json={"records": batch} + ) if resp.status_code != 200: print(f"[{SERVICE_NAME}] Grist update failed for {target_table} | Doc_id : {doc_id} | Status : {resp.text}") @@ -1177,7 +1243,12 @@ def sync_nv2_to_nl(country: str, catalog: str, nv2_updates: list, headers: dict) grouped_updates[field_keys].append(record) for field_keys, batch in grouped_updates.items(): - resp = requests.patch(update_url, headers=headers, json={"records": batch}) + resp = request_with_retry( + "PATCH", + update_url, + headers=headers, + json={"records": batch} + ) if resp.status_code != 200: print(f"[{SERVICE_NAME}] NL sync update failed: {resp.text}") else: @@ -1292,13 +1363,23 @@ def handle_delete_menu(country: str, catalog: str, content: list): if nl_ids_to_delete: del_nl_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_nl}/tables/{nl_table_id}/records/delete" - resp_nl = requests.post(del_nl_url, headers=headers, json=nl_ids_to_delete) + resp_nl = request_with_retry( + "POST", + del_nl_url, + headers=headers, + json=nl_ids_to_delete + ) print(f"[{SERVICE_NAME}] Deleted NL IDs: {nl_ids_to_delete} | Status: {resp_nl.status_code}") # === Delete in new-layout-v2 (All country) === if nv2_ids_to_delete: del_nv2_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_nv2}/tables/{nv2_table_id}/records/delete" - resp_v2 = requests.post(del_nv2_url, headers=headers, json=nv2_ids_to_delete) + resp_v2 = request_with_retry( + "POST", + del_nv2_url, + headers=headers, + json=nv2_ids_to_delete + ) print(f"[{SERVICE_NAME}] Deleted NV2 IDs: {nv2_ids_to_delete} | Status: {resp_v2.status_code}") except Exception as e: @@ -1432,11 +1513,21 @@ def handle_swap_menu(country: str, catalog: str, content: list): headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"} if has_nl and nl_records_to_update: - resp_v1 = requests.patch(f"{GRIST_URL}/api/docs/{doc_nl}/tables/{nl_table}/records", headers=headers, json={"records": nl_records_to_update}) + resp_v1 = request_with_retry( + "PATCH", + f"{GRIST_URL}/api/docs/{doc_nl}/tables/{nl_table}/records", + headers=headers, + json={"records": nl_records_to_update} + ) print(f"[{SERVICE_NAME}] Swap New-layout Result: {resp_v1.status_code}") if nv2_records_to_update: - resp_v2 = requests.patch(f"{GRIST_URL}/api/docs/{doc_nv2}/tables/{nv2_table}/records", headers=headers, json={"records": nv2_records_to_update}) + resp_v2 = request_with_retry( + "PATCH", + f"{GRIST_URL}/api/docs/{doc_nv2}/tables/{nv2_table}/records", + headers=headers, + json={"records": nv2_records_to_update} + ) print(f"[{SERVICE_NAME}] Swap New-layout-v2 Result: {resp_v2.status_code}") def find_nv2_block_by_id(nv2_data, name_row_id): @@ -1496,7 +1587,12 @@ def handle_get_price(country: str, user_id: str, content: list): headers_grist = {"Authorization": f"Bearer {GRIST_API_KEY}"} meta_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/columns" - resp_meta = requests.get(meta_url, headers=headers_grist) + + resp_meta = request_with_retry( + "GET", + meta_url, + headers=headers_grist + ) header = [] if resp_meta.status_code == 200: @@ -1550,8 +1646,17 @@ def handle_get_price(country: str, user_id: str, content: list): "content": result } } - requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) - print(f"[{SERVICE_NAME}] Price data sent to user: {user_id} | Products: {len(result)}") + + res = request_with_retry( + "POST", + FRONTEND_NOTIFY_URL, + headers=None, + json=payload, + ) + + if res.status_code == 200: + print(f"[{SERVICE_NAME}] Price data sent to user: {user_id} | Products: {len(result)}") + print(result) return result @@ -1570,7 +1675,13 @@ def handle_get_price(country: str, user_id: str, content: list): "content": {"error": str(e)} } } - requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5) + + res = request_with_retry( + "POST", + FRONTEND_NOTIFY_URL, + headers=None, + json=payload, + ) raise @@ -1603,7 +1714,12 @@ def handle_add_price(country: str, content: list): url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/records" try: - resp = requests.post(url, headers=headers, json={"records": records}) + resp = request_with_retry( + "POST", + url, + headers=headers, + json={"records": records} + ) if resp.status_code == 200: print(f"[{SERVICE_NAME}] Added {len(records)} price records successfully") else: @@ -1665,7 +1781,12 @@ def handle_update_price(country: str, content: list): try: for field_keys, batch in grouped_updates.items(): - resp = requests.patch(update_url, headers=headers, json={"records": batch}) + resp = request_with_retry( + "PATCH", + update_url, + headers=headers, + json={"records": batch} + ) if resp.status_code == 200: print(f"[{SERVICE_NAME}] Updated {len(batch)} price records successfully") else: @@ -1700,7 +1821,12 @@ def handle_delete_price(country: str, content: list): del_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/records/delete" try: - resp = requests.post(del_url, headers=headers, json=target_ids) + resp = request_with_retry( + "POST", + del_url, + headers=headers, + json=target_ids + ) print(f"[{SERVICE_NAME}] Deleted Price IDs: {target_ids} | Status: {resp.status_code}") except Exception as e: print(f"[{SERVICE_NAME}] Delete price error: {e}") @@ -1732,113 +1858,113 @@ def startup_event(): sweeper_thread = threading.Thread(target=timeout_sweeper_task, daemon=True) sweeper_thread.start() -@app.post("/catalogs") -def get_api_catalogs(req: CatalogRequest): - payload = req.payload - if payload.srv_name != SERVICE_NAME: - raise HTTPException(status_code=400, detail="Invalid service name") +# @app.post("/catalogs") +# def get_api_catalogs(req: CatalogRequest): +# payload = req.payload +# if payload.srv_name != SERVICE_NAME: +# raise HTTPException(status_code=400, detail="Invalid service name") - country = payload.values.country.strip().lower() - config = COUNTRY_MAPPING.get(country) +# country = payload.values.country.strip().lower() +# config = COUNTRY_MAPPING.get(country) - if not config or not config.get("grist_doc_id"): - raise HTTPException(status_code=404, detail="Country or Grist config not found") +# if not config or not config.get("grist_doc_id"): +# raise HTTPException(status_code=404, detail="Country or Grist config not found") - try: - grist_docs = config["grist_doc_id"] - doc_id = None +# try: +# grist_docs = config["grist_doc_id"] +# doc_id = None - if grist_docs.get("new-layout"): - doc_id = grist_docs["new-layout"] - elif grist_docs.get("new-layout-v2"): - doc_id = grist_docs["new-layout-v2"] - else: - print(f"[{SERVICE_NAME}] grist_doc_id [layout] is empty in this country | {country}") - return None +# if grist_docs.get("new-layout"): +# doc_id = grist_docs["new-layout"] +# elif grist_docs.get("new-layout-v2"): +# doc_id = grist_docs["new-layout-v2"] +# else: +# print(f"[{SERVICE_NAME}] grist_doc_id [layout] is empty in this country | {country}") +# return None - url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables" - headers = {"Authorization": f"Bearer {GRIST_API_KEY}"} - resp = requests.get(url, headers=headers) +# url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables" +# headers = {"Authorization": f"Bearer {GRIST_API_KEY}"} +# resp = requests.get(url, headers=headers, timeout=10) - catalogs = [] - if resp.status_code == 200: - tables = resp.json().get("tables", []) - for i, t in enumerate(tables): - t_id = t["id"] +# catalogs = [] +# if resp.status_code == 200: +# tables = resp.json().get("tables", []) +# for i, t in enumerate(tables): +# t_id = t["id"] - if t_id.startswith("Grist") or t_id.lower() == "name_desc_v2": - continue +# if t_id.startswith("Grist") or t_id.lower() == "name_desc_v2": +# continue - reconstructed_name = reconstruct_table_name(t_id) +# reconstructed_name = reconstruct_table_name(t_id) - match = re.search(r'file=([^,]+)', reconstructed_name) - if match: - clean_catalog = match.group(1).strip() - else: - clean_catalog = t_id +# match = re.search(r'file=([^,]+)', reconstructed_name) +# if match: +# clean_catalog = match.group(1).strip() +# else: +# clean_catalog = t_id - lock_info = lock_manager.get_lock_info(country, clean_catalog) - catalogs.append({ - "catalog": clean_catalog, - "row_index": i, - "status": "locked" if lock_info["is_locked"] else "free", - "locked_by": lock_info["locked_by"] - }) +# lock_info = lock_manager.get_lock_info(country, clean_catalog) +# catalogs.append({ +# "catalog": clean_catalog, +# "row_index": i, +# "status": "locked" if lock_info["is_locked"] else "free", +# "locked_by": lock_info["locked_by"] +# }) - return {"status": "success", "country": country, "catalogs": catalogs} +# return {"status": "success", "country": country, "catalogs": catalogs} - except Exception as e: - traceback.print_exc() - raise HTTPException(status_code=500, detail=str(e)) +# except Exception as e: +# traceback.print_exc() +# raise HTTPException(status_code=500, detail=str(e)) -@app.get("/catalog/data/{country}/{catalog}/{user_id}") -def get_catalog_data(country: str, catalog: str, user_id: str, background_tasks: BackgroundTasks): - country = country.strip().lower() - if not lock_manager.is_authorized(country, catalog, user_id): - raise HTTPException(status_code=403, detail="Unauthorized access to this room") +# @app.get("/catalog/data/{country}/{catalog}/{user_id}") +# def get_catalog_data(country: str, catalog: str, user_id: str, background_tasks: BackgroundTasks): +# country = country.strip().lower() +# if not lock_manager.is_authorized(country, catalog, user_id): +# raise HTTPException(status_code=403, detail="Unauthorized access to this room") - background_tasks.add_task(process_and_stream_sheet_data, country, catalog, user_id) +# background_tasks.add_task(process_and_stream_sheet_data, country, catalog, user_id) - # return {"status": "success", "country": country, "catalog": catalog, "data": final_result} - return { - "status": "processing", - "country": country, - "catalog": catalog, - "message": "Data stream started. Please listen to frontend notifications." - } +# # return {"status": "success", "country": country, "catalog": catalog, "data": final_result} +# return { +# "status": "processing", +# "country": country, +# "catalog": catalog, +# "message": "Data stream started. Please listen to frontend notifications." +# } -@app.post("/api/room/enter") -def enter_room(req: RoomRequest): - country = req.country.strip().lower() - success, message_or_user = lock_manager.acquire(country, req.catalog, req.user_id) +# @app.post("/api/room/enter") +# def enter_room(req: RoomRequest): +# country = req.country.strip().lower() +# success, message_or_user = lock_manager.acquire(country, req.catalog, req.user_id) - if success: - threading.Thread( - target=process_and_stream_sheet_data, - args=(country, req.catalog, req.user_id), - daemon=True - ).start() +# if success: +# threading.Thread( +# target=process_and_stream_sheet_data, +# args=(country, req.catalog, req.user_id), +# daemon=True +# ).start() - return {"status": "success", "message": f"Successfully entry catalog: {req.catalog}", "timeout": lock_manager.timeout} - else: - raise HTTPException(status_code=403, detail=message_or_user) +# return {"status": "success", "message": f"Successfully entry catalog: {req.catalog}", "timeout": lock_manager.timeout} +# else: +# raise HTTPException(status_code=403, detail=message_or_user) -@app.post("/api/room/heartbeat") -def heartbeat(req: RoomRequest): - country = req.country.strip().lower() - if lock_manager.keep_alive(country, req.catalog, req.user_id): - return {"status": "alive"} - else: - raise HTTPException(status_code=401, detail="Timeout permission denied") +# @app.post("/api/room/heartbeat") +# def heartbeat(req: RoomRequest): +# country = req.country.strip().lower() +# if lock_manager.keep_alive(country, req.catalog, req.user_id): +# return {"status": "alive"} +# else: +# raise HTTPException(status_code=401, detail="Timeout permission denied") -@app.post("/api/room/exit") -def exit_room(req: RoomRequest): - country = req.country.strip().lower() - if lock_manager.release(country, req.catalog, req.user_id): - return {"status": "success", "message": "Successfully exited"} - else: - return {"status": "success", "message": "You no longer have permission"} +# @app.post("/api/room/exit") +# def exit_room(req: RoomRequest): +# country = req.country.strip().lower() +# if lock_manager.release(country, req.catalog, req.user_id): +# return {"status": "success", "message": "Successfully exited"} +# else: +# return {"status": "success", "message": "You no longer have permission"} @app.get("/grist/pull/sheet/{country}") def grist_pull_sheet_api(country: str): @@ -1873,6 +1999,66 @@ def export_country(country: str): """ +Helper + +""" +# retry request api +def request_with_retry( + method, + url, + *, + headers=None, + json=None, + data=None, + params=None, + timeout=(5, 30), + max_retries=3, +): + delay = 1 + + for attempt in range(1, max_retries + 1): + try: + response = requests.request( + method=method, + url=url, + headers=headers, + json=json, + data=data, + params=params, + timeout=timeout + ) + + # retry only server error case + if response.status_code >= 500: + raise requests.HTTPError( + f"HTTP {response.status_code}", + response=response + ) + + return response + + except ( + requests.Timeout, + requests.ConnectionError, + requests.HTTPError, + ) as e: + + print( + f"[Retry {attempt}/{max_retries}] " + f"{method} {url} -> {e}" + ) + + if attempt >= max_retries: + raise + + sleep_time = delay + random.uniform(0, 1) + + time.sleep(sleep_time) + + delay *= 2 + +""" + Get data sheet from grist """ @@ -2026,7 +2212,11 @@ def upload_to_grist_self_hosted(doc_id, table_name, header, rows): }) try: - check_resp = requests.get(f"{base_url}/{clean_table_id}/records", headers=headers) + check_resp = request_with_retry( + "GET", + f"{base_url}/{clean_table_id}/records", + headers=headers + ) if check_resp.status_code == 200: existing = check_resp.json().get("records", []) @@ -2034,13 +2224,23 @@ def upload_to_grist_self_hosted(doc_id, table_name, header, rows): row_ids = [r["id"] for r in existing] for i in range(0, len(row_ids), chunk_size): - requests.post(f"{base_url}/{clean_table_id}/records/delete", - headers=headers, json=row_ids[i:i+chunk_size]) + request_with_retry( + "POST", + f"{base_url}/{clean_table_id}/records/delete", + headers=headers, + json=row_ids[i:i+chunk_size] + ) else: print(f"[{SERVICE_NAME}] Creating table {clean_table_id} with indexed columns...") - create_resp = requests.post(base_url, headers=headers, json={ + + create_resp = request_with_retry( + "POST", + base_url, + headers=headers, + json={ "tables": [{"id": clean_table_id, "columns": column_defs}] - }) + } + ) if create_resp.status_code == 200: time.sleep(1) else: @@ -2061,8 +2261,12 @@ def upload_to_grist_self_hosted(doc_id, table_name, header, rows): for i in range(0, len(records_to_add), chunk_size): chunk = records_to_add[i:i+chunk_size] - res = requests.post(f"{base_url}/{clean_table_id}/records", - headers=headers, json={"records": chunk}) + res = request_with_retry( + "POST", + f"{base_url}/{clean_table_id}/records", + headers=headers, + json={"records": chunk} + ) if res.status_code != 200: print(f"Error adding data to {clean_table_id}: {res.text}") print(f"[{SERVICE_NAME}] Synced {len(records_to_add)} rows to {clean_table_id}") @@ -2169,7 +2373,11 @@ def get_all_grist_tables(doc_id): """ Pull all table name in Document """ url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables" headers = {"Authorization": f"Bearer {GRIST_API_KEY}"} - resp = requests.get(url, headers=headers) + resp = request_with_retry( + "GET", + url, + headers=headers + ) if resp.status_code == 200: tables = resp.json().get("tables", []) @@ -2180,7 +2388,11 @@ def fetch_grist_table_data(doc_id, table_id): """ pull Record and change to {'A': 'val1', 'B': 'val2'} """ url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{table_id}/records" headers = {"Authorization": f"Bearer {GRIST_API_KEY}"} - resp = requests.get(url, headers=headers) + resp = request_with_retry( + "GET", + url, + headers=headers + ) if resp.status_code != 200: print(f"Failed to fetch table {table_id}: {resp.text}")