diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 08edf34..f9139f0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,6 +2,9 @@ name: ci on: [push, pull_request] +permissions: + contents: read + jobs: test: @@ -9,21 +12,26 @@ jobs: strategy: matrix: - python-version: [ '2.x', '3.x' ] + python-version: [ '3.6', '3.7', '3.8', '3.9', '3.10' ] steps: - name: Checkout repo - uses: actions/checkout@v2 + uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e # v2 with: python-version: ${{ matrix.python-version }} - name: Install requirements run: | python -m pip install -r requirements.txt - python -m pip install pytest-codestyle || true - python -m pip install pytest-flakes python setup.py install + - name: Check source files + if: matrix.python-version == '3.10' + run: | + python -m pip install pytest-pycodestyle + python -m pip install pytest-flakes + py.test --pycodestyle ftw + py.test --flakes ftw - name: Run tests run: | py.test test/unit @@ -32,6 +40,3 @@ jobs: py.test test/integration/test_http.py py.test test/integration/test_cookie.py --rule=test/integration/COOKIEFIXTURE.yaml py.test test/integration/test_runner.py --rule=test/integration/BASICFIXTURE.yaml - py.test --pycodestyle ftw || true - py.test --flakes ftw - continue-on-error: true diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..c8309e1 --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,31 @@ +name: Upload Python Package + +on: + release: + types: [created] + +permissions: + contents: read + +jobs: + deploy: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2 + - name: Set up Python + uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e # v2 + with: + python-version: '3.x' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install setuptools wheel twine + - name: Build and publish + env: + TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }} + TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }} + run: | + python setup.py sdist bdist_wheel + twine upload dist/* diff --git a/ftw/errors.py b/ftw/errors.py index d129cab..7d1ee9c 100644 --- a/ftw/errors.py +++ b/ftw/errors.py @@ -1,3 +1,3 @@ class TestError(Exception): def __init___(self, msg, context_args): - Exception.__init__(self, "{0} {1}".format(msg, context_args)) + Exception.__init__(self, '{0} {1}'.format(msg, context_args)) diff --git a/ftw/http.py b/ftw/http.py index f951e9d..68da291 100644 --- a/ftw/http.py +++ b/ftw/http.py @@ -1,37 +1,30 @@ -from __future__ import print_function -import brotli -import io -import socket -import ssl +from http import cookies +from io import BytesIO +import base64 +import encodings import errno -import time import gzip import os -import sys import re -import base64 +import socket +import ssl +import sys import zlib -import encodings -from IPy import IP +import select -from six import BytesIO, PY2, ensure_binary, ensure_str, iteritems, \ - text_type -from six.moves import http_cookies +import brotli +from IPy import IP from . import errors +from . import util -if PY2: - reload(sys) # pragma: no flakes - sys.setdefaultencoding('utf8') - escape_codec = 'string_escape' -else: - escape_codec = 'unicode_escape' +SOCKET_TIMEOUT = .3 class HttpResponse(object): def __init__(self, http_response, user_agent): - self.response = ensure_binary(http_response) + self.response = util.ensure_binary(http_response) # For testing purposes HTTPResponse might be called OOL try: self.dest_addr = user_agent.request_object.dest_addr @@ -102,7 +95,7 @@ def check_for_cookie(self, cookie): IP(self.dest_addr) except ValueError: origin_is_ip = False - for cookie_morsals in cookie.values(): + for cookie_morsals in list(cookie.values()): # If the coverdomain is blank or the domain is an IP # set the domain to be the origin if cookie_morsals['domain'] == '' or origin_is_ip: @@ -122,8 +115,8 @@ def check_for_cookie(self, cookie): cover_domain = cover_domain[first_non_dot:] # We must parse the coverDomain to make sure its # not in the suffix list - psl_path = os.path.dirname(__file__) + os.path.sep + \ - 'util' + os.path.sep + 'public_suffix_list.dat' + psl_path = os.path.join(os.path.dirname(__file__), + 'util', 'public_suffix_list.dat') # Check if the public suffix list is present in the ftw dir if os.path.exists(psl_path): pass @@ -135,7 +128,7 @@ def check_for_cookie(self, cookie): 'function': 'http.HttpResponse.check_for_cookie' }) try: - with io.open(psl_path, 'r', encoding='utf-8') as fo: + with open(psl_path, 'r', encoding='utf-8') as fo: for line in fo: if line[:2] == '//' or line[0] == ' ' or \ line[0].strip() == '': @@ -181,7 +174,7 @@ def process_response(self): Parses an HTTP response after an HTTP request is sent """ split_response = self.response.split(self.CRLF) - response_line = ensure_str(split_response[0]) + response_line = util.ensure_str(split_response[0]) response_headers = {} response_data = None data_line = None @@ -200,13 +193,13 @@ def process_response(self): 'header_rcvd': str(header), 'function': 'http.HttpResponse.process_response' }) - header = ensure_str(header[0]), ensure_str(header[1]) + header = util.ensure_str(header[0]), util.ensure_str(header[1]) response_headers[header[0].lower()] = header[1].lstrip() - if 'set-cookie' in response_headers.keys(): + if 'set-cookie' in list(response_headers.keys()): try: - cookie = http_cookies.SimpleCookie() + cookie = cookies.SimpleCookie() cookie.load(response_headers['set-cookie']) - except http_cookies.CookieError as err: + except cookies.CookieError as err: raise errors.TestError( 'Error processing the cookie content into a SimpleCookie', { @@ -228,7 +221,7 @@ def process_response(self): response_data = self.CRLF.join(split_response[data_line:]) # if the output headers say there is encoding - if 'content-encoding' in response_headers.keys(): + if 'content-encoding' in list(response_headers.keys()): response_data = self.parse_content_encoding( response_headers, response_data) if len(response_line.split(' ', 2)) != 3: @@ -273,7 +266,6 @@ def __init__(self): 'ADH-AES256-SHA:ECDHE-ECDSA-AES128-GCM-SHA256:' \ 'ECDHE-RSA-AES128-GCM-SHA256:AES128-GCM-SHA256:AES128-SHA256:HIGH:' self.CRLF = '\r\n' - self.HTTP_TIMEOUT = .3 self.RECEIVE_BYTES = 8192 self.SOCKET_TIMEOUT = 5 @@ -305,8 +297,9 @@ def build_socket(self): self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Check if TLS if self.request_object.protocol == 'https': - context = ssl.SSLContext(ssl.PROTOCOL_TLS) + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context.set_ciphers(self.CIPHERS) + context.load_default_certs(ssl.Purpose.SERVER_AUTH) self.sock = context.wrap_socket( self.sock, server_hostname=self.request_object.dest_addr) self.sock.connect( @@ -329,7 +322,7 @@ def find_cookie(self): return_cookies = [] origin_domain = self.request_object.dest_addr for cookie in self.cookiejar: - for cookie_morsals in cookie[0].values(): + for cookie_morsals in list(cookie[0].values()): cover_domain = cookie_morsals['domain'] if cover_domain == '': if origin_domain == cookie[1]: @@ -357,12 +350,12 @@ def build_request(self): # If the user has requested a tracked cookie and we have one set it if available_cookies: cookie_value = '' - if 'cookie' in self.request_object.headers.keys(): + if 'cookie' in list(self.request_object.headers.keys()): # Create a SimpleCookie out of our provided cookie try: - provided_cookie = http_cookies.SimpleCookie() + provided_cookie = cookies.SimpleCookie() provided_cookie.load(self.request_object.headers['cookie']) - except http_cookies.CookieError as err: + except cookies.CookieError as err: raise errors.TestError( 'Error processing the existing cookie into a ' 'SimpleCookie', @@ -373,29 +366,29 @@ def build_request(self): 'function': 'http.HttpResponse.build_request' }) result_cookie = {} - for cookie_key, cookie_morsal in iteritems(provided_cookie): + for cookie_key, cookie_morsal in list(provided_cookie.items()): result_cookie[cookie_key] = \ provided_cookie[cookie_key].value for cookie in available_cookies: - for cookie_key, cookie_morsal in iteritems(cookie): - if cookie_key in result_cookie.keys(): + for cookie_key, cookie_morsal in cookie: + if cookie_key in list(result_cookie.keys()): # we don't overwrite a user specified # cookie with a saved one pass else: result_cookie[cookie_key] = \ cookie[cookie_key].value - for key, value in iteritems(result_cookie): - cookie_value += (text_type(key) + '=' + - text_type(value) + '; ') + for key, value in list(result_cookie.items()): + cookie_value += (str(key) + '=' + + str(value) + '; ') # Remove the trailing semicolon cookie_value = cookie_value[:-2] self.request_object.headers['cookie'] = cookie_value else: for cookie in available_cookies: - for cookie_key, cookie_morsal in iteritems(cookie): - cookie_value += (text_type(cookie_key) + '=' + - text_type(cookie_morsal.coded_value) + + for cookie_key, cookie_morsal in list(cookie.items()): + cookie_value += (str(cookie_key) + '=' + + str(cookie_morsal.coded_value) + '; ') # Remove the trailing semicolon cookie_value = cookie_value[:-2] @@ -404,18 +397,18 @@ def build_request(self): # Expand out our headers into a string headers = '' if self.request_object.headers != {}: - for hname, hvalue in iteritems(self.request_object.headers): - headers += text_type(hname) + ': ' + \ - text_type(hvalue) + self.CRLF + for hname, hvalue in self.request_object.headers.items(): + headers += str(hname) + ': ' + \ + str(hvalue) + self.CRLF request = request.replace('#headers#', headers) # If we have data append it if self.request_object.data != '': # Before we do that see if that is a charset - encoding = "utf-8" + encoding = 'utf-8' # Check to see if we have a content type and magic is # off (otherwise UTF-8) - if 'Content-Type' in self.request_object.headers.keys() and \ + if 'Content-Type' in list(self.request_object.headers.keys()) and \ self.request_object.stop_magic is False: pattern = re.compile(r'\;\s{0,1}?charset\=(.*?)(?:$|\;|\s)') m = re.search(pattern, @@ -431,20 +424,24 @@ def build_request(self): if choice in possible_choices: encoding = choice try: - data = self.request_object.data.encode(encoding) - except UnicodeEncodeError as err: + data_bytes = \ + self.request_object.data.encode(encoding, 'strict') + except UnicodeError as err: raise errors.TestError( 'Error encoding the data with the charset specified', { 'msg': str(err), 'Content-Type': str(self.request_object.headers['Content-Type']), - 'data': text_type(self.request_object.data), + 'data': str(self.request_object.data), 'function': 'http.HttpResponse.build_request' }) - request = request.replace('#data#', ensure_str(data)) + data = util.ensure_str(data_bytes) else: - request = request.replace('#data#', '') + data = '' + + request = request.replace('#data#', data).encode('utf-8', 'strict') + # If we have a Raw Request we should use that instead if self.request_object.raw_request is not None: if self.request_object.encoded_request is not None: @@ -453,52 +450,92 @@ def build_request(self): { 'function': 'http.HttpUA.build_request' }) - request = ensure_binary(self.request_object.raw_request) + request = self.request_object.raw_request.encode('utf-8', 'strict') # We do this regardless of magic if you want to send a literal # '\' 'r' or 'n' use encoded request. - request = request.decode(escape_codec) + request = request.decode('unicode_escape') + if self.request_object.encoded_request is not None: request = base64.b64decode(self.request_object.encoded_request) - request = request.decode(escape_codec) - # if we have an Encoded request we should use that - self.request = ensure_binary(request) + + # Use the request created + self.request = util.ensure_binary(request) def get_response(self): """ Get the response from the socket """ - self.sock.setblocking(0) our_data = [] - # Beginning time - begin = time.time() + self.sock.setblocking(False) + try: + our_data = self.read_response_from_socket() + finally: + try: + self.sock.shutdown(socket.SHUT_WR) + self.sock.close() + except OSError as err: + raise errors.TestError( + 'We were unable to close the socket as expected.', + { + 'msg': err, + 'function': 'http.HttpUA.get_response' + }) + else: + self.response_object = HttpResponse(b''.join(our_data), self) + finally: + if not b''.join(our_data): + raise errors.TestError( + 'No response from server.' + + ' Request likely timed out.', + { + 'host': self.request_object.dest_addr, + 'port': self.request_object.port, + 'proto': self.request_object.protocol, + 'msg': 'Please send the request and check' + + ' Wireshark', + 'function': 'http.HttpUA.get_response' + }) + + def read_response_from_socket(self): + # wait for socket to become ready + ready_sock, _, _ = select.select( + [self.sock], [], [self.sock], self.SOCKET_TIMEOUT) + if not ready_sock: + raise errors.TestError( + f'No response from server within {self.SOCKET_TIMEOUT}s', + { + 'host': self.request_object.dest_addr, + 'port': self.request_object.port, + 'proto': self.request_object.protocol, + 'msg': 'Please send the request and check Wireshark', + 'function': 'http.HttpUA.get_response' + }) + + our_data = [] while True: - # If we have data then if we're passed the timeout break - if our_data and time.time() - begin > self.HTTP_TIMEOUT: - break - # If we're dataless wait just a bit - elif time.time() - begin > self.HTTP_TIMEOUT * 2: - break - # Recv data try: data = self.sock.recv(self.RECEIVE_BYTES) - if data: - our_data.append(ensure_binary(data)) - begin = time.time() - else: - # Sleep for sometime to indicate a gap - time.sleep(self.HTTP_TIMEOUT) - except socket.error as err: - # Check if we got a timeout - if err.errno == errno.EAGAIN: - pass + if len(data) == 0: + # we're done + break + our_data.append(util.ensure_binary(data)) + except BlockingIOError as e: + # If we can't handle the error here, pass it on + if e.errno == socket.EAGAIN or e.errno == socket.EWOULDBLOCK: + # we're done + break + except OSError as err: # SSL will return SSLWantRead instead of EAGAIN - elif sys.platform == 'win32' and \ - err.errno == errno.WSAEWOULDBLOCK: + if (sys.platform == 'win32' and + err.errno == errno.WSAEWOULDBLOCK): pass elif (self.request_object.protocol == 'https' and - err.args[0] == ssl.SSL_ERROR_WANT_READ): - continue - # If we didn't it's an error + err.args[0] == ssl.SSL_ERROR_WANT_READ): + ready_sock, _, _ = select.select( + [self.sock], [], [self.sock], SOCKET_TIMEOUT) + if not ready_sock: + break + # It's an error else: raise errors.TestError( 'Failed to connect to server', @@ -509,26 +546,4 @@ def get_response(self): 'message': err, 'function': 'http.HttpUA.get_response' }) - try: - self.sock.shutdown(socket.SHUT_WR) - self.sock.close() - except socket.error as err: - raise errors.TestError( - 'We were unable to close the socket as expected.', - { - 'msg': err, - 'function': 'http.HttpUA.get_response' - }) - else: - self.response_object = HttpResponse(b''.join(our_data), self) - finally: - if not b''.join(our_data): - raise errors.TestError( - 'No response from server. Request likely timed out.', - { - 'host': self.request_object.dest_addr, - 'port': self.request_object.port, - 'proto': self.request_object.protocol, - 'msg': 'Please send the request and check Wireshark', - 'function': 'http.HttpUA.get_response' - }) + return our_data diff --git a/ftw/logchecker.py b/ftw/logchecker.py index c99ad83..22e0fa5 100644 --- a/ftw/logchecker.py +++ b/ftw/logchecker.py @@ -1,9 +1,7 @@ -import abc -import six +from abc import ABC, abstractmethod -@six.add_metaclass(abc.ABCMeta) -class LogChecker(): +class LogChecker(ABC): """ LogChecker is an abstract class that integrations with WAFs MUST implement. This class is used by the testrunner to test log lines against an expected @@ -17,7 +15,21 @@ def set_times(self, start, end): self.start = start self.end = end - @abc.abstractmethod + def mark_start(self, stage_id): + """ + May be implemented to set up the log checker before + the request is being sent + """ + pass + + def mark_end(self, stage_id): + """ + May be implemented to tell the log checker that + the response has been received + """ + pass + + @abstractmethod def get_logs(self): """ MUST be implemented, MUST return an array of strings diff --git a/ftw/pytest_plugin.py b/ftw/pytest_plugin.py index 79cf0c8..63d288c 100644 --- a/ftw/pytest_plugin.py +++ b/ftw/pytest_plugin.py @@ -1,13 +1,13 @@ +from http.server import HTTPServer +from http.server import SimpleHTTPRequestHandler + import pytest from . import util from .ruleset import Test -from six.moves.BaseHTTPServer import HTTPServer -from six.moves.SimpleHTTPServer import SimpleHTTPRequestHandler - -def get_testdata(rulesets): +def get_testdata(rulesets, use_rulesets): """ In order to do test-level parametrization (is this a word?), we have to bundle the test data from rulesets into tuples so py.test can understand @@ -17,7 +17,10 @@ def get_testdata(rulesets): for ruleset in rulesets: for test in ruleset.tests: if test.enabled: - testdata.append((ruleset, test)) + args = [test] + if use_rulesets: + args = [rulesets] + args + testdata.append(args) return testdata @@ -27,13 +30,13 @@ def test_id(val): Dynamically names tests, useful for when we are running dozens to hundreds of tests """ - if isinstance(val, (dict, Test,)): + if isinstance(val, (dict, Test)): # We must be carful here because errors are swallowed and # defaults returned - if 'name' in val.ruleset_meta.keys(): + if 'name' in list(val.ruleset_meta.keys()): return '%s -- %s' % (val.ruleset_meta['name'], val.test_title) else: - return '%s -- %s' % ("Unnamed_Test", val.test_title) + return '%s -- %s' % ('Unnamed_Test', val.test_title) @pytest.fixture @@ -104,7 +107,7 @@ def pytest_addoption(parser): help='pass in a tablename to parse journal results') parser.addoption('--port', action='store', default=None, help='destination port to direct tests towards', - choices=range(1, 65536), + choices=list(range(1, 65536)), type=int) parser.addoption('--protocol', action='store', default=None, help='destination protocol to direct tests towards', @@ -127,7 +130,13 @@ def pytest_generate_tests(metafunc): metafunc.config.option.ruledir_recurse, True) if metafunc.config.option.rule: rulesets = util.get_rulesets(metafunc.config.option.rule, False) - if 'ruleset' in metafunc.fixturenames and \ - 'test' in metafunc.fixturenames: - metafunc.parametrize('ruleset, test', get_testdata(rulesets), - ids=test_id) + if 'test' in metafunc.fixturenames: + use_rulesets = False + arg_names = ['test'] + if 'ruleset' in metafunc.fixturenames: + use_rulesets = True + arg_names = ['ruleset'] + arg_names + metafunc.parametrize( + arg_names, + get_testdata(rulesets, use_rulesets), + ids=test_id) diff --git a/ftw/ruleset.py b/ftw/ruleset.py index 83d6721..13f06af 100644 --- a/ftw/ruleset.py +++ b/ftw/ruleset.py @@ -1,9 +1,8 @@ import re - -from six import ensure_str -from six.moves.urllib.parse import parse_qsl, unquote, urlencode +from urllib.parse import parse_qsl, unquote, urlencode from . import errors +from . import util class Output(object): @@ -115,18 +114,20 @@ def __init__(self, raw_request=None, # Check if there is any data and do defaults if self.data != '': # Default values for content length and header - if 'Content-Type' not in headers.keys() and stop_magic is False: + if 'Content-Type' not in list(headers.keys()) and \ + stop_magic is False: headers['Content-Type'] = 'application/x-www-form-urlencoded' # check if encoded and encode if it should be - if 'Content-Type' in headers.keys(): + if 'Content-Type' in list(headers.keys()): if headers['Content-Type'] == \ 'application/x-www-form-urlencoded' and stop_magic is False: - if ensure_str(unquote(self.data)) == self.data: + if util.ensure_str(unquote(self.data)) == self.data: query_string = parse_qsl(self.data) if len(query_string) != 0: encoded_args = urlencode(query_string) self.data = encoded_args - if 'Content-Length' not in headers.keys() and stop_magic is False: + if 'Content-Length' not in list(headers.keys()) and \ + stop_magic is False: # The two is for the trailing CRLF and the one after headers['Content-Length'] = len(self.data) @@ -136,18 +137,26 @@ class Stage(object): This class holds information about 1 stage in a test, which contains 1 input and 1 output """ - def __init__(self, stage_dict): + def __init__(self, stage_dict, stage_index, test): self.stage_dict = stage_dict + self.stage_index = stage_index + self.test = test self.input = Input(**stage_dict['input']) self.output = Output(stage_dict['output']) + self.id = self.build_id() + + def build_id(self): + rule_name = self.test.ruleset_meta["name"].split('.')[0] + return f'{rule_name}-{self.test.test_index}-{self.stage_index}' class Test(object): """ This class holds information for 1 test and potentially many stages """ - def __init__(self, test_dict, ruleset_meta): + def __init__(self, test_dict, test_index, ruleset_meta): self.test_dict = test_dict + self.test_index = test_index self.ruleset_meta = ruleset_meta self.test_title = self.test_dict['test_title'] self.stages = self.build_stages() @@ -159,10 +168,8 @@ def build_stages(self): """ Processes and loads an array of stages from the test dictionary """ - return map( - lambda stage_dict: Stage(stage_dict['stage']), - self.test_dict['stages'] - ) + return [Stage(stage_dict['stage'], index, self) + for index, stage_dict in enumerate(self.test_dict['stages'])] class Ruleset(object): @@ -184,10 +191,8 @@ def extract_tests(self): creates test objects based on input """ try: - return map( - lambda test_dict: Test(test_dict, self.meta), - self.yaml_file['tests'] - ) + return [Test(test_dict, index, self.meta) + for index, test_dict in enumerate(self.yaml_file['tests'])] except errors.TestError as e: e.args[1]['meta'] = self.meta raise e diff --git a/ftw/testrunner.py b/ftw/testrunner.py index fb0d0ba..21f6ade 100644 --- a/ftw/testrunner.py +++ b/ftw/testrunner.py @@ -1,10 +1,8 @@ -from __future__ import print_function import datetime -from dateutil import parser -import pytest import sqlite3 -from six import ensure_str +from dateutil import parser +import pytest from . import errors from . import http @@ -57,7 +55,7 @@ def test_response(self, response_object, regex): 'response_object': response_object, 'function': 'testrunner.TestRunner.test_response' }) - if regex.search(ensure_str(response_object.response)): + if regex.search(util.ensure_str(response_object.response)): assert True else: assert False @@ -67,7 +65,7 @@ def test_response_str(self, response, regex): Checks if the response response contains a regex specified in the output stage. It will assert that the regex is present. """ - if regex.search(ensure_str(response)): + if regex.search(util.ensure_str(response)): assert True else: assert False @@ -99,10 +97,10 @@ def run_stage_with_journal(self, rule_id, test, journal_file, conn.text_factory = str cur = conn.cursor() for i, stage in enumerate(test.stages): - ''' + """ Query DB here for rule_id & test_title Compare against logger_obj - ''' + """ q = self.query_for_stage_results(tablename) results = cur.execute(q, [i, test.test_title]).fetchall() if len(results) == 0: @@ -176,7 +174,6 @@ def run_stage(self, stage, logger_obj=None, http_ua=None): input, waits for output then compares expected vs actual output http_ua can be passed in to persist cookies """ - # Send our request (exceptions caught as needed) if stage.output.expect_error: with pytest.raises(errors.TestError) as excinfo: @@ -189,11 +186,20 @@ def run_stage(self, stage, logger_obj=None, http_ua=None): else: if not http_ua: http_ua = http.HttpUA() - start = datetime.datetime.utcnow() + if ((stage.output.log_contains_str or + stage.output.no_log_contains_str) and + logger_obj is not None): + logger_obj.mark_start(stage.id) + start = datetime.datetime.utcnow() http_ua.send_request(stage.input) - end = datetime.datetime.utcnow() - if (stage.output.log_contains_str or - stage.output.no_log_contains_str) and logger_obj is not None: + if ((stage.output.log_contains_str or + stage.output.no_log_contains_str) and + logger_obj is not None): + logger_obj.mark_end(stage.id) + end = datetime.datetime.utcnow() + if ((stage.output.log_contains_str or + stage.output.no_log_contains_str) and + logger_obj is not None): logger_obj.set_times(start, end) lines = logger_obj.get_logs() if stage.output.log_contains_str: diff --git a/ftw/util.py b/ftw/util.py index b5f8f0b..d15f48c 100644 --- a/ftw/util.py +++ b/ftw/util.py @@ -1,9 +1,8 @@ -from __future__ import print_function -import io -import yaml import os -import sqlite3 from glob import glob +import sqlite3 + +import yaml from . import ruleset @@ -86,7 +85,7 @@ def extract_yaml(yaml_files): loaded_yaml = [] for yaml_file in yaml_files: try: - with io.open(yaml_file, encoding='utf-8') as fd: + with open(yaml_file, encoding='utf-8') as fd: loaded_yaml.append(yaml.safe_load(fd)) except IOError as e: print('Error reading file', yaml_file) @@ -98,3 +97,21 @@ def extract_yaml(yaml_files): print('General error') raise e return loaded_yaml + + +def ensure_str(s, encoding='utf-8', errors='strict'): + # Optimization: Fast return for the common case. + if isinstance(s, str): + return s + if isinstance(s, bytes): + return s.decode(encoding, errors) + elif not isinstance(s, (str, bytes)): + raise TypeError('not expecting type "%s"' % type(s)) + + +def ensure_binary(s, encoding='utf-8', errors='strict'): + if isinstance(s, bytes): + return s + if isinstance(s, str): + return s.encode(encoding, errors) + raise TypeError('not expecting type "%s"' % type(s)) diff --git a/ftw/util/ironbee.py b/ftw/util/ironbee.py index cb36089..6f40bd1 100644 --- a/ftw/util/ironbee.py +++ b/ftw/util/ironbee.py @@ -1,18 +1,18 @@ -from __future__ import print_function + import os import request_to_yaml filelist = [] -for root, dirs, files in os.walk("waf-research", topdown=False): +for root, dirs, files in os.walk('waf-research', topdown=False): for name in files: extension = name[-5:] - if extension == ".test": + if extension == '.test': filelist.append(os.path.join(root, name)) for fname in filelist: f = open(fname, 'r') - request = "" + request = '' for line in f.readlines(): if line[0] != '#': request += line diff --git a/ftw/util/request_to_yaml.py b/ftw/util/request_to_yaml.py index 24edd25..7ba43d3 100644 --- a/ftw/util/request_to_yaml.py +++ b/ftw/util/request_to_yaml.py @@ -9,15 +9,15 @@ def __init__(self): def double_quote(self, mystr): return mystr - return "\"" + str(mystr) + "\"" + return '"' + str(mystr) + '"' def generate_yaml(self): data = dict( meta=dict( - author="Zack", + author='Zack', enabled=True, - name="EXAMPLE.yaml", - description="Description" + name='EXAMPLE.yaml', + description='Description' ), tests=[dict( rule_id=1234, @@ -36,9 +36,9 @@ def generate_yaml(self): def get_request_line(self, req): req = req.split('\r\n')[0] req = req.split(' ', 2) - self.input["method"] = self.double_quote(req[0]) - self.input["uri"] = self.double_quote(req[1]) - self.input["version"] = self.double_quote(req[2]) + self.input['method'] = self.double_quote(req[0]) + self.input['uri'] = self.double_quote(req[1]) + self.input['version'] = self.double_quote(req[2]) def get_headers(self, req): req = req.split('\r\n')[1:] @@ -49,12 +49,12 @@ def get_headers(self, req): break head = req[num].split(':') header[head[0]] = head[1].strip() - self.input["headers"] = self.double_quote(header) + self.input['headers'] = self.double_quote(header) def get_data(self, req): req = req.split('\r\n')[1:] - self.input["data"] = self.double_quote( - "\r\n".join(req[self.data_start + 1:])) + self.input['data'] = self.double_quote( + '\r\n'.join(req[self.data_start + 1:])) def write_yaml(self, fname, yaml_out): f = open(fname, 'w') @@ -63,12 +63,12 @@ def write_yaml(self, fname, yaml_out): # Example Usage # req = Request() # -# request = """GET / HTTP/1.1 +# request = '''GET / HTTP/1.1 # User-Agent: test:/data # # xyz # -# """ +# ''' # request = request.replace('\n', '\r\n') # req.get_request_line(request) # req.get_headers(request) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..374b58c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,6 @@ +[build-system] +requires = [ + "setuptools>=42", + "wheel" +] +build-backend = "setuptools.build_meta" diff --git a/requirements.txt b/requirements.txt index 6f40c81..b46f7dc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ -Brotli==1.0.7 -IPy==0.83 -PyYAML==4.2b1 -pytest==4.6 -python-dateutil==2.6.0 -six==1.14.0 +Brotli==1.0.9 +IPy==1.01 +PyYAML==6.0 +pytest==6.2.5 +python-dateutil==2.8.2 diff --git a/setup.cfg b/setup.cfg index cc752ab..8b88600 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [metadata] -description-file = README.md +description_file = README.md [tool:pytest] addopts = -s -v diff --git a/setup.py b/setup.py index 8c9ee2c..59309d0 100644 --- a/setup.py +++ b/setup.py @@ -1,30 +1,44 @@ -#!/usr/bin/env python +import setuptools -from setuptools import setup +with open("README.md", "r", encoding="utf-8") as fh: + long_description = fh.read() -setup(name='ftw', - version='1.2.2', - description='Framework for Testing WAFs', - author='Chaim Sanders, Zack Allen', - author_email='zma4580@gmail.com, chaim.sanders@gmail.com', - url='https://www.github.com/coreruleset/ftw', - download_url='https://github.com/coreruleset/ftw/tarball/1.2.2', - include_package_data=True, - package_data={ +setuptools.setup( + name='ftw', + description='Framework for Testing WAFs', + long_description=long_description, + long_description_content_type="text/markdown", + author='Chaim Sanders, Zack Allen', + author_email='zma4580@gmail.com, chaim.sanders@gmail.com', + url='https://github.com/coreruleset/ftw', + include_package_data=True, + package_data={ 'ftw': ['util/public_suffix_list.dat'] - }, - entry_points={ - 'pytest11': [ - 'ftw = ftw.pytest_plugin' - ] - }, - packages=['ftw'], - keywords=['waf'], - install_requires=[ - 'Brotli==1.0.7', - 'IPy==0.83', - 'PyYAML==4.2b1', - 'pytest==4.6', - 'python-dateutil==2.6.0', - 'six==1.14.0' - ]) + }, + entry_points={ + 'pytest11': [ + 'ftw = ftw.pytest_plugin' + ] + }, + keywords=['waf'], + project_urls={ + "Bug Tracker": 'https://github.com/coreruleset/ftw/issues', + }, + classifiers=[ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", + "Framework :: Pytest", + ], + packages=["ftw"], + python_requires=">=3.6", + use_scm_version=True, + setup_requires=['setuptools_scm'], + install_requires=[ + 'Brotli==1.0.9', + 'IPy==1.01', + 'PyYAML==6.0', + 'pytest==6.2.5', + 'python-dateutil==2.8.2' + ], +) diff --git a/test/integration/COOKIEFIXTURE.yaml b/test/integration/COOKIEFIXTURE.yaml index e5676e6..f700204 100644 --- a/test/integration/COOKIEFIXTURE.yaml +++ b/test/integration/COOKIEFIXTURE.yaml @@ -1,40 +1,67 @@ --- - meta: - author: "Chaim" - enabled: true - name: "COOKIEFIXTURE.yaml" - description: "Tests cookie saving functionality" - tests: - - - test_title: "Multi-Stage w\\ Cookie" - stages: - - - stage: - input: - save_cookie: true - dest_addr: "www.ieee.org" - method: "GET" - port: 443 - headers: - User-Agent: "Foo" - Host: "www.ieee.org" - protocol: "https" - uri: "/" - output: - status: 200 - response_contains: "Set-Cookie: TS01247332=" - - - stage: - input: - save_cookie: true - dest_addr: "www.ieee.org" - method: "GET" - port: 443 - headers: - User-Agent: "Foo" - Host: "www.ieee.org" - protocol: "https" - uri: "/" - output: - status: 200 - response_contains: "Set-Cookie: TS01247332=" +meta: + author: "Chaim" + enabled: true + name: "COOKIEFIXTURE.yaml" + description: "Tests cookie saving functionality" +tests: + - test_title: "Multi-Stage w\\ Cookie" + stages: + - stage: + input: + save_cookie: true + dest_addr: "www.cloudflare.com" + method: "GET" + port: 443 + headers: + User-Agent: "Foo" + Host: "www.cloudflare.com" + protocol: "https" + uri: "/" + output: + status: 200 + response_contains: "[Ss]et-[Cc]ookie: __cf_bm=" + - stage: + input: + save_cookie: true + dest_addr: "www.cloudflare.com" + method: "GET" + port: 443 + headers: + User-Agent: "Foo" + Host: "www.cloudflare.com" + protocol: "https" + uri: "/" + output: + status: 200 + no_response_contains: "[Ss]et-[Cc]ookie: __cf_bm=" + - test_title: "Multi-Stage w\\ Cookie; failure because the cookie is reset if not all cookies are present and ftw can only handle one cookie header" + stages: + - stage: + input: + save_cookie: true + dest_addr: "www.ieee.org" + method: "GET" + port: 443 + headers: + User-Agent: "Foo" + Host: "www.ieee.org" + protocol: "https" + uri: "/" + output: + status: 200 + response_contains: "[Ss]et-[Cc]ookie: TS01247332=" + - stage: + input: + save_cookie: true + dest_addr: "www.ieee.org" + method: "GET" + port: 443 + headers: + User-Agent: "Foo" + Host: "www.ieee.org" + protocol: "https" + uri: "/" + output: + status: 200 + no_response_contains: "[Ss]et-[Cc]ookie: TS01247332=" diff --git a/test/integration/test_cookie.py b/test/integration/test_cookie.py index 0dd9f8a..bf36434 100644 --- a/test/integration/test_cookie.py +++ b/test/integration/test_cookie.py @@ -2,6 +2,11 @@ import pytest +@pytest.mark.skip( + reason=""" + 1. ieee.org has a very bad web server, so responses fail a lot + 2. ieee.org sends multiple set-cookie headers and ftw can only handle a single header of the same name""" +) def test_default(ruleset, test, destaddr): """ Default tester with no logger obj. Useful for HTML contains and Status code diff --git a/test/integration/test_http.py b/test/integration/test_http.py index 54e41f1..dc656aa 100644 --- a/test/integration/test_http.py +++ b/test/integration/test_http.py @@ -1,60 +1,59 @@ -from __future__ import print_function + from ftw import ruleset, http, errors import pytest -@pytest.mark.skip(reason='Integration failure, @chaimsanders for more info') def test_cookies1(): """Tests accessing a site that sets a cookie and then wants to resend the cookie""" http_ua = http.HttpUA() - x = ruleset.Input(protocol="https", port=443, dest_addr="www.ieee.org", - headers={"Host": "www.ieee.org"}) + x = ruleset.Input(protocol='https', port=443, dest_addr='www.cloudflare.com', + headers={'Host': 'www.cloudflare.com'}) http_ua.send_request(x) with pytest.raises(KeyError): - print(http_ua.request_object.headers["cookie"]) - assert("set-cookie" in http_ua.response_object.headers.keys()) - cookie_data = http_ua.response_object.headers["set-cookie"] - cookie_var = cookie_data.split("=")[0] - x = ruleset.Input(protocol="https", port=443, dest_addr="www.ieee.org", - headers={"Host": "www.ieee.org"}) - http_ua.send_request(x) - assert(http_ua.request_object.headers["cookie"].split('=')[0] == + print(http_ua.request_object.headers['cookie']) + assert('set-cookie' in list(http_ua.response_object.headers.keys())) + cookie_data = http_ua.response_object.headers['set-cookie'] + cookie_var = cookie_data.split('=')[0] + x = ruleset.Input(protocol='https', port=443, dest_addr='www.cloudflare.com', + headers={'Host': 'www.cloudflare.com'}) + http_ua.send_request(x) + assert(http_ua.request_object.headers['cookie'].split('=')[0] == cookie_var) def test_cookies2(): """Test to make sure that we don't override user specified cookies""" http_ua = http.HttpUA() - x = ruleset.Input(dest_addr="ieee.org", headers={"Host": "ieee.org"}) + x = ruleset.Input(dest_addr='example.com', headers={'Host': 'example.com'}) http_ua.send_request(x) - x = ruleset.Input(dest_addr="ieee.org", + x = ruleset.Input(dest_addr='example.com', headers={ - "Host": "ieee.org", - "cookie": "TS01247332=012f3506234413e6c5cb14e8c0" - "d5bf890fdd02481614b01cd6cd30911c6733e" - "3e6f79e72aa"}) + 'Host': 'example.com', + 'cookie': 'TS01247332=012f3506234413e6c5cb14e8c0' + 'd5bf890fdd02481614b01cd6cd30911c6733e' + '3e6f79e72aa'}) http_ua.send_request(x) assert('TS01247332=012f3506234413e6c5cb14e8c0d5bf890fdd02481614b01cd6c' 'd30911c6733e3e6f79e72aa' in - http_ua.request_object.headers["cookie"]) + http_ua.request_object.headers['cookie']) def test_cookies3(): """Test to make sure we retain cookies when user specified values are provided""" http_ua = http.HttpUA() - x = ruleset.Input(dest_addr="ieee.org", headers={"Host": "ieee.org"}) + x = ruleset.Input(dest_addr='example.com', headers={'Host': 'example.com'}) http_ua.send_request(x) - x = ruleset.Input(dest_addr="ieee.org", + x = ruleset.Input(dest_addr='example.com', headers={ - "Host": "ieee.org", - "cookie": "TS01247332=012f3506234413e6c5cb14e8c0d" - "5bf890fdd02481614b01cd6cd30911c6733e3e" - "6f79e72aa; XYZ=123"}) + 'Host': 'example.com', + 'cookie': 'TS01247332=012f3506234413e6c5cb14e8c0d' + '5bf890fdd02481614b01cd6cd30911c6733e3e' + '6f79e72aa; XYZ=123'}) http_ua.send_request(x) assert(set([chunk.split('=')[0].strip() for chunk in - http_ua.request_object.headers["cookie"].split(';')]) == + http_ua.request_object.headers['cookie'].split(';')]) == set(['XYZ', 'TS01247332'])) @@ -62,32 +61,31 @@ def test_cookies4(): """Test to make sure cookies are saved when user-specified cookie is added""" http_ua = http.HttpUA() - x = ruleset.Input(dest_addr="ieee.org", headers={"Host": "ieee.org"}) + x = ruleset.Input(dest_addr='example.com', headers={'Host': 'example.com'}) http_ua.send_request(x) - x = ruleset.Input(dest_addr="ieee.org", headers={"Host": "ieee.org", - "cookie": "XYZ=123"}) + x = ruleset.Input(dest_addr='example.com', headers={'Host': 'example.com', + 'cookie': 'XYZ=123'}) http_ua.send_request(x) - assert('XYZ' in http_ua.request_object.headers["cookie"]) + assert('XYZ' in http_ua.request_object.headers['cookie']) def test_raw1(): """Test to make sure a raw request will work with \r\n replacement""" - x = ruleset.Input(dest_addr="example.com", - raw_request="""GET / HTTP/1.1\r\n""" - """Host: example.com\r\n\r\n""") + x = ruleset.Input(dest_addr='example.com', + raw_request='GET / HTTP/1.1\r\n' \ + 'Host: example.com\r\n\r\n') http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 -@pytest.mark.skip(reason='Integration failure, @chaimsanders for more info') def test_raw2(): """Test to make sure a raw request will work with actual seperators""" - x = ruleset.Input(dest_addr="example.com", raw_request="""GET / HTTP/1.1 + x = ruleset.Input(dest_addr='example.com', raw_request='''GET / HTTP/1.1 Host: example.com -""") +''') http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 @@ -96,10 +94,10 @@ def test_raw2(): def test_both1(): """Test to make sure that if both encoded and raw are provided there is an error""" - x = ruleset.Input(dest_addr="example.com", - raw_request="""GET / HTTP/1.1\r\n""" - """Host: example.com\r\n\r\n""", - encoded_request="abc123==") + x = ruleset.Input(dest_addr='example.com', + raw_request='GET / HTTP/1.1\r\n' \ + 'Host: example.com\r\n\r\n', + encoded_request='abc123==') http_ua = http.HttpUA() with pytest.raises(errors.TestError): http_ua.send_request(x) @@ -107,9 +105,8 @@ def test_both1(): def test_encoded1(): """Test to make sure a encode request works""" - x = ruleset.Input(dest_addr="example.com", - encoded_request="R0VUIC8gSFRUUC8xLjFcclxuSG9zdDogZXhh" - "bXBsZS5jb21cclxuXHJcbg==") + x = ruleset.Input(dest_addr='example.com', + encoded_request='R0VUIC8gSFRUUC8xLjENCkhvc3Q6IGV4YW1wbGUuY29tDQoNCgo=') http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 @@ -117,8 +114,8 @@ def test_encoded1(): def test_error1(): """Will return mail -- not header should cause error""" - x = ruleset.Input(dest_addr="Smtp.aol.com", port=25, - headers={"Host": "example.com"}) + x = ruleset.Input(dest_addr='Smtp.aol.com', port=25, + headers={'Host': 'example.com'}) http_ua = http.HttpUA() with pytest.raises(errors.TestError): http_ua.send_request(x) @@ -128,27 +125,27 @@ def test_error5(): """Invalid Header should cause error""" http_ua = http.HttpUA() with pytest.raises(errors.TestError): - http.HttpResponse("HTTP/1.1 200 OK\r\ntest\r\n", http_ua) + http.HttpResponse('HTTP/1.1 200 OK\r\ntest\r\n', http_ua) def test_error6(): """Valid HTTP response should process fine""" http_ua = http.HttpUA() - http.HttpResponse("HTTP/1.1 200 OK\r\ntest: hello\r\n", http_ua) + http.HttpResponse('HTTP/1.1 200 OK\r\ntest: hello\r\n', http_ua) def test_error7(): """Invalid content-type should fail""" http_ua = http.HttpUA() with pytest.raises(errors.TestError): - http.HttpResponse("HTTP/1.1 200 OK\r\nContent-Encoding: XYZ\r\n", + http.HttpResponse('HTTP/1.1 200 OK\r\nContent-Encoding: XYZ\r\n', http_ua) def test_error2(): """Invalid request should cause timeout""" - x = ruleset.Input(dest_addr="example.com", port=123, - headers={"Host": "example.com"}) + x = ruleset.Input(dest_addr='example.com', port=123, + headers={'Host': 'example.com'}) http_ua = http.HttpUA() with pytest.raises(errors.TestError): http_ua.send_request(x) @@ -158,22 +155,22 @@ def test_error3(): """Invalid status returned in response line""" http_ua = http.HttpUA() with pytest.raises(errors.TestError): - http.HttpResponse("HTTP1.1 test OK\r\n", http_ua) + http.HttpResponse('HTTP1.1 test OK\r\n', http_ua) def test_error4(): """Wrong number of elements returned in response line""" with pytest.raises(errors.TestError): http_ua = http.HttpUA() - http.HttpResponse("HTTP1.1 OK\r\n", http_ua) + http.HttpResponse('HTTP1.1 OK\r\n', http_ua) def test_invalid_gzip(): """Invalid gzip content""" http_ua = http.HttpUA() with pytest.raises(errors.TestError): - http.HttpResponse("HTTP1.1 200 OK\r\n" - "Content-Encoding: gzip\r\n\r\ninvalid data", + http.HttpResponse('HTTP1.1 200 OK\r\n' + 'Content-Encoding: gzip\r\n\r\ninvalid data', http_ua) @@ -181,8 +178,8 @@ def test_invalid_deflate(): """Invalid deflate content""" http_ua = http.HttpUA() with pytest.raises(errors.TestError): - http.HttpResponse("HTTP1.1 200 OK\r\n" - "Content-Encoding: deflate\r\n\r\ninvalid data", + http.HttpResponse('HTTP1.1 200 OK\r\n' + 'Content-Encoding: deflate\r\n\r\ninvalid data', http_ua) @@ -190,14 +187,14 @@ def test_invalid_brotli(): """Invalid brotli content""" http_ua = http.HttpUA() with pytest.raises(errors.TestError): - http.HttpResponse("HTTP1.1 200 OK\r\n" - "Content-Encoding: br\r\n\r\ninvalid data", + http.HttpResponse('HTTP1.1 200 OK\r\n' + 'Content-Encoding: br\r\n\r\ninvalid data', http_ua) def test1(): """Typical request specified should be valid""" - x = ruleset.Input(dest_addr="example.com", headers={"Host": "example.com"}) + x = ruleset.Input(dest_addr='example.com', headers={'Host': 'example.com'}) http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 @@ -205,7 +202,7 @@ def test1(): def test2(): """Basic GET without Host on 1.1 - Expect 400""" - x = ruleset.Input(dest_addr="example.com", headers={}) + x = ruleset.Input(dest_addr='example.com', headers={}) http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 400 @@ -213,7 +210,7 @@ def test2(): def test3(): """Basic GET without Host on 1.0 - Expect 404 (server is VHosted)""" - x = ruleset.Input(dest_addr="example.com", version="HTTP/1.0", headers={}) + x = ruleset.Input(dest_addr='example.com', version='HTTP/1.0', headers={}) http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 404 @@ -221,25 +218,17 @@ def test3(): def test4(): """Basic GET wit Host on 1.0 - Expect 200""" - x = ruleset.Input(dest_addr="example.com", version="HTTP/1.0", - headers={"Host": "example.com"}) + x = ruleset.Input(dest_addr='example.com', version='HTTP/1.0', + headers={'Host': 'example.com'}) http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 -def test5(): - """Basic GET without Host on 0.9 - Expect 505 version not supported""" - x = ruleset.Input(dest_addr="example.com", version="HTTP/0.9", headers={}) - http_ua = http.HttpUA() - http_ua.send_request(x) - assert http_ua.response_object.status == 505 - - def test6(): """Basic GET without Host with invalid version (request line) - Expect 505 not supported""" - x = ruleset.Input(dest_addr="example.com", version="HTTP/1.0 x", + x = ruleset.Input(dest_addr='example.com', version='HTTP/1.0 x', headers={}) http_ua = http.HttpUA() http_ua.send_request(x) @@ -248,8 +237,8 @@ def test6(): def test7(): """TEST method which doesn't exist - Expect 501""" - x = ruleset.Input(method="TEST", dest_addr="example.com", - version="HTTP/1.0", headers={}) + x = ruleset.Input(method='TEST', dest_addr='example.com', + version='HTTP/1.0', headers={}) http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 501 @@ -257,8 +246,8 @@ def test7(): def test8(): """PROPFIND method which isn't allowed - Expect 405""" - x = ruleset.Input(method="PROPFIND", dest_addr="example.com", - version="HTTP/1.0", headers={}) + x = ruleset.Input(method='PROPFIND', dest_addr='example.com', + version='HTTP/1.0', headers={}) http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 405 @@ -266,8 +255,8 @@ def test8(): def test9(): """OPTIONS method - Expect 200""" - x = ruleset.Input(method="OPTIONS", dest_addr="example.com", - version="HTTP/1.0", headers={}) + x = ruleset.Input(method='OPTIONS', dest_addr='example.com', + version='HTTP/1.0', headers={}) http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 @@ -275,8 +264,8 @@ def test9(): def test10(): """HEAD method - Expect 200""" - x = ruleset.Input(method="HEAD", dest_addr="example.com", - version="HTTP/1.0", headers={"Host": "example.com"}) + x = ruleset.Input(method='HEAD', dest_addr='example.com', + version='HTTP/1.0', headers={'Host': 'example.com'}) http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 @@ -284,8 +273,8 @@ def test10(): def test11(): """POST method no data - Expect 411""" - x = ruleset.Input(method="POST", dest_addr="example.com", - version="HTTP/1.0", headers={}) + x = ruleset.Input(method='POST', dest_addr='example.com', + version='HTTP/1.0', headers={}) http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 411 @@ -293,10 +282,10 @@ def test11(): def test12(): """POST method no data with content length header - Expect 200""" - x = ruleset.Input(method="POST", dest_addr="example.com", - version="HTTP/1.0", - headers={"Content-Length": "0", "Host": "example.com"}, - data="") + x = ruleset.Input(method='POST', dest_addr='example.com', + version='HTTP/1.0', + headers={'Content-Length': '0', 'Host': 'example.com'}, + data='') http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 @@ -304,8 +293,8 @@ def test12(): def test13(): """Request https on port 80 (default)""" - x = ruleset.Input(protocol="https", dest_addr="example.com", - headers={"Host": "example.com"}) + x = ruleset.Input(protocol='https', dest_addr='example.com', + headers={'Host': 'example.com'}) http_ua = http.HttpUA() with pytest.raises(errors.TestError): http_ua.send_request(x) @@ -313,8 +302,8 @@ def test13(): def test14(): """Request https on port 443 should work""" - x = ruleset.Input(protocol="https", port=443, dest_addr="example.com", - headers={"Host": "example.com"}) + x = ruleset.Input(protocol='https', port=443, dest_addr='example.com', + headers={'Host': 'example.com'}) http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 @@ -322,12 +311,12 @@ def test14(): def test15(): """Request with content-type and content-length specified""" - x = ruleset.Input(method="POST", protocol="http", port=80, - dest_addr="example.com", + x = ruleset.Input(method='POST', protocol='http', port=80, + dest_addr='example.com', headers={ - "Content-Type": "application/x-www-form-urlencoded", - "Host": "example.com", "Content-Length": "7"}, - data="test=hi") + 'Content-Type': 'application/x-www-form-urlencoded', + 'Host': 'example.com', 'Content-Length': '7'}, + data='test=hi') http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 @@ -335,12 +324,12 @@ def test15(): def test16(): """Post request with content-type but not content-length""" - x = ruleset.Input(method="POST", protocol="http", port=80, - dest_addr="example.com", + x = ruleset.Input(method='POST', protocol='http', port=80, + dest_addr='example.com', headers={ - "Content-Type": "application/x-www-form-urlencoded", - "Host": "example.com"}, - data="test=hi") + 'Content-Type': 'application/x-www-form-urlencoded', + 'Host': 'example.com'}, + data='test=hi') http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 @@ -348,9 +337,9 @@ def test16(): def test17(): """Post request with no content-type AND no content-length""" - x = ruleset.Input(method="POST", protocol="http", port=80, uri="/", - dest_addr="example.com", - headers={"Host": "example.com"}, data="test=hi") + x = ruleset.Input(method='POST', protocol='http', port=80, uri='/', + dest_addr='example.com', + headers={'Host': 'example.com'}, data='test=hi') http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 @@ -358,53 +347,53 @@ def test17(): def test18(): """Send a request and check that the space is encoded automagically""" - x = ruleset.Input(method="POST", protocol="http", port=80, - uri="/", dest_addr="example.com", - headers={"Host": "example.com"}, - data="test=hit f&test2=hello") + x = ruleset.Input(method='POST', protocol='http', port=80, + uri='/', dest_addr='example.com', + headers={'Host': 'example.com'}, + data='test=hit f&test2=hello') http_ua = http.HttpUA() http_ua.send_request(x) - assert http_ua.request_object.data == "test=hit+f&test2=hello" + assert http_ua.request_object.data == 'test=hit+f&test2=hello' def test19(): """Send a raw question mark and test it is encoded automagically""" - x = ruleset.Input(method="POST", protocol="http", port=80, uri="/", - dest_addr="example.com", - headers={"Host": "example.com"}, data="test=hello?x") + x = ruleset.Input(method='POST', protocol='http', port=80, uri='/', + dest_addr='example.com', + headers={'Host': 'example.com'}, data='test=hello?x') http_ua = http.HttpUA() http_ua.send_request(x) - assert http_ua.request_object.data == "test=hello%3Fx" + assert http_ua.request_object.data == 'test=hello%3Fx' def test_brotli(): """Accept-Encoding br""" - x = ruleset.Input(dest_addr="httpbin.org", uri="/brotli", - headers={"Host": "httpbin.org", - "Accept-Encoding": "br"}) + x = ruleset.Input(dest_addr='httpbin.org', uri='/brotli', + headers={'Host': 'httpbin.org', + 'Accept-Encoding': 'br'}) http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 - assert http_ua.response_object.headers["content-encoding"] == "br" + assert http_ua.response_object.headers['content-encoding'] == 'br' def test_deflate(): """Accept-Encoding deflate""" - x = ruleset.Input(dest_addr="example.com", version="HTTP/1.0", - headers={"Host": "example.com", - "Accept-Encoding": "deflate"}) + x = ruleset.Input(dest_addr='example.com', version='HTTP/1.0', + headers={'Host': 'example.com', + 'Accept-Encoding': 'deflate'}) http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 - assert http_ua.response_object.headers["content-encoding"] == "deflate" + assert http_ua.response_object.headers['content-encoding'] == 'deflate' def test_gzip(): """Accept-Encoding gzip""" - x = ruleset.Input(dest_addr="example.com", version="HTTP/1.0", - headers={"Host": "example.com", - "Accept-Encoding": "gzip"}) + x = ruleset.Input(dest_addr='example.com', version='HTTP/1.0', + headers={'Host': 'example.com', + 'Accept-Encoding': 'gzip'}) http_ua = http.HttpUA() http_ua.send_request(x) assert http_ua.response_object.status == 200 - assert http_ua.response_object.headers["content-encoding"] == "gzip" + assert http_ua.response_object.headers['content-encoding'] == 'gzip' diff --git a/test/integration/test_logcontains.py b/test/integration/test_logcontains.py index b76f77b..4549396 100644 --- a/test/integration/test_logcontains.py +++ b/test/integration/test_logcontains.py @@ -27,13 +27,13 @@ def logchecker_obj(): return LoggerTestObj() -def test_logcontains_withlog(logchecker_obj, ruleset, test): +def test_logcontains_withlog(logchecker_obj, test): runner = testrunner.TestRunner() for stage in test.stages: runner.run_stage(stage, logchecker_obj) -def test_logcontains_nolog(logchecker_obj, ruleset, test): +def test_logcontains_nolog(logchecker_obj, test): logchecker_obj.do_nothing = True runner = testrunner.TestRunner() with(pytest.raises(AssertionError)): diff --git a/test/unit/test_response.py b/test/unit/test_response.py index fc92776..e673ee6 100644 --- a/test/unit/test_response.py +++ b/test/unit/test_response.py @@ -16,7 +16,7 @@ def test_response_failure(): http_ua = http.HttpUA() with pytest.raises(AssertionError): runner.test_response(http.HttpResponse( - "HTTP/1.1 200 OK\r\n\r\ncat", http_ua), + 'HTTP/1.1 200 OK\r\n\r\ncat', http_ua), re.compile('dog')) @@ -24,5 +24,5 @@ def test_response_success(): runner = testrunner.TestRunner() http_ua = http.HttpUA() runner.test_response(http.HttpResponse( - "HTTP/1.1 200 OK\r\n\r\ncat", http_ua), + 'HTTP/1.1 200 OK\r\n\r\ncat', http_ua), re.compile('cat')) diff --git a/test/unit/test_ruleset.py b/test/unit/test_ruleset.py index d2d5035..763773a 100644 --- a/test/unit/test_ruleset.py +++ b/test/unit/test_ruleset.py @@ -25,7 +25,7 @@ def test_input(): dictionary = {} dictionary['headers'] = headers input_2 = ruleset.Input(**dictionary) - assert(len(input_2.headers.keys()) == 2) + assert(len(list(input_2.headers.keys())) == 2) dictionary_2 = {'random_key': 'bar'} with pytest.raises(TypeError): ruleset.Input(**dictionary_2) @@ -33,11 +33,12 @@ def test_input(): def test_testobj(): with pytest.raises(KeyError) as excinfo: - ruleset.Test({}, {}) + ruleset.Test({}, {}, {}) assert 'test_title' in str(excinfo.value) + ruleset_meta = {'name': 'test-name.yaml'} stages_dict = {'test_title': 1, 'stages': [{'stage': {'output': {'log_contains': 'foo'}, 'input': {}}}]} - ruleset.Test(stages_dict, {}) + ruleset.Test(stages_dict, {}, ruleset_meta) def test_ruleset():