add nproc, update rollback, s3 (WIP)
This commit is contained in:
parent
6d215292bd
commit
73d5cdd171
8 changed files with 1962 additions and 59 deletions
1280
Cargo.lock
generated
1280
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "silserv"
|
||||
version = "0.1.1"
|
||||
version = "0.1.2"
|
||||
edition = "2024"
|
||||
authors = ["pakin"]
|
||||
description = "Container manager service for testing"
|
||||
|
|
@ -8,7 +8,10 @@ description = "Container manager service for testing"
|
|||
[dependencies]
|
||||
anyhow = "1.0.98"
|
||||
async-trait = "0.1.88"
|
||||
aws-config = { version = "1.8.5", features = ["behavior-version-latest"] }
|
||||
aws-sdk-s3 = "1.103.0"
|
||||
axum = "0.8.4"
|
||||
base64 = "0.22.1"
|
||||
bollard = "0.19.2"
|
||||
chrono = { version = "0.4.41", features = ["serde"] }
|
||||
clap = { version = "4.5.43", features = ["derive"] }
|
||||
|
|
@ -20,6 +23,7 @@ regex = "1.11.1"
|
|||
reqwest = { version = "0.12.22", features = ["json"] }
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
serde_json = "1.0.142"
|
||||
sha2 = "0.10.9"
|
||||
sled = "0.34.7"
|
||||
thiserror = "2.0.12"
|
||||
tokio = { version = "1.47.1", features = ["full"] }
|
||||
|
|
|
|||
112
src/api.rs
112
src/api.rs
|
|
@ -1,12 +1,14 @@
|
|||
use aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output;
|
||||
use axum::{
|
||||
Router,
|
||||
body::Body,
|
||||
extract::{Path, Query, State},
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Json, Response},
|
||||
routing::{get, post},
|
||||
routing::{delete, get, post, put},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::sync::Mutex;
|
||||
use tower::ServiceBuilder;
|
||||
use tower_http::{
|
||||
|
|
@ -30,6 +32,8 @@ pub struct ApiServer {
|
|||
discovery: Arc<ContainerDiscovery>,
|
||||
storage: Arc<Storage>,
|
||||
log_manager: Arc<LogManager>,
|
||||
s3_client: aws_sdk_s3::Client,
|
||||
garage_endpoint: String,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
|
@ -38,6 +42,10 @@ pub struct AppState {
|
|||
pub discovery: Arc<ContainerDiscovery>,
|
||||
pub storage: Arc<Storage>,
|
||||
pub log_manager: Arc<LogManager>,
|
||||
|
||||
// garage
|
||||
pub s3_client: aws_sdk_s3::Client,
|
||||
pub garage_endpoint: String,
|
||||
}
|
||||
|
||||
impl ApiServer {
|
||||
|
|
@ -47,6 +55,8 @@ impl ApiServer {
|
|||
discovery: Arc<ContainerDiscovery>,
|
||||
storage: Arc<Storage>,
|
||||
log_manager: Arc<LogManager>,
|
||||
s3_client: aws_sdk_s3::Client,
|
||||
garage_endpoint: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
port,
|
||||
|
|
@ -54,6 +64,8 @@ impl ApiServer {
|
|||
discovery,
|
||||
storage,
|
||||
log_manager,
|
||||
s3_client,
|
||||
garage_endpoint,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -63,6 +75,8 @@ impl ApiServer {
|
|||
discovery: self.discovery,
|
||||
storage: self.storage,
|
||||
log_manager: self.log_manager.clone(),
|
||||
s3_client: self.s3_client,
|
||||
garage_endpoint: self.garage_endpoint,
|
||||
};
|
||||
|
||||
let app = Router::new()
|
||||
|
|
@ -74,6 +88,7 @@ impl ApiServer {
|
|||
.route("/api/containers/{id}", get(get_container))
|
||||
.route("/api/containers/{id}/update", post(trigger_update))
|
||||
.route("/api/containers/{id}/force-update", post(force_update))
|
||||
.route("/api/containers/{id}/rollback", post(force_rollback))
|
||||
// Discovery endpoints
|
||||
.route("/api/discovery/scan", post(force_discovery))
|
||||
.route("/api/discovery/containers", get(get_discovered_containers))
|
||||
|
|
@ -84,6 +99,14 @@ impl ApiServer {
|
|||
.route("/api/logs/dates", get(get_log_dates))
|
||||
// Bulk operations
|
||||
.route("/api/bulk/update-check", post(bulk_update_check))
|
||||
// TODO: communicate with garage s3
|
||||
// .route("/api/s3/{bucket}", get(handler))
|
||||
// .route("/api/s3/{bucket}", put(handler))
|
||||
// .route("/api/s3/{bucket}", delete(handler))
|
||||
// .route("/api/s3/{bucket}/{key}", get(handler))
|
||||
// .route("/api/s3/{bucket}/{key}", put(handler))
|
||||
// .route("/api/s3/{bucket}/{key}", delete(handler))
|
||||
// TODO: Online installations
|
||||
.with_state(state)
|
||||
.layer(
|
||||
ServiceBuilder::new()
|
||||
|
|
@ -214,8 +237,8 @@ async fn system_status(State(state): State<AppState>) -> Json<ApiResponse<System
|
|||
};
|
||||
|
||||
let status = SystemStatusResponse {
|
||||
service: "Docker Update Manager".to_string(),
|
||||
version: "1.0.0".to_string(),
|
||||
service: "Silserv".to_string(),
|
||||
version: "0.1.1".to_string(),
|
||||
managed_containers: containers.len(),
|
||||
discovered_containers: discovered.len(),
|
||||
active_updates,
|
||||
|
|
@ -335,6 +358,33 @@ async fn force_discovery(
|
|||
Ok(Json(ApiResponse::success(response)))
|
||||
}
|
||||
|
||||
async fn force_rollback(
|
||||
Path(id): Path<String>,
|
||||
State(state): State<AppState>,
|
||||
) -> Result<Json<ApiResponse<RollbackResponse>>, ApiError> {
|
||||
// find images with format of <name>-backup-<timestamp>
|
||||
{
|
||||
let mgr = state.update_manager.lock().await;
|
||||
let container = mgr.get_container(&id).await;
|
||||
|
||||
match container {
|
||||
Ok(mut c) => {
|
||||
mgr.attempt_rollback(&mut c).await?;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Failed to get container: {}", err);
|
||||
return Ok(Json(ApiResponse::error(err.to_string())));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let success = RollbackResponse {
|
||||
message: "Rollback completed successfully".to_string(),
|
||||
};
|
||||
|
||||
Ok(Json(ApiResponse::success(success)))
|
||||
}
|
||||
|
||||
async fn get_discovered_containers(
|
||||
State(state): State<AppState>,
|
||||
) -> Result<Json<ApiResponse<Vec<String>>>, ApiError> {
|
||||
|
|
@ -459,6 +509,55 @@ async fn bulk_update_check(
|
|||
Ok(Json(ApiResponse::success(response)))
|
||||
}
|
||||
|
||||
// get object list
|
||||
async fn list_objects(
|
||||
State(state): State<AppState>,
|
||||
Path(bucket): Path<String>,
|
||||
Query(params): Query<std::collections::HashMap<String, String>>,
|
||||
) -> Result<Json<ApiResponse<serde_json::Value>>, ApiError> {
|
||||
let mut list_request = state.s3_client.list_objects_v2().bucket(&bucket);
|
||||
|
||||
if let Some(prefix) = params.get("prefix") {
|
||||
list_request = list_request.prefix(prefix);
|
||||
}
|
||||
|
||||
let response: ListObjectsV2Output = list_request.send().await?;
|
||||
|
||||
if response.contents.is_some() {
|
||||
let mapped = response
|
||||
.contents()
|
||||
.iter()
|
||||
.map(|obj| {
|
||||
let mut hashmap = HashMap::new();
|
||||
hashmap.insert("Key".to_string(), obj.key().unwrap().to_string());
|
||||
hashmap.insert("Size".to_string(), obj.size().unwrap().to_string());
|
||||
hashmap.insert(
|
||||
"LastModified".to_string(),
|
||||
obj.last_modified().unwrap().to_string(),
|
||||
);
|
||||
hashmap.insert("ETag".to_string(), obj.e_tag().unwrap().to_string());
|
||||
hashmap.insert(
|
||||
"StorageClass".to_string(),
|
||||
obj.storage_class().unwrap().to_string(),
|
||||
);
|
||||
hashmap.insert(
|
||||
"Owner".to_string(),
|
||||
obj.owner().unwrap().id().unwrap().to_string(),
|
||||
);
|
||||
hashmap
|
||||
})
|
||||
.collect::<Vec<HashMap<String, String>>>();
|
||||
|
||||
// let mut response = Response::builder()
|
||||
// .status(StatusCode::OK)
|
||||
// .body(Body::new(mapped))
|
||||
// .unwrap();
|
||||
Ok(Json(ApiResponse::success(serde_json::json!(mapped))))
|
||||
} else {
|
||||
Ok(Json(ApiResponse::success(serde_json::json!({}))))
|
||||
}
|
||||
}
|
||||
|
||||
// Request/Response types
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
|
|
@ -546,6 +645,11 @@ struct BulkUpdateResult {
|
|||
error: Option<String>,
|
||||
}
|
||||
|
||||
/// Success payload of the rollback endpoint (`force_rollback`).
#[derive(Debug, Serialize)]
|
||||
struct RollbackResponse {
|
||||
/// Human-readable outcome message.
message: String,
|
||||
}
|
||||
|
||||
// Error handling
|
||||
|
||||
#[derive(Debug)]
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ pub struct Config {
|
|||
pub registry: RegistryConfig,
|
||||
pub storage: StorageConfig,
|
||||
bearer: String,
|
||||
s3: S3Config,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
|
@ -47,6 +48,14 @@ pub struct StorageConfig {
|
|||
pub path: String,
|
||||
}
|
||||
|
||||
/// S3 connection settings stored under the `s3` field of `Config`.
///
/// NOTE(review): `Debug` is derived, so formatting this struct with `{:?}`
/// prints `secret_access_key` in clear text — confirm it is never logged.
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct S3Config {
|
||||
/// Access key id used to build the S3 credentials.
pub access_key_id: String,
|
||||
/// Secret access key used to build the S3 credentials.
pub secret_access_key: String,
|
||||
/// Region name. NOTE(review): no reader of this field is visible in this view — confirm it is used.
pub region: String,
|
||||
/// Endpoint URL. NOTE(review): no reader of this field is visible in this view — confirm it is used.
pub endpoint: String,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
|
|
@ -74,6 +83,12 @@ impl Default for Config {
|
|||
path: "./data/silserv.db".to_string(),
|
||||
},
|
||||
bearer: "".to_string(),
|
||||
s3: S3Config {
|
||||
access_key_id: "".to_string(),
|
||||
secret_access_key: "".to_string(),
|
||||
region: "".to_string(),
|
||||
endpoint: "".to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -103,4 +118,12 @@ impl Config {
|
|||
pub fn bearer(&self) -> &str {
|
||||
&self.bearer
|
||||
}
|
||||
|
||||
pub fn get_s3_access_key_id(&self) -> &str {
|
||||
&self.s3.access_key_id
|
||||
}
|
||||
|
||||
pub fn get_s3_secret_access_key(&self) -> &str {
|
||||
&self.s3.secret_access_key
|
||||
}
|
||||
}
|
||||
|
|
|
|||
89
src/main.rs
89
src/main.rs
|
|
@ -1,7 +1,7 @@
|
|||
use anyhow::Result;
|
||||
use clap::Parser;
|
||||
use std::sync::Arc;
|
||||
use tracing::{info, warn};
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
mod api;
|
||||
mod config;
|
||||
|
|
@ -12,10 +12,17 @@ mod registry;
|
|||
mod storage;
|
||||
mod types;
|
||||
mod updater;
|
||||
// mod zmq;
|
||||
mod nproc;
|
||||
|
||||
use crate::{
|
||||
api::ApiServer, config::Config, discovery::ContainerDiscovery, logging::LogManager,
|
||||
storage::Storage, updater::UpdateManager,
|
||||
api::ApiServer,
|
||||
config::Config,
|
||||
discovery::ContainerDiscovery,
|
||||
logging::LogManager,
|
||||
storage::Storage,
|
||||
updater::UpdateManager,
|
||||
// zmq::{SocketConfig, ZmqBackend},
|
||||
};
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
|
|
@ -114,6 +121,75 @@ async fn main() -> Result<()> {
|
|||
})
|
||||
};
|
||||
|
||||
// s3 garage
|
||||
let config = aws_sdk_s3::Config::builder()
|
||||
.endpoint_url("http://localhost:3900")
|
||||
.region(aws_sdk_s3::config::Region::new("garage"))
|
||||
.credentials_provider(
|
||||
aws_sdk_s3::config::Credentials::builder()
|
||||
.provider_name("garage")
|
||||
.access_key_id(config.clone().get_s3_access_key_id())
|
||||
.secret_access_key(config.clone().get_s3_secret_access_key())
|
||||
.build(),
|
||||
)
|
||||
.behavior_version_latest()
|
||||
.build();
|
||||
let s3client = aws_sdk_s3::Client::from_conf(config);
|
||||
|
||||
// ZMQ
|
||||
// let download_dir = PathBuf::from("./downloads");
|
||||
// let mut zmq_manager = Arc::new(ZmqBackend::new(
|
||||
// 36540,
|
||||
// Some("127.0.0.1".to_string()),
|
||||
// Some(download_dir),
|
||||
// ));
|
||||
|
||||
// // publisher
|
||||
// let publisher_endpoint = zmq_manager
|
||||
// .add_socket(
|
||||
// "publisher".to_string(),
|
||||
// SocketConfig::publish(None, vec![]).with_bind(true),
|
||||
// )
|
||||
// .await?;
|
||||
|
||||
// info!("zmq.publisher: {}", publisher_endpoint);
|
||||
|
||||
// let subscriber_endpoint = zmq_manager
|
||||
// .add_socket(
|
||||
// "subscriber".to_string(),
|
||||
// SocketConfig::subscribe(None, vec!["files".to_string(), "news".to_string()]),
|
||||
// )
|
||||
// .await?;
|
||||
|
||||
// info!("zmq.subscriber: {}", subscriber_endpoint);
|
||||
|
||||
// zmq_manager
|
||||
// .add_socket(
|
||||
// "file_server".to_string(),
|
||||
// SocketConfig::pull(None).with_bind(true),
|
||||
// )
|
||||
// .await?;
|
||||
|
||||
// zmq_manager
|
||||
// .add_socket("file_client".to_string(), SocketConfig::push(None))
|
||||
// .await?;
|
||||
|
||||
// let _ = zmq_manager.start_receivers().await;
|
||||
|
||||
// nproc
|
||||
// let nproc_listener = tokio::net::TcpListener::bind(&"0.0.0.0:36540").await?;
|
||||
// let nproc_state = nproc::ServerState::new();
|
||||
let reply_key = b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
|
||||
|
||||
let pubsub_nproc = {
|
||||
let addr = "0.0.0.0:36540".to_string();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = nproc::run_server(&addr, *reply_key).await {
|
||||
error!("pubsub server exited: {e}");
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
// Start API server
|
||||
let api_server = ApiServer::new(
|
||||
args.port,
|
||||
|
|
@ -121,6 +197,8 @@ async fn main() -> Result<()> {
|
|||
discovery.clone(),
|
||||
storage.clone(),
|
||||
log_manager.clone(),
|
||||
s3client.clone(),
|
||||
"http://localhost:3900".to_string(),
|
||||
);
|
||||
|
||||
info!("🌐 Starting API server on port {}", args.port);
|
||||
|
|
@ -137,6 +215,9 @@ async fn main() -> Result<()> {
|
|||
result = update_handle => {
|
||||
tracing::error!("Update scheduler stopped: {:?}", result);
|
||||
}
|
||||
result = pubsub_nproc => {
|
||||
tracing::error!("Nproc stopped: {:?}", result);
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
info!("🛑 Received shutdown signal");
|
||||
}
|
||||
|
|
|
|||
290
src/nproc.rs
Normal file
290
src/nproc.rs
Normal file
|
|
@ -0,0 +1,290 @@
|
|||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
net::SocketAddr,
|
||||
sync::{Arc, atomic::AtomicUsize},
|
||||
};
|
||||
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
net::TcpStream,
|
||||
sync::{Mutex, RwLock, mpsc},
|
||||
};
|
||||
|
||||
/// Length in bytes of the API key appended to every frame.
pub const API_KEY_LEN: usize = 32;

/// Frame size unit (4 MiB, i.e. `1 << 22`); a connection whose receive buffer
/// grows beyond twice this value is closed.
const MAX_FRAME: usize = 4 * 1024 * 1024;

/// Largest topic length a frame may declare before it is treated as malformed.
const MAX_TOPIC: usize = 256;

/// Number of independently locked topic maps in the server state.
const N_SHARDS: usize = 32;

/// Capacity of each connection's outgoing frame queue.
const WRITER_QUEUE_CAP: usize = 256;
|
||||
|
||||
/// Cloneable handle used to queue outgoing frames for one connection.
#[derive(Clone)]
|
||||
pub struct ConnectionHandle {
|
||||
/// Sending half of the connection's outgoing frame queue; frames are shared as `Arc<Vec<u8>>`.
tx: tokio::sync::mpsc::Sender<Arc<Vec<u8>>>,
|
||||
}
|
||||
|
||||
/// Topic name -> ids of the connections subscribed to it.
type TopicSubs = HashMap<String, HashSet<usize>>;
|
||||
|
||||
/// State shared by every connection of the pub/sub server.
pub struct ServerState {
|
||||
/// Topic maps, each behind its own mutex; a topic's shard is chosen by `ServerState::shard_idx`.
shards: Vec<Mutex<TopicSubs>>,
|
||||
/// Connection id -> handle used to queue frames for that connection.
conns: RwLock<HashMap<usize, ConnectionHandle>>,
|
||||
/// Source of connection ids (incremented once per accepted connection).
next_id: AtomicUsize,
|
||||
}
|
||||
|
||||
impl ServerState {
|
||||
pub fn new() -> Self {
|
||||
let mut shards = Vec::with_capacity(N_SHARDS);
|
||||
for _ in 0..N_SHARDS {
|
||||
shards.push(Mutex::new(HashMap::new()));
|
||||
}
|
||||
Self {
|
||||
shards,
|
||||
conns: RwLock::new(HashMap::new()),
|
||||
next_id: AtomicUsize::new(1),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn shard_idx(topic: &str) -> usize {
|
||||
let mut h: u64 = 1469598103934665603;
|
||||
for b in topic.as_bytes() {
|
||||
h ^= *b as u64;
|
||||
h = h.wrapping_mul(1099511628211);
|
||||
}
|
||||
(h as usize) % N_SHARDS
|
||||
}
|
||||
}
|
||||
|
||||
/// Decodes the first four bytes of `b` as a big-endian `u32`.
///
/// # Panics
/// Panics when `b` holds fewer than four bytes.
#[inline]
fn be_u32(b: &[u8]) -> u32 {
    let mut word = [0u8; 4];
    word.copy_from_slice(&b[..4]);
    u32::from_be_bytes(word)
}
|
||||
|
||||
/// Appends `v` to `buf` as four big-endian bytes.
#[inline]
fn put_be_u32(buf: &mut Vec<u8>, v: u32) {
    buf.extend(v.to_be_bytes());
}
|
||||
|
||||
/// Serialises one frame and wraps it in an `Arc` so it can be queued to many
/// connections without copying.
///
/// Wire layout, all integers big-endian:
/// `u32` length of everything that follows · `u8` frame type · `u16` topic
/// length · `u32` body length · topic bytes · body bytes · `API_KEY_LEN` key
/// bytes.
///
/// The length fields are written with `as` casts, so a topic longer than
/// `u16::MAX` or a body longer than `u32::MAX` would be silently truncated in
/// the header.
fn build_frame_arc(t: u8, topic: &[u8], body: &[u8], key: &[u8; API_KEY_LEN]) -> Arc<Vec<u8>> {
    // Everything after the 4-byte length prefix: fixed header, topic, body, key.
    let framed_len = 1 + 2 + 4 + topic.len() + body.len() + API_KEY_LEN;

    let mut frame = Vec::with_capacity(4 + framed_len);
    frame.extend_from_slice(&(framed_len as u32).to_be_bytes());
    frame.push(t);
    frame.extend_from_slice(&(topic.len() as u16).to_be_bytes());
    frame.extend_from_slice(&(body.len() as u32).to_be_bytes());
    for part in [topic, body, key.as_slice()] {
        frame.extend_from_slice(part);
    }

    Arc::new(frame)
}
|
||||
|
||||
fn parse_one(buf: &[u8]) -> Option<(u8, Vec<u8>, Vec<u8>, [u8; API_KEY_LEN], usize)> {
|
||||
if buf.len() < 4 {
|
||||
return None;
|
||||
}
|
||||
let total = be_u32(&buf[0..4]) as usize;
|
||||
if buf.len() < 4 + total {
|
||||
return None;
|
||||
}
|
||||
let err_value = Some((0, vec![], vec![], [0; API_KEY_LEN], 4 + total));
|
||||
let pl = &buf[4..4 + total];
|
||||
if total < API_KEY_LEN + 1 + 2 + 4 {
|
||||
return err_value;
|
||||
}
|
||||
let key_off = pl.len() - API_KEY_LEN;
|
||||
let (hdr, keyb) = pl.split_at(key_off);
|
||||
let mut key = [0u8; API_KEY_LEN];
|
||||
key.copy_from_slice(keyb);
|
||||
let t = hdr[0];
|
||||
let tl = usize::from(u16::from_be_bytes([hdr[1], hdr[2]]));
|
||||
let bl = usize::from(u32::from_be_bytes([hdr[3], hdr[4], hdr[5], hdr[6]]) as usize);
|
||||
if tl > MAX_TOPIC {
|
||||
return err_value;
|
||||
}
|
||||
let need = 1 + 2 + 4 + tl + bl;
|
||||
if hdr.len() != need {
|
||||
return err_value;
|
||||
}
|
||||
let mut off = 7;
|
||||
let topic = hdr[off..off + tl].to_vec();
|
||||
off += tl;
|
||||
let body = hdr[off..off + bl].to_vec();
|
||||
Some((t, topic, body, key, 4 + total))
|
||||
}
|
||||
|
||||
pub async fn handle_connection(
|
||||
st: Arc<ServerState>,
|
||||
sock: TcpStream,
|
||||
_peer: SocketAddr,
|
||||
reply_key: [u8; API_KEY_LEN],
|
||||
) -> std::io::Result<()> {
|
||||
let (mut rd, mut wr) = sock.into_split();
|
||||
let (tx, mut rx) = mpsc::channel::<Arc<Vec<u8>>>(WRITER_QUEUE_CAP);
|
||||
let writer = tokio::spawn(async move {
|
||||
while let Some(b) = rx.recv().await {
|
||||
if wr.write_all(&b).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
let id = st
|
||||
.next_id
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
{
|
||||
st.conns
|
||||
.write()
|
||||
.await
|
||||
.insert(id, ConnectionHandle { tx: tx.clone() });
|
||||
}
|
||||
|
||||
let my = Arc::new(Mutex::new(HashSet::<String>::new()));
|
||||
let mut acc = Vec::<u8>::with_capacity(8192);
|
||||
let mut tmp = vec![0u8; 8192];
|
||||
|
||||
'read: loop {
|
||||
while let Some((t, topic, body, _key, end)) = parse_one(&acc) {
|
||||
acc.drain(..end);
|
||||
match t {
|
||||
1 => {
|
||||
let ts = String::from_utf8_lossy(&topic).to_string();
|
||||
let idx = ServerState::shard_idx(&ts);
|
||||
{
|
||||
st.shards[idx]
|
||||
.lock()
|
||||
.await
|
||||
.entry(ts.clone())
|
||||
.or_default()
|
||||
.insert(id);
|
||||
}
|
||||
{
|
||||
my.lock().await.insert(ts);
|
||||
}
|
||||
let _ = tx.send(build_frame_arc(
|
||||
1,
|
||||
b"",
|
||||
br#"{"subscribed": true}"#,
|
||||
&reply_key,
|
||||
));
|
||||
}
|
||||
2 => {
|
||||
let ts = String::from_utf8_lossy(&topic).to_string();
|
||||
let idx = ServerState::shard_idx(&ts);
|
||||
{
|
||||
if let Some(s) = st.shards[idx].lock().await.get_mut(&ts) {
|
||||
s.remove(&id);
|
||||
}
|
||||
}
|
||||
{
|
||||
my.lock().await.remove(&ts);
|
||||
}
|
||||
let _ = tx.try_send(build_frame_arc(
|
||||
2,
|
||||
b"",
|
||||
br#"{"unsubscribed":true}"#,
|
||||
&reply_key,
|
||||
));
|
||||
}
|
||||
3 => {
|
||||
// PUBLISH
|
||||
let topic_str = String::from_utf8_lossy(&topic).to_string();
|
||||
let idx = ServerState::shard_idx(&topic_str);
|
||||
|
||||
// Build the fanout frame **once** (zero/low-copy)
|
||||
let fanout = build_frame_arc(3, &topic, &body, &reply_key);
|
||||
|
||||
// Snapshot targets under shard lock
|
||||
let targets: Vec<mpsc::Sender<Arc<Vec<u8>>>> = {
|
||||
let shard = st.shards[idx].lock().await;
|
||||
let conns = st.conns.read().await;
|
||||
shard
|
||||
.get(&topic_str)
|
||||
.map(|ids| {
|
||||
ids.iter()
|
||||
.filter_map(|id| conns.get(id).map(|h| h.tx.clone()))
|
||||
.collect()
|
||||
})
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
for t in targets {
|
||||
// If a subscriber is slow and its queue is full, drop it by closing channel
|
||||
if t.try_send(fanout.clone()).is_err() {
|
||||
// let the writer task break naturally; optional: proactively close
|
||||
// (we skip here for simplicity)
|
||||
}
|
||||
}
|
||||
|
||||
// ACK publisher
|
||||
let ack = build_frame_arc(
|
||||
3,
|
||||
topic_str.as_bytes(),
|
||||
br#"{"published":true}"#,
|
||||
&reply_key,
|
||||
);
|
||||
|
||||
// handle internal topic; log, ...
|
||||
|
||||
let _ = tx.try_send(ack);
|
||||
}
|
||||
4 => {
|
||||
// PING -> PONG
|
||||
let _ = tx.send(build_frame_arc(4, b"", b"", &reply_key));
|
||||
}
|
||||
_ => {
|
||||
let _ = tx.send(build_frame_arc(
|
||||
0,
|
||||
b"",
|
||||
br#"{"error": "bad type"}"#,
|
||||
&reply_key,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
let n = rd.read(&mut tmp).await?;
|
||||
if n == 0 {
|
||||
break 'read;
|
||||
}
|
||||
acc.extend_from_slice(&tmp[..n]);
|
||||
if acc.len() > MAX_FRAME * 2 {
|
||||
break 'read;
|
||||
}
|
||||
}
|
||||
|
||||
// clean
|
||||
{
|
||||
let mut conns = st.conns.write().await;
|
||||
conns.remove(&id);
|
||||
}
|
||||
|
||||
let topics: Vec<String> = {
|
||||
let guard = my.lock().await;
|
||||
guard.iter().cloned().collect()
|
||||
};
|
||||
|
||||
for t in topics {
|
||||
let idx = ServerState::shard_idx(&t);
|
||||
let mut shard = st.shards[idx].lock().await;
|
||||
if let Some(set) = shard.get_mut(&t) {
|
||||
set.remove(&id);
|
||||
}
|
||||
}
|
||||
|
||||
drop(tx);
|
||||
let _ = writer.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn run_server(addr: &str, reply_key: [u8; API_KEY_LEN]) -> std::io::Result<()> {
|
||||
let nproc_listener = tokio::net::TcpListener::bind(addr).await?;
|
||||
let nproc_state = Arc::new(ServerState::new());
|
||||
loop {
|
||||
let (socket, peer) = nproc_listener.accept().await?;
|
||||
|
||||
let state_cloned = nproc_state.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_connection(state_cloned, socket, peer, reply_key).await {
|
||||
eprintln!("conn error: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -232,4 +232,12 @@ impl ManagedContainer {
|
|||
self.update_history.remove(0);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_semver(&self) -> Vec<i64> {
|
||||
self.current_version
|
||||
.replace("v", "")
|
||||
.split(".")
|
||||
.map(|v| v.parse::<i64>().unwrap())
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
213
src/updater.rs
213
src/updater.rs
|
|
@ -5,13 +5,15 @@ use bollard::{
|
|||
CreateContainerOptions, RemoveContainerOptions, StartContainerOptions, StopContainerOptions,
|
||||
},
|
||||
image::{CommitContainerOptions, CreateImageOptions},
|
||||
query_parameters::ListImagesOptionsBuilder,
|
||||
secret::ImageSummary,
|
||||
};
|
||||
use chrono::Utc;
|
||||
use std::ops::Deref;
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
marker::PhantomData,
|
||||
};
|
||||
use std::{ops::Deref, string::ParseError};
|
||||
use std::{ops::DerefMut, sync::Arc};
|
||||
use tokio::{
|
||||
sync::Mutex,
|
||||
|
|
@ -30,6 +32,84 @@ use crate::{
|
|||
types::{ContainerStatus, ManagedContainer, UpdateJob, UpdateRecord, UpdateStatus},
|
||||
};
|
||||
|
||||
/// A three-component `major.minor.patch` version.
///
/// Ordering is derived: versions compare by `major`, then `minor`, then
/// `patch`, which is exactly the field declaration order.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Semver {
    major: i64,
    minor: i64,
    patch: i64,
}

/// Reasons a version string is rejected by [`Semver::parse`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SemverError {
    /// The string does not consist of exactly three dot-separated parts.
    InvalidFormat,
    /// A part is empty, contains a non-digit character, or does not fit in `i64`.
    InvalidNumber,
}

impl std::fmt::Display for SemverError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SemverError::InvalidFormat => f.write_str("version must have the form major.minor.patch"),
            SemverError::InvalidNumber => f.write_str("version component is not a valid number"),
        }
    }
}

impl std::error::Error for SemverError {}

impl Semver {
    /// Parses a `major.minor.patch` string such as `1.2.3`.
    ///
    /// Each component must consist of ASCII digits only, so signed values
    /// like `-1` or `+1` are rejected.
    ///
    /// # Errors
    /// * [`SemverError::InvalidFormat`] when there are not exactly three parts.
    /// * [`SemverError::InvalidNumber`] when a part is not a plain number.
    pub fn parse(version: &str) -> Result<Self, SemverError> {
        let mut parts = version.split('.');
        let (Some(major), Some(minor), Some(patch), None) =
            (parts.next(), parts.next(), parts.next(), parts.next())
        else {
            return Err(SemverError::InvalidFormat);
        };

        Ok(Semver {
            major: Self::parse_component(major)?,
            minor: Self::parse_component(minor)?,
            patch: Self::parse_component(patch)?,
        })
    }

    /// Parses one digits-only version component.
    fn parse_component(part: &str) -> Result<i64, SemverError> {
        if part.is_empty() || !part.bytes().all(|b| b.is_ascii_digit()) {
            return Err(SemverError::InvalidNumber);
        }
        part.parse().map_err(|_| SemverError::InvalidNumber)
    }
}

// `to_string()` is provided by this `Display` impl through `ToString`; an
// inherent `to_string` would shadow it, so none is defined.
impl std::fmt::Display for Semver {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}.{}.{}", self.major, self.minor, self.patch)
    }
}

impl std::str::FromStr for Semver {
    type Err = SemverError;

    /// Same as [`Semver::parse`], enabling `"1.2.3".parse::<Semver>()`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Semver::parse(s)
    }
}

/// Returns the greatest version in `versions` that is strictly lower than
/// `target`, or `None` when there is no such version.
fn find_previous_latest_version(versions: &[Semver], target: &Semver) -> Option<Semver> {
    // Pick the maximum by reference and clone only the winner.
    versions.iter().filter(|v| *v < target).max().cloned()
}

/// Parses every string in `versions`, failing with the first parse error.
fn parse_versions(versions: &[String]) -> Result<Vec<Semver>, SemverError> {
    versions.iter().map(|s| Semver::parse(s)).collect()
}
|
||||
|
||||
pub struct UpdateManager {
|
||||
docker: Docker,
|
||||
config: Config,
|
||||
|
|
@ -407,7 +487,7 @@ impl UpdateManager {
|
|||
}
|
||||
|
||||
async fn perform_update(mself: Arc<Mutex<UpdateManager>>, mut job: UpdateJob) -> Result<()> {
|
||||
let container = {
|
||||
let mut container = {
|
||||
let mut mgr = mself.lock().await;
|
||||
mgr.storage.get_container(&job.container_id).await?
|
||||
};
|
||||
|
|
@ -452,7 +532,7 @@ impl UpdateManager {
|
|||
// Perform the update steps
|
||||
let update_result = Self::execute_update_steps(
|
||||
mself.clone(),
|
||||
&container,
|
||||
&mut container,
|
||||
&job.target_version,
|
||||
&mut update_record,
|
||||
)
|
||||
|
|
@ -520,7 +600,7 @@ impl UpdateManager {
|
|||
|
||||
{
|
||||
let mgr = mself.lock().await;
|
||||
if let Err(rollback_error) = mgr.attempt_rollback(&container).await {
|
||||
if let Err(rollback_error) = mgr.attempt_rollback(&mut container).await {
|
||||
error!("Rollback also failed: {}", rollback_error);
|
||||
{
|
||||
let mgr = mself.lock().await;
|
||||
|
|
@ -553,7 +633,7 @@ impl UpdateManager {
|
|||
|
||||
async fn execute_update_steps(
|
||||
mself: Arc<Mutex<UpdateManager>>,
|
||||
container: &ManagedContainer,
|
||||
container: &mut ManagedContainer,
|
||||
target_version: &str,
|
||||
update_record: &mut UpdateRecord,
|
||||
) -> Result<()> {
|
||||
|
|
@ -902,7 +982,7 @@ impl UpdateManager {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn verify_update_health(&self, container: &ManagedContainer) -> Result<()> {
|
||||
async fn verify_update_health(&self, container: &mut ManagedContainer) -> Result<()> {
|
||||
if let Some(health_url) = &container.health_check_url {
|
||||
info!("🏥 Performing health check: {}", health_url);
|
||||
|
||||
|
|
@ -946,6 +1026,22 @@ impl UpdateManager {
|
|||
}
|
||||
}
|
||||
|
||||
// do rollback
|
||||
let rollback_result = self.attempt_rollback(container).await;
|
||||
|
||||
match rollback_result {
|
||||
Ok(_) => {
|
||||
info!("Rollback successful");
|
||||
}
|
||||
Err(e) => {
|
||||
log_warn!(
|
||||
self.log_manager,
|
||||
"updater",
|
||||
format!("⚠️ Rollback failed: {}", e)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Err(anyhow::anyhow!(
|
||||
"Health check failed after {} attempts",
|
||||
self.config.max_retries
|
||||
|
|
@ -956,11 +1052,112 @@ impl UpdateManager {
|
|||
}
|
||||
}
|
||||
|
||||
async fn attempt_rollback(&self, container: &ManagedContainer) -> Result<()> {
|
||||
warn!("🔄 Attempting rollback for container: {}", container.name);
|
||||
/// List all images on the Docker host.
|
||||
async fn list_images(&self) -> Result<Vec<ImageSummary>> {
|
||||
let list_image_options = ListImagesOptionsBuilder::new().all(true).build();
|
||||
|
||||
let result = self.docker.list_images(Some(list_image_options)).await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn attempt_rollback(&self, container: &mut ManagedContainer) -> Result<()> {
|
||||
warn!("🔄 Attempting rollback for container: {}", container.name);
|
||||
warn!("Cleaning up ... ");
|
||||
|
||||
let images = self.list_images().await?;
|
||||
let expected_prefix = container.image_name.clone();
|
||||
|
||||
let mut filtered_expected_backup: Vec<ImageSummary> = images
|
||||
.iter()
|
||||
.filter(|img| {
|
||||
!img.repo_tags.is_empty()
|
||||
&& img
|
||||
.repo_tags
|
||||
.iter()
|
||||
.filter(|tag| tag.contains(&expected_prefix.clone()))
|
||||
.count()
|
||||
> 0
|
||||
})
|
||||
.map(|img| img.clone())
|
||||
.collect();
|
||||
|
||||
filtered_expected_backup.sort_by_key(|k| k.created);
|
||||
|
||||
let version_lists = filtered_expected_backup
|
||||
.iter()
|
||||
.map(|x| {
|
||||
x.repo_tags
|
||||
.first()
|
||||
.unwrap()
|
||||
.split(":")
|
||||
.last()
|
||||
.unwrap()
|
||||
.replace("v", "")
|
||||
})
|
||||
.collect::<Vec<String>>();
|
||||
|
||||
let versions = parse_versions(&version_lists);
|
||||
let current_version = Semver::parse(&container.container_id).unwrap();
|
||||
|
||||
let previous_latest_version =
|
||||
find_previous_latest_version(&versions.unwrap(), ¤t_version);
|
||||
|
||||
if let Some(prev) = previous_latest_version {
|
||||
// find image name
|
||||
let prev_image = filtered_expected_backup
|
||||
.iter()
|
||||
.filter(|fe| fe.repo_tags.first().unwrap().ends_with(&prev.to_string()))
|
||||
.map(|fe| fe.clone())
|
||||
.collect::<Vec<ImageSummary>>();
|
||||
|
||||
if prev_image.len() == 1 {
|
||||
let img_name = prev_image
|
||||
.first()
|
||||
.unwrap()
|
||||
.repo_tags
|
||||
.first()
|
||||
.unwrap()
|
||||
.clone();
|
||||
|
||||
let _ = self.storage.delete_container(&container.container_id);
|
||||
|
||||
container.image_name = img_name;
|
||||
// container.current_version = prev.to_string();
|
||||
|
||||
self.start_updated_container(container, &prev.to_string())
|
||||
.await?;
|
||||
}
|
||||
} else {
|
||||
let _ = self.storage.delete_container(&container.container_id);
|
||||
}
|
||||
|
||||
// find latest previous version available
|
||||
|
||||
// // get latest backup by timestamp
|
||||
// // let current_timestamp = Utc::now().timestamp();
|
||||
// let mut diff_timestamp = filtered_expected_backup.get(0).unwrap().created;
|
||||
// let mut latest_backup = None;
|
||||
// for backup_image in filtered_expected_backup.iter() {
|
||||
// let backup_timestamp = backup_image.created;
|
||||
// if backup_timestamp > diff_timestamp {
|
||||
// diff_timestamp = backup_timestamp;
|
||||
// latest_backup = Some(backup_image.clone());
|
||||
// }
|
||||
// }
|
||||
|
||||
// if let Some(latest_backup) = latest_backup {
|
||||
// //
|
||||
// let _ = self.storage.delete_container(&container.container_id);
|
||||
|
||||
// // Perform actions with the latest backup
|
||||
// container.image_name = latest_backup.repo_digests.get(0).unwrap().clone();
|
||||
// container.registry_url = "default.backup".to_string();
|
||||
|
||||
// info!("Starting backup container ...");
|
||||
|
||||
// self.start_updated_container(container, "latest").await?;
|
||||
// }
|
||||
|
||||
// let _ = self.storage.delete_container(&container.container_id);
|
||||
|
||||
// Implementation for rollback would go here
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue