Skip to content

Commit 3e834a1

Browse files
Bihan Rana
authored and committed
Add concurrent services with sglang router
1 parent f20c9f2 commit 3e834a1

1 file changed

Lines changed: 53 additions & 19 deletions

File tree

  • src/dstack/_internal/proxy/gateway/services

src/dstack/_internal/proxy/gateway/services/nginx.py

Lines changed: 53 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -81,6 +81,7 @@ class Nginx:
8181
def __init__(self, conf_dir: Path = Path("/etc/nginx/sites-enabled")) -> None:
8282
self._conf_dir = conf_dir
8383
self._lock: Lock = Lock()
84+
self._domain_to_model_id: dict[str, str] = {}
8485

8586
async def register(self, conf: SiteConfig, acme: ACMESettings) -> None:
8687
logger.debug("Registering %s domain %s", conf.type, conf.domain)
@@ -92,9 +93,15 @@ async def register(self, conf: SiteConfig, acme: ACMESettings) -> None:
9293
if hasattr(conf, "router") and conf.router == "sglang":
9394
replicas = len(conf.replicas)
9495
model_id = conf.model_id
95-
logger.info("Registering sglang router with model %s", model_id)
96+
logger.info(
97+
"Registering sglang router with model %s with %d replicas in domain %s",
98+
model_id,
99+
replicas,
100+
conf.domain,
101+
)
102+
self._domain_to_model_id[conf.domain] = model_id
96103
await run_async(self.write_sglang_workers_conf, conf)
97-
await run_async(self.start_or_update_sglang_router, replicas)
104+
await run_async(self.start_or_update_sglang_router, replicas, model_id)
98105

99106
logger.info("Registered %s domain %s", conf.type, conf.domain)
100107

@@ -108,7 +115,13 @@ async def unregister(self, domain: str) -> None:
108115
workers_conf_path = self._conf_dir / f"sglang-workers.{domain}.conf"
109116
if workers_conf_path.exists():
110117
await run_async(sudo_rm, workers_conf_path)
111-
await run_async(self.stop_sglang_router)
118+
# Get model_id for this domain before removing workers
119+
model_id = self._domain_to_model_id.get(domain)
120+
# This allows other services with different models to continue running
121+
await run_async(self.remove_sglang_workers_for_model, model_id)
122+
# Clean up the mapping
123+
del self._domain_to_model_id[domain]
124+
# await run_async(self.stop_sglang_router)
112125
await run_async(self.reload)
113126
logger.info("Unregistered domain %s", domain)
114127

@@ -120,10 +133,10 @@ def reload() -> None:
120133
raise UnexpectedProxyError("Failed to reload nginx")
121134

122135
@staticmethod
123-
def start_or_update_sglang_router(replicas: int) -> None:
136+
def start_or_update_sglang_router(replicas: int, model_id: str) -> None:
124137
if not Nginx.is_sglang_router_running():
125138
Nginx.start_sglang_router()
126-
Nginx.update_sglang_router_workers(replicas)
139+
Nginx.update_sglang_router_workers(replicas, model_id)
127140

128141
@staticmethod
129142
def is_sglang_router_running() -> bool:
@@ -172,25 +185,27 @@ def start_sglang_router() -> None:
172185
raise
173186

174187
@staticmethod
175-
def get_sglang_router_workers() -> list[dict]:
188+
def get_sglang_router_workers(model_id: str) -> list[dict]:
176189
try:
177190
result = subprocess.run(
178191
["curl", "-s", "http://localhost:3000/workers"], capture_output=True, timeout=5
179192
)
180193
if result.returncode == 0:
181194
response = json.loads(result.stdout.decode())
182-
return response.get("workers", [])
195+
workers = response.get("workers", [])
196+
workers = [w for w in workers if w.get("model_id") == model_id]
197+
return workers
183198
return []
184199
except Exception as e:
185200
logger.error(f"Error getting sglang router workers: {e}")
186201
return []
187202

