Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/skillspector/input_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from __future__ import annotations

import ipaddress
import re
import shutil
import socket
import subprocess
Expand Down Expand Up @@ -160,13 +161,22 @@ def _is_file_url(self, path: str) -> bool:
return False
return not self._is_git_url(path)

def _extract_scp_host(self, url: str) -> str | None:
"""Return the host from an scp-style Git URL, or None if not scp form."""
if "://" in url:
return None
m = re.match(r"^[^@/]+@([^:/]+):.+$", url)
return m.group(1) if m else None

def _validate_url_host(self, url: str, allowed_hosts: frozenset[str]) -> str:
"""Validate URL host against allowlist and SSRF protections.

Returns the hostname on success, raises ValueError on blocked URLs.
"""
parsed = urlparse(url)
host = parsed.hostname or ""
if not host:
host = self._extract_scp_host(url) or ""
if not host:
raise ValueError(f"URL has no valid hostname: {url}")
if not any(host == allowed or host.endswith("." + allowed) for allowed in allowed_hosts):
Expand Down
54 changes: 53 additions & 1 deletion tests/unit/test_input_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
"""Tests for skillspector input_handler (resolve directory, zip, single file)."""

from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from skillspector.input_handler import InputHandler
from skillspector.input_handler import ALLOWED_GIT_HOSTS, InputHandler


def test_resolve_directory(tmp_path: Path) -> None:
Expand Down Expand Up @@ -94,3 +95,54 @@ def test_cleanup_idempotent(tmp_path: Path) -> None:
handler.resolve(str(tmp_path / "a.md"))
handler.cleanup()
handler.cleanup()


def test_scp_url_is_git_url() -> None:
"""scp-style SSH URL is recognised as a Git URL."""
assert InputHandler()._is_git_url("git@github.com:org/repo.git") is True


def test_validate_url_host_scp_extracts_github() -> None:
"""_validate_url_host extracts 'github.com' from an scp-style URL."""
host = InputHandler()._validate_url_host("git@github.com:org/repo.git", ALLOWED_GIT_HOSTS)
assert host == "github.com"


def test_scp_valid_host_clones() -> None:
"""resolve() calls git clone with the scp URL when the host is allowed."""
handler = InputHandler()
try:
with patch(
"skillspector.input_handler.subprocess.run", return_value=MagicMock()
) as mock_run:
handler.resolve("git@github.com:org/repo.git")
assert mock_run.called
call_args = mock_run.call_args[0][0]
assert "git@github.com:org/repo.git" in call_args
finally:
handler.cleanup()


def test_scp_disallowed_host_raises() -> None:
"""_validate_url_host rejects an scp URL whose host is not in the allowlist."""
with pytest.raises(ValueError, match="not in the allowed hosts"):
InputHandler()._validate_url_host("git@malicious.internal:org/repo.git", ALLOWED_GIT_HOSTS)


def test_scp_private_ip_raises() -> None:
"""_validate_url_host rejects an scp URL whose extracted host is not in the allowlist."""
with pytest.raises(ValueError):
InputHandler()._validate_url_host("git@169.254.169.254:org/repo.git", ALLOWED_GIT_HOSTS)


def test_https_url_unchanged() -> None:
"""https URLs continue to extract the host via urlparse without hitting the scp fallback."""
host = InputHandler()._validate_url_host("https://github.com/org/repo.git", ALLOWED_GIT_HOSTS)
assert host == "github.com"


def test_scp_ssrf_gate_fires() -> None:
"""SSRF gate raises ValueError for an scp URL whose host resolves to a private IP."""
with patch("skillspector.input_handler._is_private_ip", return_value=True):
with pytest.raises(ValueError, match="private/internal IP"):
InputHandler()._validate_url_host("git@github.com:org/repo.git", ALLOWED_GIT_HOSTS)