add NPROC for communication with updater

This commit is contained in:
Pakin 2025-09-02 17:47:30 +07:00
parent 6ccc186e97
commit 9df9d1b75f
9 changed files with 220 additions and 6 deletions

27
app.js
View file

@ -15,12 +15,17 @@ const {
getTestSpreadSheet,
GoogleFunctions,
PluginsManager,
test_drive
} = require("./lib/common");
const { SyncText } = require("./lib/sync_text");
const { startup } = require("./lib/zmq");
const { CronJobs } = require("./cron-jobs");
const { google } = require("googleapis");
const { EventEmitter } = require("stream");
const nproc = require('./lib/nproc');
require("dotenv").config();
Log.debug(process.env.TEST_SHEET_ID);
@ -53,6 +58,7 @@ var cronTasks =
const auth = GoogleFunctions.auth();
const sheet = GoogleFunctions.SpreadSheets(auth);
const drive = GoogleFunctions.Drive(auth);
// let heartbeatTask = CronJobs.doEveryMinute(() => {
// Log.debug(`All running => ${JSON.stringify(cronTasks)}`);
@ -90,6 +96,27 @@ const sheet = GoogleFunctions.SpreadSheets(auth);
// Test Taobin SyncText
// SyncText.run(sheet, "uae", false);
// Test drive
// test_drive(drive);
// startup();
//
//
let client = new nproc.NprocClient("127.0.0.1:36540", () => {
client.subscribe("self");
client.publish("self", {
msg: "test"
});
if(client.history.length > 1){
Log.info("connected!");
}
}, () => {
Log.debug("closed connection");
});
const pm = new PluginsManager(cronTasks, CronJobs);
pm.load();

View file

