From d9a99fc8f83200fd0977cf0874f605fef0eb3ade Mon Sep 17 00:00:00 2001 From: thorinaboenke Date: Tue, 17 Mar 2026 14:08:45 +0100 Subject: [PATCH] add check if port is in use to webservexecutor --- .../executors/http/webservexecutor.py | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/src/attackmate/executors/http/webservexecutor.py b/src/attackmate/executors/http/webservexecutor.py index 39f5544c..c9562929 100644 --- a/src/attackmate/executors/http/webservexecutor.py +++ b/src/attackmate/executors/http/webservexecutor.py @@ -3,14 +3,14 @@ ============================================ Serves files via HTTP """ - +import magic +import socket from attackmate.executors.baseexecutor import BaseExecutor from attackmate.result import Result from attackmate.execexception import ExecException from attackmate.schemas.http import WebServCommand from attackmate.executors.features.cmdvars import CmdVars from http.server import HTTPServer, BaseHTTPRequestHandler -import magic from attackmate.executors.executor_factory import executor_factory @@ -57,8 +57,27 @@ class WebServExecutor(BaseExecutor): def log_command(self, command: WebServCommand): self.logger.info(f'Serving {command.local_path} via HTTP on Port {command.port}') + def _port_in_use(self, address: str, port: int) -> bool: + # IPv4 and TCP + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + # bind to the port even if it's still in TIME_WAIT + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + # bind to the port to see if it can be used + s.bind((address, port)) + return False + except OSError: + return True + async def _exec_cmd(self, command: WebServCommand) -> Result: - address = (command.address, CmdVars.variable_to_int('Port', command.port)) + port = CmdVars.variable_to_int('Port', command.port) + address = (command.address, port) + + if self._port_in_use(command.address, port): + raise ExecException( + f'Cannot start webserv: port {port} on {command.address} is already in use' + ) + try: server = WebServe(address, WebRequestHandler, local_path=command.local_path) if command.keep_serving: