taobin_log/app/main.py

172 lines
5.1 KiB
Python
Raw Normal View History

2026-04-27 09:55:10 +07:00
from fastapi import FastAPI, Query
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
import sqlite3
import asyncio
app = FastAPI(title="Log-Server")
DB_PATH = "/app/data/logs.db"
SERVICE_NAME = "log-service"
def get_db():
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
return conn
def init_db():
with get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS service_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service_name TEXT NOT NULL,
log_level TEXT NOT NULL,
message TEXT NOT NULL,
timestamp DATETIME NOT NULL
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS audit_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
requester_name TEXT NOT NULL,
target_service TEXT NOT NULL,
query_start_time TEXT,
query_end_time TEXT,
request_at DATETIME DEFAULT (datetime('now', '+7 hours'))
)
''')
conn.commit()
async def retention_policy_task():
while True:
try:
limit_date = (datetime.utcnow() - timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S')
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM service_logs WHERE timestamp < ?", (limit_date,))
conn.commit()
print(f"Cleanup: Deleted logs older than {limit_date}")
except Exception as e:
print(f"Cleanup Error: {e}")
await asyncio.sleep(86400)
@app.on_event("startup")
async def startup_event():
init_db()
asyncio.create_task(retention_policy_task())
# class LogEntry(BaseModel):
# service_name: str = Field(..., example="AUTH-SERVICE")
# log_level: str = Field(..., example="INFO")
# message: str = Field(..., example="User logged in successfully")
# timestamp: str = Field(..., example="2024-05-20 14:30:05")
class UserInfo(BaseModel):
displayName: str
email: str
uid: str
class LogValues(BaseModel):
log_level: str
message: str
timestamp: str
class LogPayloadPost(BaseModel):
user_info: UserInfo
srv_name: str
values: LogValues
class LogPayloadGet(BaseModel):
user_info: UserInfo
srv_name: str
start_time: Optional[str] = None
end_time: Optional[str] = None
class LogRequestPost(BaseModel):
type: str
payload: LogPayloadPost
class LogRequestGet(BaseModel):
type: str
payload: LogPayloadGet
@app.post("/add/logs", tags=["Logs Management"])
async def post_log(req: LogRequestPost):
payload = req.payload
values = payload.values
with get_db() as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO service_logs (service_name, log_level, message, timestamp) VALUES (?, ?, ?, ?)",
(payload.srv_name, values.log_level, values.message, values.timestamp)
)
conn.commit()
return {
"status": "success",
"message": "Log recorded successfully",
"entry_details": {
"service": payload.srv_name,
"level": values.log_level,
"logged_at": values.timestamp,
"user": payload.user_info.displayName
}
}
@app.post("/get/logs", tags=["Logs Management"])
async def get_logs(req: LogRequestGet):
payload = req.payload
with get_db() as conn:
cursor = conn.cursor()
# audit log
cursor.execute(
"INSERT INTO audit_logs (requester_name, target_service, query_start_time, query_end_time) VALUES (?, ?, ?, ?)",
(
payload.user_info.displayName,
payload.srv_name,
payload.start_time,
payload.end_time
)
)
query = "SELECT * FROM service_logs WHERE service_name = ?"
params = [payload.srv_name]
if payload.start_time:
query += " AND datetime(timestamp) >= datetime(?)"
params.append(payload.start_time)
if payload.end_time:
query += " AND datetime(timestamp) <= datetime(?)"
params.append(payload.end_time)
query += " ORDER BY timestamp DESC"
cursor.execute(query, params)
logs = [dict(row) for row in cursor.fetchall()]
return {
"metadata": {
"requester": payload.user_info.displayName,
"target_service": payload.srv_name,
"total_found": len(logs)
},
"results": logs
}
@app.get("/audit-trail", tags=["Requester log"])
async def view_audit():
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT id, requester_name, target_service, request_at FROM audit_logs ORDER BY request_at DESC")
audits = [dict(row) for row in cursor.fetchall()]
return {
"system_status": "operational",
"requester_count": len(audits),
"history": audits
}