feat: add plugin

- add plugin system that request may included to run before do actual request by type.

Signed-off-by: Pakin <pakin.t@forth.co.th>
This commit is contained in:
Pakin 2026-05-12 12:40:32 +07:00
parent d048dc2437
commit d7f5e12d51
9 changed files with 1492 additions and 15 deletions

4
.gitignore vendored
View file

@ -1,4 +1,6 @@
*.json *.json
.env .env
target target
*.txt *.txt
node_modules
*wasm

1263
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -25,3 +25,6 @@ tokio = { version = "1.49.0", features = ["full"] }
tokio-cron-scheduler = "0.15.1" tokio-cron-scheduler = "0.15.1"
tokio-stream = "0.1.18" tokio-stream = "0.1.18"
uuid = { version = "1.20.0", features = ["v4"] } uuid = { version = "1.20.0", features = ["v4"] }
wasmtime = { version = "44.0.1", features = ["async"] }
wasmtime-wasi = "44.0.1"
wasmtime-wasi-http = "44.0.1"

View file

@ -0,0 +1,5 @@
Generate WASM file
```
jco componentize -w ../plugin.wit -n plugin-world index.js -o ../example-js.wasm
```

View file

@ -0,0 +1,18 @@
export const handler = {
processMessage(input) {
// input should be { type_w: "...", "payload": { plugin: "", ... } }
let n_input = JSON.parse(input);
console.log(`processing: ${input}`);
try {
// delete n_input["payload"]["plugin"];
return JSON.stringify({
type_w: n_input.type,
payload: n_input.payload
});
} catch (e) {
return `JSERROR: ${e.message}\nStack: ${e.stack}`;
}
}
};

10
plugins/plugin.wit Normal file
View file

@ -0,0 +1,10 @@
package server-m2-dev:plugins;
interface handler {
// The plugin takes a message and returns a processed version
process-message: func(input: string) -> string;
}
world plugin-world {
export handler;
}

View file

@ -2,5 +2,6 @@ pub mod core;
pub mod handler; pub mod handler;
pub mod helper; pub mod helper;
pub mod model; pub mod model;
pub mod plugins;
mod rw; mod rw;
mod tasks; mod tasks;

185
src/websocket/plugins.rs Normal file
View file

@ -0,0 +1,185 @@
use std::collections::HashMap;
use log::{error, info};
use serde::{Deserialize, Serialize};
use tokio::{fs::read_dir, time::error};
use wasmtime::{
Engine, Store,
component::{Component, Linker, ResourceTable},
};
use wasmtime_wasi::{WasiCtx, WasiCtxBuilder, WasiCtxView, WasiView, p2::add_to_linker_async};
use wasmtime_wasi_http::{
WasiHttpCtx,
p2::{WasiHttpCtxView, WasiHttpView},
};
use crate::websocket::model::WebsocketMessageRequest;
wasmtime::component::bindgen!({
path: "plugins/plugin.wit",
world: "plugin-world",
require_store_data_send: true,
imports: { default: async | trappable },
exports: { default: async },
});
#[derive(Debug, Serialize, Deserialize, Clone)]
struct PluginMessageRequest {
#[serde(rename = "type")]
pub type_w: String,
pub payload: Option<PluginPayload>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct PluginPayload {
pub plugin: String,
#[serde(flatten)]
pub original_payload: HashMap<String, serde_json::Value>,
}
struct WState {
ctx: WasiCtx,
table: ResourceTable,
http_ctx: WasiHttpCtx,
}
impl WasiView for WState {
fn ctx(&mut self) -> WasiCtxView<'_> {
WasiCtxView {
ctx: &mut self.ctx,
table: &mut self.table,
}
}
}
impl WasiHttpView for WState {
fn http(&mut self) -> wasmtime_wasi_http::p2::WasiHttpCtxView<'_> {
WasiHttpCtxView {
ctx: &mut self.http_ctx,
table: &mut self.table,
hooks: Default::default(),
}
}
}
pub async fn read_plugins() -> HashMap<String, String> {
let mut result = HashMap::new();
// expect path
match read_dir("./plugins").await {
Ok(mut d) => {
while let Ok(Some(entry)) = d.next_entry().await {
let ent_path = entry.path();
if let Some(filename) = ent_path.file_name()
&& let Some(filename_str) = filename.to_str()
&& filename_str.ends_with(".wasm")
{
result.insert(
filename_str.replace(".wasm", ""),
ent_path.clone().to_str().unwrap().to_string(),
);
}
}
}
Err(_) => {}
}
result
}
async fn call_plugin_logic(engine: &Engine, component: &Component, input: String) -> String {
let table = ResourceTable::new();
let ctx = WasiCtxBuilder::new()
.inherit_stdout()
.inherit_stderr()
.build();
let http_ctx = WasiHttpCtx::new();
let mut store = Store::new(
engine,
WState {
ctx,
table,
http_ctx,
},
);
let mut linker = Linker::new(engine);
if let Err(e) = add_to_linker_async(&mut linker) {
error!("add linker fail: {e}");
return String::new();
}
if let Err(e) = wasmtime_wasi_http::p2::add_only_http_to_linker_async(&mut linker) {
error!("add http linker fail: {e}");
return String::new();
}
let instance_result = PluginWorld::instantiate_async(&mut store, component, &linker)
.await
.expect("Failed to instantiate plugin");
// 3. Call the exported function from the WIT 'handler' interface
match instance_result
.server_m2_dev_plugins_handler()
.call_process_message(&mut store, &input)
.await
{
Ok(s) => {
info!("plugin response: {s}");
s
}
Err(e) => {
error!("Error plugin: {e}");
String::new()
}
}
}
pub async fn call_plugin_if_existed(
req: WebsocketMessageRequest,
engine: Engine,
) -> WebsocketMessageRequest {
let mut plugin_request: PluginMessageRequest = PluginMessageRequest {
type_w: req.clone().type_w,
payload: None,
};
if req.payload.is_none() {
return req.clone();
}
let plugin_payload: PluginPayload = match serde_json::from_value(req.clone().payload.unwrap()) {
Ok(p) => p,
Err(_) => return req,
};
plugin_request.payload = Some(plugin_payload);
// do modify data from plugin
let all_plugins = read_plugins().await;
if let Some(pl) = plugin_request.clone().payload {
// seems valid
// && all_plugins.contains_key(&pl.plugin)
let apply_plugins: Vec<String> = pl.plugin.split(",").map(|x| x.to_string()).collect();
let mut res_str = serde_json::to_string(&plugin_request.clone()).unwrap_or("".to_string());
for ap in apply_plugins {
if all_plugins.contains_key(&ap) {
let component =
Component::from_file(&engine, all_plugins.get(&ap).unwrap()).unwrap();
res_str = call_plugin_logic(&engine, &component, res_str).await;
}
}
// reject by fail response
return match serde_json::from_str(&res_str) {
Ok(s) => s,
Err(_) => return req,
};
} else {
// immediately reject
return req;
}
}

View file

@ -1,5 +1,8 @@
use super::{core::*, helper::*, model::*}; use super::{core::*, helper::*, model::*};
use crate::{app::*, websocket::tasks}; use crate::{
app::*,
websocket::{plugins::call_plugin_if_existed, tasks},
};
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use axum::extract::ws::{Message, WebSocket}; use axum::extract::ws::{Message, WebSocket};
@ -16,6 +19,7 @@ use tokio::{
}, },
time::Instant, time::Instant,
}; };
use wasmtime::{Config, Engine};
pub async fn read( pub async fn read(
// redis: redis::Client, // redis: redis::Client,
@ -40,16 +44,24 @@ pub async fn read(
.await .await
.err() .err()
{ {
println!("[SYS] failed to send back to client: {err}"); error!("[SYS] failed to send back to client: {err}");
} }
} }
}); });
let uid_clone = uid.clone(); let uid_clone = uid.clone();
// Plugins
//
let engine = Engine::new(Config::new().wasm_component_model(true)).unwrap();
while let Some(Ok(msg)) = receiver.next().await { while let Some(Ok(msg)) = receiver.next().await {
match msg { match msg {
Message::Text(t) => { Message::Text(t) => {
let req: WebsocketMessageRequest = serde_json::from_str(t.as_str())?; let mut req: WebsocketMessageRequest = serde_json::from_str(t.as_str())?;
req = call_plugin_if_existed(req, engine.clone()).await;
// info!("get msg: {}", req.type_w); // info!("get msg: {}", req.type_w);
match req.type_w.as_str() { match req.type_w.as_str() {