v1: initialize
- git repo fetcher service, compatible with registry tbm Signed-off-by: Pakin <pakin.t@forth.co.th>
This commit is contained in:
commit
7f8b19353a
9 changed files with 3823 additions and 0 deletions
7
.gitignore
vendored
Normal file
7
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
/target
|
||||
.tbcfg
|
||||
*.txt
|
||||
*.log
|
||||
|
||||
#
|
||||
tbm-proto
|
||||
3402
Cargo.lock
generated
Normal file
3402
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
22
Cargo.toml
Normal file
22
Cargo.toml
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
[package]
|
||||
name = "tbm-git-repo-service"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
axum = "0.8.7"
|
||||
env_logger = "0.11.8"
|
||||
git2 = { version = "0.20.3", features = ["https", "ssh"] }
|
||||
libgit2-sys = { version = "0.18.3", features = ["ssh"] }
|
||||
libtbr = { git = "https://gitlab.forthrd.io/Pakin/libtbr.git", version = "0.1.1" }
|
||||
log = "0.4.29"
|
||||
prost = "0.14.1"
|
||||
reqwest = { version = "0.12.25", features = ["json"] }
|
||||
serde = { version = "1.0.228", features = ["derive"] }
|
||||
serde_json = { version = "1.0.145", features = ["preserve_order"] }
|
||||
tokio = { version = "1.48.0", features = ["full"] }
|
||||
tonic = { version = "0.14.2", features = ["transport"] }
|
||||
tonic-prost = "0.14.2"
|
||||
|
||||
[build-dependencies]
|
||||
tonic-prost-build = "0.14.2"
|
||||
5
build.rs
Normal file
5
build.rs
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
tonic_prost_build::configure().compile_protos(&["tbm-proto/registry.proto"], &["tbm-proto"])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
189
src/app.rs
Normal file
189
src/app.rs
Normal file
|
|
@ -0,0 +1,189 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use axum::{extract::{Query, State}, response::IntoResponse, routing::get, Json, Router};
|
||||
use git2::{Repository, RemoteCallbacks, FetchOptions, Cred};
|
||||
use log::{error, warn};
|
||||
use libtbr::recipe_functions::common;
|
||||
use serde::Deserialize;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use crate::{gcm, reg};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
cached_country_names: Vec<&'static str>,
|
||||
configures: gcm::Configure
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub fn check_country_existed(&self, req: &str) -> bool {
|
||||
self.cached_country_names.contains(&req)
|
||||
}
|
||||
|
||||
pub fn get_config(&self, config_name: &str) -> Option<&String> {
|
||||
self.configures.get(config_name)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct CheckoutParams {
|
||||
path: String
|
||||
}
|
||||
|
||||
async fn checkout_handler(State(state): State<AppState>, Query(param): Query<CheckoutParams>) -> impl IntoResponse {
|
||||
let mut response: HashMap<String, Value> = HashMap::new();
|
||||
|
||||
let repo_path = state.get_config("GIT_REPO_LOCAL_DEST");
|
||||
if repo_path.is_none() {
|
||||
response.insert("error".to_string(), json!("config repo dest not found".to_string()));
|
||||
return (
|
||||
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(json!(response))
|
||||
);
|
||||
}
|
||||
match param.path.as_str() {
|
||||
legit_path if param.path.contains("/") || state.check_country_existed(param.path.as_str()) => {
|
||||
let rpath = repo_path.unwrap().clone();
|
||||
let repo = match Repository::open_bare(rpath) {
|
||||
Ok(repo) => repo,
|
||||
Err(_) => {
|
||||
let error_log = "unable to open repo as bare";
|
||||
error!("{error_log}");
|
||||
response.insert("error".to_string(), json!(error_log));
|
||||
return (
|
||||
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(json!(response))
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let fpath = format!("master:{}", legit_path);
|
||||
let obj = match repo.revparse_single(&fpath){
|
||||
Ok(obj) => obj,
|
||||
Err(e) => {
|
||||
let error_log = format!("unexpected revparse single: {err}", err = e.message());
|
||||
error!("{error_log}");
|
||||
response.insert("error".to_string(), json!(error_log.clone()));
|
||||
return (
|
||||
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(json!(response))
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(blob) = obj.as_blob() {
|
||||
let content = unsafe {
|
||||
str::from_utf8_unchecked(blob.content())
|
||||
};
|
||||
response.insert("result".to_string(), json!(content.to_string()));
|
||||
} else if let Some(tree) = obj.as_tree() {
|
||||
let dir_list = tree.iter().map(|x| x.name().unwrap_or("").to_string()).collect::<Vec<String>>();
|
||||
response.insert("result".to_string(), json!(dir_list));
|
||||
} else {
|
||||
let error_log = "not obj nor tree";
|
||||
error!("{error_log}");
|
||||
response.insert("error".to_string(), json!(error_log));
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
let error_log = "requested path is unexpected";
|
||||
error!("{error_log}");
|
||||
response.insert("error".to_string(), json!(error_log));
|
||||
}
|
||||
}
|
||||
|
||||
(
|
||||
axum::http::StatusCode::BAD_REQUEST,
|
||||
Json(json!(response))
|
||||
)
|
||||
}
|
||||
|
||||
async fn fetch_handler(State(state): State<AppState>) -> impl IntoResponse {
|
||||
let mut response: HashMap<String, Value> = HashMap::new();
|
||||
if let Some(repo_path) = state.get_config("GIT_REPO_LOCAL_DEST") {
|
||||
let rpath = repo_path.clone();
|
||||
|
||||
let repo = match Repository::open_bare(rpath) {
|
||||
Ok(repo) => repo,
|
||||
Err(_) => {
|
||||
let error_log = "unable to open bare repo";
|
||||
error!("{error_log}");
|
||||
response.insert("error".to_string(), json!(error_log));
|
||||
return (
|
||||
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(json!(response))
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let mut remote = match repo.find_remote("origin") {
|
||||
Ok(remote) => remote,
|
||||
Err(e) => {
|
||||
let error_log = format!("unable to find remote, {}", e.message());
|
||||
error!("{error_log}");
|
||||
return (
|
||||
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(json!(response))
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
if state.get_config("GIT_REPO_USERNAME").is_none() || state.get_config("GIT_REPO_PASSWORD").is_none() {
|
||||
warn!("username or password not provided may cause fetching fail");
|
||||
}
|
||||
|
||||
let mut callbacks = RemoteCallbacks::new();
|
||||
callbacks.credentials(|_, _, _| {
|
||||
Cred::userpass_plaintext(
|
||||
state.get_config("GIT_REPO_USERNAME").unwrap_or(&"".to_string()),
|
||||
state.get_config("GIT_REPO_PASSWORD").unwrap_or(&"".to_string())
|
||||
)
|
||||
});
|
||||
|
||||
let mut fetch_options = FetchOptions::new();
|
||||
fetch_options.remote_callbacks(callbacks);
|
||||
|
||||
match remote.fetch(&["origin"], Some(&mut fetch_options), None) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("error while fetching {}", e.message());
|
||||
return (
|
||||
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(json!({"error": format!("error while fetching {}", e.message())}))
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let error_log = "cannot find local repo";
|
||||
error!("{error_log}");
|
||||
response.insert("error".to_string(), json!(error_log));
|
||||
return (
|
||||
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(json!(response))
|
||||
);
|
||||
}
|
||||
|
||||
(
|
||||
axum::http::StatusCode::OK,
|
||||
Json(json!({"result": "fetch success"}))
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn run(config: gcm::Configure) -> gcm::StandardResult {
|
||||
let state = AppState {
|
||||
cached_country_names: common::valid_country_name(),
|
||||
configures: config.clone()
|
||||
};
|
||||
|
||||
let app = Router::new()
|
||||
.route("/checkout", get(checkout_handler))
|
||||
.route("/fetch", get(fetch_handler))
|
||||
.route("/healthz", get(reg::health))
|
||||
.with_state(state);
|
||||
|
||||
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", config.clone().get("PUBLIC_PORT").unwrap_or(&"36583".to_string()))).await?;
|
||||
|
||||
axum::serve(listener, app).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
5
src/gcm.rs
Normal file
5
src/gcm.rs
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
// Setup readable types
|
||||
pub type Configure = HashMap<String, String>;
|
||||
pub type StandardResult = Result<(), Box<dyn std::error::Error>>;
|
||||
97
src/git.rs
Normal file
97
src/git.rs
Normal file
|
|
@ -0,0 +1,97 @@
|
|||
use std::{cell::RefCell, collections::HashMap, io::{self, Write}, path::{Path, PathBuf}};
|
||||
|
||||
use git2::{build::RepoBuilder, Cred, FetchOptions, Progress, RemoteCallbacks};
|
||||
use log::{info};
|
||||
|
||||
use crate::gcm;
|
||||
|
||||
struct GitState {
|
||||
progress: Option<Progress<'static>>,
|
||||
total: usize,
|
||||
current: usize,
|
||||
path: Option<PathBuf>,
|
||||
newline: bool
|
||||
}
|
||||
|
||||
fn print(state: &mut GitState) {
|
||||
let stats = state.progress.as_ref().unwrap();
|
||||
let network_pct = (100 * stats.received_objects()) / stats.total_objects();
|
||||
let index_pct = (100 * stats.indexed_objects()) / stats.total_objects();
|
||||
let co_pct = if state.total > 0 {
|
||||
(100 * state.current) / state.total
|
||||
} else {
|
||||
0
|
||||
};
|
||||
let kbytes = stats.received_bytes() / 1024;
|
||||
if stats.received_objects() == stats.total_objects() {
|
||||
if !state.newline {
|
||||
println!();
|
||||
state.newline = true;
|
||||
}
|
||||
|
||||
info!("Resolving deltas {}/{}",
|
||||
stats.indexed_deltas(),
|
||||
stats.total_deltas());
|
||||
|
||||
} else {
|
||||
info!(
|
||||
"net {:3}% ({:4} kb, {:5}/{:5}) / idx {:3}% ({:5}/{:5}) \
|
||||
/ chk {:3}% ({:4}/{:4}) {}\r",
|
||||
network_pct,
|
||||
kbytes,
|
||||
stats.received_objects(),
|
||||
stats.total_objects(),
|
||||
index_pct,
|
||||
stats.indexed_objects(),
|
||||
stats.total_objects(),
|
||||
co_pct,
|
||||
state.current,
|
||||
state.total,
|
||||
state
|
||||
.path
|
||||
.as_ref()
|
||||
.map(|s| s.to_string_lossy().into_owned())
|
||||
.unwrap_or_default()
|
||||
)
|
||||
}
|
||||
io::stdout().flush().unwrap();
|
||||
}
|
||||
|
||||
pub fn setup_git_repo(config: HashMap<String, String>) -> gcm::StandardResult {
|
||||
let state = RefCell::new(GitState {
|
||||
progress: None,
|
||||
total: 0,
|
||||
current: 0,
|
||||
path: None,
|
||||
newline: true
|
||||
});
|
||||
|
||||
let mut cb = RemoteCallbacks::new();
|
||||
cb.transfer_progress(|stats| {
|
||||
let mut state = state.borrow_mut();
|
||||
state.progress = Some(stats.to_owned());
|
||||
print(&mut state);
|
||||
true
|
||||
});
|
||||
|
||||
cb.credentials(|_,_,_| {
|
||||
Cred::userpass_plaintext(
|
||||
config.get("GIT_REPO_USERNAME").unwrap_or(&"".to_string()),
|
||||
config.get("GIT_REPO_PASSWORD").unwrap_or(&"".to_string())
|
||||
)
|
||||
});
|
||||
|
||||
let mut fo = FetchOptions::new();
|
||||
fo.remote_callbacks(cb);
|
||||
fo.depth(1);
|
||||
|
||||
RepoBuilder::new()
|
||||
.bare(true)
|
||||
.fetch_options(fo)
|
||||
.clone(
|
||||
config.get("GIT_REPO_REMOTE").unwrap_or(&"".to_string()),
|
||||
Path::new(config.get("GIT_REPO_LOCAL_DEST").unwrap()).into()
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
57
src/main.rs
Normal file
57
src/main.rs
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
use std::{fs::File};
|
||||
|
||||
use env_logger::Builder;
|
||||
use libtbr::recipe_functions::common;
|
||||
use log::info;
|
||||
|
||||
use crate::{git::setup_git_repo, reg::{heartbeat_loop, registry::registry_client::RegistryClient}};
|
||||
|
||||
mod app;
|
||||
mod gcm;
|
||||
mod git;
|
||||
mod reg;
|
||||
|
||||
fn setup_log(config: gcm::Configure) -> gcm::StandardResult {
|
||||
let logfile = File::create(config.get("LOG_NAME").unwrap_or(&"run.log".to_string()))?;
|
||||
|
||||
Builder::from_env(env_logger::Env::default().default_filter_or("debug"))
|
||||
.target(env_logger::Target::Pipe(Box::new(logfile))).init();
|
||||
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn setup_registry(config: gcm::Configure) -> gcm::StandardResult {
|
||||
let cfg_clone = config.clone();
|
||||
let register_server_address = cfg_clone.get("REGISTRY_SERVER").expect("not found registry server");
|
||||
let name = cfg_clone.get("SERVICE_NAME").expect("missing service name");
|
||||
let url = cfg_clone.get("SERVICE_URL").expect("missing service url");
|
||||
|
||||
let mut client = RegistryClient::connect(register_server_address.clone()).await?;
|
||||
reg::register_service(&mut client, name, url).await;
|
||||
|
||||
tokio::spawn(heartbeat_loop(name.to_string(), url.to_string()));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> gcm::StandardResult {
|
||||
let config: gcm::Configure = common::get_config();
|
||||
setup_log(config.clone())?;
|
||||
|
||||
match std::fs::read_dir(config.get("GIT_REPO_LOCAL_DEST").expect("not exist")) {
|
||||
Ok(_) => {
|
||||
info!("GIT_REPO `{dest}` already setup", dest = config.get("GIT_REPO_LOCAL_DEST").unwrap())
|
||||
},
|
||||
Err(_) => {
|
||||
setup_git_repo(config.clone())?;
|
||||
}
|
||||
}
|
||||
setup_registry(config.clone()).await?;
|
||||
app::run(config.clone()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
39
src/reg.rs
Normal file
39
src/reg.rs
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use tokio::time::sleep;
|
||||
use tonic::transport::Channel;
|
||||
|
||||
use crate::reg::registry::{ServiceHeartbeat, ServiceInfo, registry_client::RegistryClient};
|
||||
|
||||
pub mod registry {
|
||||
tonic::include_proto!("registry");
|
||||
}
|
||||
|
||||
pub async fn register_service(client: &mut RegistryClient<Channel>, name: &str, url: &str) {
|
||||
let req = tonic::Request::new(ServiceInfo {
|
||||
name: name.to_string(),
|
||||
url: url.to_string(),
|
||||
healthz: "/healthz".to_string(),
|
||||
token: std::env::var("REGISTRY_TOKEN").unwrap_or_default(),
|
||||
});
|
||||
|
||||
let _ = client.register(req).await;
|
||||
}
|
||||
|
||||
pub async fn heartbeat_loop(name: String, url: String) {
|
||||
loop {
|
||||
// TODO: adjust connection path
|
||||
if let Ok(mut client) = RegistryClient::connect("http://localhost:50051").await {
|
||||
let req = tonic::Request::new(ServiceHeartbeat {
|
||||
name: name.clone(),
|
||||
url: url.clone(),
|
||||
});
|
||||
let _ = client.heartbeat(req).await;
|
||||
}
|
||||
sleep(Duration::from_secs(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn health() -> &'static str {
|
||||
"git-fetch-service.rs ok"
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue