Taobin-Recipe-Manager/updater/updater.py
pakintada@gmail.com ce3e1ad505 feat(upgrade): Add upgrader client
Run tasks to download and sync the version with repo releases
2024-09-10 09:05:11 +07:00

356 lines
No EOL
13 KiB
Python

# IGNORE
# RUN_keep
# BUILD_AND_RUN
# VERSION_1
# PORTS_36528
from contextlib import asynccontextmanager
import datetime
import sys
from fastapi import FastAPI
import schedule
import uvicorn.logging
import logging
import docker
import requests
import base64
import os
import time
import zipfile
import redis
from apscheduler.schedulers.background import BackgroundScheduler
import json
# Base URL of the release host. The release-list and asset URLs built by the
# helpers below are rooted here, and check_version_job also derives the image
# registry name from it by dropping the scheme.
source = "https://pakin-inspiron-15-3530.tail360bd.ts.net"
def get_headers() -> dict:
    """Return a fresh header dict for JSON API requests.

    The dict carries an ``accept`` header for JSON and a Basic
    ``authorization`` header.

    NOTE(review): the Basic credential is hard-coded in source; consider
    loading it from configuration instead.
    """
    headers = dict(
        accept="application/json",
        authorization="Basic cGFraW46YWRtaW4xMjM=",
    )
    return headers
def get_headers_for_blob() -> dict:
    """Return a fresh header dict for binary (octet-stream) requests.

    The dict carries the same Basic ``authorization`` header used for the
    JSON API plus a ``Content-Type`` of ``application/octet-stream``.
    """
    blob_headers = {}
    blob_headers["authorization"] = "Basic cGFraW46YWRtaW4xMjM="
    blob_headers["Content-Type"] = "application/octet-stream"
    return blob_headers
def get_all_releases_url() -> str:
    """Return the API URL that lists the repository's releases.

    The URL is the module-level ``source`` followed by the fixed releases
    path of the ``pakin/taobin_recipe_manager`` repository.
    """
    template = "{}/api/v1/repos/pakin/taobin_recipe_manager/releases"
    return template.format(source)
def get_download_url(tag_id: int, asset_id: int) -> str:
    """Return the API URL describing one asset of one release.

    Args:
        tag_id: Identifier placed in the ``releases/<id>`` path segment.
        asset_id: Identifier placed in the ``assets/<id>`` path segment.
    """
    template = (
        "{base}/api/v1/repos/pakin/taobin_recipe_manager"
        "/releases/{release}/assets/{asset}"
    )
    return template.format(base=source, release=tag_id, asset=asset_id)
# Flags
# Module-level flag: check_version_job sets it to True after processing a new
# patch, running_jobs resets it before each scheduled check, and the "/" route
# reads it to choose its response.
update_available_now = False
# Logs
def save_to_log_file(msg: str):
    """Append one timestamped line to ``./patches/.updater.log``.

    The line has the form ``<timestamp> : <msg>``, where the timestamp is the
    current time expressed at a fixed UTC+07:00 offset. The ``./patches``
    directory is created if it is missing, so logging works even before the
    first patch has been downloaded.

    Args:
        msg: Text to record after the timestamp.
    """
    log_path = "./patches/.updater.log"
    # Opening in append mode does not create parent directories.
    os.makedirs(os.path.dirname(log_path), exist_ok=True)
    now = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=7)))
    with open(log_path, "a", encoding="utf-8") as f:
        f.write(f"{now} : {msg}\n")
# Redis
# Client for the Redis server at localhost:6379, database 0.
redis_client = redis.StrictRedis(host="localhost", port=6379, db=0)
# Channel name used by every publish and subscribe call in this module.
MAIN_CHANNEL = "updater.noti"
# Subscription object polled by notification_job; the subscribe call runs at
# import time.
pubsub = redis_client.pubsub()
pubsub.subscribe(MAIN_CHANNEL)
# Settings
# Shared configuration dict; lifespan() fills in the "path" and "secret"
# entries at startup.
settings = {}
# ------------------------------------------------------------------------------------------------
def test_path_from_settings():
    """Log whether each dependency path listed in ``settings["path"]`` exists.

    For every entry the primary ``"path"`` is checked first. When it is
    missing, the optional ``"fallback_paths"`` list is tried in order; each
    fallback is written as ``"<kind>:<path>"``. The first fallback that exists
    is logged and ends the search. When no fallback exists (or none is
    configured) the entry is logged as skipped.

    Nothing is returned and ``settings`` is not modified.
    """
    for name, entry in settings["path"].items():
        logger.debug(f"Checking {name}")
        primary = entry["path"]
        if os.path.exists(primary):
            logger.info(f"Deps: {name} = ok")
            continue

        logger.warning(f"Deps.fallback: {primary}")
        for fallback_path in entry.get("fallback_paths", ()):
            # Split on the first ":" only, so the path part may itself contain
            # colons (for example a Windows drive letter).
            path_kind, separator, path_real = fallback_path.partition(":")
            if not separator:
                logger.warning(f"Deps.fallback: malformed entry {fallback_path!r}")
                continue
            if os.path.exists(path_real):
                logger.info(f"Deps.fallback[{path_kind}]: {path_real} = ok")
                break
        else:
            # Reached only when the loop finished without finding a usable path.
            logger.warning(f"Deps.fallback: skipped {name}")
def find_latest_version(rels: list[dict]) -> str:
    """Return the highest ``major.minor.patch`` version among release tags.

    Only tags of the form ``v<major>.<minor>.<patch>`` with integer fields are
    considered. Tags without the leading ``v``, with a different number of
    fields, or with a non-numeric field (for example ``v1.0.0-rc1``) are
    skipped instead of aborting the whole scan.

    Args:
        rels: Release dicts, each providing a ``"tag_name"`` string.

    Returns:
        The winning version without the ``v`` prefix, or ``"0.0.0"`` when no
        tag qualifies.
    """
    best = (0, 0, 0)
    for rel in rels:
        tag = rel["tag_name"]
        if not tag.startswith("v"):
            continue
        fields = tag[1:].split(".")
        if len(fields) != 3:
            continue
        try:
            candidate = tuple(int(field) for field in fields)
        except ValueError:
            # Not a plain numeric version; ignore this tag.
            continue
        # Tuples compare field by field: major, then minor, then patch.
        if candidate > best:
            best = candidate
    return ".".join(str(part) for part in best)
def get_version(rels: list[dict], tag: str) -> dict:
    """Return the first release whose ``"tag_name"`` equals ``tag``.

    ``tag`` is only looked up when it starts with ``v`` and the remainder
    splits into exactly three dot-separated fields.

    Args:
        rels: Release dicts, each providing a ``"tag_name"`` key.
        tag: Tag to look for, e.g. ``"v1.2.3"``.

    Returns:
        The matching release dict, or ``None`` when the tag is not in the
        accepted form or no release carries it.
    """
    if not tag.startswith("v") or len(tag[1:].split(".")) != 3:
        return None
    matches = (release for release in rels if release["tag_name"] == tag)
    return next(matches, None)
#
def _download_and_extract_patch(dl_link: str, expected_version: str) -> bool:
    """Download the patch zip and unpack it into ``./patches/client``.

    Returns True when the archive was downloaded and extracted, False when the
    download failed or the file is not a valid zip archive.
    """
    zip_file_path = f"./patches/patch_cli_{expected_version}.zip"
    try:
        downloaded = requests.get(dl_link, timeout=10, headers=get_headers_for_blob())
    except requests.RequestException as err:
        logger.error(f"Failed to download {expected_version}: {err}")
        return False
    if downloaded.status_code != 200:
        logger.error(f"Failed to download {expected_version}")
        return False

    with open(zip_file_path, "wb") as f:
        f.write(downloaded.content)
    # Marker recording which version's archive is currently on disk.
    with open("./patches/downloaded.version", "w") as f:
        f.write(expected_version)
    logger.debug("Downloaded")

    logger.info(f"Extracting {expected_version}")
    os.makedirs("./patches/client", exist_ok=True)
    try:
        with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
            zip_ref.extractall("./patches/client")
    except zipfile.BadZipFile:
        logger.error(f"Downloaded patch {expected_version} is not a valid zip archive")
        return False
    return True


def _pull_server_image(expected_version: str) -> None:
    """Best-effort registry login and pull of the image tagged ``v<version>``.

    A failure is logged, written to ``./patches/error.txt`` and to the updater
    log, but is not raised: the caller continues with the update.
    """
    logger.info("Pulling from registry")
    # The registry name is the source URL without its scheme.
    registry_source = source[(source.find("//") + 2):]
    try:
        client = docker.from_env()
        log_response = client.api.login(
            username=settings["secret"]["username"],
            password=settings["secret"]["password"],
            registry=source,
        )
        logger.debug(f"log_response: {log_response}")
        pull_prog = client.api.pull(
            f"{registry_source}/pakin/taobin_recipe_manager",
            tag=f"v{expected_version}",
        )
        logger.debug(f"Pulling: {pull_prog}")
    except Exception as e:
        logger.error(f"Failed to pull {expected_version}: {e}")
        now = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=7)))
        with open("./patches/error.txt", "w") as f:
            f.write(f"{now} : fail to update server\n")
        save_to_log_file("fail to update server")


def check_version_job() -> dict:
    """Check the release server and install the newest client patch if needed.

    Steps:
      1. Fetch the release list and pick the highest ``vX.Y.Z`` tag.
      2. Resolve the download link of that release's first asset.
      3. Compare against ``./patches/latest.version``; when it differs (or the
         file is missing/empty), download and extract the patch, try to pull
         the matching server image, and only then record the new version.
      4. Publish ``"new_version"`` on ``MAIN_CHANNEL`` when the module flag
         ``update_available_now`` is set.

    The version marker is written after a successful download and extraction,
    so a failed attempt is retried on the next run instead of being mistaken
    for an installed version.

    Returns:
        A dict with a single ``"status"`` key, one of ``"ok"``,
        ``"failed at download"``, ``"no release found"`` or
        ``"failed to connect to server"``.
    """
    global update_available_now

    logger.info("Checking version")
    try:
        result = requests.get(get_all_releases_url(), timeout=10, headers=get_headers())
    except requests.RequestException as err:
        logger.error(f"Failed to connect to server: {err}")
        return {"status": "failed to connect to server"}
    if result.status_code != 200:
        logger.error("Failed to connect to server")
        return {"status": "failed to connect to server"}

    releases = result.json()
    if not releases:
        logger.error("No release found")
        return {"status": "no release found"}

    # Find the latest {major}.{minor}.{patch} and its release entry.
    expected_version = find_latest_version(releases)
    latest_version_data = get_version(releases, f"v{expected_version}")
    if latest_version_data is None or not latest_version_data.get("assets"):
        logger.error(f"No downloadable release found for v{expected_version}")
        return {"status": "no release found"}
    latest_id = latest_version_data["id"]
    latest_asset_id = latest_version_data["assets"][0]["id"]

    download_url = get_download_url(latest_id, latest_asset_id)
    logger.info(f"Found version {expected_version} from source")
    try:
        dl_result = requests.get(download_url, timeout=10, headers=get_headers())
    except requests.RequestException as err:
        logger.error(f"Failed to download {expected_version}: {err}")
        return {"status": "failed at download"}
    if dl_result.status_code != 200:
        logger.error(f"Failed to download {expected_version}")
        return {"status": "failed at download"}

    dl_link = dl_result.json()["browser_download_url"]
    # The advertised link is not usable without the tailscale VPN: keep only
    # the part starting at /attachments and re-root it on `source`.
    dl_link = f"{source}{dl_link[dl_link.find('/attachments'):]}"

    # Every file below lives in ./patches, so make sure it exists first.
    os.makedirs("./patches", exist_ok=True)
    version_file = "./patches/latest.version"
    is_already_latest = False
    old_version = ""
    if os.path.exists(version_file):
        with open(version_file, "r") as f:
            # Tolerate a trailing newline left by manual edits.
            latest_version = f.read().strip()
        if latest_version == "":
            logger.warning("No version found on running machine. Starting overwrite ...")
            save_to_log_file("Warning: version.overwrite caused by missing version on latest.version.")
            old_version = "<overwrite>"
        else:
            old_version = latest_version
        if latest_version == expected_version:
            logger.info(f"Already at latest version {expected_version}")
            is_already_latest = True
        else:
            logger.info(f"Updating to {expected_version}")
    else:
        logger.info(f"First time running {expected_version}")

    if not is_already_latest:
        if not _download_and_extract_patch(dl_link, expected_version):
            return {"status": "failed at download"}

        # Pulling the server image is best-effort and does not block the update.
        _pull_server_image(expected_version)

        # Record the version only now that the patch is actually in place.
        with open(version_file, "w") as f:
            f.write(expected_version)
        update_available_now = True

        logger.info(f"Cleaning up {expected_version}")
        os.remove(f"./patches/patch_cli_{expected_version}.zip")
        os.remove("./patches/downloaded.version")
        save_to_log_file(f"update success from {old_version} -> {expected_version}")

    if update_available_now:
        redis_client.publish(MAIN_CHANNEL, "new_version")
    return {"status": "ok"}
