diff --git a/src/lifx/client.js b/src/lifx/client.js index 759446d..4b09291 100644 --- a/src/lifx/client.js +++ b/src/lifx/client.js @@ -3,7 +3,7 @@ const util = require('util'); const dgram = require('dgram'); const EventEmitter = require('eventemitter3'); -const {defaults, isArray, result, find, bind, forEach} = require('lodash'); +const {defaults, isArray, result, find, bind, forEach, keys, isNil} = require('lodash'); const Packet = require('../lifx').packet; const {Light, constants, utils} = require('../lifx'); @@ -19,8 +19,8 @@ function Client() { this.isSocketBound = false; this.devices = {}; this.port = null; - this.messagesQueue = []; - this.sendTimer = null; + this.messageQueues = {}; + this.sendTimers = {}; this.discoveryTimer = null; this.discoveryPacketSequence = 0; this.messageHandlers = [{ @@ -209,81 +209,104 @@ Client.prototype.init = function(options, callback) { */ Client.prototype.destroy = function() { this.stopDiscovery(); - this.stopSendingProcess(); + forEach(keys(this.messageQueues), (queueAddress) => { + this.stopSendingProcess(queueAddress); + }); if (this.isSocketBound) { this.socket.close(); } }; +/** + * Gets the message queue for the given address. If no address is defined, + * defaults to broadcast address. + * @param {String} queueAddress Message queue address + * @return {Array} Message queue for the address + */ +Client.prototype.getMessageQueue = function(queueAddress = this.broadcastAddress) { + if (isNil(this.messageQueues[queueAddress])) { + this.messageQueues[queueAddress] = []; + } + return this.messageQueues[queueAddress]; +}; + /** * Sends a packet from the messages queue or stops the sending process * if queue is empty + * @param {String} queueAddress Message queue id + * @return {Function} Sending process for the message queue **/ -Client.prototype.sendingProcess = function() { - if (!this.isSocketBound) { - this.stopSendingProcess(); - console.log('LIFX Client stopped sending due to unbound socket'); - return; - } - - if (this.messagesQueue.length > 0) { - const msg = this.messagesQueue.pop(); - if (msg.address === undefined) { - msg.address = this.broadcastAddress; +Client.prototype.sendingProcess = function(queueAddress) { + const messageQueue = this.getMessageQueue(queueAddress); + return () => { + if (!this.isSocketBound) { + this.stopSendingProcess(queueAddress); + console.log('LIFX Client stopped sending due to unbound socket'); + return; } - if (msg.transactionType === constants.PACKET_TRANSACTION_TYPES.ONE_WAY) { - this.socket.send(msg.data, 0, msg.data.length, this.sendPort, msg.address); - /* istanbul ignore if */ - if (this.debug) { - console.log('DEBUG - ' + msg.data.toString('hex') + ' to ' + msg.address); + + if (messageQueue.length > 0) { + const msg = messageQueue.pop(); + if (msg.address === undefined) { + msg.address = this.broadcastAddress; } - } else if (msg.transactionType === constants.PACKET_TRANSACTION_TYPES.REQUEST_RESPONSE) { - if (msg.timesSent < this.resendMaxTimes) { - if (Date.now() > (msg.timeLastSent + this.resendPacketDelay)) { - this.socket.send(msg.data, 0, msg.data.length, this.sendPort, msg.address); - msg.timesSent += 1; - msg.timeLastSent = Date.now(); - /* istanbul ignore if */ - if (this.debug) { - console.log( - 'DEBUG - ' + msg.data.toString('hex') + ' to ' + msg.address + - ', send ' + msg.timesSent + ' time(s)' - ); - } + if (msg.transactionType === constants.PACKET_TRANSACTION_TYPES.ONE_WAY) { + this.socket.send(msg.data, 0, msg.data.length, this.sendPort, msg.address); + /* istanbul ignore if */ + if (this.debug) { + console.log('DEBUG - ' + msg.data.toString('hex') + ' to ' + msg.address); } - // Add to the end of the queue again - this.messagesQueue.unshift(msg); - } else { - this.messageHandlers.forEach(function(handler, hdlrIndex) { - if (handler.type === 'acknowledgement' && handler.sequenceNumber === msg.sequence) { - this.messageHandlers.splice(hdlrIndex, 1); - const err = new Error('No LIFX response after max resend limit of ' + this.resendMaxTimes); - return handler.callback(err, null, null); + } else if (msg.transactionType === constants.PACKET_TRANSACTION_TYPES.REQUEST_RESPONSE) { + if (msg.timesSent < this.resendMaxTimes) { + if (Date.now() > (msg.timeLastSent + this.resendPacketDelay)) { + this.socket.send(msg.data, 0, msg.data.length, this.sendPort, msg.address); + msg.timesSent += 1; + msg.timeLastSent = Date.now(); + /* istanbul ignore if */ + if (this.debug) { + console.log( + 'DEBUG - ' + msg.data.toString('hex') + ' to ' + msg.address + + ', send ' + msg.timesSent + ' time(s)' + ); + } } - }.bind(this)); + // Add to the end of the queue again + messageQueue.unshift(msg); + } else { + this.messageHandlers.forEach(function(handler, hdlrIndex) { + if (handler.type === 'acknowledgement' && handler.sequenceNumber === msg.sequence) { + this.messageHandlers.splice(hdlrIndex, 1); + const err = new Error('No LIFX response after max resend limit of ' + this.resendMaxTimes); + return handler.callback(err, null, null); + } + }.bind(this)); + } } + } else { + this.stopSendingProcess(queueAddress); } - } else { - this.stopSendingProcess(); - } + }; }; /** * Starts the sending of all packages in the queue + * @param {String} queueAddress Message queue id */ -Client.prototype.startSendingProcess = function() { - if (this.sendTimer === null) { // Already running? - this.sendTimer = setInterval(this.sendingProcess.bind(this), constants.MESSAGE_RATE_LIMIT); +Client.prototype.startSendingProcess = function(queueAddress = this.broadcastAddress) { + if (isNil(this.sendTimers[queueAddress])) { // Already running? + const sendingProcess = this.sendingProcess(queueAddress); + this.sendTimers[queueAddress] = setInterval(sendingProcess, constants.MESSAGE_RATE_LIMIT); } }; /** * Stops sending of all packages in the queue + * @param {String} queueAddress Message queue id */ -Client.prototype.stopSendingProcess = function() { - if (this.sendTimer !== null) { - clearInterval(this.sendTimer); - this.sendTimer = null; +Client.prototype.stopSendingProcess = function(queueAddress = this.broadcastAddress) { + if (!isNil(this.sendTimers[queueAddress])) { + clearInterval(this.sendTimers[queueAddress]); + delete this.sendTimers[queueAddress]; } }; @@ -340,6 +363,7 @@ Client.prototype.startDiscovery = function(lights) { * @param {Object} rinfo rinfo address info to check handler for */ Client.prototype.processMessageHandlers = function(msg, rinfo) { + const messageQueue = this.getMessageQueue(rinfo.address); // Process only packages for us if (msg.source.toLowerCase() !== this.source.toLowerCase()) { return; @@ -351,12 +375,12 @@ Client.prototype.processMessageHandlers = function(msg, rinfo) { if (handler.sequenceNumber === msg.sequence) { // Remove if specific packet was request, since it should only be called once this.messageHandlers.splice(hdlrIndex, 1); - this.messagesQueue.forEach(function(packet, packetIndex) { + messageQueue.forEach(function(packet, packetIndex) { if (packet.transactionType === constants.PACKET_TRANSACTION_TYPES.REQUEST_RESPONSE && packet.sequence === msg.sequence) { - this.messagesQueue.splice(packetIndex, 1); + messageQueue.splice(packetIndex, 1); } - }.bind(this)); + }); // Call the function requesting the packet return handler.callback(null, msg, rinfo); @@ -483,8 +507,12 @@ Client.prototype.send = function(msg, callback) { packet.transactionType = constants.PACKET_TRANSACTION_TYPES.REQUEST_RESPONSE; } packet.data = Packet.toBuffer(msg); - this.messagesQueue.unshift(packet); - this.startSendingProcess(); + + const queueAddress = packet.address; + const messageQueue = this.getMessageQueue(packet.address); + messageQueue.unshift(packet); + + this.startSendingProcess(queueAddress); return this.sequenceNumber; }; diff --git a/test/unit/client-test.js b/test/unit/client-test.js index 33a352f..d1562de 100644 --- a/test/unit/client-test.js +++ b/test/unit/client-test.js @@ -10,8 +10,8 @@ const lolex = require('lolex'); suite('Client', () => { let client; let clock; - const getMsgQueueLength = () => { - return client.messagesQueue.length; + const getMsgQueueLength = (queueAddress) => { + return client.getMessageQueue(queueAddress).length; }; const getDeviceCount = () => { return Object.keys(client.devices).length; @@ -205,20 +205,22 @@ suite('Client', () => { port: 56700, size: 41 }; + const queueAddress = discoveryInfo.address; + let currDeviceCount = getDeviceCount(); - let currMsgQueCnt = getMsgQueueLength(); + let currMsgQueCnt = getMsgQueueLength(queueAddress); client.processDiscoveryPacket(new Error(), null, null); client.processDiscoveryPacket(null, { service: 'udp', port: 8080 }, null); assert.equal(currDeviceCount, getDeviceCount(), 'malformed packages ignored'); - assert.equal(currMsgQueCnt, getMsgQueueLength(), 'malformed packages ignored'); + assert.equal(currMsgQueCnt, getMsgQueueLength(queueAddress), 'malformed packages ignored'); client.processDiscoveryPacket(null, discoveryMessage, discoveryInfo); assert.equal(getDeviceCount(), currDeviceCount + 1, 'device added'); currDeviceCount += 1; - assert.equal(getMsgQueueLength(), currMsgQueCnt + 1, 'label request done'); + assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt + 1, 'label request done'); currMsgQueCnt += 1; // Set to offline for recovery check @@ -226,7 +228,7 @@ suite('Client', () => { client.processDiscoveryPacket(null, discoveryMessage, discoveryInfo); assert.equal(client.devices[discoveryMessage.target].status, 'on'); assert.equal(currDeviceCount, getDeviceCount(), 'no new devices but known updated'); - assert.equal(currMsgQueCnt, getMsgQueueLength(), 'no new messages'); + assert.equal(currMsgQueCnt, getMsgQueueLength(queueAddress), 'no new messages'); }); test('finding bulbs by different parameters', () => { @@ -325,12 +327,12 @@ suite('Client', () => { startDiscovery: false }, () => { assert.equal(client.sequenceNumber, 0, 'starts sequence with 0'); - assert.lengthOf(client.messagesQueue, 0, 'is empty'); + assert.lengthOf(client.getMessageQueue(), 0, 'is empty'); client.send(packet.create('getService', {}, '12345678')); assert.equal(client.sequenceNumber, 0, 'sequence is the same after broadcast'); - assert.lengthOf(client.messagesQueue, 1, 'added to message queue'); - assert.property(client.messagesQueue[0], 'data', 'has data'); - assert.notProperty(client.messagesQueue[0], 'address', 'broadcast has no target address'); + assert.lengthOf(client.getMessageQueue(), 1, 'added to message queue'); + assert.property(client.getMessageQueue()[0], 'data', 'has data'); + assert.notProperty(client.getMessageQueue()[0], 'address', 'broadcast has no target address'); client.send(packet.create('setPower', {level: 65535, duration: 0, target: 'f37a4311b857'}, '12345678')); assert.equal(client.sequenceNumber, 1, 'sequence increased after specific targeting'); @@ -531,13 +533,14 @@ suite('Client', () => { const shouldNotBeCalled = () => { throw new Error(); }; + const queueAddress = client.broadcastAddress; client.init({ startDiscovery: false }, () => { client.socket.on('message', shouldNotBeCalled); - client.sendingProcess(); - assert.isNull(client.sendTimer); + client.sendingProcess(queueAddress); + assert.isUndefined(client.sendTimers[queueAddress]); client.socket.removeListener('message', shouldNotBeCalled); done(); }); @@ -551,22 +554,24 @@ suite('Client', () => { done(); }; const packetObj = packet.create('setPower', {level: 65535}, client.source); + const queueAddress = client.broadcastAddress; client.init({ port: constants.LIFX_DEFAULT_PORT, startDiscovery: false }, () => { client.socket.on('message', packetSendCallback); - - let currMsgQueCnt = getMsgQueueLength(); + let currMsgQueCnt = getMsgQueueLength(queueAddress); client.send(packetObj); - assert.equal(getMsgQueueLength(), currMsgQueCnt + 1, 'sends a packet to the queue'); + assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt + 1, 'sends a packet to the queue'); currMsgQueCnt += 1; - assert.isNotNull(client.sendTimer); + assert.isDefined(client.sendTimers[queueAddress]); client.stopSendingProcess(); // We don't want automatic calling of sending - client.sendingProcess(); // Call sending it manualy - assert.equal(getMsgQueueLength(), currMsgQueCnt - 1, 'removes the packet when send'); + const sendingProcess = client.sendingProcess(queueAddress); + sendingProcess(); // Call sending it manualy + + assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt - 1, 'removes the packet when send'); currMsgQueCnt -= 1; }); }); @@ -579,6 +584,7 @@ suite('Client', () => { done(); }; const packetObj = packet.create('setPower', {level: 65535}, client.source); + const queueAddress = client.broadcastAddress; client.init({ port: constants.LIFX_DEFAULT_PORT, @@ -586,15 +592,17 @@ suite('Client', () => { }, () => { client.socket.on('message', packetSendCallback); - let currMsgQueCnt = getMsgQueueLength(); + let currMsgQueCnt = getMsgQueueLength(queueAddress); client.send(packetObj, () => {}); - assert.equal(getMsgQueueLength(), currMsgQueCnt + 1, 'sends a packet to the queue'); + assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt + 1, 'sends a packet to the queue'); currMsgQueCnt += 1; - assert.isNotNull(client.sendTimer); + assert.isDefined(client.sendTimers[queueAddress]); client.stopSendingProcess(); // We don't want automatic calling of sending - client.sendingProcess(); // Call sending it manualy - assert.equal(getMsgQueueLength(), currMsgQueCnt, 'keeps packet when send'); + const sendingProcess = client.sendingProcess(queueAddress); + sendingProcess(); // Call sending it manualy + + assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt, 'keeps packet when send'); }); }); @@ -609,22 +617,25 @@ suite('Client', () => { done(); }; const packetObj = packet.create('setPower', {level: 65535}, client.source); + const queueAddress = client.broadcastAddress; client.init({ startDiscovery: false }, () => { client.socket.on('message', shouldNotBeSendCallback); - let currMsgQueCnt = getMsgQueueLength(); + let currMsgQueCnt = getMsgQueueLength(queueAddress); client.send(packetObj, handlerTimeoutCallback); - assert.equal(getMsgQueueLength(), currMsgQueCnt + 1, 'sends a packet to the queue'); + assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt + 1, 'sends a packet to the queue'); currMsgQueCnt += 1; - assert.isNotNull(client.sendTimer); + assert.isDefined(client.sendTimers[queueAddress]); client.stopSendingProcess(); // We don't want automatic calling of sending - client.messagesQueue[0].timesSent = client.resendMaxTimes; // This triggers error + client.getMessageQueue(queueAddress)[0].timesSent = client.resendMaxTimes; // This triggers error + + const sendingProcess = client.sendingProcess(queueAddress); + sendingProcess(); // Call sending it manualy - client.sendingProcess(); // Call sending it manualy - assert.equal(getMsgQueueLength(), currMsgQueCnt - 1, 'removes packet after max retries and callback'); + assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt - 1, 'removes packet after max retries and callback'); currMsgQueCnt -= 1; }); }); diff --git a/test/unit/light-test.js b/test/unit/light-test.js index ac92a0b..67bf299 100644 --- a/test/unit/light-test.js +++ b/test/unit/light-test.js @@ -9,7 +9,7 @@ suite('Light', () => { let client; let bulb; const getMsgQueueLength = () => { - return client.messagesQueue.length; + return client.getMessageQueue().length; }; const getMsgHandlerLength = () => { return client.messageHandlers.length;