From b28898a2b80d3fc58b1ec26e4c199e1d820514bd Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Mon, 13 Oct 2025 14:56:34 +0545 Subject: [PATCH 01/22] Add SGLang Router Min Support --- src/dstack/_internal/core/backends/base/compute.py | 7 ++++++- src/dstack/_internal/core/models/gateways.py | 2 ++ .../_internal/proxy/gateway/resources/nginx/service.jinja2 | 2 +- src/dstack/_internal/proxy/gateway/routers/registry.py | 4 ++++ src/dstack/_internal/proxy/gateway/schemas/registry.py | 1 + src/dstack/_internal/proxy/gateway/services/nginx.py | 1 + src/dstack/_internal/proxy/gateway/services/registry.py | 3 +++ src/dstack/_internal/proxy/lib/models.py | 1 + src/dstack/_internal/server/services/gateways/client.py | 2 ++ src/dstack/_internal/server/services/services/__init__.py | 7 +++++++ 10 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index 6a1f6af4a4..931b097aa5 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -979,7 +979,12 @@ def get_dstack_gateway_wheel(build: str) -> str: r.raise_for_status() build = r.text.strip() logger.debug("Found the latest gateway build: %s", build) - return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" + # return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" + # For testing + logger.debug( + "Using test gateway wheel: https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" + ) + return "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" def get_dstack_gateway_commands() -> List[str]: diff --git a/src/dstack/_internal/core/models/gateways.py b/src/dstack/_internal/core/models/gateways.py index 6a480b5806..d7a7fdd2c6 100644 --- a/src/dstack/_internal/core/models/gateways.py +++ b/src/dstack/_internal/core/models/gateways.py @@ -44,12 +44,14 @@ class GatewayCertificate(CoreModel): ] +# https://github.com/dstackai/dstack/blob/master/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 class GatewayConfiguration(CoreModel): type: Literal["gateway"] = "gateway" name: Annotated[Optional[str], Field(description="The gateway name")] = None default: Annotated[bool, Field(description="Make the gateway default")] = False backend: Annotated[BackendType, Field(description="The gateway backend")] region: Annotated[str, Field(description="The gateway region")] + router: Annotated[Optional[str], Field(description="The router type, e.g. `sglang`")] = None domain: Annotated[ Optional[str], Field(description="The gateway domain, e.g. `example.com`") ] = None diff --git a/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 b/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 index b096fa80e6..5da4b62820 100644 --- a/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 +++ b/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 @@ -5,7 +5,7 @@ limit_req_zone {{ zone.key }} zone={{ zone.name }}:10m rate={{ zone.rpm }}r/m; {% if replicas %} upstream {{ domain }}.upstream { {% for replica in replicas %} - server unix:{{ replica.socket }}; # replica {{ replica.id }} + server unix:{{ replica.socket }}; # replica {{ replica.id }} router={{ router }} {% endfor %} } {% else %} diff --git a/src/dstack/_internal/proxy/gateway/routers/registry.py b/src/dstack/_internal/proxy/gateway/routers/registry.py index e1bfa4ff23..5d0aa2953b 100644 --- a/src/dstack/_internal/proxy/gateway/routers/registry.py +++ b/src/dstack/_internal/proxy/gateway/routers/registry.py @@ -13,8 +13,10 @@ from dstack._internal.proxy.gateway.services.nginx import Nginx from dstack._internal.proxy.lib.deps import get_service_connection_pool from dstack._internal.proxy.lib.services.service_connection import ServiceConnectionPool +from dstack._internal.utils.logging import get_logger router = APIRouter(prefix="/{project_name}") +logger = get_logger(__name__) @router.post("/services/register") @@ -25,6 +27,7 @@ async def register_service( nginx: Annotated[Nginx, Depends(get_nginx)], service_conn_pool: Annotated[ServiceConnectionPool, Depends(get_service_connection_pool)], ) -> OkResponse: + logger.debug(f"[SglangRouterTesting] Gateway API Reception Router: {body.router}") await registry_services.register_service( project_name=project_name.lower(), run_name=body.run_name.lower(), @@ -36,6 +39,7 @@ async def register_service( model=body.options.openai.model if body.options.openai is not None else None, ssh_private_key=body.ssh_private_key, repo=repo, + router=body.router, nginx=nginx, service_conn_pool=service_conn_pool, ) diff --git a/src/dstack/_internal/proxy/gateway/schemas/registry.py b/src/dstack/_internal/proxy/gateway/schemas/registry.py index 8ab69b6af5..117152a955 100644 --- a/src/dstack/_internal/proxy/gateway/schemas/registry.py +++ b/src/dstack/_internal/proxy/gateway/schemas/registry.py @@ -44,6 +44,7 @@ class RegisterServiceRequest(BaseModel): options: Options ssh_private_key: str rate_limits: tuple[RateLimit, ...] = () + router: Optional[str] = None class RegisterReplicaRequest(BaseModel): diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 2d3e755ac2..f51c94a02b 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -64,6 +64,7 @@ class ServiceConfig(SiteConfig): limit_req_zones: list[LimitReqZoneConfig] locations: list[LocationConfig] replicas: list[ReplicaConfig] + router: Optional[str] = None class ModelEntrypointConfig(SiteConfig): diff --git a/src/dstack/_internal/proxy/gateway/services/registry.py b/src/dstack/_internal/proxy/gateway/services/registry.py index 3ea412d79d..ce7cbca2e9 100644 --- a/src/dstack/_internal/proxy/gateway/services/registry.py +++ b/src/dstack/_internal/proxy/gateway/services/registry.py @@ -44,6 +44,7 @@ async def register_service( repo: GatewayProxyRepo, nginx: Nginx, service_conn_pool: ServiceConnectionPool, + router: Optional[str] = None, ) -> None: service = models.Service( project_name=project_name, @@ -54,6 +55,7 @@ async def register_service( auth=auth, client_max_body_size=client_max_body_size, replicas=(), + router=router, ) async with lock: @@ -335,6 +337,7 @@ async def get_nginx_service_config( limit_req_zones=limit_req_zones, locations=locations, replicas=sorted(replicas, key=lambda r: r.id), # sort for reproducible configs + router=service.router, ) diff --git a/src/dstack/_internal/proxy/lib/models.py b/src/dstack/_internal/proxy/lib/models.py index 5cb5471d81..4e7046167f 100644 --- a/src/dstack/_internal/proxy/lib/models.py +++ b/src/dstack/_internal/proxy/lib/models.py @@ -57,6 +57,7 @@ class Service(ImmutableModel): client_max_body_size: int # only enforced on gateways strip_prefix: bool = True # only used in-server replicas: tuple[Replica, ...] + router: Optional[str] = None @property def domain_safe(self) -> str: diff --git a/src/dstack/_internal/server/services/gateways/client.py b/src/dstack/_internal/server/services/gateways/client.py index f8c0900792..33bfee3f5f 100644 --- a/src/dstack/_internal/server/services/gateways/client.py +++ b/src/dstack/_internal/server/services/gateways/client.py @@ -45,6 +45,7 @@ async def register_service( options: dict, rate_limits: list[RateLimit], ssh_private_key: str, + router: Optional[str] = None, ): if "openai" in options: entrypoint = f"gateway.{domain.split('.', maxsplit=1)[1]}" @@ -59,6 +60,7 @@ async def register_service( "options": options, "rate_limits": [limit.dict() for limit in rate_limits], "ssh_private_key": ssh_private_key, + "router": router, } resp = await self._client.post( self._url(f"/api/registry/{project}/services/register"), json=payload diff --git a/src/dstack/_internal/server/services/services/__init__.py b/src/dstack/_internal/server/services/services/__init__.py index a8089a93a9..e6f2b19aac 100644 --- a/src/dstack/_internal/server/services/services/__init__.py +++ b/src/dstack/_internal/server/services/services/__init__.py @@ -82,6 +82,11 @@ async def _register_service_in_gateway( gateway_configuration = get_gateway_configuration(gateway) service_https = _get_service_https(run_spec, gateway_configuration) + router = gateway_configuration.router + logger.debug(f"[SglangRouterTesting] Configuration parsing: {router}") + logger.debug( + f"[SglangRouterTesting] Configuration parsing dict: {gateway_configuration.dict()}" + ) service_protocol = "https" if service_https else "http" if service_https and gateway_configuration.certificate is None: @@ -107,6 +112,7 @@ async def _register_service_in_gateway( conn = await get_or_add_gateway_connection(session, gateway.id) try: logger.debug("%s: registering service as %s", fmt(run_model), service_spec.url) + logger.debug(f"[SglangRouterTesting] Service Registration Router: {router}") async with conn.client() as client: await client.register_service( project=run_model.project.name, @@ -119,6 +125,7 @@ async def _register_service_in_gateway( options=service_spec.options, rate_limits=run_spec.configuration.rate_limits, ssh_private_key=run_model.project.ssh_private_key, + router=router, ) logger.info("%s: service is registered as %s", fmt(run_model), service_spec.url) except SSHError: From 82bae8ed8f407c1466911adf7f1505fb7a72c58e Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 06:53:47 +0545 Subject: [PATCH 02/22] Add Test Log to check Registration conf --- gateway/pyproject.toml | 2 +- .../_internal/core/backends/base/compute.py | 3 ++ .../_internal/proxy/gateway/services/nginx.py | 40 ++++++++++++++++++- 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/gateway/pyproject.toml b/gateway/pyproject.toml index a67171c25b..ee76599553 100644 --- a/gateway/pyproject.toml +++ b/gateway/pyproject.toml @@ -11,7 +11,7 @@ requires-python = ">=3.10" dynamic = ["version"] dependencies = [ # release builds of dstack-gateway depend on a PyPI version of dstack instead - "dstack[gateway] @ git+https://github.com/dstackai/dstack.git@master", + "dstack[gateway] @ git+https://github.com/Bihan/dstack.git@add_sglang_router_minimal_support", ] [tool.setuptools.package-data] diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index 931b097aa5..1fd94e04ac 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -840,6 +840,7 @@ def get_gateway_user_data(authorized_key: str) -> str: packages=[ "nginx", "python3.10-venv", + "python3-pip", # Add pip for sglang-router installation ], snap={"commands": [["install", "--classic", "certbot"]]}, runcmd=[ @@ -850,6 +851,8 @@ def get_gateway_user_data(authorized_key: str) -> str: "s/# server_names_hash_bucket_size 64;/server_names_hash_bucket_size 128;/", "/etc/nginx/nginx.conf", ], + # Install sglang-router system-wide. Can be conditionally installed in the future. + ["pip", "install", "sglang-router"], ["su", "ubuntu", "-c", " && ".join(get_dstack_gateway_commands())], ], ssh_authorized_keys=[authorized_key], diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index f51c94a02b..30f5c232a9 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -82,7 +82,7 @@ def __init__(self, conf_dir: Path = Path("/etc/nginx/sites-enabled")) -> None: async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: logger.debug("Registering %s domain %s", conf.type, conf.domain) conf_name = self.get_config_name(conf.domain) - + logger.debug(f"[SglangRouterTesting] Register Conf object dict: {conf.dict()}") async with self._lock: if conf.https: await run_async(self.run_certbot, conf.domain, acme) @@ -107,6 +107,44 @@ def reload() -> None: if r.returncode != 0: raise UnexpectedProxyError("Failed to reload nginx") + @staticmethod + def start_sglang_router(replicas: int) -> None: + """Start sglang-router service, killing existing one if running.""" + try: + # Kill existing sglang-router if running + result = subprocess.run( + ["pgrep", "-f", "sglang_router.launch_router"], capture_output=True, timeout=5 + ) + if result.returncode == 0: + logger.info("Killing existing sglang-router...") + subprocess.run(["pkill", "-f", "sglang_router.launch_router"], timeout=5) + # Wait a moment for the process to terminate + import time + + time.sleep(1) + + # Generate worker URLs based on replica count + worker_urls = [] + for i in range(1, replicas + 1): + worker_urls.append(f"http://127.0.0.1:{10000 + i}") + + # Start sglang-router with system-wide installation + logger.info(f"Starting sglang-router with {replicas} replicas...") + cmd = ( + [ + "python3", + "-m", + "sglang_router.launch_router", # Use system python3 + "--worker-urls", + ] + + worker_urls + + ["--host", "0.0.0.0", "--port", "3000"] + ) + subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + except Exception as e: + logger.error(f"Failed to start sglang-router: {e}") + def write_conf(self, conf: str, conf_name: str) -> None: """Update config and reload nginx. Rollback changes on error.""" conf_path = self._conf_dir / conf_name From 27c5204af9b502c14ccf01de4463807c13bd745c Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 08:24:12 +0545 Subject: [PATCH 03/22] Add start sglang-router --- src/dstack/_internal/proxy/gateway/services/nginx.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 30f5c232a9..0b1fa11445 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -87,6 +87,13 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: if conf.https: await run_async(self.run_certbot, conf.domain, acme) await run_async(self.write_conf, conf.render(), conf_name) + # Start sglang-router if router is sglang + if hasattr(conf, "router") and conf.router == "sglang": + replicas = len(conf.replicas) if hasattr(conf, "replicas") and conf.replicas else 1 + logger.debug( + f"[SglangRouterTesting] Starting sglang-router with {replicas} replicas" + ) + await run_async(self.start_sglang_router, replicas) logger.info("Registered %s domain %s", conf.type, conf.domain) From b2f10936ebeacb904c7a479f4c5832bb2cec5dbc Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 13:48:49 +0545 Subject: [PATCH 04/22] Add sglang_workers jinga template --- .../resources/nginx/sglang_workers.jinja2 | 23 +++++++++++++++++++ .../_internal/proxy/gateway/services/nginx.py | 16 +++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 diff --git a/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 b/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 new file mode 100644 index 0000000000..a6d612d36b --- /dev/null +++ b/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 @@ -0,0 +1,23 @@ +{% for replica in replicas %} +# Worker {{ loop.index }} +upstream sglang_worker_{{ loop.index }}_upstream { + server unix:{{ replica.socket }}; +} + +server { + listen 127.0.0.1:{{ 10000 + loop.index }}; + access_log off; # disable access logs for this internal endpoint + + proxy_read_timeout 300s; + proxy_send_timeout 300s; + + location / { + proxy_pass http://sglang_worker_{{ loop.index }}_upstream; + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header Connection ""; + proxy_set_header Upgrade $http_upgrade; + } +} +{% endfor %} diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 0b1fa11445..78b2b832ec 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -93,6 +93,7 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: logger.debug( f"[SglangRouterTesting] Starting sglang-router with {replicas} replicas" ) + await run_async(self.write_sglang_workers_conf, conf) await run_async(self.start_sglang_router, replicas) logger.info("Registered %s domain %s", conf.type, conf.domain) @@ -214,6 +215,21 @@ def write_global_conf(self) -> None: conf = read_package_resource("00-log-format.conf") self.write_conf(conf, "00-log-format.conf") + def write_sglang_workers_conf(self, conf: SiteConfig) -> None: + workers_config = generate_sglang_workers_config(conf) + workers_conf_name = f"sglang-workers.{conf.domain}.conf" + workers_conf_path = self._conf_dir / workers_conf_name + sudo_write(workers_conf_path, workers_config) + self.reload() + + +def generate_sglang_workers_config(conf: SiteConfig) -> str: + template = read_package_resource("sglang_workers.jinja2") + return jinja2.Template(template).render( + replicas=conf.replicas, + proxy_port=PROXY_PORT_ON_GATEWAY, + ) + def read_package_resource(file: str) -> str: return ( From 3871f05ce3ee43e3f52a531541deee8d7a33abf0 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 15:09:34 +0545 Subject: [PATCH 05/22] Modify service.jinja2 upstream block --- .../_internal/proxy/gateway/resources/nginx/service.jinja2 | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 b/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 index 5da4b62820..8c50db11ec 100644 --- a/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 +++ b/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 @@ -4,9 +4,13 @@ limit_req_zone {{ zone.key }} zone={{ zone.name }}:10m rate={{ zone.rpm }}r/m; {% if replicas %} upstream {{ domain }}.upstream { + {% if router == "sglang" %} + server 127.0.0.1:3000; # SGLang router on the gateway + {% else %} {% for replica in replicas %} - server unix:{{ replica.socket }}; # replica {{ replica.id }} router={{ router }} + server unix:{{ replica.socket }}; # replica {{ replica.id }} {% endfor %} + {% endif %} } {% else %} From fa6d992af694ce0c6c7ee58fefd7cc10522d134d Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 15:41:39 +0545 Subject: [PATCH 06/22] Add sglang log file --- src/dstack/_internal/proxy/gateway/services/nginx.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 78b2b832ec..66fecdae89 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -146,7 +146,16 @@ def start_sglang_router(replicas: int) -> None: "--worker-urls", ] + worker_urls - + ["--host", "0.0.0.0", "--port", "3000"] + + [ + "--host", + "0.0.0.0", + "--port", + "3000", + "--log-level", + "debug", + "--log-dir", + "./router_logs", + ] ) subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) From 4a47d86fee72cfef8b2b58c12d787d07d6388a95 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 19:36:04 +0545 Subject: [PATCH 07/22] Add sglang router clean up in unregister method --- .../_internal/proxy/gateway/services/nginx.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 66fecdae89..1f0b015fe6 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -105,6 +105,10 @@ async def unregister(self, domain: str) -> None: return async with self._lock: await run_async(sudo_rm, conf_path) + workers_conf_path = self._conf_dir / f"sglang-workers.{domain}.conf" + if workers_conf_path.exists(): + await run_async(sudo_rm, workers_conf_path) + await run_async(self.stop_sglang_router) await run_async(self.reload) logger.info("Unregistered domain %s", domain) @@ -162,6 +166,30 @@ def start_sglang_router(replicas: int) -> None: except Exception as e: logger.error(f"Failed to start sglang-router: {e}") + @staticmethod + def stop_sglang_router() -> None: + try: + result = subprocess.run( + ["pgrep", "-f", "sglang_router.launch_router"], capture_output=True, timeout=5 + ) + if result.returncode == 0: + logger.info("Stopping sglang-router process...") + subprocess.run(["pkill", "-f", "sglang_router.launch_router"], timeout=5) + else: + logger.debug("No sglang-router process found to stop") + + log_dir = Path("./router_logs") + if log_dir.exists(): + logger.debug("Cleaning up router logs...") + import shutil + + shutil.rmtree(log_dir, ignore_errors=True) + else: + logger.debug("No router logs directory found to clean up") + + except Exception as e: + logger.error(f"Failed to stop sglang-router: {e}") + def write_conf(self, conf: str, conf_name: str) -> None: """Update config and reload nginx. Rollback changes on error.""" conf_path = self._conf_dir / conf_name From ccae80eb02f6d1fa1c9d1f767fce1062d575468b Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 14 Oct 2025 20:21:36 +0545 Subject: [PATCH 08/22] Add test log to check unregister --- src/dstack/_internal/proxy/gateway/services/nginx.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 1f0b015fe6..bf41c1f4bf 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -106,6 +106,7 @@ async def unregister(self, domain: str) -> None: async with self._lock: await run_async(sudo_rm, conf_path) workers_conf_path = self._conf_dir / f"sglang-workers.{domain}.conf" + logger.debug(f"[SglangRouterTesting] Workers conf path: {workers_conf_path}") if workers_conf_path.exists(): await run_async(sudo_rm, workers_conf_path) await run_async(self.stop_sglang_router) From 0b7a6a1aab64e1cf1d709eb5679173d6ad96c579 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Wed, 15 Oct 2025 13:12:17 +0545 Subject: [PATCH 09/22] Increase sglang router-request-timeout --- src/dstack/_internal/proxy/gateway/services/nginx.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index bf41c1f4bf..385e341652 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -147,7 +147,7 @@ def start_sglang_router(replicas: int) -> None: [ "python3", "-m", - "sglang_router.launch_router", # Use system python3 + "sglang_router.launch_router", "--worker-urls", ] + worker_urls @@ -160,6 +160,8 @@ def start_sglang_router(replicas: int) -> None: "debug", "--log-dir", "./router_logs", + "--request-timeout-secs", + "1800", ] ) subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) From b32c8ddff3b5aa941028b20321d65df7761e34a1 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Wed, 15 Oct 2025 14:58:03 +0545 Subject: [PATCH 10/22] Change sglang process to sglang::router --- src/dstack/_internal/proxy/gateway/services/nginx.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 385e341652..30b45cf026 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -126,11 +126,11 @@ def start_sglang_router(replicas: int) -> None: try: # Kill existing sglang-router if running result = subprocess.run( - ["pgrep", "-f", "sglang_router.launch_router"], capture_output=True, timeout=5 + ["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5 ) if result.returncode == 0: logger.info("Killing existing sglang-router...") - subprocess.run(["pkill", "-f", "sglang_router.launch_router"], timeout=5) + subprocess.run(["pkill", "-f", "sglang::router"], timeout=5) # Wait a moment for the process to terminate import time @@ -173,11 +173,11 @@ def start_sglang_router(replicas: int) -> None: def stop_sglang_router() -> None: try: result = subprocess.run( - ["pgrep", "-f", "sglang_router.launch_router"], capture_output=True, timeout=5 + ["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5 ) if result.returncode == 0: logger.info("Stopping sglang-router process...") - subprocess.run(["pkill", "-f", "sglang_router.launch_router"], timeout=5) + subprocess.run(["pkill", "-f", "sglang::router"], timeout=5) else: logger.debug("No sglang-router process found to stop") From 36e84e66cea9241518f03b850221bbf03c32be07 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Wed, 15 Oct 2025 18:33:56 +0545 Subject: [PATCH 11/22] Clean development code --- gateway/pyproject.toml | 2 +- src/dstack/_internal/core/backends/base/compute.py | 7 +------ src/dstack/_internal/core/models/gateways.py | 1 - .../_internal/proxy/gateway/routers/registry.py | 3 --- .../_internal/proxy/gateway/services/nginx.py | 14 +------------- .../_internal/server/services/services/__init__.py | 5 ----- 6 files changed, 3 insertions(+), 29 deletions(-) diff --git a/gateway/pyproject.toml b/gateway/pyproject.toml index ee76599553..a67171c25b 100644 --- a/gateway/pyproject.toml +++ b/gateway/pyproject.toml @@ -11,7 +11,7 @@ requires-python = ">=3.10" dynamic = ["version"] dependencies = [ # release builds of dstack-gateway depend on a PyPI version of dstack instead - "dstack[gateway] @ git+https://github.com/Bihan/dstack.git@add_sglang_router_minimal_support", + "dstack[gateway] @ git+https://github.com/dstackai/dstack.git@master", ] [tool.setuptools.package-data] diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index 1fd94e04ac..ee3e7e10c2 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -982,12 +982,7 @@ def get_dstack_gateway_wheel(build: str) -> str: r.raise_for_status() build = r.text.strip() logger.debug("Found the latest gateway build: %s", build) - # return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" - # For testing - logger.debug( - "Using test gateway wheel: https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" - ) - return "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" + return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" def get_dstack_gateway_commands() -> List[str]: diff --git a/src/dstack/_internal/core/models/gateways.py b/src/dstack/_internal/core/models/gateways.py index d7a7fdd2c6..159ada65c7 100644 --- a/src/dstack/_internal/core/models/gateways.py +++ b/src/dstack/_internal/core/models/gateways.py @@ -44,7 +44,6 @@ class GatewayCertificate(CoreModel): ] -# https://github.com/dstackai/dstack/blob/master/src/dstack/_internal/proxy/gateway/resources/nginx/service.jinja2 class GatewayConfiguration(CoreModel): type: Literal["gateway"] = "gateway" name: Annotated[Optional[str], Field(description="The gateway name")] = None diff --git a/src/dstack/_internal/proxy/gateway/routers/registry.py b/src/dstack/_internal/proxy/gateway/routers/registry.py index 5d0aa2953b..dd4f63f325 100644 --- a/src/dstack/_internal/proxy/gateway/routers/registry.py +++ b/src/dstack/_internal/proxy/gateway/routers/registry.py @@ -13,10 +13,8 @@ from dstack._internal.proxy.gateway.services.nginx import Nginx from dstack._internal.proxy.lib.deps import get_service_connection_pool from dstack._internal.proxy.lib.services.service_connection import ServiceConnectionPool -from dstack._internal.utils.logging import get_logger router = APIRouter(prefix="/{project_name}") -logger = get_logger(__name__) @router.post("/services/register") @@ -27,7 +25,6 @@ async def register_service( nginx: Annotated[Nginx, Depends(get_nginx)], service_conn_pool: Annotated[ServiceConnectionPool, Depends(get_service_connection_pool)], ) -> OkResponse: - logger.debug(f"[SglangRouterTesting] Gateway API Reception Router: {body.router}") await registry_services.register_service( project_name=project_name.lower(), run_name=body.run_name.lower(), diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 30b45cf026..83cf22dc1d 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -82,17 +82,12 @@ def __init__(self, conf_dir: Path = Path("/etc/nginx/sites-enabled")) -> None: async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: logger.debug("Registering %s domain %s", conf.type, conf.domain) conf_name = self.get_config_name(conf.domain) - logger.debug(f"[SglangRouterTesting] Register Conf object dict: {conf.dict()}") async with self._lock: if conf.https: await run_async(self.run_certbot, conf.domain, acme) await run_async(self.write_conf, conf.render(), conf_name) - # Start sglang-router if router is sglang if hasattr(conf, "router") and conf.router == "sglang": replicas = len(conf.replicas) if hasattr(conf, "replicas") and conf.replicas else 1 - logger.debug( - f"[SglangRouterTesting] Starting sglang-router with {replicas} replicas" - ) await run_async(self.write_sglang_workers_conf, conf) await run_async(self.start_sglang_router, replicas) @@ -106,7 +101,6 @@ async def unregister(self, domain: str) -> None: async with self._lock: await run_async(sudo_rm, conf_path) workers_conf_path = self._conf_dir / f"sglang-workers.{domain}.conf" - logger.debug(f"[SglangRouterTesting] Workers conf path: {workers_conf_path}") if workers_conf_path.exists(): await run_async(sudo_rm, workers_conf_path) await run_async(self.stop_sglang_router) @@ -122,26 +116,20 @@ def reload() -> None: @staticmethod def start_sglang_router(replicas: int) -> None: - """Start sglang-router service, killing existing one if running.""" try: - # Kill existing sglang-router if running result = subprocess.run( ["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5 ) if result.returncode == 0: - logger.info("Killing existing sglang-router...") + logger.info("Stopping existing sglang-router...") subprocess.run(["pkill", "-f", "sglang::router"], timeout=5) - # Wait a moment for the process to terminate import time time.sleep(1) - - # Generate worker URLs based on replica count worker_urls = [] for i in range(1, replicas + 1): worker_urls.append(f"http://127.0.0.1:{10000 + i}") - # Start sglang-router with system-wide installation logger.info(f"Starting sglang-router with {replicas} replicas...") cmd = ( [ diff --git a/src/dstack/_internal/server/services/services/__init__.py b/src/dstack/_internal/server/services/services/__init__.py index e6f2b19aac..05c1fa9097 100644 --- a/src/dstack/_internal/server/services/services/__init__.py +++ b/src/dstack/_internal/server/services/services/__init__.py @@ -83,10 +83,6 @@ async def _register_service_in_gateway( gateway_configuration = get_gateway_configuration(gateway) service_https = _get_service_https(run_spec, gateway_configuration) router = gateway_configuration.router - logger.debug(f"[SglangRouterTesting] Configuration parsing: {router}") - logger.debug( - f"[SglangRouterTesting] Configuration parsing dict: {gateway_configuration.dict()}" - ) service_protocol = "https" if service_https else "http" if service_https and gateway_configuration.certificate is None: @@ -112,7 +108,6 @@ async def _register_service_in_gateway( conn = await get_or_add_gateway_connection(session, gateway.id) try: logger.debug("%s: registering service as %s", fmt(run_model), service_spec.url) - logger.debug(f"[SglangRouterTesting] Service Registration Router: {router}") async with conn.client() as client: await client.register_service( project=run_model.project.name, From 25aacca923595186591d9d14ac9d999d5db6ad28 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Mon, 20 Oct 2025 18:21:20 +0545 Subject: [PATCH 12/22] Test is_sglang_router_running --- .../_internal/core/backends/base/compute.py | 3 +- .../_internal/proxy/gateway/services/nginx.py | 35 ++++++++++++------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index ee3e7e10c2..c670b980ef 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -982,7 +982,8 @@ def get_dstack_gateway_wheel(build: str) -> str: r.raise_for_status() build = r.text.strip() logger.debug("Found the latest gateway build: %s", build) - return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" + # return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" + return "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" def get_dstack_gateway_commands() -> List[str]: diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 83cf22dc1d..a38fef06b1 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -89,7 +89,7 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: if hasattr(conf, "router") and conf.router == "sglang": replicas = len(conf.replicas) if hasattr(conf, "replicas") and conf.replicas else 1 await run_async(self.write_sglang_workers_conf, conf) - await run_async(self.start_sglang_router, replicas) + await run_async(self.start_or_update_sglang_router, replicas) logger.info("Registered %s domain %s", conf.type, conf.domain) @@ -114,18 +114,31 @@ def reload() -> None: if r.returncode != 0: raise UnexpectedProxyError("Failed to reload nginx") + @staticmethod + def start_or_update_sglang_router(replicas: int) -> None: + """Start the sglang router if not running; otherwise update workers via HTTP API.""" + # Start router (without workers) if it's not running, then sync workers + if not Nginx.is_sglang_router_running(): + logger.info("Sglang router not running, starting with %d replicas", replicas) + Nginx.start_sglang_router(replicas) + + @staticmethod + def is_sglang_router_running() -> bool: + result = subprocess.run(["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5) + return result.returncode == 0 + @staticmethod def start_sglang_router(replicas: int) -> None: try: - result = subprocess.run( - ["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5 - ) - if result.returncode == 0: - logger.info("Stopping existing sglang-router...") - subprocess.run(["pkill", "-f", "sglang::router"], timeout=5) - import time - - time.sleep(1) + # result = subprocess.run( + # ["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5 + # ) + # if result.returncode == 0: + # logger.info("Stopping existing sglang-router...") + # subprocess.run(["pkill", "-f", "sglang::router"], timeout=5) + # import time + + # time.sleep(1) worker_urls = [] for i in range(1, replicas + 1): worker_urls.append(f"http://127.0.0.1:{10000 + i}") @@ -148,8 +161,6 @@ def start_sglang_router(replicas: int) -> None: "debug", "--log-dir", "./router_logs", - "--request-timeout-secs", - "1800", ] ) subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) From f08f0fad5169d0f90426962663c1ec338e2b93fd Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Mon, 20 Oct 2025 18:54:12 +0545 Subject: [PATCH 13/22] Add HTTP add worker endpoint --- .../_internal/core/backends/base/compute.py | 1 + .../_internal/proxy/gateway/services/nginx.py | 132 +++++++++++++++++- 2 files changed, 132 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index c670b980ef..bf37a494be 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -983,6 +983,7 @@ def get_dstack_gateway_wheel(build: str) -> str: build = r.text.strip() logger.debug("Found the latest gateway build: %s", build) # return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" + logger.debug("Using temp wheel") return "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index a38fef06b1..3b9ba6e93e 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -1,4 +1,5 @@ import importlib.resources +import json import subprocess import tempfile from asyncio import Lock @@ -120,13 +121,142 @@ def start_or_update_sglang_router(replicas: int) -> None: # Start router (without workers) if it's not running, then sync workers if not Nginx.is_sglang_router_running(): logger.info("Sglang router not running, starting with %d replicas", replicas) - Nginx.start_sglang_router(replicas) + Nginx.start_only_sglang_router() + Nginx.update_sglang_router_workers(replicas) @staticmethod def is_sglang_router_running() -> bool: result = subprocess.run(["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5) return result.returncode == 0 + @staticmethod + def start_only_sglang_router() -> None: + """Start sglang router""" + try: + # Start sglang router + logger.info("Starting sglang-router...") + cmd = [ + "python3", + "-m", + "sglang_router.launch_router", + "--host", + "0.0.0.0", + "--port", + "3000", + "--log-level", + "debug", + "--log-dir", + "./router_logs", + ] + subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + + # Wait for router to start + import time + + time.sleep(2) + + # Verify router is running + if not Nginx.is_sglang_router_running(): + raise Exception("Failed to start sglang router") + + logger.info("Sglang router started successfully") + + except Exception as e: + logger.error(f"Failed to start sglang-router: {e}") + raise + + @staticmethod + def get_sglang_router_workers() -> list[dict]: + try: + result = subprocess.run( + ["curl", "-s", "http://localhost:3000/workers"], capture_output=True, timeout=5 + ) + if result.returncode == 0: + response = json.loads(result.stdout.decode()) + return response.get("workers", []) + return [] + except Exception as e: + logger.error(f"Error getting sglang router workers: {e}") + return [] + + @staticmethod + def update_sglang_router_workers(replicas: int) -> None: + """Update sglang router workers via HTTP API""" + try: + # Get current workers + current_workers = Nginx.get_sglang_router_workers() + current_worker_urls = {worker["url"] for worker in current_workers} + current_count = len(current_worker_urls) + + if current_count == replicas: + logger.info("Sglang router already has %d workers, no update needed", replicas) + return + + # Calculate target worker URLs + target_worker_urls = {f"http://127.0.0.1:{10000 + i}" for i in range(1, replicas + 1)} + + # Workers to add + workers_to_add = target_worker_urls - current_worker_urls + # Workers to remove + workers_to_remove = current_worker_urls - target_worker_urls + + logger.info( + "Sglang router update: adding %d workers, removing %d workers", + len(workers_to_add), + len(workers_to_remove), + ) + + # Add new workers + for worker_url in sorted(workers_to_add): + success = Nginx.add_sglang_router_worker(worker_url) + if not success: + logger.warning("Failed to add worker %s, continuing with others", worker_url) + + # Remove old workers + # for worker_url in sorted(workers_to_remove): + # success = Nginx.remove_sglang_router_worker(worker_url) + # if not success: + # logger.warning( + # "Failed to remove worker %s, continuing with others", worker_url + # ) + + except Exception as e: + logger.error(f"Error updating sglang router workers: {e}") + + @staticmethod + def add_sglang_router_worker(worker_url: str) -> bool: + try: + payload = {"url": worker_url, "worker_type": "regular"} + result = subprocess.run( + [ + "curl", + "-X", + "POST", + "http://localhost:3000/workers", + "-H", + "Content-Type: application/json", + "-d", + json.dumps(payload), + ], + capture_output=True, + timeout=5, + ) + + if result.returncode == 0: + response = json.loads(result.stdout.decode()) + if response.get("status") == "accepted": + logger.info("Added worker %s to sglang router (queued)", worker_url) + return True + else: + logger.error("Failed to add worker %s: %s", worker_url, response) + return False + else: + logger.error("Failed to add worker %s: %s", worker_url, result.stderr.decode()) + return False + except Exception as e: + logger.error(f"Error adding worker {worker_url}: {e}") + return False + @staticmethod def start_sglang_router(replicas: int) -> None: try: From 875353ddc06fd8f8c61b812ffdd32c813f048d5a Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Mon, 20 Oct 2025 19:40:50 +0545 Subject: [PATCH 14/22] Update start or update router --- src/dstack/_internal/proxy/gateway/services/nginx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 3b9ba6e93e..f2a041be8a 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -122,7 +122,7 @@ def start_or_update_sglang_router(replicas: int) -> None: if not Nginx.is_sglang_router_running(): logger.info("Sglang router not running, starting with %d replicas", replicas) Nginx.start_only_sglang_router() - Nginx.update_sglang_router_workers(replicas) + Nginx.update_sglang_router_workers(replicas) @staticmethod def is_sglang_router_running() -> bool: From 907f71ffc1f275c705e7c0c2466b33ef65d59f69 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Mon, 20 Oct 2025 20:12:34 +0545 Subject: [PATCH 15/22] Add remove worker endpoint --- .../_internal/proxy/gateway/services/nginx.py | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index f2a041be8a..342c2d2c76 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -2,6 +2,7 @@ import json import subprocess import tempfile +import urllib.parse from asyncio import Lock from pathlib import Path from typing import Optional @@ -213,12 +214,12 @@ def update_sglang_router_workers(replicas: int) -> None: logger.warning("Failed to add worker %s, continuing with others", worker_url) # Remove old workers - # for worker_url in sorted(workers_to_remove): - # success = Nginx.remove_sglang_router_worker(worker_url) - # if not success: - # logger.warning( - # "Failed to remove worker %s, continuing with others", worker_url - # ) + for worker_url in sorted(workers_to_remove): + success = Nginx.remove_sglang_router_worker(worker_url) + if not success: + logger.warning( + "Failed to remove worker %s, continuing with others", worker_url + ) except Exception as e: logger.error(f"Error updating sglang router workers: {e}") @@ -257,6 +258,34 @@ def add_sglang_router_worker(worker_url: str) -> bool: logger.error(f"Error adding worker {worker_url}: {e}") return False + @staticmethod + def remove_sglang_router_worker(worker_url: str) -> bool: + """Remove a single worker from sglang router""" + try: + # URL encode the worker URL for the DELETE request + encoded_url = urllib.parse.quote(worker_url, safe="") + + result = subprocess.run( + ["curl", "-X", "DELETE", f"http://localhost:3000/workers/{encoded_url}"], + capture_output=True, + timeout=5, + ) + + if result.returncode == 0: + response = json.loads(result.stdout.decode()) + if response.get("status") == "accepted": + logger.info("Removed worker %s from sglang router (queued)", worker_url) + return True + else: + logger.error("Failed to remove worker %s: %s", worker_url, response) + return False + else: + logger.error("Failed to remove worker %s: %s", worker_url, result.stderr.decode()) + return False + except Exception as e: + logger.error(f"Error removing worker {worker_url}: {e}") + return False + @staticmethod def start_sglang_router(replicas: int) -> None: try: From c9d77222e08002e23f8074a64bd161a52c9b86d4 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 21 Oct 2025 07:29:07 +0545 Subject: [PATCH 16/22] Clean sglang autoscaling --- .../_internal/core/backends/base/compute.py | 4 +- .../_internal/proxy/gateway/services/nginx.py | 86 +++++-------------- 2 files changed, 22 insertions(+), 68 deletions(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index bf37a494be..ee3e7e10c2 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -982,9 +982,7 @@ def get_dstack_gateway_wheel(build: str) -> str: r.raise_for_status() build = r.text.strip() logger.debug("Found the latest gateway build: %s", build) - # return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" - logger.debug("Using temp wheel") - return "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" + return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" def get_dstack_gateway_commands() -> List[str]: diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 342c2d2c76..be3dbcd6b6 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -118,23 +118,25 @@ def reload() -> None: @staticmethod def start_or_update_sglang_router(replicas: int) -> None: - """Start the sglang router if not running; otherwise update workers via HTTP API.""" - # Start router (without workers) if it's not running, then sync workers if not Nginx.is_sglang_router_running(): - logger.info("Sglang router not running, starting with %d replicas", replicas) - Nginx.start_only_sglang_router() + Nginx.start_sglang_router() Nginx.update_sglang_router_workers(replicas) @staticmethod def is_sglang_router_running() -> bool: - result = subprocess.run(["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5) - return result.returncode == 0 + """Check if sglang router is running and responding to HTTP requests.""" + try: + result = subprocess.run( + ["curl", "-s", "http://localhost:3000/workers"], capture_output=True, timeout=5 + ) + return result.returncode == 0 + except Exception as e: + logger.error(f"Error checking sglang router status: {e}") + return False @staticmethod - def start_only_sglang_router() -> None: - """Start sglang router""" + def start_sglang_router() -> None: try: - # Start sglang router logger.info("Starting sglang-router...") cmd = [ "python3", @@ -151,7 +153,6 @@ def start_only_sglang_router() -> None: ] subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - # Wait for router to start import time time.sleep(2) @@ -187,11 +188,6 @@ def update_sglang_router_workers(replicas: int) -> None: # Get current workers current_workers = Nginx.get_sglang_router_workers() current_worker_urls = {worker["url"] for worker in current_workers} - current_count = len(current_worker_urls) - - if current_count == replicas: - logger.info("Sglang router already has %d workers, no update needed", replicas) - return # Calculate target worker URLs target_worker_urls = {f"http://127.0.0.1:{10000 + i}" for i in range(1, replicas + 1)} @@ -201,19 +197,18 @@ def update_sglang_router_workers(replicas: int) -> None: # Workers to remove workers_to_remove = current_worker_urls - target_worker_urls - logger.info( - "Sglang router update: adding %d workers, removing %d workers", - len(workers_to_add), - len(workers_to_remove), - ) + if workers_to_add: + logger.info("Sglang router update: adding %d workers", len(workers_to_add)) + if workers_to_remove: + logger.info("Sglang router update: removing %d workers", len(workers_to_remove)) - # Add new workers + # Add workers for worker_url in sorted(workers_to_add): success = Nginx.add_sglang_router_worker(worker_url) if not success: logger.warning("Failed to add worker %s, continuing with others", worker_url) - # Remove old workers + # Remove workers for worker_url in sorted(workers_to_remove): success = Nginx.remove_sglang_router_worker(worker_url) if not success: @@ -223,6 +218,7 @@ def update_sglang_router_workers(replicas: int) -> None: except Exception as e: logger.error(f"Error updating sglang router workers: {e}") + raise @staticmethod def add_sglang_router_worker(worker_url: str) -> bool: @@ -246,7 +242,7 @@ def add_sglang_router_worker(worker_url: str) -> bool: if result.returncode == 0: response = json.loads(result.stdout.decode()) if response.get("status") == "accepted": - logger.info("Added worker %s to sglang router (queued)", worker_url) + logger.info("Added worker %s to sglang router", worker_url) return True else: logger.error("Failed to add worker %s: %s", worker_url, response) @@ -274,7 +270,7 @@ def remove_sglang_router_worker(worker_url: str) -> bool: if result.returncode == 0: response = json.loads(result.stdout.decode()) if response.get("status") == "accepted": - logger.info("Removed worker %s from sglang router (queued)", worker_url) + logger.info("Removed worker %s from sglang router", worker_url) return True else: logger.error("Failed to remove worker %s: %s", worker_url, response) @@ -286,47 +282,6 @@ def remove_sglang_router_worker(worker_url: str) -> bool: logger.error(f"Error removing worker {worker_url}: {e}") return False - @staticmethod - def start_sglang_router(replicas: int) -> None: - try: - # result = subprocess.run( - # ["pgrep", "-f", "sglang::router"], capture_output=True, timeout=5 - # ) - # if result.returncode == 0: - # logger.info("Stopping existing sglang-router...") - # subprocess.run(["pkill", "-f", "sglang::router"], timeout=5) - # import time - - # time.sleep(1) - worker_urls = [] - for i in range(1, replicas + 1): - worker_urls.append(f"http://127.0.0.1:{10000 + i}") - - logger.info(f"Starting sglang-router with {replicas} replicas...") - cmd = ( - [ - "python3", - "-m", - "sglang_router.launch_router", - "--worker-urls", - ] - + worker_urls - + [ - "--host", - "0.0.0.0", - "--port", - "3000", - "--log-level", - "debug", - "--log-dir", - "./router_logs", - ] - ) - subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - - except Exception as e: - logger.error(f"Failed to start sglang-router: {e}") - @staticmethod def stop_sglang_router() -> None: try: @@ -350,6 +305,7 @@ def stop_sglang_router() -> None: except Exception as e: logger.error(f"Failed to stop sglang-router: {e}") + raise def write_conf(self, conf: str, conf_name: str) -> None: """Update config and reload nginx. Rollback changes on error.""" From e061049043939bac695e792da9e60a23fad66ec8 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 21 Oct 2025 09:07:40 +0545 Subject: [PATCH 17/22] Include router field in gateway tests --- src/tests/_internal/server/routers/test_gateways.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/tests/_internal/server/routers/test_gateways.py b/src/tests/_internal/server/routers/test_gateways.py index 996157350c..6d06e69692 100644 --- a/src/tests/_internal/server/routers/test_gateways.py +++ b/src/tests/_internal/server/routers/test_gateways.py @@ -70,6 +70,7 @@ async def test_list(self, test_db, session: AsyncSession, client: AsyncClient): "name": gateway.name, "backend": backend.type.value, "region": gateway.region, + "router": None, "domain": gateway.wildcard_domain, "default": False, "public_ip": True, @@ -121,6 +122,7 @@ async def test_get(self, test_db, session: AsyncSession, client: AsyncClient): "name": gateway.name, "backend": backend.type.value, "region": gateway.region, + "router": None, "domain": gateway.wildcard_domain, "default": False, "public_ip": True, @@ -201,6 +203,7 @@ async def test_create_gateway(self, test_db, session: AsyncSession, client: Asyn "name": "test", "backend": backend.type.value, "region": "us", + "router": None, "domain": None, "default": True, "public_ip": True, @@ -253,6 +256,7 @@ async def test_create_gateway_without_name( "name": "random-name", "backend": backend.type.value, "region": "us", + "router": None, "domain": None, "default": True, "public_ip": True, @@ -355,6 +359,7 @@ async def test_set_default_gateway(self, test_db, session: AsyncSession, client: "name": gateway.name, "backend": backend.type.value, "region": gateway.region, + "router": None, "domain": gateway.wildcard_domain, "default": True, "public_ip": True, @@ -477,6 +482,7 @@ def get_backend(project, backend_type): "name": gateway_gcp.name, "backend": backend_gcp.type.value, "region": gateway_gcp.region, + "router": None, "domain": gateway_gcp.wildcard_domain, "default": False, "public_ip": True, @@ -546,6 +552,7 @@ async def test_set_wildcard_domain(self, test_db, session: AsyncSession, client: "name": gateway.name, "backend": backend.type.value, "region": gateway.region, + "router": None, "domain": "test.com", "default": False, "public_ip": True, From f82ca1090dbb9359e2ec56669254d9d6ae79dca6 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 21 Oct 2025 17:36:54 +0545 Subject: [PATCH 18/22] Minor Update --- src/dstack/_internal/proxy/gateway/services/nginx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index be3dbcd6b6..015091ac74 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -89,7 +89,7 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: await run_async(self.run_certbot, conf.domain, acme) await run_async(self.write_conf, conf.render(), conf_name) if hasattr(conf, "router") and conf.router == "sglang": - replicas = len(conf.replicas) if hasattr(conf, "replicas") and conf.replicas else 1 + replicas = len(conf.replicas) await run_async(self.write_sglang_workers_conf, conf) await run_async(self.start_or_update_sglang_router, replicas) From f20c9f29a154f1dc7610c4749391eb3db86bcfc6 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Wed, 29 Oct 2025 19:54:42 +0545 Subject: [PATCH 19/22] Add concurrent services with sglang router --- src/dstack/_internal/core/backends/base/compute.py | 3 ++- src/dstack/_internal/proxy/gateway/services/nginx.py | 4 ++++ src/dstack/_internal/proxy/gateway/services/registry.py | 2 ++ src/dstack/_internal/proxy/lib/models.py | 1 + 4 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index ee3e7e10c2..c670b980ef 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -982,7 +982,8 @@ def get_dstack_gateway_wheel(build: str) -> str: r.raise_for_status() build = r.text.strip() logger.debug("Found the latest gateway build: %s", build) - return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" + # return f"{base_url}/dstack_gateway-{build}-py3-none-any.whl" + return "https://bihan-test-bucket.s3.eu-west-1.amazonaws.com/dstack_gateway-0.0.0-py3-none-any.whl" def get_dstack_gateway_commands() -> List[str]: diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 015091ac74..2e01f64dc3 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -67,6 +67,7 @@ class ServiceConfig(SiteConfig): locations: list[LocationConfig] replicas: list[ReplicaConfig] router: Optional[str] = None + model_id: Optional[str] = None class ModelEntrypointConfig(SiteConfig): @@ -90,6 +91,8 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: await run_async(self.write_conf, conf.render(), conf_name) if hasattr(conf, "router") and conf.router == "sglang": replicas = len(conf.replicas) + model_id = conf.model_id + logger.info("Registering sglang router with model %s", model_id) await run_async(self.write_sglang_workers_conf, conf) await run_async(self.start_or_update_sglang_router, replicas) @@ -146,6 +149,7 @@ def start_sglang_router() -> None: "0.0.0.0", "--port", "3000", + "--enable-igw", "--log-level", "debug", "--log-dir", diff --git a/src/dstack/_internal/proxy/gateway/services/registry.py b/src/dstack/_internal/proxy/gateway/services/registry.py index ce7cbca2e9..2fcadfedd6 100644 --- a/src/dstack/_internal/proxy/gateway/services/registry.py +++ b/src/dstack/_internal/proxy/gateway/services/registry.py @@ -56,6 +56,7 @@ async def register_service( client_max_body_size=client_max_body_size, replicas=(), router=router, + model_id=model.name if model is not None else None, ) async with lock: @@ -338,6 +339,7 @@ async def get_nginx_service_config( locations=locations, replicas=sorted(replicas, key=lambda r: r.id), # sort for reproducible configs router=service.router, + model_id=service.model_id, ) diff --git a/src/dstack/_internal/proxy/lib/models.py b/src/dstack/_internal/proxy/lib/models.py index 4e7046167f..c24ce0fe25 100644 --- a/src/dstack/_internal/proxy/lib/models.py +++ b/src/dstack/_internal/proxy/lib/models.py @@ -58,6 +58,7 @@ class Service(ImmutableModel): strip_prefix: bool = True # only used in-server replicas: tuple[Replica, ...] router: Optional[str] = None + model_id: Optional[str] = None @property def domain_safe(self) -> str: From 3e834a178489ac8ddc724bff9577c6d55a0b70fa Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Wed, 29 Oct 2025 20:44:05 +0545 Subject: [PATCH 20/22] Add concurrent services with sglang router --- .../_internal/proxy/gateway/services/nginx.py | 72 ++++++++++++++----- 1 file changed, 53 insertions(+), 19 deletions(-) diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 2e01f64dc3..3f8e912246 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -81,6 +81,7 @@ class Nginx: def __init__(self, conf_dir: Path = Path("/etc/nginx/sites-enabled")) -> None: self._conf_dir = conf_dir self._lock: Lock = Lock() + self._domain_to_model_id: dict[str, str] = {} async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: logger.debug("Registering %s domain %s", conf.type, conf.domain) @@ -92,9 +93,15 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: if hasattr(conf, "router") and conf.router == "sglang": replicas = len(conf.replicas) model_id = conf.model_id - logger.info("Registering sglang router with model %s", model_id) + logger.info( + "Registering sglang router with model %s with %d replicas in domain %s", + model_id, + replicas, + conf.domain, + ) + self._domain_to_model_id[conf.domain] = model_id await run_async(self.write_sglang_workers_conf, conf) - await run_async(self.start_or_update_sglang_router, replicas) + await run_async(self.start_or_update_sglang_router, replicas, model_id) logger.info("Registered %s domain %s", conf.type, conf.domain) @@ -108,7 +115,13 @@ async def unregister(self, domain: str) -> None: workers_conf_path = self._conf_dir / f"sglang-workers.{domain}.conf" if workers_conf_path.exists(): await run_async(sudo_rm, workers_conf_path) - await run_async(self.stop_sglang_router) + # Get model_id for this domain before removing workers + model_id = self._domain_to_model_id.get(domain) + # This allows other services with different models to continue running + await run_async(self.remove_sglang_workers_for_model, model_id) + # Clean up the mapping + del self._domain_to_model_id[domain] + # await run_async(self.stop_sglang_router) await run_async(self.reload) logger.info("Unregistered domain %s", domain) @@ -120,10 +133,10 @@ def reload() -> None: raise UnexpectedProxyError("Failed to reload nginx") @staticmethod - def start_or_update_sglang_router(replicas: int) -> None: + def start_or_update_sglang_router(replicas: int, model_id: str) -> None: if not Nginx.is_sglang_router_running(): Nginx.start_sglang_router() - Nginx.update_sglang_router_workers(replicas) + Nginx.update_sglang_router_workers(replicas, model_id) @staticmethod def is_sglang_router_running() -> bool: @@ -172,25 +185,27 @@ def start_sglang_router() -> None: raise @staticmethod - def get_sglang_router_workers() -> list[dict]: + def get_sglang_router_workers(model_id: str) -> list[dict]: try: result = subprocess.run( ["curl", "-s", "http://localhost:3000/workers"], capture_output=True, timeout=5 ) if result.returncode == 0: response = json.loads(result.stdout.decode()) - return response.get("workers", []) + workers = response.get("workers", []) + workers = [w for w in workers if w.get("model_id") == model_id] + return workers return [] except Exception as e: logger.error(f"Error getting sglang router workers: {e}") return [] @staticmethod - def update_sglang_router_workers(replicas: int) -> None: + def update_sglang_router_workers(replicas: int, model_id: str) -> None: """Update sglang router workers via HTTP API""" try: # Get current workers - current_workers = Nginx.get_sglang_router_workers() + current_workers = Nginx.get_sglang_router_workers(model_id) current_worker_urls = {worker["url"] for worker in current_workers} # Calculate target worker URLs @@ -208,26 +223,26 @@ def update_sglang_router_workers(replicas: int) -> None: # Add workers for worker_url in sorted(workers_to_add): - success = Nginx.add_sglang_router_worker(worker_url) + success = Nginx.add_sglang_router_worker(worker_url, model_id) if not success: logger.warning("Failed to add worker %s, continuing with others", worker_url) # Remove workers for worker_url in sorted(workers_to_remove): - success = Nginx.remove_sglang_router_worker(worker_url) + success = Nginx.remove_sglang_router_worker(worker_url, model_id) if not success: logger.warning( "Failed to remove worker %s, continuing with others", worker_url ) except Exception as e: - logger.error(f"Error updating sglang router workers: {e}") + logger.error(f"Error updating sglang router workers for model {model_id}: {e}") raise @staticmethod - def add_sglang_router_worker(worker_url: str) -> bool: + def add_sglang_router_worker(worker_url: str, model_id: str) -> bool: try: - payload = {"url": worker_url, "worker_type": "regular"} + payload = {"url": worker_url, "worker_type": "regular", "model_id": model_id} result = subprocess.run( [ "curl", @@ -259,7 +274,7 @@ def add_sglang_router_worker(worker_url: str) -> bool: return False @staticmethod - def remove_sglang_router_worker(worker_url: str) -> bool: + def remove_sglang_router_worker(worker_url: str, model_id: str) -> bool: """Remove a single worker from sglang router""" try: # URL encode the worker URL for the DELETE request @@ -274,18 +289,37 @@ def remove_sglang_router_worker(worker_url: str) -> bool: if result.returncode == 0: response = json.loads(result.stdout.decode()) if response.get("status") == "accepted": - logger.info("Removed worker %s from sglang router", worker_url) + logger.info( + "Removed worker %s from sglang router model %s", worker_url, model_id + ) return True else: - logger.error("Failed to remove worker %s: %s", worker_url, response) + logger.error( + "Failed to remove worker %s model %s: %s", worker_url, model_id, response + ) return False else: - logger.error("Failed to remove worker %s: %s", worker_url, result.stderr.decode()) + logger.error( + "Failed to remove worker %s model %s: %s", + worker_url, + model_id, + result.stderr.decode(), + ) return False except Exception as e: - logger.error(f"Error removing worker {worker_url}: {e}") + logger.error(f"Error removing worker {worker_url} model {model_id}: {e}") return False + @staticmethod + def remove_sglang_workers_for_model(model_id: str) -> None: + try: + workers = Nginx.get_sglang_router_workers(model_id) + for worker in workers: + Nginx.remove_sglang_router_worker(worker["url"], model_id) + except Exception as e: + logger.error(f"Error removing sglang router workers for model {model_id}: {e}") + raise + @staticmethod def stop_sglang_router() -> None: try: From 8e1044f397ee47ddf4e2795b70880f542f5ef471 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Thu, 30 Oct 2025 04:57:25 +0545 Subject: [PATCH 21/22] Add concurrent services with sglang router --- .../resources/nginx/sglang_workers.jinja2 | 2 +- .../_internal/proxy/gateway/services/nginx.py | 44 +++++++++++++++---- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 b/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 index a6d612d36b..de724e68a9 100644 --- a/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 +++ b/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 @@ -5,7 +5,7 @@ upstream sglang_worker_{{ loop.index }}_upstream { } server { - listen 127.0.0.1:{{ 10000 + loop.index }}; + listen 127.0.0.1:{{ ports[loop.index0] }}; access_log off; # disable access logs for this internal endpoint proxy_read_timeout 300s; diff --git a/src/dstack/_internal/proxy/gateway/services/nginx.py b/src/dstack/_internal/proxy/gateway/services/nginx.py index 3f8e912246..50b8be01d0 100644 --- a/src/dstack/_internal/proxy/gateway/services/nginx.py +++ b/src/dstack/_internal/proxy/gateway/services/nginx.py @@ -82,6 +82,10 @@ def __init__(self, conf_dir: Path = Path("/etc/nginx/sites-enabled")) -> None: self._conf_dir = conf_dir self._lock: Lock = Lock() self._domain_to_model_id: dict[str, str] = {} + # Track next available port for worker allocation + self._next_worker_port: int = 10001 # Start from 10001 + # Track which ports are used by which domain + self._domain_to_ports: dict[str, list[int]] = {} # domain -> [port1, port2, ...] async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: logger.debug("Registering %s domain %s", conf.type, conf.domain) @@ -100,8 +104,18 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None: conf.domain, ) self._domain_to_model_id[conf.domain] = model_id - await run_async(self.write_sglang_workers_conf, conf) - await run_async(self.start_or_update_sglang_router, replicas, model_id) + # Allocate unique ports for this service + allocated_ports = list( + range(self._next_worker_port, self._next_worker_port + replicas) + ) + self._domain_to_ports[conf.domain] = allocated_ports + self._next_worker_port += replicas # Reserve ports for next service + + # Pass allocated ports to worker config generation + await run_async(self.write_sglang_workers_conf, conf, allocated_ports) + await run_async( + self.start_or_update_sglang_router, replicas, model_id, allocated_ports + ) logger.info("Registered %s domain %s", conf.type, conf.domain) @@ -121,6 +135,9 @@ async def unregister(self, domain: str) -> None: await run_async(self.remove_sglang_workers_for_model, model_id) # Clean up the mapping del self._domain_to_model_id[domain] + # Free up ports + if domain in self._domain_to_ports: + del self._domain_to_ports[domain] # await run_async(self.stop_sglang_router) await run_async(self.reload) logger.info("Unregistered domain %s", domain) @@ -133,10 +150,13 @@ def reload() -> None: raise UnexpectedProxyError("Failed to reload nginx") @staticmethod - def start_or_update_sglang_router(replicas: int, model_id: str) -> None: + def start_or_update_sglang_router( + replicas: int, model_id: str, allocated_ports: list[int] + ) -> None: if not Nginx.is_sglang_router_running(): Nginx.start_sglang_router() - Nginx.update_sglang_router_workers(replicas, model_id) + # Pass allocated ports to worker update + Nginx.update_sglang_router_workers(replicas, model_id, allocated_ports) @staticmethod def is_sglang_router_running() -> bool: @@ -201,7 +221,9 @@ def get_sglang_router_workers(model_id: str) -> list[dict]: return [] @staticmethod - def update_sglang_router_workers(replicas: int, model_id: str) -> None: + def update_sglang_router_workers( + replicas: int, model_id: str, allocated_ports: list[int] + ) -> None: """Update sglang router workers via HTTP API""" try: # Get current workers @@ -209,7 +231,9 @@ def update_sglang_router_workers(replicas: int, model_id: str) -> None: current_worker_urls = {worker["url"] for worker in current_workers} # Calculate target worker URLs - target_worker_urls = {f"http://127.0.0.1:{10000 + i}" for i in range(1, replicas + 1)} + # target_worker_urls = {f"http://127.0.0.1:{10000 + i}" for i in range(1, replicas + 1)} + # Use explicitly assigned ports instead of calculation + target_worker_urls = {f"http://127.0.0.1:{port}" for port in allocated_ports} # Workers to add workers_to_add = target_worker_urls - current_worker_urls @@ -407,18 +431,20 @@ def write_global_conf(self) -> None: conf = read_package_resource("00-log-format.conf") self.write_conf(conf, "00-log-format.conf") - def write_sglang_workers_conf(self, conf: SiteConfig) -> None: - workers_config = generate_sglang_workers_config(conf) + def write_sglang_workers_conf(self, conf: SiteConfig, allocated_ports: list[int]) -> None: + # Pass ports to template + workers_config = generate_sglang_workers_config(conf, allocated_ports) workers_conf_name = f"sglang-workers.{conf.domain}.conf" workers_conf_path = self._conf_dir / workers_conf_name sudo_write(workers_conf_path, workers_config) self.reload() -def generate_sglang_workers_config(conf: SiteConfig) -> str: +def generate_sglang_workers_config(conf: SiteConfig, allocated_ports: list[int]) -> str: template = read_package_resource("sglang_workers.jinja2") return jinja2.Template(template).render( replicas=conf.replicas, + ports=allocated_ports, proxy_port=PROXY_PORT_ON_GATEWAY, ) From 2b54a67832a43e12994673fb6bbaad231e315ff3 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Thu, 30 Oct 2025 11:16:51 +0545 Subject: [PATCH 22/22] Add concurrent services with sglang router --- .../proxy/gateway/resources/nginx/sglang_workers.jinja2 | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 b/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 index de724e68a9..e5ffe12ee5 100644 --- a/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 +++ b/src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2 @@ -1,6 +1,6 @@ {% for replica in replicas %} # Worker {{ loop.index }} -upstream sglang_worker_{{ loop.index }}_upstream { +upstream sglang_worker_{{ ports[loop.index0] }}_upstream { server unix:{{ replica.socket }}; } @@ -12,7 +12,7 @@ server { proxy_send_timeout 300s; location / { - proxy_pass http://sglang_worker_{{ loop.index }}_upstream; + proxy_pass http://sglang_worker_{{ ports[loop.index0] }}_upstream; proxy_http_version 1.1; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr;