-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclientServerBase.js
More file actions
105 lines (92 loc) · 2.88 KB
/
clientServerBase.js
File metadata and controls
105 lines (92 loc) · 2.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
const dgram = require("dgram");
const { waitingAck } = require("./data");
const Events = require("events").EventEmitter;
const messageFragmentation = require("./messageFragmentation");
// ===========================
// Client server base class
/**
* Base class wrapping a dgram UDP (udp4) socket, used for server-to-server communication.
*/
class ClientServerBase extends Events {
  /**
   * Create the UDP socket and the message-fragmentation helper bound to it.
   *
   * `getSocket` and `decrypt` are used by the handlers below but are not
   * defined in this class; presumably a subclass supplies them.
   *
   * @param {Object} [options]
   * @param {number} [options.connectionTimeout=1000] - connection timeout in ms
   * @param {number} [options.retryTimeout=500] - retry timeout in ms
   */
  constructor({ connectionTimeout, retryTimeout } = {}) {
    super();
    this.server = dgram.createSocket("udp4");
    // `||` is kept on purpose: a timeout of 0 also falls back to the default.
    this.connectionTimeout = connectionTimeout || 1000; // 1 second
    this.retryTimeout = retryTimeout || 500; // 500 ms
    // NOTE(review): the raw option (possibly undefined) is forwarded here, not
    // this.connectionTimeout - confirm messageFragmentation has its own default.
    this.frag = new messageFragmentation(
      this.server,
      this.messageHandler.bind(this),
      connectionTimeout
    );
    this.server.on("error", (err) => {
      console.error(`Server error: ${err.stack}`);
      this.server.close();
    });
  }

  // =========
  // Handlers

  /**
   * Handle an incoming message (see: messageStructure).
   *
   * The message must be JSON. When both `clientID` and `iv` are present the
   * `data` field is passed through `this.decrypt` first. If the payload
   * carries an `ackID`, an "ack" is emitted on the matching socket, and the
   * message is finally dispatched to the method named by its `type` field.
   *
   * Malformed input (non-object JSON, missing `data`, unknown `type`) is
   * ignored instead of throwing, because a throw inside this async method
   * would surface as an unhandled promise rejection.
   *
   * @param {Buffer|string} msg - raw message from the socket
   * @param {*} rinfo - remote address info, forwarded untouched to the handler
   * @returns {Promise<void>}
   */
  async messageHandler(msg, rinfo) {
    let m;
    try {
      m = JSON.parse(msg.toString());
    } catch (e) {
      console.error("Error parsing message: ", e);
      return;
    }
    // Valid JSON that is not an object (`null`, `42`, `"x"`) carries no
    // envelope fields; destructuring `null` would throw.
    if (m === null || typeof m !== "object") return;

    let { data } = m;
    const { type, clientID, iv } = m;
    if (clientID && iv) {
      data = await this.decrypt(data, iv, clientID);
      if (!data) return;
      try {
        data = JSON.parse(data);
      } catch {
        // Best effort: a decrypted payload that is not JSON is kept as-is.
      }
    }

    // `data` may be missing, or `null` after parsing - look up the socket
    // only when there is a payload to read the id from.
    const socket = data ? this.getSocket(data.socketID) : undefined;

    // return ackID to client to guarantee delivery
    if (data && type !== "ack" && data.ackID && socket) {
      socket.emit("ack", null, {
        type: "ack",
        ackID: data.ackID,
      });
    }

    // NOTE(review): `type` comes from the network and can name ANY method
    // reachable on this object (including inherited EventEmitter methods).
    // Consider restricting dispatch to an explicit allowlist of handlers.
    const handler = this[type];
    if (typeof handler === "function") {
      handler.call(this, { ...m, data, rinfo });
    }
  }

  /**
   * Handler for "keepAlive" messages: refresh the socket's keepAliveTime.
   * @param {Object} message
   * @param {Object} [message.data] - payload; its socketID selects the socket
   */
  keepAlive({ data }) {
    if (!data) return;
    const socket = this.getSocket(data.socketID);
    // Loose equality is kept so ids that differ only in type still match.
    if (socket && socket.socketID == data.socketID) {
      // update keepAliveTime
      socket.keepAliveTime = new Date();
    }
  }

  /**
   * Handler for "data" messages: re-emit the payload locally on the socket
   * as event `data.topic` with value `data.message`.
   * @param {Object} message
   * @param {Object} [message.data] - payload with socketID, topic and message
   */
  data({ data }) {
    if (!data) return;
    const socket = this.getSocket(data.socketID);
    // emit data event
    if (socket) socket.emitLocal(data.topic, data.message);
  }

  /**
   * Handler for "ack" messages: the message was delivered, so drop it from
   * the waitingAck table.
   * @param {Object} message
   * @param {Object} [message.data] - payload carrying the acknowledged ackID
   */
  ack({ data }) {
    // remove message from waitingAck
    if (data) delete waitingAck[data.ackID];
  }

  /**
   * Emit an event through the inherited EventEmitter `emit`.
   * @param {string} eventName
   * @param {*} data
   */
  emitLocal(eventName, data) {
    super.emit(eventName, data);
  }
}
// Export the class as this module's CommonJS export.
module.exports = ClientServerBase;