initialize

This commit is contained in:
Pakin 2025-08-08 15:55:55 +07:00
commit f6e3fdd3c2
22 changed files with 7447 additions and 0 deletions

571
src/api.rs Normal file
View file

@ -0,0 +1,571 @@
use axum::{
Router,
extract::{Path, Query, State},
http::StatusCode,
response::{IntoResponse, Json, Response},
routing::{get, post},
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;
use tower::ServiceBuilder;
use tower_http::{
cors::{Any, CorsLayer},
trace::{DefaultMakeSpan, TraceLayer},
};
use tracing::{debug, error, info};
use crate::{
discovery::ContainerDiscovery,
log_info,
logging::{LogDatesResponse, LogFilters, LogManager, LogResponse},
storage::Storage,
types::{ApiResponse, HealthResponse, ManagedContainer, StatsResponse},
updater::UpdateManager,
};
pub struct ApiServer {
port: u16,
update_manager: Arc<Mutex<UpdateManager>>,
discovery: Arc<ContainerDiscovery>,
storage: Arc<Storage>,
log_manager: Arc<LogManager>,
}
#[derive(Clone)]
pub struct AppState {
pub update_manager: Arc<Mutex<UpdateManager>>,
pub discovery: Arc<ContainerDiscovery>,
pub storage: Arc<Storage>,
pub log_manager: Arc<LogManager>,
}
impl ApiServer {
pub fn new(
port: u16,
update_manager: Arc<Mutex<UpdateManager>>,
discovery: Arc<ContainerDiscovery>,
storage: Arc<Storage>,
log_manager: Arc<LogManager>,
) -> Self {
Self {
port,
update_manager,
discovery,
storage,
log_manager,
}
}
pub async fn run(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let state = AppState {
update_manager: self.update_manager,
discovery: self.discovery,
storage: self.storage,
log_manager: self.log_manager.clone(),
};
let app = Router::new()
// Health and info endpoints
.route("/health", get(health_check))
.route("/", get(root))
// Container management endpoints
.route("/api/containers", get(list_containers))
.route("/api/containers/{id}", get(get_container))
.route("/api/containers/{id}/update", post(trigger_update))
.route("/api/containers/{id}/force-update", post(force_update))
// Discovery endpoints
.route("/api/discovery/scan", post(force_discovery))
.route("/api/discovery/containers", get(get_discovered_containers))
// System endpoints
.route("/api/status", get(system_status))
.route("/api/stats", get(get_stats))
.route("/api/logs", get(get_logs))
.route("/api/logs/dates", get(get_log_dates))
// Bulk operations
.route("/api/bulk/update-check", post(bulk_update_check))
.with_state(state)
.layer(
ServiceBuilder::new()
.layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::new().include_headers(true)),
)
.layer(
CorsLayer::new()
.allow_origin(Any)
.allow_headers(Any)
.allow_methods(Any),
),
);
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", self.port)).await?;
info!("🌐 API server listening on http://0.0.0.0:{}", self.port);
log_info!(
self.log_manager.clone(),
"init",
format!("🌐 API server listening on http://0.0.0.0:{}", self.port)
);
axum::serve(listener, app).await?;
Ok(())
}
}
// Handler functions
async fn root() -> Json<serde_json::Value> {
Json(serde_json::json!({
"service": "Docker Update Manager",
"version": "1.0.0",
"description": "Automatic Docker container update management with service discovery",
"endpoints": {
"health": "/health",
"status": "/api/status",
"containers": "/api/containers",
"discovery": "/api/discovery/scan",
"stats": "/api/stats",
"bulk_update": "/api/bulk/update-check"
}
}))
}
async fn health_check(State(state): State<AppState>) -> Json<ApiResponse<HealthResponse>> {
log_info!(state.log_manager, "/health", "get health check...");
// let containers = match state.update_manager.lock().await.get_all_containers().await {
// Ok(containers) => containers,
// Err(_) => vec![],
// };
let containers = {
let mgr = state.update_manager.lock().await;
let _containers = mgr.get_all_containers().await;
drop(mgr);
match _containers {
Ok(containers) => containers,
Err(_) => vec![],
}
};
debug!("containers: {:?}", containers);
let active_updates = {
let mgr = state.update_manager.lock().await;
let _active_updates = mgr.get_active_updates_count().await;
drop(mgr);
_active_updates
};
let update_queue_size = {
let mgr = state.update_manager.lock().await;
let _update_queue_size = mgr.get_update_queue_size().await;
drop(mgr);
_update_queue_size
};
let health = HealthResponse {
status: "healthy".to_string(),
version: "1.0.0".to_string(),
uptime_seconds: 0, // TODO: Track actual uptime
managed_containers: containers.len(),
active_updates,
update_queue_size,
};
Json(ApiResponse::success(health))
}
async fn system_status(State(state): State<AppState>) -> Json<ApiResponse<SystemStatusResponse>> {
// let containers = state
// .update_manager
// .lock()
// .await
// .get_all_containers()
// .await
// .unwrap_or_default();
let containers = {
let mgr = state.update_manager.lock().await;
let _containers = mgr.get_all_containers().await.unwrap_or_default();
drop(mgr);
_containers
};
let discovered = state
.discovery
.get_discovered_containers()
.await
.unwrap_or_default();
let active_updates = {
let mgr = state.update_manager.lock().await;
let _active_updates = mgr.get_active_updates_count().await;
drop(mgr);
_active_updates
};
let update_queue_size = {
let mgr = state.update_manager.lock().await;
let _update_queue_size = mgr.get_update_queue_size().await;
drop(mgr);
_update_queue_size
};
let status = SystemStatusResponse {
service: "Docker Update Manager".to_string(),
version: "1.0.0".to_string(),
managed_containers: containers.len(),
discovered_containers: discovered.len(),
active_updates,
update_queue_size,
containers: containers
.into_iter()
.map(|c| ContainerSummary {
id: c.id,
name: c.name,
image_name: c.image_name,
current_version: c.current_version,
status: c.status,
last_seen: c.last_seen,
update_available: false, // TODO: Check if update is available
})
.collect(),
};
Json(ApiResponse::success(status))
}
async fn list_containers(
State(state): State<AppState>,
Query(params): Query<ListContainersQuery>,
) -> Result<Json<ApiResponse<Vec<ManagedContainer>>>, ApiError> {
let containers = {
let mgr = state.update_manager.lock().await;
let _containers = mgr.get_all_containers().await.unwrap_or_default();
drop(mgr);
_containers
};
let filtered_containers = if let Some(status_filter) = params.status {
containers
.into_iter()
.filter(|c| format!("{:?}", c.status).to_lowercase() == status_filter.to_lowercase())
.collect()
} else {
containers
};
Ok(Json(ApiResponse::success(filtered_containers)))
}
async fn get_container(
Path(id): Path<String>,
State(state): State<AppState>,
) -> Result<Json<ApiResponse<ManagedContainer>>, ApiError> {
let container = state.update_manager.lock().await.get_container(&id).await?;
Ok(Json(ApiResponse::success(container)))
}
async fn trigger_update(
Path(id): Path<String>,
State(state): State<AppState>,
) -> Result<Json<ApiResponse<UpdateTriggeredResponse>>, ApiError> {
// state
// .update_manager
// .lock()
// .await
// .check_container_for_updates(id.clone())
// .await?;
info!("request update ...");
{
UpdateManager::check_container_for_updates(state.update_manager.clone(), id.clone())
.await?;
}
let response = UpdateTriggeredResponse {
container_id: id.clone(),
message: "Update check triggered".to_string(),
};
Ok(Json(ApiResponse::success(response)))
}
async fn force_update(
Path(id): Path<String>,
State(state): State<AppState>,
Json(payload): Json<ForceUpdateRequest>,
) -> Result<Json<ApiResponse<UpdateTriggeredResponse>>, ApiError> {
// state
// .update_manager
// .lock()
// .await
// .force_update(id.clone(), payload.target_version)
// .await?;
{
// let mgr = state.update_manager.lock().await;
UpdateManager::force_update(
state.update_manager.clone(),
id.clone(),
payload.target_version,
)
.await?;
}
let response = UpdateTriggeredResponse {
container_id: id.clone(),
message: "Forced update queued".to_string(),
};
Ok(Json(ApiResponse::success(response)))
}
async fn force_discovery(
State(state): State<AppState>,
) -> Result<Json<ApiResponse<DiscoveryResponse>>, ApiError> {
state.discovery.force_discovery().await?;
let response = DiscoveryResponse {
message: "Container discovery scan completed".to_string(),
};
Ok(Json(ApiResponse::success(response)))
}
async fn get_discovered_containers(
State(state): State<AppState>,
) -> Result<Json<ApiResponse<Vec<String>>>, ApiError> {
let containers = state.discovery.get_discovered_containers().await?;
Ok(Json(ApiResponse::success(containers)))
}
async fn get_stats(
State(state): State<AppState>,
) -> Result<Json<ApiResponse<crate::types::StatsResponse>>, ApiError> {
let mut error_flag = false;
let stats = {
let mgr = state.update_manager.lock().await;
let _stats = mgr.get_update_stats().await;
drop(mgr);
match _stats {
Ok(stats) => stats,
Err(err) => {
error!("Failed to get update stats: {}", err);
error_flag = true;
StatsResponse::default()
}
}
};
Ok(Json(ApiResponse::success(stats)))
}
// Logs endpoint handler
async fn get_logs(
State(state): State<AppState>,
Query(params): Query<std::collections::HashMap<String, String>>,
) -> Result<Json<ApiResponse<LogResponse>>, ApiError> {
let filters = LogFilters {
date: params.get("date").cloned(),
level: params.get("level").cloned(),
component: params.get("component").cloned(),
container: params.get("container").cloned(),
search: params.get("search").cloned(),
page: params
.get("page")
.and_then(|p| p.parse().ok())
.unwrap_or(1)
.max(1),
limit: params
.get("limit")
.and_then(|l| l.parse().ok())
.unwrap_or(100)
.min(1000) // Max 1000 logs per request
.max(1),
};
let logs = state.log_manager.get_logs(filters).await?;
Ok(Json(ApiResponse::success(logs)))
}
async fn get_log_dates(
State(state): State<AppState>,
) -> Result<Json<ApiResponse<LogDatesResponse>>, ApiError> {
let available_dates = state.log_manager.get_available_dates().await?;
let current_date = chrono::Utc::now().format("%Y-%m-%d").to_string();
let response = LogDatesResponse {
available_dates,
current_date,
};
Ok(Json(ApiResponse::success(response)))
}
async fn bulk_update_check(
State(state): State<AppState>,
Json(payload): Json<BulkUpdateRequest>,
) -> Result<Json<ApiResponse<BulkUpdateResponse>>, ApiError> {
let containers = if payload.container_ids.is_empty() {
state
.update_manager
.lock()
.await
.get_all_containers()
.await?
} else {
let mut selected = Vec::new();
for id in &payload.container_ids {
if let Ok(container) = state.update_manager.lock().await.get_container(id).await {
selected.push(container);
}
}
selected
};
let mut results = Vec::new();
for container in containers {
let status = {
let update_manager = state.update_manager.clone();
let container_id = container.container_id.clone();
async move { UpdateManager::check_container_for_updates(update_manager, container_id).await }
};
match status.await {
Ok(_) => {
results.push(BulkUpdateResult {
container_id: container.container_id,
container_name: container.name,
status: "triggered".to_string(),
error: None,
});
}
Err(e) => {
results.push(BulkUpdateResult {
container_id: container.container_id,
container_name: container.name,
status: "failed".to_string(),
error: Some(e.to_string()),
});
}
}
}
let response = BulkUpdateResponse { results };
Ok(Json(ApiResponse::success(response)))
}
// Request/Response types
#[derive(Debug, Deserialize)]
struct ListContainersQuery {
status: Option<String>,
}
#[derive(Debug, Deserialize)]
struct LogsQuery {
page: Option<u32>,
limit: Option<u32>,
level: Option<String>,
container: Option<String>,
}
#[derive(Debug, Deserialize)]
struct ForceUpdateRequest {
target_version: Option<String>,
}
#[derive(Debug, Deserialize)]
struct BulkUpdateRequest {
container_ids: Vec<String>,
}
#[derive(Debug, Serialize)]
struct SystemStatusResponse {
service: String,
version: String,
managed_containers: usize,
discovered_containers: usize,
active_updates: usize,
update_queue_size: usize,
containers: Vec<ContainerSummary>,
}
#[derive(Debug, Serialize)]
struct ContainerSummary {
id: String,
name: String,
image_name: String,
current_version: String,
status: crate::types::ContainerStatus,
last_seen: chrono::DateTime<chrono::Utc>,
update_available: bool,
}
#[derive(Debug, Serialize)]
struct UpdateTriggeredResponse {
container_id: String,
message: String,
}
#[derive(Debug, Serialize)]
struct DiscoveryResponse {
message: String,
}
#[derive(Debug, Serialize)]
struct LogsResponse {
logs: Vec<LogEntry>,
total: u32,
page: u32,
limit: u32,
}
#[derive(Debug, Serialize)]
struct LogEntry {
timestamp: String,
level: String,
message: String,
container: Option<String>,
}
#[derive(Debug, Serialize)]
struct BulkUpdateResponse {
results: Vec<BulkUpdateResult>,
}
#[derive(Debug, Serialize)]
struct BulkUpdateResult {
container_id: String,
container_name: String,
status: String,
error: Option<String>,
}
// Error handling
#[derive(Debug)]
struct ApiError(anyhow::Error);
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
error!("API Error: {}", self.0);
let error_response = ApiResponse::<()>::error(self.0.to_string());
(StatusCode::INTERNAL_SERVER_ERROR, Json(error_response)).into_response()
}
}
impl<E> From<E> for ApiError
where
E: Into<anyhow::Error>,
{
fn from(err: E) -> Self {
Self(err.into())
}
}

106
src/config.rs Normal file
View file

@ -0,0 +1,106 @@
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
pub update_interval: u64, // seconds
pub max_concurrent_updates: usize,
pub health_check_timeout: u64, // seconds
pub max_retries: usize,
pub backup_retention_days: u32,
pub discovery: DiscoveryConfig,
pub registry: RegistryConfig,
pub storage: StorageConfig,
bearer: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscoveryConfig {
pub poll_interval: u64, // seconds
pub labels: DiscoveryLabels,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscoveryLabels {
pub enable_updates: String, // Label to enable updates
pub registry_url: String, // Registry URL label
pub image_name: String, // Image name label
pub health_check_path: String, // Health check path label
pub update_strategy: String, // Update strategy label
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegistryConfig {
pub default_url: String,
pub timeout: u64, // seconds
pub auth: Option<RegistryAuth>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegistryAuth {
pub username: String,
pub password: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageConfig {
pub path: String,
}
impl Default for Config {
fn default() -> Self {
Self {
update_interval: 300, // 5 minutes
max_concurrent_updates: 2,
health_check_timeout: 30,
max_retries: 3,
backup_retention_days: 7,
discovery: DiscoveryConfig {
poll_interval: 30, // 30 seconds
labels: DiscoveryLabels {
enable_updates: "silserv.enable".to_string(),
registry_url: "silserv.registry".to_string(),
image_name: "silserv.image".to_string(),
health_check_path: "silserv.health-path".to_string(),
update_strategy: "silserv.strategy".to_string(),
},
},
registry: RegistryConfig {
default_url: "localhost:32000".to_string(),
timeout: 30,
auth: None,
},
storage: StorageConfig {
path: "./data/silserv.db".to_string(),
},
bearer: "".to_string(),
}
}
}
impl Config {
pub fn load(path: &str) -> Result<Self, config::ConfigError> {
let settings = config::Config::builder()
.add_source(config::File::with_name(path).required(false))
.add_source(config::Environment::with_prefix("SILSERV"))
.build()?;
settings.try_deserialize()
}
pub fn update_interval_duration(&self) -> Duration {
Duration::from_secs(self.update_interval)
}
pub fn discovery_poll_duration(&self) -> Duration {
Duration::from_secs(self.discovery.poll_interval)
}
pub fn health_check_timeout_duration(&self) -> Duration {
Duration::from_secs(self.health_check_timeout)
}
pub fn bearer(&self) -> &str {
&self.bearer
}
}

545
src/discovery.rs Normal file
View file

@ -0,0 +1,545 @@
use anyhow::{Context, Result};
use bollard::{
Docker,
container::{InspectContainerOptions, ListContainersOptions},
network::ListNetworksOptions,
};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::{
sync::Mutex,
time::{Duration, interval},
};
use tracing::{debug, error, info, warn};
use crate::{
config::Config,
log_error, log_info,
logging::LogManager,
types::{
ContainerPort, ContainerStatus, ContainerVolume, ManagedContainer, NetworkSettings,
UpdateStrategy,
},
updater::UpdateManager,
};
pub struct ContainerDiscovery {
docker: Docker,
network_name: String,
config: Config,
update_manager: Arc<Mutex<UpdateManager>>,
known_containers: Arc<tokio::sync::RwLock<HashSet<String>>>,
log: Arc<LogManager>,
}
impl ContainerDiscovery {
pub async fn new(
network_name: String,
config: Config,
update_manager: Arc<Mutex<UpdateManager>>,
log: Arc<LogManager>,
) -> Result<Self> {
let docker =
Docker::connect_with_socket_defaults().context("Failed to connect to Docker daemon")?;
// Verify network exists
let networks = docker
.list_networks(Some(ListNetworksOptions::<String> {
filters: HashMap::from([("name".to_string(), vec![network_name.clone()])]),
}))
.await
.context("Failed to list Docker networks")?;
if networks.is_empty() {
warn!(
"Network '{}' not found, will create it if needed",
network_name
);
} else {
info!("Found network '{}' for container discovery", network_name);
}
Ok(Self {
docker,
network_name,
config,
update_manager,
known_containers: Arc::new(tokio::sync::RwLock::new(HashSet::new())),
log,
})
}
pub async fn start_monitoring(&self) -> Result<()> {
let mut discovery_interval = interval(self.config.discovery_poll_duration());
info!("🔍 Starting container discovery monitoring");
info!(" - Network: {}", self.network_name);
info!(
" - Poll interval: {}s",
self.config.discovery.poll_interval
);
log_info!(
self.log,
"discovery",
"Starting container discovery monitoring"
);
log_info!(
self.log,
"discovery",
format!(" - Network: {}", self.network_name)
);
log_info!(
self.log,
"discovery",
format!(
" - Poll interval: {}s",
self.config.discovery.poll_interval
)
);
loop {
discovery_interval.tick().await;
if let Err(e) = self.discover_containers().await {
error!("Container discovery failed: {}", e);
log_error!(
self.log,
"discover",
format!("Container discovery failed: {}", e)
);
}
}
}
async fn discover_containers(&self) -> Result<()> {
debug!(
"🔍 Discovering containers in network '{}'",
self.network_name
);
// Get all containers in the target network
let containers = self.get_network_containers().await?;
let mut current_container_ids = HashSet::new();
for container in containers {
current_container_ids.insert(container.id.clone().unwrap_or_default());
if let Err(e) = self.process_container(&container).await {
warn!(
"Failed to process container {}: {}",
container
.names
.as_ref()
.unwrap_or(&vec!["unknown".to_string()])[0],
e
);
}
}
// Check for containers that disappeared
self.handle_disappeared_containers(&current_container_ids)
.await?;
// Update known containers
{
let mut known = self.known_containers.write().await;
*known = current_container_ids;
}
Ok(())
}
async fn get_network_containers(&self) -> Result<Vec<bollard::models::ContainerSummary>> {
let containers = self
.docker
.list_containers(Some(ListContainersOptions::<String> {
all: false,
limit: None,
size: false,
filters: HashMap::from([("status".to_string(), vec!["running".to_string()])]),
}))
.await
.context("Failed to list containers")?;
// Filter containers that are in our target network
let mut network_containers = Vec::new();
for container in containers {
if let Some(network_settings) = &container.network_settings {
if let Some(networks) = &network_settings.networks {
if networks.contains_key(&self.network_name) {
network_containers.push(container);
}
}
}
}
debug!(
"Found {} containers in network '{}'",
network_containers.len(),
self.network_name
);
Ok(network_containers)
}
async fn process_container(&self, container: &bollard::models::ContainerSummary) -> Result<()> {
let container_id = container.id.clone().unwrap_or_default();
let default_unknown = vec!["unknown".to_string()];
let container_names = container.names.as_ref().unwrap_or(&default_unknown);
let container_name = container_names[0].trim_start_matches('/').to_string();
// Get detailed container information
let inspect_result = self
.docker
.inspect_container(&container_id, None::<InspectContainerOptions>)
.await
.context("Failed to inspect container")?;
// Check if container has update management labels
let mut labels = inspect_result
.config
.as_ref()
.and_then(|c| c.labels.as_ref())
.cloned()
.unwrap_or_default();
// Check if updates are enabled for this container
let updates_enabled = labels
.get(&self.config.discovery.labels.enable_updates)
.map(|v| v == "true")
.unwrap_or(false);
if !updates_enabled {
debug!(
"Container '{}' does not have updates enabled",
container_name
);
return Ok(());
}
info!("📦 Discovered manageable container: {}", container_name);
log_info!(
self.log,
"discovery",
format!("Discovered Container: {}", container_name)
);
debug!("Labels: {:?}", labels);
self.override_label(&mut labels);
// Extract container information
let managed_container = self
.create_managed_container(
container_id.clone(),
container_name,
&inspect_result,
labels,
)
.await?;
// Register or update the container with the update manager
self.update_manager
.lock()
.await
.register_container(managed_container)
.await?;
// Save final container state
Ok(())
}
fn override_label(&self, need_override_label: &mut HashMap<String, String>) {
// do check registry url
let registry_url = need_override_label.get("silserv.registry");
let strategy = need_override_label.get("silserv.strategy");
if let Some(url) = registry_url {
if url.starts_with("pakin-inspiron-15-3530.tail110d9.ts.net") {
// https://pakin-inspiron-15-3530.tail110d9.ts.net/api/v1/packages/pakin?type=container
if let Some(strategy) = strategy {
if strategy == "automatic" {
// Auto-update logic
// need_override_label
// .insert("silserv.registry".to_string(), "default".to_string());
let parts = url.split("/").collect::<Vec<&str>>();
match parts.len() {
2 => {
// expect domain/user
let new_format = format!("default.{}", parts[1]);
need_override_label
.insert("silserv.registry".to_string(), new_format);
}
_ => {
// do nothing
}
}
} else if strategy == "manual" {
// Manual-update logic
}
}
}
}
}
async fn create_managed_container(
&self,
container_id: String,
name: String,
inspect: &bollard::models::ContainerInspectResponse,
labels: HashMap<String, String>,
) -> Result<ManagedContainer> {
// Extract image information
let image = inspect
.config
.as_ref()
.unwrap()
.image
.as_ref()
.unwrap_or(&String::new())
.clone();
let image_parts: Vec<&str> = image.split(':').collect();
debug!("split image parts: {:?}", image_parts);
let (image_name, current_version) = if image_parts.len() >= 2 {
(image_parts[0].to_string(), image_parts[1].to_string())
} else {
(image.clone(), "latest".to_string())
};
// Get registry URL from labels or use default
let registry_url = labels
.get(&self.config.discovery.labels.registry_url)
.cloned()
.unwrap_or_else(|| self.config.registry.default_url.clone());
// Create base managed container
let mut container =
ManagedContainer::new(container_id.clone(), name, image_name, registry_url);
container.current_version = current_version;
container.labels = labels.clone();
// Extract network settings
if let Some(network_settings) = &inspect.network_settings {
if let Some(networks) = &network_settings.networks {
if let Some(network) = networks.get(&self.network_name) {
container.network_settings = NetworkSettings {
ip_address: network.ip_address.clone().unwrap_or_default(),
network_name: self.network_name.clone(),
aliases: network.aliases.clone().unwrap_or_default(),
};
}
}
}
// Extract ports
if let Some(host_config) = &inspect.host_config {
if let Some(port_bindings) = &host_config.port_bindings {
for (container_port, host_ports) in port_bindings {
let port_parts: Vec<&str> = container_port.split('/').collect();
let port_num = port_parts[0].parse::<u16>().unwrap_or(0);
let protocol = port_parts.get(1).unwrap_or(&"tcp").to_string();
let host_port = host_ports
.as_ref()
.and_then(|ports| ports.first())
.and_then(|port| port.host_port.as_ref())
.and_then(|port| port.parse::<u16>().ok());
container.ports.push(ContainerPort {
container_port: port_num,
host_port,
protocol,
});
}
}
}
// Extract volumes/mounts
if let Some(mounts) = &inspect.mounts {
for mount in mounts {
if let (Some(source), Some(destination)) = (&mount.source, &mount.destination) {
container.volumes.push(ContainerVolume {
source: source.clone(),
target: destination.clone(),
read_only: mount.rw.unwrap_or(true) == false,
});
}
}
}
// Extract environment variables
if let Some(config) = &inspect.config {
if let Some(env) = &config.env {
for env_var in env {
if let Some(eq_pos) = env_var.find('=') {
let key = env_var[..eq_pos].to_string();
let value = env_var[eq_pos + 1..].to_string();
// check key
if key.contains("registry") {
debug!("Registry: {}", value);
}
// Skip sensitive environment variables
if !key.to_lowercase().contains("password")
&& !key.to_lowercase().contains("secret")
&& !key.to_lowercase().contains("token")
{
container.environment.insert(key, value);
}
}
}
}
}
// Set health check URL
if let Some(health_path) = labels.get(&self.config.discovery.labels.health_check_path) {
// Try to determine the health check URL from container ports and network
if let Some(port) = container.ports.first() {
let base_port = port.host_port.unwrap_or(port.container_port);
container.health_check_url = Some(format!(
"http://{}:{}{}",
container.network_settings.ip_address, base_port, health_path
));
}
}
// Set update strategy
container.update_strategy = match labels
.get(&self.config.discovery.labels.update_strategy)
.map(|s| s.as_str())
{
Some("automatic") => UpdateStrategy::Automatic,
Some("manual") => UpdateStrategy::Manual,
Some(cron) if cron.contains(' ') => UpdateStrategy::Scheduled(cron.to_string()),
_ => UpdateStrategy::Manual,
};
container.status = ContainerStatus::Healthy;
Ok(container)
}
async fn handle_disappeared_containers(&self, current_ids: &HashSet<String>) -> Result<()> {
let known_ids = self.known_containers.read().await.clone();
for disappeared_id in known_ids.difference(current_ids) {
info!("📤 Container {} left the network", disappeared_id);
log_info!(
self.log,
"discovery",
format!("Container {} left the network", disappeared_id)
);
if let Err(e) = self
.update_manager
.lock()
.await
.mark_container_lost(disappeared_id)
.await
{
warn!("Failed to mark container {} as lost: {}", disappeared_id, e);
log_error!(
self.log,
"discovery",
format!("Failed to mark container {} as lost: {}", disappeared_id, e)
);
}
}
Ok(())
}
pub async fn get_discovered_containers(&self) -> Result<Vec<String>> {
Ok(self.known_containers.read().await.iter().cloned().collect())
}
pub async fn force_discovery(&self) -> Result<()> {
info!("🔍 Forcing container discovery");
log_info!(self.log, "discovery", "Forcing container discovery");
self.discover_containers().await
}
}
// Additional utility functions for container discovery
pub fn extract_image_info(full_image: &str) -> (String, String, String) {
// Parse full image string like "localhost:32000/myapp:v1.0.0"
// Returns (registry, image_name, version)
let parts: Vec<&str> = full_image.split('/').collect();
if parts.len() >= 2 {
let registry = parts[0];
let image_with_tag = parts.last().unwrap();
let tag_parts: Vec<&str> = image_with_tag.split(':').collect();
if tag_parts.len() >= 2 {
let image_name = parts[1..]
.join("/")
.replace(&format!(":{}", tag_parts[1]), "");
let version = tag_parts[1];
(registry.to_string(), image_name, version.to_string())
} else {
let image_name = parts[1..].join("/");
(registry.to_string(), image_name, "latest".to_string())
}
} else {
// Handle simple image names like "nginx:alpine"
let tag_parts: Vec<&str> = full_image.split(':').collect();
if tag_parts.len() >= 2 {
(
"docker.io".to_string(),
tag_parts[0].to_string(),
tag_parts[1].to_string(),
)
} else {
(
"docker.io".to_string(),
full_image.to_string(),
"latest".to_string(),
)
}
}
}
pub fn is_semantic_version(version: &str) -> bool {
// Check if version follows semantic versioning pattern
regex::Regex::new(r"^v?\d+\.\d+\.\d+")
.unwrap()
.is_match(version)
}
pub fn compare_versions(v1: &str, v2: &str) -> std::cmp::Ordering {
use std::cmp::Ordering;
// Simple semantic version comparison
let normalize = |v: &str| -> Vec<u32> {
v.trim_start_matches('v')
.split('.')
.take(3)
.map(|s| s.parse::<u32>().unwrap_or(0))
.collect()
};
let version1 = normalize(v1);
let version2 = normalize(v2);
for i in 0..3 {
let num1 = version1.get(i).unwrap_or(&0);
let num2 = version2.get(i).unwrap_or(&0);
match num1.cmp(num2) {
Ordering::Equal => continue,
other => return other,
}
}
Ordering::Equal
}

43
src/health.rs Normal file
View file

@ -0,0 +1,43 @@
use anyhow::{Context, Result};
use reqwest::Client;
use std::time::Duration;
use tracing::debug;
use crate::config::Config;
pub struct HealthChecker {
client: Client,
config: Config,
}
impl HealthChecker {
pub fn new(config: Config) -> Self {
let client = Client::builder()
.timeout(Duration::from_secs(config.health_check_timeout))
.build()
.expect("Failed to create health check HTTP client");
Self { client, config }
}
pub async fn check_health(&self, health_url: &str) -> Result<()> {
debug!("Performing health check: {}", health_url);
let response = self
.client
.get(health_url)
.send()
.await
.context("Health check request failed")?;
if response.status().is_success() {
debug!("Health check passed: HTTP {}", response.status());
Ok(())
} else {
Err(anyhow::anyhow!(
"Health check failed: HTTP {}",
response.status()
))
}
}
}

480
src/logging.rs Normal file
View file

@ -0,0 +1,480 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;
use tracing::{error, info, warn};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
pub timestamp: DateTime<Utc>,
pub level: LogLevel,
pub message: String,
pub component: String,
pub container_id: Option<String>,
pub container_name: Option<String>,
pub metadata: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
Error,
Warn,
Info,
Debug,
Trace,
}
impl std::fmt::Display for LogLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LogLevel::Error => write!(f, "error"),
LogLevel::Warn => write!(f, "warn"),
LogLevel::Info => write!(f, "info"),
LogLevel::Debug => write!(f, "debug"),
LogLevel::Trace => write!(f, "trace"),
}
}
}
impl From<String> for LogLevel {
fn from(s: String) -> Self {
match s.to_lowercase().as_str() {
"error" => LogLevel::Error,
"warn" | "warning" => LogLevel::Warn,
"info" => LogLevel::Info,
"debug" => LogLevel::Debug,
"trace" => LogLevel::Trace,
_ => LogLevel::Info,
}
}
}
pub struct LogManager {
in_memory_logs: Arc<RwLock<VecDeque<LogEntry>>>,
max_memory_logs: usize,
log_directory: String,
}
impl LogManager {
pub fn new(log_directory: String, max_memory_logs: usize) -> Self {
Self {
in_memory_logs: Arc::new(RwLock::new(VecDeque::with_capacity(max_memory_logs))),
max_memory_logs,
log_directory,
}
}
pub async fn init(&self) -> Result<()> {
// Create log directory if it doesn't exist
fs::create_dir_all(&self.log_directory).await?;
info!("📝 Log manager initialized at: {}", self.log_directory);
Ok(())
}
pub async fn log(
&self,
level: LogLevel,
message: String,
component: String,
container_id: Option<String>,
container_name: Option<String>,
metadata: Option<serde_json::Value>,
) -> Result<()> {
let entry = LogEntry {
timestamp: Utc::now(),
level,
message,
component,
container_id,
container_name,
metadata,
};
// Add to in-memory cache
self.add_to_memory_cache(entry.clone()).await;
// Write to file asynchronously
self.write_to_file(&entry).await?;
Ok(())
}
async fn add_to_memory_cache(&self, entry: LogEntry) {
let mut logs = self.in_memory_logs.write().await;
// Remove oldest if at capacity
if logs.len() >= self.max_memory_logs {
logs.pop_front();
}
logs.push_back(entry);
}
async fn write_to_file(&self, entry: &LogEntry) -> Result<()> {
let date_str = entry.timestamp.format("%Y-%m-%d").to_string();
let log_file_path = format!("{}/silserv-{}.log", self.log_directory, date_str);
let log_line = serde_json::to_string(entry)? + "\n";
// Append to file
let mut file = fs::OpenOptions::new()
.create(true)
.append(true)
.open(&log_file_path)
.await?;
file.write_all(log_line.as_bytes()).await?;
file.flush().await?;
Ok(())
}
pub async fn get_logs(&self, filters: LogFilters) -> Result<LogResponse> {
// First try memory cache for recent logs
if filters.date.is_none()
|| filters.date.as_ref().unwrap() == &Utc::now().format("%Y-%m-%d").to_string()
{
return self.get_logs_from_memory(filters).await;
}
// For historical logs, read from file
self.get_logs_from_file(filters).await
}
async fn get_logs_from_memory(&self, filters: LogFilters) -> Result<LogResponse> {
let logs = self.in_memory_logs.read().await;
let mut filtered_logs: Vec<LogEntry> = logs.iter().cloned().collect();
// Apply filters
if let Some(level) = &filters.level {
let filter_level = LogLevel::from(level.clone());
filtered_logs.retain(|log| {
matches!(
(log.level.clone(), filter_level.clone()),
(LogLevel::Error, LogLevel::Error)
| (LogLevel::Warn, LogLevel::Warn | LogLevel::Error)
| (
LogLevel::Info,
LogLevel::Info | LogLevel::Warn | LogLevel::Error
)
| (LogLevel::Debug, _)
| (LogLevel::Trace, _)
)
});
}
if let Some(component) = &filters.component {
filtered_logs.retain(|log| log.component.contains(component));
}
if let Some(container) = &filters.container {
filtered_logs.retain(|log| {
log.container_name
.as_ref()
.map(|name| name.contains(container))
.unwrap_or(false)
|| log
.container_id
.as_ref()
.map(|id| id.contains(container))
.unwrap_or(false)
});
}
if let Some(search) = &filters.search {
let search_lower = search.to_lowercase();
filtered_logs.retain(|log| log.message.to_lowercase().contains(&search_lower));
}
// Sort by timestamp (newest first)
filtered_logs.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
// Apply pagination
let total = filtered_logs.len();
let start = (filters.page.saturating_sub(1)) * filters.limit;
let end = std::cmp::min(start + filters.limit, total);
let logs = if start < total {
filtered_logs[start..end].to_vec()
} else {
Vec::new()
};
Ok(LogResponse {
logs,
total,
page: filters.page,
limit: filters.limit,
has_more: end < total,
date: filters
.date
.unwrap_or_else(|| Utc::now().format("%Y-%m-%d").to_string()),
})
}
async fn get_logs_from_file(&self, filters: LogFilters) -> Result<LogResponse> {
let date = filters.date.as_ref().unwrap();
let log_file_path = format!("{}/silserv-{}.log", self.log_directory, date);
// Check if file exists
if !fs::metadata(&log_file_path).await.is_ok() {
return Ok(LogResponse {
logs: Vec::new(),
total: 0,
page: filters.page,
limit: filters.limit,
has_more: false,
date: date.clone(),
});
}
// Read file content
let content = fs::read_to_string(&log_file_path).await?;
let mut logs = Vec::new();
// Parse each line as JSON
for line in content.lines() {
if let Ok(entry) = serde_json::from_str::<LogEntry>(line) {
logs.push(entry);
}
}
// Apply filters (same logic as memory)
if let Some(level) = &filters.level {
let filter_level = LogLevel::from(level.clone());
logs.retain(|log| self.level_matches(&log.level, &filter_level));
}
if let Some(component) = &filters.component {
logs.retain(|log| log.component.contains(component));
}
if let Some(container) = &filters.container {
logs.retain(|log| {
log.container_name
.as_ref()
.map(|name| name.contains(container))
.unwrap_or(false)
|| log
.container_id
.as_ref()
.map(|id| id.contains(container))
.unwrap_or(false)
});
}
if let Some(search) = &filters.search {
let search_lower = search.to_lowercase();
logs.retain(|log| log.message.to_lowercase().contains(&search_lower));
}
// Sort by timestamp (newest first)
logs.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
// Apply pagination
let total = logs.len();
let start = (filters.page.saturating_sub(1)) * filters.limit;
let end = std::cmp::min(start + filters.limit, total);
let paginated_logs = if start < total {
logs[start..end].to_vec()
} else {
Vec::new()
};
Ok(LogResponse {
logs: paginated_logs,
total,
page: filters.page,
limit: filters.limit,
has_more: end < total,
date: date.clone(),
})
}
fn level_matches(&self, log_level: &LogLevel, filter_level: &LogLevel) -> bool {
use LogLevel::*;
match filter_level {
Error => matches!(log_level, Error),
Warn => matches!(log_level, Error | Warn),
Info => matches!(log_level, Error | Warn | Info),
Debug => matches!(log_level, Error | Warn | Info | Debug),
Trace => true, // Trace shows everything
}
}
pub async fn get_available_dates(&self) -> Result<Vec<String>> {
let mut dates = Vec::new();
let mut dir = fs::read_dir(&self.log_directory).await?;
while let Some(entry) = dir.next_entry().await? {
if let Some(name) = entry.file_name().to_str() {
if name.starts_with("silserv-") && name.ends_with(".log") {
// Extract date from filename: silserv-2024-01-15.log
if let Some(date) = name
.strip_prefix("silserv-")
.and_then(|s| s.strip_suffix(".log"))
{
dates.push(date.to_string());
}
}
}
}
dates.sort();
dates.reverse(); // Most recent first
Ok(dates)
}
pub async fn cleanup_old_logs(&self, retention_days: u32) -> Result<()> {
let cutoff_date = Utc::now() - chrono::Duration::days(retention_days as i64);
let cutoff_str = cutoff_date.format("%Y-%m-%d").to_string();
let mut dir = fs::read_dir(&self.log_directory).await?;
while let Some(entry) = dir.next_entry().await? {
if let Some(name) = entry.file_name().to_str() {
if name.starts_with("silserv-") && name.ends_with(".log") {
if let Some(date) = name
.strip_prefix("silserv-")
.and_then(|s| s.strip_suffix(".log"))
{
if cutoff_date < Utc::now() {
if let Err(e) = fs::remove_file(entry.path()).await {
warn!("Failed to remove old log file {}: {}", name, e);
} else {
info!("Removed old log file: {}", name);
}
}
}
}
}
}
Ok(())
}
}
#[derive(Debug, Deserialize)]
pub struct LogFilters {
pub date: Option<String>, // YYYY-MM-DD format
pub level: Option<String>, // error, warn, info, debug, trace
pub component: Option<String>, // updater, discovery, api, etc.
pub container: Option<String>, // container name or ID
pub search: Option<String>, // search in message content
pub page: usize, // page number (1-based)
pub limit: usize, // logs per page
}
impl Default for LogFilters {
fn default() -> Self {
Self {
date: None,
level: None,
component: None,
container: None,
search: None,
page: 1,
limit: 100,
}
}
}
#[derive(Debug, Serialize)]
pub struct LogResponse {
pub logs: Vec<LogEntry>,
pub total: usize,
pub page: usize,
pub limit: usize,
pub has_more: bool,
pub date: String,
}
#[derive(Debug, Serialize)]
pub struct LogDatesResponse {
pub available_dates: Vec<String>,
pub current_date: String,
}
// Convenience macros for logging
#[macro_export]
macro_rules! log_info {
($log_manager:expr, $component:expr, $message:expr) => {
$log_manager
.log(
$crate::logging::LogLevel::Info,
$message.to_string(),
$component.to_string(),
None,
None,
None,
)
.await
.unwrap_or_else(|e| tracing::warn!("Failed to log: {}", e));
};
($log_manager:expr, $component:expr, $message:expr, $container_id:expr, $container_name:expr) => {
$log_manager
.log(
$crate::logging::LogLevel::Info,
$message.to_string(),
$component.to_string(),
Some($container_id.to_string()),
Some($container_name.to_string()),
None,
)
.await
.unwrap_or_else(|e| tracing::warn!("Failed to log: {}", e));
};
}
#[macro_export]
macro_rules! log_error {
($log_manager:expr, $component:expr, $message:expr) => {
$log_manager
.log(
$crate::logging::LogLevel::Error,
$message.to_string(),
$component.to_string(),
None,
None,
None,
)
.await
.unwrap_or_else(|e| tracing::warn!("Failed to log: {}", e));
};
($log_manager:expr, $component:expr, $message:expr, $metadata:expr) => {
$log_manager
.log(
$crate::logging::LogLevel::Error,
$message.to_string(),
$component.to_string(),
None,
None,
Some($metadata),
)
.await
.unwrap_or_else(|e| tracing::warn!("Failed to log: {}", e));
};
}
#[macro_export]
macro_rules! log_warn {
($log_manager:expr, $component:expr, $message:expr) => {
$log_manager
.log(
$crate::logging::LogLevel::Warn,
$message.to_string(),
$component.to_string(),
None,
None,
None,
)
.await
.unwrap_or_else(|e| tracing::warn!("Failed to log: {}", e));
};
}

147
src/main.rs Normal file
View file

@ -0,0 +1,147 @@
use anyhow::Result;
use clap::Parser;
use std::sync::Arc;
use tracing::{info, warn};
mod api;
mod config;
mod discovery;
mod health;
mod logging;
mod registry;
mod storage;
mod types;
mod updater;
use crate::{
api::ApiServer, config::Config, discovery::ContainerDiscovery, logging::LogManager,
storage::Storage, updater::UpdateManager,
};
#[derive(Parser, Debug)]
#[command(name = "silserv")]
#[command(about = "Automatic container update manager with service discovery")]
struct Args {
#[arg(short, long, default_value = "config.toml")]
config: String,
#[arg(short, long, default_value = "36530")]
port: u16,
#[arg(long, default_value = "info")]
log_level: String,
#[arg(long, default_value = "silserv-network")]
network_name: String,
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
// Initialize logging
let subscriber = tracing_subscriber::fmt()
.with_env_filter(&args.log_level)
.with_target(false)
.with_thread_ids(true)
.with_file(true)
.with_line_number(true)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("Failed to set up logging");
info!("🚀 Starting Docker Update Manager");
// Load configuration
let config = Config::load(&args.config).unwrap_or_else(|e| {
warn!("Failed to load config file: {}, using defaults", e);
Config::default()
});
info!("📋 Configuration loaded");
info!(" - Port: {}", args.port);
info!(" - Network: {}", args.network_name);
info!(" - Update interval: {}s", config.update_interval);
info!(
" - Max concurrent updates: {}",
config.max_concurrent_updates
);
info!("Current config: {:?}", config);
// log manager
let log_manager = Arc::new(LogManager::new("./logs/".to_string(), 1000));
log_manager.init().await?;
// Initialize storage
let storage = Arc::new(Storage::new(&config.storage.path).await?);
info!("💾 Storage initialized at: {}", config.storage.path);
// Initialize update manager
let update_manager =
UpdateManager::new(config.clone(), storage.clone(), log_manager.clone()).await?;
info!("🔄 Update manager initialized");
// Initialize container discovery
let discovery = Arc::new(
ContainerDiscovery::new(
args.network_name.clone(),
config.clone(),
update_manager.clone(),
log_manager.clone(),
)
.await?,
);
info!("🔍 Container discovery initialized");
// Start container discovery
let discovery_handle = {
let discovery = discovery.clone();
tokio::spawn(async move {
if let Err(e) = discovery.start_monitoring().await {
tracing::error!("Container discovery failed: {}", e);
}
})
};
// Start update scheduler
let update_handle = {
let update_manager_mutex = update_manager.clone().lock().await.create_as_mutex();
tokio::spawn(async move {
if let Err(e) = UpdateManager::start_scheduler(update_manager_mutex).await {
tracing::error!("Update scheduler failed: {}", e);
}
})
};
// Start API server
let api_server = ApiServer::new(
args.port,
update_manager.clone(),
discovery.clone(),
storage.clone(),
log_manager.clone(),
);
info!("🌐 Starting API server on port {}", args.port);
info!("📡 Ready to discover and manage containers!");
// Run everything concurrently
tokio::select! {
result = api_server.run() => {
tracing::error!("API server stopped: {:?}", result);
}
result = discovery_handle => {
tracing::error!("Discovery stopped: {:?}", result);
}
result = update_handle => {
tracing::error!("Update scheduler stopped: {:?}", result);
}
_ = tokio::signal::ctrl_c() => {
info!("🛑 Received shutdown signal");
}
}
info!("👋 Silserv shutting down");
Ok(())
}

236
src/registry.rs Normal file
View file

@ -0,0 +1,236 @@
use anyhow::{Context, Ok, Result};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::{debug, error, info};
use crate::{config::Config, discovery::is_semantic_version};
pub struct RegistryClient {
client: Client,
config: Config,
}
#[derive(Debug, Deserialize)]
struct RegistryTagsResponse {
name: String,
tags: Vec<String>,
}
#[derive(Debug, Deserialize, Clone)]
struct RegistriedContainerFromApi {
id: i64,
#[serde(rename = "type")]
_type: String,
name: String,
version: String,
html_url: String,
created_at: String,
#[serde(flatten)]
extra_fields: HashMap<String, serde_json::Value>,
}
#[derive(Debug)]
struct Semver {
major: u32,
minor: u32,
patch: u32,
}
impl RegistriedContainerFromApi {
fn get_semver(&self) -> Result<Semver> {
if !is_semantic_version(&self.version) {
// return Err(format!("Invalid semantic version: {}", self.version));
error!("Invalid semantic version: {}", self.version);
return Err(anyhow::anyhow!(
"Invalid semantic version: {}",
self.version
));
}
let version_without_prefix = self.version.replace("v", "");
let parts: Vec<&str> = version_without_prefix.split('.').collect();
if parts.len() != 3 {
return Err(anyhow::anyhow!(
"Invalid semantic version: {}",
self.version
));
}
let major = parts[0]
.parse()
.map_err(|_| format!("Invalid major version: {}", parts[0]))
.expect("");
let minor = parts[1]
.parse()
.map_err(|_| format!("Invalid minor version: {}", parts[1]))
.expect("");
let patch = parts[2]
.parse()
.map_err(|_| format!("Invalid patch version: {}", parts[2]))
.expect("");
Ok(Semver {
major,
minor,
patch,
})
}
}
impl RegistryClient {
pub fn new(config: Config) -> Self {
let client = Client::builder()
.timeout(std::time::Duration::from_secs(config.registry.timeout))
.build()
.expect("Failed to create HTTP client");
Self { client, config }
}
/// Fetches all container images from user then filters by image_name
async fn fetch_by_user(&self, user: String, image_name: String) -> Result<String> {
// https://pakin-inspiron-15-3530.tail110d9.ts.net/api/v1/packages/pakin?type=container
let api_url = format!(
"https://pakin-inspiron-15-3530.tail110d9.ts.net/api/v1/packages/{}/?type=container",
user
);
let response = self
.client
.get(&api_url)
.header("accept", "application/json")
.header("authorization", format!("Basic {}", self.config.bearer()))
.send()
.await
.context("Failed to connect to registry")?;
if !response.status().is_success() {
return Err(anyhow::anyhow!(
"Registry responded with status: {}",
response.status()
));
}
let registry_responses: serde_json::Value = response
.json()
.await
.context("Failed to parse registry response")?;
let containers = registry_responses.as_array().unwrap();
let mut image_versions = Vec::new();
for registried_container in containers {
let container = RegistriedContainerFromApi::deserialize(registried_container).unwrap();
let pure_name = image_name.split('/').last().unwrap().to_string();
if is_semantic_version(&container.version) && container.name.eq(&pure_name) {
image_versions.push(container);
}
}
if image_versions.is_empty() {
return Err(anyhow::anyhow!("No image versions found"));
}
let mut latest_version = image_versions.get(0).unwrap().clone();
for image in image_versions {
if is_semantic_version(&latest_version.version) && is_semantic_version(&image.version) {
let latest_version_parts = latest_version.get_semver().unwrap();
let image_version_parts = image.get_semver().unwrap();
if latest_version_parts.major < image_version_parts.major {
latest_version = image;
} else if latest_version_parts.major.eq(&image_version_parts.major) {
if latest_version_parts.minor < image_version_parts.minor {
latest_version = image;
} else if latest_version_parts.minor.eq(&image_version_parts.minor) {
if latest_version_parts.patch < image_version_parts.patch {
latest_version = image;
}
}
}
}
}
Ok(latest_version.version)
}
pub async fn get_latest_version(&self, registry_url: &str, image_name: &str) -> Result<String> {
let mut url = format!(
"http://{}/v2/{}/tags/list",
registry_url.clone(),
image_name
);
info!("Fetching tags from registry: {}", url);
if registry_url.starts_with("default") {
// get user
let reg_clone = registry_url.clone();
let _pre = reg_clone.split(".").collect::<Vec<&str>>();
let user = _pre.get(1).expect("invalid format of default");
self.fetch_by_user(user.to_string(), image_name.to_string())
.await
} else {
let mut request = self.client.get(&url);
// Add authentication if configured
if let Some(auth) = &self.config.registry.auth {
request = request.basic_auth(&auth.username, Some(&auth.password));
}
let response = request
.send()
.await
.context("Failed to connect to registry")?;
if !response.status().is_success() {
return Err(anyhow::anyhow!(
"Registry responded with status: {}",
response.status()
));
}
let registry_response: RegistryTagsResponse = response
.json()
.await
.context("Failed to parse registry response")?;
// Filter and sort version tags
let mut version_tags: Vec<&String> = registry_response
.tags
.iter()
.filter(|tag| self.is_version_tag(tag))
.collect();
version_tags.sort_by(|a, b| self.compare_versions(a, b));
let latest = version_tags
.last()
.ok_or_else(|| anyhow::anyhow!("No version tags found in registry"))?;
info!(
"Latest version for {}/{}: {}",
registry_url, image_name, latest
);
Ok((*latest).clone())
}
}
fn is_version_tag(&self, tag: &str) -> bool {
// Check if tag looks like a semantic version
regex::Regex::new(r"^v?\d+\.\d+\.\d+")
.unwrap()
.is_match(tag)
}
fn compare_versions(&self, v1: &str, v2: &str) -> std::cmp::Ordering {
crate::discovery::compare_versions(v1, v2)
}
}

79
src/storage.rs Normal file
View file

@ -0,0 +1,79 @@
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::path::Path;
use tracing::{debug, info};
use crate::types::ManagedContainer;
pub struct Storage {
db: sled::Db,
}
impl Storage {
pub async fn new(db_path: &str) -> Result<Self> {
// Ensure parent directory exists
if let Some(parent) = Path::new(db_path).parent() {
tokio::fs::create_dir_all(parent)
.await
.context("Failed to create storage directory")?;
}
let db = sled::open(db_path).context("Failed to open storage database")?;
info!("📁 Storage initialized at: {}", db_path);
Ok(Self { db })
}
pub async fn save_container(&self, container: &ManagedContainer) -> Result<()> {
let key = format!("container:{}", container.container_id);
let value = serde_json::to_vec(container).context("Failed to serialize container")?;
self.db
.insert(key.as_bytes(), value)
.context("Failed to save container to storage")?;
debug!("💾 Saved container: {}", container.name);
Ok(())
}
pub async fn get_container(&self, container_id: &str) -> Result<ManagedContainer> {
let key = format!("container:{}", container_id);
let value = self
.db
.get(key.as_bytes())
.context("Failed to read from storage")?
.ok_or_else(|| anyhow::anyhow!("Container not found: {}", container_id))?;
let container =
serde_json::from_slice(&value).context("Failed to deserialize container")?;
Ok(container)
}
pub async fn get_all_containers(&self) -> Result<Vec<ManagedContainer>> {
let mut containers = Vec::new();
for result in self.db.scan_prefix(b"container:") {
let (_key, value) = result.context("Failed to scan storage")?;
let container: ManagedContainer =
serde_json::from_slice(&value).context("Failed to deserialize container")?;
containers.push(container);
}
debug!("📋 Retrieved {} containers from storage", containers.len());
Ok(containers)
}
pub async fn delete_container(&self, container_id: &str) -> Result<()> {
let key = format!("container:{}", container_id);
self.db
.remove(key.as_bytes())
.context("Failed to delete container from storage")?;
debug!("🗑️ Deleted container: {}", container_id);
Ok(())
}
}

235
src/types.rs Normal file
View file

@ -0,0 +1,235 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManagedContainer {
pub id: String,
pub container_id: String,
pub name: String,
pub image_name: String,
pub current_version: String,
pub registry_url: String,
pub health_check_url: Option<String>,
pub labels: HashMap<String, String>,
pub ports: Vec<ContainerPort>,
pub volumes: Vec<ContainerVolume>,
pub environment: HashMap<String, String>,
pub network_settings: NetworkSettings,
pub status: ContainerStatus,
pub update_strategy: UpdateStrategy,
pub discovered_at: DateTime<Utc>,
pub last_seen: DateTime<Utc>,
pub last_update_check: Option<DateTime<Utc>>,
pub last_update: Option<DateTime<Utc>>,
pub update_history: Vec<UpdateRecord>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContainerPort {
pub container_port: u16,
pub host_port: Option<u16>,
pub protocol: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContainerVolume {
pub source: String,
pub target: String,
pub read_only: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkSettings {
pub ip_address: String,
pub network_name: String,
pub aliases: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ContainerStatus {
Discovered,
Healthy,
Unhealthy,
Updating,
UpdateFailed,
Lost, // Container disappeared from network
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateStrategy {
Automatic, // Update as soon as new version is available
Manual, // Only update when manually triggered
Scheduled(String), // Cron expression for scheduled updates
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateRecord {
pub id: Uuid,
pub timestamp: DateTime<Utc>,
pub from_version: String,
pub to_version: String,
pub status: UpdateStatus,
pub duration_ms: Option<u64>,
pub error_message: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateStatus {
Started,
PullingImage,
CreatingBackup,
StoppingContainer,
StartingContainer,
HealthChecking,
Success,
Failed,
RolledBack,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegistryImage {
pub registry_url: String,
pub image_name: String,
pub tags: Vec<String>,
pub latest_version: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateJob {
pub id: Uuid,
pub container_id: String,
pub target_version: String,
pub created_at: DateTime<Utc>,
pub started_at: Option<DateTime<Utc>>,
pub completed_at: Option<DateTime<Utc>>,
pub status: UpdateStatus,
pub retry_count: usize,
}
// API Types
#[derive(Debug, Serialize, Deserialize)]
pub struct ApiResponse<T> {
pub success: bool,
pub data: Option<T>,
pub error: Option<String>,
pub timestamp: DateTime<Utc>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct HealthResponse {
pub status: String,
pub version: String,
pub uptime_seconds: u64,
pub managed_containers: usize,
pub active_updates: usize,
pub update_queue_size: usize,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct StatsResponse {
pub total_containers: usize,
pub healthy_containers: usize,
pub updating_containers: usize,
pub failed_containers: usize,
pub total_updates_performed: usize,
pub successful_updates: usize,
pub failed_updates: usize,
pub average_update_duration_ms: Option<f64>,
}
impl Default for StatsResponse {
fn default() -> Self {
Self {
total_containers: 0,
healthy_containers: 0,
updating_containers: 0,
failed_containers: 0,
total_updates_performed: 0,
successful_updates: 0,
failed_updates: 0,
average_update_duration_ms: None,
}
}
}
impl<T> ApiResponse<T> {
pub fn success(data: T) -> Self {
Self {
success: true,
data: Some(data),
error: None,
timestamp: Utc::now(),
}
}
pub fn error(message: String) -> Self {
Self {
success: false,
data: None,
error: Some(message),
timestamp: Utc::now(),
}
}
}
impl Default for UpdateStrategy {
fn default() -> Self {
UpdateStrategy::Manual
}
}
impl ManagedContainer {
pub fn new(
container_id: String,
name: String,
image_name: String,
registry_url: String,
) -> Self {
Self {
id: Uuid::new_v4().to_string(),
container_id,
name,
image_name,
current_version: "unknown".to_string(),
registry_url,
health_check_url: None,
labels: HashMap::new(),
ports: Vec::new(),
volumes: Vec::new(),
environment: HashMap::new(),
network_settings: NetworkSettings {
ip_address: String::new(),
network_name: String::new(),
aliases: Vec::new(),
},
status: ContainerStatus::Discovered,
update_strategy: UpdateStrategy::default(),
discovered_at: Utc::now(),
last_seen: Utc::now(),
last_update_check: None,
last_update: None,
update_history: Vec::new(),
}
}
pub fn is_update_enabled(&self) -> bool {
self.labels
.get("update-manager.enable")
.map(|v| v == "true")
.unwrap_or(false)
}
pub fn should_auto_update(&self) -> bool {
matches!(self.update_strategy, UpdateStrategy::Automatic)
}
pub fn add_update_record(&mut self, record: UpdateRecord) {
self.update_history.push(record);
// Keep only last 10 records
if self.update_history.len() > 10 {
self.update_history.remove(0);
}
}
}

1067
src/updater.rs Normal file

File diff suppressed because it is too large Load diff