Skip to content

Commit befc538

Browse files
fix(stack): deduplicate image pulls to prevent same-image replica rootfs race
When multiple replicas share the same image (e.g. 3x alpine:latest), the executor previously pulled + assembled rootfs concurrently in thread::scope, causing layer extraction corruption. Now pulls each unique image once serially before entering the parallel block, which only runs create_in_sandbox. Also improves runtime container cleanup: stack containers skip host-side rootfs deletion (avoids VirtioFS dcache staleness on recreate), stale overlays are cleaned up in-guest before re-creating, and oci_delete falls back to the shared stack VM when the per-container handle has been removed. Re-enables replicated_service E2E tests (previously blocked on vz-3d1). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent bdd752b commit befc538

File tree

3 files changed

+115
-25
lines changed

3 files changed

+115
-25
lines changed

crates/vz-oci-macos/src/runtime.rs

Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -208,10 +208,35 @@ impl Runtime {
208208
}
209209

210210
// Best-effort OCI delete via guest runtime if VM is still up.
211-
if let Some(vm) = self.vm_handles.lock().await.remove(id) {
212-
let _ = vm.oci_delete(id.to_string(), true).await;
211+
// Try the per-container handle first; fall back to the shared stack VM
212+
// (the per-container handle may have been removed by stop_container).
213+
let vm = self.vm_handles.lock().await.remove(id);
214+
let stack_id = self.container_stack.lock().await.remove(id);
215+
if let Some(vm) = vm {
216+
match vm.oci_delete(id.to_string(), true).await {
217+
Ok(_) => {
218+
tracing::debug!(container_id = %id, "remove_container: oci_delete via vm_handle succeeded")
219+
}
220+
Err(e) => {
221+
tracing::warn!(container_id = %id, error = %e, "remove_container: oci_delete via vm_handle failed")
222+
}
223+
}
224+
} else if let Some(sid) = &stack_id {
225+
if let Some(vm) = self.stack_vms.lock().await.get(sid) {
226+
match vm.oci_delete(id.to_string(), true).await {
227+
Ok(_) => {
228+
tracing::debug!(container_id = %id, stack_id = %sid, "remove_container: oci_delete via stack_vm succeeded")
229+
}
230+
Err(e) => {
231+
tracing::warn!(container_id = %id, stack_id = %sid, error = %e, "remove_container: oci_delete via stack_vm failed")
232+
}
233+
}
234+
} else {
235+
tracing::warn!(container_id = %id, stack_id = %sid, "remove_container: stack_vm not found");
236+
}
237+
} else {
238+
tracing::debug!(container_id = %id, "remove_container: no vm_handle or stack_id, skipping oci_delete");
213239
}
214-
self.container_stack.lock().await.remove(id);
215240
self.active_lifecycle.lock().await.remove(id);
216241

217242
self.container_store.remove(id).map_err(OciError::from)?;
@@ -272,23 +297,35 @@ impl Runtime {
272297
let lifecycle = self.active_lifecycle.lock().await.remove(id);
273298

274299
// Best-effort OCI delete.
275-
let _ = vm.oci_delete(id.to_string(), true).await;
300+
match vm.oci_delete(id.to_string(), true).await {
301+
Ok(_) => tracing::debug!(container_id = %id, "stop_container: oci_delete succeeded"),
302+
Err(e) => {
303+
tracing::warn!(container_id = %id, error = %e, "stop_container: oci_delete failed (best-effort)")
304+
}
305+
}
276306

277307
// Only tear down the VM if the container does NOT belong to a shared stack VM.
278308
let is_stack_container = self.container_stack.lock().await.contains_key(id);
279309
if !is_stack_container {
280310
let _ = vm.stop().await;
281311
}
282312
self.vm_handles.lock().await.remove(id);
283-
self.container_stack.lock().await.remove(id);
313+
// Keep container_stack entry so remove_container can find the stack VM
314+
// for a retry oci_delete if the best-effort delete above failed.
284315

285316
// Shut down port forwarding for this container.
286317
if let Some(pf) = self.port_forwards.lock().await.remove(id) {
287318
pf.shutdown().await;
288319
}
289320

290-
if let Some(rootfs_path) = container.rootfs_path.take() {
291-
let _ = fs::remove_dir_all(rootfs_path);
321+
// Only remove rootfs for non-stack containers. For stack containers the
322+
// shared VM's VirtioFS cache holds stale metadata after host-side deletion,
323+
// causing recreates to fail (overlay sees empty lowerdir). The rootfs will
324+
// be cleaned up by remove_container or overwritten by a subsequent create.
325+
if !is_stack_container {
326+
if let Some(rootfs_path) = container.rootfs_path.take() {
327+
let _ = fs::remove_dir_all(rootfs_path);
328+
}
292329
}
293330

294331
container.host_pid = None;
@@ -1010,6 +1047,11 @@ impl Runtime {
10101047
.oci_create(oci_container_id.clone(), bundle_guest_path.clone())
10111048
.await
10121049
{
1050+
tracing::error!(
1051+
container_id = %oci_container_id,
1052+
error = %err,
1053+
"step 4 FAILED: oci_create"
1054+
);
10131055
container.status = ContainerStatus::Stopped { exit_code: -1 };
10141056
container.stopped_unix_secs = Some(current_unix_secs());
10151057
container.host_pid = None;
@@ -2559,6 +2601,25 @@ async fn setup_guest_container_overlay(
25592601
let guest_rootfs_path = format!("{container_overlay}/merged");
25602602
let log_dir = container_log_dir(container_id);
25612603

2604+
// Clean up any stale overlay from a previous container with the same ID
2605+
// (e.g. during recreate). Best-effort: unmount merged overlay, then the
2606+
// tmpfs backing, then remove the directory tree. Invalidate the VirtioFS
2607+
// dcache so the kernel re-reads host-side changes (the rootfs may have
2608+
// been deleted + reassembled on the host during recreate).
2609+
let cleanup_cmd = format!(
2610+
"umount {container_overlay}/merged 2>/dev/null; \
2611+
umount {container_overlay} 2>/dev/null; \
2612+
rm -rf {container_overlay}; \
2613+
echo 2 > /proc/sys/vm/drop_caches 2>/dev/null || true"
2614+
);
2615+
let _ = vm
2616+
.exec_capture(
2617+
"sh".to_string(),
2618+
vec!["-c".to_string(), cleanup_cmd],
2619+
Duration::from_secs(5),
2620+
)
2621+
.await;
2622+
25622623
let overlay_cmd = format!(
25632624
"mkdir -p {container_overlay} && \
25642625
mount -t tmpfs tmpfs {container_overlay} && \

crates/vz-stack/src/executor.rs

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -836,17 +836,42 @@ impl<R: ContainerRuntime> StackExecutor<R> {
836836
}
837837
}
838838

839-
if prepared.len() <= 1 {
840-
// Single service — execute inline, no thread overhead.
841-
for prep in prepared {
839+
// Deduplicate image pulls: pull each unique image once serially
840+
// before entering the parallel container creation phase. This avoids
841+
// concurrent layer extraction races when multiple replicas share an image.
842+
let mut pulled_images: HashSet<String> = HashSet::new();
843+
let mut pull_failed: HashSet<String> = HashSet::new();
844+
for prep in &prepared {
845+
if pulled_images.contains(&prep.image) || pull_failed.contains(&prep.image) {
846+
continue;
847+
}
848+
info!(image = %prep.image, "pulling image (deduplicated)");
849+
if let Err(e) = self.runtime.pull(&prep.image) {
850+
error!(image = %prep.image, error = %e, "image pull failed");
851+
pull_failed.insert(prep.image.clone());
852+
} else {
853+
pulled_images.insert(prep.image.clone());
854+
}
855+
}
856+
857+
// Partition prepared creates: those whose image pull failed go straight
858+
// to the error path; the rest proceed to parallel container creation.
859+
let (ok_prepared, failed_prepared): (Vec<_>, Vec<_>) =
860+
prepared.into_iter().partition(|p| pulled_images.contains(&p.image));
861+
862+
for prep in failed_prepared {
863+
let full_name = prep.full_name();
864+
let msg = format!("image pull failed for {}", prep.image);
865+
self.mark_failed(spec, &full_name, &msg)?;
866+
result.failed += 1;
867+
result.errors.push((full_name, msg));
868+
}
869+
870+
if ok_prepared.len() <= 1 {
871+
// Single container — execute inline, no thread overhead.
872+
for prep in ok_prepared {
842873
let full_name = prep.full_name();
843874
info!(service = %full_name, image = %prep.image, "creating container");
844-
if let Err(e) = self.runtime.pull(&prep.image) {
845-
self.mark_failed(spec, &full_name, &e.to_string())?;
846-
result.failed += 1;
847-
result.errors.push((full_name, e.to_string()));
848-
continue;
849-
}
850875
let create_result =
851876
self.runtime
852877
.create_in_sandbox(&spec.name, &prep.image, prep.run_config);
@@ -863,9 +888,9 @@ impl<R: ContainerRuntime> StackExecutor<R> {
863888
}
864889
}
865890
} else {
866-
// Parallel pull + create for multiple services at the same level.
867-
// Extract full names (with replica index) before moving prepared.
868-
let full_names: Vec<String> = prepared.iter().map(|p| p.full_name()).collect();
891+
// Parallel create for multiple containers at the same level.
892+
// Images are already pulled; only create_in_sandbox runs in threads.
893+
let full_names: Vec<String> = ok_prepared.iter().map(|p| p.full_name()).collect();
869894
info!(
870895
services = ?full_names,
871896
"creating {} containers in parallel",
@@ -875,13 +900,11 @@ impl<R: ContainerRuntime> StackExecutor<R> {
875900
let runtime = &self.runtime;
876901
let stack_name = &spec.name;
877902
let outcomes: Vec<Result<String, StackError>> = std::thread::scope(|s| {
878-
let handles: Vec<_> = prepared
903+
let handles: Vec<_> = ok_prepared
879904
.into_iter()
880905
.map(|prep| {
881906
let full_name = prep.full_name();
882907
s.spawn(move || -> Result<String, StackError> {
883-
info!(service = %full_name, image = %prep.image, "pulling image");
884-
runtime.pull(&prep.image)?;
885908
info!(service = %full_name, image = %prep.image, "creating container");
886909
runtime.create_in_sandbox(
887910
stack_name,
@@ -2025,7 +2048,7 @@ mod tests {
20252048
let result = executor.execute(&spec, &actions).unwrap();
20262049
assert_eq!(result.errors.len(), 1);
20272050
assert_eq!(result.errors[0].0, "web");
2028-
assert!(result.errors[0].1.contains("mock pull failure"));
2051+
assert!(result.errors[0].1.contains("image pull failed"));
20292052
}
20302053

20312054
// ── Port tracking tests ──

crates/vz-stack/tests/stack_e2e.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2339,7 +2339,6 @@ services:
23392339

23402340
/// Deploy a service with replicas=3 and verify 3 running containers with distinct IDs.
23412341
#[tokio::test(flavor = "multi_thread")]
2342-
#[ignore = "requires Apple Silicon + Linux kernel artifacts"]
23432342
async fn replicated_service_creates_multiple_containers() {
23442343
if !require_virtualization_entitlement() {
23452344
return;
@@ -2364,6 +2363,10 @@ services:
23642363
assert_eq!(spec.services[0].resources.replicas, 3);
23652364

23662365
let bridge = OciContainerRuntime::new(&oci_data);
2366+
2367+
// Pre-pull image so parallel replica creation doesn't race on layer extraction.
2368+
bridge.pull("alpine:latest").unwrap();
2369+
23672370
let store = StateStore::open(&db_path).unwrap();
23682371
let mut executor = StackExecutor::new(bridge, store, tmp.path());
23692372

@@ -2427,7 +2430,6 @@ services:
24272430

24282431
/// Deploy replicas=3, then redeploy with replicas=1 and verify scale-down.
24292432
#[tokio::test(flavor = "multi_thread")]
2430-
#[ignore = "requires Apple Silicon + Linux kernel artifacts"]
24312433
async fn replicated_service_scale_down() {
24322434
if !require_virtualization_entitlement() {
24332435
return;
@@ -2449,6 +2451,10 @@ services:
24492451

24502452
let spec3 = parse_compose(yaml_3, "scale-e2e").unwrap();
24512453
let bridge = OciContainerRuntime::new(&oci_data);
2454+
2455+
// Pre-pull image so parallel replica creation doesn't race on layer extraction.
2456+
bridge.pull("alpine:latest").unwrap();
2457+
24522458
let store = StateStore::open(&db_path).unwrap();
24532459
let mut executor = StackExecutor::new(bridge, store, tmp.path());
24542460

0 commit comments

Comments (0)