188203
@staticmethod
189-
def update_sglang_router_workers(replicas: int) -> None:
204+
def update_sglang_router_workers(replicas: int, model_id: str) -> None:
190205
"""Update sglang router workers via HTTP API"""
191206
try:
192207
# Get current workers
193-
current_workers = Nginx.get_sglang_router_workers()
208+
current_workers = Nginx.get_sglang_router_workers(model_id)
194209
current_worker_urls = {worker["url"] for worker in current_workers}
195210

196211
# Calculate target worker URLs
@@ -208,26 +223,26 @@ def update_sglang_router_workers(replicas: int) -> None:
208223

209224
# Add workers
210225
for worker_url in sorted(workers_to_add):
211-
success = Nginx.add_sglang_router_worker(worker_url)
226+
success = Nginx.add_sglang_router_worker(worker_url, model_id)
212227
if not success:
213228
logger.warning("Failed to add worker %s, continuing with others", worker_url)
214229

215230
# Remove workers
216231
for worker_url in sorted(workers_to_remove):
217-
success = Nginx.remove_sglang_router_worker(worker_url)
232+
success = Nginx.remove_sglang_router_worker(worker_url, model_id)
218233
if not success:
219234
logger.warning(
220235
"Failed to remove worker %s, continuing with others", worker_url
221236
)
222237

223238
except Exception as e:
224-
logger.error(f"Error updating sglang router workers: {e}")
239+
logger.error(f"Error updating sglang router workers for model {model_id}: {e}")
225240
raise
226241

227242
@staticmethod
228-
def add_sglang_router_worker(worker_url: str) -> bool:
243+
def add_sglang_router_worker(worker_url: str, model_id: str) -> bool:
229244
try:
230-
payload = {"url": worker_url, "worker_type": "regular"}
245+
payload = {"url": worker_url, "worker_type": "regular", "model_id": model_id}
231246
result = subprocess.run(
232247
[
233248
"curl",
@@ -259,7 +274,7 @@ def add_sglang_router_worker(worker_url: str) -> bool:
259274
return False
260275

261276
@staticmethod
262-
def remove_sglang_router_worker(worker_url: str) -> bool:
277+
def remove_sglang_router_worker(worker_url: str, model_id: str) -> bool:
263278
"""Remove a single worker from sglang router"""
264279
try:
265280
# URL encode the worker URL for the DELETE request
@@ -274,18 +289,37 @@ def remove_sglang_router_worker(worker_url: str) -> bool:
274289
if result.returncode == 0:
275290
response = json.loads(result.stdout.decode())
276291
if response.get("status") == "accepted":
277-
logger.info("Removed worker %s from sglang router", worker_url)
292+
logger.info(
293+
"Removed worker %s from sglang router model %s", worker_url, model_id
294+
)
278295
return True
279296
else:
280-
logger.error("Failed to remove worker %s: %s", worker_url, response)
297+
logger.error(
298+
"Failed to remove worker %s model %s: %s", worker_url, model_id, response
299+
)
281300
return False
282301
else:
283-
logger.error("Failed to remove worker %s: %s", worker_url, result.stderr.decode())
302+
logger.error(
303+
"Failed to remove worker %s model %s: %s",
304+
worker_url,
305+
model_id,
306+
result.stderr.decode(),
307+
)
284308
return False
285309
except Exception as e:
286-
logger.error(f"Error removing worker {worker_url}: {e}")
310+
logger.error(f"Error removing worker {worker_url} model {model_id}: {e}")
287311
return False
288312

313+
@staticmethod
314+
def remove_sglang_workers_for_model(model_id: str) -> None:
315+
try:
316+
workers = Nginx.get_sglang_router_workers(model_id)
317+
for worker in workers:
318+
Nginx.remove_sglang_router_worker(worker["url"], model_id)
319+
except Exception as e:
320+
logger.error(f"Error removing sglang router workers for model {model_id}: {e}")
321+
raise
322+
289323
@staticmethod
290324
def stop_sglang_router() -> None:
291325
try:

0 commit comments

Comments
 (0)