From c1e7899683476c4182e75bf19b559a322ce1f619 Mon Sep 17 00:00:00 2001 From: Dauden1184 <> Date: Fri, 27 Sep 2019 01:29:19 +0200 Subject: [PATCH] Ugly hack to make it works with Nuki v2 --- gatttool.py | 2 +- nuki.py | 23 ++++++++++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/gatttool.py b/gatttool.py index 9f98d8c..b4d8958 100644 --- a/gatttool.py +++ b/gatttool.py @@ -389,7 +389,7 @@ def discover_characteristics(self): # Sleep one extra second in case we caught characteristic # in the middle - time.sleep(1) + time.sleep(5) if not self._characteristics: raise NotConnectedError("Characteristic discovery failed") diff --git a/nuki.py b/nuki.py index 042dc84..b8b2d1f 100644 --- a/nuki.py +++ b/nuki.py @@ -1,3 +1,4 @@ +import time import nacl.utils import pygatt.backends import array @@ -84,12 +85,13 @@ def authenticateUser(self, publicKeyHex, privateKeyHex, ID, IDType, name): pairingHandle = self.device.get_handle('a92ee101-5501-11e4-916c-0800200c9a66') print "Nuki Pairing UUID handle created: %04x" % pairingHandle publicKeyReq = nuki_messages.Nuki_REQ('0003') - self.device.subscribe('a92ee101-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse) + self.device.subscribe('a92ee101-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse, indication=True)) publicKeyReqCommand = publicKeyReq.generate() self._charWriteResponse = "" print "Requesting Nuki Public Key using command: %s" % publicKeyReq.show() self.device.char_write_handle(pairingHandle,publicKeyReqCommand,True,2) print "Nuki Public key requested" + time.sleep(2) commandParsed = self.parser.parse(self._charWriteResponse) if self.parser.isNukiCommand(self._charWriteResponse) == False: sys.exit("Error while requesting public key: %s" % commandParsed) @@ -109,6 +111,7 @@ def authenticateUser(self, publicKeyHex, privateKeyHex, ID, IDType, name): self._charWriteResponse = "" self.device.char_write_handle(pairingHandle,publicKeyPushCommand,True,5) print "Public key pushed" + time.sleep(2) commandParsed = self.parser.parse(self._charWriteResponse) if self.parser.isNukiCommand(self._charWriteResponse) == False: sys.exit("Error while pushing public key: %s" % commandParsed) @@ -122,6 +125,7 @@ def authenticateUser(self, publicKeyHex, privateKeyHex, ID, IDType, name): self._charWriteResponse = "" self.device.char_write_handle(pairingHandle,authAuthenticatorCommand,True,5) print "Authorization Authenticator sent: %s" % authAuthenticator.show() + time.sleep(2) commandParsed = self.parser.parse(self._charWriteResponse) if self.parser.isNukiCommand(self._charWriteResponse) == False: sys.exit("Error while sending Authorization Authenticator: %s" % commandParsed) @@ -135,6 +139,7 @@ def authenticateUser(self, publicKeyHex, privateKeyHex, ID, IDType, name): self._charWriteResponse = "" self.device.char_write_handle(pairingHandle,authDataCommand,True,7) print "Authorization Data sent: %s" % authData.show() + time.sleep(2) commandParsed = self.parser.parse(self._charWriteResponse) if self.parser.isNukiCommand(self._charWriteResponse) == False: sys.exit("Error while sending Authorization Data: %s" % commandParsed) @@ -151,6 +156,7 @@ def authenticateUser(self, publicKeyHex, privateKeyHex, ID, IDType, name): self._charWriteResponse = "" self.device.char_write_handle(pairingHandle,authIDConfirmCommand,True,7) print "Authorization ID Confirmation sent: %s" % authIDConfirm.show() + time.sleep(2) commandParsed = self.parser.parse(self._charWriteResponse) if self.parser.isNukiCommand(self._charWriteResponse) == False: sys.exit("Error while sending Authorization ID Confirmation: %s" % commandParsed) @@ -165,13 +171,14 @@ def authenticateUser(self, publicKeyHex, privateKeyHex, ID, IDType, name): def readLockState(self): self._makeBLEConnection() keyturnerUSDIOHandle = self.device.get_handle("a92ee202-5501-11e4-916c-0800200c9a66") - self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse) + self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse, indication=True)) stateReq = nuki_messages.Nuki_REQ('000C') stateReqEncrypted = nuki_messages.Nuki_EncryptedCommand(authID=self.config.get(self.macAddress, 'authorizationID'), nukiCommand=stateReq, publicKey=self.config.get(self.macAddress, 'publicKeyNuki'), privateKey=self.config.get(self.macAddress, 'privateKeyHex')) stateReqEncryptedCommand = stateReqEncrypted.generate() self._charWriteResponse = "" self.device.char_write_handle(keyturnerUSDIOHandle,stateReqEncryptedCommand,True,3) print "Nuki State Request sent: %s\nresponse received: %s" % (stateReq.show(),self._charWriteResponse) + time.sleep(2) commandParsed = self.parser.decrypt(self._charWriteResponse,self.config.get(self.macAddress, 'publicKeyNuki'),self.config.get(self.macAddress, 'privateKeyHex'))[8:] if self.parser.isNukiCommand(commandParsed) == False: sys.exit("Error while requesting Nuki STATES: %s" % commandParsed) @@ -186,13 +193,14 @@ def readLockState(self): def lockAction(self,lockAction): self._makeBLEConnection() keyturnerUSDIOHandle = self.device.get_handle("a92ee202-5501-11e4-916c-0800200c9a66") - self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse) + self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse, indication=True)) challengeReq = nuki_messages.Nuki_REQ('0004') challengeReqEncrypted = nuki_messages.Nuki_EncryptedCommand(authID=self.config.get(self.macAddress, 'authorizationID'), nukiCommand=challengeReq, publicKey=self.config.get(self.macAddress, 'publicKeyNuki'), privateKey=self.config.get(self.macAddress, 'privateKeyHex')) challengeReqEncryptedCommand = challengeReqEncrypted.generate() self._charWriteResponse = "" self.device.char_write_handle(keyturnerUSDIOHandle,challengeReqEncryptedCommand,True,4) print "Nuki CHALLENGE Request sent: %s" % challengeReq.show() + time.sleep(2) commandParsed = self.parser.decrypt(self._charWriteResponse,self.config.get(self.macAddress, 'publicKeyNuki'),self.config.get(self.macAddress, 'privateKeyHex'))[8:] if self.parser.isNukiCommand(commandParsed) == False: sys.exit("Error while requesting Nuki CHALLENGE: %s" % commandParsed) @@ -207,6 +215,7 @@ def lockAction(self,lockAction): self._charWriteResponse = "" self.device.char_write_handle(keyturnerUSDIOHandle,lockActionReqEncryptedCommand,True,4) print "Nuki Lock Action Request sent: %s" % lockActionReq.show() + time.sleep(2) commandParsed = self.parser.decrypt(self._charWriteResponse,self.config.get(self.macAddress, 'publicKeyNuki'),self.config.get(self.macAddress, 'privateKeyHex'))[8:] if self.parser.isNukiCommand(commandParsed) == False: sys.exit("Error while requesting Nuki Lock Action: %s" % commandParsed) @@ -220,7 +229,7 @@ def lockAction(self,lockAction): def getLogEntriesCount(self, pinHex): self._makeBLEConnection() keyturnerUSDIOHandle = self.device.get_handle("a92ee202-5501-11e4-916c-0800200c9a66") - self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse) + self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse, indication=True)) challengeReq = nuki_messages.Nuki_REQ('0004') challengeReqEncrypted = nuki_messages.Nuki_EncryptedCommand(authID=self.config.get(self.macAddress, 'authorizationID'), nukiCommand=challengeReq, publicKey=self.config.get(self.macAddress, 'publicKeyNuki'), privateKey=self.config.get(self.macAddress, 'privateKeyHex')) challengeReqEncryptedCommand = challengeReqEncrypted.generate() @@ -228,6 +237,7 @@ def getLogEntriesCount(self, pinHex): print "Requesting CHALLENGE: %s" % challengeReqEncrypted.generate("HEX") self.device.char_write_handle(keyturnerUSDIOHandle,challengeReqEncryptedCommand,True,5) print "Nuki CHALLENGE Request sent: %s" % challengeReq.show() + time.sleep(2) commandParsed = self.parser.decrypt(self._charWriteResponse,self.config.get(self.macAddress, 'publicKeyNuki'),self.config.get(self.macAddress, 'privateKeyHex'))[8:] if self.parser.isNukiCommand(commandParsed) == False: sys.exit("Error while requesting Nuki CHALLENGE: %s" % commandParsed) @@ -242,6 +252,7 @@ def getLogEntriesCount(self, pinHex): self._charWriteResponse = "" self.device.char_write_handle(keyturnerUSDIOHandle,logEntriesReqEncryptedCommand,True,4) print "Nuki Log Entries Request sent: %s" % logEntriesReq.show() + time.sleep(2) commandParsed = self.parser.decrypt(self._charWriteResponse,self.config.get(self.macAddress, 'publicKeyNuki'),self.config.get(self.macAddress, 'privateKeyHex'))[8:] if self.parser.isNukiCommand(commandParsed) == False: sys.exit("Error while requesting Nuki Log Entries: %s" % commandParsed) @@ -257,7 +268,7 @@ def getLogEntriesCount(self, pinHex): def getLogEntries(self,count,pinHex): self._makeBLEConnection() keyturnerUSDIOHandle = self.device.get_handle("a92ee202-5501-11e4-916c-0800200c9a66") - self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse) + self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse, indication=True)) challengeReq = nuki_messages.Nuki_REQ('0004') challengeReqEncrypted = nuki_messages.Nuki_EncryptedCommand(authID=self.config.get(self.macAddress, 'authorizationID'), nukiCommand=challengeReq, publicKey=self.config.get(self.macAddress, 'publicKeyNuki'), privateKey=self.config.get(self.macAddress, 'privateKeyHex')) challengeReqEncryptedCommand = challengeReqEncrypted.generate() @@ -265,6 +276,7 @@ def getLogEntries(self,count,pinHex): self._charWriteResponse = "" self.device.char_write_handle(keyturnerUSDIOHandle,challengeReqEncryptedCommand,True,5) print "Nuki CHALLENGE Request sent: %s" % challengeReq.show() + time.sleep(2) commandParsed = self.parser.decrypt(self._charWriteResponse,self.config.get(self.macAddress, 'publicKeyNuki'),self.config.get(self.macAddress, 'privateKeyHex'))[8:] if self.parser.isNukiCommand(commandParsed) == False: sys.exit("Error while requesting Nuki CHALLENGE: %s" % commandParsed) @@ -279,6 +291,7 @@ def getLogEntries(self,count,pinHex): self._charWriteResponse = "" self.device.char_write_handle(keyturnerUSDIOHandle,logEntriesReqEncryptedCommand,True,6) print "Nuki Log Entries Request sent: %s" % logEntriesReq.show() + time.sleep(2) messages = self.parser.splitEncryptedMessages(self._charWriteResponse) print "Received %d messages" % len(messages) logMessages = []