const net = require('net'); // TODO: must change to read by env const API_KEY = Buffer.from('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'); function frame(msgType, topicStr, bodyBuf) { const topic = Buffer.from(topicStr); const payloadLen = 1 + 2 + 4 + topic.length + bodyBuf.length; const totalBytes = payloadLen + 32; const buf = Buffer.alloc(4 + payloadLen + 32); buf.writeUInt32BE(totalBytes, 0); buf.writeUInt8(msgType, 4); buf.writeUInt16BE(topic.length, 5); buf.writeUInt32BE(bodyBuf.length, 7); topic.copy(buf, 11); bodyBuf.copy(buf, 11 + topic.length); API_KEY.copy(buf, 4 + payloadLen); return buf; } class NprocClient { MESSAGE_TYPE = { SUB: 1, UNSUB: 2, PUB: 3, PING: 4 }; constructor( addr, onConnect, onClose ){ let _addr = addr.toString().split(":"); let host = _addr[0]; let port = parseInt(_addr[1]); this.client = net.createConnection({ host: host, port: port }, () => onConnect()); this.history = []; this.acc = Buffer.alloc(0); this.client.on('data', chunk => { this.acc = Buffer.concat([this.acc, chunk]); while(this.acc.length >= 4){ const total = this.acc.readUInt32BE(0); if (this.acc.length < 4 + total) break; const frm = this.acc.subarray(0, 4 + total); this.acc = this.acc.subarray(4 + total); const msgType = frm.readUInt8(4); const topicLen = frm.readUInt16BE(5); const bodyLen = frm.readUInt32BE(7); const topic = frm.subarray(11, 11 + topicLen).toString(); const body = frm.subarray(11 + topicLen, 11 + topicLen + bodyLen); let res = { msgType, topic, body: body.toString() }; console.log(`get msg! ${JSON.stringify(res)}`); this.history.push(res); } }); this.client.on('close', () => onClose()); } subscribe(topic){ this.client.write(frame(this.MESSAGE_TYPE.SUB, topic, Buffer.alloc(0))); } publish(topic, payload){ this.client.write(frame(this.MESSAGE_TYPE.PUB, topic, Buffer.from(JSON.stringify(payload)))); } // TODO: unsub // TODO: ping } module.exports = { NprocClient };