Skip to content

Commit 8e1044f

Browse files
Bihan Rana
authored and committed
Add concurrent services with sglang router
1 parent 3e834a1 commit 8e1044f

File tree

2 files changed: +36 additions, -10 deletions

src/dstack/_internal/proxy/gateway/resources/nginx/sglang_workers.jinja2

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@ upstream sglang_worker_{{ loop.index }}_upstream {
55
}
66

77
server {
8-
listen 127.0.0.1:{{ 10000 + loop.index }};
8+
listen 127.0.0.1:{{ ports[loop.index0] }};
99
access_log off; # disable access logs for this internal endpoint
1010

1111
proxy_read_timeout 300s;

src/dstack/_internal/proxy/gateway/services/nginx.py

Lines changed: 35 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -82,6 +82,10 @@ def __init__(self, conf_dir: Path = Path("/etc/nginx/sites-enabled")) -> None:
8282
self._conf_dir = conf_dir
8383
self._lock: Lock = Lock()
8484
self._domain_to_model_id: dict[str, str] = {}
85+
# Track next available port for worker allocation
86+
self._next_worker_port: int = 10001 # Start from 10001
87+
# Track which ports are used by which domain
88+
self._domain_to_ports: dict[str, list[int]] = {} # domain -> [port1, port2, ...]
8589

8690
async def register(self, conf: SiteConfig, acme: ACMESettings) -> None:
8791
logger.debug("Registering %s domain %s", conf.type, conf.domain)
@@ -100,8 +104,18 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None:
100104
conf.domain,
101105
)
102106
self._domain_to_model_id[conf.domain] = model_id
103-
await run_async(self.write_sglang_workers_conf, conf)
104-
await run_async(self.start_or_update_sglang_router, replicas, model_id)
107+
# Allocate unique ports for this service
108+
allocated_ports = list(
109+
range(self._next_worker_port, self._next_worker_port + replicas)
110+
)
111+
self._domain_to_ports[conf.domain] = allocated_ports
112+
self._next_worker_port += replicas # Reserve ports for next service
113+
114+
# Pass allocated ports to worker config generation
115+
await run_async(self.write_sglang_workers_conf, conf, allocated_ports)
116+
await run_async(
117+
self.start_or_update_sglang_router, replicas, model_id, allocated_ports
118+
)
105119

106120
logger.info("Registered %s domain %s", conf.type, conf.domain)
107121

@@ -121,6 +135,9 @@ async def unregister(self, domain: str) -> None:
121135
await run_async(self.remove_sglang_workers_for_model, model_id)
122136
# Clean up the mapping
123137
del self._domain_to_model_id[domain]
138+
# Free up ports
139+
if domain in self._domain_to_ports:
140+
del self._domain_to_ports[domain]
124141
# await run_async(self.stop_sglang_router)
125142
await run_async(self.reload)
126143
logger.info("Unregistered domain %s", domain)
@@ -133,10 +150,13 @@ def reload() -> None:
133150
raise UnexpectedProxyError("Failed to reload nginx")
134151

135152
@staticmethod
136-
def start_or_update_sglang_router(replicas: int, model_id: str) -> None:
153+
def start_or_update_sglang_router(
154+
replicas: int, model_id: str, allocated_ports: list[int]
155+
) -> None:
137156
if not Nginx.is_sglang_router_running():
138157
Nginx.start_sglang_router()
139-
Nginx.update_sglang_router_workers(replicas, model_id)
158+
# Pass allocated ports to worker update
159+
Nginx.update_sglang_router_workers(replicas, model_id, allocated_ports)
140160

141161
@staticmethod
142162
def is_sglang_router_running() -> bool:
@@ -201,15 +221,19 @@ def get_sglang_router_workers(model_id: str) -> list[dict]:
201221
return []
202222

203223
@staticmethod
204-
def update_sglang_router_workers(replicas: int, model_id: str) -> None:
224+
def update_sglang_router_workers(
225+
replicas: int, model_id: str, allocated_ports: list[int]
226+
) -> None:
205227
"""Update sglang router workers via HTTP API"""
206228
try:
207229
# Get current workers
208230
current_workers = Nginx.get_sglang_router_workers(model_id)
209231
current_worker_urls = {worker["url"] for worker in current_workers}
210232

211233
# Calculate target worker URLs
212-
target_worker_urls = {f"http://127.0.0.1:{10000 + i}" for i in range(1, replicas + 1)}
234+
# target_worker_urls = {f"http://127.0.0.1:{10000 + i}" for i in range(1, replicas + 1)}
235+
# Use explicitly assigned ports instead of calculation
236+
target_worker_urls = {f"http://127.0.0.1:{port}" for port in allocated_ports}
213237

214238
# Workers to add
215239
workers_to_add = target_worker_urls - current_worker_urls
@@ -407,18 +431,20 @@ def write_global_conf(self) -> None:
407431
conf = read_package_resource("00-log-format.conf")
408432
self.write_conf(conf, "00-log-format.conf")
409433

410-
def write_sglang_workers_conf(self, conf: SiteConfig) -> None:
411-
workers_config = generate_sglang_workers_config(conf)
434+
def write_sglang_workers_conf(self, conf: SiteConfig, allocated_ports: list[int]) -> None:
435+
# Pass ports to template
436+
workers_config = generate_sglang_workers_config(conf, allocated_ports)
412437
workers_conf_name = f"sglang-workers.{conf.domain}.conf"
413438
workers_conf_path = self._conf_dir / workers_conf_name
414439
sudo_write(workers_conf_path, workers_config)
415440
self.reload()
416441

417442

418-
def generate_sglang_workers_config(conf: SiteConfig) -> str:
443+
def generate_sglang_workers_config(conf: SiteConfig, allocated_ports: list[int]) -> str:
419444
template = read_package_resource("sglang_workers.jinja2")
420445
return jinja2.Template(template).render(
421446
replicas=conf.replicas,
447+
ports=allocated_ports,
422448
proxy_port=PROXY_PORT_ON_GATEWAY,
423449
)
424450

0 commit comments

Comments
 (0)