def running_jobs():
    """Scheduled wrapper around one version check.

    Resets the module flag ``update_available_now``, runs
    ``check_version_job`` and logs the status it returns, framed by separator
    lines on stdout.

    ``check_version_job`` already publishes ``"new_version"`` on
    ``MAIN_CHANNEL`` whenever the flag is set, so this wrapper does not publish
    again; doing so would deliver every update notification twice.
    """
    global update_available_now
    update_available_now = False
    print("---------------------------------------------------------------------")
    status = check_version_job()
    logger.info(f"Status ({status['status']})")
    print("---------------------------------------------------------------------")
def notification_job():
    """Poll the subscription once and log the outcome.

    Waits up to 5 seconds for an entry on ``pubsub``. An entry of type
    ``"message"`` has its payload decoded as UTF-8 and logged; anything else,
    including no entry at all, is logged as idle.
    """
    incoming = pubsub.get_message(timeout=5)
    if incoming is None or incoming['type'] != 'message':
        logger.info(f"{MAIN_CHANNEL}> idle")
        return
    text = incoming["data"].decode("utf-8")
    logger.info(f"{MAIN_CHANNEL}> {text}")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: load configuration, run a first check, start jobs.

    Startup:
      * read ``./updater.settings.json`` and copy its ``"path"`` entry into
        ``settings``;
      * read ``./updater.secrets`` (``<username>:<password>``) into
        ``settings["secret"]``; the process exits with status 1 when the file
        is unreadable or has no ``:`` separator;
      * log the dependency paths, announce the start on ``MAIN_CHANNEL``, run
        one version check, then schedule the notification poll (every 10
        seconds) and the version check (every 10 minutes).

    Shutdown: stop the scheduler.
    """
    # Without this declaration the reset below would only bind a local name.
    global update_available_now

    # load settings
    with open("./updater.settings.json", "r") as config:
        loaded_settings = json.load(config)
    settings['path'] = loaded_settings['path']

    try:
        with open("./updater.secrets", "r") as secret:
            secret_string = secret.read()
    except OSError:
        logger.warning("No secrets found")
        sys.exit(1)
    # Split on the first ":" only so the password may contain colons, and drop
    # the line ending editors usually append to the file.
    username, separator, password = secret_string.rstrip("\r\n").partition(":")
    if not separator:
        logger.warning("No secrets found")
        sys.exit(1)
    settings['secret'] = {
        "username": username,
        "password": password
    }

    test_path_from_settings()

    update_available_now = False
    logger.info("Starting up. Check update first ...")
    redis_client.publish(MAIN_CHANNEL, "updater.first_start")
    status = check_version_job()
    logger.info(f"Status ({status['status']})")

    scheduler.add_job(notification_job, 'interval', seconds=10, id="notification")
    scheduler.add_job(running_jobs, 'interval', minutes=10, id="check_version")
    scheduler.start()
    try:
        yield
    finally:
        print("Shutting down")
        # Do not wait for a job that may be blocked on network or Redis I/O.
        scheduler.shutdown(wait=False)
# Scheduler for the periodic jobs registered in lifespan(). The option key is
# `job_defaults`; an unrecognised key is ignored, which would leave the
# max_instances setting without effect.
scheduler = BackgroundScheduler(job_defaults={'max_instances': 2})
app = FastAPI(title="Updater", lifespan=lifespan)

# Module logger: DEBUG and above, written to stdout as
# "<time> [<level>] : <message>".
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler(sys.stdout)
log_formatter = logging.Formatter("%(asctime)s [%(levelname)s] : %(message)s")
stream_handler.setFormatter(log_formatter)
logger.addHandler(stream_handler)
@app.get("/")
async def main():
    """Root route: report whether the update flag is currently set."""
    return "Update available" if update_available_now else "Version is up to date."
@app.get("/job_status")
async def job_status():
    """Return the text form of the scheduler's "check_version" job."""
    job = scheduler.get_job("check_version")
    return "{}".format(job)
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 36528.
    server_options = {"host": "0.0.0.0", "port": 36528}
    uvicorn.run(app, **server_options)