[sheet-service] New thread timeout, Retry api requests, Redis public commit channel

This commit is contained in:
Ittipat Lusuk 2026-05-25 13:54:53 +07:00
parent a58b306042
commit 0296cb57f0

602
main.py
View file

@ -129,96 +129,112 @@ class LockManager:
# { "tha:page_catalog_group_coffee.skt": {"user_id": "A", "timestamp": 12345} }
self.locks: Dict[str, dict] = {}
self.timeout = 30
self._mutex = threading.Lock()
def get_room_key(self, country: str, catalog: str) -> str:
return f"{country}:{catalog}"
def acquire(self, country: str, catalog: str, user_id: str):
now = time.time()
room_key = self.get_room_key(country, catalog)
# (1 person, 1 room)
for key, data in list(self.locks.items()):
if data and data["user_id"] == user_id and key != room_key:
if now - data["timestamp"] < self.timeout:
return False, "You are already in another room."
else:
self.locks[key] = None
def acquire(self, country: str, catalog: str, user_id: str, payload: dict = None):
with self._mutex:
now = time.time()
room_key = self.get_room_key(country, catalog)
# (1 person, 1 room)
for key, data in list(self.locks.items()):
if data and data["user_id"] == user_id and key != room_key:
if now - data["timestamp"] < self.timeout:
return False, "You are already in another room."
else:
self.locks.pop(key, None)
# Check if the room you're requesting is already occupied by someone else.
current_lock = self.locks.get(room_key)
if current_lock and current_lock["user_id"] != user_id:
if now - current_lock["timestamp"] < self.timeout:
return False, f"Room locked by {current_lock['user_id']}"
# queue
self.locks[room_key] = {"user_id": user_id, "timestamp": now}
return True, user_id
# Check if the room you're requesting is already occupied by someone else.
current_lock = self.locks.get(room_key)
if current_lock and current_lock["user_id"] != user_id:
if now - current_lock["timestamp"] < self.timeout:
return False, f"Room locked by {current_lock['user_id']}"
# queue
self.locks[room_key] = {"user_id": user_id, "timestamp": now, "payload": payload or {}}
return True, user_id
def keep_alive(self, country: str, catalog: str, user_id: str):
now = time.time()
room_key = self.get_room_key(country, catalog)
lock = self.locks.get(room_key)
# Renew
if lock and lock["user_id"] == user_id:
if now - lock["timestamp"] < self.timeout:
self.locks[room_key]["timestamp"] = now
return True
return False
with self._mutex:
now = time.time()
room_key = self.get_room_key(country, catalog)
lock = self.locks.get(room_key)
# Renew
if lock and lock["user_id"] == user_id:
if now - lock["timestamp"] < self.timeout:
self.locks[room_key]["timestamp"] = now
return True
return False
def release(self, country: str, catalog: str, user_id: str):
room_key = self.get_room_key(country, catalog)
lock = self.locks.get(room_key)
# Clear
if lock and lock["user_id"] == user_id:
self.locks[room_key] = None
return True
return False
with self._mutex:
room_key = self.get_room_key(country, catalog)
lock = self.locks.get(room_key)
# Clear
if lock and lock["user_id"] == user_id:
del self.locks[room_key]
return True
return False
def is_authorized(self, country: str, catalog: str, user_id: str):
now = time.time()
lock = self.locks.get(self.get_room_key(country, catalog))
if lock and lock["user_id"] == user_id:
if now - lock["timestamp"] < self.timeout:
return True
return False
with self._mutex:
now = time.time()
lock = self.locks.get(self.get_room_key(country, catalog))
if lock and lock["user_id"] == user_id:
if now - lock["timestamp"] < self.timeout:
return True
return False
def get_lock_info(self, country: str, catalog: str):
now = time.time()
lock = self.locks.get(self.get_room_key(country, catalog))
if lock and (now - lock["timestamp"] < self.timeout):
return {"is_locked": True, "locked_by": lock["user_id"]}
return {"is_locked": False, "locked_by": None}
with self._mutex:
now = time.time()
lock = self.locks.get(self.get_room_key(country, catalog))
if lock and (now - lock["timestamp"] < self.timeout):
return {"is_locked": True, "locked_by": lock["user_id"]}
return {"is_locked": False, "locked_by": None}
lock_manager = LockManager()
def timeout_sweeper_task():
print(f"[*] Timeout Sweeper started on service: {SERVICE_NAME}")
while True:
time.sleep(5)
now = time.time()
for room_key, data in list(lock_manager.locks.items()):
if data is not None:
if now - data["timestamp"] >= lock_manager.timeout:
user_id = data["user_id"]
try:
country, catalog = room_key.split(":", 1)
except ValueError:
continue
lock_manager.locks[room_key] = None
# recovery Channel
recovery_payload = {
"type": "recovery",
"payload": payload
}
r.publish(RECOVERY_CHANNEL, json.dumps(recovery_payload))
print(f"[{SERVICE_NAME}] User {user_id} timed out from {catalog}. Sent to recovery.")
try:
time.sleep(5)
now = time.time()
for room_key, data in list(lock_manager.locks.items()):
if data is not None:
if now - data["timestamp"] >= lock_manager.timeout:
user_id = data["user_id"]
try:
country, catalog = room_key.split(":", 1)
except ValueError:
continue
lock = lock_manager.locks.pop(room_key, None)
if lock:
try:
# recovery Channel
recovery_payload = {
"type": "recovery",
"payload": lock.get("payload", {})
}
r.publish(RECOVERY_CHANNEL, json.dumps(recovery_payload))
print(f"[{SERVICE_NAME}] User {user_id} timed out from {catalog}. Sent to recovery.")
except Exception as e:
print(f"[{SERVICE_NAME}] Recovery publish failed: {e}")
traceback.print_exc()
except Exception as e:
print(f"[Sweeper Error] {e}")
traceback.print_exc()
# -- Redis handler --
def redis_message_handler():
@ -273,7 +289,7 @@ def redis_message_handler():
print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel} | User: {user_id}")
continue
success, msg = lock_manager.acquire(country, catalog, user_id)
success, msg = lock_manager.acquire(country, catalog, user_id, payload)
if success:
threading.Thread(
@ -304,14 +320,17 @@ def redis_message_handler():
lock_manager.release(country, catalog, user_id)
print(f"[{SERVICE_NAME}] Exit Room (Force/Command): {catalog} | User: {user_id}")
# public recovery files
recovery_payload = {
"type": "recovery",
"payload": payload,
}
try:
# public recovery files
recovery_payload = {
"type": "recovery",
"payload": payload,
}
r.publish(RECOVERY_CHANNEL, json.dumps(recovery_payload))
print(f"[{SERVICE_NAME}] Sent user {user_id} to recovery channel (Exit).")
r.publish(RECOVERY_CHANNEL, json.dumps(recovery_payload))
print(f"[{SERVICE_NAME}] Sent user {user_id} to recovery channel (Exit).")
except Exception as e:
print(f"[{SERVICE_NAME}] Failed to sent user {user_id} to recovery channel (Exit).")
elif channel == GET_CATALOG_CHANNEL:
try:
@ -330,9 +349,17 @@ def redis_message_handler():
}
}
requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5)
res = request_with_retry(
"POST",
FRONTEND_NOTIFY_URL,
headers=None,
json=payload,
)
if res.status_code == 200:
print(f"[{SERVICE_NAME}] Get catalog success | User: {user_id}")
print(payload)
print(f"[{SERVICE_NAME}] Get catalog success | User: {user_id}")
except Exception as e:
@ -447,6 +474,10 @@ def redis_message_handler():
print(f"[{SERVICE_NAME}] Add price menu error: {e}")
elif channel == UPDATE_PRICE_CHANNEL:
if not (content):
print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel}")
continue
try:
handle_update_price(country, content)
print(f"[{SERVICE_NAME}] Update price menu success: {catalog} | User: {user_id}")
@ -454,6 +485,10 @@ def redis_message_handler():
print(f"[{SERVICE_NAME}] Update price menu error: {e}")
elif channel == DELETE_PRICE_CHANNEL:
if not (content):
print(f"[{SERVICE_NAME}] Missing required parameters | Channel: {channel}")
continue
try:
handle_delete_price(country, content)
print(f"[{SERVICE_NAME}] Delete price menu success: {catalog} | User: {user_id}")
@ -470,7 +505,11 @@ def find_grist_table_id(doc_id, catalog_suffix):
headers = {"Authorization": f"Bearer {GRIST_API_KEY}"}
try:
resp = requests.get(url, headers=headers)
resp = request_with_retry(
"GET",
url,
headers=headers
)
if resp.status_code == 200:
tables = resp.json().get("tables", [])
for t in tables:
@ -513,8 +552,16 @@ def send_stream_notification(msg_type: str, content: any, batch_id: str, current
}
try:
requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5)
print(f"[Stream] Sent {msg_type} chunk {current_chunk}/{total_chunks} for batch {batch_id}")
res = request_with_retry(
"POST",
FRONTEND_NOTIFY_URL,
headers=None,
json=payload,
)
if res.status_code == 200:
print(f"[Stream] Sent {msg_type} chunk {current_chunk}/{total_chunks} for batch {batch_id}")
print(payload)
except Exception as e:
print(f"[Stream Error] Failed to send {msg_type} for batch {batch_id}: {e}")
@ -651,7 +698,11 @@ def get_catalogs(country: str):
url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables"
headers = {"Authorization": f"Bearer {GRIST_API_KEY}"}
resp = requests.get(url, headers=headers)
resp = request_with_retry(
"GET",
url,
headers=headers
)
catalogs = []
if resp.status_code == 200:
@ -860,7 +911,12 @@ def handle_add_menu(country: str, catalog: str, content: list):
return
url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{table_id}/records"
try:
resp = requests.post(url, headers=headers, json={"records": records})
resp = request_with_retry(
"POST",
url,
headers=headers,
json={"records": records}
)
if resp.status_code != 200:
print(f"Error adding to {table_id}: {resp.text}")
except Exception as e:
@ -910,7 +966,12 @@ def handle_add_catalog(country: str, catalog_name: str, catalog: str):
"columns": columns
}]
}
resp = requests.post(url, headers=headers, json=payload)
resp = request_with_retry(
"POST",
url,
headers=headers,
json=payload
)
if resp.status_code == 200:
print(f"[{SERVICE_NAME}] Created table {table_id} in doc {doc_id}")
else:
@ -1003,7 +1064,12 @@ def update_sheets(country: str, catalog: str, content: list):
update_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{target_table}/records"
for field_keys, batch in grouped_updates.items():
resp = requests.patch(update_url, headers=headers, json={"records": batch})
resp = request_with_retry(
"PATCH",
update_url,
headers=headers,
json={"records": batch}
)
if resp.status_code != 200:
print(f"[{SERVICE_NAME}] Grist update failed for {target_table} | Doc_id : {doc_id} | Status : {resp.text}")
@ -1177,7 +1243,12 @@ def sync_nv2_to_nl(country: str, catalog: str, nv2_updates: list, headers: dict)
grouped_updates[field_keys].append(record)
for field_keys, batch in grouped_updates.items():
resp = requests.patch(update_url, headers=headers, json={"records": batch})
resp = request_with_retry(
"PATCH",
update_url,
headers=headers,
json={"records": batch}
)
if resp.status_code != 200:
print(f"[{SERVICE_NAME}] NL sync update failed: {resp.text}")
else:
@ -1292,13 +1363,23 @@ def handle_delete_menu(country: str, catalog: str, content: list):
if nl_ids_to_delete:
del_nl_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_nl}/tables/{nl_table_id}/records/delete"
resp_nl = requests.post(del_nl_url, headers=headers, json=nl_ids_to_delete)
resp_nl = request_with_retry(
"POST",
del_nl_url,
headers=headers,
json=nl_ids_to_delete
)
print(f"[{SERVICE_NAME}] Deleted NL IDs: {nl_ids_to_delete} | Status: {resp_nl.status_code}")
# === Delete in new-layout-v2 (All country) ===
if nv2_ids_to_delete:
del_nv2_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_nv2}/tables/{nv2_table_id}/records/delete"
resp_v2 = requests.post(del_nv2_url, headers=headers, json=nv2_ids_to_delete)
resp_v2 = request_with_retry(
"POST",
del_nv2_url,
headers=headers,
json=nv2_ids_to_delete
)
print(f"[{SERVICE_NAME}] Deleted NV2 IDs: {nv2_ids_to_delete} | Status: {resp_v2.status_code}")
except Exception as e:
@ -1432,11 +1513,21 @@ def handle_swap_menu(country: str, catalog: str, content: list):
headers = {"Authorization": f"Bearer {GRIST_API_KEY}", "Content-Type": "application/json"}
if has_nl and nl_records_to_update:
resp_v1 = requests.patch(f"{GRIST_URL}/api/docs/{doc_nl}/tables/{nl_table}/records", headers=headers, json={"records": nl_records_to_update})
resp_v1 = request_with_retry(
"PATCH",
f"{GRIST_URL}/api/docs/{doc_nl}/tables/{nl_table}/records",
headers=headers,
json={"records": nl_records_to_update}
)
print(f"[{SERVICE_NAME}] Swap New-layout Result: {resp_v1.status_code}")
if nv2_records_to_update:
resp_v2 = requests.patch(f"{GRIST_URL}/api/docs/{doc_nv2}/tables/{nv2_table}/records", headers=headers, json={"records": nv2_records_to_update})
resp_v2 = request_with_retry(
"PATCH",
f"{GRIST_URL}/api/docs/{doc_nv2}/tables/{nv2_table}/records",
headers=headers,
json={"records": nv2_records_to_update}
)
print(f"[{SERVICE_NAME}] Swap New-layout-v2 Result: {resp_v2.status_code}")
def find_nv2_block_by_id(nv2_data, name_row_id):
@ -1496,7 +1587,12 @@ def handle_get_price(country: str, user_id: str, content: list):
headers_grist = {"Authorization": f"Bearer {GRIST_API_KEY}"}
meta_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/columns"
resp_meta = requests.get(meta_url, headers=headers_grist)
resp_meta = request_with_retry(
"GET",
meta_url,
headers=headers_grist
)
header = []
if resp_meta.status_code == 200:
@ -1550,8 +1646,17 @@ def handle_get_price(country: str, user_id: str, content: list):
"content": result
}
}
requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5)
print(f"[{SERVICE_NAME}] Price data sent to user: {user_id} | Products: {len(result)}")
res = request_with_retry(
"POST",
FRONTEND_NOTIFY_URL,
headers=None,
json=payload,
)
if res.status_code == 200:
print(f"[{SERVICE_NAME}] Price data sent to user: {user_id} | Products: {len(result)}")
print(result)
return result
@ -1570,7 +1675,13 @@ def handle_get_price(country: str, user_id: str, content: list):
"content": {"error": str(e)}
}
}
requests.post(FRONTEND_NOTIFY_URL, json=payload, timeout=5)
res = request_with_retry(
"POST",
FRONTEND_NOTIFY_URL,
headers=None,
json=payload,
)
raise
@ -1603,7 +1714,12 @@ def handle_add_price(country: str, content: list):
url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/records"
try:
resp = requests.post(url, headers=headers, json={"records": records})
resp = request_with_retry(
"POST",
url,
headers=headers,
json={"records": records}
)
if resp.status_code == 200:
print(f"[{SERVICE_NAME}] Added {len(records)} price records successfully")
else:
@ -1665,7 +1781,12 @@ def handle_update_price(country: str, content: list):
try:
for field_keys, batch in grouped_updates.items():
resp = requests.patch(update_url, headers=headers, json={"records": batch})
resp = request_with_retry(
"PATCH",
update_url,
headers=headers,
json={"records": batch}
)
if resp.status_code == 200:
print(f"[{SERVICE_NAME}] Updated {len(batch)} price records successfully")
else:
@ -1700,7 +1821,12 @@ def handle_delete_price(country: str, content: list):
del_url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_price}/tables/Price/records/delete"
try:
resp = requests.post(del_url, headers=headers, json=target_ids)
resp = request_with_retry(
"POST",
del_url,
headers=headers,
json=target_ids
)
print(f"[{SERVICE_NAME}] Deleted Price IDs: {target_ids} | Status: {resp.status_code}")
except Exception as e:
print(f"[{SERVICE_NAME}] Delete price error: {e}")
@ -1732,113 +1858,113 @@ def startup_event():
sweeper_thread = threading.Thread(target=timeout_sweeper_task, daemon=True)
sweeper_thread.start()
@app.post("/catalogs")
def get_api_catalogs(req: CatalogRequest):
payload = req.payload
if payload.srv_name != SERVICE_NAME:
raise HTTPException(status_code=400, detail="Invalid service name")
# @app.post("/catalogs")
# def get_api_catalogs(req: CatalogRequest):
# payload = req.payload
# if payload.srv_name != SERVICE_NAME:
# raise HTTPException(status_code=400, detail="Invalid service name")
country = payload.values.country.strip().lower()
config = COUNTRY_MAPPING.get(country)
# country = payload.values.country.strip().lower()
# config = COUNTRY_MAPPING.get(country)
if not config or not config.get("grist_doc_id"):
raise HTTPException(status_code=404, detail="Country or Grist config not found")
# if not config or not config.get("grist_doc_id"):
# raise HTTPException(status_code=404, detail="Country or Grist config not found")
try:
grist_docs = config["grist_doc_id"]
doc_id = None
# try:
# grist_docs = config["grist_doc_id"]
# doc_id = None
if grist_docs.get("new-layout"):
doc_id = grist_docs["new-layout"]
elif grist_docs.get("new-layout-v2"):
doc_id = grist_docs["new-layout-v2"]
else:
print(f"[{SERVICE_NAME}] grist_doc_id [layout] is empty in this country | {country}")
return None
# if grist_docs.get("new-layout"):
# doc_id = grist_docs["new-layout"]
# elif grist_docs.get("new-layout-v2"):
# doc_id = grist_docs["new-layout-v2"]
# else:
# print(f"[{SERVICE_NAME}] grist_doc_id [layout] is empty in this country | {country}")
# return None
url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables"
headers = {"Authorization": f"Bearer {GRIST_API_KEY}"}
resp = requests.get(url, headers=headers)
# url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables"
# headers = {"Authorization": f"Bearer {GRIST_API_KEY}"}
# resp = requests.get(url, headers=headers, timeout=10)
catalogs = []
if resp.status_code == 200:
tables = resp.json().get("tables", [])
for i, t in enumerate(tables):
t_id = t["id"]
# catalogs = []
# if resp.status_code == 200:
# tables = resp.json().get("tables", [])
# for i, t in enumerate(tables):
# t_id = t["id"]
if t_id.startswith("Grist") or t_id.lower() == "name_desc_v2":
continue
# if t_id.startswith("Grist") or t_id.lower() == "name_desc_v2":
# continue
reconstructed_name = reconstruct_table_name(t_id)
# reconstructed_name = reconstruct_table_name(t_id)
match = re.search(r'file=([^,]+)', reconstructed_name)
if match:
clean_catalog = match.group(1).strip()
else:
clean_catalog = t_id
# match = re.search(r'file=([^,]+)', reconstructed_name)
# if match:
# clean_catalog = match.group(1).strip()
# else:
# clean_catalog = t_id
lock_info = lock_manager.get_lock_info(country, clean_catalog)
catalogs.append({
"catalog": clean_catalog,
"row_index": i,
"status": "locked" if lock_info["is_locked"] else "free",
"locked_by": lock_info["locked_by"]
})
# lock_info = lock_manager.get_lock_info(country, clean_catalog)
# catalogs.append({
# "catalog": clean_catalog,
# "row_index": i,
# "status": "locked" if lock_info["is_locked"] else "free",
# "locked_by": lock_info["locked_by"]
# })
return {"status": "success", "country": country, "catalogs": catalogs}
# return {"status": "success", "country": country, "catalogs": catalogs}
except Exception as e:
traceback.print_exc()
raise HTTPException(status_code=500, detail=str(e))
# except Exception as e:
# traceback.print_exc()
# raise HTTPException(status_code=500, detail=str(e))
@app.get("/catalog/data/{country}/{catalog}/{user_id}")
def get_catalog_data(country: str, catalog: str, user_id: str, background_tasks: BackgroundTasks):
country = country.strip().lower()
if not lock_manager.is_authorized(country, catalog, user_id):
raise HTTPException(status_code=403, detail="Unauthorized access to this room")
# @app.get("/catalog/data/{country}/{catalog}/{user_id}")
# def get_catalog_data(country: str, catalog: str, user_id: str, background_tasks: BackgroundTasks):
# country = country.strip().lower()
# if not lock_manager.is_authorized(country, catalog, user_id):
# raise HTTPException(status_code=403, detail="Unauthorized access to this room")
background_tasks.add_task(process_and_stream_sheet_data, country, catalog, user_id)
# background_tasks.add_task(process_and_stream_sheet_data, country, catalog, user_id)
# return {"status": "success", "country": country, "catalog": catalog, "data": final_result}
return {
"status": "processing",
"country": country,
"catalog": catalog,
"message": "Data stream started. Please listen to frontend notifications."
}
# # return {"status": "success", "country": country, "catalog": catalog, "data": final_result}
# return {
# "status": "processing",
# "country": country,
# "catalog": catalog,
# "message": "Data stream started. Please listen to frontend notifications."
# }
@app.post("/api/room/enter")
def enter_room(req: RoomRequest):
country = req.country.strip().lower()
success, message_or_user = lock_manager.acquire(country, req.catalog, req.user_id)
# @app.post("/api/room/enter")
# def enter_room(req: RoomRequest):
# country = req.country.strip().lower()
# success, message_or_user = lock_manager.acquire(country, req.catalog, req.user_id)
if success:
threading.Thread(
target=process_and_stream_sheet_data,
args=(country, req.catalog, req.user_id),
daemon=True
).start()
# if success:
# threading.Thread(
# target=process_and_stream_sheet_data,
# args=(country, req.catalog, req.user_id),
# daemon=True
# ).start()
return {"status": "success", "message": f"Successfully entry catalog: {req.catalog}", "timeout": lock_manager.timeout}
else:
raise HTTPException(status_code=403, detail=message_or_user)
# return {"status": "success", "message": f"Successfully entry catalog: {req.catalog}", "timeout": lock_manager.timeout}
# else:
# raise HTTPException(status_code=403, detail=message_or_user)
@app.post("/api/room/heartbeat")
def heartbeat(req: RoomRequest):
country = req.country.strip().lower()
if lock_manager.keep_alive(country, req.catalog, req.user_id):
return {"status": "alive"}
else:
raise HTTPException(status_code=401, detail="Timeout permission denied")
# @app.post("/api/room/heartbeat")
# def heartbeat(req: RoomRequest):
# country = req.country.strip().lower()
# if lock_manager.keep_alive(country, req.catalog, req.user_id):
# return {"status": "alive"}
# else:
# raise HTTPException(status_code=401, detail="Timeout permission denied")
@app.post("/api/room/exit")
def exit_room(req: RoomRequest):
country = req.country.strip().lower()
if lock_manager.release(country, req.catalog, req.user_id):
return {"status": "success", "message": "Successfully exited"}
else:
return {"status": "success", "message": "You no longer have permission"}
# @app.post("/api/room/exit")
# def exit_room(req: RoomRequest):
# country = req.country.strip().lower()
# if lock_manager.release(country, req.catalog, req.user_id):
# return {"status": "success", "message": "Successfully exited"}
# else:
# return {"status": "success", "message": "You no longer have permission"}
@app.get("/grist/pull/sheet/{country}")
def grist_pull_sheet_api(country: str):
@ -1873,6 +1999,66 @@ def export_country(country: str):
"""
Helper
"""
# retry request api
def request_with_retry(
method,
url,
*,
headers=None,
json=None,
data=None,
params=None,
timeout=(5, 30),
max_retries=3,
):
delay = 1
for attempt in range(1, max_retries + 1):
try:
response = requests.request(
method=method,
url=url,
headers=headers,
json=json,
data=data,
params=params,
timeout=timeout
)
# retry only server error case
if response.status_code >= 500:
raise requests.HTTPError(
f"HTTP {response.status_code}",
response=response
)
return response
except (
requests.Timeout,
requests.ConnectionError,
requests.HTTPError,
) as e:
print(
f"[Retry {attempt}/{max_retries}] "
f"{method} {url} -> {e}"
)
if attempt >= max_retries:
raise
sleep_time = delay + random.uniform(0, 1)
time.sleep(sleep_time)
delay *= 2
"""
Get data sheet from grist
"""
@ -2026,7 +2212,11 @@ def upload_to_grist_self_hosted(doc_id, table_name, header, rows):
})
try:
check_resp = requests.get(f"{base_url}/{clean_table_id}/records", headers=headers)
check_resp = request_with_retry(
"GET",
f"{base_url}/{clean_table_id}/records",
headers=headers
)
if check_resp.status_code == 200:
existing = check_resp.json().get("records", [])
@ -2034,13 +2224,23 @@ def upload_to_grist_self_hosted(doc_id, table_name, header, rows):
row_ids = [r["id"] for r in existing]
for i in range(0, len(row_ids), chunk_size):
requests.post(f"{base_url}/{clean_table_id}/records/delete",
headers=headers, json=row_ids[i:i+chunk_size])
request_with_retry(
"POST",
f"{base_url}/{clean_table_id}/records/delete",
headers=headers,
json=row_ids[i:i+chunk_size]
)
else:
print(f"[{SERVICE_NAME}] Creating table {clean_table_id} with indexed columns...")
create_resp = requests.post(base_url, headers=headers, json={
create_resp = request_with_retry(
"POST",
base_url,
headers=headers,
json={
"tables": [{"id": clean_table_id, "columns": column_defs}]
})
}
)
if create_resp.status_code == 200:
time.sleep(1)
else:
@ -2061,8 +2261,12 @@ def upload_to_grist_self_hosted(doc_id, table_name, header, rows):
for i in range(0, len(records_to_add), chunk_size):
chunk = records_to_add[i:i+chunk_size]
res = requests.post(f"{base_url}/{clean_table_id}/records",
headers=headers, json={"records": chunk})
res = request_with_retry(
"POST",
f"{base_url}/{clean_table_id}/records",
headers=headers,
json={"records": chunk}
)
if res.status_code != 200:
print(f"Error adding data to {clean_table_id}: {res.text}")
print(f"[{SERVICE_NAME}] Synced {len(records_to_add)} rows to {clean_table_id}")
@ -2169,7 +2373,11 @@ def get_all_grist_tables(doc_id):
""" Pull all table name in Document """
url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables"
headers = {"Authorization": f"Bearer {GRIST_API_KEY}"}
resp = requests.get(url, headers=headers)
resp = request_with_retry(
"GET",
url,
headers=headers
)
if resp.status_code == 200:
tables = resp.json().get("tables", [])
@ -2180,7 +2388,11 @@ def fetch_grist_table_data(doc_id, table_id):
""" pull Record and change to {'A': 'val1', 'B': 'val2'} """
url = f"{GRIST_URL.rstrip('/')}/api/docs/{doc_id}/tables/{table_id}/records"
headers = {"Authorization": f"Bearer {GRIST_API_KEY}"}
resp = requests.get(url, headers=headers)
resp = request_with_retry(
"GET",
url,
headers=headers
)
if resp.status_code != 200:
print(f"Failed to fetch table {table_id}: {resp.text}")