Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gatttool.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ def discover_characteristics(self):

# Sleep one extra second in case we caught characteristic
# in the middle
time.sleep(1)
time.sleep(5)

if not self._characteristics:
raise NotConnectedError("Characteristic discovery failed")
Expand Down
23 changes: 18 additions & 5 deletions nuki.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import nacl.utils
import pygatt.backends
import array
Expand Down Expand Up @@ -84,12 +85,13 @@ def authenticateUser(self, publicKeyHex, privateKeyHex, ID, IDType, name):
pairingHandle = self.device.get_handle('a92ee101-5501-11e4-916c-0800200c9a66')
print "Nuki Pairing UUID handle created: %04x" % pairingHandle
publicKeyReq = nuki_messages.Nuki_REQ('0003')
self.device.subscribe('a92ee101-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse)
self.device.subscribe('a92ee101-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse, indication=True))
publicKeyReqCommand = publicKeyReq.generate()
self._charWriteResponse = ""
print "Requesting Nuki Public Key using command: %s" % publicKeyReq.show()
self.device.char_write_handle(pairingHandle,publicKeyReqCommand,True,2)
print "Nuki Public key requested"
time.sleep(2)
commandParsed = self.parser.parse(self._charWriteResponse)
if self.parser.isNukiCommand(self._charWriteResponse) == False:
sys.exit("Error while requesting public key: %s" % commandParsed)
Expand All @@ -109,6 +111,7 @@ def authenticateUser(self, publicKeyHex, privateKeyHex, ID, IDType, name):
self._charWriteResponse = ""
self.device.char_write_handle(pairingHandle,publicKeyPushCommand,True,5)
print "Public key pushed"
time.sleep(2)
commandParsed = self.parser.parse(self._charWriteResponse)
if self.parser.isNukiCommand(self._charWriteResponse) == False:
sys.exit("Error while pushing public key: %s" % commandParsed)
Expand All @@ -122,6 +125,7 @@ def authenticateUser(self, publicKeyHex, privateKeyHex, ID, IDType, name):
self._charWriteResponse = ""
self.device.char_write_handle(pairingHandle,authAuthenticatorCommand,True,5)
print "Authorization Authenticator sent: %s" % authAuthenticator.show()
time.sleep(2)
commandParsed = self.parser.parse(self._charWriteResponse)
if self.parser.isNukiCommand(self._charWriteResponse) == False:
sys.exit("Error while sending Authorization Authenticator: %s" % commandParsed)
Expand All @@ -135,6 +139,7 @@ def authenticateUser(self, publicKeyHex, privateKeyHex, ID, IDType, name):
self._charWriteResponse = ""
self.device.char_write_handle(pairingHandle,authDataCommand,True,7)
print "Authorization Data sent: %s" % authData.show()
time.sleep(2)
commandParsed = self.parser.parse(self._charWriteResponse)
if self.parser.isNukiCommand(self._charWriteResponse) == False:
sys.exit("Error while sending Authorization Data: %s" % commandParsed)
Expand All @@ -151,6 +156,7 @@ def authenticateUser(self, publicKeyHex, privateKeyHex, ID, IDType, name):
self._charWriteResponse = ""
self.device.char_write_handle(pairingHandle,authIDConfirmCommand,True,7)
print "Authorization ID Confirmation sent: %s" % authIDConfirm.show()
time.sleep(2)
commandParsed = self.parser.parse(self._charWriteResponse)
if self.parser.isNukiCommand(self._charWriteResponse) == False:
sys.exit("Error while sending Authorization ID Confirmation: %s" % commandParsed)
Expand All @@ -165,13 +171,14 @@ def authenticateUser(self, publicKeyHex, privateKeyHex, ID, IDType, name):
def readLockState(self):
self._makeBLEConnection()
keyturnerUSDIOHandle = self.device.get_handle("a92ee202-5501-11e4-916c-0800200c9a66")
self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse)
self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse, indication=True))
stateReq = nuki_messages.Nuki_REQ('000C')
stateReqEncrypted = nuki_messages.Nuki_EncryptedCommand(authID=self.config.get(self.macAddress, 'authorizationID'), nukiCommand=stateReq, publicKey=self.config.get(self.macAddress, 'publicKeyNuki'), privateKey=self.config.get(self.macAddress, 'privateKeyHex'))
stateReqEncryptedCommand = stateReqEncrypted.generate()
self._charWriteResponse = ""
self.device.char_write_handle(keyturnerUSDIOHandle,stateReqEncryptedCommand,True,3)
print "Nuki State Request sent: %s\nresponse received: %s" % (stateReq.show(),self._charWriteResponse)
time.sleep(2)
commandParsed = self.parser.decrypt(self._charWriteResponse,self.config.get(self.macAddress, 'publicKeyNuki'),self.config.get(self.macAddress, 'privateKeyHex'))[8:]
if self.parser.isNukiCommand(commandParsed) == False:
sys.exit("Error while requesting Nuki STATES: %s" % commandParsed)
Expand All @@ -186,13 +193,14 @@ def readLockState(self):
def lockAction(self,lockAction):
self._makeBLEConnection()
keyturnerUSDIOHandle = self.device.get_handle("a92ee202-5501-11e4-916c-0800200c9a66")
self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse)
self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse, indication=True))
challengeReq = nuki_messages.Nuki_REQ('0004')
challengeReqEncrypted = nuki_messages.Nuki_EncryptedCommand(authID=self.config.get(self.macAddress, 'authorizationID'), nukiCommand=challengeReq, publicKey=self.config.get(self.macAddress, 'publicKeyNuki'), privateKey=self.config.get(self.macAddress, 'privateKeyHex'))
challengeReqEncryptedCommand = challengeReqEncrypted.generate()
self._charWriteResponse = ""
self.device.char_write_handle(keyturnerUSDIOHandle,challengeReqEncryptedCommand,True,4)
print "Nuki CHALLENGE Request sent: %s" % challengeReq.show()
time.sleep(2)
commandParsed = self.parser.decrypt(self._charWriteResponse,self.config.get(self.macAddress, 'publicKeyNuki'),self.config.get(self.macAddress, 'privateKeyHex'))[8:]
if self.parser.isNukiCommand(commandParsed) == False:
sys.exit("Error while requesting Nuki CHALLENGE: %s" % commandParsed)
Expand All @@ -207,6 +215,7 @@ def lockAction(self,lockAction):
self._charWriteResponse = ""
self.device.char_write_handle(keyturnerUSDIOHandle,lockActionReqEncryptedCommand,True,4)
print "Nuki Lock Action Request sent: %s" % lockActionReq.show()
time.sleep(2)
commandParsed = self.parser.decrypt(self._charWriteResponse,self.config.get(self.macAddress, 'publicKeyNuki'),self.config.get(self.macAddress, 'privateKeyHex'))[8:]
if self.parser.isNukiCommand(commandParsed) == False:
sys.exit("Error while requesting Nuki Lock Action: %s" % commandParsed)
Expand All @@ -220,14 +229,15 @@ def lockAction(self,lockAction):
def getLogEntriesCount(self, pinHex):
self._makeBLEConnection()
keyturnerUSDIOHandle = self.device.get_handle("a92ee202-5501-11e4-916c-0800200c9a66")
self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse)
self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse, indication=True))
challengeReq = nuki_messages.Nuki_REQ('0004')
challengeReqEncrypted = nuki_messages.Nuki_EncryptedCommand(authID=self.config.get(self.macAddress, 'authorizationID'), nukiCommand=challengeReq, publicKey=self.config.get(self.macAddress, 'publicKeyNuki'), privateKey=self.config.get(self.macAddress, 'privateKeyHex'))
challengeReqEncryptedCommand = challengeReqEncrypted.generate()
self._charWriteResponse = ""
print "Requesting CHALLENGE: %s" % challengeReqEncrypted.generate("HEX")
self.device.char_write_handle(keyturnerUSDIOHandle,challengeReqEncryptedCommand,True,5)
print "Nuki CHALLENGE Request sent: %s" % challengeReq.show()
time.sleep(2)
commandParsed = self.parser.decrypt(self._charWriteResponse,self.config.get(self.macAddress, 'publicKeyNuki'),self.config.get(self.macAddress, 'privateKeyHex'))[8:]
if self.parser.isNukiCommand(commandParsed) == False:
sys.exit("Error while requesting Nuki CHALLENGE: %s" % commandParsed)
Expand All @@ -242,6 +252,7 @@ def getLogEntriesCount(self, pinHex):
self._charWriteResponse = ""
self.device.char_write_handle(keyturnerUSDIOHandle,logEntriesReqEncryptedCommand,True,4)
print "Nuki Log Entries Request sent: %s" % logEntriesReq.show()
time.sleep(2)
commandParsed = self.parser.decrypt(self._charWriteResponse,self.config.get(self.macAddress, 'publicKeyNuki'),self.config.get(self.macAddress, 'privateKeyHex'))[8:]
if self.parser.isNukiCommand(commandParsed) == False:
sys.exit("Error while requesting Nuki Log Entries: %s" % commandParsed)
Expand All @@ -257,14 +268,15 @@ def getLogEntriesCount(self, pinHex):
def getLogEntries(self,count,pinHex):
self._makeBLEConnection()
keyturnerUSDIOHandle = self.device.get_handle("a92ee202-5501-11e4-916c-0800200c9a66")
self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse)
self.device.subscribe('a92ee202-5501-11e4-916c-0800200c9a66', self._handleCharWriteResponse, indication=True))
challengeReq = nuki_messages.Nuki_REQ('0004')
challengeReqEncrypted = nuki_messages.Nuki_EncryptedCommand(authID=self.config.get(self.macAddress, 'authorizationID'), nukiCommand=challengeReq, publicKey=self.config.get(self.macAddress, 'publicKeyNuki'), privateKey=self.config.get(self.macAddress, 'privateKeyHex'))
challengeReqEncryptedCommand = challengeReqEncrypted.generate()
print "Requesting CHALLENGE: %s" % challengeReqEncrypted.generate("HEX")
self._charWriteResponse = ""
self.device.char_write_handle(keyturnerUSDIOHandle,challengeReqEncryptedCommand,True,5)
print "Nuki CHALLENGE Request sent: %s" % challengeReq.show()
time.sleep(2)
commandParsed = self.parser.decrypt(self._charWriteResponse,self.config.get(self.macAddress, 'publicKeyNuki'),self.config.get(self.macAddress, 'privateKeyHex'))[8:]
if self.parser.isNukiCommand(commandParsed) == False:
sys.exit("Error while requesting Nuki CHALLENGE: %s" % commandParsed)
Expand All @@ -279,6 +291,7 @@ def getLogEntries(self,count,pinHex):
self._charWriteResponse = ""
self.device.char_write_handle(keyturnerUSDIOHandle,logEntriesReqEncryptedCommand,True,6)
print "Nuki Log Entries Request sent: %s" % logEntriesReq.show()
time.sleep(2)
messages = self.parser.splitEncryptedMessages(self._charWriteResponse)
print "Received %d messages" % len(messages)
logMessages = []
Expand Down