Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions jwt_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def createConfig():
jwksbuild = buildJWKS(n, e, "jwt_tool")
jwksout = {"keys": []}
jwksout["keys"].append(jwksbuild)
fulljwks = json.dumps(jwksout,separators=(",",":"), indent=4)
fulljwks = json_canon_escapeslash(json.dumps(jwksout,separators=(",",":")), indent=4)
with open(jwksName, 'w') as test_jwks_out:
test_jwks_out.write(fulljwks)
jwks_b64 = base64.urlsafe_b64encode(fulljwks.encode('ascii'))
Expand Down Expand Up @@ -271,6 +271,24 @@ def jwtOut(token, fromMod, desc=""):
except:
pass

def json_canon_escapeslash(s):
if not args.canonicalizeslashes:
return s
else:
s = s.replace("\\", "\\\\")
s = s.replace("/", "\\/")
# not escaping \" as it is already encoded by the json library, also not encoding tabstopp, newline etc, as they are also already encoded.
return s

def json_canonicalize(s):
headDict, paylDict, sig, contents = validateToken(jwt)
paylB64 = base64.urlsafe_b64encode(json_canon_escapeslash(json.dumps(paylDict,separators=(",",":"))).encode()).decode('UTF-8').strip("=")
headB64 = base64.urlsafe_b64encode(json_canon_escapeslash(json.dumps(headDict,separators=(",",":"))).encode()).decode('UTF-8').strip("=")
return headB64 + "." + paylB64 + "." + sig

def check_canonicalization(s) -> bool:
return (s == json_canonicalize(s))

def setLog(jwt, genTime, logID, modulename, targetURL, additional):
logLine = genTime+" | "+modulename+" | "+targetURL+" | "+additional
with open(logFilename, 'a') as logFile:
Expand All @@ -280,7 +298,7 @@ def setLog(jwt, genTime, logID, modulename, targetURL, additional):
def buildHead(alg, headDict):
newHead = headDict
newHead["alg"] = alg
newHead = base64.urlsafe_b64encode(json.dumps(newHead,separators=(",",":")).encode()).decode('UTF-8').strip("=")
newHead = base64.urlsafe_b64encode(json_canon_escapeslash(json.dumps(newHead,separators=(",",":"))).encode()).decode('UTF-8').strip("=")
return newHead

def checkNullSig(contents):
Expand Down Expand Up @@ -311,21 +329,21 @@ def checkPubKeyExploit(headDict, paylB64, pubKey):
exit(1)
newHead = headDict
newHead["alg"] = "HS256"
newHead = base64.urlsafe_b64encode(json.dumps(headDict,separators=(",",":")).encode()).decode('UTF-8').strip("=")
newHead = base64.urlsafe_b64encode(json_canon_escapeslash(json.dumps(headDict,separators=(",",":"))).encode()).decode('UTF-8').strip("=")
newTok = newHead+"."+paylB64
newSig = base64.urlsafe_b64encode(hmac.new(key.encode(),newTok.encode(),hashlib.sha256).digest()).decode('UTF-8').strip("=")
return newTok, newSig

def injectpayloadclaim(payloadclaim, injectionvalue):
newpaylDict = paylDict
newpaylDict[payloadclaim] = castInput(injectionvalue)
newPaylB64 = base64.urlsafe_b64encode(json.dumps(newpaylDict,separators=(",",":")).encode()).decode('UTF-8').strip("=")
newPaylB64 = base64.urlsafe_b64encode(json_canon_escapeslash(json.dumps(newpaylDict,separators=(",",":"))).encode()).decode('UTF-8').strip("=")
return newpaylDict, newPaylB64

def injectheaderclaim(headerclaim, injectionvalue):
def injectheaderclaim(headerclaim, injectionvalue, esc):
newheadDict = headDict
newheadDict[headerclaim] = castInput(injectionvalue)
newHeadB64 = base64.urlsafe_b64encode(json.dumps(newheadDict,separators=(",",":")).encode()).decode('UTF-8').strip("=")
newHeadB64 = base64.urlsafe_b64encode(json_canon_escapeslash(json.dumps(newheadDict,separators=(",",":"))).encode()).decode('UTF-8').strip("=")
return newheadDict, newHeadB64

def tamperToken(paylDict, headDict, sig):
Expand Down Expand Up @@ -757,10 +775,10 @@ def jwksGen(headDict, paylDict, jku, privKey, kid="jwt_tool"):
cprintc("Invalid Private Key", "red")
exit(1)
newSig = base64.urlsafe_b64encode(signature).decode('UTF-8').strip("=")
jwksout = json.dumps(newjwks,separators=(",",":"), indent=4)
jwksout = json_canon_escapeslash(json.dumps(newjwks,separators=(",",":"), indent=4))
jwksbuild = {"keys": []}
jwksbuild["keys"].append(newjwks)
fulljwks = json.dumps(jwksbuild,separators=(",",":"), indent=4)
fulljwks = json_canon_escapeslash(json.dumps(jwksbuild,separators=(",",":"), indent=4))
if config['crypto']['jwks'] == "":
jwksName = "jwks_jwttool_RSA_"+nowtime+".json"
with open(jwksName, 'w') as test_jwks_out:
Expand Down Expand Up @@ -1172,9 +1190,9 @@ def getVal(promptString):

def genContents(headDict, paylDict, newContents=""):
if paylDict == {}:
newContents = base64.urlsafe_b64encode(json.dumps(headDict,separators=(",",":")).encode()).decode('UTF-8').strip("=")+"."
newContents = base64.urlsafe_b64encode(json_canon_escapeslash(json.dumps(headDict,separators=(",",":")).encode())).decode('UTF-8').strip("=")+"."
else:
newContents = base64.urlsafe_b64encode(json.dumps(headDict,separators=(",",":")).encode()).decode('UTF-8').strip("=")+"."+base64.urlsafe_b64encode(json.dumps(paylDict,separators=(",",":")).encode()).decode('UTF-8').strip("=")
newContents = base64.urlsafe_b64encode(json_canon_escapeslash(json.dumps(headDict,separators=(",",":"))).encode()).decode('UTF-8').strip("=")+"."+base64.urlsafe_b64encode(json_canon_escapeslash(json.dumps(paylDict,separators=(",",":"))).encode()).decode('UTF-8').strip("=")
return newContents.encode().decode('UTF-8')

def dissectPayl(paylDict, count=False):
Expand Down Expand Up @@ -1663,7 +1681,7 @@ def reflectedClaims():
for claim in paylDict:
tmpValue = paylDict[claim]
paylDict[claim] = checkVal+claim
tmpContents = base64.urlsafe_b64encode(json.dumps(headDict,separators=(",",":")).encode()).decode('UTF-8').strip("=")+"."+base64.urlsafe_b64encode(json.dumps(paylDict,separators=(",",":")).encode()).decode('UTF-8').strip("=")
tmpContents = base64.urlsafe_b64encode(json_canon_escapeslash(json.dumps(headDict,separators=(",",":"))).encode()).decode('UTF-8').strip("=")+"."+base64.urlsafe_b64encode(json_canon_escapeslash(json.dumps(paylDict,separators=(",",":"))).encode()).decode('UTF-8').strip("=")
jwtOut(tmpContents+"."+sig, "Claim processing check in "+claim+" claim", "Token sent to check if the signature is checked before the "+claim+" claim is processed")
if checkVal+claim in config['argvals']['rescontent']:
cprintc("Injected value in "+claim+" claim was observed - "+checkVal+claim, "red")
Expand Down Expand Up @@ -1847,6 +1865,8 @@ def printLogo():
help="disable redirects for current request (change in jwtconf.ini if permanent)")
parser.add_argument("-M", "--mode", action="store",
help="Scanning mode:\npb = playbook audit\ner = fuzz existing claims to force errors\ncc = fuzz common claims\nat - All Tests!")
parser.add_argument("-cs", "--canonicalizeslashes", action="store_true",
help="Fix canonicalization errors with encoded slashes. Turns '/' into '\\/' and '\\' into '\\\\'.")
parser.add_argument("-X", "--exploit", action="store",
help="eXploit known vulnerabilities:\na = alg:none\nn = null signature\nb = blank password accepted in signature\ns = spoof JWKS (specify JWKS URL with -ju, or set in jwtconf.ini to automate this attack)\nk = key confusion (specify public key with -pk)\ni = inject inline JWKS")
parser.add_argument("-ju", "--jwksurl", action="store",
Expand Down Expand Up @@ -2036,6 +2056,10 @@ def printLogo():
parser.print_usage()
cprintc("No JWT provided", "red")
exit(1)
if not check_canonicalization(findJWT):
cprintc("Issue in canonicalization of JWT. Try --canonicalizeslashes or report to https://github.com/ticarpi/jwt_tool/issues/48.", "red")
cprintc("Canonicalized JWT: "+json_canonicalize(findJWT)+"\n", "red")
exit(1)
if args.mode:
if args.mode not in ['pb','er', 'cc', 'at']:
parser.print_usage()
Expand All @@ -2058,7 +2082,7 @@ def printLogo():
else:
config['argvals']['sigType'] = args.sign
headDict, paylDict, sig, contents = validateToken(jwt)
paylB64 = base64.urlsafe_b64encode(json.dumps(paylDict,separators=(",",":")).encode()).decode('UTF-8').strip("=")
paylB64 = base64.urlsafe_b64encode(json_canon_escapeslash(json.dumps(paylDict,separators=(",",":"))).encode()).decode('UTF-8').strip("=")
config['argvals']['overridesub'] = "false"
if args.targeturl:
config['argvals']['targetUrl'] = args.targeturl.replace('%','%%')
Expand Down