From e2173052b83f9c6dee2198fe5fd000dd4c043aa5 Mon Sep 17 00:00:00 2001 From: jonathan-r-thorpe Date: Thu, 7 Aug 2025 14:33:28 +0100 Subject: [PATCH 01/12] Bug fixes for mock Auth and Registry: * Allow CORS for Auth server * Use correct scopes for registration and query APIs of Registry * Relax enforced use of optional Auth features --- nmostesting/NMOSTesting.py | 1 + nmostesting/mocks/Auth.py | 12 +--- nmostesting/mocks/Registry.py | 101 +++++++++++++++++++++------------- 3 files changed, 66 insertions(+), 48 deletions(-) diff --git a/nmostesting/NMOSTesting.py b/nmostesting/NMOSTesting.py index b95f4a15c..77387470e 100644 --- a/nmostesting/NMOSTesting.py +++ b/nmostesting/NMOSTesting.py @@ -169,6 +169,7 @@ # Primary Authorization server if CONFIG.ENABLE_AUTH: auth_app = Flask(__name__) + CORS(auth_app) auth_app.debug = False auth_app.config['AUTH_INSTANCE'] = 0 auth_app.config['PORT'] = PRIMARY_AUTH.port diff --git a/nmostesting/mocks/Auth.py b/nmostesting/mocks/Auth.py index 4b43921fa..35d484ce5 100644 --- a/nmostesting/mocks/Auth.py +++ b/nmostesting/mocks/Auth.py @@ -302,10 +302,6 @@ def auth_auth(): # Recommended parameters # state - ctype_valid, ctype_message = check_content_type(request.headers, "application/x-www-form-urlencoded") - if not ctype_valid: - raise AuthException("invalid_request", ctype_message) - # hmm, no client authorization done, just redirects a random authorization code back to the client # TODO: add web pages for client authorization for the future @@ -370,7 +366,6 @@ def auth_auth(): def auth_token(): auth = AUTHS[flask.current_app.config["AUTH_INSTANCE"]] try: - auth_header_required = False scopes = [] ctype_valid, ctype_message = check_content_type(request.headers, "application/x-www-form-urlencoded") @@ -395,13 +390,12 @@ def auth_token(): refresh_token = query["refresh_token"][0] if "refresh_token" in query else None + # Scope query parameter is OPTIONAL https://datatracker.ietf.org/doc/html/rfc6749#section-6 scopes = query["scope"][0].split() if "scope" in query else SCOPE.split() if SCOPE else [] if scopes: scope_found = IS10Utils.is_any_contain(scopes, SCOPES) if not scope_found: raise AuthException("invalid_scope", "scope: {} are not supported".format(scopes)) - else: - raise AuthException("invalid_scope", "empty scope") if grant_type: # Authorization Code Grant @@ -484,8 +478,6 @@ def auth_token(): else: raise AuthException("unsupported_grant_type", "missing client_assertion_type used for private_key_jwt client authentication") - else: - auth_header_required = True # for the Confidential client, client_id and client_secret are embedded in the Authorization header auth_header = request.headers.get("Authorization", None) @@ -504,8 +496,6 @@ def auth_token(): "missing client_id or client_secret from authorization header") else: raise AuthException("invalid_client", "invalid authorization header") - elif auth_header_required: - raise AuthException("invalid_client", "invalid authorization header", HTTPStatus.UNAUTHORIZED) # client_id MUST be provided by all types of client if not client_id: diff --git a/nmostesting/mocks/Registry.py b/nmostesting/mocks/Registry.py index af2e79225..a824849b7 100644 --- a/nmostesting/mocks/Registry.py +++ b/nmostesting/mocks/Registry.py @@ -171,29 +171,29 @@ def _check_path_match(self, path, path_wildcards): break return path_match - def check_authorized(self, headers, path, write=False): + def check_authorized(self, headers, path, scopes=["x-nmos-registration"], write=False): if ENABLE_AUTH: try: + if "Authorization" not in request.headers: + return 400, "Authorization header not found" if not request.headers["Authorization"].startswith("Bearer "): - return 400 + return 400, "Bearer not found in Authorization header" token = request.headers["Authorization"].split(" ")[1] claims = jwt.decode(token, PRIMARY_AUTH.generate_jwk()) claims.validate() if claims["iss"] != PRIMARY_AUTH.make_issuer(): - return 401 + return 401, f"Unexpected issuer, expected: {PRIMARY_AUTH.make_issuer()}, actual: {claims['iss']}" # TODO: Check 'aud' claim matches 'mocks.' - if not self._check_path_match(path, claims["x-nmos-registration"]["read"]): - return 403 - if write: - if not self._check_path_match(path, claims["x-nmos-registration"]["write"]): - return 403 - except KeyError: - # TODO: Add debug which can be returned in the error response JSON - return 400 - except Exception: - # TODO: Add debug which can be returned in the error response JSON - return 400 - return True + for scope in scopes: + if not self._check_path_match(path, claims[scope]["read"]): + return 403, f"Paths mismatch for {scope} read claims" + if write and not self._check_path_match(path, claims[scope]["write"]): + return 403, f"Paths mismatch for {scope} write claims" + except KeyError as err: + return 400, f"KeyError: {err}" + except Exception as err: + return 400, f"Exception: {err}" + return True, "" # Query API subscription support methods @@ -351,9 +351,11 @@ def registration_root(): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized = registry.check_authorized(request.headers, request.path) + authorized, error_message = registry.check_authorized(request.headers, + request.path, + scopes=["x-nmos-registration"]) if authorized is not True: - abort(authorized) + abort(authorized, description=error_message) base_data = [version + '/' for version in SPECIFICATIONS["is-04"]["versions"]] @@ -366,9 +368,11 @@ def base_resource(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized = registry.check_authorized(request.headers, request.path) + authorized, error_message = registry.check_authorized(request.headers, + request.path, + scopes=["x-nmos-registration"]) if authorized is not True: - abort(authorized) + abort(authorized, description=error_message) base_data = ["resource/", "health/"] # Using json.dumps to support older Flask versions http://flask.pocoo.org/docs/1.0/security/#json-security @@ -380,9 +384,12 @@ def post_resource(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(500) - authorized = registry.check_authorized(request.headers, request.path, True) + authorized, error_message = registry.check_authorized(request.headers, + request.path, + scopes=["x-nmos-registration"], + write=True) if authorized is not True: - abort(authorized) + abort(authorized, description=error_message) if not registry.test_first_reg: registered = False try: @@ -410,9 +417,12 @@ def delete_resource(version, resource_type, resource_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(500) - authorized = registry.check_authorized(request.headers, request.path, True) + authorized, error_message = registry.check_authorized(request.headers, + request.path, + scopes=["x-nmos-registration"], + write=True) if authorized is not True: - abort(authorized) + abort(authorized, description=error_message) resource_type = resource_type.rstrip("s") if not registry.test_first_reg: registered = False @@ -444,9 +454,12 @@ def heartbeat(version, node_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(500) - authorized = registry.check_authorized(request.headers, request.path, True) + authorized, error_message = registry.check_authorized(request.headers, + request.path, + scopes=["x-nmos-registration"], + write=True) if authorized is not True: - abort(authorized) + abort(authorized, description=error_message) if node_id in registry.get_resources()["node"]: # store raw request payload, in order to check for empty request bodies later try: @@ -463,9 +476,11 @@ def query_root(): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized = registry.check_authorized(request.headers, request.path) + authorized, error_message = registry.check_authorized(request.headers, + request.path, + scopes=["x-nmos-query"]) if authorized is not True: - abort(authorized) + abort(authorized, description=error_message) base_data = [version + '/' for version in SPECIFICATIONS["is-04"]["versions"]] @@ -477,9 +492,11 @@ def query(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized = registry.check_authorized(request.headers, request.path) + authorized, error_message = registry.check_authorized(request.headers, + request.path, + scopes=["x-nmos-query"]) if authorized is not True: - abort(authorized) + abort(authorized, description=error_message) registry.requested_query_api_version = version @@ -501,9 +518,11 @@ def query_resource(version, resource): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized = registry.check_authorized(request.headers, request.path) + authorized, error_message = registry.check_authorized(request.headers, + request.path, + scopes=["x-nmos-query"]) if authorized is not True: - abort(authorized) + abort(authorized, description=error_message) registry.requested_query_api_version = version @@ -626,9 +645,11 @@ def get_resource(version, resource, resource_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized = registry.check_authorized(request.headers, request.path) + authorized, error_message = registry.check_authorized(request.headers, + request.path, + scopes=["x-nmos-query"]) if authorized is not True: - abort(authorized) + abort(authorized, description=error_message) registry.requested_query_api_version = version registry.query_api_called = True @@ -652,9 +673,12 @@ def post_subscription(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized = registry.check_authorized(request.headers, request.path) + authorized, error_message = registry.check_authorized(request.headers, + request.path, + scopes=["x-nmos-query"], + write=True) if authorized is not True: - abort(authorized) + abort(authorized, description=error_message) registry.requested_query_api_version = version subscription_request = request.json @@ -696,9 +720,12 @@ def delete_subscription(version, subscription_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized = registry.check_authorized(request.headers, request.path) + authorized, error_message = registry.check_authorized(request.headers, + request.path, + scopes=["x-nmos-query"], + write=True) if authorized is not True: - abort(authorized) + abort(authorized, description=error_message) registry.requested_query_api_version = version From e59a3790932ca5e428cec10d01cdf182300cb052 Mon Sep 17 00:00:00 2001 From: jonathan-r-thorpe Date: Thu, 7 Aug 2025 15:37:38 +0100 Subject: [PATCH 02/12] Cache client scopes at authorisation to use on token refresh --- nmostesting/mocks/Auth.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/nmostesting/mocks/Auth.py b/nmostesting/mocks/Auth.py index 35d484ce5..975ae8354 100644 --- a/nmostesting/mocks/Auth.py +++ b/nmostesting/mocks/Auth.py @@ -113,6 +113,7 @@ def __init__(self, port_increment, version="v1.0"): self.host = get_mocks_hostname() # authorization code of the authorization code flow self.code = None + self.scopes_cache = {} # remember client scopes def make_mdns_info(self, priority=0, api_ver=None, ip=None): """Get an mDNS ServiceInfo object in order to create an advertisement""" @@ -338,6 +339,8 @@ def auth_auth(): if not scope_found: error = "invalid_request" error_description = "scope: {} are not supported".format(scopes) + # cache the client scopes + auth.scopes_cache[request.args["client_id"]] = request.args["scope"].split() vars = {} if error: @@ -391,11 +394,16 @@ def auth_token(): refresh_token = query["refresh_token"][0] if "refresh_token" in query else None # Scope query parameter is OPTIONAL https://datatracker.ietf.org/doc/html/rfc6749#section-6 - scopes = query["scope"][0].split() if "scope" in query else SCOPE.split() if SCOPE else [] + # Use scopes cached from when the token was created if not provided in query + cached_scopes = auth.scopes_cache[client_id] if client_id in auth.scopes_cache else [] + scopes = query["scope"][0].split() if "scope" in query else cached_scopes \ + if len(cached_scopes) else SCOPE.split() if SCOPE else [] if scopes: scope_found = IS10Utils.is_any_contain(scopes, SCOPES) if not scope_found: raise AuthException("invalid_scope", "scope: {} are not supported".format(scopes)) + else: + raise AuthException("invalid_scope", "empty scope") if grant_type: # Authorization Code Grant From 869a667ee73beeb9f4c59d6841a7634cd9a24d5e Mon Sep 17 00:00:00 2001 From: jonathan-r-thorpe Date: Thu, 7 Aug 2025 17:11:02 +0100 Subject: [PATCH 03/12] Factor check authorization into a utility function --- nmostesting/IS10Utils.py | 36 ++++++++++ nmostesting/mocks/Registry.py | 122 ++++++++++++++-------------------- 2 files changed, 86 insertions(+), 72 deletions(-) diff --git a/nmostesting/IS10Utils.py b/nmostesting/IS10Utils.py index 8f170cc00..b485f892c 100644 --- a/nmostesting/IS10Utils.py +++ b/nmostesting/IS10Utils.py @@ -15,6 +15,7 @@ from Crypto.PublicKey import RSA from authlib.jose import jwt, JsonWebKey +import re import time import uuid @@ -22,6 +23,7 @@ from OpenSSL import crypto from cryptography.hazmat.primitives import serialization from cryptography import x509 +from flask import request from .TestHelper import get_default_ip, get_mocks_hostname from . import Config as CONFIG @@ -157,3 +159,37 @@ def is_any_contain(list, enum): if item in [e.name for e in enum]: return True return False + + @staticmethod + def check_authorization(auth, path, scopes=["x-nmos-registration"], write=False): + def _check_path_match(path, path_wildcards): + path_match = False + for path_wildcard in path_wildcards: + pattern = path_wildcard.replace("*", ".*") + if re.search(pattern, path): + path_match = True + break + return path_match + + if CONFIG.ENABLE_AUTH: + try: + if "Authorization" not in request.headers: + return 400, "Authorization header not found" + if not request.headers["Authorization"].startswith("Bearer "): + return 400, "Bearer not found in Authorization header" + token = request.headers["Authorization"].split(" ")[1] + claims = jwt.decode(token, auth.generate_jwk()) + claims.validate() + if claims["iss"] != auth.make_issuer(): + return 401, f"Unexpected issuer, expected: {auth.make_issuer()}, actual: {claims['iss']}" + # TODO: Check 'aud' claim matches 'mocks.' + for scope in scopes: + if not _check_path_match(path, claims[scope]["read"]): + return 403, f"Paths mismatch for {scope} read claims" + if write and not _check_path_match(path, claims[scope]["write"]): + return 403, f"Paths mismatch for {scope} write claims" + except KeyError as err: + return 400, f"KeyError: {err}" + except Exception as err: + return 400, f"Exception: {err}" + return True, "" diff --git a/nmostesting/mocks/Registry.py b/nmostesting/mocks/Registry.py index a824849b7..6f3233add 100644 --- a/nmostesting/mocks/Registry.py +++ b/nmostesting/mocks/Registry.py @@ -15,12 +15,13 @@ import time import flask import json -import re import uuid import functools from flask import request, jsonify, abort, Blueprint, Response from threading import Event, Lock + +from ..IS10Utils import IS10Utils from ..Config import PORT_BASE, ENABLE_AUTH, \ WEBSOCKET_PORT_BASE, ENABLE_HTTPS, SPECIFICATIONS from authlib.jose import jwt @@ -162,39 +163,6 @@ def _get_client_id(self, headers): return None return None - def _check_path_match(self, path, path_wildcards): - path_match = False - for path_wildcard in path_wildcards: - pattern = path_wildcard.replace("*", ".*") - if re.search(pattern, path): - path_match = True - break - return path_match - - def check_authorized(self, headers, path, scopes=["x-nmos-registration"], write=False): - if ENABLE_AUTH: - try: - if "Authorization" not in request.headers: - return 400, "Authorization header not found" - if not request.headers["Authorization"].startswith("Bearer "): - return 400, "Bearer not found in Authorization header" - token = request.headers["Authorization"].split(" ")[1] - claims = jwt.decode(token, PRIMARY_AUTH.generate_jwk()) - claims.validate() - if claims["iss"] != PRIMARY_AUTH.make_issuer(): - return 401, f"Unexpected issuer, expected: {PRIMARY_AUTH.make_issuer()}, actual: {claims['iss']}" - # TODO: Check 'aud' claim matches 'mocks.' - for scope in scopes: - if not self._check_path_match(path, claims[scope]["read"]): - return 403, f"Paths mismatch for {scope} read claims" - if write and not self._check_path_match(path, claims[scope]["write"]): - return 403, f"Paths mismatch for {scope} write claims" - except KeyError as err: - return 400, f"KeyError: {err}" - except Exception as err: - return 400, f"Exception: {err}" - return True, "" - # Query API subscription support methods def subscribe_to_query_api(self, version, subscription_request, secure=False): @@ -351,9 +319,9 @@ def registration_root(): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = registry.check_authorized(request.headers, - request.path, - scopes=["x-nmos-registration"]) + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-registration"]) if authorized is not True: abort(authorized, description=error_message) @@ -368,9 +336,10 @@ def base_resource(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = registry.check_authorized(request.headers, - request.path, - scopes=["x-nmos-registration"]) + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-registration"]) + if authorized is not True: abort(authorized, description=error_message) base_data = ["resource/", "health/"] @@ -384,10 +353,11 @@ def post_resource(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(500) - authorized, error_message = registry.check_authorized(request.headers, - request.path, - scopes=["x-nmos-registration"], - write=True) + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-registration"], + write=True) + if authorized is not True: abort(authorized, description=error_message) if not registry.test_first_reg: @@ -417,10 +387,11 @@ def delete_resource(version, resource_type, resource_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(500) - authorized, error_message = registry.check_authorized(request.headers, - request.path, - scopes=["x-nmos-registration"], - write=True) + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-registration"], + write=True) + if authorized is not True: abort(authorized, description=error_message) resource_type = resource_type.rstrip("s") @@ -454,10 +425,11 @@ def heartbeat(version, node_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(500) - authorized, error_message = registry.check_authorized(request.headers, - request.path, - scopes=["x-nmos-registration"], - write=True) + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-registration"], + write=True) + if authorized is not True: abort(authorized, description=error_message) if node_id in registry.get_resources()["node"]: @@ -476,9 +448,10 @@ def query_root(): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = registry.check_authorized(request.headers, - request.path, - scopes=["x-nmos-query"]) + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-query"]) + if authorized is not True: abort(authorized, description=error_message) @@ -492,9 +465,10 @@ def query(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = registry.check_authorized(request.headers, - request.path, - scopes=["x-nmos-query"]) + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-query"]) + if authorized is not True: abort(authorized, description=error_message) @@ -518,9 +492,10 @@ def query_resource(version, resource): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = registry.check_authorized(request.headers, - request.path, - scopes=["x-nmos-query"]) + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-query"]) + if authorized is not True: abort(authorized, description=error_message) @@ -645,9 +620,10 @@ def get_resource(version, resource, resource_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = registry.check_authorized(request.headers, - request.path, - scopes=["x-nmos-query"]) + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-query"]) + if authorized is not True: abort(authorized, description=error_message) @@ -673,10 +649,11 @@ def post_subscription(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = registry.check_authorized(request.headers, - request.path, - scopes=["x-nmos-query"], - write=True) + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-query"], + write=True) + if authorized is not True: abort(authorized, description=error_message) @@ -720,10 +697,11 @@ def delete_subscription(version, subscription_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = registry.check_authorized(request.headers, - request.path, - scopes=["x-nmos-query"], - write=True) + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-query"], + write=True) + if authorized is not True: abort(authorized, description=error_message) From 1f2c6f00c55685c85cdb36a0823f47b4909566ca Mon Sep 17 00:00:00 2001 From: jonathan-r-thorpe Date: Fri, 8 Aug 2025 10:01:12 +0100 Subject: [PATCH 04/12] Add authorization to mock Node --- nmostesting/mocks/Node.py | 60 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/nmostesting/mocks/Node.py b/nmostesting/mocks/Node.py index 80245f1c9..5484eefea 100644 --- a/nmostesting/mocks/Node.py +++ b/nmostesting/mocks/Node.py @@ -24,6 +24,8 @@ from .. import Config as CONFIG from ..TestHelper import get_default_ip, do_request from ..IS04Utils import IS04Utils +from ..IS10Utils import IS10Utils +from .Auth import PRIMARY_AUTH class Node(object): @@ -376,6 +378,12 @@ def x_nmos_root(): def connection_root(): base_data = ['v1.0/', 'v1.1/'] + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-connection"]) + if authorized is not True: + abort(authorized, description=error_message) + return make_response(Response(json.dumps(base_data), mimetype='application/json')) @@ -383,6 +391,12 @@ def connection_root(): def version(version): base_data = ['bulk/', 'single/'] + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-connection"]) + if authorized is not True: + abort(authorized, description=error_message) + return make_response(Response(json.dumps(base_data), mimetype='application/json')) @@ -390,11 +404,23 @@ def version(version): def single(version): base_data = ['senders/', 'receivers/'] + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-connection"]) + if authorized is not True: + abort(authorized, description=error_message) + return make_response(Response(json.dumps(base_data), mimetype='application/json')) @NODE_API.route('/x-nmos/connection//single//', methods=["GET"], strict_slashes=False) def resources(version, resource): + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-connection"]) + if authorized is not True: + abort(authorized, description=error_message) + if resource == 'senders': base_data = [r + '/' for r in [*NODE.senders]] elif resource == 'receivers': @@ -405,6 +431,12 @@ def resources(version, resource): @NODE_API.route('/x-nmos/connection//single//', methods=["GET"], strict_slashes=False) def connection(version, resource, resource_id): + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-connection"]) + if authorized is not True: + abort(authorized, description=error_message) + if resource != 'senders' and resource != 'receivers': abort(404) @@ -441,6 +473,12 @@ def _get_constraints(resource): @NODE_API.route('/x-nmos/connection//single///constraints', methods=["GET"], strict_slashes=False) def constraints(version, resource, resource_id): + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-connection"]) + if authorized is not True: + abort(authorized, description=error_message) + base_data = [_get_constraints(resource)] return make_response(Response(json.dumps(base_data), mimetype='application/json')) @@ -479,6 +517,13 @@ def staged(version, resource, resource_id): activating a connection without staging or deactivating an active connection Updates data then POSTs updated resource to registry """ + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-connection"], + write=request.method == 'PATCH') + if authorized is not True: + abort(authorized, description=error_message) + # Track requests NODE.staged_requests.append({'method': request.method, 'resource': resource, 'resource_id': resource_id, 'data': request.get_json(silent=True)}) @@ -516,6 +561,11 @@ def staged(version, resource, resource_id): @NODE_API.route('/x-nmos/connection//single///active', methods=["GET"], strict_slashes=False) def active(version, resource, resource_id): + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-connection"]) + if authorized is not True: + abort(authorized, description=error_message) try: if resource == 'senders': base_data = NODE.senders[resource_id]['activations']['active'] @@ -530,6 +580,11 @@ def active(version, resource, resource_id): @NODE_API.route('/x-nmos/connection//single///transporttype', methods=["GET"], strict_slashes=False) def transport_type(version, resource, resource_id): + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-connection"]) + if authorized is not True: + abort(authorized, description=error_message) # TODO fetch from resource info base_data = "urn:x-nmos:transport:rtp" @@ -584,6 +639,11 @@ def node_sdp(media_type, media_subtype): @NODE_API.route('/x-nmos/connection//single///transportfile', methods=["GET"], strict_slashes=False) def transport_file(version, resource, resource_id): + authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, + request.path, + scopes=["x-nmos-connection"]) + if authorized is not True: + abort(authorized, description=error_message) # GET should either redirect to the location of the transport file or return it directly try: if resource == 'senders': From 134c434c9d4ccd86dc8e134f11193b4939047e2a Mon Sep 17 00:00:00 2001 From: jonathan-r-thorpe Date: Fri, 8 Aug 2025 11:15:31 +0100 Subject: [PATCH 05/12] Introduce caching of token evaluation to improve performance --- nmostesting/IS10Utils.py | 11 ++-- nmostesting/mocks/Node.py | 23 +++++---- nmostesting/mocks/Registry.py | 94 +++++++++++++++++++++-------------- 3 files changed, 73 insertions(+), 55 deletions(-) diff --git a/nmostesting/IS10Utils.py b/nmostesting/IS10Utils.py index b485f892c..bae35b7a2 100644 --- a/nmostesting/IS10Utils.py +++ b/nmostesting/IS10Utils.py @@ -161,7 +161,7 @@ def is_any_contain(list, enum): return False @staticmethod - def check_authorization(auth, path, scopes=["x-nmos-registration"], write=False): + def check_authorization(auth, path, scope="x-nmos-registration", write=False): def _check_path_match(path, path_wildcards): path_match = False for path_wildcard in path_wildcards: @@ -183,11 +183,10 @@ def _check_path_match(path, path_wildcards): if claims["iss"] != auth.make_issuer(): return 401, f"Unexpected issuer, expected: {auth.make_issuer()}, actual: {claims['iss']}" # TODO: Check 'aud' claim matches 'mocks.' - for scope in scopes: - if not _check_path_match(path, claims[scope]["read"]): - return 403, f"Paths mismatch for {scope} read claims" - if write and not _check_path_match(path, claims[scope]["write"]): - return 403, f"Paths mismatch for {scope} write claims" + if not _check_path_match(path, claims[scope]["read"]): + return 403, f"Paths mismatch for {scope} read claims" + if write and not _check_path_match(path, claims[scope]["write"]): + return 403, f"Paths mismatch for {scope} write claims" except KeyError as err: return 400, f"KeyError: {err}" except Exception as err: diff --git a/nmostesting/mocks/Node.py b/nmostesting/mocks/Node.py index 5484eefea..18ab0924c 100644 --- a/nmostesting/mocks/Node.py +++ b/nmostesting/mocks/Node.py @@ -380,7 +380,7 @@ def connection_root(): authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, request.path, - scopes=["x-nmos-connection"]) + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) @@ -393,7 +393,7 @@ def version(version): authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, request.path, - scopes=["x-nmos-connection"]) + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) @@ -406,7 +406,7 @@ def single(version): authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, request.path, - scopes=["x-nmos-connection"]) + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) @@ -417,7 +417,7 @@ def single(version): def resources(version, resource): authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, request.path, - scopes=["x-nmos-connection"]) + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) @@ -433,7 +433,7 @@ def resources(version, resource): def connection(version, resource, resource_id): authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, request.path, - scopes=["x-nmos-connection"]) + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) @@ -475,7 +475,7 @@ def _get_constraints(resource): def constraints(version, resource, resource_id): authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, request.path, - scopes=["x-nmos-connection"]) + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) @@ -517,10 +517,11 @@ def staged(version, resource, resource_id): activating a connection without staging or deactivating an active connection Updates data then POSTs updated resource to registry """ + write = (request.method == 'PATCH') authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, request.path, - scopes=["x-nmos-connection"], - write=request.method == 'PATCH') + scope="x-nmos-connection", + write=write) if authorized is not True: abort(authorized, description=error_message) @@ -563,7 +564,7 @@ def staged(version, resource, resource_id): def active(version, resource, resource_id): authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, request.path, - scopes=["x-nmos-connection"]) + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) try: @@ -582,7 +583,7 @@ def active(version, resource, resource_id): def transport_type(version, resource, resource_id): authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, request.path, - scopes=["x-nmos-connection"]) + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) # TODO fetch from resource info @@ -641,7 +642,7 @@ def node_sdp(media_type, media_subtype): def transport_file(version, resource, resource_id): authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, request.path, - scopes=["x-nmos-connection"]) + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) # GET should either redirect to the location of the transport file or return it directly diff --git a/nmostesting/mocks/Registry.py b/nmostesting/mocks/Registry.py index 6f3233add..19ecc108d 100644 --- a/nmostesting/mocks/Registry.py +++ b/nmostesting/mocks/Registry.py @@ -79,6 +79,7 @@ def reset(self): self.query_api_called = False self.paging_limit = 100 self.pagination_used = False + self.auth_cache = {} def add(self, headers, payload, version): self.last_time = time.time() @@ -163,6 +164,23 @@ def _get_client_id(self, headers): return None return None + def check_authorization(self, auth, path, scope, write=False): + if scope in self.auth_cache and \ + ((write and self.auth_cache[scope]["Write"]) or self.auth_cache[scope]["Read"]): + return True, "" + + authorized, error_message = IS10Utils.check_authorization(auth, + path, + scope=scope, + write=write) + if authorized: + if scope not in self.auth_cache: + self.auth_cache[scope] = {"Read": True, "Write": write} + else: + self.auth_cache[scope]["Read"] = True + self.auth_cache[scope]["Write"] = self.auth_cache[scope]["Write"] or write + return authorized, error_message + # Query API subscription support methods def subscribe_to_query_api(self, version, subscription_request, secure=False): @@ -319,9 +337,9 @@ def registration_root(): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scopes=["x-nmos-registration"]) + authorized, error_message = registry.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-registration") if authorized is not True: abort(authorized, description=error_message) @@ -336,9 +354,9 @@ def base_resource(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scopes=["x-nmos-registration"]) + authorized, error_message = registry.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-registration") if authorized is not True: abort(authorized, description=error_message) @@ -353,10 +371,10 @@ def post_resource(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(500) - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scopes=["x-nmos-registration"], - write=True) + authorized, error_message = registry.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-registration", + write=True) if authorized is not True: abort(authorized, description=error_message) @@ -387,10 +405,10 @@ def delete_resource(version, resource_type, resource_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(500) - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scopes=["x-nmos-registration"], - write=True) + authorized, error_message = registry.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-registration", + write=True) if authorized is not True: abort(authorized, description=error_message) @@ -425,10 +443,10 @@ def heartbeat(version, node_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(500) - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scopes=["x-nmos-registration"], - write=True) + authorized, error_message = registry.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-registration", + write=True) if authorized is not True: abort(authorized, description=error_message) @@ -448,9 +466,9 @@ def query_root(): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scopes=["x-nmos-query"]) + authorized, error_message = registry.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-query") if authorized is not True: abort(authorized, description=error_message) @@ -465,9 +483,9 @@ def query(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scopes=["x-nmos-query"]) + authorized, error_message = registry.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-query") if authorized is not True: abort(authorized, description=error_message) @@ -492,9 +510,9 @@ def query_resource(version, resource): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scopes=["x-nmos-query"]) + authorized, error_message = registry.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-query") if authorized is not True: abort(authorized, description=error_message) @@ -620,9 +638,9 @@ def get_resource(version, resource, resource_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scopes=["x-nmos-query"]) + authorized, error_message = registry.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-query") if authorized is not True: abort(authorized, description=error_message) @@ -649,10 +667,10 @@ def post_subscription(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scopes=["x-nmos-query"], - write=True) + authorized, error_message = registry.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-query", + write=True) if authorized is not True: abort(authorized, description=error_message) @@ -697,10 +715,10 @@ def delete_subscription(version, subscription_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scopes=["x-nmos-query"], - write=True) + authorized, error_message = registry.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-query", + write=True) if authorized is not True: abort(authorized, description=error_message) From 135392462b1fbf6cccecbb3895c02bf882911c52 Mon Sep 17 00:00:00 2001 From: jonathan-r-thorpe Date: Fri, 8 Aug 2025 11:22:36 +0100 Subject: [PATCH 06/12] Introduce caching of token evaluation in mock Node to improve performance --- nmostesting/mocks/Node.py | 80 ++++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 31 deletions(-) diff --git a/nmostesting/mocks/Node.py b/nmostesting/mocks/Node.py index 18ab0924c..22dd074bf 100644 --- a/nmostesting/mocks/Node.py +++ b/nmostesting/mocks/Node.py @@ -41,6 +41,7 @@ def reset(self): self.receivers = {} self.senders = {} self.patched_sdp = {} + self.auth_cache = {} def get_sender(self, media_type="video/raw", version="v1.3"): protocol = "http" @@ -362,6 +363,23 @@ def patch_staged(self, resource, resource_id, request_json): return response_data, response_code + def check_authorization(self, auth, path, scope, write=False): + if scope in self.auth_cache and \ + ((write and self.auth_cache[scope]["Write"]) or self.auth_cache[scope]["Read"]): + return True, "" + + authorized, error_message = IS10Utils.check_authorization(auth, + path, + scope=scope, + write=write) + if authorized: + if scope not in self.auth_cache: + self.auth_cache[scope] = {"Read": True, "Write": write} + else: + self.auth_cache[scope]["Read"] = True + self.auth_cache[scope]["Write"] = self.auth_cache[scope]["Write"] or write + return authorized, error_message + NODE = Node(1) NODE_API = Blueprint('node_api', __name__) @@ -378,9 +396,9 @@ def x_nmos_root(): def connection_root(): base_data = ['v1.0/', 'v1.1/'] - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") + authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) @@ -391,9 +409,9 @@ def connection_root(): def version(version): base_data = ['bulk/', 'single/'] - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") + authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) @@ -404,9 +422,9 @@ def version(version): def single(version): base_data = ['senders/', 'receivers/'] - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") + authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) @@ -415,9 +433,9 @@ def single(version): @NODE_API.route('/x-nmos/connection//single//', methods=["GET"], strict_slashes=False) def resources(version, resource): - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") + authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) @@ -431,9 +449,9 @@ def resources(version, resource): @NODE_API.route('/x-nmos/connection//single//', methods=["GET"], strict_slashes=False) def connection(version, resource, resource_id): - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") + authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) @@ -473,9 +491,9 @@ def _get_constraints(resource): @NODE_API.route('/x-nmos/connection//single///constraints', methods=["GET"], strict_slashes=False) def constraints(version, resource, resource_id): - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") + authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) @@ -518,10 +536,10 @@ def staged(version, resource, resource_id): Updates data then POSTs updated resource to registry """ write = (request.method == 'PATCH') - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection", - write=write) + authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-connection", + write=write) if authorized is not True: abort(authorized, description=error_message) @@ -562,9 +580,9 @@ def staged(version, resource, resource_id): @NODE_API.route('/x-nmos/connection//single///active', methods=["GET"], strict_slashes=False) def active(version, resource, resource_id): - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") + authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) try: @@ -581,9 +599,9 @@ def active(version, resource, resource_id): @NODE_API.route('/x-nmos/connection//single///transporttype', methods=["GET"], strict_slashes=False) def transport_type(version, resource, resource_id): - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") + authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) # TODO fetch from resource info @@ -640,9 +658,9 @@ def node_sdp(media_type, media_subtype): @NODE_API.route('/x-nmos/connection//single///transportfile', methods=["GET"], strict_slashes=False) def transport_file(version, resource, resource_id): - authorized, error_message = IS10Utils.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") + authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-connection") if authorized is not True: abort(authorized, description=error_message) # GET should either redirect to the location of the transport file or return it directly From c0782d70c539167da21d13ac92291f09f4dc6604 Mon Sep 17 00:00:00 2001 From: jonathan-r-thorpe Date: Fri, 8 Aug 2025 13:42:38 +0100 Subject: [PATCH 07/12] Improve authorization checking --- nmostesting/mocks/Node.py | 6 +++++- nmostesting/mocks/Registry.py | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/nmostesting/mocks/Node.py b/nmostesting/mocks/Node.py index 22dd074bf..304c70d85 100644 --- a/nmostesting/mocks/Node.py +++ b/nmostesting/mocks/Node.py @@ -364,7 +364,11 @@ def patch_staged(self, resource, resource_id, request_json): return response_data, response_code def check_authorization(self, auth, path, scope, write=False): - if scope in self.auth_cache and \ + if not CONFIG.ENABLE_AUTH: + return True, "" + + if "Authorization" in request.headers and request.headers["Authorization"].startswith("Bearer ") \ + and scope in self.auth_cache and \ ((write and self.auth_cache[scope]["Write"]) or self.auth_cache[scope]["Read"]): return True, "" diff --git a/nmostesting/mocks/Registry.py b/nmostesting/mocks/Registry.py index 19ecc108d..698bdf59e 100644 --- a/nmostesting/mocks/Registry.py +++ b/nmostesting/mocks/Registry.py @@ -165,7 +165,11 @@ def _get_client_id(self, headers): return None def check_authorization(self, auth, path, scope, write=False): - if scope in self.auth_cache and \ + if not ENABLE_AUTH: + return True, "" + + if "Authorization" in request.headers and request.headers["Authorization"].startswith("Bearer ") \ + and scope in self.auth_cache and \ ((write and self.auth_cache[scope]["Write"]) or self.auth_cache[scope]["Read"]): return True, "" From 432fe2dd0555fc64819a9373c0b7019a802fc499 Mon Sep 17 00:00:00 2001 From: jonathan-r-thorpe <64410119+jonathan-r-thorpe@users.noreply.github.com> Date: Fri, 8 Aug 2025 16:35:31 +0100 Subject: [PATCH 08/12] Apply suggestions from code review Co-authored-by: Simon Lo --- nmostesting/mocks/Auth.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/nmostesting/mocks/Auth.py b/nmostesting/mocks/Auth.py index 975ae8354..b4e70c69f 100644 --- a/nmostesting/mocks/Auth.py +++ b/nmostesting/mocks/Auth.py @@ -340,7 +340,7 @@ def auth_auth(): error = "invalid_request" error_description = "scope: {} are not supported".format(scopes) # cache the client scopes - auth.scopes_cache[request.args["client_id"]] = request.args["scope"].split() + auth.scopes_cache[request.args["client_id"]] = scopes vars = {} if error: @@ -393,7 +393,9 @@ def auth_token(): refresh_token = query["refresh_token"][0] if "refresh_token" in query else None - # Scope query parameter is OPTIONAL https://datatracker.ietf.org/doc/html/rfc6749#section-6 + # Scope query parameter is OPTIONAL + # see https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2 + # and https://datatracker.ietf.org/doc/html/rfc6749#section-6 # Use scopes cached from when the token was created if not provided in query cached_scopes = auth.scopes_cache[client_id] if client_id in auth.scopes_cache else [] scopes = query["scope"][0].split() if "scope" in query else cached_scopes \ From a7b5b8ced510673fd9928f9764a29d4bf03c48eb Mon Sep 17 00:00:00 2001 From: jonathan-r-thorpe Date: Mon, 6 Oct 2025 10:41:00 +0100 Subject: [PATCH 09/12] Guard against null command responses --- nmostesting/IS12Utils.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/nmostesting/IS12Utils.py b/nmostesting/IS12Utils.py index 7cf85bed3..b1e10cca5 100644 --- a/nmostesting/IS12Utils.py +++ b/nmostesting/IS12Utils.py @@ -171,6 +171,12 @@ def send_command(self, test, command_json): for tm in self.ncp_websocket.get_timestamped_messages(): parsed_message = json.loads(tm.message) + if parsed_message is None: + raise NMOSTestException(test.FAIL( + f"Null message received for command: {str(command_json)}", + f"https://specs.amwa.tv/is-12/branches/{self.apis[CONTROL_API_KEY]['spec_branch']}" + "/docs/Protocol_messaging.html#command-message-type")) + if self.message_type_to_schema_name(parsed_message.get("messageType")): self._validate_is12_schema( test, @@ -230,7 +236,7 @@ def get_notifications(self): # Get any timestamped messages that have arrived in the interim period for tm in self.ncp_websocket.get_timestamped_messages(): parsed_message = json.loads(tm.message) - if parsed_message["messageType"] == MessageTypes.Notification: + if parsed_message and parsed_message["messageType"] == MessageTypes.Notification: self.notifications += [IS12Notification(n, tm.received_time) for n in parsed_message["notifications"]] return self.notifications From 5bf361435a89bcb9e2e742527130d6ae06197447 Mon Sep 17 00:00:00 2001 From: jonathan-r-thorpe Date: Mon, 6 Oct 2025 11:18:56 +0100 Subject: [PATCH 10/12] Authorization decorator for mock Node --- nmostesting/mocks/Node.py | 86 ++++++++++++--------------------------- 1 file changed, 27 insertions(+), 59 deletions(-) diff --git a/nmostesting/mocks/Node.py b/nmostesting/mocks/Node.py index 304c70d85..42aa0fdf7 100644 --- a/nmostesting/mocks/Node.py +++ b/nmostesting/mocks/Node.py @@ -389,6 +389,23 @@ def check_authorization(self, auth, path, scope, write=False): NODE_API = Blueprint('node_api', __name__) +# Authorization decorator +def check_authorization(func): + def wrapper(*args, **kwargs): + write = (request.method == 'PATCH') + authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-connection", + write=write) + if authorized is not True: + abort(authorized, description=error_message) + + return func(*args, **kwargs) + # Rename wrapper to allow decoration of decorator + wrapper.__name__ = func.__name__ + return wrapper + + @NODE_API.route('/x-nmos', methods=['GET'], strict_slashes=False) def x_nmos_root(): base_data = ['connection/'] @@ -397,52 +414,32 @@ def x_nmos_root(): @NODE_API.route('/x-nmos/connection', methods=['GET'], strict_slashes=False) +@check_authorization def connection_root(): base_data = ['v1.0/', 'v1.1/'] - authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") - if authorized is not True: - abort(authorized, description=error_message) - return make_response(Response(json.dumps(base_data), mimetype='application/json')) @NODE_API.route('/x-nmos/connection/', methods=['GET'], strict_slashes=False) +@check_authorization def version(version): base_data = ['bulk/', 'single/'] - authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") - if authorized is not True: - abort(authorized, description=error_message) - return make_response(Response(json.dumps(base_data), mimetype='application/json')) @NODE_API.route('/x-nmos/connection//single', methods=['GET'], strict_slashes=False) +@check_authorization def single(version): base_data = ['senders/', 'receivers/'] - authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") - if authorized is not True: - abort(authorized, description=error_message) - return make_response(Response(json.dumps(base_data), mimetype='application/json')) @NODE_API.route('/x-nmos/connection//single//', methods=["GET"], strict_slashes=False) +@check_authorization def resources(version, resource): - authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") - if authorized is not True: - abort(authorized, description=error_message) - if resource == 'senders': base_data = [r + '/' for r in [*NODE.senders]] elif resource == 'receivers': @@ -452,13 +449,8 @@ def resources(version, resource): @NODE_API.route('/x-nmos/connection//single//', methods=["GET"], strict_slashes=False) +@check_authorization def connection(version, resource, resource_id): - authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") - if authorized is not True: - abort(authorized, description=error_message) - if resource != 'senders' and resource != 'receivers': abort(404) @@ -494,13 +486,8 @@ def _get_constraints(resource): @NODE_API.route('/x-nmos/connection//single///constraints', methods=["GET"], strict_slashes=False) +@check_authorization def constraints(version, resource, resource_id): - authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") - if authorized is not True: - abort(authorized, description=error_message) - base_data = [_get_constraints(resource)] return make_response(Response(json.dumps(base_data), mimetype='application/json')) @@ -532,6 +519,7 @@ def _check_constraint(constraint, transport_param): @NODE_API.route('/x-nmos/connection//single///staged', methods=["GET", "PATCH"], strict_slashes=False) +@check_authorization def staged(version, resource, resource_id): """ GET returns current staged data for given resource @@ -539,14 +527,6 @@ def staged(version, resource, resource_id): activating a connection without staging or deactivating an active connection Updates data then POSTs updated resource to registry """ - write = (request.method == 'PATCH') - authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection", - write=write) - if authorized is not True: - abort(authorized, description=error_message) - # Track requests NODE.staged_requests.append({'method': request.method, 'resource': resource, 'resource_id': resource_id, 'data': request.get_json(silent=True)}) @@ -583,12 +563,8 @@ def staged(version, resource, resource_id): @NODE_API.route('/x-nmos/connection//single///active', methods=["GET"], strict_slashes=False) +@check_authorization def active(version, resource, resource_id): - authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") - if authorized is not True: - abort(authorized, description=error_message) try: if resource == 'senders': base_data = NODE.senders[resource_id]['activations']['active'] @@ -602,12 +578,8 @@ def active(version, resource, resource_id): @NODE_API.route('/x-nmos/connection//single///transporttype', methods=["GET"], strict_slashes=False) +@check_authorization def transport_type(version, resource, resource_id): - authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") - if authorized is not True: - abort(authorized, description=error_message) # TODO fetch from resource info base_data = "urn:x-nmos:transport:rtp" @@ -661,12 +633,8 @@ def node_sdp(media_type, media_subtype): @NODE_API.route('/x-nmos/connection//single///transportfile', methods=["GET"], strict_slashes=False) +@check_authorization def transport_file(version, resource, resource_id): - authorized, error_message = NODE.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-connection") - if authorized is not True: - abort(authorized, description=error_message) # GET should either redirect to the location of the transport file or return it directly try: if resource == 'senders': From 0ab67ed799f0f5e091f2f67efa59ce01398f52c2 Mon Sep 17 00:00:00 2001 From: jonathan-r-thorpe Date: Mon, 6 Oct 2025 11:31:54 +0100 Subject: [PATCH 11/12] Authorization decorator for mock Registry --- nmostesting/mocks/Registry.py | 128 ++++++++-------------------------- 1 file changed, 30 insertions(+), 98 deletions(-) diff --git a/nmostesting/mocks/Registry.py b/nmostesting/mocks/Registry.py index 698bdf59e..a06acad3f 100644 --- a/nmostesting/mocks/Registry.py +++ b/nmostesting/mocks/Registry.py @@ -325,28 +325,35 @@ def _close_subscription_websockets(self): REGISTRY_API = Blueprint('registry_api', __name__) +# Authorization decorator +def check_enabled_and_authorization(func): + def wrapper(*args, **kwargs): + registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] + if not registry.enabled: + abort(503) + authorized, error_message = registry.check_authorization(PRIMARY_AUTH, + request.path, + scope="x-nmos-registration") + if authorized is not True: + abort(authorized, description=error_message) + + return func(*args, **kwargs) + # Rename wrapper to allow decoration of decorator + wrapper.__name__ = func.__name__ + return wrapper + + @REGISTRY_API.route('/x-nmos', methods=["GET"], strict_slashes=False) +@check_enabled_and_authorization def x_nmos(): - registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] - if not registry.enabled: - abort(503) - base_data = ['query/', 'registration/'] return Response(json.dumps(base_data), mimetype='application/json') @REGISTRY_API.route('/x-nmos/registration', methods=["GET"], strict_slashes=False) +@check_enabled_and_authorization def registration_root(): - registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] - if not registry.enabled: - abort(503) - authorized, error_message = registry.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-registration") - if authorized is not True: - abort(authorized, description=error_message) - base_data = [version + '/' for version in SPECIFICATIONS["is-04"]["versions"]] return Response(json.dumps(base_data), mimetype='application/json') @@ -354,16 +361,8 @@ def registration_root(): # IS-04 resources @REGISTRY_API.route('/x-nmos/registration/', methods=["GET"], strict_slashes=False) +@check_enabled_and_authorization def base_resource(version): - registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] - if not registry.enabled: - abort(503) - authorized, error_message = registry.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-registration") - - if authorized is not True: - abort(authorized, description=error_message) base_data = ["resource/", "health/"] # Using json.dumps to support older Flask versions http://flask.pocoo.org/docs/1.0/security/#json-security @@ -371,17 +370,9 @@ def base_resource(version): @REGISTRY_API.route('/x-nmos/registration//resource', methods=["POST"]) +@check_enabled_and_authorization def post_resource(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] - if not registry.enabled: - abort(500) - authorized, error_message = registry.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-registration", - write=True) - - if authorized is not True: - abort(authorized, description=error_message) if not registry.test_first_reg: registered = False try: @@ -405,17 +396,9 @@ def post_resource(version): @REGISTRY_API.route('/x-nmos/registration//resource//', methods=["DELETE"]) +@check_enabled_and_authorization def delete_resource(version, resource_type, resource_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] - if not registry.enabled: - abort(500) - authorized, error_message = registry.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-registration", - write=True) - - if authorized is not True: - abort(authorized, description=error_message) resource_type = resource_type.rstrip("s") if not registry.test_first_reg: registered = False @@ -443,17 +426,9 @@ def delete_resource(version, resource_type, resource_id): @REGISTRY_API.route('/x-nmos/registration//health/nodes/', methods=["POST"]) +@check_enabled_and_authorization def heartbeat(version, node_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] - if not registry.enabled: - abort(500) - authorized, error_message = registry.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-registration", - write=True) - - if authorized is not True: - abort(authorized, description=error_message) if node_id in registry.get_resources()["node"]: # store raw request payload, in order to check for empty request bodies later try: @@ -466,34 +441,17 @@ def heartbeat(version, node_id): @REGISTRY_API.route('/x-nmos/query', methods=["GET"], strict_slashes=False) +@check_enabled_and_authorization def query_root(): - registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] - if not registry.enabled: - abort(503) - authorized, error_message = registry.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-query") - - if authorized is not True: - abort(authorized, description=error_message) - base_data = [version + '/' for version in SPECIFICATIONS["is-04"]["versions"]] return Response(json.dumps(base_data), mimetype='application/json') @REGISTRY_API.route('/x-nmos/query/', methods=["GET"], strict_slashes=False) +@check_enabled_and_authorization def query(version): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] - if not registry.enabled: - abort(503) - authorized, error_message = registry.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-query") - - if authorized is not True: - abort(authorized, description=error_message) - registry.requested_query_api_version = version base_data = ['devices/', 'flows/', 'nodes/', 'receivers/', 'senders/', 'sources/', 'subscriptions/'] @@ -510,17 +468,9 @@ def compare_resources(resource1, resource2): @REGISTRY_API.route('/x-nmos/query//', methods=["GET"], strict_slashes=False) +@check_enabled_and_authorization def query_resource(version, resource): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] - if not registry.enabled: - abort(503) - authorized, error_message = registry.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-query") - - if authorized is not True: - abort(authorized, description=error_message) - registry.requested_query_api_version = version # NOTE: Advanced Query Syntax (RQL) is not currently supported @@ -638,17 +588,9 @@ def query_resource(version, resource): @REGISTRY_API.route('/x-nmos/query///', methods=['GET'], strict_slashes=False) +@check_enabled_and_authorization def get_resource(version, resource, resource_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] - if not registry.enabled: - abort(503) - authorized, error_message = registry.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-query") - - if authorized is not True: - abort(authorized, description=error_message) - registry.requested_query_api_version = version registry.query_api_called = True @@ -666,19 +608,9 @@ def get_resource(version, resource, resource_id): @REGISTRY_API.route('/x-nmos/query//subscriptions', methods=["POST"]) +@check_enabled_and_authorization def post_subscription(version): - registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] - if not registry.enabled: - abort(503) - authorized, error_message = registry.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-query", - write=True) - - if authorized is not True: - abort(authorized, description=error_message) - registry.requested_query_api_version = version subscription_request = request.json @@ -714,8 +646,8 @@ def post_subscription(version): @REGISTRY_API.route('/x-nmos/query//subscriptions/', methods=["DELETE"]) +@check_enabled_and_authorization def delete_subscription(version, subscription_id): - registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] if not registry.enabled: abort(503) From 969071d981547ae59bb44faa4bda363d868e9403 Mon Sep 17 00:00:00 2001 From: jonathan-r-thorpe Date: Mon, 6 Oct 2025 15:22:11 +0100 Subject: [PATCH 12/12] Remove redundant authorization check code --- nmostesting/mocks/Registry.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/nmostesting/mocks/Registry.py b/nmostesting/mocks/Registry.py index a06acad3f..401f5db6e 100644 --- a/nmostesting/mocks/Registry.py +++ b/nmostesting/mocks/Registry.py @@ -649,16 +649,6 @@ def post_subscription(version): @check_enabled_and_authorization def delete_subscription(version, subscription_id): registry = REGISTRIES[flask.current_app.config["REGISTRY_INSTANCE"]] - if not registry.enabled: - abort(503) - authorized, error_message = registry.check_authorization(PRIMARY_AUTH, - request.path, - scope="x-nmos-query", - write=True) - - if authorized is not True: - abort(authorized, description=error_message) - registry.requested_query_api_version = version try: