Skip to content
This repository was archived by the owner on Oct 2, 2023. It is now read-only.

Commit 4678423

Browse files
author
Drew Shafer
committed
Add first take on bulk CSV token provisioning
1 parent 92ab539 commit 4678423

1 file changed

Lines changed: 57 additions & 0 deletions

File tree

toopher/toopher_api.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import uuid
22
import requests_oauthlib
33
import sys
4+
import binascii
5+
import csv
6+
import json
47

58
DEFAULT_BASE_URL = 'https://api.toopher.com/v1'
69
VERSION = '2.0.0'
@@ -390,6 +393,21 @@ def _update(self, json_response):
390393

391394

392395
class OathOtpValidators(object):
396+
class ImportFormat(object):
397+
SAFENET = {
398+
'csv_dialect' : 'whitespace',
399+
'secret_format' : 'hex',
400+
'header_map' : {
401+
'Serial' : 'requester_specified_id',
402+
'Seed' : 'secret'
403+
},
404+
'defaults' : {
405+
'otp_type' : 'hotp',
406+
'otp_digits' : 6,
407+
'algorithm' : 'sha1'
408+
}
409+
}
410+
393411
def __init__(self, api):
394412
self.api = api
395413

@@ -407,6 +425,45 @@ def provision(self, secret, requester_specified_id=None, otp_type='totp', otp_di
407425
result = self.api.advanced.raw.post(url, **params)
408426
return OathOtpValidator(result, self.api)
409427

428+
def bulk_provision_from_csv(self, file_or_filename, style):
429+
def decode_validator_secret(encoded_secret, encoding):
430+
if encoding == 'hex':
431+
return encoded_secret
432+
elif encoding == 'ascii':
433+
return binascii.hexlify(bytearray(encoded_secret, 'utf-8'))
434+
else:
435+
raise ValueError('Unsupport secret encoding {0}'.format(encoding))
436+
437+
def read_csv(f):
438+
csv.register_dialect('whitespace', delimiter=' ', skipinitialspace=True)
439+
dialect = style['csv_dialect']
440+
f.seek(0)
441+
reader = csv.DictReader(f, dialect=dialect)
442+
validators = []
443+
for row in reader:
444+
validator = style['defaults'].copy()
445+
for csv_field_name, api_field_name in style['header_map'].iteritems():
446+
validator[api_field_name] = row[csv_field_name]
447+
if 'secret_format' in style:
448+
validator['secret'] = decode_validator_secret(validator['secret'], style['secret_format'])
449+
validators.append(validator)
450+
return validators
451+
452+
if isinstance(file_or_filename, basestring):
453+
with open(file_or_filename) as f:
454+
validators = read_csv(f)
455+
else:
456+
validators = read_csv(file_or_filename)
457+
458+
url = '/oath_otp_validators/bulk_provision'
459+
params = {
460+
'oath_otp_validators' : json.dumps(validators)
461+
}
462+
response = self.api.advanced.raw.post(url, **params)
463+
validators_json_list = response['oath_otp_validators']
464+
validators_list = [OathOtpValidator(json_validator, self.api) for json_validator in validators_json_list]
465+
return validators_list
466+
410467
def get_by_id(self, validator_id):
411468
url = '/oath_otp_validators/{0}'.format(validator_id)
412469
return OathOtpValidator(self.api.advanced.raw.get(url), api)

0 commit comments

Comments
 (0)