diff --git a/api/app/rbac_roles.py b/api/app/rbac_roles.py index cbae105..3af75a5 100644 --- a/api/app/rbac_roles.py +++ b/api/app/rbac_roles.py @@ -6,12 +6,13 @@ "custom", } +DENIED_CREATOR_ROLES = {"viewer"} + DB_ROLE_BY_RBAC_ROLE = { "viewer": "user", "editor": "user", "obs_manager": "sensor", "sensor": "sensor", - # Custom policies still require baseline schema/table permissions. "custom": "user", } @@ -28,3 +29,9 @@ def validate_rbac_role(role: str) -> str: def get_db_role_for_rbac(role: str) -> str: return DB_ROLE_BY_RBAC_ROLE[validate_rbac_role(role)] + + +def check_create_permission(role) -> bool: + if role is None: + return True + return role.lower() not in DENIED_CREATOR_ROLES diff --git a/api/app/v1/endpoints/create/bulk_observation.py b/api/app/v1/endpoints/create/bulk_observation.py index 0f7e3d0..852cec0 100644 --- a/api/app/v1/endpoints/create/bulk_observation.py +++ b/api/app/v1/endpoints/create/bulk_observation.py @@ -15,7 +15,8 @@ from app import AUTHORIZATION, POSTGRES_PORT_WRITE, VERSIONING from app.db.asyncpg_db import get_pool, get_pool_w from app.oauth import get_current_user -from app.utils.utils import safe_parse_datetime +from app.rbac_roles import check_create_permission +from app.utils.utils import extract_iot_id, safe_parse_datetime from app.v1.endpoints.functions import set_role from asyncpg.exceptions import InsufficientPrivilegeError from asyncpg.types import Range @@ -84,6 +85,18 @@ async def bulk_observations( current_user=user, pgpool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: async with pgpool.acquire() as conn: async with conn.transaction(): diff --git a/api/app/v1/endpoints/create/data_array_observation.py b/api/app/v1/endpoints/create/data_array_observation.py index a059017..e2f883f 100644 --- a/api/app/v1/endpoints/create/data_array_observation.py +++ b/api/app/v1/endpoints/create/data_array_observation.py @@ -17,12 +17,16 @@ from app import AUTHORIZATION, POSTGRES_PORT_WRITE, VERSIONING from app.db.asyncpg_db import get_pool, get_pool_w +from app.rbac_roles import check_create_permission +from app.v1.endpoints.functions import set_role from app.utils.utils import ( build_self_link, check_iot_id_in_payload, check_missing_properties, handle_datetime_fields, handle_result_field, + build_self_link, + extract_iot_id, ) from app.v1.endpoints.functions import set_role from asyncpg.exceptions import InsufficientPrivilegeError @@ -99,6 +103,18 @@ async def data_array_observation( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: response_urls = [] diff --git a/api/app/v1/endpoints/create/datastream.py b/api/app/v1/endpoints/create/datastream.py index 47a6ed3..66178b4 100644 --- a/api/app/v1/endpoints/create/datastream.py +++ b/api/app/v1/endpoints/create/datastream.py @@ -14,7 +14,8 @@ from app import AUTHORIZATION, POSTGRES_PORT_WRITE, VERSIONING from app.db.asyncpg_db import get_pool, get_pool_w -from app.utils.utils import require_json_content_type, validate_payload_keys +from app.rbac_roles import check_create_permission +from app.utils.utils import validate_payload_keys, require_json_content_type from app.v1.endpoints.functions import set_role from asyncpg.exceptions import InsufficientPrivilegeError from fastapi import APIRouter, Body, Depends, Header, Request, status @@ -67,6 +68,8 @@ if AUTHORIZATION: ALLOWED_KEYS.append("Network") +DENIED_CREATOR_ROLES = ["viewer"] + @v1.api_route( "/Datastreams", @@ -83,6 +86,18 @@ async def create_datastream( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) @@ -91,6 +106,8 @@ async def create_datastream( async with pool.acquire() as connection: async with connection.transaction(): if current_user is not None: + if current_user["role"] in DENIED_CREATOR_ROLES: + raise InsufficientPrivilegeError await set_role(connection, current_user) commit_id = await set_commit( @@ -159,6 +176,18 @@ async def create_datastream_for_thing( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) @@ -172,6 +201,8 @@ async def create_datastream_for_thing( async with pool.acquire() as connection: async with connection.transaction(): if current_user is not None: + if current_user["role"] in DENIED_CREATOR_ROLES: + raise InsufficientPrivilegeError await set_role(connection, current_user) commit_id = await set_commit( @@ -240,6 +271,18 @@ async def create_datastream_for_sensor( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) @@ -253,6 +296,8 @@ async def create_datastream_for_sensor( async with pool.acquire() as connection: async with connection.transaction(): if current_user is not None: + if current_user["role"] in DENIED_CREATOR_ROLES: + raise InsufficientPrivilegeError await set_role(connection, current_user) commit_id = await set_commit( @@ -324,12 +369,24 @@ async def create_datastream_for_observed_property( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) if not observed_property_id: raise Exception("Observed Property ID is required.") - + payload["ObservedProperty"] = {"@iot.id": observed_property_id} validate_payload_keys(payload, ALLOWED_KEYS) @@ -337,6 +394,8 @@ async def create_datastream_for_observed_property( async with pool.acquire() as connection: async with connection.transaction(): if current_user is not None: + if current_user["role"] in DENIED_CREATOR_ROLES: + raise InsufficientPrivilegeError await set_role(connection, current_user) commit_id = await set_commit( diff --git a/api/app/v1/endpoints/create/feature_of_interest.py b/api/app/v1/endpoints/create/feature_of_interest.py index 51f949f..dc245fc 100644 --- a/api/app/v1/endpoints/create/feature_of_interest.py +++ b/api/app/v1/endpoints/create/feature_of_interest.py @@ -14,11 +14,7 @@ from app import AUTHORIZATION, POSTGRES_PORT_WRITE, VERSIONING from app.db.asyncpg_db import get_pool, get_pool_w -from app.utils.utils import ( - require_json_content_type, - validate_payload_keys, - validate_required_keys, -) +from app.utils.utils import validate_payload_keys, validate_required_keys, require_json_content_type from app.v1.endpoints.functions import set_role from asyncpg.exceptions import InsufficientPrivilegeError from fastapi import APIRouter, Body, Depends, Header, Request, status @@ -56,6 +52,7 @@ ] REQUIRED_KEYS = ["name", "encodingType", "feature"] +DENIED_CREATOR_ROLES = ["viewer"] @v1.api_route( @@ -73,6 +70,18 @@ async def create_feature_of_interest( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) @@ -82,6 +91,8 @@ async def create_feature_of_interest( async with pool.acquire() as connection: async with connection.transaction(): if current_user is not None: + if current_user["role"] in DENIED_CREATOR_ROLES: + raise InsufficientPrivilegeError await set_role(connection, current_user) commit_id = await set_commit( diff --git a/api/app/v1/endpoints/create/historical_location.py b/api/app/v1/endpoints/create/historical_location.py index 8ed9ae6..d7c00a5 100644 --- a/api/app/v1/endpoints/create/historical_location.py +++ b/api/app/v1/endpoints/create/historical_location.py @@ -14,7 +14,8 @@ from app import AUTHORIZATION, POSTGRES_PORT_WRITE, VERSIONING from app.db.asyncpg_db import get_pool, get_pool_w -from app.utils.utils import require_json_content_type, validate_payload_keys +from app.rbac_roles import check_create_permission +from app.utils.utils import validate_payload_keys, require_json_content_type from app.v1.endpoints.functions import set_role from asyncpg.exceptions import InsufficientPrivilegeError from fastapi import APIRouter, Body, Depends, Header, Request, status @@ -59,6 +60,18 @@ async def create_historical_location( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) @@ -68,7 +81,7 @@ async def create_historical_location( async with connection.transaction(): if current_user is not None: await set_role(connection, current_user) - + commit_id = await set_commit( connection, commit_message, current_user ) @@ -127,6 +140,18 @@ async def create_historical_location_for_thing( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) diff --git a/api/app/v1/endpoints/create/location.py b/api/app/v1/endpoints/create/location.py index 6b9c690..5fc940f 100644 --- a/api/app/v1/endpoints/create/location.py +++ b/api/app/v1/endpoints/create/location.py @@ -14,11 +14,8 @@ from app import AUTHORIZATION, POSTGRES_PORT_WRITE, VERSIONING from app.db.asyncpg_db import get_pool, get_pool_w -from app.utils.utils import ( - require_json_content_type, - validate_payload_keys, - validate_required_keys, -) +from app.rbac_roles import check_create_permission +from app.utils.utils import validate_payload_keys, validate_required_keys, require_json_content_type from app.v1.endpoints.functions import set_role from asyncpg.exceptions import InsufficientPrivilegeError from fastapi import APIRouter, Body, Depends, Header, Request, status @@ -73,6 +70,18 @@ async def create_location( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) @@ -136,6 +145,18 @@ async def create_location_for_thing( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) diff --git a/api/app/v1/endpoints/create/network.py b/api/app/v1/endpoints/create/network.py index ef0a8a7..88c445a 100644 --- a/api/app/v1/endpoints/create/network.py +++ b/api/app/v1/endpoints/create/network.py @@ -13,11 +13,8 @@ # limitations under the License. from app import AUTHORIZATION, POSTGRES_PORT_WRITE, VERSIONING from app.db.asyncpg_db import get_pool, get_pool_w -from app.utils.utils import ( - require_json_content_type, - validate_payload_keys, - validate_required_keys, -) +from app.rbac_roles import check_create_permission +from app.utils.utils import validate_payload_keys, validate_required_keys, require_json_content_type from app.v1.endpoints.functions import set_role from asyncpg.exceptions import InsufficientPrivilegeError from fastapi import APIRouter, Body, Depends, Header, Request, status @@ -63,6 +60,18 @@ async def create_network( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) diff --git a/api/app/v1/endpoints/create/observation.py b/api/app/v1/endpoints/create/observation.py index 26facdb..b5418e9 100644 --- a/api/app/v1/endpoints/create/observation.py +++ b/api/app/v1/endpoints/create/observation.py @@ -14,7 +14,8 @@ from app import AUTHORIZATION, POSTGRES_PORT_WRITE, VERSIONING from app.db.asyncpg_db import get_pool, get_pool_w -from app.utils.utils import require_json_content_type, validate_payload_keys +from app.rbac_roles import check_create_permission +from app.utils.utils import validate_payload_keys, require_json_content_type from app.v1.endpoints.functions import set_role from asyncpg.exceptions import InsufficientPrivilegeError, UniqueViolationError from fastapi import APIRouter, Body, Depends, Header, Request, status @@ -70,6 +71,18 @@ async def create_observation( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) @@ -149,12 +162,24 @@ async def create_observation_for_datastream( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) if not datastream_id: raise Exception("Datastream ID not provided") - + validate_payload_keys(payload, ALLOWED_KEYS) async with pool.acquire() as connection: @@ -211,7 +236,7 @@ async def create_observation_for_datastream( @v1.api_route( - "/FeaturesOfInterest({feature_of_interest_id})/Observations", + "/FeaturesOfInterest({foi_id})/Observations", methods=["POST"], tags=["Observations"], summary="Create a new Observation for a FeatureOfInterest", @@ -220,16 +245,28 @@ async def create_observation_for_datastream( ) async def create_observation_for_feature_of_interest( request: Request, - feature_of_interest_id: int, - payload: dict = Body(example=PAYLOAD_EXAMPLE), + foi_id: int, + payload: dict = Body(examples={"default": {"value": PAYLOAD_EXAMPLE}}), commit_message=message, current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) - if not feature_of_interest_id: + if not foi_id: raise Exception("FeatureOfInterest ID not provided") validate_payload_keys(payload, ALLOWED_KEYS) @@ -248,7 +285,7 @@ async def create_observation_for_feature_of_interest( _, header = await insert_observation_entity( connection, payload, - feature_of_interest_id=feature_of_interest_id, + feature_of_interest_id=foi_id, commit_id=commit_id, ) diff --git a/api/app/v1/endpoints/create/observed_property.py b/api/app/v1/endpoints/create/observed_property.py index 2b888e4..2e52b7b 100644 --- a/api/app/v1/endpoints/create/observed_property.py +++ b/api/app/v1/endpoints/create/observed_property.py @@ -14,11 +14,8 @@ from app import AUTHORIZATION, POSTGRES_PORT_WRITE, VERSIONING from app.db.asyncpg_db import get_pool, get_pool_w -from app.utils.utils import ( - require_json_content_type, - validate_payload_keys, - validate_required_keys, -) +from app.rbac_roles import check_create_permission +from app.utils.utils import validate_payload_keys, validate_required_keys, require_json_content_type from app.v1.endpoints.functions import set_role from asyncpg.exceptions import InsufficientPrivilegeError from fastapi import APIRouter, Body, Depends, Header, Request, status @@ -54,6 +51,7 @@ ] REQUIRED_KEYS = ["name", "definition", "description"] +DENIED_CREATOR_ROLES = ["viewer"] @v1.api_route( @@ -71,6 +69,18 @@ async def create_observed_property( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) @@ -80,8 +90,10 @@ async def create_observed_property( async with pool.acquire() as connection: async with connection.transaction(): if current_user is not None: + if current_user["role"] in DENIED_CREATOR_ROLES: + raise InsufficientPrivilegeError await set_role(connection, current_user) - + commit_id = await set_commit( connection, commit_message, current_user ) diff --git a/api/app/v1/endpoints/create/sensor.py b/api/app/v1/endpoints/create/sensor.py index 672e7d8..bc6ed42 100644 --- a/api/app/v1/endpoints/create/sensor.py +++ b/api/app/v1/endpoints/create/sensor.py @@ -14,11 +14,8 @@ from app import AUTHORIZATION, POSTGRES_PORT_WRITE, VERSIONING from app.db.asyncpg_db import get_pool, get_pool_w -from app.utils.utils import ( - require_json_content_type, - validate_payload_keys, - validate_required_keys, -) +from app.rbac_roles import check_create_permission +from app.utils.utils import validate_payload_keys, validate_required_keys, require_json_content_type from app.v1.endpoints.functions import set_role from asyncpg.exceptions import InsufficientPrivilegeError from fastapi import APIRouter, Body, Depends, Header, Request, status @@ -56,6 +53,7 @@ ] REQUIRED_KEYS = ["name", "encodingType", "metadata"] +DENIED_CREATOR_ROLES = ["viewer"] @v1.api_route( @@ -73,6 +71,18 @@ async def create_sensor( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) @@ -82,6 +92,8 @@ async def create_sensor( async with pool.acquire() as connection: async with connection.transaction(): if current_user is not None: + if current_user["role"] in DENIED_CREATOR_ROLES: + raise InsufficientPrivilegeError await set_role(connection, current_user) commit_id = await set_commit( diff --git a/api/app/v1/endpoints/create/thing.py b/api/app/v1/endpoints/create/thing.py index ac6cb14..61280f0 100644 --- a/api/app/v1/endpoints/create/thing.py +++ b/api/app/v1/endpoints/create/thing.py @@ -14,11 +14,8 @@ from app import AUTHORIZATION, POSTGRES_PORT_WRITE, VERSIONING from app.db.asyncpg_db import get_pool, get_pool_w -from app.utils.utils import ( - require_json_content_type, - validate_payload_keys, - validate_required_keys, -) +from app.rbac_roles import check_create_permission +from app.utils.utils import validate_payload_keys, validate_required_keys, require_json_content_type from app.v1.endpoints.functions import set_role from asyncpg.exceptions import InsufficientPrivilegeError from fastapi import APIRouter, Body, Depends, Header, Request, status @@ -71,6 +68,18 @@ async def create_thing( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) @@ -134,6 +143,18 @@ async def create_thing_for_location( current_user=user, pool=Depends(get_pool_w) if POSTGRES_PORT_WRITE else Depends(get_pool), ): + if current_user is not None: + user_role = current_user.get("role", "") + if not check_create_permission(user_role): + return JSONResponse( + status_code=status.HTTP_403_FORBIDDEN, + content={ + "code": 403, + "type": "error", + "message": "Insufficient privileges.", + }, + ) + try: require_json_content_type(request) diff --git a/api/tests/test_create_sensor_rbac.py b/api/tests/test_create_sensor_rbac.py new file mode 100644 index 0000000..8bbcd28 --- /dev/null +++ b/api/tests/test_create_sensor_rbac.py @@ -0,0 +1,103 @@ +"""Regression tests for RBAC on POST /Sensors.""" + +import os +import sys +from contextlib import asynccontextmanager +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest + +# Ensure api/ is on sys.path so 'app' resolves to api/app +API_DIR = str(Path(__file__).resolve().parents[1]) +if API_DIR not in sys.path: + sys.path.insert(0, API_DIR) + +# Patch env vars before importing app modules +os.environ.setdefault("ISTSOS_ADMIN", "admin") +os.environ.setdefault("ISTSOS_ADMIN_PASSWORD", "secret") +os.environ.setdefault("POSTGRES_HOST", "localhost") +os.environ.setdefault("POSTGRES_PORT", "5432") +os.environ.setdefault("POSTGRES_DB", "istsos") +os.environ.setdefault("SECRET_KEY", "test_secret_key_1234567890") +os.environ.setdefault("ALGORITHM", "HS256") + +from app.v1.endpoints.create import sensor as sensor_endpoint # noqa: E402 + + +class _DummyRequest: + def __init__(self): + self.headers = {"content-type": "application/json"} + + +class _FakeConnection: + @asynccontextmanager + async def transaction(self): + yield + + +class _FakePool: + def __init__(self): + self.connection = _FakeConnection() + + @asynccontextmanager + async def acquire(self): + yield self.connection + + +@pytest.mark.asyncio(loop_scope="function") +async def test_create_sensor_denies_viewer_role(): + """Viewer must not be able to create sensors.""" + pool = _FakePool() + + with patch.object(sensor_endpoint, "set_role", new=AsyncMock()) as mock_set_role, patch.object( + sensor_endpoint, "set_commit", new=AsyncMock(return_value=1) + ), patch.object( + sensor_endpoint, + "insert_sensor_entity", + new=AsyncMock(return_value=(1, "/Sensors(1)")), + ): + response = await sensor_endpoint.create_sensor( + request=_DummyRequest(), + payload={ + "name": "sensor name 1", + "encodingType": "application/pdf", + "metadata": "Light flux sensor", + }, + commit_message="rbac test", + current_user={"id": 2, "username": "viewer_user", "role": "viewer", "uri": "u"}, + pool=pool, + ) + + assert response.status_code == 401 + assert b"Insufficient privileges" in response.body + mock_set_role.assert_not_called() + + +@pytest.mark.asyncio(loop_scope="function") +async def test_create_sensor_allows_editor_role(): + """Editor should still be able to create sensors.""" + pool = _FakePool() + + with patch.object(sensor_endpoint, "set_role", new=AsyncMock()) as mock_set_role, patch.object( + sensor_endpoint, "set_commit", new=AsyncMock(return_value=1) + ), patch.object( + sensor_endpoint, + "insert_sensor_entity", + new=AsyncMock(return_value=(1, "/Sensors(1)")), + ): + response = await sensor_endpoint.create_sensor( + request=_DummyRequest(), + payload={ + "name": "sensor name 1", + "encodingType": "application/pdf", + "metadata": "Light flux sensor", + }, + commit_message="rbac test", + current_user={"id": 3, "username": "editor_user", "role": "editor", "uri": "u"}, + pool=pool, + ) + + assert response.status_code == 201 + assert response.headers.get("location") == "/Sensors(1)" + mock_set_role.assert_called_once() diff --git a/api/tests/test_create_sibling_rbac.py b/api/tests/test_create_sibling_rbac.py new file mode 100644 index 0000000..6d6f094 --- /dev/null +++ b/api/tests/test_create_sibling_rbac.py @@ -0,0 +1,138 @@ +"""Regression tests for RBAC hardening on sibling create endpoints.""" + +import os +import sys +from contextlib import asynccontextmanager +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest + +# Ensure api/ is on sys.path so 'app' resolves to api/app +API_DIR = str(Path(__file__).resolve().parents[1]) +if API_DIR not in sys.path: + sys.path.insert(0, API_DIR) + +# Patch env vars before importing app modules +os.environ.setdefault("ISTSOS_ADMIN", "admin") +os.environ.setdefault("ISTSOS_ADMIN_PASSWORD", "secret") +os.environ.setdefault("POSTGRES_HOST", "localhost") +os.environ.setdefault("POSTGRES_PORT", "5432") +os.environ.setdefault("POSTGRES_DB", "istsos") +os.environ.setdefault("SECRET_KEY", "test_secret_key_1234567890") +os.environ.setdefault("ALGORITHM", "HS256") + +from app.v1.endpoints.create import datastream as datastream_endpoint # noqa: E402 +from app.v1.endpoints.create import observed_property as observed_property_endpoint # noqa: E402 +from app.v1.endpoints.create import feature_of_interest as foi_endpoint # noqa: E402 + + +class _DummyRequest: + def __init__(self): + self.headers = {"content-type": "application/json"} + + +class _FakeConnection: + @asynccontextmanager + async def transaction(self): + yield + + +class _FakePool: + def __init__(self): + self.connection = _FakeConnection() + + @asynccontextmanager + async def acquire(self): + yield self.connection + + +@pytest.mark.asyncio(loop_scope="function") +async def test_create_datastream_denies_viewer_role(): + pool = _FakePool() + + with patch.object(datastream_endpoint, "set_role", new=AsyncMock()) as mock_set_role, patch.object( + datastream_endpoint, "set_commit", new=AsyncMock(return_value=1) + ), patch.object( + datastream_endpoint, + "insert_datastream_entity", + new=AsyncMock(return_value=(1, "/Datastreams(1)")), + ): + response = await datastream_endpoint.create_datastream( + request=_DummyRequest(), + payload={ + "name": "datastream 1", + "description": "d", + "unitOfMeasurement": {"name": "Lumen", "symbol": "lm", "definition": "u"}, + "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement", + "Thing": {"@iot.id": 1}, + "Sensor": {"@iot.id": 1}, + "ObservedProperty": {"@iot.id": 1}, + }, + commit_message="rbac test", + current_user={"id": 2, "username": "viewer_user", "role": "viewer", "uri": "u"}, + pool=pool, + ) + + assert response.status_code == 401 + assert b"Insufficient privileges" in response.body + mock_set_role.assert_not_called() + + +@pytest.mark.asyncio(loop_scope="function") +async def test_create_observed_property_allows_editor_role(): + pool = _FakePool() + + with patch.object( + observed_property_endpoint, "set_role", new=AsyncMock() + ) as mock_set_role, patch.object( + observed_property_endpoint, "set_commit", new=AsyncMock(return_value=1) + ), patch.object( + observed_property_endpoint, + "insert_observed_property_entity", + new=AsyncMock(return_value=(1, "/ObservedProperties(1)")), + ): + response = await observed_property_endpoint.create_observed_property( + request=_DummyRequest(), + payload={ + "name": "Luminous Flux", + "definition": "http://www.qudt.org/qudt/owl/1.0.0/quantity/Instances.html/LuminousFlux", + "description": "observedProperty 1", + }, + commit_message="rbac test", + current_user={"id": 3, "username": "editor_user", "role": "editor", "uri": "u"}, + pool=pool, + ) + + assert response.status_code == 201 + assert response.headers.get("location") == "/ObservedProperties(1)" + mock_set_role.assert_called_once() + + +@pytest.mark.asyncio(loop_scope="function") +async def test_create_feature_of_interest_denies_viewer_role(): + pool = _FakePool() + + with patch.object(foi_endpoint, "set_role", new=AsyncMock()) as mock_set_role, patch.object( + foi_endpoint, "set_commit", new=AsyncMock(return_value=1) + ), patch.object( + foi_endpoint, + "insert_feature_of_interest_entity", + new=AsyncMock(return_value=(1, "/FeaturesOfInterest(1)")), + ): + response = await foi_endpoint.create_feature_of_interest( + request=_DummyRequest(), + payload={ + "name": "A weather station.", + "description": "A weather station.", + "encodingType": "application/vnd.geo+json", + "feature": {"type": "Point", "coordinates": [-114.05, 51.05]}, + }, + commit_message="rbac test", + current_user={"id": 2, "username": "viewer_user", "role": "viewer", "uri": "u"}, + pool=pool, + ) + + assert response.status_code == 401 + assert b"Insufficient privileges" in response.body + mock_set_role.assert_not_called() diff --git a/api/tests/test_rbac_roles.py b/api/tests/test_rbac_roles.py index 7645fe8..992426d 100644 --- a/api/tests/test_rbac_roles.py +++ b/api/tests/test_rbac_roles.py @@ -1,6 +1,11 @@ import unittest -from app.rbac_roles import get_db_role_for_rbac, validate_rbac_role +from app.rbac_roles import ( + DENIED_CREATOR_ROLES, + check_create_permission, + get_db_role_for_rbac, + validate_rbac_role, +) class RbacRolesTestCase(unittest.TestCase): @@ -21,5 +26,30 @@ def test_get_db_role_mapping(self): self.assertEqual(get_db_role_for_rbac("custom"), "user") +class CheckCreatePermissionTestCase(unittest.TestCase): + def test_denied_creator_roles_includes_viewer(self): + self.assertIn("viewer", DENIED_CREATOR_ROLES) + + def test_viewer_denied_create(self): + self.assertFalse(check_create_permission("viewer")) + self.assertFalse(check_create_permission("VIEWER")) + + def test_editor_allowed_create(self): + self.assertTrue(check_create_permission("editor")) + self.assertTrue(check_create_permission("EDITOR")) + + def test_obs_manager_allowed_create(self): + self.assertTrue(check_create_permission("obs_manager")) + + def test_sensor_allowed_create(self): + self.assertTrue(check_create_permission("sensor")) + + def test_custom_allowed_create(self): + self.assertTrue(check_create_permission("custom")) + + def test_none_role_allowed(self): + self.assertTrue(check_create_permission(None)) + + if __name__ == "__main__": unittest.main()