Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 89 additions & 66 deletions src/specify_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
from rich.panel import Panel
from rich.align import Align
from rich.table import Table
from ._download_security import (
is_https_or_localhost_http,
)
from .shared_infra import (
install_shared_infra as _install_shared_infra_impl,
refresh_shared_templates as _refresh_shared_templates_impl,
Expand Down Expand Up @@ -1110,49 +1113,58 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None:

# Try as URL (http/https)
if source.startswith("http://") or source.startswith("https://"):
from ipaddress import ip_address
from urllib.parse import urlparse
from functools import partial
from urllib.parse import urlparse as _urlparse

from specify_cli._download_security import read_response_limited as _read_response_limited
from specify_cli.authentication.http import open_url as _open_url

parsed_src = urlparse(source)
src_host = parsed_src.hostname or ""
src_loopback = src_host == "localhost"
if not src_loopback:
try:
src_loopback = ip_address(src_host).is_loopback
except ValueError:
# Host is not an IP literal (e.g., a DNS name); keep default non-loopback.
pass
if parsed_src.scheme != "https" and not (parsed_src.scheme == "http" and src_loopback):
console.print("[red]Error:[/red] Only HTTPS URLs are allowed, except HTTP for localhost.")
if not is_https_or_localhost_http(source):
console.print(
"[red]Error:[/red] URL must be a valid URL with a host and use HTTPS. "
"HTTP is only allowed for localhost, 127.0.0.1, and ::1."
)
raise typer.Exit(1)

from specify_cli._github_http import resolve_github_release_asset_api_url as _resolve_gh_asset

_wf_url_extra_headers = None
_resolved_wf_url = _resolve_gh_asset(source, _open_url, timeout=30)
_resolved_wf_url = _resolve_gh_asset(
source, partial(_open_url, strict_redirects=True), timeout=30
)
if _resolved_wf_url:
source = _resolved_wf_url
_wf_url_extra_headers = {"Accept": "application/octet-stream"}

import tempfile
try:
with _open_url(source, timeout=30, extra_headers=_wf_url_extra_headers) as resp:
with _open_url(
source,
timeout=30,
extra_headers=_wf_url_extra_headers,
strict_redirects=True,
) as resp:
final_url = resp.geturl()
final_parsed = urlparse(final_url)
final_host = final_parsed.hostname or ""
final_lb = final_host == "localhost"
if not final_lb:
try:
final_lb = ip_address(final_host).is_loopback
except ValueError:
# Redirect host is not an IP literal; keep loopback as determined above.
pass
if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_lb):
console.print(f"[red]Error:[/red] URL redirected to non-HTTPS: {final_url}")
if not is_https_or_localhost_http(final_url):
final_parsed = _urlparse(final_url)
if not final_parsed.hostname:
console.print(
f"[red]Error:[/red] URL redirected to a URL with no hostname: {final_url}"
)
else:
console.print(
"[red]Error:[/red] URL redirected to a URL without HTTPS "
"(HTTP is allowed only for localhost, 127.0.0.1, and ::1): "
f"{final_url}"
)
raise typer.Exit(1)
with tempfile.NamedTemporaryFile(suffix=".yml", delete=False) as tmp:
tmp.write(resp.read())
tmp.write(
_read_response_limited(
resp,
label=f"workflow {source}",
)
)
tmp_path = Path(tmp.name)
except typer.Exit:
raise
Expand Down Expand Up @@ -1202,24 +1214,11 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None:
raise typer.Exit(1)

# Validate URL scheme (HTTPS required, HTTP allowed for localhost only)
from ipaddress import ip_address
from urllib.parse import urlparse

parsed_url = urlparse(workflow_url)
url_host = parsed_url.hostname or ""
is_loopback = False
if url_host == "localhost":
is_loopback = True
else:
try:
is_loopback = ip_address(url_host).is_loopback
except ValueError:
# Host is not an IP literal (e.g., a regular hostname); treat as non-loopback.
pass
if parsed_url.scheme != "https" and not (parsed_url.scheme == "http" and is_loopback):
if not is_https_or_localhost_http(workflow_url):
console.print(
f"[red]Error:[/red] Workflow '{source}' has an invalid install URL. "
"Only HTTPS URLs are allowed, except HTTP for localhost/loopback."
"It must be a valid URL with a host and use HTTPS; HTTP is only allowed "
"for localhost, 127.0.0.1, and ::1."
)
raise typer.Exit(1)

Expand All @@ -1233,37 +1232,52 @@ def _validate_and_install_local(yaml_path: Path, source_label: str) -> None:
workflow_file = workflow_dir / "workflow.yml"

try:
from functools import partial
from urllib.parse import urlparse as _urlparse

from specify_cli.authentication.http import open_url as _open_url
from specify_cli._github_http import resolve_github_release_asset_api_url as _resolve_gh_asset
from specify_cli._download_security import read_response_limited as _read_response_limited

_wf_cat_extra_headers = None
_resolved_workflow_url = _resolve_gh_asset(workflow_url, _open_url, timeout=30)
_resolved_workflow_url = _resolve_gh_asset(
workflow_url, partial(_open_url, strict_redirects=True), timeout=30
)
if _resolved_workflow_url:
workflow_url = _resolved_workflow_url
_wf_cat_extra_headers = {"Accept": "application/octet-stream"}

workflow_dir.mkdir(parents=True, exist_ok=True)
with _open_url(workflow_url, timeout=30, extra_headers=_wf_cat_extra_headers) as response:
with _open_url(
workflow_url,
timeout=30,
extra_headers=_wf_cat_extra_headers,
strict_redirects=True,
) as response:
# Validate final URL after redirects
final_url = response.geturl()
final_parsed = urlparse(final_url)
final_host = final_parsed.hostname or ""
final_loopback = final_host == "localhost"
if not final_loopback:
try:
final_loopback = ip_address(final_host).is_loopback
except ValueError:
# Host is not an IP literal (e.g., a regular hostname); treat as non-loopback.
pass
if final_parsed.scheme != "https" and not (final_parsed.scheme == "http" and final_loopback):
if not is_https_or_localhost_http(final_url):
if workflow_dir.exists():
import shutil
shutil.rmtree(workflow_dir, ignore_errors=True)
console.print(
f"[red]Error:[/red] Workflow '{source}' redirected to non-HTTPS URL: {final_url}"
)
final_parsed = _urlparse(final_url)
if not final_parsed.hostname:
console.print(
f"[red]Error:[/red] Workflow '{source}' redirected to a URL with no hostname: {final_url}"
)
else:
console.print(
f"[red]Error:[/red] Workflow '{source}' redirected to a URL without HTTPS "
"(HTTP is allowed only for localhost, 127.0.0.1, and ::1): "
f"{final_url}"
)
raise typer.Exit(1)
workflow_file.write_bytes(response.read())
workflow_file.write_bytes(
_read_response_limited(
response,
label=f"workflow '{source}' download",
)
)
except Exception as exc:
if workflow_dir.exists():
import shutil
Expand Down Expand Up @@ -1687,26 +1701,35 @@ def workflow_step_add(
raise typer.Exit(1)

from urllib.parse import urlparse
from specify_cli._download_security import read_response_limited as _read_response_limited
from specify_cli.authentication.http import open_url as _open_url

def _safe_fetch(url: str) -> bytes:
parsed = urlparse(url)
is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1")
if parsed.scheme != "https" and not (parsed.scheme == "http" and is_localhost):
raise ValueError(f"Refusing to fetch from non-HTTPS URL: {url}")
if not parsed.hostname:
raise ValueError(f"Refusing to fetch from URL with no hostname: {url}")
with _open_url(url, timeout=30) as resp:
if parsed.scheme != "https" and not (parsed.scheme == "http" and is_localhost):
raise ValueError(
"Refusing to fetch from URL without HTTPS "
"(HTTP is allowed only for localhost, 127.0.0.1, and ::1): "
f"{url}"
)
with _open_url(url, timeout=30, strict_redirects=True) as resp:
final_url = resp.geturl()
final_parsed = urlparse(final_url)
final_is_localhost = final_parsed.hostname in ("localhost", "127.0.0.1", "::1")
if not final_parsed.hostname:
raise ValueError(f"Redirect to URL with no hostname: {final_url}")
if final_parsed.scheme != "https" and not (
final_parsed.scheme == "http" and final_is_localhost
):
raise ValueError(f"Redirect to non-HTTPS URL: {final_url}")
if not final_parsed.hostname:
raise ValueError(f"Redirect to URL with no hostname: {final_url}")
return resp.read()
raise ValueError(
"Redirect to URL without HTTPS "
"(HTTP is allowed only for localhost, 127.0.0.1, and ::1): "
f"{final_url}"
)
return _read_response_limited(resp, label=f"workflow step {url}")

_validate_step_id_or_exit(step_id)

Expand Down
Loading