Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 85 additions & 57 deletions src/lifx/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const util = require('util');
const dgram = require('dgram');
const EventEmitter = require('eventemitter3');
const {defaults, isArray, result, find, bind, forEach} = require('lodash');
const {defaults, isArray, result, find, bind, forEach, keys, isNil} = require('lodash');
const Packet = require('../lifx').packet;
const {Light, constants, utils} = require('../lifx');

Expand All @@ -19,8 +19,8 @@ function Client() {
this.isSocketBound = false;
this.devices = {};
this.port = null;
this.messagesQueue = [];
this.sendTimer = null;
this.messageQueues = {};
this.sendTimers = {};
this.discoveryTimer = null;
this.discoveryPacketSequence = 0;
this.messageHandlers = [{
Expand Down Expand Up @@ -209,81 +209,104 @@ Client.prototype.init = function(options, callback) {
*/
Client.prototype.destroy = function() {
this.stopDiscovery();
this.stopSendingProcess();
forEach(keys(this.messageQueues), (queueAddress) => {
this.stopSendingProcess(queueAddress);
});
if (this.isSocketBound) {
this.socket.close();
}
};

/**
* Gets the message queue for the given address. If no address is defined,
* defaults to broadcast address.
* @param {String} queueAddress Message queue address
* @return {Array} Message queue for the address
*/
Client.prototype.getMessageQueue = function(queueAddress = this.broadcastAddress) {
if (isNil(this.messageQueues[queueAddress])) {
this.messageQueues[queueAddress] = [];
}
return this.messageQueues[queueAddress];
};

/**
* Sends a packet from the messages queue or stops the sending process
* if queue is empty
* @param {String} queueAddress Message queue id
* @return {Function} Sending process for the message queue
**/
Client.prototype.sendingProcess = function() {
if (!this.isSocketBound) {
this.stopSendingProcess();
console.log('LIFX Client stopped sending due to unbound socket');
return;
}

if (this.messagesQueue.length > 0) {
const msg = this.messagesQueue.pop();
if (msg.address === undefined) {
msg.address = this.broadcastAddress;
Client.prototype.sendingProcess = function(queueAddress) {
const messageQueue = this.getMessageQueue(queueAddress);
return () => {
if (!this.isSocketBound) {
this.stopSendingProcess(queueAddress);
console.log('LIFX Client stopped sending due to unbound socket');
return;
}
if (msg.transactionType === constants.PACKET_TRANSACTION_TYPES.ONE_WAY) {
this.socket.send(msg.data, 0, msg.data.length, this.sendPort, msg.address);
/* istanbul ignore if */
if (this.debug) {
console.log('DEBUG - ' + msg.data.toString('hex') + ' to ' + msg.address);

if (messageQueue.length > 0) {
const msg = messageQueue.pop();
if (msg.address === undefined) {
msg.address = this.broadcastAddress;
}
} else if (msg.transactionType === constants.PACKET_TRANSACTION_TYPES.REQUEST_RESPONSE) {
if (msg.timesSent < this.resendMaxTimes) {
if (Date.now() > (msg.timeLastSent + this.resendPacketDelay)) {
this.socket.send(msg.data, 0, msg.data.length, this.sendPort, msg.address);
msg.timesSent += 1;
msg.timeLastSent = Date.now();
/* istanbul ignore if */
if (this.debug) {
console.log(
'DEBUG - ' + msg.data.toString('hex') + ' to ' + msg.address +
', send ' + msg.timesSent + ' time(s)'
);
}
if (msg.transactionType === constants.PACKET_TRANSACTION_TYPES.ONE_WAY) {
this.socket.send(msg.data, 0, msg.data.length, this.sendPort, msg.address);
/* istanbul ignore if */
if (this.debug) {
console.log('DEBUG - ' + msg.data.toString('hex') + ' to ' + msg.address);
}
// Add to the end of the queue again
this.messagesQueue.unshift(msg);
} else {
this.messageHandlers.forEach(function(handler, hdlrIndex) {
if (handler.type === 'acknowledgement' && handler.sequenceNumber === msg.sequence) {
this.messageHandlers.splice(hdlrIndex, 1);
const err = new Error('No LIFX response after max resend limit of ' + this.resendMaxTimes);
return handler.callback(err, null, null);
} else if (msg.transactionType === constants.PACKET_TRANSACTION_TYPES.REQUEST_RESPONSE) {
if (msg.timesSent < this.resendMaxTimes) {
if (Date.now() > (msg.timeLastSent + this.resendPacketDelay)) {
this.socket.send(msg.data, 0, msg.data.length, this.sendPort, msg.address);
msg.timesSent += 1;
msg.timeLastSent = Date.now();
/* istanbul ignore if */
if (this.debug) {
console.log(
'DEBUG - ' + msg.data.toString('hex') + ' to ' + msg.address +
', send ' + msg.timesSent + ' time(s)'
);
}
}
}.bind(this));
// Add to the end of the queue again
messageQueue.unshift(msg);
} else {
this.messageHandlers.forEach(function(handler, hdlrIndex) {
if (handler.type === 'acknowledgement' && handler.sequenceNumber === msg.sequence) {
this.messageHandlers.splice(hdlrIndex, 1);
const err = new Error('No LIFX response after max resend limit of ' + this.resendMaxTimes);
return handler.callback(err, null, null);
}
}.bind(this));
}
}
} else {
this.stopSendingProcess(queueAddress);
}
} else {
this.stopSendingProcess();
}
};
};

/**
* Starts the sending of all packages in the queue
* @param {String} queueAddress Message queue id
*/
Client.prototype.startSendingProcess = function() {
if (this.sendTimer === null) { // Already running?
this.sendTimer = setInterval(this.sendingProcess.bind(this), constants.MESSAGE_RATE_LIMIT);
Client.prototype.startSendingProcess = function(queueAddress = this.broadcastAddress) {
if (isNil(this.sendTimers[queueAddress])) { // Already running?
const sendingProcess = this.sendingProcess(queueAddress);
this.sendTimers[queueAddress] = setInterval(sendingProcess, constants.MESSAGE_RATE_LIMIT);
}
};

/**
* Stops sending of all packages in the queue
* @param {String} queueAddress Message queue id
*/
Client.prototype.stopSendingProcess = function() {
if (this.sendTimer !== null) {
clearInterval(this.sendTimer);
this.sendTimer = null;
Client.prototype.stopSendingProcess = function(queueAddress = this.broadcastAddress) {
if (!isNil(this.sendTimers[queueAddress])) {
clearInterval(this.sendTimers[queueAddress]);
delete this.sendTimers[queueAddress];
}
};

Expand Down Expand Up @@ -340,6 +363,7 @@ Client.prototype.startDiscovery = function(lights) {
* @param {Object} rinfo rinfo address info to check handler for
*/
Client.prototype.processMessageHandlers = function(msg, rinfo) {
const messageQueue = this.getMessageQueue(rinfo.address);
// Process only packages for us
if (msg.source.toLowerCase() !== this.source.toLowerCase()) {
return;
Expand All @@ -351,12 +375,12 @@ Client.prototype.processMessageHandlers = function(msg, rinfo) {
if (handler.sequenceNumber === msg.sequence) {
// Remove if specific packet was request, since it should only be called once
this.messageHandlers.splice(hdlrIndex, 1);
this.messagesQueue.forEach(function(packet, packetIndex) {
messageQueue.forEach(function(packet, packetIndex) {
if (packet.transactionType === constants.PACKET_TRANSACTION_TYPES.REQUEST_RESPONSE &&
packet.sequence === msg.sequence) {
this.messagesQueue.splice(packetIndex, 1);
messageQueue.splice(packetIndex, 1);
}
}.bind(this));
});

// Call the function requesting the packet
return handler.callback(null, msg, rinfo);
Expand Down Expand Up @@ -483,8 +507,12 @@ Client.prototype.send = function(msg, callback) {
packet.transactionType = constants.PACKET_TRANSACTION_TYPES.REQUEST_RESPONSE;
}
packet.data = Packet.toBuffer(msg);
this.messagesQueue.unshift(packet);
this.startSendingProcess();

const queueAddress = packet.address;
const messageQueue = this.getMessageQueue(packet.address);
messageQueue.unshift(packet);

this.startSendingProcess(queueAddress);

return this.sequenceNumber;
};
Expand Down
69 changes: 40 additions & 29 deletions test/unit/client-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ const lolex = require('lolex');
suite('Client', () => {
let client;
let clock;
const getMsgQueueLength = () => {
return client.messagesQueue.length;
const getMsgQueueLength = (queueAddress) => {
return client.getMessageQueue(queueAddress).length;
};
const getDeviceCount = () => {
return Object.keys(client.devices).length;
Expand Down Expand Up @@ -205,28 +205,30 @@ suite('Client', () => {
port: 56700,
size: 41
};
const queueAddress = discoveryInfo.address;

let currDeviceCount = getDeviceCount();
let currMsgQueCnt = getMsgQueueLength();
let currMsgQueCnt = getMsgQueueLength(queueAddress);
client.processDiscoveryPacket(new Error(), null, null);
client.processDiscoveryPacket(null, {
service: 'udp',
port: 8080
}, null);
assert.equal(currDeviceCount, getDeviceCount(), 'malformed packages ignored');
assert.equal(currMsgQueCnt, getMsgQueueLength(), 'malformed packages ignored');
assert.equal(currMsgQueCnt, getMsgQueueLength(queueAddress), 'malformed packages ignored');

client.processDiscoveryPacket(null, discoveryMessage, discoveryInfo);
assert.equal(getDeviceCount(), currDeviceCount + 1, 'device added');
currDeviceCount += 1;
assert.equal(getMsgQueueLength(), currMsgQueCnt + 1, 'label request done');
assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt + 1, 'label request done');
currMsgQueCnt += 1;

// Set to offline for recovery check
client.devices[discoveryMessage.target].status = 'off';
client.processDiscoveryPacket(null, discoveryMessage, discoveryInfo);
assert.equal(client.devices[discoveryMessage.target].status, 'on');
assert.equal(currDeviceCount, getDeviceCount(), 'no new devices but known updated');
assert.equal(currMsgQueCnt, getMsgQueueLength(), 'no new messages');
assert.equal(currMsgQueCnt, getMsgQueueLength(queueAddress), 'no new messages');
});

test('finding bulbs by different parameters', () => {
Expand Down Expand Up @@ -325,12 +327,12 @@ suite('Client', () => {
startDiscovery: false
}, () => {
assert.equal(client.sequenceNumber, 0, 'starts sequence with 0');
assert.lengthOf(client.messagesQueue, 0, 'is empty');
assert.lengthOf(client.getMessageQueue(), 0, 'is empty');
client.send(packet.create('getService', {}, '12345678'));
assert.equal(client.sequenceNumber, 0, 'sequence is the same after broadcast');
assert.lengthOf(client.messagesQueue, 1, 'added to message queue');
assert.property(client.messagesQueue[0], 'data', 'has data');
assert.notProperty(client.messagesQueue[0], 'address', 'broadcast has no target address');
assert.lengthOf(client.getMessageQueue(), 1, 'added to message queue');
assert.property(client.getMessageQueue()[0], 'data', 'has data');
assert.notProperty(client.getMessageQueue()[0], 'address', 'broadcast has no target address');

client.send(packet.create('setPower', {level: 65535, duration: 0, target: 'f37a4311b857'}, '12345678'));
assert.equal(client.sequenceNumber, 1, 'sequence increased after specific targeting');
Expand Down Expand Up @@ -531,13 +533,14 @@ suite('Client', () => {
const shouldNotBeCalled = () => {
throw new Error();
};
const queueAddress = client.broadcastAddress;

client.init({
startDiscovery: false
}, () => {
client.socket.on('message', shouldNotBeCalled);
client.sendingProcess();
assert.isNull(client.sendTimer);
client.sendingProcess(queueAddress);
assert.isUndefined(client.sendTimers[queueAddress]);
client.socket.removeListener('message', shouldNotBeCalled);
done();
});
Expand All @@ -551,22 +554,24 @@ suite('Client', () => {
done();
};
const packetObj = packet.create('setPower', {level: 65535}, client.source);
const queueAddress = client.broadcastAddress;

client.init({
port: constants.LIFX_DEFAULT_PORT,
startDiscovery: false
}, () => {
client.socket.on('message', packetSendCallback);

let currMsgQueCnt = getMsgQueueLength();
let currMsgQueCnt = getMsgQueueLength(queueAddress);
client.send(packetObj);
assert.equal(getMsgQueueLength(), currMsgQueCnt + 1, 'sends a packet to the queue');
assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt + 1, 'sends a packet to the queue');
currMsgQueCnt += 1;
assert.isNotNull(client.sendTimer);
assert.isDefined(client.sendTimers[queueAddress]);
client.stopSendingProcess(); // We don't want automatic calling of sending

client.sendingProcess(); // Call sending it manualy
assert.equal(getMsgQueueLength(), currMsgQueCnt - 1, 'removes the packet when send');
const sendingProcess = client.sendingProcess(queueAddress);
sendingProcess(); // Call sending it manualy

assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt - 1, 'removes the packet when send');
currMsgQueCnt -= 1;
});
});
Expand All @@ -579,22 +584,25 @@ suite('Client', () => {
done();
};
const packetObj = packet.create('setPower', {level: 65535}, client.source);
const queueAddress = client.broadcastAddress;

client.init({
port: constants.LIFX_DEFAULT_PORT,
startDiscovery: false
}, () => {
client.socket.on('message', packetSendCallback);

let currMsgQueCnt = getMsgQueueLength();
let currMsgQueCnt = getMsgQueueLength(queueAddress);
client.send(packetObj, () => {});
assert.equal(getMsgQueueLength(), currMsgQueCnt + 1, 'sends a packet to the queue');
assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt + 1, 'sends a packet to the queue');
currMsgQueCnt += 1;
assert.isNotNull(client.sendTimer);
assert.isDefined(client.sendTimers[queueAddress]);
client.stopSendingProcess(); // We don't want automatic calling of sending

client.sendingProcess(); // Call sending it manualy
assert.equal(getMsgQueueLength(), currMsgQueCnt, 'keeps packet when send');
const sendingProcess = client.sendingProcess(queueAddress);
sendingProcess(); // Call sending it manualy

assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt, 'keeps packet when send');
});
});

Expand All @@ -609,22 +617,25 @@ suite('Client', () => {
done();
};
const packetObj = packet.create('setPower', {level: 65535}, client.source);
const queueAddress = client.broadcastAddress;

client.init({
startDiscovery: false
}, () => {
client.socket.on('message', shouldNotBeSendCallback);

let currMsgQueCnt = getMsgQueueLength();
let currMsgQueCnt = getMsgQueueLength(queueAddress);
client.send(packetObj, handlerTimeoutCallback);
assert.equal(getMsgQueueLength(), currMsgQueCnt + 1, 'sends a packet to the queue');
assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt + 1, 'sends a packet to the queue');
currMsgQueCnt += 1;
assert.isNotNull(client.sendTimer);
assert.isDefined(client.sendTimers[queueAddress]);
client.stopSendingProcess(); // We don't want automatic calling of sending
client.messagesQueue[0].timesSent = client.resendMaxTimes; // This triggers error
client.getMessageQueue(queueAddress)[0].timesSent = client.resendMaxTimes; // This triggers error

const sendingProcess = client.sendingProcess(queueAddress);
sendingProcess(); // Call sending it manualy

client.sendingProcess(); // Call sending it manualy
assert.equal(getMsgQueueLength(), currMsgQueCnt - 1, 'removes packet after max retries and callback');
assert.equal(getMsgQueueLength(queueAddress), currMsgQueCnt - 1, 'removes packet after max retries and callback');
currMsgQueCnt -= 1;
});
});
Expand Down
2 changes: 1 addition & 1 deletion test/unit/light-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ suite('Light', () => {
let client;
let bulb;
const getMsgQueueLength = () => {
return client.messagesQueue.length;
return client.getMessageQueue().length;
};
const getMsgHandlerLength = () => {
return client.messageHandlers.length;
Expand Down