diff --git a/app/api/v2/managers/ability_api_manager.py b/app/api/v2/managers/ability_api_manager.py index 8dc6be270..80437ea90 100644 --- a/app/api/v2/managers/ability_api_manager.py +++ b/app/api/v2/managers/ability_api_manager.py @@ -8,16 +8,21 @@ from app.api.v2.managers.base_api_manager import BaseApiManager from app.api.v2.responses import JsonHttpBadRequest from app.objects.c_ability import AbilitySchema +from app.service.file_svc import FileSvc from app.utility.base_world import BaseWorld class AbilityApiManager(BaseApiManager): + _EXECUTOR_LABEL_PATTERN = re.compile(r'^[a-zA-Z0-9_.-]+$') + def __init__(self, data_svc, file_svc): super().__init__(data_svc=data_svc, file_svc=file_svc) async def create_on_disk_object(self, data: dict, access: dict, ram_key: str, id_property: str, obj_class: type): self._validate_ability_data(create=True, data=data) - obj_id = data.get('id') + obj_id = data['id'] + if self.find_object(ram_key, {id_property: obj_id}): + raise JsonHttpBadRequest(f'Ability with given id already exists: {obj_id}') file_path = self._create_ability_filepath(data.get('tactic'), obj_id) allowed = self._get_allowed_from_access(access) await self._save_and_reload_object(file_path, data, obj_class, allowed) @@ -51,39 +56,136 @@ async def update_on_disk_object(self, obj: Any, data: dict, ram_key: str, id_pro return next(self.find_objects(ram_key, {id_property: obj_id})) def _validate_ability_data(self, create: bool, data: dict): - # Correct ability_id key for ability file saving. - data['id'] = data.pop('ability_id', '') + # Normalize ability ID: prefer explicit 'ability_id' if provided, otherwise preserve any existing 'id'. + ability_id = None + if 'ability_id' in data: + ability_id = data.pop('ability_id') + elif 'id' in data: + ability_id = data.get('id') + + # Sanitize supplied IDs before assigning them internally. If no ID is supplied during creation, + # generate one instead. + if ability_id in (None, '') and create: + data['id'] = str(uuid.uuid4()) + else: + data['id'] = BaseApiManager._sanitize_id(ability_id) # If a new ability is being created, ensure required fields present. if create: - # Set ability ID if undefined - if not data['id']: - data['id'] = str(uuid.uuid4()) if not data.get('name'): raise JsonHttpBadRequest(f'Cannot create ability {data["id"]} due to missing name') - if 'tactic' not in data: + if not data.get('tactic'): raise JsonHttpBadRequest(f'Cannot create ability {data["id"]} due to missing tactic') - if not data.get('executors'): + if not (data.get('executors') or data.get('platforms')): raise JsonHttpBadRequest(f'Cannot create ability {data["id"]}: at least one executor required') - # Validate ID, used for file creation - validator = re.compile(r'^[a-zA-Z0-9-_]+$') - if 'id' in data and not validator.match(data['id']): - raise JsonHttpBadRequest(f'Invalid ability ID {data["id"]}. IDs can only contain ' - 'alphanumeric characters, hyphens, and underscores.') # Validate tactic, used for directory creation, lower case if present + validator = re.compile(r'^[a-zA-Z0-9-_]+$') if 'tactic' in data: - if not validator.match(data['tactic']): + if not isinstance(data['tactic'], str) or not validator.match(data['tactic']): raise JsonHttpBadRequest(f'Invalid ability tactic {data["tactic"]}. Tactics can only contain ' 'alphanumeric characters, hyphens, and underscores.') data['tactic'] = data['tactic'].lower() - if 'executors' in data and not data.get('executors'): + if 'executors' in data and not data.get('executors') and 'platforms' not in data: raise JsonHttpBadRequest(f'Cannot create ability {data["id"]}: at least one executor required') if 'name' in data and not data.get('name'): raise JsonHttpBadRequest(f'Cannot create ability {data["id"]} due to missing name') + self._validate_ability_privilege(data) + self._validate_ability_executors(data) + + def _validate_ability_privilege(self, data: dict): + if 'privilege' not in data: + return + + privilege = data.get('privilege') + if privilege is None or privilege == '': + return + if not isinstance(privilege, str): + raise JsonHttpBadRequest(f'Invalid ability privilege {privilege}. Privilege must be one of: ' + 'User, Elevated.') + + allowed_privileges = {privilege.name for privilege in BaseWorld.Privileges} + if privilege not in allowed_privileges: + raise JsonHttpBadRequest(f'Invalid ability privilege {privilege}. Privilege must be one of: ' + 'User, Elevated.') + + def _validate_ability_executors(self, data: dict): + if data.get('executors') is not None: + self._validate_executor_list(data['executors']) + if data.get('platforms') is not None: + self._validate_platform_executor_map(data['platforms']) + + def _validate_executor_list(self, executors): + if not isinstance(executors, list): + raise JsonHttpBadRequest('Invalid ability executors. Executors must be a list.') + + for index, executor in enumerate(executors): + if not isinstance(executor, dict): + raise JsonHttpBadRequest(f'Invalid ability executor at index {index}. Executor must be a dictionary.') + self._validate_executor_label(executor.get('name'), f'executor[{index}].name') + self._validate_executor_label(executor.get('platform'), f'executor[{index}].platform') + self._validate_payloads(executor.get('payloads'), f'executor[{index}].payloads') + + def _validate_platform_executor_map(self, platforms): + if not isinstance(platforms, dict): + raise JsonHttpBadRequest('Invalid ability platforms. Platforms must be a dictionary.') + + for platform_names, platform_executors in platforms.items(): + for platform_name in self._split_and_validate_labels(platform_names, 'platform'): + if not isinstance(platform_executors, dict): + raise JsonHttpBadRequest(f'Invalid ability platform {platform_name}. Platform executors must be ' + 'a dictionary.') + for executor_names, executor in platform_executors.items(): + for executor_name in self._split_and_validate_labels(executor_names, 'executor'): + if not isinstance(executor, dict): + raise JsonHttpBadRequest(f'Invalid ability executor {executor_name} for platform ' + f'{platform_name}. Executor must be a dictionary.') + self._validate_payloads(executor.get('payloads'), f'platforms.{platform_name}.payloads') + + @classmethod + def _split_and_validate_labels(cls, value, field_name): + if not isinstance(value, str): + raise JsonHttpBadRequest(f'Invalid ability {field_name} {value}. {field_name.capitalize()} names must be ' + 'strings.') + + labels = [label.strip() for label in value.split(',')] + if not labels or any(not label for label in labels): + raise JsonHttpBadRequest(f'Invalid ability {field_name} {value}. {field_name.capitalize()} names cannot ' + 'be empty.') + + for label in labels: + cls._validate_executor_label(label, field_name) + return labels + + @classmethod + def _validate_executor_label(cls, value, field_name): + if not isinstance(value, str) or not value: + raise JsonHttpBadRequest(f'Invalid ability {field_name}. Executor and platform names must be non-empty ' + 'strings.') + if not cls._EXECUTOR_LABEL_PATTERN.match(value): + raise JsonHttpBadRequest(f'Invalid ability {field_name} {value}. Executor and platform names can only ' + 'contain alphanumeric characters, periods, hyphens, and underscores.') + + @staticmethod + def _validate_payloads(payloads, field_name): + if payloads is None: + return + if not isinstance(payloads, list): + raise JsonHttpBadRequest(f'Invalid ability {field_name}. Payloads must be a list.') + + for payload in payloads: + if not isinstance(payload, str): + raise JsonHttpBadRequest(f'Invalid ability payload {payload}. Payload names must be strings.') + safe_filename = FileSvc._validate_filename(payload) + if BaseWorld.is_uuid4(payload) and safe_filename: + continue + if not safe_filename: + raise JsonHttpBadRequest(f'Invalid ability payload {payload}. Payload names cannot contain path ' + 'separators, traversal sequences, null bytes, or unsafe characters.') + def _create_ability_filepath(self, tactic: str, obj_id: str): tactic_dir = os.path.join('data', 'abilities', tactic) if not os.path.exists(tactic_dir): diff --git a/app/objects/c_ability.py b/app/objects/c_ability.py index 188c133c0..00e325b54 100644 --- a/app/objects/c_ability.py +++ b/app/objects/c_ability.py @@ -34,17 +34,80 @@ class Meta: delete_payload = ma.fields.Bool(load_default=None) @ma.pre_load - def fix_id(self, data, **_): + def normalize_ability_file_fields(self, data, **_): + """ + Ensures that ability file fields are formatted correctly for processing + """ + if not isinstance(data, dict): + return data if 'id' in data: data['ability_id'] = data.pop('id') + if isinstance(data.get('technique'), dict): + technique = data.pop('technique') + data.setdefault('technique_id', technique.get('attack_id')) + data.setdefault('technique_name', technique.get('name')) + if 'platforms' in data and 'executors' not in data: + data['executors'] = self._platforms_to_executor_list(data.pop('platforms')) + if self._has_legacy_requirements(data.get('requirements')): + data['requirements'] = self._legacy_requirements_to_list(data['requirements']) return data @ma.post_load def build_ability(self, data, **kwargs): - if 'technique' in data: - data['technique_name'] = data.pop('technique') return None if kwargs.get('partial') is True else Ability(**data) + @staticmethod + def _platforms_to_executor_list(platforms): + """ + Translates legacy platform-structured YAML into caldera executor format + """ + executors = [] + if not isinstance(platforms, dict): + raise ma.ValidationError('Platforms must be a dictionary.', 'platforms') + for platform_names, platform_executors in platforms.items(): + if not isinstance(platform_executors, dict): + raise ma.ValidationError('Platform executors must be a dictionary.', 'platforms') + + platform_list = [name.strip() for name in str(platform_names).split(',')] + + for executor_names, executor_data in platform_executors.items(): + if not isinstance(executor_data, dict): + raise ma.ValidationError('Executor data must be a dictionary.', 'platforms') + executor = dict(executor_data) # make a dict of the data and fix up below + if isinstance(executor.get('cleanup'), str): + # cleanup actions should be in a list + executor['cleanup'] = [executor['cleanup']] + if isinstance(executor.get('parsers'), dict): + executor['parsers'] = [ + {'module': module, 'parserconfigs': parserconfigs} + for module, parserconfigs in executor['parsers'].items() + ] + + executor_list = [name.strip() for name in str(executor_names).split(',')] + executors.extend( + {**executor, 'platform': platform_name, 'name': executor_name} + for platform_name in platform_list + for executor_name in executor_list + ) + return executors + + @staticmethod + def _has_legacy_requirements(requirements): + return ( + isinstance(requirements, list) + and requirements + and isinstance(requirements[0], dict) + and 'relationship_match' not in requirements[0] + ) + + @staticmethod + def _legacy_requirements_to_list(requirements): + converted = [] + for requirement in requirements: + for module, relationship_match in requirement.items(): + converted.append({'module': module, 'relationship_match': relationship_match}) + return converted + class Ability(FirstClassObjectInterface, BaseObject): diff --git a/app/objects/secondclass/c_executor.py b/app/objects/secondclass/c_executor.py index 4a5970ea7..8c425a2cb 100644 --- a/app/objects/secondclass/c_executor.py +++ b/app/objects/secondclass/c_executor.py @@ -21,6 +21,18 @@ class ExecutorSchema(ma.Schema): variations = ma.fields.List(ma.fields.Nested(VariationSchema())) additional_info = ma.fields.Dict(keys=ma.fields.String(), values=ma.fields.String()) + @ma.validates_schema + def validate_required_executor_fields(self, data, **kwargs): + if kwargs.get('partial') is True: + return + + errors = {} + for field_name in ('name', 'platform'): + if not data.get(field_name): + errors[field_name] = ['Missing data for required field.'] + if errors: + raise ma.ValidationError(errors) + @ma.post_load def build_executor(self, data, **_): return Executor(**data) diff --git a/tests/api/v2/handlers/test_ability_upload.py b/tests/api/v2/handlers/test_ability_upload.py new file mode 100644 index 000000000..c1bed7dcf --- /dev/null +++ b/tests/api/v2/handlers/test_ability_upload.py @@ -0,0 +1,433 @@ +import copy +import os +import pytest + +from http import HTTPStatus + +from app.utility.base_service import BaseService + + +def ability_file_cleanup(tactic, ability_id): + file_path = f'data/abilities/{tactic}/{ability_id}.yml' + if os.path.exists(file_path): + try: + os.remove(file_path) + except OSError: + pass + + +def basic_platform_ability(identifier_key, identifier, name, description, tactic): + return { + identifier_key: identifier, + 'name': name, + 'description': description, + 'tactic': tactic, + 'technique': { + 'attack_id': 'T1083', + 'name': 'File and Directory Discovery' + }, + 'platforms': { + 'darwin': { + 'sh': { + 'command': 'ls #{host.system.path}' + } + }, + 'linux': { + 'sh': { + 'command': 'ls #{host.system.path}' + } + }, + 'windows': { + 'psh': { + 'command': 'dir #{host.system.path}' + } + } + }, + 'requirements': [ + { + 'plugins.stockpile.app.requirements.paw_provenance': [ + { + 'source': 'host.system.path' + } + ] + } + ] + } + + +def cleanup_payload_ability(payload): + ability_id = payload.get('id') or payload.get('ability_id') + tactic = payload.get('tactic') + if ability_id and tactic: + ability_file_cleanup(tactic, ability_id) + + +@pytest.fixture +def valid_ability_payload(): + yield basic_platform_ability( + 'id', + 'upload-test-001', + 'Uploaded Test Ability', + 'An ability uploaded via YAML file', + 'discovery' + ) + + ability_file_cleanup('discovery', 'upload-test-001') + + +@pytest.fixture +def valid_ability_payload_with_ability_id(): + yield basic_platform_ability( + 'ability_id', + 'upload-test-002', + 'Uploaded Test Ability 2', + 'An ability using ability_id key', + 'collection' + ) + + ability_file_cleanup('collection', 'upload-test-002') + + +@pytest.fixture +def new_executors_ability_payload(): + ability_id = 'upload-test-new-executors' + tactic = 'discovery' + ability_file_cleanup(tactic, ability_id) + yield { + 'id': ability_id, + 'repeatable': False, + 'name': 'New executors ability', + 'additional_info': { + 'cleanup': '' + }, + 'technique_name': 'File and Directory Discovery', + 'executors': [ + { + 'name': 'sh', + 'additional_info': {}, + 'variations': [], + 'platform': 'linux', + 'command': 'ls', + 'code': None, + 'language': None, + 'payloads': [], + 'timeout': 60, + 'parsers': [], + 'cleanup': [], + 'uploads': [], + 'build_target': None + }, + { + 'name': 'psh', + 'additional_info': {}, + 'variations': [], + 'platform': 'windows', + 'command': 'dir', + 'code': None, + 'language': None, + 'payloads': [], + 'timeout': 60, + 'parsers': [], + 'cleanup': [], + 'uploads': [], + 'build_target': None + } + ], + 'buckets': [], + 'technique_id': 'T1083', + 'delete_payload': True, + 'tactic': tactic, + 'description': 'Simple new-style ability payload.', + 'singleton': False, + 'plugin': '', + 'requirements': [], + 'privilege': '', + 'access': {} + } + ability_file_cleanup(tactic, ability_id) + + +class TestAbilityUploadApi: + + async def test_create_ability_from_old_platforms_yaml_style_payload(self, api_v2_client, api_cookies, + valid_ability_payload): + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=valid_ability_payload) + + assert resp.status == HTTPStatus.OK + result = await resp.json() + assert result['ability_id'] == 'upload-test-001' + assert result['name'] == 'Uploaded Test Ability' + assert result['tactic'] == 'discovery' + assert result['technique_id'] == 'T1083' + assert result['technique_name'] == 'File and Directory Discovery' + assert [ + (executor['platform'], executor['name'], executor['command']) + for executor in result['executors'] + ] == [ + ('darwin', 'sh', 'ls #{host.system.path}'), + ('linux', 'sh', 'ls #{host.system.path}'), + ('windows', 'psh', 'dir #{host.system.path}') + ] + assert os.path.exists('data/abilities/discovery/upload-test-001.yml') + + ability = (await BaseService.get_service('data_svc').locate( + 'abilities', {'ability_id': 'upload-test-001'} + ))[0] + assert ability.display == result + assert ability.requirements[0].module == 'plugins.stockpile.app.requirements.paw_provenance' + + async def test_create_ability_from_yaml_style_payload_with_ability_id( + self, api_v2_client, api_cookies, valid_ability_payload_with_ability_id + ): + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, + json=valid_ability_payload_with_ability_id) + + assert resp.status == HTTPStatus.OK + result = await resp.json() + assert result['ability_id'] == 'upload-test-002' + assert result['name'] == 'Uploaded Test Ability 2' + assert result['tactic'] == 'collection' + assert os.path.exists('data/abilities/collection/upload-test-002.yml') + + async def test_create_ability_from_new_executors_yaml_style(self, api_v2_client, api_cookies, + new_executors_ability_payload): + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, + json=new_executors_ability_payload) + + assert resp.status == HTTPStatus.OK + ability_data = await resp.json() + assert ability_data['ability_id'] == new_executors_ability_payload['id'] + assert ability_data['technique_id'] == new_executors_ability_payload['technique_id'] + assert ability_data['technique_name'] == new_executors_ability_payload['technique_name'] + assert [ + (executor['platform'], executor['name'], executor['command']) + for executor in ability_data['executors'] + ] == [('linux', 'sh', 'ls'), ('windows', 'psh', 'dir')] + + stored_ability = (await BaseService.get_service('data_svc').locate( + 'abilities', {'ability_id': new_executors_ability_payload['id']} + ))[0] + assert stored_ability.display == ability_data + + async def test_create_ability_sanitizes_upload_id_before_save( + self, api_v2_client, api_cookies, new_executors_ability_payload + ): + payload = copy.deepcopy(new_executors_ability_payload) + payload['id'] = '../upload test/sanitized-id!' + expected_id = 'uploadtestsanitized-id' + + try: + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + + assert resp.status == HTTPStatus.OK + ability_data = await resp.json() + assert ability_data['ability_id'] == expected_id + assert os.path.exists(f'data/abilities/{payload["tactic"]}/{expected_id}.yml') + finally: + ability_file_cleanup(payload['tactic'], expected_id) + + async def test_create_ability_without_id_generates_id( + self, api_v2_client, api_cookies, new_executors_ability_payload + ): + payload = copy.deepcopy(new_executors_ability_payload) + payload.pop('id') + ability_id = None + + try: + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + + assert resp.status == HTTPStatus.OK + ability_data = await resp.json() + ability_id = ability_data['ability_id'] + assert ability_id + assert os.path.exists(f'data/abilities/{payload["tactic"]}/{ability_id}.yml') + finally: + if ability_id: + ability_file_cleanup(payload['tactic'], ability_id) + + async def test_create_ability_accepts_safe_payload_and_plugin_style_executor_names( + self, api_v2_client, api_cookies, new_executors_ability_payload + ): + payload = copy.deepcopy(new_executors_ability_payload) + payload['id'] = 'upload-test-plugin-executor-labels' + payload['executors'][0]['name'] = 'plugin.exec-1' + payload['executors'][0]['payloads'] = [ + 'safe-payload.ps1', + '766be199-7316-4b26-b3db-e272aaf7e0d4' + ] + + try: + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + + assert resp.status == HTTPStatus.OK + ability_data = await resp.json() + assert ability_data['executors'][0]['name'] == 'plugin.exec-1' + assert ability_data['executors'][0]['payloads'] == payload['executors'][0]['payloads'] + finally: + cleanup_payload_ability(payload) + + @pytest.mark.parametrize( + ('payload_name', 'suffix'), + [ + ('../evil.ps1', 'parent'), + ('payloads/evil.ps1', 'nested'), + ('/tmp/evil.ps1', 'absolute'), + ('evil\x00.ps1', 'null-byte'), + ('evil;rm.ps1', 'unsafe'), + ] + ) + async def test_create_ability_rejects_unsafe_payload_paths( + self, api_v2_client, api_cookies, new_executors_ability_payload, payload_name, suffix + ): + payload = copy.deepcopy(new_executors_ability_payload) + payload['id'] = f'upload-test-invalid-payload-{suffix}' + payload['executors'][0]['payloads'] = [payload_name] + + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + + assert resp.status == HTTPStatus.BAD_REQUEST + assert not os.path.exists(f'data/abilities/{payload["tactic"]}/{payload["id"]}.yml') + + @pytest.mark.parametrize( + ('executors', 'suffix'), + [ + (['sh'], 'non-dict'), + ([{'platform': 'linux', 'command': 'ls'}], 'missing-name'), + ([{'name': 'sh', 'command': 'ls'}], 'missing-platform'), + ([{'name': 'sh', 'platform': 'linux', 'command': 'ls', 'payloads': 'payload.ps1'}], 'bad-payloads'), + ] + ) + async def test_create_ability_rejects_schema_invalid_new_style_executors( + self, api_v2_client, api_cookies, new_executors_ability_payload, executors, suffix + ): + payload = copy.deepcopy(new_executors_ability_payload) + payload['id'] = f'upload-test-schema-invalid-executor-{suffix}' + payload['executors'] = executors + + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + + assert resp.status == HTTPStatus.UNPROCESSABLE_ENTITY + assert not os.path.exists(f'data/abilities/{payload["tactic"]}/{payload["id"]}.yml') + + @pytest.mark.parametrize( + ('executors', 'suffix'), + [ + ([{'name': 'sh/evil', 'platform': 'linux', 'command': 'ls'}], 'unsafe-name'), + ([{'name': 'sh', 'platform': 'lin ux', 'command': 'ls'}], 'unsafe-platform'), + ] + ) + async def test_create_ability_rejects_policy_invalid_new_style_executors( + self, api_v2_client, api_cookies, new_executors_ability_payload, executors, suffix + ): + payload = copy.deepcopy(new_executors_ability_payload) + payload['id'] = f'upload-test-policy-invalid-executor-{suffix}' + payload['executors'] = executors + + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + + assert resp.status == HTTPStatus.BAD_REQUEST + assert not os.path.exists(f'data/abilities/{payload["tactic"]}/{payload["id"]}.yml') + + @pytest.mark.parametrize( + ('platforms', 'suffix'), + [ + ({'linux': ['sh']}, 'platform-not-dict'), + ({'linux': {'sh': 'ls'}}, 'executor-not-dict'), + ] + ) + async def test_create_ability_rejects_schema_invalid_legacy_platform_executors( + self, api_v2_client, api_cookies, valid_ability_payload, platforms, suffix + ): + payload = copy.deepcopy(valid_ability_payload) + payload['id'] = f'upload-test-schema-invalid-platform-{suffix}' + payload['platforms'] = platforms + + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + + assert resp.status == HTTPStatus.UNPROCESSABLE_ENTITY + assert not os.path.exists(f'data/abilities/{payload["tactic"]}/{payload["id"]}.yml') + + @pytest.mark.parametrize( + ('platforms', 'suffix'), + [ + ({'lin/ux': {'sh': {'command': 'ls'}}}, 'unsafe-platform'), + ({'linux': {'sh/evil': {'command': 'ls'}}}, 'unsafe-executor'), + ] + ) + async def test_create_ability_rejects_policy_invalid_legacy_platform_executors( + self, api_v2_client, api_cookies, valid_ability_payload, platforms, suffix + ): + payload = copy.deepcopy(valid_ability_payload) + payload['id'] = f'upload-test-policy-invalid-platform-{suffix}' + payload['platforms'] = platforms + + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + + assert resp.status == HTTPStatus.BAD_REQUEST + assert not os.path.exists(f'data/abilities/{payload["tactic"]}/{payload["id"]}.yml') + + @pytest.mark.parametrize( + ('privilege', 'suffix'), + [ + (None, 'none'), + ('', 'empty'), + ('User', 'user'), + ('Elevated', 'elevated'), + ] + ) + async def test_create_ability_accepts_valid_privileges( + self, api_v2_client, api_cookies, new_executors_ability_payload, privilege, suffix + ): + payload = copy.deepcopy(new_executors_ability_payload) + payload['id'] = f'upload-test-valid-privilege-{suffix}' + payload['privilege'] = privilege + + try: + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + + assert resp.status == HTTPStatus.OK + finally: + cleanup_payload_ability(payload) + + @pytest.mark.parametrize( + ('privilege', 'suffix'), + [ + ('Admin', 'admin'), + ('root', 'root'), + ('elevated', 'lowercase-elevated'), + ] + ) + async def test_create_ability_rejects_policy_invalid_privileges( + self, api_v2_client, api_cookies, new_executors_ability_payload, privilege, suffix + ): + payload = copy.deepcopy(new_executors_ability_payload) + payload['id'] = f'upload-test-invalid-privilege-{suffix}' + payload['privilege'] = privilege + + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + + assert resp.status == HTTPStatus.BAD_REQUEST + assert not os.path.exists(f'data/abilities/{payload["tactic"]}/{payload["id"]}.yml') + + async def test_create_ability_rejects_schema_invalid_privilege_type( + self, api_v2_client, api_cookies, new_executors_ability_payload + ): + payload = copy.deepcopy(new_executors_ability_payload) + payload['id'] = 'upload-test-schema-invalid-privilege-non-string' + payload['privilege'] = 7 + + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, json=payload) + + assert resp.status == HTTPStatus.UNPROCESSABLE_ENTITY + assert not os.path.exists(f'data/abilities/{payload["tactic"]}/{payload["id"]}.yml') + + async def test_create_ability_from_yaml_style_payload_missing_required_fields(self, api_v2_client, api_cookies): + resp = await api_v2_client.post('/api/v2/abilities', cookies=api_cookies, + json={'description': 'no name, tactic, or executor configuration'}) + + assert resp.status == HTTPStatus.BAD_REQUEST + + async def test_unauthorized_create_ability_from_yaml_style_payload(self, api_v2_client, valid_ability_payload): + resp = await api_v2_client.post('/api/v2/abilities', json=valid_ability_payload) + + assert resp.status == HTTPStatus.UNAUTHORIZED