@ -293,7 +293,8 @@ function saveJsonToFile(filename, content) {
// GOOGLE
// ======================================================================
const { google, sheets_v4 } = require("googleapis");
const { google, sheets_v4, drive_v3 } = require("googleapis");
const { GoogleAuth } = require("google-auth-library");
function authorize() {
const oauthClient = new google.auth.GoogleAuth({
@ -301,7 +302,7 @@ function authorize() {
client_email: process.env.GOOGLE_SERVICE_ACCOUNT_EMAIL,
private_key: process.env.GOOGLE_PRIVATE_KEY.replace(/\\n/gm, "\n"),
},
scopes: ["https://www.googleapis.com/auth/spreadsheets"],
scopes: ["https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive"],
});
return oauthClient;
}
@ -325,6 +326,7 @@ function getCountrySpreadSheetId(cnt) {
const GoogleFunctions = {
auth: authorize,
SpreadSheets: (auth) => google.sheets({ version: "v4", auth }),
Drive: (auth) => google.drive({ version: 'v3', auth }),
GetCountrySpreadSheet: getCountrySpreadSheetById,
getAllSheetNamesByCountry: getAllSheetNamesByCountry,
getCountrySheetByName: getCountrySheetByName,
@ -1282,6 +1284,40 @@ async function _finalizeSyncProfilePrice(
}
}
// ======================================================================
// DRIVE
// ======================================================================
/**
* @param {drive_v3.Drive} drive instance
*/
async function test_drive(drive){
try {
await drive.files.create({
requestBody: {
name: "test",
mimeType: "text/plain",
parents: [process.env.TAOBIN_ADMIN_SERVER_DRIVE_FOLDER]
},
media: {
mimeType: "text/plain",
body: "Hello from js"
}
}).then((x) => Log.info(`Created file response: ${JSON.parse(x)}`));
} catch(error){
Log.err(`Error test drive: ${error}`);
return JSON.stringify({
error: error
});
}
return JSON.stringify({
status: "success"
});
}
// special
// ======================================================================
@ -1406,4 +1442,5 @@ module.exports = {
getCountrySheetByName,
diff2DArraysCustom,
saveJsonToFile,
test_drive
};

85
lib/nproc.js Normal file
View file

@ -0,0 +1,85 @@
const net = require('net');
// TODO: must change to read by env
const API_KEY = Buffer.from('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA');
function frame(msgType, topicStr, bodyBuf) {
const topic = Buffer.from(topicStr);
const payloadLen = 1 + 2 + 4 + topic.length + bodyBuf.length;
const totalBytes = payloadLen + 32;
const buf = Buffer.alloc(4 + payloadLen + 32);
buf.writeUInt32BE(totalBytes, 0);
buf.writeUInt8(msgType, 4);
buf.writeUInt16BE(topic.length, 5);
buf.writeUInt32BE(bodyBuf.length, 7);
topic.copy(buf, 11);
bodyBuf.copy(buf, 11 + topic.length);
API_KEY.copy(buf, 4 + payloadLen);
return buf;
}
class NprocClient {
MESSAGE_TYPE = {
SUB: 1,
UNSUB: 2,
PUB: 3,
PING: 4
};
constructor(
addr,
onConnect,
onClose
){
let _addr = addr.toString().split(":");
let host = _addr[0];
let port = parseInt(_addr[1]);
this.client = net.createConnection({
host: host,
port: port
}, () => onConnect());
this.history = [];
this.acc = Buffer.alloc(0);
this.client.on('data', chunk => {
this.acc = Buffer.concat([this.acc, chunk]);
while(this.acc.length >= 4){
const total = this.acc.readUInt32BE(0);
if (this.acc.length < 4 + total) break;
const frm = this.acc.subarray(0, 4 + total);
this.acc = this.acc.subarray(4 + total);
const msgType = frm.readUInt8(4);
const topicLen = frm.readUInt16BE(5);
const bodyLen = frm.readUInt32BE(7);
const topic = frm.subarray(11, 11 + topicLen).toString();
const body = frm.subarray(11 + topicLen, 11 + topicLen + bodyLen);
let res = {
msgType, topic, body: body.toString()
};
console.log(`get msg! ${JSON.stringify(res)}`);
this.history.push(res);
}
});
this.client.on('close', () => onClose());
}
subscribe(topic){
this.client.write(frame(this.MESSAGE_TYPE.SUB, topic, Buffer.alloc(0)));
}
publish(topic, payload){
this.client.write(frame(this.MESSAGE_TYPE.PUB, topic, Buffer.from(JSON.stringify(payload))));
}
// TODO: unsub
// TODO: ping
}
module.exports = {
NprocClient
};

0
lib/package_manager.js Normal file
View file

28
lib/zmq.js Normal file
View file

@ -0,0 +1,28 @@
const zmq = require('zeromq');
const uuid = require('uuid');
/**
* Publish simple message to server to notify service is ready.
*/
async function startup(){
const pub = new zmq.Publisher();
await pub.connect("tcp://127.0.0.1:36541");
const msg = {
id: uuid.v4(),
topic: "news",
message_type: "Data",
data: { content: "GGS Ready!" },
timestamp: Math.floor(Date.now() / 1000)
};
await pub.send(["news", JSON.stringify(msg)]);
console.log("startup send!")
pub.close();
}
module.exports = {
startup
};

35
package-lock.json generated
View file

@ -18,7 +18,11 @@
"morgan": "~1.9.1",
"node-cron": "^3.0.3",
"winston": "^3.17.0",
"winston-daily-rotate-file": "^5.0.0"
"winston-daily-rotate-file": "^5.0.0",
"zeromq": "^6.5.0"
},
"devDependencies": {
"esbuild": "^0.25.8"
}
},
"node_modules/@colors/colors": {
@ -720,6 +724,14 @@
"wordwrap": "0.0.2"
}
},
"node_modules/cmake-ts": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/cmake-ts/-/cmake-ts-1.0.2.tgz",
"integrity": "sha512-5l++JHE7MxFuyV/OwJf3ek7ZZN1aGPFPM5oUz6AnK5inQAPe4TFXRMz5sA2qg2FRgByPWdqO+gSfIPo8GzoKNQ==",
"bin": {
"cmake-ts": "build/main.js"
}
},
"node_modules/color": {
"version": "3.2.1",
"resolved": "https://registry.npmjs.org/color/-/color-3.2.1.tgz",
@ -1631,6 +1643,14 @@
"node": ">= 0.6"
}
},
"node_modules/node-addon-api": {
"version": "8.5.0",
"resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-8.5.0.tgz",
"integrity": "sha512-/bRZty2mXUIFY/xU5HLvveNHlswNJej+RnxBjOMkidWfwZzgTbPG1E3K5TOxRLOR+5hX7bSofy8yf1hZevMS8A==",
"engines": {
"node": "^18 || ^20 || >= 21"
}
},
"node_modules/node-cron": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/node-cron/-/node-cron-3.0.3.tgz",
@ -2273,6 +2293,19 @@
"decamelize": "^1.0.0",
"window-size": "0.1.0"
}
},
"node_modules/zeromq": {
"version": "6.5.0",
"resolved": "https://registry.npmjs.org/zeromq/-/zeromq-6.5.0.tgz",
"integrity": "sha512-vWOrt19lvcXTxu5tiHXfEGQuldSlU+qZn2TT+4EbRQzaciWGwNZ99QQTolQOmcwVgZLodv+1QfC6UZs2PX/6pQ==",
"hasInstallScript": true,
"dependencies": {
"cmake-ts": "1.0.2",
"node-addon-api": "^8.3.1"
},
"engines": {
"node": ">= 12"
}
}
}
}

View file

@ -16,7 +16,8 @@
"morgan": "~1.9.1",
"node-cron": "^3.0.3",
"winston": "^3.17.0",
"winston-daily-rotate-file": "^5.0.0"
"winston-daily-rotate-file": "^5.0.0",
"zeromq": "^6.5.0"
},
"devDependencies": {
"esbuild": "^0.25.8"

View file

@ -12,7 +12,10 @@ const heartbeatApiInfo = {
let heartbeatTask = CronJobs.doEveryMinute(() => {
Log.debug("[hb] test heartbeat");
Log.debug(`[hb] current running tasks: ${JSON.stringify(CronJobs.getAllRunning.size)}`);
heartbeatTask.stop();
client.publish("log", {
msg: "heartbeat",
status: 200
});
}, 'heartbeat');
heartbeatTask.on('stop-heartbeat', () => heartbeatTask.stop());

View file

@ -1,2 +1,2 @@
#!/bin/bash
DEBUG=server:* PORT=36530 npm start
DEBUG=server:* PORT=36531 npm start