diff --git a/CHANGELOG.md b/CHANGELOG.md index f9ee4b9..fbfeb75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ versioning for public release policy decisions. ## [Unreleased] +### Changed + +- Unify init'd and stateless apply dispatch behind a single `ApplyTarget` abstraction so `core-ops apply --source-repo PATH` gains the streaming + interactive output that `core-ops apply` (init'd) has had since spec/006. Previously stateless apply printed a single wall-of-text summary at the end regardless of TTY state because `apply_with_report_stateless` was a separate batch-only entry point that never picked up the spec/006 streaming/interactive variants. ## [2.2.3] - 2026-05-07 diff --git a/Cargo.lock b/Cargo.lock index 826b824..87fca05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -157,7 +157,7 @@ checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" [[package]] name = "core-ops" -version = "2.2.3" +version = "2.2.4" dependencies = [ "clap", "libc", diff --git a/Cargo.toml b/Cargo.toml index 5a749f5..cc8b122 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "core-ops" -version = "2.2.3" +version = "2.2.4" edition = "2021" license = "AGPL-3.0-or-later" diff --git a/changes/refactor-apply-unify-streaming.md b/changes/refactor-apply-unify-streaming.md new file mode 100644 index 0000000..6c9ed27 --- /dev/null +++ b/changes/refactor-apply-unify-streaming.md @@ -0,0 +1,7 @@ +--- +change_id: refactor-apply-unify-streaming +release_intent: patch +summary: Unify init'd and stateless apply dispatch behind a single `ApplyTarget` abstraction so `core-ops apply --source-repo PATH` gains the streaming + interactive output that `core-ops apply` (init'd) has had since spec/006. Previously stateless apply printed a single wall-of-text summary at the end regardless of TTY state because `apply_with_report_stateless` was a separate batch-only entry point that never picked up the spec/006 streaming/interactive variants. +scope: cli +release_preparation: false +--- diff --git a/src/cli/agent.rs b/src/cli/agent.rs index 1ecdcab..864a10b 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -68,11 +68,9 @@ pub fn run_agent(config: &AgentConfig) -> Result { .map_err(|err| CoreError::new(FailureClass::Apply, err.to_string()))?; let result = apply_cmd::apply_with_report( - &repo, - &rev, + &apply_cmd::ApplyTarget::initd(&repo, &rev, Some(state_path.clone())), &config.quadlet_dir, config.reload_systemd, - Some(state_path.clone()), ); let release_result = lock diff --git a/src/cli/apply.rs b/src/cli/apply.rs index 73e8808..0ca0e98 100644 --- a/src/cli/apply.rs +++ b/src/cli/apply.rs @@ -70,16 +70,129 @@ pub struct ApplyReportBundle { pub plan: crate::core::types::ReconciliationPlan, } +/// Source of the desired state plus state-backend behaviour for an +/// `apply` invocation. Init'd mode (`core-ops apply` after +/// `core-ops init`) reads + writes the persisted controller state +/// at `state_path`; stateless mode (`--source-repo PATH`) keeps no +/// persistent state by design (FR-013 / SC-009 of spec/017). +/// +/// All three streaming variants +/// (`apply_with_report`, `apply_with_report_streaming`, +/// `apply_with_report_streaming_interactive`) accept `&ApplyTarget` +/// so the same TTY-detection ladder in `src/main.rs` can drive both +/// modes without duplicate dispatch logic. +#[derive(Clone, Debug)] +pub enum ApplyTarget { + Initd { + repo_source: String, + revision: String, + state_path: Option, + }, + Stateless { + source: crate::io::source_ref::StatelessSource, + }, +} + +impl ApplyTarget { + pub fn initd( + repo_source: impl Into, + revision: impl Into, + state_path: Option, + ) -> Self { + Self::Initd { + repo_source: repo_source.into(), + revision: revision.into(), + state_path, + } + } + + pub fn stateless(source: crate::io::source_ref::StatelessSource) -> Self { + Self::Stateless { source } + } + + pub fn state_path(&self) -> Option<&Path> { + match self { + Self::Initd { state_path, .. } => state_path.as_deref(), + Self::Stateless { .. } => None, + } + } + + pub fn is_stateless(&self) -> bool { + matches!(self, Self::Stateless { .. }) + } + + /// String to record as the apply event's `repo_source` (audit / + /// persisted state). For init'd mode, the operator's repo URL or + /// path; for stateless mode, the canonical source-repo path. + pub fn repo_source_for_record(&self) -> &str { + match self { + Self::Initd { repo_source, .. } => repo_source, + Self::Stateless { source } => &source.requested_repository, + } + } + + /// Revision string to record. For init'd mode, the operator's + /// requested ref. For stateless mode, the path-based provenance + /// sentinel (`` / `(stateless)` / `(stateless+dirty)`). + pub fn revision_for_record(&self) -> &str { + match self { + Self::Initd { revision, .. } => revision, + Self::Stateless { source } => &source.requested_ref, + } + } + + fn load_desired( + &self, + ) -> Result { + match self { + Self::Initd { + repo_source, + revision, + .. + } => load_desired_state(repo_source, revision).map_err(map_plan_error), + Self::Stateless { source } => crate::io::repo::load_desired_state_from_path( + &source.repo_path, + &source.requested_repository, + &source.requested_ref, + ) + // Per `contracts/cli-flag.md` Error semantics: a path that + // exists as a directory but is not a spec/016-conformant + // source-repo (missing services/, legacy artifacts, etc.) + // exits 65 (`EX_DATAERR`). + .map_err(|err| CoreError::with_exit_code(FailureClass::Plan, err.to_string(), 65)), + } + } + + /// Pick the run-display state for header rendering. Init'd mode + /// uses the three-way `last_applied + observed` heuristic; stateless + /// mode uses the dedicated `Stateless` variant when the host has + /// observed objects (the controller cannot distinguish a successful + /// prior apply from a failed one without `/var/lib/core-ops/`), + /// falling back to `FirstRun` for an empty host. + fn classify_run_display( + &self, + last_applied: Option<&str>, + observed: &crate::core::types::NormalizedSnapshot, + ) -> ApplyRunDisplayState { + if self.is_stateless() { + if observed.objects.is_empty() { + ApplyRunDisplayState::FirstRun + } else { + ApplyRunDisplayState::Stateless + } + } else { + classify_apply_run_display_state(last_applied, observed) + } + } +} + pub fn apply_with_report( - repo_source: &str, - revision: &str, + target: &ApplyTarget, quadlet_dir: &Path, reload_systemd: bool, - state_path: Option, ) -> Result { - let repo_source = repo_source.to_string(); let deps = ReconcileDependencies { - load_desired: &|| load_desired_state(&repo_source, revision).map_err(map_plan_error), + load_desired: &|| target.load_desired(), read_observed: &|desired| { read_observed_state(quadlet_dir, Some(desired), None).map_err(map_plan_error) }, @@ -96,10 +209,11 @@ pub fn apply_with_report( let desired_snapshot = build_desired_snapshot_from_state(&plan_result.desired, &scope_id); let observed_snapshot = build_observed_snapshot(&observed_before, Some(&plan_result.desired), &scope_id); - let last_applied_revision = last_applied_revision_from_state(state_path.as_deref())?; - let last_applied_snapshot = last_applied_snapshot_for_scope(state_path.as_deref(), &scope_id)?; + let state_path = target.state_path(); + let last_applied_revision = last_applied_revision_from_state(state_path)?; + let last_applied_snapshot = last_applied_snapshot_for_scope(state_path, &scope_id)?; let run_display_state = - classify_apply_run_display_state(last_applied_revision.as_deref(), &observed_snapshot); + target.classify_run_display(last_applied_revision.as_deref(), &observed_snapshot); let verification_results_before = normalize_verification_results_for_desired( &plan_result.desired, verify_state(&plan_result.desired, &observed_before), @@ -125,12 +239,12 @@ pub fn apply_with_report( deterministic.last_applied_requested_ref = last_applied_snapshot .as_ref() .and_then(|snapshot| snapshot.requested_ref.clone()); - let attempt = match state_path.as_ref() { + let attempt = match state_path { Some(path) => Some( persist_in_progress_state( path, - &repo_source, - revision, + target.repo_source_for_record(), + target.revision_for_record(), &plan_result.desired.revision_id, None, ) @@ -174,7 +288,7 @@ pub fn apply_with_report( let result_report = format_result_output_report(&result_view); let result_machine_report = format_result_output_json(&result_view); - if let Some(status_path) = state_path.as_ref() { + if let Some(status_path) = state_path { let deterministic_state_path = deterministic_state_path(status_path); let mut deterministic_state = load_or_init_deterministic_state(&deterministic_state_path).map_err(map_apply_error)?; @@ -200,15 +314,15 @@ pub fn apply_with_report( .map_err(map_apply_error)?; } } - if let (Some(path), Some(attempt)) = (state_path.as_ref(), attempt.as_ref()) { + if let (Some(path), Some(attempt)) = (state_path, attempt.as_ref()) { let status = match result.run.status { RunStatus::Success => ReconciliationStatus::Success, RunStatus::Failure => ReconciliationStatus::Failed, }; persist_finished_state( path, - &repo_source, - revision, + target.repo_source_for_record(), + target.revision_for_record(), &result.desired.revision_id, None, attempt, @@ -228,21 +342,17 @@ pub fn apply_with_report( } pub fn apply_with_report_streaming( - repo_source: &str, - revision: &str, + target: &ApplyTarget, quadlet_dir: &Path, reload_systemd: bool, - state_path: Option, mode: ApplyHumanMode, emit: F, ) -> Result where F: FnMut(&str), { - let repo_source = repo_source.to_string(); - let plan_deps = ReconcileDependencies { - load_desired: &|| load_desired_state(&repo_source, revision).map_err(map_plan_error), + load_desired: &|| target.load_desired(), read_observed: &|desired| { read_observed_state(quadlet_dir, Some(desired), None).map_err(map_plan_error) }, @@ -254,10 +364,11 @@ where let desired_snapshot = build_desired_snapshot_from_state(&plan_result.desired, &scope_id); let observed_snapshot = build_observed_snapshot(&observed_before, Some(&plan_result.desired), &scope_id); - let last_applied_revision = last_applied_revision_from_state(state_path.as_deref())?; - let last_applied_snapshot = last_applied_snapshot_for_scope(state_path.as_deref(), &scope_id)?; + let state_path = target.state_path(); + let last_applied_revision = last_applied_revision_from_state(state_path)?; + let last_applied_snapshot = last_applied_snapshot_for_scope(state_path, &scope_id)?; let run_display_state = - classify_apply_run_display_state(last_applied_revision.as_deref(), &observed_snapshot); + target.classify_run_display(last_applied_revision.as_deref(), &observed_snapshot); let verification_results_before = normalize_verification_results_for_desired( &plan_result.desired, verify_state(&plan_result.desired, &observed_before), @@ -291,11 +402,9 @@ where let emit = RefCell::new(emit); emit.borrow_mut()(&renderer.borrow().begin()); - let stream_state_path = state_path.clone(); let stream_quadlet_dir = quadlet_dir.to_path_buf(); - let stream_repo_source = repo_source.clone(); let deps = ReconcileDependencies { - load_desired: &|| load_desired_state(&stream_repo_source, revision).map_err(map_plan_error), + load_desired: &|| target.load_desired(), read_observed: &|desired| { read_observed_state(&stream_quadlet_dir, Some(desired), None).map_err(map_plan_error) }, @@ -328,12 +437,12 @@ where }, }; - let attempt = match stream_state_path.as_ref() { + let attempt = match state_path { Some(path) => Some( persist_in_progress_state( path, - &repo_source, - revision, + target.repo_source_for_record(), + target.revision_for_record(), &plan_result.desired.revision_id, None, ) @@ -383,7 +492,7 @@ where let result_report = format_result_output_report(&result_view); let result_machine_report = format_result_output_json(&result_view); - if let Some(status_path) = stream_state_path.as_ref() { + if let Some(status_path) = state_path { let deterministic_state_path = deterministic_state_path(status_path); let mut deterministic_state = load_or_init_deterministic_state(&deterministic_state_path).map_err(map_apply_error)?; @@ -409,15 +518,15 @@ where .map_err(map_apply_error)?; } } - if let (Some(path), Some(attempt)) = (stream_state_path.as_ref(), attempt.as_ref()) { + if let (Some(path), Some(attempt)) = (state_path, attempt.as_ref()) { let status = match result.run.status { RunStatus::Success => ReconciliationStatus::Success, RunStatus::Failure => ReconciliationStatus::Failed, }; persist_finished_state( path, - &repo_source, - revision, + target.repo_source_for_record(), + target.revision_for_record(), &result.desired.revision_id, None, attempt, @@ -438,21 +547,17 @@ where } pub fn apply_with_report_streaming_interactive( - repo_source: &str, - revision: &str, + target: &ApplyTarget, quadlet_dir: &Path, reload_systemd: bool, - state_path: Option, mode: ApplyHumanMode, emit: F, ) -> Result where F: FnMut(ApplyInteractiveEvent), { - let repo_source = repo_source.to_string(); - let plan_deps = ReconcileDependencies { - load_desired: &|| load_desired_state(&repo_source, revision).map_err(map_plan_error), + load_desired: &|| target.load_desired(), read_observed: &|desired| { read_observed_state(quadlet_dir, Some(desired), None).map_err(map_plan_error) }, @@ -464,10 +569,11 @@ where let desired_snapshot = build_desired_snapshot_from_state(&plan_result.desired, &scope_id); let observed_snapshot = build_observed_snapshot(&observed_before, Some(&plan_result.desired), &scope_id); - let last_applied_revision = last_applied_revision_from_state(state_path.as_deref())?; - let last_applied_snapshot = last_applied_snapshot_for_scope(state_path.as_deref(), &scope_id)?; + let state_path = target.state_path(); + let last_applied_revision = last_applied_revision_from_state(state_path)?; + let last_applied_snapshot = last_applied_snapshot_for_scope(state_path, &scope_id)?; let run_display_state = - classify_apply_run_display_state(last_applied_revision.as_deref(), &observed_snapshot); + target.classify_run_display(last_applied_revision.as_deref(), &observed_snapshot); let verification_results_before = normalize_verification_results_for_desired( &plan_result.desired, verify_state(&plan_result.desired, &observed_before), @@ -501,11 +607,9 @@ where let emit = RefCell::new(emit); emit.borrow_mut()(renderer.borrow().begin_interactive()); - let stream_state_path = state_path.clone(); let stream_quadlet_dir = quadlet_dir.to_path_buf(); - let stream_repo_source = repo_source.clone(); let deps = ReconcileDependencies { - load_desired: &|| load_desired_state(&stream_repo_source, revision).map_err(map_plan_error), + load_desired: &|| target.load_desired(), read_observed: &|desired| { read_observed_state(&stream_quadlet_dir, Some(desired), None).map_err(map_plan_error) }, @@ -545,12 +649,12 @@ where }, }; - let attempt = match stream_state_path.as_ref() { + let attempt = match state_path { Some(path) => Some( persist_in_progress_state( path, - &repo_source, - revision, + target.repo_source_for_record(), + target.revision_for_record(), &plan_result.desired.revision_id, None, ) @@ -600,7 +704,7 @@ where let result_report = format_result_output_report(&result_view); let result_machine_report = format_result_output_json(&result_view); - if let Some(status_path) = stream_state_path.as_ref() { + if let Some(status_path) = state_path { let deterministic_state_path = deterministic_state_path(status_path); let mut deterministic_state = load_or_init_deterministic_state(&deterministic_state_path).map_err(map_apply_error)?; @@ -626,15 +730,15 @@ where .map_err(map_apply_error)?; } } - if let (Some(path), Some(attempt)) = (stream_state_path.as_ref(), attempt.as_ref()) { + if let (Some(path), Some(attempt)) = (state_path, attempt.as_ref()) { let status = match result.run.status { RunStatus::Success => ReconciliationStatus::Success, RunStatus::Failure => ReconciliationStatus::Failed, }; persist_finished_state( path, - &repo_source, - revision, + target.repo_source_for_record(), + target.revision_for_record(), &result.desired.revision_id, None, attempt, @@ -762,11 +866,9 @@ pub fn execute_rollback_with_report( } let output = apply_with_report( - repo_source, - target_revision_id, + &ApplyTarget::initd(repo_source, target_revision_id, Some(state_path.clone())), quadlet_dir, reload_systemd, - Some(state_path.clone()), )?; // After a successful rollback, mark controller as Detached. @@ -827,134 +929,6 @@ pub fn execute_rollback_with_report( }) } -/// Stateless apply entry point (spec/017): converges host state from a -/// filesystem-resident source-repo without consulting or mutating the -/// persisted controller state at `/var/lib/core-ops/status.json`. -/// -/// Per FR-013 and SC-009, stateless apply MUST NOT mutate the -/// init'd-mode persisted `desired_state.repository` / -/// `desired_state.requested_ref`. This function achieves that by -/// performing zero state-file I/O — no `persist_in_progress_state`, -/// no `persist_finished_state`, no deterministic-state writes. The -/// audit chain (emitted by the caller) is the persisted record. -/// -/// `source` carries the canonical path plus path-based provenance -/// strings produced by [`crate::io::source_ref::detect_provenance`]. -pub fn apply_with_report_stateless( - source: &crate::io::source_ref::StatelessSource, - quadlet_dir: &Path, - reload_systemd: bool, -) -> Result { - let repo_path = source.repo_path.clone(); - let requested_repository = source.requested_repository.clone(); - let requested_ref = source.requested_ref.clone(); - let deps = ReconcileDependencies { - load_desired: &|| { - crate::io::repo::load_desired_state_from_path( - &repo_path, - &requested_repository, - &requested_ref, - ) - // Per `contracts/cli-flag.md` Error semantics: a path that - // exists as a directory but is not a spec/016-conformant - // source-repo (missing services/, legacy artifacts, etc.) - // exits 65 (`EX_DATAERR`). Thread the documented exit code - // through `CoreError.exit_code` so automation can classify - // by status alone. - .map_err(|err| { - CoreError::with_exit_code(FailureClass::Plan, err.to_string(), 65) - }) - }, - read_observed: &|desired| { - read_observed_state(quadlet_dir, Some(desired), None).map_err(map_plan_error) - }, - apply_plan: &|plan, desired| { - apply_plan_with_desired(plan, desired, quadlet_dir, reload_systemd) - .map(|_| ()) - .map_err(map_apply_error) - }, - }; - - let plan_result = reconcile_plan(&deps)?; - let observed_before = (deps.read_observed)(&plan_result.desired)?; - let scope_id = scope_id_for_observed(&observed_before); - let desired_snapshot = build_desired_snapshot_from_state(&plan_result.desired, &scope_id); - let observed_snapshot = - build_observed_snapshot(&observed_before, Some(&plan_result.desired), &scope_id); - let verification_results_before = normalize_verification_results_for_desired( - &plan_result.desired, - verify_state(&plan_result.desired, &observed_before), - ); - // Stateless mode has no last_applied baseline (init'd state is - // intentionally not consulted). Treat as FirstRun semantically. - let mut deterministic = reconcile_deterministic_plan_with_runtime( - &desired_snapshot, - None, - &observed_snapshot, - &verification_results_before, - )? - .plan; - deterministic.requested_repository = plan_result.desired.requested_repository.clone(); - deterministic.requested_ref = plan_result.desired.requested_ref.clone(); - - let result = reconcile_apply_with_retry(&deps, DEFAULT_RETRY_BUDGET)?; - if result - .desired - .mount_declarations - .iter() - .any(|mount| mount.automount) - { - deterministic.scope_id = scope_id.clone(); - } - // Stateless mode: empty host → FirstRun (informative for an actual - // initial apply); non-empty host → Stateless (suppress the - // misleading "recovery from failed initial apply" suffix — the - // controller cannot distinguish a successful prior apply from a - // failed one in stateless mode). The `(stateless)` path-based - // provenance prefix carries the operationally relevant signal. - let run_display_state = if observed_snapshot.objects.is_empty() { - ApplyRunDisplayState::FirstRun - } else { - ApplyRunDisplayState::Stateless - }; - let human_report = format_apply_output_report( - &deterministic, - &result.verification_results, - result.convergence.as_ref(), - ApplyHumanMode::Default, - run_display_state, - ); - let verbose_report = format_apply_output_report( - &deterministic, - &result.verification_results, - result.convergence.as_ref(), - ApplyHumanMode::Verbose, - run_display_state, - ); - let machine_report = format_apply_output_json( - &deterministic, - &result.verification_results, - result.convergence.as_ref(), - ); - let result_view = build_result_output( - &deterministic, - &result.verification_results, - result.convergence.as_ref(), - ); - let result_report = format_result_output_report(&result_view); - let result_machine_report = format_result_output_json(&result_view); - - Ok(ApplyReportBundle { - result, - human_report, - verbose_report, - machine_report, - result_report, - result_machine_report, - plan: plan_result.plan, - }) -} - /// Build a synthetic `PersistedProvenanceState` for stateless apply /// so that the audit event surfaces path-based provenance plus the /// actual run outcome (success / failure) without consulting any diff --git a/src/main.rs b/src/main.rs index 2138550..b2f3c46 100644 --- a/src/main.rs +++ b/src/main.rs @@ -140,73 +140,30 @@ fn run(cli: Cli) -> Result<(), CoreError> { // mutate /var/lib/core-ops/status.json (FR-013, SC-009). // Audit records are written; the persisted controller state // is left byte-identical pre/post. - if let Some(source_repo) = args.source_repo { - if rollback_to.is_some() { - return Err(CoreError::new( - FailureClass::Apply, - "--rollback-to is incompatible with stateless --source-repo \ - (rollback requires the persisted retention chain set by 'core-ops init')" - .to_string(), - )); - } - let source = detect_provenance(&source_repo).map_err(map_source_ref_error)?; - let output = apply_cmd::apply_with_report_stateless( - &source, - &quadlet_dir, - !no_reload, - )?; - let run = output.result.run.clone(); - let synthetic = apply_cmd::synthetic_stateless_provenance( - output - .result - .desired - .requested_repository - .as_deref() - .unwrap_or(""), - output - .result - .desired - .requested_ref - .as_deref() - .unwrap_or(""), - &output.result.desired.revision_id, - run.status.clone(), - ); - let event = core_ops::core::audit::build_audit_event( - &run, - Some(&output.plan), - &output.result.verification_results, - Some(&synthetic), - ); - audit_io::emit_journal_event(&event).map_err(map_apply_error)?; - if let Some(dir) = audit_dir { - let mut record = core_ops::core::audit::build_audit_record( - &run.run_id, - Vec::new(), - &output.plan, - output.result.verification_results.clone(), - ); - record - .operator_messages - .push(core_ops::core::audit::summarize_evaluation( - &output.result.desired, - )); - let _ = audit_io::write_audit_record(&dir, &record).map_err(map_apply_error)?; - } - if json { - println!("{}", output.machine_report); - } else if verbose { - println!("{}", output.verbose_report); - } else { - println!("{}", output.human_report); - } - if run.status == RunStatus::Failure { - std::process::exit(1); - } - return Ok(()); + // + // Stateless and init'd share the apply dispatch ladder + // below: the same `ApplyTarget` abstraction feeds the same + // three streaming variants (`apply_with_report{,_streaming,_streaming_interactive}`). + // The single divergence is that rollback (`--rollback-to`) + // is init'd-only — it requires the persisted retention + // chain `core-ops init` sets up. + if args.source_repo.is_some() && rollback_to.is_some() { + return Err(CoreError::new( + FailureClass::Apply, + "--rollback-to is incompatible with stateless --source-repo \ + (rollback requires the persisted retention chain set by 'core-ops init')" + .to_string(), + )); } - let state_file = if args.force_no_state { + let stateless_source = if let Some(source_repo) = args.source_repo.as_ref() { + Some(detect_provenance(source_repo).map_err(map_source_ref_error)?) + } else { + None + }; + + let state_file = if stateless_source.is_some() || args.force_no_state { + // Stateless mode: never read or write `/var/lib/core-ops/status.json`. None } else { Some(resolve_state_file(args.state_file)) @@ -224,66 +181,86 @@ fn run(cli: Cli) -> Result<(), CoreError> { state_file.clone(), rollback_plan_only, )? - } else if json { - let (repo_source, rev) = resolve_from_state(state_file.clone())?; - apply_cmd::apply_with_report( - &repo_source, - &rev, - &quadlet_dir, - !no_reload, - state_file.clone(), - )? } else { - let (repo_source, rev) = resolve_from_state(state_file.clone())?; - let stdout = io::stdout(); - let interactive = stdout.is_terminal(); - streamed_human_output = true; - let mode = if verbose { - core_ops::cli::report::ApplyHumanMode::Verbose + let target = if let Some(source) = stateless_source.as_ref() { + apply_cmd::ApplyTarget::stateless(source.clone()) } else { - core_ops::cli::report::ApplyHumanMode::Default + let (repo_source, rev) = resolve_from_state(state_file.clone())?; + apply_cmd::ApplyTarget::initd(repo_source, rev, state_file.clone()) }; - if interactive { - let mut handle = io::stdout(); - let mut spinner = InteractiveApplyDisplay::new(); - let output = apply_cmd::apply_with_report_streaming_interactive( - &repo_source, - &rev, - &quadlet_dir, - !no_reload, - state_file.clone(), - mode, - |event| { - let _ = spinner.render(&mut handle, event); - }, - )?; - let _ = spinner.finish(&mut handle); - output + + if json { + apply_cmd::apply_with_report(&target, &quadlet_dir, !no_reload)? } else { - let mut handle = stdout.lock(); - apply_cmd::apply_with_report_streaming( - &repo_source, - &rev, - &quadlet_dir, - !no_reload, - state_file.clone(), - mode, - |chunk| { - let _ = handle.write_all(chunk.as_bytes()); - let _ = handle.flush(); - }, - )? + let stdout = io::stdout(); + let interactive = stdout.is_terminal(); + streamed_human_output = true; + let mode = if verbose { + core_ops::cli::report::ApplyHumanMode::Verbose + } else { + core_ops::cli::report::ApplyHumanMode::Default + }; + if interactive { + let mut handle = io::stdout(); + let mut spinner = InteractiveApplyDisplay::new(); + let output = apply_cmd::apply_with_report_streaming_interactive( + &target, + &quadlet_dir, + !no_reload, + mode, + |event| { + let _ = spinner.render(&mut handle, event); + }, + )?; + let _ = spinner.finish(&mut handle); + output + } else { + let mut handle = stdout.lock(); + apply_cmd::apply_with_report_streaming( + &target, + &quadlet_dir, + !no_reload, + mode, + |chunk| { + let _ = handle.write_all(chunk.as_bytes()); + let _ = handle.flush(); + }, + )? + } } }; let run = output.result.run.clone(); + // Stateless mode (no state_file) builds a synthetic + // PersistedProvenanceState carrying path-based provenance + + // the actual run outcome so the audit event surfaces the + // same fields as init'd mode (vs. the old prevailing + // `reconciliation_status = "never_run"` bug). + let stateless_provenance = stateless_source.as_ref().map(|_| { + apply_cmd::synthetic_stateless_provenance( + output + .result + .desired + .requested_repository + .as_deref() + .unwrap_or(""), + output + .result + .desired + .requested_ref + .as_deref() + .unwrap_or(""), + &output.result.desired.revision_id, + run.status.clone(), + ) + }); + let initd_persisted = state_file + .as_ref() + .and_then(|path| read_persisted_state(path).ok().flatten()); let event = core_ops::core::audit::build_audit_event( &run, Some(&output.plan), &output.result.verification_results, - state_file - .as_ref() - .and_then(|path| read_persisted_state(path).ok().flatten()) - .as_ref(), + stateless_provenance.as_ref().or(initd_persisted.as_ref()), ); audit_io::emit_journal_event(&event).map_err(map_apply_error)?; if let Some(dir) = audit_dir { diff --git a/tests/fixtures/provenance_state/valid-success.json b/tests/fixtures/provenance_state/valid-success.json index b74aa6c..dd86955 100644 --- a/tests/fixtures/provenance_state/valid-success.json +++ b/tests/fixtures/provenance_state/valid-success.json @@ -1,7 +1,7 @@ { "schema_version": 1, "controller": { - "version": "2.2.3", + "version": "2.2.4", "revision": "8f3c2ab", "build_time": "2026-03-23T10:00:00Z", "tree_state": "clean" diff --git a/tests/integration/test_apply_report.rs b/tests/integration/test_apply_report.rs index 6597483..caed8ec 100644 --- a/tests/integration/test_apply_report.rs +++ b/tests/integration/test_apply_report.rs @@ -101,7 +101,7 @@ fn apply_report_uses_phase_aware_human_and_machine_output() { let host_quadlets = temp_dir("core_ops_host_apply_report"); fs::create_dir_all(&host_quadlets).expect("create host quadlets"); - let output = apply_with_report(repo.to_str().unwrap(), &rev, &host_quadlets, false, None) + let output = apply_with_report(&core_ops::cli::apply::ApplyTarget::initd(repo.to_str().unwrap(), &rev, None), &host_quadlets, false) .expect("apply report"); assert!(output.human_report.contains("Apply for host")); @@ -144,7 +144,7 @@ fn apply_report_only_narrates_phases_in_verbose_mode() { let host_quadlets = temp_dir("core_ops_host_apply_narration"); fs::create_dir_all(&host_quadlets).expect("create host quadlets"); - let output = apply_with_report(repo.to_str().unwrap(), &rev, &host_quadlets, false, None) + let output = apply_with_report(&core_ops::cli::apply::ApplyTarget::initd(repo.to_str().unwrap(), &rev, None), &host_quadlets, false) .expect("apply report"); assert!(!output.human_report.contains("Phases")); @@ -196,11 +196,9 @@ fn apply_report_distinguishes_first_run_and_recovery_headers() { let first_run_quadlets = temp_dir("core_ops_host_apply_first_run"); fs::create_dir_all(&first_run_quadlets).expect("create host quadlets"); let first_run_output = apply_with_report( - repo.to_str().unwrap(), - &rev, + &core_ops::cli::apply::ApplyTarget::initd(repo.to_str().unwrap(), &rev, None), &first_run_quadlets, false, - None, ) .expect("first run apply report"); assert!(first_run_output.human_report.contains("(first run)")); @@ -213,11 +211,9 @@ fn apply_report_distinguishes_first_run_and_recovery_headers() { ) .expect("write residual quadlet"); let recovery_output = apply_with_report( - repo.to_str().unwrap(), - &rev, + &core_ops::cli::apply::ApplyTarget::initd(repo.to_str().unwrap(), &rev, None), &recovery_quadlets, false, - None, ) .expect("recovery apply report"); assert!(recovery_output @@ -368,11 +364,9 @@ fn apply_streaming_report_emits_progress_then_terminal_lines() { let mut transcript = String::new(); let output = apply_with_report_streaming( - repo.to_str().unwrap(), - &rev, + &core_ops::cli::apply::ApplyTarget::initd(repo.to_str().unwrap(), &rev, None), &host_quadlets, false, - None, ApplyHumanMode::Default, |chunk| transcript.push_str(chunk), ) @@ -414,11 +408,13 @@ fn apply_report_fails_when_persisted_state_path_is_unreadable() { fs::create_dir_all(&state_dir).expect("create state dir"); let err = match apply_with_report( - repo.to_str().unwrap(), - &rev, + &core_ops::cli::apply::ApplyTarget::initd( + repo.to_str().unwrap(), + &rev, + Some(state_dir.clone()), + ), &host_quadlets, false, - Some(state_dir.clone()), ) { Ok(_) => panic!("apply should fail on unreadable state path"), Err(err) => err, diff --git a/tests/integration/test_reboot_recovery.rs b/tests/integration/test_reboot_recovery.rs index 5b3fc2b..08e4b64 100644 --- a/tests/integration/test_reboot_recovery.rs +++ b/tests/integration/test_reboot_recovery.rs @@ -117,11 +117,13 @@ fn apply_persists_status_snapshot_across_repeat_runs() { fs::create_dir_all(&host_quadlets).expect("host quadlets"); let first = core_ops::cli::apply::apply_with_report( - repo.to_str().expect("repo path"), - &rev, + &core_ops::cli::apply::ApplyTarget::initd( + repo.to_str().expect("repo path"), + &rev, + Some(state_file.clone()), + ), &host_quadlets, true, - Some(state_file.clone()), ) .expect("first apply"); assert_eq!(first.result.run.summary, "converged"); @@ -133,11 +135,13 @@ fn apply_persists_status_snapshot_across_repeat_runs() { assert!(first_contents.contains("\"generation\": 1")); let second = core_ops::cli::apply::apply_with_report( - repo.to_str().expect("repo path"), - &rev, + &core_ops::cli::apply::ApplyTarget::initd( + repo.to_str().expect("repo path"), + &rev, + Some(state_file.clone()), + ), &host_quadlets, true, - Some(state_file.clone()), ) .expect("second apply"); assert_eq!(second.result.run.summary, "converged"); diff --git a/tests/integration/test_reconcile_provenance.rs b/tests/integration/test_reconcile_provenance.rs index 7cb2008..7b6a1de 100644 --- a/tests/integration/test_reconcile_provenance.rs +++ b/tests/integration/test_reconcile_provenance.rs @@ -137,11 +137,13 @@ fn failed_reconciliation_preserves_last_applied_revision_and_desired_state_field fs::create_dir_all(&host_quadlets).expect("host quadlets"); let first = core_ops::cli::apply::apply_with_report( - repo.to_str().expect("repo path"), - "main", + &core_ops::cli::apply::ApplyTarget::initd( + repo.to_str().expect("repo path"), + "main", + Some(state_file.clone()), + ), &host_quadlets, true, - Some(state_file.clone()), ) .expect("first apply"); assert_eq!(first.result.run.summary, "converged"); @@ -151,11 +153,13 @@ fn failed_reconciliation_preserves_last_applied_revision_and_desired_state_field fs::write(&fail_marker, "").expect("write fail marker"); let second = core_ops::cli::apply::apply_with_report( - repo.to_str().expect("repo path"), - "main", + &core_ops::cli::apply::ApplyTarget::initd( + repo.to_str().expect("repo path"), + "main", + Some(state_file.clone()), + ), &host_quadlets, true, - Some(state_file.clone()), ) .expect("second apply"); assert_eq!( @@ -219,11 +223,13 @@ fn desired_state_provenance_remains_host_scoped() { fs::create_dir_all(&host_quadlets).expect("host quadlets"); let result = core_ops::cli::apply::apply_with_report( - repo.to_str().expect("repo path"), - "main", + &core_ops::cli::apply::ApplyTarget::initd( + repo.to_str().expect("repo path"), + "main", + Some(state_file.clone()), + ), &host_quadlets, true, - Some(state_file.clone()), ) .expect("apply"); assert_eq!(result.result.run.summary, "converged"); @@ -280,11 +286,13 @@ fn deterministic_apply_persists_convergence_state_next_to_status_snapshot() { fs::create_dir_all(&host_quadlets).expect("host quadlets"); let output = core_ops::cli::apply::apply_with_report( - repo.to_str().expect("repo path"), - "main", + &core_ops::cli::apply::ApplyTarget::initd( + repo.to_str().expect("repo path"), + "main", + Some(state_file.clone()), + ), &host_quadlets, true, - Some(state_file.clone()), ) .expect("apply"); assert_eq!(output.result.run.summary, "converged"); @@ -356,21 +364,25 @@ fn apply_report_uses_retained_baseline_for_unchanged_reruns() { fs::create_dir_all(&host_quadlets).expect("host quadlets"); let first = core_ops::cli::apply::apply_with_report( - repo.to_str().expect("repo path"), - "main", + &core_ops::cli::apply::ApplyTarget::initd( + repo.to_str().expect("repo path"), + "main", + Some(state_file.clone()), + ), &host_quadlets, true, - Some(state_file.clone()), ) .expect("first apply"); assert_eq!(first.result.run.summary, "converged"); let second = core_ops::cli::apply::apply_with_report( - repo.to_str().expect("repo path"), - "main", + &core_ops::cli::apply::ApplyTarget::initd( + repo.to_str().expect("repo path"), + "main", + Some(state_file), + ), &host_quadlets, true, - Some(state_file), ) .expect("second apply"); diff --git a/tests/integration/test_rollback_lifecycle.rs b/tests/integration/test_rollback_lifecycle.rs index f538f6f..0d33120 100644 --- a/tests/integration/test_rollback_lifecycle.rs +++ b/tests/integration/test_rollback_lifecycle.rs @@ -162,11 +162,13 @@ fn rollback_from_converged_sets_detached_flag() { // Apply at rev2 to create a converged + retained snapshot state let bundle = apply_with_report( - repo.to_str().unwrap(), - &rev2, + &core_ops::cli::apply::ApplyTarget::initd( + repo.to_str().unwrap(), + &rev2, + Some(state_path.clone()), + ), &quadlet_dir, false, - Some(state_path.clone()), ) .expect("initial apply"); @@ -255,11 +257,13 @@ fn rollback_sets_detached_flag_even_when_already_detached() { // Apply at rev2 to build retained snapshots let bundle = apply_with_report( - repo.to_str().unwrap(), - &rev2, + &core_ops::cli::apply::ApplyTarget::initd( + repo.to_str().unwrap(), + &rev2, + Some(state_path.clone()), + ), &quadlet_dir, false, - Some(state_path.clone()), ) .expect("initial apply"); @@ -269,11 +273,13 @@ fn rollback_sets_detached_flag_even_when_already_detached() { // Also apply at rev1 to build retained snapshot for rev1 let bundle2 = apply_with_report( - repo.to_str().unwrap(), - &rev1, + &core_ops::cli::apply::ApplyTarget::initd( + repo.to_str().unwrap(), + &rev1, + Some(state_path.clone()), + ), &quadlet_dir, false, - Some(state_path.clone()), ) .expect("apply rev1"); diff --git a/tests/integration/test_stateless_apply.rs b/tests/integration/test_stateless_apply.rs index a4c87f6..726b5ce 100644 --- a/tests/integration/test_stateless_apply.rs +++ b/tests/integration/test_stateless_apply.rs @@ -11,7 +11,7 @@ use std::path::{Path, PathBuf}; use std::process::Command as ProcessCommand; use std::time::{SystemTime, UNIX_EPOCH}; -use core_ops::cli::apply::{apply_with_report_stateless, synthetic_stateless_provenance}; +use core_ops::cli::apply::{apply_with_report, synthetic_stateless_provenance, ApplyTarget}; use core_ops::core::types::{ReconciliationStatus, RunStatus}; use core_ops::io::repo::HOST_OVERRIDE_ENV; use core_ops::io::source_ref::detect_provenance; @@ -105,7 +105,7 @@ fn stateless_apply_records_path_based_provenance_in_audit_event() { let stateless = detect_provenance(&source).expect("detect provenance"); assert_eq!(stateless.requested_ref, "(stateless+dirty)"); - let bundle = apply_with_report_stateless(&stateless, &host_quadlets, false) + let bundle = apply_with_report(&ApplyTarget::stateless(stateless.clone()), &host_quadlets, false) .expect("stateless apply"); // (a)+(b): apply produced a populated bundle. @@ -169,7 +169,7 @@ fn stateless_apply_preserves_initd_persisted_state() { let host_quadlets = temp_dir("core_ops_stateless_apply_other_qdir"); fs::create_dir_all(&host_quadlets).expect("quadlet dir"); let stateless = detect_provenance(&source).expect("detect provenance"); - let _bundle = apply_with_report_stateless(&stateless, &host_quadlets, false) + let _bundle = apply_with_report(&ApplyTarget::stateless(stateless.clone()), &host_quadlets, false) .expect("stateless apply"); // (3) Re-read the init'd state file and assert byte-identical @@ -206,7 +206,7 @@ fn stateless_apply_provenance_shapes_match_working_tree_state() { fs::create_dir_all(&qdir).expect("qdir"); let stateless = detect_provenance(&source).expect("detect provenance"); assert_eq!(stateless.requested_ref, "(stateless)"); - let bundle = apply_with_report_stateless(&stateless, &qdir, false) + let bundle = apply_with_report(&ApplyTarget::stateless(stateless.clone()), &qdir, false) .expect("apply non-git"); assert_eq!( bundle.result.desired.requested_ref.as_deref(), @@ -225,7 +225,7 @@ fn stateless_apply_provenance_shapes_match_working_tree_state() { fs::create_dir_all(&qdir).expect("qdir"); let stateless = detect_provenance(&source).expect("detect provenance"); assert_eq!(stateless.requested_ref, sha); - let bundle = apply_with_report_stateless(&stateless, &qdir, false) + let bundle = apply_with_report(&ApplyTarget::stateless(stateless.clone()), &qdir, false) .expect("apply clean"); assert_eq!(bundle.result.desired.requested_ref.as_deref(), Some(sha.as_str())); } @@ -242,7 +242,7 @@ fn stateless_apply_provenance_shapes_match_working_tree_state() { fs::create_dir_all(&qdir).expect("qdir"); let stateless = detect_provenance(&source).expect("detect provenance"); assert_eq!(stateless.requested_ref, "(stateless+dirty)"); - let bundle = apply_with_report_stateless(&stateless, &qdir, false) + let bundle = apply_with_report(&ApplyTarget::stateless(stateless.clone()), &qdir, false) .expect("apply dirty"); assert_eq!( bundle.result.desired.requested_ref.as_deref(), @@ -323,7 +323,7 @@ fn stateless_apply_then_initd_plan_against_same_tree_does_not_surface_detached_s // Stateless apply phase. let stateless = detect_provenance(&source).expect("detect provenance"); - apply_with_report_stateless(&stateless, &qdir, false).expect("stateless apply"); + apply_with_report(&ApplyTarget::stateless(stateless.clone()), &qdir, false).expect("stateless apply"); // Sanity check: after stateless apply, no canonical state file at // `state_file` was created (we explicitly never wrote one). diff --git a/tests/integration/test_status_state.rs b/tests/integration/test_status_state.rs index 59766d2..efac9ff 100644 --- a/tests/integration/test_status_state.rs +++ b/tests/integration/test_status_state.rs @@ -252,11 +252,13 @@ fn apply_creates_state_snapshot_on_first_run_from_implicit_path() { let implicit_state_path = resolve_state_file(None); let _output = apply_with_report( - repo.to_str().unwrap(), - &rev, + &core_ops::cli::apply::ApplyTarget::initd( + repo.to_str().unwrap(), + &rev, + Some(implicit_state_path), + ), &host_quadlets, false, - Some(implicit_state_path), ) .expect("apply"); @@ -323,7 +325,7 @@ fn apply_can_explicitly_opt_out_of_state_persistence() { let host_quadlets = temp.join("host_quadlets"); fs::create_dir_all(&host_quadlets).expect("host quadlets"); - let output = apply_with_report(repo.to_str().unwrap(), &rev, &host_quadlets, false, None) + let output = apply_with_report(&core_ops::cli::apply::ApplyTarget::initd(repo.to_str().unwrap(), &rev, None), &host_quadlets, false) .expect("apply"); assert!(!state_path.exists());