from fastapi import FastAPI, UploadFile, File, HTTPException, Query, Request from fastapi.responses import FileResponse from pathlib import Path from typing import Optional import shutil import uuid import os import re import json import io import stat as stat_mod import time from datetime import datetime import httpx import paramiko from dotenv import load_dotenv load_dotenv() app = FastAPI() BASE_DIR = Path("/usr/src/app/taobin_project") # BASE_DIR = Path("/taobin_project") SERVICE_NAME = os.getenv("SERVICE_NAME") GIT_REPO_SERVER_URL = os.getenv("GIT_REPO_SERVER_URL") FRONTEND_NOTIFY_URL = os.getenv("FRONTEND_NOTIFY_URL") ALLOWED_FOLDERS = {"page_drink", "page_drink_disable", "page_drink_disable_n", "page_drink_disable_n2", "page_drink_n", "page_drink_picture2_n", "page_drink_press", "page_drink_press_n", "page_drink_select"} ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp"} @app.middleware("http") async def strip_prefix_middleware(request: Request, call_next): path = request.scope["path"] if path.startswith(PREFIX): new_path = path[len(PREFIX):] if not new_path: new_path = "/" request.scope["path"] = new_path print(f"[Middleware] {path} -> {new_path}") response = await call_next(request) return response def validate_folder(folder: str): if folder not in ALLOWED_FOLDERS: raise HTTPException(400, f"folder must be {ALLOWED_FOLDERS}") def validate_ext(filename: str): ext = Path(filename).suffix.lower() if ext not in ALLOWED_EXTENSIONS: raise HTTPException(400, f"file not allow {ext}") return ext def get_image_dir(folder: str, country: Optional[str] = None) -> Path: """ no country → /taobin_project/image/{folder}/ has country → /taobin_project/inter/{country}/image/{folder}/ """ if country: path = BASE_DIR / "inter" / country / "image" / folder else: path = BASE_DIR / "image" / folder path.mkdir(parents=True, exist_ok=True) return path # ───────────────────────────────────────── # ADV (advertisement videos) # ───────────────────────────────────────── # Advertisement videos are distributed to coffee machines via the FTP server # (matching the original cofffeemachineConfig `send_directory.sh adv` flow), # NOT through the git repo used for menu images. # # Naming convention enforces the expected video dimensions: # taobin_adv_menu_*.mp4 → 1080x380 (menu banner ad) # taobin_adv_*.mp4 → 1080x608 (fullscreen/idle ad) # Pixel dimensions are validated on the frontend (browser reads video metadata); # the backend enforces the .mp4 extension and the filename convention, then # uploads via SFTP to /var/ftp/pub/coffeemachine/taobin_project/adv/. ALLOWED_ADV_EXTENSIONS = {".mp4"} ADV_FILENAME_RE = re.compile(r"^taobin_adv_(menu_)?[A-Za-z0-9]+\.mp4$", re.IGNORECASE) # SFTP target for adv distribution (configure via env, never hard-code secrets). # These act as the global default / fallback for every country. # ADV_SFTP_HOST = os.getenv("ADV_SFTP_HOST", "192.168.202.224") ADV_SFTP_HOST = os.getenv("ADV_SFTP_HOST") ADV_SFTP_PORT = int(os.getenv("ADV_SFTP_PORT", "22")) ADV_SFTP_USER = os.getenv("ADV_SFTP_USER", "fssservice") ADV_SFTP_PASSWORD = os.getenv("ADV_SFTP_PASSWORD", "") ADV_SFTP_REMOTE_DIR = os.getenv("ADV_SFTP_REMOTE_DIR") # Per-country SFTP overrides. Each country can target a different host. # JSON map keyed by lowercase country code; any omitted field falls back to the # global ADV_SFTP_* default. Example: # ADV_SFTP_COUNTRY_CONFIG='{"ltu":{"host":"10.0.0.5","password":"..."}, # "mys":{"host":"10.0.0.6","user":"u","password":"..."}}' try: ADV_SFTP_COUNTRY_CONFIG = json.loads(os.getenv("ADV_SFTP_COUNTRY_CONFIG", "{}")) if not isinstance(ADV_SFTP_COUNTRY_CONFIG, dict): ADV_SFTP_COUNTRY_CONFIG = {} except Exception as e: print(f"[ADV CONFIG ERROR] Invalid ADV_SFTP_COUNTRY_CONFIG: {e}") ADV_SFTP_COUNTRY_CONFIG = {} def get_adv_sftp_config(country: str) -> dict: """Resolve the SFTP target for a country, falling back to global defaults.""" override = ADV_SFTP_COUNTRY_CONFIG.get((country or "").lower(), {}) or {} return { "host": override.get("host", ADV_SFTP_HOST), "port": int(override.get("port", ADV_SFTP_PORT)), "user": override.get("user", ADV_SFTP_USER), "password": override.get("password", ADV_SFTP_PASSWORD), "remote_dir": override.get("remote_dir", ADV_SFTP_REMOTE_DIR), } def validate_adv_ext(filename: str): ext = Path(filename).suffix.lower() if ext not in ALLOWED_ADV_EXTENSIONS: raise HTTPException(400, f"adv file must be .mp4, got {ext}") return ext def validate_adv_filename(filename: str): name = Path(filename).name if not ADV_FILENAME_RE.match(name): raise HTTPException( 400, f"adv filename must match taobin_adv_*.mp4 or taobin_adv_menu_*.mp4: {name}" ) def _enable_legacy_ssh(transport): """Old FTP/SSH boxes only offer SHA1 key exchange / host keys / CBC ciphers, which modern paramiko disables by default. Re-enable the ones this paramiko build actually implements (filtered, so we never offer an algo paramiko can't instantiate — that raised KeyError on paramiko 5.x).""" def prepend(current, extra, valid=None): cur = list(current) add = [a for a in extra if (valid is None or a in valid) and a not in cur] return tuple(add + cur) try: transport._preferred_kex = prepend(transport._preferred_kex, ( "diffie-hellman-group-exchange-sha1", "diffie-hellman-group14-sha1", "diffie-hellman-group1-sha1", ), transport._kex_info) transport._preferred_ciphers = prepend(transport._preferred_ciphers, ( "aes128-cbc", "aes256-cbc", "3des-cbc", ), transport._cipher_info) transport._preferred_keys = prepend(transport._preferred_keys, ( "ssh-rsa", "ssh-dss", )) except Exception as e: print(f"[ADV SFTP] legacy algo enable warning: {e}") def open_adv_sftp(cfg: dict): """Open an SFTP session to a country's adv FTP server. Caller must close both.""" if not cfg.get("password"): raise HTTPException(500, f"ADV SFTP password not configured for host {cfg.get('host')}") transport = paramiko.Transport((cfg["host"], cfg["port"])) _enable_legacy_ssh(transport) transport.connect(username=cfg["user"], password=cfg["password"]) sftp = paramiko.SFTPClient.from_transport(transport) return transport, sftp def sftp_ensure_dir(sftp, remote_dir: str): """Create remote directory tree if it does not already exist.""" path = "" for part in remote_dir.strip("/").split("/"): path += "/" + part try: sftp.stat(path) except IOError: sftp.mkdir(path) def rollback_adv_remote(sftp, uploaded: list[dict]): """Best-effort delete of files already put in this request.""" if not sftp: return for item in uploaded: try: sftp.remove(item["remote_path"]) print(f"[ADV ROLLBACK] Deleted remote: {item['remote_path']}") except Exception as e: print(f"[ADV ROLLBACK ERROR] {item['remote_path']}: {e}") # Manifest filename machines read to decide what to download (FileSyncServer). # The original flow hard-codes sync_1.file (`ls -l > sync_1.file`). ADV_SYNC_MANIFEST = os.getenv("ADV_SYNC_MANIFEST", "sync_1.file") # Non-video files that legitimately belong in the adv manifest (the original # `ls -l` lists these too). Comma-separated, configurable. ADV_MANIFEST_EXTRA_FILES = { name.strip() for name in os.getenv("ADV_MANIFEST_EXTRA_FILES", "advertise_add_on.ev").split(",") if name.strip() } def _allowed_in_manifest(name: str) -> bool: """Guard 2 (whitelist): only adv videos, the manifest itself, and known control files may enter the manifest. Stray/temp/partial/odd files on the FTP are excluded so a weird name/size can never be handed to the machines.""" if name == ADV_SYNC_MANIFEST: return True if ADV_FILENAME_RE.match(name): return True if name in ADV_MANIFEST_EXTRA_FILES: return True return False def regenerate_adv_manifest(sftp, remote_dir: str) -> str: """ Rebuild the `ls -l`-style manifest on the FTP server so machines actually pull the new files. The machine's FileSyncServer iterates over the manifest lines (not the live directory) and downloads any entry whose size differs from its local copy. This mirrors the original `ls -l > sync_1.file` that the CLI flow produced on a reference machine. Unlike a raw `ls -l`, this only lists whitelisted files (_allowed_in_manifest) so unexpected files on the FTP can't pollute the manifest. Line format (8 space-separated fields, what the parser expects): -rw-rw---- 1 root sdcard_rw """ entries = sftp.listdir_attr(remote_dir) rows = {} # name -> (size, mtime) skipped = [] for attr in entries: name = attr.filename # ls -l style: skip sub-directories (adv folder is flat). if attr.st_mode is not None and stat_mod.S_ISDIR(attr.st_mode): continue # Guard 2: drop anything that isn't an expected adv file. if not _allowed_in_manifest(name): skipped.append(name) continue # Mimic `ls -l > sync_1.file`: the shell truncates the manifest before # listing, so it appears with size 0 → machines skip it (the parser only # acts when size > 0), avoiding re-downloading the manifest itself. size = 0 if name == ADV_SYNC_MANIFEST else int(attr.st_size or 0) rows[name] = (size, attr.st_mtime or time.time()) # Always self-list the manifest as size 0, even on the first upload when it # isn't on the FTP yet (matches the original `ls -l` which always lists it). rows.setdefault(ADV_SYNC_MANIFEST, (0, time.time())) lines = ["total 0"] for name in sorted(rows): size, mtime = rows[name] dt = datetime.fromtimestamp(mtime) lines.append(f"-rw-rw---- 1 root sdcard_rw {size} {dt:%Y-%m-%d %H:%M} {name}") content = "\n".join(lines) + "\n" remote_path = f"{remote_dir}/{ADV_SYNC_MANIFEST}" sftp.putfo(io.BytesIO(content.encode("utf-8")), remote_path) print(f"[ADV MANIFEST] Rebuilt {remote_path} ({len(lines) - 1} entries)" + (f", skipped {skipped}" if skipped else "")) return remote_path def ensure_manifest_extras(sftp, remote_dir: str, content: str) -> str: """Make sure the configured control files (ADV_MANIFEST_EXTRA_FILES, e.g. advertise_add_on.ev) are listed in the manifest with their REAL FTP size — even if they weren't uploaded this round. Only a file that actually exists on the FTP is added (otherwise the machine would wget a 404). Re-sorts by name.""" total_line = "total 0" entries = {} # name -> full ls -l line for ln in content.splitlines(): parts = ln.split() if not parts: continue if parts[0] == "total": total_line = ln elif len(parts) == 8: entries[parts[7]] = ln for name in ADV_MANIFEST_EXTRA_FILES: if name in entries: continue try: st = sftp.stat(f"{remote_dir}/{name}") except IOError: continue # not on the FTP → don't list it dt = datetime.fromtimestamp(st.st_mtime or time.time()) entries[name] = ( f"-rw-rw---- 1 root sdcard_rw {int(st.st_size or 0)} " f"{dt:%Y-%m-%d %H:%M} {name}" ) return "\n".join([total_line] + [entries[n] for n in sorted(entries)]) + "\n" async def commit_files_to_git( files: list[UploadFile], folder: str, display_name: str, email: str, country: str, git_server_url: str = GIT_REPO_SERVER_URL ) -> dict: """ Commit file(s) to Git repository server via multipart form POST. Args: files: List of UploadFile objects (file pointers will be reset) folder: Folder name used in commit message display_name: Git signature username email: Git signature email git_server_url: Target Git repo server endpoint Returns: Response JSON from Git server """ commit_data = { "signature_username": display_name, "signature_email": email, "message": f"commit {folder} {country}", "ref": SERVICE_NAME } multipart_files = {} if len(files) == 1: file_obj = files[0] file_obj.file.seek(0) commit_data["path"] = Path(file_obj.filename).name multipart_files["file"] = ( Path(file_obj.filename).name, file_obj.file, "application/octet-stream" ) else: for idx, file_obj in enumerate(files, start=1): file_obj.file.seek(0) commit_data[f"path{idx}"] = Path(file_obj.filename).name multipart_files[f"file{idx}"] = ( Path(file_obj.filename).name, file_obj.file, "application/octet-stream" ) # return { # "git_url": git_server_url, # "data": commit_data, # "file": multipart_files # } async with httpx.AsyncClient() as client: response = await client.post( git_server_url + "/commit", data=commit_data, files=multipart_files, timeout=30.0 ) response.raise_for_status() return response.json() def backup_existing_file(dest: Path) -> Optional[Path]: if dest.exists(): backup_path = dest.with_suffix(dest.suffix + f".bak.{uuid.uuid4().hex[:8]}") shutil.copy2(dest, backup_path) print(f"[BACKUP] {dest.name} → {backup_path.name}") return backup_path return None def rollback_files(saved_files: list[dict], backups: dict[str, Optional[Path]], folder: str, country: Optional[str] = None): for item in saved_files: filename = item["filename"] dest = get_image_dir(folder, country) / filename backup = backups.get(filename) try: if backup and backup.exists(): if dest.exists(): dest.unlink() shutil.move(str(backup), str(dest)) print(f"[ROLLBACK] Restored: {filename}") else: if dest.exists(): dest.unlink() print(f"[ROLLBACK] Deleted new file: {filename}") except Exception as e: print(f"[ROLLBACK ERROR] {filename}: {e}") def cleanup_backups(backups: dict[str, Optional[Path]]): for filename, backup in backups.items(): if backup and backup.exists(): try: backup.unlink() except Exception as e: print(f"[CLEANUP ERROR] {backup}: {e}") async def notify_frontend(uid: str, msg: str): if not FRONTEND_NOTIFY_URL: print(f"[NOTIFY SKIP] FRONTEND_NOTIFY_URL not set") return payload = { "type": "notify", "payload": { "from": SERVICE_NAME, "level": "error", "to": uid, "msg": msg } } try: async with httpx.AsyncClient() as client: await client.post(FRONTEND_NOTIFY_URL, json=payload, timeout=10.0) print(f"[NOTIFY SEND SUCCESS] {payload}") except Exception as e: print(f"[NOTIFY ERROR] Failed to notify frontend: {e}") # ───────────────────────────────────────── # UPLOAD # ───────────────────────────────────────── @app.post("/image/{folder}/upload/{uid}/{displayname}/{email}") async def upload_images( folder: str, uid: str, displayname: str, email: str, files: list[UploadFile] = File(...) ): required_user_fields = { "uid": uid, "displayname": displayname, "email": email, "country": "tha" } if not (required_user_fields.get("country")): raise HTTPException(status_code=400, detail="Invalid country") for field, value in required_user_fields.items(): if not str(value).strip(): raise HTTPException( status_code=400, detail=f"Missing or empty user_info.{field}" ) validate_folder(folder) saved = [] backups: dict[str, Optional[Path]] = {} try: for file in files: ext = validate_ext(file.filename) filename = Path(file.filename).name dest = get_image_dir(folder) / filename backups[filename] = backup_existing_file(dest) with open(dest, "wb") as f: shutil.copyfileobj(file.file, f) saved.append({ "filename": filename, "url": f"/image/{folder}/{filename}" }) except Exception as e: rollback_files(saved, backups, folder) error_msg = f"Save image failed: {str(e)}" await notify_frontend(uid=uid, msg=error_msg) raise HTTPException(status_code=500, detail=error_msg) try: git_response = await commit_files_to_git( files=files, folder=folder, display_name=displayname, email=email, country=required_user_fields.get("country") ) except Exception as e: rollback_files(saved, backups, folder) error_msg = f"Git commit failed: {str(e)}" await notify_frontend(uid=uid, msg=error_msg) raise HTTPException(status_code=502, detail=f"{error_msg}, files rolled back") cleanup_backups(backups) return { "uploaded": saved, "git_commit": git_response } @app.post("/adv/upload/{country}/{uid}/{displayname}/{email}") async def upload_adv( country: str, uid: str, displayname: str, email: str, files: list[UploadFile] = File(...), regenerate: bool = Query( True, description="Rebuild sync_1.file from the FTP listing after upload " "(method 1). Set false when the manifest is generated on a " "machine and uploaded separately (method 2)." ) ): required_user_fields = { "country": country, "uid": uid, "displayname": displayname, "email": email } for field, value in required_user_fields.items(): if not str(value).strip(): raise HTTPException( status_code=400, detail=f"Missing or empty {field}" ) # Validate every file up front so a bad name never opens an SFTP session. for file in files: validate_adv_ext(file.filename) validate_adv_filename(file.filename) cfg = get_adv_sftp_config(country) remote_dir = cfg["remote_dir"] uploaded = [] manifest_path = None transport = None sftp = None try: transport, sftp = open_adv_sftp(cfg) sftp_ensure_dir(sftp, remote_dir) for file in files: filename = Path(file.filename).name remote_path = f"{remote_dir}/{filename}" # Local size for the post-upload integrity check (Guard 1). file.file.seek(0, os.SEEK_END) local_size = file.file.tell() file.file.seek(0) sftp.putfo(file.file, remote_path) # Guard 1: a half-written file must never reach the manifest. Verify # the remote size equals the source; mismatch → rollback + fail. remote_size = sftp.stat(remote_path).st_size if remote_size != local_size: raise HTTPException( status_code=502, detail=f"Upload size mismatch for {filename}: " f"local {local_size}, remote {remote_size}" ) uploaded.append({ "filename": filename, "remote_path": remote_path, "size": local_size }) # Method 1: rebuild the manifest from the (now verified) FTP contents. # Skipped when regenerate=false (method 2 uploads a machine-made manifest). if regenerate: manifest_path = regenerate_adv_manifest(sftp, remote_dir) except HTTPException: rollback_adv_remote(sftp, uploaded) raise except Exception as e: rollback_adv_remote(sftp, uploaded) error_msg = f"SFTP upload failed ({country} → {cfg['host']}): {str(e)}" await notify_frontend(uid=uid, msg=error_msg) raise HTTPException(status_code=502, detail=error_msg) finally: if sftp: sftp.close() if transport: transport.close() return { "uploaded": uploaded, "country": country, "sftp_host": cfg["host"], "remote_dir": remote_dir, "manifest": manifest_path } @app.post("/adv/manifest/{country}/{uid}/{displayname}/{email}") async def upload_adv_manifest( country: str, uid: str, displayname: str, email: str, file: UploadFile = File(...) ): """Upload a manifest (built in the browser for method 1, or on a machine via `ls -l` for method 2) to the FTP adv folder. Before writing, control files in ADV_MANIFEST_EXTRA_FILES (e.g. advertise_add_on.ev) are ensured present with their real FTP size, even if they weren't part of this upload.""" for field, value in {"country": country, "uid": uid, "displayname": displayname, "email": email}.items(): if not str(value).strip(): raise HTTPException(status_code=400, detail=f"Missing or empty {field}") cfg = get_adv_sftp_config(country) remote_dir = cfg["remote_dir"] remote_path = f"{remote_dir}/{ADV_SYNC_MANIFEST}" content = (await file.read()).decode("utf-8", errors="replace") transport = None sftp = None try: transport, sftp = open_adv_sftp(cfg) sftp_ensure_dir(sftp, remote_dir) final = ensure_manifest_extras(sftp, remote_dir, content) sftp.putfo(io.BytesIO(final.encode("utf-8")), remote_path) except Exception as e: error_msg = f"Manifest upload failed ({country} → {cfg['host']}): {str(e)}" await notify_frontend(uid=uid, msg=error_msg) raise HTTPException(status_code=502, detail=error_msg) finally: if sftp: sftp.close() if transport: transport.close() return {"country": country, "sftp_host": cfg["host"], "manifest": remote_path} @app.post("/inter/{country}/image/{folder}/upload/{uid}/{displayname}/{email}") async def upload_inter_images( country: str, folder: str, uid: str, displayname: str, email: str, files: list[UploadFile] = File(...) ): required_user_fields = { "uid": uid, "displayname": displayname, "email": email, "country": country } if not (country): raise HTTPException(status_code=400, detail="Invalid country") for field, value in required_user_fields.items(): if not str(value).strip(): raise HTTPException( status_code=400, detail=f"Missing or empty user_info.{field}" ) validate_folder(folder) saved = [] backups: dict[str, Optional[Path]] = {} try: for file in files: ext = validate_ext(file.filename) filename = Path(file.filename).name dest = get_image_dir(folder, country) / filename backups[filename] = backup_existing_file(dest) with open(dest, "wb") as f: shutil.copyfileobj(file.file, f) saved.append({ "filename": filename, "url": f"/inter/{country}/image/{folder}/{filename}" }) except Exception as e: rollback_files(saved, backups, folder, country) error_msg = f"Save image failed: {str(e)}" await notify_frontend(uid=uid, msg=error_msg) raise HTTPException(status_code=500, detail=error_msg) try: git_response = await commit_files_to_git( files=files, folder=folder, display_name=displayname, email=email, country=country ) except Exception as e: rollback_files(saved, backups, folder, country) error_msg = f"Git commit failed: {str(e)}" await notify_frontend(uid=uid, msg=error_msg) raise HTTPException(status_code=502, detail=f"{error_msg}, files rolled back") cleanup_backups(backups) return { "uploaded": saved, "git_commit": git_response }