From fc38efa30d00dce982012a79224d483c2fb18a51 Mon Sep 17 00:00:00 2001 From: filiq Date: Thu, 10 Aug 2023 17:08:00 +0200 Subject: [PATCH 1/3] storage: add import autodetection --- sner/server/parser.py | 27 +++++++++++++++++++++ sner/server/storage/commands.py | 16 +++++++++--- tests/server/storage/test_commands.py | 9 +++++++ tests/server/storage/test_import_parsers.py | 15 ++++++++++++ 4 files changed, 64 insertions(+), 3 deletions(-) diff --git a/sner/server/parser.py b/sner/server/parser.py index 05e6bd60..2515da12 100644 --- a/sner/server/parser.py +++ b/sner/server/parser.py @@ -7,6 +7,7 @@ implement ParserBase interface. """ +import json from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime @@ -16,6 +17,7 @@ from littletable import Table as LittleTable import sner.plugin +from sner.lib import ZipFile, file_from_zip, is_zip REGISTERED_PARSERS = {} @@ -30,6 +32,31 @@ def load_parser_plugins(): REGISTERED_PARSERS[plugin_name] = getattr(module, 'ParserModule') +def auto_detect_parser(path): + """tries automatically detect parser""" + parser = None + + if is_zip(path): + with ZipFile(path) as fzip: + for fname in filter(lambda x: x == 'assignment.json', fzip.namelist()): + try: + parser = json.loads(file_from_zip(path, fname).decode('utf-8'))['config']['module'] + except KeyError: + pass + else: + output = Path(path).read_text(encoding='utf-8') + + # tries to detect the parser based on the output + if '' in output: + parser = 'nmap' + elif '' in output: + parser = 'nessus' + elif 'testssl.sh' in output: + parser = 'testssl' + + return parser + + class ParsedItemBase: # pylint: disable=too-few-public-methods """parsed items base object; shared functions""" diff --git a/sner/server/storage/commands.py b/sner/server/storage/commands.py index 4ca1a809..d04c7340 100644 --- a/sner/server/storage/commands.py +++ b/sner/server/storage/commands.py @@ -13,7 +13,7 @@ from sner.lib import format_host_address from sner.server.extensions import db -from sner.server.parser import REGISTERED_PARSERS +from sner.server.parser import REGISTERED_PARSERS, auto_detect_parser from sner.server.storage.core import StorageManager, vuln_export, vuln_report from sner.server.storage.models import Host, Service from sner.server.storage.vulnsearch import sync_vulnsearch @@ -35,16 +35,26 @@ def command(): def storage_import(path, parser, **kwargs): """import data""" - if parser not in REGISTERED_PARSERS: + is_auto_parser = parser == 'auto' + + if parser not in REGISTERED_PARSERS and not is_auto_parser: current_app.logger.error('no such parser') sys.exit(1) - parser_impl = REGISTERED_PARSERS[parser] for item in path: if not Path(item).is_file(): current_app.logger.warning(f'invalid path "{item}"') continue + if is_auto_parser: + parser = auto_detect_parser(item) + + if parser is None: + current_app.logger.error(f'parser was not automatically detected for the file: {item}') + continue + + parser_impl = REGISTERED_PARSERS[parser] + try: if kwargs.get('dry'): StorageManager.import_parsed_dry(parser_impl.parse_path(item)) diff --git a/tests/server/storage/test_commands.py b/tests/server/storage/test_commands.py index da6b4a3a..25845dcb 100644 --- a/tests/server/storage/test_commands.py +++ b/tests/server/storage/test_commands.py @@ -40,6 +40,15 @@ def test_import_command_dryrun(runner): assert 'new service:' in result.output assert 'new vuln:' in result.output + result = runner.invoke(command, ['import', '--dry', 'auto', 'tests/server/data/parser-jarm-job.zip']) + assert result.exit_code == 0 + assert 'new host:' in result.output + assert 'new service:' in result.output + assert 'new note:' in result.output + + result = runner.invoke(command, ['import', '--dry', 'auto', 'tests/server/data/parser-dummy-job.zip']) + assert result.exit_code == 0 + def test_flush_command(runner, service, vuln, note): # pylint: disable=unused-argument """flush storage database""" diff --git a/tests/server/storage/test_import_parsers.py b/tests/server/storage/test_import_parsers.py index f63d7877..843fb742 100644 --- a/tests/server/storage/test_import_parsers.py +++ b/tests/server/storage/test_import_parsers.py @@ -76,3 +76,18 @@ def test_import_command_nc(runner): host = Host.query.one() assert len(host.services) == 2 assert sorted([x.port for x in host.services]) == [21, 22] + +def test_import_command_auto(runner): + """test automatic detection of parser""" + + result = runner.invoke(command, ['import', 'auto', 'tests/server/data/parser-nmap-output.xml']) + assert result.exit_code == 0 + assert Host.query.count() == 1 + + result = runner.invoke(command, ['import', 'auto', 'tests/server/data/parser-nessus-simple.xml']) + assert result.exit_code == 0 + assert Host.query.count() == 2 + + result = runner.invoke(command, ['import', 'auto', 'tests/server/data/parser-six_dns_discover-job.zip']) + assert result.exit_code == 0 + assert Host.query.count() == 3 From edf299a307edce7a35770d899fff50ccc29a05d1 Mon Sep 17 00:00:00 2001 From: filiq Date: Thu, 10 Aug 2023 17:36:21 +0200 Subject: [PATCH 2/3] storage: lint and cover fix --- sner/server/parser.py | 2 -- tests/server/storage/test_import_parsers.py | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sner/server/parser.py b/sner/server/parser.py index 2515da12..49101058 100644 --- a/sner/server/parser.py +++ b/sner/server/parser.py @@ -51,8 +51,6 @@ def auto_detect_parser(path): parser = 'nmap' elif '' in output: parser = 'nessus' - elif 'testssl.sh' in output: - parser = 'testssl' return parser diff --git a/tests/server/storage/test_import_parsers.py b/tests/server/storage/test_import_parsers.py index 843fb742..1db7b8d3 100644 --- a/tests/server/storage/test_import_parsers.py +++ b/tests/server/storage/test_import_parsers.py @@ -77,6 +77,7 @@ def test_import_command_nc(runner): assert len(host.services) == 2 assert sorted([x.port for x in host.services]) == [21, 22] + def test_import_command_auto(runner): """test automatic detection of parser""" From b9a47fa6ead5b248bec7e0f722e3ba585184a79a Mon Sep 17 00:00:00 2001 From: filiq Date: Tue, 15 Aug 2023 13:28:29 +0200 Subject: [PATCH 3/3] storage: autodetection code cleanup --- sner/server/parser.py | 16 +++---------- sner/server/storage/commands.py | 4 ++-- tests/server/storage/test_commands.py | 12 +++++----- tests/server/storage/test_import_parsers.py | 25 +++++++++------------ 4 files changed, 21 insertions(+), 36 deletions(-) diff --git a/sner/server/parser.py b/sner/server/parser.py index 49101058..94630d41 100644 --- a/sner/server/parser.py +++ b/sner/server/parser.py @@ -34,25 +34,15 @@ def load_parser_plugins(): def auto_detect_parser(path): """tries automatically detect parser""" - parser = None - if is_zip(path): with ZipFile(path) as fzip: for fname in filter(lambda x: x == 'assignment.json', fzip.namelist()): try: - parser = json.loads(file_from_zip(path, fname).decode('utf-8'))['config']['module'] + return json.loads(file_from_zip(path, fname).decode('utf-8'))['config']['module'] except KeyError: - pass - else: - output = Path(path).read_text(encoding='utf-8') - - # tries to detect the parser based on the output - if '' in output: - parser = 'nmap' - elif '' in output: - parser = 'nessus' + return None - return parser + return None class ParsedItemBase: # pylint: disable=too-few-public-methods diff --git a/sner/server/storage/commands.py b/sner/server/storage/commands.py index d04c7340..f9e66fa5 100644 --- a/sner/server/storage/commands.py +++ b/sner/server/storage/commands.py @@ -30,12 +30,12 @@ def command(): @with_appcontext @click.option('--dry', is_flag=True, help='do not update database, only print new items') @click.option('--addtag', multiple=True, help='add tag to all imported objects, can be used several times') -@click.argument('parser') +@click.option('--parser', help='specify which parser to use instead of auto detection') @click.argument('path', nargs=-1) def storage_import(path, parser, **kwargs): """import data""" - is_auto_parser = parser == 'auto' + is_auto_parser = parser is None if parser not in REGISTERED_PARSERS and not is_auto_parser: current_app.logger.error('no such parser') diff --git a/tests/server/storage/test_commands.py b/tests/server/storage/test_commands.py index 25845dcb..02042e5d 100644 --- a/tests/server/storage/test_commands.py +++ b/tests/server/storage/test_commands.py @@ -17,36 +17,36 @@ def test_import_command_errorhandling(runner): """test invalid parser""" # invalid parser - result = runner.invoke(command, ['import', 'invalid', 'dummy']) + result = runner.invoke(command, ['import', '--parser', 'invalid', 'dummy']) assert result.exit_code == 1 # parse exception handling - result = runner.invoke(command, ['import', 'nmap', 'sner.yaml.example', 'notexist']) + result = runner.invoke(command, ['import', '--parser', 'nmap', 'sner.yaml.example', 'notexist']) assert result.exit_code == 0 def test_import_command_dryrun(runner): """test import dry run""" - result = runner.invoke(command, ['import', '--dry', 'nmap', 'tests/server/data/parser-nmap-output.xml']) + result = runner.invoke(command, ['import', '--dry', '--parser', 'nmap', 'tests/server/data/parser-nmap-output.xml']) assert result.exit_code == 0 assert 'new host:' in result.output assert 'new service:' in result.output assert 'new note:' in result.output - result = runner.invoke(command, ['import', '--dry', 'nessus', 'tests/server/data/parser-nessus-simple.xml']) + result = runner.invoke(command, ['import', '--dry', '--parser', 'nessus', 'tests/server/data/parser-nessus-simple.xml']) assert result.exit_code == 0 assert 'new host:' in result.output assert 'new service:' in result.output assert 'new vuln:' in result.output - result = runner.invoke(command, ['import', '--dry', 'auto', 'tests/server/data/parser-jarm-job.zip']) + result = runner.invoke(command, ['import', '--dry', 'tests/server/data/parser-jarm-job.zip']) assert result.exit_code == 0 assert 'new host:' in result.output assert 'new service:' in result.output assert 'new note:' in result.output - result = runner.invoke(command, ['import', '--dry', 'auto', 'tests/server/data/parser-dummy-job.zip']) + result = runner.invoke(command, ['import', '--dry', 'tests/server/data/parser-dummy-job.zip']) assert result.exit_code == 0 diff --git a/tests/server/storage/test_import_parsers.py b/tests/server/storage/test_import_parsers.py index 1db7b8d3..bed6f8dc 100644 --- a/tests/server/storage/test_import_parsers.py +++ b/tests/server/storage/test_import_parsers.py @@ -12,7 +12,7 @@ def test_import_command_nmap_job(runner): """test import nmap job""" - result = runner.invoke(command, ['import', 'nmap', 'tests/server/data/parser-nmap-job.zip']) + result = runner.invoke(command, ['import', '--parser', 'nmap', 'tests/server/data/parser-nmap-job.zip']) assert result.exit_code == 0 assert Host.query.one() @@ -21,11 +21,11 @@ def test_import_command_nmap_job(runner): def test_import_command_nmap_rawdata(runner): """test nmap parser""" - result = runner.invoke(command, ['import', 'nmap', 'tests/server/data/parser-nmap-output.xml']) + result = runner.invoke(command, ['import', '--parser', 'nmap', 'tests/server/data/parser-nmap-output.xml']) assert result.exit_code == 0 # run twice to check update scheme of the import algorithms - result = runner.invoke(command, ['import', 'nmap', 'tests/server/data/parser-nmap-output.xml']) + result = runner.invoke(command, ['import', '--parser', 'nmap', 'tests/server/data/parser-nmap-output.xml']) assert result.exit_code == 0 host = Host.query.one() @@ -39,11 +39,11 @@ def test_import_command_nmap_rawdata(runner): def test_import_command_nessus(runner): """test nessus parser""" - result = runner.invoke(command, ['import', 'nessus', 'tests/server/data/parser-nessus-simple.xml']) + result = runner.invoke(command, ['import', '--parser', 'nessus', 'tests/server/data/parser-nessus-simple.xml']) assert result.exit_code == 0 # run twice to check update scheme of the import algorithms - result = runner.invoke(command, ['import', 'nessus', 'tests/server/data/parser-nessus-simple.xml']) + result = runner.invoke(command, ['import', '--parser', 'nessus', 'tests/server/data/parser-nessus-simple.xml']) assert result.exit_code == 0 host = Host.query.one() @@ -59,7 +59,7 @@ def test_import_command_nessus(runner): def test_import_command_manymap_job(runner): """test manymap parser; zipfile import""" - result = runner.invoke(command, ['import', 'manymap', 'tests/server/data/parser-manymap-job.zip']) + result = runner.invoke(command, ['import', '--parser', 'manymap', 'tests/server/data/parser-manymap-job.zip']) assert result.exit_code == 0 host = Host.query.one() @@ -70,7 +70,7 @@ def test_import_command_manymap_job(runner): def test_import_command_nc(runner): """test nc parser""" - result = runner.invoke(command, ['import', 'nc', 'tests/server/data/parser-nc.txt']) + result = runner.invoke(command, ['import', '--parser', 'nc', 'tests/server/data/parser-nc.txt']) assert result.exit_code == 0 host = Host.query.one() @@ -81,14 +81,9 @@ def test_import_command_nc(runner): def test_import_command_auto(runner): """test automatic detection of parser""" - result = runner.invoke(command, ['import', 'auto', 'tests/server/data/parser-nmap-output.xml']) + result = runner.invoke(command, ['import', 'tests/server/data/parser-six_dns_discover-job.zip']) assert result.exit_code == 0 - assert Host.query.count() == 1 + assert Host.query - result = runner.invoke(command, ['import', 'auto', 'tests/server/data/parser-nessus-simple.xml']) + result = runner.invoke(command, ['import', 'tests/server/data/parser-nc.txt']) assert result.exit_code == 0 - assert Host.query.count() == 2 - - result = runner.invoke(command, ['import', 'auto', 'tests/server/data/parser-six_dns_discover-job.zip']) - assert result.exit_code == 0 - assert Host.query.count() == 3