# add_worker.yaml
# yaml-language-server: $schema=https://activate.parallel.works/workflow.schema.json
---
# Add Worker to Ray Cluster — attach new compute resources to a running Ray cluster
# Dispatches additional Ray workers that connect to an existing head node.
# The original cluster workflow must still be running.
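#
# The running cluster publishes coordination files in its job directory; this
# workflow reads them to join new workers. Illustrative layout (the directory
# number and values are examples only):
#   ~/pw/jobs/raycluster/00032/RAY_HEAD_IP     - private IP of the Ray head
#   ~/pw/jobs/raycluster/00032/SESSION_PORT    - dashboard port
#   ~/pw/jobs/raycluster/00032/PYTHON_VERSION  - Python version of the cluster venv
#   ~/pw/jobs/raycluster/00032/RAY_VENV_DIR    - path to the cluster virtualenv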
permissions:
- "*"
jobs:
# =============================================================================
# CHECKOUT — Clone scripts to head node
# =============================================================================
checkout:
steps:
- name: Checkout scripts to head node
uses: parallelworks/checkout
ssh:
remoteHost: ${{ inputs.head.resource.ip }}
with:
repo: https://github.com/parallelworks/ray-cluster.git
branch: main
sparse_checkout:
- scripts
# =============================================================================
# VALIDATE — Verify existing cluster is running and read coordination files
# =============================================================================
validate_cluster:
needs: [checkout]
ssh:
remoteHost: ${{ inputs.head.resource.ip }}
steps:
- name: Validate existing Ray cluster
run: |
set -e
CLUSTER_JOB_DIR="${{ inputs.cluster_job_dir }}"
# Auto-detect: find the latest job directory with RAY_HEAD_IP
if [ -z "${CLUSTER_JOB_DIR}" ] || [ "${CLUSTER_JOB_DIR}" = "auto" ]; then
echo "Auto-detecting latest Ray cluster job..."
FOUND=""
# Search all workflow job directories for the latest run with coordination files
for workflow_dir in ~/pw/jobs/*/; do
[ ! -d "${workflow_dir}" ] && continue
# Find the highest-numbered subdirectory with RAY_HEAD_IP
latest=$(ls -1d "${workflow_dir}"[0-9]* 2>/dev/null | sort -V | tail -1)
if [ -n "${latest}" ] && [ -f "${latest}/RAY_HEAD_IP" ] && [ -f "${latest}/job.started" ]; then
# Pick the most recently modified one
if [ -z "${FOUND}" ] || [ "${latest}/RAY_HEAD_IP" -nt "${FOUND}/RAY_HEAD_IP" ]; then
FOUND="${latest}"
fi
fi
done
if [ -z "${FOUND}" ]; then
echo "ERROR: Could not find any Ray cluster job directories."
echo "Ensure the original Ray cluster workflow has been run at least once."
exit 1
fi
CLUSTER_JOB_DIR="${FOUND}"
echo "Found: ${CLUSTER_JOB_DIR}"
fi
echo "=========================================="
echo "Validating existing Ray cluster"
echo "=========================================="
echo "Cluster job dir: ${CLUSTER_JOB_DIR}"
# Check coordination files exist
for file in RAY_HEAD_IP SESSION_PORT PYTHON_VERSION RAY_VENV_DIR; do
if [ ! -f "${CLUSTER_JOB_DIR}/${file}" ]; then
echo "ERROR: ${file} not found in ${CLUSTER_JOB_DIR}"
echo "Is the original cluster workflow still running?"
exit 1
fi
done
RAY_HEAD_IP=$(cat "${CLUSTER_JOB_DIR}/RAY_HEAD_IP")
SESSION_PORT=$(cat "${CLUSTER_JOB_DIR}/SESSION_PORT")
PYTHON_VERSION=$(cat "${CLUSTER_JOB_DIR}/PYTHON_VERSION")
# Verify Ray head is reachable
VENV_DIR="$(cat "${CLUSTER_JOB_DIR}/RAY_VENV_DIR" 2>/dev/null || echo "")"
if [ -n "${VENV_DIR}" ] && [ -f "${VENV_DIR}/bin/activate" ]; then
source "${VENV_DIR}/bin/activate"
fi
echo "Checking Ray head at ${RAY_HEAD_IP}:6379..."
if python3 -c "
import ray
ray.init(address='${RAY_HEAD_IP}:6379', ignore_reinit_error=True)
resources = ray.cluster_resources()
print(f'Cluster resources: {dict(resources)}')
nodes = ray.nodes()
alive = [n for n in nodes if n['Alive']]
print(f'Alive nodes: {len(alive)}')
ray.shutdown()
" 2>&1; then
echo "Ray cluster is healthy!"
else
echo "ERROR: Cannot connect to Ray head at ${RAY_HEAD_IP}:6379"
echo "Is the original cluster workflow still running?"
exit 1
fi
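          # A healthy check prints something like (values illustrative):
          #   Cluster resources: {'CPU': 16.0, 'memory': 34359738368.0}
          #   Alive nodes: 2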
# Check dashboard is responding
if curl -s --connect-timeout 5 "http://localhost:${SESSION_PORT}/" -o /dev/null 2>&1; then
echo "Dashboard is responding on port ${SESSION_PORT}"
else
echo "WARNING: Dashboard not responding (non-fatal)"
fi
# Count existing remote worker sites (tunnel IPs 127.0.X.Y)
# so new workers get non-conflicting IPs and ports
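          # Example (illustrative): alive nodes at 127.0.2.1 and 127.0.3.2 map
          # to sites 2 and 3, so EXISTING_SITES=2 and the new workers start at
          # the next free site index.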
EXISTING_SITES=$(python3 -c "
import ray
ray.init(address='${RAY_HEAD_IP}:6379', ignore_reinit_error=True)
nodes = ray.nodes()
# Count unique site indices from tunnel IPs (127.0.X.Y → X is site index)
site_indices = set()
for n in nodes:
if n.get('Alive'):
ip = n.get('NodeManagerAddress', '')
parts = ip.split('.')
if len(parts) == 4 and parts[0] == '127' and parts[1] == '0':
site_indices.add(int(parts[2]))
ray.shutdown()
print(len(site_indices))
" 2>/dev/null || echo "0")
echo "Existing remote worker sites: ${EXISTING_SITES}"
# Pass values to downstream jobs
echo "RAY_HEAD_IP=${RAY_HEAD_IP}" >> "${OUTPUTS}"
echo "SESSION_PORT=${SESSION_PORT}" >> "${OUTPUTS}"
echo "PYTHON_VERSION=${PYTHON_VERSION}" >> "${OUTPUTS}"
echo "SITE_INDEX_OFFSET=${EXISTING_SITES}" >> "${OUTPUTS}"
echo "=========================================="
echo "Cluster validated — ready to add workers"
echo "=========================================="
# =============================================================================
# DISPATCH WORKERS — Add new Ray workers to the existing cluster
# =============================================================================
dispatch_workers:
needs: [validate_cluster, checkout]
ssh:
remoteHost: ${{ inputs.head.resource.ip }}
steps:
- name: Dispatch additional workers
early-cancel: any-job-failed
run: |
set -e
export RAY_VERSION="${{ inputs.ray_settings.ray_version }}"
export DASHBOARD_PORT="${{ needs.validate_cluster.outputs.SESSION_PORT }}"
export RAY_HEAD_IP="${{ needs.validate_cluster.outputs.RAY_HEAD_IP }}"
export PYTHON_VERSION="${{ needs.validate_cluster.outputs.PYTHON_VERSION }}"
export WORKERS_JSON='${{ inputs.workers }}'
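          # WORKERS_JSON serializes the "workers" list input; assumed shape:
          #   [{"resource": {"name": "mycluster", "schedulerType": "slurm"}, "scheduler": true}, ...]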
export HEAD_RESOURCE_NAME='${{ inputs.head.resource.name }}'
export SITE_INDEX_OFFSET="${{ needs.validate_cluster.outputs.SITE_INDEX_OFFSET }}"
export DISPATCH_AND_EXIT=true
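          # DISPATCH_AND_EXIT=true is assumed to make dispatch_workers.sh return
          # once workers are submitted, rather than blocking for the cluster's lifetime.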
bash scripts/dispatch_workers.sh
cleanup: |
echo "dispatch_workers cleanup — cancelling added workers..."
export WORKERS_JSON='${{ inputs.workers }}'
export PW_USER="${PW_USER}"
export DASHBOARD_PORT="${{ needs.validate_cluster.outputs.SESSION_PORT }}"
export SITE_INDEX_OFFSET="${{ needs.validate_cluster.outputs.SITE_INDEX_OFFSET }}"
export PW_CMD=""
for try_cmd in pw ~/pw/pw; do
command -v ${try_cmd} &>/dev/null && { PW_CMD=${try_cmd}; break; }
[ -x "${try_cmd}" ] && { PW_CMD=${try_cmd}; break; }
done
export PW_CMD
python3 -c "
import json, os, subprocess
workers = json.loads(os.environ.get('WORKERS_JSON', '[]'))
pw_user = os.environ.get('PW_USER', '')
pw_cmd = os.environ.get('PW_CMD', 'pw')
# Build scheduler type lookup from pw cluster ls
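          # Assumed row shape: pw://<user>/<cluster> <status> <type> ... (columns 1 and 3 used)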
cluster_types = {}
try:
out = subprocess.check_output([pw_cmd, 'cluster', 'ls'], text=True, timeout=10)
for line in out.strip().split('\n'):
parts = line.split()
if len(parts) >= 3 and parts[0].startswith('pw://'):
cname = parts[0].split('/')[-1]
ctype = parts[2]
if 'slurm' in ctype: cluster_types[cname] = 'slurm'
elif 'pbs' in ctype: cluster_types[cname] = 'pbs'
else: cluster_types[cname] = ctype
except Exception as e:
print(f'Warning: could not list clusters: {e}')
seen = set()
for w in workers:
res = w.get('resource', {})
if isinstance(res, str):
name = res
sched = cluster_types.get(name, '')
else:
name = res.get('name', res.get('ip', ''))
sched = res.get('schedulerType', cluster_types.get(name, ''))
if not name or name in seen:
continue
seen.add(name)
print(f'Cleaning up {name} (scheduler={sched})...')
ssh_base = [
'ssh', '-i', os.path.expanduser('~/.ssh/pwcli'),
'-o', 'StrictHostKeyChecking=no',
'-o', 'UserKnownHostsFile=/dev/null',
'-o', 'ConnectTimeout=15',
'-o', f'ProxyCommand={pw_cmd} ssh --proxy-command %h',
f'{pw_user}@{name}',
]
if sched == 'slurm':
subprocess.run(
ssh_base + [
'WORK=\${PW_PARENT_JOB_DIR:-\${HOME}/pw/jobs/ray_worker_remote}; '
'if [ -f \"\${WORK}/slurm_jobid\" ]; then '
' jid=\$(cat \"\${WORK}/slurm_jobid\"); '
' echo \"Cancelling SLURM job \${jid}\"; '
' scancel \"\${jid}\" 2>/dev/null; '
'fi; '
'scancel --name=\"ray-worker-*\" 2>/dev/null; '
'ray stop --force 2>/dev/null; '
'echo \"SLURM cleanup done\"'],
timeout=30, capture_output=False
)
elif sched == 'pbs':
subprocess.run(
ssh_base + [
'WORK=\${PW_PARENT_JOB_DIR:-\${HOME}/pw/jobs/ray_worker_remote}; '
'if [ -f \"\${WORK}/pbs_jobid\" ]; then '
' jid=\$(cat \"\${WORK}/pbs_jobid\"); '
' echo \"Cancelling PBS job \${jid}\"; '
' qdel \"\${jid}\" 2>/dev/null; '
'fi; '
'ray stop --force 2>/dev/null; '
'echo \"PBS cleanup done\"'],
timeout=30, capture_output=False
)
else:
subprocess.run(
ssh_base + ['ray stop --force 2>/dev/null || true'],
timeout=30, capture_output=False
)
" 2>&1 || echo "Remote cleanup had errors (non-fatal)"
# Notify dashboard to remove these sites from the topology immediately
# (done after remote ray stop so nodes are already disconnecting)
OFFSET=${SITE_INDEX_OFFSET:-0}
SITE_IDX=$((2 + OFFSET))
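          # e.g. OFFSET=2 removes site-4, site-5, ... for NUM_WORKERS entries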
NUM_WORKERS=$(echo "${WORKERS_JSON}" | python3 -c "import sys,json; print(len(json.loads(sys.stdin.read())))" 2>/dev/null || echo "0")
for i in $(seq 0 $((NUM_WORKERS - 1))); do
SITE_ID="site-${SITE_IDX}"
echo "Removing ${SITE_ID} from dashboard topology..."
curl -s -X POST "http://localhost:${DASHBOARD_PORT}/api/remove_site" \
-H "Content-Type: application/json" \
-d "{\"site_id\": \"${SITE_ID}\"}" 2>&1 || true
SITE_IDX=$((SITE_IDX + 1))
done
echo "Add-worker cleanup done."
# =============================================================================
# CONFIRM — Report that workers have been added
# =============================================================================
confirm:
needs: [validate_cluster]
steps:
- name: Workers added
run: |
echo "=========================================="
echo "Additional workers dispatched!"
echo "=========================================="
echo " Ray head: ${{ needs.validate_cluster.outputs.RAY_HEAD_IP }}:6379"
echo ""
echo " New workers are connecting to the existing cluster."
echo " Check the dashboard to see them appear."
echo "=========================================="
"on":
execute:
inputs:
# -----------------------------------------------------------------------
# Head Node (must be the same resource running the original cluster)
# -----------------------------------------------------------------------
head:
type: group
label: Head Node
items:
resource:
label: Resource
type: compute-clusters
include-workspace: true
autoselect: true
tooltip: |
Select the same resource running the Ray head node.
This is used to read cluster coordination files and dispatch workers.
# -----------------------------------------------------------------------
# Existing Cluster Job Directory
# -----------------------------------------------------------------------
cluster_job_dir:
type: string
label: Cluster Job Directory
default: "auto"
tooltip: |
The job directory of the running Ray cluster workflow. Set to "auto" to
automatically find the latest Ray cluster run, or specify a full path
(e.g., ~/pw/jobs/raycluster/00032).
# -----------------------------------------------------------------------
# New Compute Workers
# -----------------------------------------------------------------------
workers:
type: list
label: New Compute Workers
tooltip: Add one or more compute worker sites to attach to the existing Ray cluster.
template:
resource:
label: Resource
type: compute-clusters
include-workspace: false
tooltip: Select a compute resource for new Ray workers.
scheduler:
type: boolean
default: true
label: Schedule Job?
hidden: ${{ inputs.workers.[index].resource.schedulerType != 'slurm' && inputs.workers.[index].resource.schedulerType != 'pbs' }}
ignore: ${{ .hidden }}
tooltip: |
Yes - Workers are submitted to the scheduler using sbatch, qsub, etc. (recommended)
No - Workers run directly on the login/controller node via SSH.
slurm:
type: group
label: SLURM Directives
hidden: ${{ inputs.workers.[index].resource.schedulerType != 'slurm' || inputs.workers.[index].scheduler == false }}
items:
is_disabled:
type: boolean
hidden: true
default: ${{ inputs.workers.[index].resource.schedulerType != 'slurm' || inputs.workers.[index].scheduler == false }}
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'slurm' || inputs.workers.[index].scheduler == false }}
partition:
type: slurm-partitions
label: Partition
optional: true
resource: ${{ inputs.workers.[index].resource }}
tooltip: Select a SLURM partition
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'slurm' || inputs.workers.[index].scheduler == false }}
account:
label: Account
type: slurm-accounts
resource: ${{ inputs.workers.[index].resource }}
tooltip: SLURM account (--account)
optional: true
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'slurm' || inputs.workers.[index].scheduler == false }}
qos:
label: QoS
type: slurm-qos
resource: ${{ inputs.workers.[index].resource }}
optional: true
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'slurm' || inputs.workers.[index].scheduler == false }}
nodes:
label: Nodes
type: number
slider: true
default: 1
min: 1
max: 16
step: 1
tooltip: Number of compute nodes to request. Each node runs a Ray worker.
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'slurm' || inputs.workers.[index].scheduler == false }}
gres:
label: GPUs per Node (gres)
type: dropdown
default: ""
tooltip: "GPUs per node (--gres). For example, 4 GPUs/node × 2 nodes = 8 GPUs total."
options:
- value: ""
label: "None (CPU only)"
- value: "gpu:1"
label: "1 GPU/node"
- value: "gpu:2"
label: "2 GPUs/node"
- value: "gpu:4"
label: "4 GPUs/node"
- value: "gpu:8"
label: "8 GPUs/node"
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'slurm' || inputs.workers.[index].scheduler == false }}
time:
label: Walltime
type: string
default: "01:00:00"
tooltip: "SLURM walltime (HH:MM:SS). Workers terminate when this expires."
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'slurm' || inputs.workers.[index].scheduler == false }}
scheduler_directives:
type: editor
label: Additional Directives
default: "##SBATCH --constraint=mla # uncomment for Navy and AFRL DSRC systems"
optional: true
tooltip: Additional SLURM directives (include #SBATCH prefix)
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'slurm' || inputs.workers.[index].scheduler == false }}
pbs:
type: group
label: PBS Directives
hidden: ${{ inputs.workers.[index].resource.schedulerType != 'pbs' || inputs.workers.[index].scheduler == false }}
items:
is_disabled:
type: boolean
hidden: true
default: ${{ inputs.workers.[index].resource.schedulerType != 'pbs' || inputs.workers.[index].scheduler == false }}
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'pbs' || inputs.workers.[index].scheduler == false }}
queue:
label: Queue
type: string
optional: true
tooltip: PBS queue name (-q). Leave empty to use the default queue.
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'pbs' || inputs.workers.[index].scheduler == false }}
account:
label: Account
type: string
optional: true
tooltip: PBS account/project (-A)
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'pbs' || inputs.workers.[index].scheduler == false }}
nodes:
label: Nodes
type: number
slider: true
default: 1
min: 1
max: 16
step: 1
tooltip: Number of compute nodes to request. Each node runs a Ray worker.
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'pbs' || inputs.workers.[index].scheduler == false }}
select:
label: Resource Selection
type: string
optional: true
placeholder: "1:ncpus=44:ngpus=4"
tooltip: "PBS resource selection string (-l select=). Overrides the Nodes slider when set. Example: 1:ncpus=44:ngpus=4"
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'pbs' || inputs.workers.[index].scheduler == false }}
walltime:
label: Walltime
type: string
default: "01:00:00"
tooltip: "PBS walltime (HH:MM:SS). Workers terminate when this expires."
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'pbs' || inputs.workers.[index].scheduler == false }}
scheduler_directives:
label: Additional Directives
type: editor
tooltip: Additional PBS directives (include #PBS prefix)
optional: true
ignore: ${{ inputs.workers.[index].resource.schedulerType != 'pbs' || inputs.workers.[index].scheduler == false }}
# -----------------------------------------------------------------------
# Ray Settings (must match existing cluster)
# -----------------------------------------------------------------------
ray_settings:
type: group
label: Ray Configuration
items:
ray_version:
type: dropdown
label: Ray Version
tooltip: Must match the Ray version of the running cluster
default: "2.40.0"
options:
- value: "2.40.0"
label: "2.40.0 (latest stable)"
- value: "2.39.0"
label: "2.39.0"
- value: "2.38.0"
label: "2.38.0"