diff --git a/app.js b/app.js index dd3d0f4..091b40b 100644 --- a/app.js +++ b/app.js @@ -15,12 +15,17 @@ const { getTestSpreadSheet, GoogleFunctions, PluginsManager, + test_drive } = require("./lib/common"); const { SyncText } = require("./lib/sync_text"); +const { startup } = require("./lib/zmq"); + const { CronJobs } = require("./cron-jobs"); const { google } = require("googleapis"); const { EventEmitter } = require("stream"); +const nproc = require('./lib/nproc'); + require("dotenv").config(); Log.debug(process.env.TEST_SHEET_ID); @@ -53,6 +58,7 @@ var cronTasks = const auth = GoogleFunctions.auth(); const sheet = GoogleFunctions.SpreadSheets(auth); +const drive = GoogleFunctions.Drive(auth); // let heartbeatTask = CronJobs.doEveryMinute(() => { // Log.debug(`All running => ${JSON.stringify(cronTasks)}`); @@ -90,6 +96,27 @@ const sheet = GoogleFunctions.SpreadSheets(auth); // Test Taobin SyncText // SyncText.run(sheet, "uae", false); +// Test drive +// test_drive(drive); + +// startup(); +// +// + +let client = new nproc.NprocClient("127.0.0.1:36540", () => { + client.subscribe("self"); + client.publish("self", { + msg: "test" + }); + + if(client.history.length > 1){ + Log.info("connected!"); + } +}, () => { + Log.debug("closed connection"); +}); + + const pm = new PluginsManager(cronTasks, CronJobs); pm.load(); diff --git a/lib/common.js b/lib/common.js index 3ba6705..54d1eb1 100644 --- a/lib/common.js +++ b/lib/common.js @@ -293,7 +293,8 @@ function saveJsonToFile(filename, content) { // GOOGLE // ====================================================================== -const { google, sheets_v4 } = require("googleapis"); +const { google, sheets_v4, drive_v3 } = require("googleapis"); +const { GoogleAuth } = require("google-auth-library"); function authorize() { const oauthClient = new google.auth.GoogleAuth({ @@ -301,7 +302,7 @@ function authorize() { client_email: process.env.GOOGLE_SERVICE_ACCOUNT_EMAIL, private_key: process.env.GOOGLE_PRIVATE_KEY.replace(/\\n/gm, "\n"), }, - scopes: ["https://www.googleapis.com/auth/spreadsheets"], + scopes: ["https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive"], }); return oauthClient; } @@ -325,6 +326,7 @@ function getCountrySpreadSheetId(cnt) { const GoogleFunctions = { auth: authorize, SpreadSheets: (auth) => google.sheets({ version: "v4", auth }), + Drive: (auth) => google.drive({ version: 'v3', auth }), GetCountrySpreadSheet: getCountrySpreadSheetById, getAllSheetNamesByCountry: getAllSheetNamesByCountry, getCountrySheetByName: getCountrySheetByName, @@ -1282,6 +1284,40 @@ async function _finalizeSyncProfilePrice( } } +// ====================================================================== +// DRIVE +// ====================================================================== + +/** + * @param {drive_v3.Drive} drive instance + */ +async function test_drive(drive){ + + try { + await drive.files.create({ + requestBody: { + name: "test", + mimeType: "text/plain", + parents: [process.env.TAOBIN_ADMIN_SERVER_DRIVE_FOLDER] + }, + media: { + mimeType: "text/plain", + body: "Hello from js" + } + }).then((x) => Log.info(`Created file response: ${JSON.parse(x)}`)); + } catch(error){ + Log.err(`Error test drive: ${error}`); + return JSON.stringify({ + error: error + }); + } + return JSON.stringify({ + status: "success" + }); +} + + + // special // ====================================================================== @@ -1406,4 +1442,5 @@ module.exports = { getCountrySheetByName, diff2DArraysCustom, saveJsonToFile, + test_drive }; diff --git a/lib/nproc.js b/lib/nproc.js new file mode 100644 index 0000000..d589de5 --- /dev/null +++ b/lib/nproc.js @@ -0,0 +1,85 @@ +const net = require('net'); +// TODO: must change to read by env +const API_KEY = Buffer.from('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'); + +function frame(msgType, topicStr, bodyBuf) { + const topic = Buffer.from(topicStr); + const payloadLen = 1 + 2 + 4 + topic.length + bodyBuf.length; + const totalBytes = payloadLen + 32; + + const buf = Buffer.alloc(4 + payloadLen + 32); + buf.writeUInt32BE(totalBytes, 0); + buf.writeUInt8(msgType, 4); + buf.writeUInt16BE(topic.length, 5); + buf.writeUInt32BE(bodyBuf.length, 7); + topic.copy(buf, 11); + bodyBuf.copy(buf, 11 + topic.length); + API_KEY.copy(buf, 4 + payloadLen); + return buf; +} + +class NprocClient { + + MESSAGE_TYPE = { + SUB: 1, + UNSUB: 2, + PUB: 3, + PING: 4 + }; + + constructor( + addr, + onConnect, + onClose + ){ + let _addr = addr.toString().split(":"); + + let host = _addr[0]; + let port = parseInt(_addr[1]); + + this.client = net.createConnection({ + host: host, + port: port + }, () => onConnect()); + this.history = []; + this.acc = Buffer.alloc(0); + this.client.on('data', chunk => { + this.acc = Buffer.concat([this.acc, chunk]); + while(this.acc.length >= 4){ + const total = this.acc.readUInt32BE(0); + if (this.acc.length < 4 + total) break; + const frm = this.acc.subarray(0, 4 + total); + this.acc = this.acc.subarray(4 + total); + + const msgType = frm.readUInt8(4); + const topicLen = frm.readUInt16BE(5); + const bodyLen = frm.readUInt32BE(7); + const topic = frm.subarray(11, 11 + topicLen).toString(); + const body = frm.subarray(11 + topicLen, 11 + topicLen + bodyLen); + + let res = { + msgType, topic, body: body.toString() + }; + console.log(`get msg! ${JSON.stringify(res)}`); + + this.history.push(res); + } + }); + this.client.on('close', () => onClose()); + } + + subscribe(topic){ + this.client.write(frame(this.MESSAGE_TYPE.SUB, topic, Buffer.alloc(0))); + } + + publish(topic, payload){ + this.client.write(frame(this.MESSAGE_TYPE.PUB, topic, Buffer.from(JSON.stringify(payload)))); + } + + // TODO: unsub + // TODO: ping +} + +module.exports = { + NprocClient +}; \ No newline at end of file diff --git a/lib/package_manager.js b/lib/package_manager.js new file mode 100644 index 0000000..e69de29 diff --git a/lib/zmq.js b/lib/zmq.js new file mode 100644 index 0000000..f7b1249 --- /dev/null +++ b/lib/zmq.js @@ -0,0 +1,28 @@ +const zmq = require('zeromq'); +const uuid = require('uuid'); + +/** + * Publish simple message to server to notify service is ready. + */ +async function startup(){ + const pub = new zmq.Publisher(); + await pub.connect("tcp://127.0.0.1:36541"); + + const msg = { + id: uuid.v4(), + topic: "news", + message_type: "Data", + data: { content: "GGS Ready!" }, + timestamp: Math.floor(Date.now() / 1000) + }; + + await pub.send(["news", JSON.stringify(msg)]); + + console.log("startup send!") + pub.close(); +} + + +module.exports = { + startup +}; \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index dc9af4f..b1b0d34 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,7 +18,11 @@ "morgan": "~1.9.1", "node-cron": "^3.0.3", "winston": "^3.17.0", - "winston-daily-rotate-file": "^5.0.0" + "winston-daily-rotate-file": "^5.0.0", + "zeromq": "^6.5.0" + }, + "devDependencies": { + "esbuild": "^0.25.8" } }, "node_modules/@colors/colors": { @@ -720,6 +724,14 @@ "wordwrap": "0.0.2" } }, + "node_modules/cmake-ts": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/cmake-ts/-/cmake-ts-1.0.2.tgz", + "integrity": "sha512-5l++JHE7MxFuyV/OwJf3ek7ZZN1aGPFPM5oUz6AnK5inQAPe4TFXRMz5sA2qg2FRgByPWdqO+gSfIPo8GzoKNQ==", + "bin": { + "cmake-ts": "build/main.js" + } + }, "node_modules/color": { "version": "3.2.1", "resolved": "https://registry.npmjs.org/color/-/color-3.2.1.tgz", @@ -1631,6 +1643,14 @@ "node": ">= 0.6" } }, + "node_modules/node-addon-api": { + "version": "8.5.0", + "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-8.5.0.tgz", + "integrity": "sha512-/bRZty2mXUIFY/xU5HLvveNHlswNJej+RnxBjOMkidWfwZzgTbPG1E3K5TOxRLOR+5hX7bSofy8yf1hZevMS8A==", + "engines": { + "node": "^18 || ^20 || >= 21" + } + }, "node_modules/node-cron": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/node-cron/-/node-cron-3.0.3.tgz", @@ -2273,6 +2293,19 @@ "decamelize": "^1.0.0", "window-size": "0.1.0" } + }, + "node_modules/zeromq": { + "version": "6.5.0", + "resolved": "https://registry.npmjs.org/zeromq/-/zeromq-6.5.0.tgz", + "integrity": "sha512-vWOrt19lvcXTxu5tiHXfEGQuldSlU+qZn2TT+4EbRQzaciWGwNZ99QQTolQOmcwVgZLodv+1QfC6UZs2PX/6pQ==", + "hasInstallScript": true, + "dependencies": { + "cmake-ts": "1.0.2", + "node-addon-api": "^8.3.1" + }, + "engines": { + "node": ">= 12" + } } } } diff --git a/package.json b/package.json index 97cb220..359466d 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,8 @@ "morgan": "~1.9.1", "node-cron": "^3.0.3", "winston": "^3.17.0", - "winston-daily-rotate-file": "^5.0.0" + "winston-daily-rotate-file": "^5.0.0", + "zeromq": "^6.5.0" }, "devDependencies": { "esbuild": "^0.25.8" diff --git a/plugins/heartbeat.js b/plugins/heartbeat.js index f9e4c65..88dcfe9 100644 --- a/plugins/heartbeat.js +++ b/plugins/heartbeat.js @@ -12,7 +12,10 @@ const heartbeatApiInfo = { let heartbeatTask = CronJobs.doEveryMinute(() => { Log.debug("[hb] test heartbeat"); Log.debug(`[hb] current running tasks: ${JSON.stringify(CronJobs.getAllRunning.size)}`); - heartbeatTask.stop(); + client.publish("log", { + msg: "heartbeat", + status: 200 + }); }, 'heartbeat'); heartbeatTask.on('stop-heartbeat', () => heartbeatTask.stop()); diff --git a/run.test.sh b/run.test.sh index d24c3e8..58dbcf5 100755 --- a/run.test.sh +++ b/run.test.sh @@ -1,2 +1,2 @@ #!/bin/bash -DEBUG=server:* PORT=36530 npm start \ No newline at end of file +DEBUG=server:* PORT=36531 npm start \ No newline at end of file