Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions src/inference_endpoint/endpoint_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from importlib import import_module
from pathlib import Path
from typing import Annotated, Any, Literal
from urllib.parse import urlparse

import cyclopts
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
Expand All @@ -45,6 +46,21 @@
)
from .utils import get_ephemeral_port_limit, get_ephemeral_port_range


def _endpoint_destination(url: str) -> tuple[str | None, int]:
"""Resolve an endpoint URL to its ``(host, port)`` destination identity.

Used to count distinct destinations for the ephemeral-port budget. A
bare ``host:port`` (no scheme) is parsed as http so the host/port land
in the right fields instead of collapsing to ``(None, None)``; a missing
port resolves to the scheme default (443 for https, else 80) so that
http and https to the same host count as distinct destinations.
"""
parsed = urlparse(url if "://" in url else f"http://{url}")
port = parsed.port or (443 if parsed.scheme == "https" else 80)
return (parsed.hostname, port)


ADAPTER_MAP = {
APIType.OPENAI: "inference_endpoint.openai.openai_msgspec_adapter.OpenAIMsgspecAdapter",
APIType.OPENAI_COMPLETIONS: "inference_endpoint.openai.completions_adapter.OpenAITextCompletionsAdapter",
Expand Down Expand Up @@ -253,13 +269,28 @@ def _resolve_defaults(self) -> HTTPClientConfig:
system_maximum_ports = high - low + 1
available_ports = get_ephemeral_port_limit()

# The ephemeral-port limit is per (source IP, destination) pair: the
# TCP 4-tuple (src_ip, src_port, dst_ip, dst_port) only needs to be
# unique, so the kernel reuses local ports across distinct
# destinations. Each distinct endpoint therefore has its own
# ~`available_ports` budget. Workers are round-robined across
# endpoints, so scale the cap by the distinct-endpoint count;
# otherwise concurrency is needlessly throttled to a single
# endpoint's budget when several endpoints are configured.
distinct_endpoints = len(
{_endpoint_destination(u) for u in self.endpoint_urls}
)
port_budget = available_ports * max(1, distinct_endpoints)

if self.max_connections == -1:
object.__setattr__(self, "max_connections", available_ports)
object.__setattr__(self, "max_connections", port_budget)
elif self.max_connections > 0:
if self.max_connections > available_ports:
if self.max_connections > port_budget:
raise RuntimeError(
f"--max-connections ({self.max_connections}) exceeds ephemeral port limit ({available_ports}). "
f"Either reduce --max-connections or increase system port limit."
f"--max-connections ({self.max_connections}) exceeds the ephemeral "
f"port budget ({port_budget} = {available_ports} ports x "
f"{max(1, distinct_endpoints)} distinct endpoint(s)). Reduce "
f"--max-connections, add endpoints, or raise the system port range."
)
Comment thread
viraatc marked this conversation as resolved.

if self.min_required_connections == -1:
Expand Down
122 changes: 122 additions & 0 deletions tests/unit/endpoint_client/test_http_client_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from unittest.mock import patch

import pytest
from inference_endpoint.endpoint_client import config as cfg
from inference_endpoint.endpoint_client.cpu_affinity import UnsupportedPlatformError

Expand Down Expand Up @@ -43,3 +44,124 @@ def test_http_client_config_constructs_when_numa_unsupported(self):
):
c = cfg.HTTPClientConfig()
assert c.num_workers == 10


class TestEndpointBudgetScaling:
"""max_connections budget scales with the number of distinct endpoints.

The ephemeral-port limit is per (source IP, destination) pair, so each
distinct endpoint contributes its own ~available_ports budget. num_workers
is pinned (>=1) so config resolution skips the NUMA auto-probe.
"""

def test_auto_budget_scales_with_distinct_endpoints(self):
with (
patch.object(cfg, "get_ephemeral_port_range", return_value=(32768, 60999)),
patch.object(cfg, "get_ephemeral_port_limit", return_value=10000),
):
c = cfg.HTTPClientConfig(
endpoint_urls=[
"http://10.0.0.1:8000",
"http://10.0.0.2:8000",
"http://10.0.0.3:8000",
],
num_workers=10,
)
assert c.max_connections == 30000 # 10000 ports x 3 distinct endpoints

def test_single_endpoint_budget_unchanged(self):
with (
patch.object(cfg, "get_ephemeral_port_range", return_value=(32768, 60999)),
patch.object(cfg, "get_ephemeral_port_limit", return_value=10000),
):
c = cfg.HTTPClientConfig(
endpoint_urls=["http://10.0.0.1:8000"], num_workers=10
)
assert c.max_connections == 10000 # single endpoint -> unchanged

def test_duplicate_endpoints_do_not_inflate_budget(self):
# Same (host, port) repeated (even with different paths) is one
# destination -> one budget, since the 4-tuple ignores path.
with (
patch.object(cfg, "get_ephemeral_port_range", return_value=(32768, 60999)),
patch.object(cfg, "get_ephemeral_port_limit", return_value=10000),
):
c = cfg.HTTPClientConfig(
endpoint_urls=[
"http://10.0.0.1:8000/v1/a",
"http://10.0.0.1:8000/v1/b",
"http://10.0.0.1:8000",
],
num_workers=10,
)
assert c.max_connections == 10000 # 1 distinct (host, port)

def test_explicit_max_connections_within_scaled_budget_ok(self):
# 25000 exceeds one endpoint's budget (10000) but fits 3 (30000).
with (
patch.object(cfg, "get_ephemeral_port_range", return_value=(32768, 60999)),
patch.object(cfg, "get_ephemeral_port_limit", return_value=10000),
):
c = cfg.HTTPClientConfig(
endpoint_urls=[
"http://10.0.0.1:8000",
"http://10.0.0.2:8000",
"http://10.0.0.3:8000",
],
num_workers=10,
max_connections=25000,
)
assert c.max_connections == 25000

def test_explicit_max_connections_exceeding_scaled_budget_raises(self):
with (
patch.object(cfg, "get_ephemeral_port_range", return_value=(32768, 60999)),
patch.object(cfg, "get_ephemeral_port_limit", return_value=10000),
):
with pytest.raises(RuntimeError, match="exceeds the ephemeral"):
cfg.HTTPClientConfig(
endpoint_urls=["http://10.0.0.1:8000", "http://10.0.0.2:8000"],
num_workers=10,
max_connections=40000, # > 2 x 10000
)


@pytest.mark.unit
class TestEndpointDestination:
"""Distinct-destination identity used for the ephemeral-port budget."""

@pytest.mark.parametrize(
("url", "expected"),
[
("http://10.0.0.1:8000", ("10.0.0.1", 8000)),
("https://host:9000", ("host", 9000)),
("http://host", ("host", 80)),
("https://host", ("host", 443)),
("10.0.0.1:8000", ("10.0.0.1", 8000)), # schemeless host:port
("host:9000", ("host", 9000)),
("http://[::1]:8000", ("::1", 8000)), # IPv6
],
)
def test_resolves_host_and_port(self, url, expected):
assert cfg._endpoint_destination(url) == expected

def test_schemeless_urls_count_as_distinct(self):
# Bare host:port must not collapse to (None, None) and inflate to 1.
keys = {cfg._endpoint_destination(u) for u in ("a:8000", "b:8000")}
assert len(keys) == 2

def test_http_and_https_same_host_are_distinct(self):
# Default ports differ (80 vs 443) -> two destinations, not one.
keys = {cfg._endpoint_destination(u) for u in ("http://h", "https://h")}
assert len(keys) == 2

def test_schemeless_budget_scales_with_distinct_hosts(self):
with (
patch.object(cfg, "get_ephemeral_port_range", return_value=(32768, 60999)),
patch.object(cfg, "get_ephemeral_port_limit", return_value=10000),
):
c = cfg.HTTPClientConfig(
endpoint_urls=["10.0.0.1:8000", "10.0.0.2:8000"],
num_workers=10,
)
assert c.max_connections == 20000 # 10000 ports x 2 distinct hosts
Loading