taobin_log/app/main.py
2026-04-27 09:55:10 +07:00

172 lines
No EOL
5.1 KiB
Python

from fastapi import FastAPI, Query
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
import sqlite3
import asyncio
app = FastAPI(title="Log-Server")
DB_PATH = "/app/data/logs.db"
SERVICE_NAME = "log-service"
def get_db():
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.row_factory = sqlite3.Row
return conn
def init_db():
with get_db() as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS service_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service_name TEXT NOT NULL,
log_level TEXT NOT NULL,
message TEXT NOT NULL,
timestamp DATETIME NOT NULL
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS audit_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
requester_name TEXT NOT NULL,
target_service TEXT NOT NULL,
query_start_time TEXT,
query_end_time TEXT,
request_at DATETIME DEFAULT (datetime('now', '+7 hours'))
)
''')
conn.commit()
async def retention_policy_task():
while True:
try:
limit_date = (datetime.utcnow() - timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S')
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM service_logs WHERE timestamp < ?", (limit_date,))
conn.commit()
print(f"Cleanup: Deleted logs older than {limit_date}")
except Exception as e:
print(f"Cleanup Error: {e}")
await asyncio.sleep(86400)
@app.on_event("startup")
async def startup_event():
init_db()
asyncio.create_task(retention_policy_task())
# class LogEntry(BaseModel):
# service_name: str = Field(..., example="AUTH-SERVICE")
# log_level: str = Field(..., example="INFO")
# message: str = Field(..., example="User logged in successfully")
# timestamp: str = Field(..., example="2024-05-20 14:30:05")
class UserInfo(BaseModel):
displayName: str
email: str
uid: str
class LogValues(BaseModel):
log_level: str
message: str
timestamp: str
class LogPayloadPost(BaseModel):
user_info: UserInfo
srv_name: str
values: LogValues
class LogPayloadGet(BaseModel):
user_info: UserInfo
srv_name: str
start_time: Optional[str] = None
end_time: Optional[str] = None
class LogRequestPost(BaseModel):
type: str
payload: LogPayloadPost
class LogRequestGet(BaseModel):
type: str
payload: LogPayloadGet
@app.post("/add/logs", tags=["Logs Management"])
async def post_log(req: LogRequestPost):
payload = req.payload
values = payload.values
with get_db() as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO service_logs (service_name, log_level, message, timestamp) VALUES (?, ?, ?, ?)",
(payload.srv_name, values.log_level, values.message, values.timestamp)
)
conn.commit()
return {
"status": "success",
"message": "Log recorded successfully",
"entry_details": {
"service": payload.srv_name,
"level": values.log_level,
"logged_at": values.timestamp,
"user": payload.user_info.displayName
}
}
@app.post("/get/logs", tags=["Logs Management"])
async def get_logs(req: LogRequestGet):
payload = req.payload
with get_db() as conn:
cursor = conn.cursor()
# audit log
cursor.execute(
"INSERT INTO audit_logs (requester_name, target_service, query_start_time, query_end_time) VALUES (?, ?, ?, ?)",
(
payload.user_info.displayName,
payload.srv_name,
payload.start_time,
payload.end_time
)
)
query = "SELECT * FROM service_logs WHERE service_name = ?"
params = [payload.srv_name]
if payload.start_time:
query += " AND datetime(timestamp) >= datetime(?)"
params.append(payload.start_time)
if payload.end_time:
query += " AND datetime(timestamp) <= datetime(?)"
params.append(payload.end_time)
query += " ORDER BY timestamp DESC"
cursor.execute(query, params)
logs = [dict(row) for row in cursor.fetchall()]
return {
"metadata": {
"requester": payload.user_info.displayName,
"target_service": payload.srv_name,
"total_found": len(logs)
},
"results": logs
}
@app.get("/audit-trail", tags=["Requester log"])
async def view_audit():
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT id, requester_name, target_service, request_at FROM audit_logs ORDER BY request_at DESC")
audits = [dict(row) for row in cursor.fetchall()]
return {
"system_status": "operational",
"requester_count": len(audits),
"history": audits
}