Skip to content
This repository was archived by the owner on Oct 2, 2023. It is now read-only.

Commit 46aa15d

Browse files
author
Drew Shafer
committed
Add first take on bulk CSV token provisioning
1 parent b071d4b commit 46aa15d

1 file changed

Lines changed: 59 additions & 0 deletions

File tree

toopher/__init__.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import time
99
import requests_oauthlib
1010
import sys
11+
import binascii
12+
import csv
13+
import json
1114

1215
DEFAULT_BASE_URL = "https://api.toopher.com/v1"
1316
DEFAULT_IFRAME_TTL = 300
@@ -507,6 +510,22 @@ def _update(self, json_response):
507510

508511

509512
class OathOtpValidators(object):
513+
class ImportFormat(object):
514+
SAFENET = {
515+
'csv_dialect' : 'whitespace',
516+
'secret_format' : 'hex',
517+
'header_map' : {
518+
'Serial' : 'requester_specified_id',
519+
'Seed' : 'secret'
520+
},
521+
'defaults' : {
522+
'otp_type' : 'hotp',
523+
'otp_digits' : 6,
524+
'algorithm' : 'sha1'
525+
}
526+
}
527+
528+
510529
def __init__(self, api):
511530
self.api = api
512531

@@ -523,6 +542,46 @@ def provision(self, secret, requester_specified_id=None, otp_type='totp', otp_di
523542
params.update(kwargs)
524543
result = self.api.advanced.raw.post(url, **params)
525544
return OathOtpValidator(result, self.api)
545+
546+
def bulk_provision_from_csv(self, file_or_filename, style):
547+
def decode_validator_secret(encoded_secret, encoding):
548+
if encoding == 'hex':
549+
return encoded_secret
550+
elif encoding == 'ascii':
551+
return binascii.hexlify(bytearray(encoded_secret, 'utf-8'))
552+
else:
553+
raise ValueError('Unsupport secret encoding {0}'.format(encoding))
554+
555+
def read_csv(f):
556+
csv.register_dialect('whitespace', delimiter=' ', skipinitialspace=True)
557+
dialect = style['csv_dialect']
558+
f.seek(0)
559+
reader = csv.DictReader(f, dialect=dialect)
560+
validators = []
561+
for row in reader:
562+
validator = style['defaults'].copy()
563+
for csv_field_name, api_field_name in style['header_map'].iteritems():
564+
validator[api_field_name] = row[csv_field_name]
565+
if 'secret_format' in style:
566+
validator['secret'] = decode_validator_secret(validator['secret'], style['secret_format'])
567+
validators.append(validator)
568+
return validators
569+
570+
if isinstance(file_or_filename, basestring):
571+
with open(file_or_filename) as f:
572+
validators = read_csv(f)
573+
else:
574+
validators = read_csv(file_or_filename)
575+
576+
url = '/oath_otp_validators/bulk_provision'
577+
params = {
578+
'oath_otp_validators' : json.dumps(validators)
579+
}
580+
response = self.api.advanced.raw.post(url, **params)
581+
validators_json_list = response['oath_otp_validators']
582+
validators_list = [OathOtpValidator(json_validator, self.api) for json_validator in validators_json_list]
583+
return validators_list
584+
526585

527586
def get_by_id(self, validator_id):
528587
url = '/oath_otp_validators/{0}'.format(validator_id)

0 commit comments

Comments
 (0)