ggs-cron/lib/nproc.js

85 lines
2.1 KiB
JavaScript
Raw Normal View History

const net = require('net');
/**
 * Key bytes that frame() copies into the trailer of every outgoing frame.
 *
 * TODO: load this value from an environment variable instead of
 * hard-coding it in the source.
 */
const API_KEY = Buffer.from('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA', 'utf8');
/**
 * Serialises one outgoing message into a single wire frame.
 *
 * Layout (all integers big-endian):
 *   u32  number of bytes that follow this length field
 *   u8   message type
 *   u16  topic length in bytes
 *   u32  body length in bytes
 *   ...  topic bytes
 *   ...  body bytes
 *   32 bytes filled from API_KEY
 *
 * @param {number} msgType - Value for the message-type byte.
 * @param {string} topicStr - Topic name; encoded with Buffer.from().
 * @param {Buffer} bodyBuf - Raw body bytes (may be empty).
 * @returns {Buffer} The complete frame, length prefix included.
 */
function frame(msgType, topicStr, bodyBuf) {
  const LENGTH_PREFIX = 4; // leading u32
  const HEADER_LEN = 1 + 2 + 4; // type + topic length + body length
  const KEY_LEN = 32; // trailer reserved for API_KEY

  const topicBytes = Buffer.from(topicStr);
  const payloadLen = HEADER_LEN + topicBytes.length + bodyBuf.length;
  const out = Buffer.alloc(LENGTH_PREFIX + payloadLen + KEY_LEN);

  // Each write returns the offset just past what it wrote, so a running
  // cursor replaces the hard-coded positions.
  let pos = out.writeUInt32BE(payloadLen + KEY_LEN, 0);
  pos = out.writeUInt8(msgType, pos);
  pos = out.writeUInt16BE(topicBytes.length, pos);
  pos = out.writeUInt32BE(bodyBuf.length, pos);
  pos += topicBytes.copy(out, pos);
  pos += bodyBuf.copy(out, pos);
  API_KEY.copy(out, pos);

  return out;
}
/**
 * TCP client that sends frames built by frame() and decodes incoming
 * length-prefixed frames.
 *
 * Received bytes are buffered in `acc` until a whole frame is available;
 * every decoded message is logged and appended to `history`.
 */
class NprocClient {
  /** Values written into the message-type byte of a frame. */
  MESSAGE_TYPE = {
    SUB: 1,
    UNSUB: 2,
    PUB: 3,
    PING: 4
  };

  /**
   * Opens the connection and installs the socket listeners.
   *
   * @param {string} addr - Endpoint in "host:port" form.
   * @param {Function} [onConnect] - Called without arguments once the
   *   connection is established.
   * @param {Function} [onClose] - Called without arguments when the socket
   *   closes, including after a socket error.
   */
  constructor(
    addr,
    onConnect,
    onClose
  ) {
    const [host, portText] = addr.toString().split(':');
    // Explicit radix so the port is always read as decimal.
    const port = Number.parseInt(portText, 10);

    this.history = [];
    this.acc = Buffer.alloc(0);

    this.client = net.createConnection({ host, port }, () => onConnect?.());
    this.client.on('data', (chunk) => this._onData(chunk));
    // Without an 'error' listener a refused or reset connection is thrown
    // as an uncaught exception. The socket emits 'close' right after
    // 'error', so onClose still runs.
    this.client.on('error', (err) => {
      console.error(`nproc socket error: ${err.message}`);
    });
    this.client.on('close', () => onClose?.());
  }

  /**
   * Appends a received chunk to the buffer and decodes every complete frame
   * now available. Internal: invoked by the socket 'data' listener.
   *
   * Frames that are too short to hold the fixed header, or whose declared
   * topic/body lengths do not fit inside the frame, are dropped with a
   * warning instead of throwing or producing truncated messages.
   *
   * @param {Buffer} chunk - Bytes just read from the socket.
   */
  _onData(chunk) {
    const LENGTH_PREFIX = 4; // leading u32 frame length
    const HEADER_LEN = 7; // type (1) + topic length (2) + body length (4)
    const BODY_START = LENGTH_PREFIX + HEADER_LEN;

    this.acc = Buffer.concat([this.acc, chunk]);
    while (this.acc.length >= LENGTH_PREFIX) {
      const total = this.acc.readUInt32BE(0);
      const frameEnd = LENGTH_PREFIX + total;
      if (this.acc.length < frameEnd) break; // wait for the rest of the frame

      const frm = this.acc.subarray(0, frameEnd);
      this.acc = this.acc.subarray(frameEnd);

      if (total < HEADER_LEN) {
        // Reading the header fields would run past the end of the frame.
        console.warn(`nproc: dropping runt frame (${total} bytes)`);
        continue;
      }

      const msgType = frm.readUInt8(4);
      const topicLen = frm.readUInt16BE(5);
      const bodyLen = frm.readUInt32BE(7);
      if (HEADER_LEN + topicLen + bodyLen > total) {
        console.warn(
          `nproc: dropping frame with inconsistent lengths (total=${total}, topic=${topicLen}, body=${bodyLen})`
        );
        continue;
      }

      const topicEnd = BODY_START + topicLen;
      const topic = frm.subarray(BODY_START, topicEnd).toString();
      const body = frm.subarray(topicEnd, topicEnd + bodyLen).toString();
      const res = { msgType, topic, body };
      console.log(`get msg! ${JSON.stringify(res)}`);
      this.history.push(res);
    }
  }

  /**
   * Sends a SUB frame for the given topic with an empty body.
   *
   * @param {string} topic - Topic to subscribe to.
   */
  subscribe(topic) {
    this.client.write(frame(this.MESSAGE_TYPE.SUB, topic, Buffer.alloc(0)));
  }

  /**
   * Sends a PUB frame whose body is the JSON encoding of `payload`.
   *
   * @param {string} topic - Topic to publish on.
   * @param {*} payload - JSON-serialisable value.
   */
  publish(topic, payload) {
    this.client.write(frame(this.MESSAGE_TYPE.PUB, topic, Buffer.from(JSON.stringify(payload))));
  }
  // TODO: unsub
  // TODO: ping
}
// Public surface of this module: only the client class is exported.
module.exports = { NprocClient };