[sheet-service] update thread to check permission expire, redis sent public recovery
This commit is contained in:
parent
4ea9aa77e5
commit
a58b306042
1 changed files with 42 additions and 0 deletions
42
main.py
42
main.py
|
|
@ -20,6 +20,7 @@ FRONTEND_NOTIFY_URL = os.getenv("FRONTEND_NOTIFY_URL")
|
|||
LOG_SERVER_URL = os.getenv("LOG_SERVER_URL")
|
||||
|
||||
SERVICE_NAME = os.getenv("SERVICE_NAME")
|
||||
GEN_SERVICE_NAME = os.getenv("GEN_SERVICE_NAME")
|
||||
|
||||
ENTER_CHANNEL = f"{SERVICE_NAME}/enter"
|
||||
HEARTBEAT_CHANNEL = f"{SERVICE_NAME}/heartbeat"
|
||||
|
|
@ -38,6 +39,9 @@ ADD_PRICE_CHANNEL = f"{SERVICE_NAME}/add/price"
|
|||
UPDATE_PRICE_CHANNEL = f"{SERVICE_NAME}/update/price"
|
||||
DELETE_PRICE_CHANNEL = f"{SERVICE_NAME}/delete/price"
|
||||
|
||||
# channel in gen-service to recovery git files
|
||||
RECOVERY_CHANNEL = f"{GEN_SERVICE_NAME}/recovery"
|
||||
|
||||
# Grist set up ...
|
||||
GRIST_URL = os.getenv("GRIST_URL")
|
||||
GRIST_API_KEY = os.getenv("GRIST_API_KEY")
|
||||
|
|
@ -190,6 +194,32 @@ class LockManager:
|
|||
|
||||
lock_manager = LockManager()
|
||||
|
||||
def timeout_sweeper_task():
|
||||
print(f"[*] Timeout Sweeper started on service: {SERVICE_NAME}")
|
||||
while True:
|
||||
time.sleep(5)
|
||||
now = time.time()
|
||||
|
||||
for room_key, data in list(lock_manager.locks.items()):
|
||||
if data is not None:
|
||||
if now - data["timestamp"] >= lock_manager.timeout:
|
||||
user_id = data["user_id"]
|
||||
|
||||
try:
|
||||
country, catalog = room_key.split(":", 1)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
lock_manager.locks[room_key] = None
|
||||
|
||||
# recovery Channel
|
||||
recovery_payload = {
|
||||
"type": "recovery",
|
||||
"payload": payload
|
||||
}
|
||||
r.publish(RECOVERY_CHANNEL, json.dumps(recovery_payload))
|
||||
print(f"[{SERVICE_NAME}] User {user_id} timed out from {catalog}. Sent to recovery.")
|
||||
|
||||
# -- Redis handler --
|
||||
def redis_message_handler():
|
||||
pubsub = r.pubsub()
|
||||
|
|
@ -274,6 +304,15 @@ def redis_message_handler():
|
|||
lock_manager.release(country, catalog, user_id)
|
||||
print(f"[{SERVICE_NAME}] Exit Room (Force/Command): {catalog} | User: {user_id}")
|
||||
|
||||
# public recovery files
|
||||
recovery_payload = {
|
||||
"type": "recovery",
|
||||
"payload": payload,
|
||||
}
|
||||
|
||||
r.publish(RECOVERY_CHANNEL, json.dumps(recovery_payload))
|
||||
print(f"[{SERVICE_NAME}] Sent user {user_id} to recovery channel (Exit).")
|
||||
|
||||
elif channel == GET_CATALOG_CHANNEL:
|
||||
try:
|
||||
if not FRONTEND_NOTIFY_URL:
|
||||
|
|
@ -1690,6 +1729,9 @@ def startup_event():
|
|||
thread = threading.Thread(target=redis_message_handler, daemon=True)
|
||||
thread.start()
|
||||
|
||||
sweeper_thread = threading.Thread(target=timeout_sweeper_task, daemon=True)
|
||||
sweeper_thread.start()
|
||||
|
||||
@app.post("/catalogs")
|
||||
def get_api_catalogs(req: CatalogRequest):
|
||||
payload = req.payload
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue