From a58b306042b95a60a4b5f43b67c14863b3d4fdfe Mon Sep 17 00:00:00 2001 From: Ittipat Lusuk Date: Wed, 20 May 2026 15:17:13 +0700 Subject: [PATCH] [sheet-service] update thread to check permission expire, redis sent public recovery --- main.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/main.py b/main.py index adbaac5..fdba83f 100644 --- a/main.py +++ b/main.py @@ -20,6 +20,7 @@ FRONTEND_NOTIFY_URL = os.getenv("FRONTEND_NOTIFY_URL") LOG_SERVER_URL = os.getenv("LOG_SERVER_URL") SERVICE_NAME = os.getenv("SERVICE_NAME") +GEN_SERVICE_NAME = os.getenv("GEN_SERVICE_NAME") ENTER_CHANNEL = f"{SERVICE_NAME}/enter" HEARTBEAT_CHANNEL = f"{SERVICE_NAME}/heartbeat" @@ -38,6 +39,9 @@ ADD_PRICE_CHANNEL = f"{SERVICE_NAME}/add/price" UPDATE_PRICE_CHANNEL = f"{SERVICE_NAME}/update/price" DELETE_PRICE_CHANNEL = f"{SERVICE_NAME}/delete/price" +# channel in gen-service to recovery git files +RECOVERY_CHANNEL = f"{GEN_SERVICE_NAME}/recovery" + # Grist set up ... GRIST_URL = os.getenv("GRIST_URL") GRIST_API_KEY = os.getenv("GRIST_API_KEY") @@ -190,6 +194,32 @@ class LockManager: lock_manager = LockManager() +def timeout_sweeper_task(): + print(f"[*] Timeout Sweeper started on service: {SERVICE_NAME}") + while True: + time.sleep(5) + now = time.time() + + for room_key, data in list(lock_manager.locks.items()): + if data is not None: + if now - data["timestamp"] >= lock_manager.timeout: + user_id = data["user_id"] + + try: + country, catalog = room_key.split(":", 1) + except ValueError: + continue + + lock_manager.locks[room_key] = None + + # recovery Channel + recovery_payload = { + "type": "recovery", + "payload": payload + } + r.publish(RECOVERY_CHANNEL, json.dumps(recovery_payload)) + print(f"[{SERVICE_NAME}] User {user_id} timed out from {catalog}. Sent to recovery.") + # -- Redis handler -- def redis_message_handler(): pubsub = r.pubsub() @@ -274,6 +304,15 @@ def redis_message_handler(): lock_manager.release(country, catalog, user_id) print(f"[{SERVICE_NAME}] Exit Room (Force/Command): {catalog} | User: {user_id}") + # public recovery files + recovery_payload = { + "type": "recovery", + "payload": payload, + } + + r.publish(RECOVERY_CHANNEL, json.dumps(recovery_payload)) + print(f"[{SERVICE_NAME}] Sent user {user_id} to recovery channel (Exit).") + elif channel == GET_CATALOG_CHANNEL: try: if not FRONTEND_NOTIFY_URL: @@ -1690,6 +1729,9 @@ def startup_event(): thread = threading.Thread(target=redis_message_handler, daemon=True) thread.start() + sweeper_thread = threading.Thread(target=timeout_sweeper_task, daemon=True) + sweeper_thread.start() + @app.post("/catalogs") def get_api_catalogs(req: CatalogRequest): payload = req.payload