Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ All notable Darc release changes should be summarized here.
- Clarify README guidance for agent setup and prompt-driven prior-session investigations.
- Show release dates in changelog version headings and have release preparation add them automatically.
- Speed up regex search for queries with a required literal prefix.
- Harden auto-refresh restart, debounce, stale lock, and service status reporting.
- Avoid killing freshly bootstrapped auto-refresh services during start.

## [0.1.4] - 2026-05-08

Expand Down
267 changes: 259 additions & 8 deletions crates/cli/src/refresh.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{
env, fs,
fs::{File, OpenOptions},
io::{self, IsTerminal, Write},
io::{self, IsTerminal, Seek, SeekFrom, Write},
path::{Path, PathBuf},
sync::mpsc,
time::{Duration, Instant},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

use anyhow::{Context, Result, bail};
Expand All @@ -15,7 +15,8 @@ use darc_core::{
};
use darc_paths::current_utc_timestamp;
use fs2::FileExt;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;

use crate::args::{ProviderArg, RefreshArgs};
use crate::output::{HumanStyle, print_field, print_line, print_project_warning, print_section};
Expand Down Expand Up @@ -132,6 +133,7 @@ impl<W: Write> RefreshProgressPrinter<W> {
pub(crate) const DEFAULT_WATCH_DEBOUNCE: Duration = Duration::from_secs(30);
pub(crate) const DEFAULT_WATCH_MIN_INTERVAL: Duration = Duration::from_secs(60);
pub(crate) const DEFAULT_WATCH_RECONCILE_INTERVAL: Duration = Duration::from_secs(600);
pub(crate) const REFRESH_LOCK_SCHEMA: &str = "darc.refresh.lock.v1";

/// Stores one parsed refresh invocation for one-shot and watch modes.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -164,6 +166,7 @@ pub(crate) struct WatchSettings {
/// Stores the latest foreground or service refresh state.
#[derive(Debug, Default, Clone)]
pub(crate) struct WatchState {
pub(crate) watch_identity: Option<WatchIdentity>,
pub(crate) last_event_at: Option<String>,
pub(crate) last_refresh_reason: Option<String>,
pub(crate) last_refresh_started_at: Option<String>,
Expand All @@ -180,6 +183,8 @@ pub(crate) struct WatchStatus<'a> {
pub(crate) root: String,
pub(crate) mode: &'a str,
pub(crate) running: bool,
pub(crate) watch_pid: Option<u32>,
pub(crate) watch_token: Option<&'a str>,
pub(crate) debounce: Option<String>,
pub(crate) min_interval: Option<String>,
pub(crate) reconcile_interval: Option<String>,
Expand All @@ -192,17 +197,99 @@ pub(crate) struct WatchStatus<'a> {
pub(crate) last_error: Option<&'a str>,
}

/// Identifies one running watch loop instance in the status file.
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub(crate) struct WatchIdentity {
pub(crate) pid: u32,
pub(crate) token: String,
}

impl WatchIdentity {
/// Builds identity metadata for the current watch process.
pub(crate) fn current() -> Self {
let pid = std::process::id();
let nonce = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_nanos())
.unwrap_or_default();
Self {
pid,
token: format!("{pid}:{nonce}"),
}
}

/// Returns whether a status file still belongs to this watch instance.
pub(crate) fn matches_status(&self, status: &JsonValue) -> bool {
status
.get("watch_pid")
.and_then(JsonValue::as_u64)
.is_some_and(|pid| pid == u64::from(self.pid))
&& status
.get("watch_token")
.and_then(JsonValue::as_str)
.is_some_and(|token| token == self.token.as_str())
}
}

/// Stores the active refresh lock holder for diagnostics.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct RefreshLockInfo {
pub(crate) schema: String,
pub(crate) pid: u32,
pub(crate) started_at: String,
}

impl RefreshLockInfo {
/// Builds lock metadata for the current process.
pub(crate) fn current() -> Self {
Self {
schema: REFRESH_LOCK_SCHEMA.to_owned(),
pid: std::process::id(),
started_at: current_utc_timestamp(),
}
}
}

/// Describes whether the refresh lock file is currently held.
#[cfg(any(target_os = "macos", test))]
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum RefreshLockSnapshot {
Missing,
Available { stale_info: Option<RefreshLockInfo> },
Held { holder: Option<RefreshLockInfo> },
}

/// Holds an advisory refresh lock until dropped.
pub(crate) struct RefreshLock {
pub(crate) file: File,
}

impl Drop for RefreshLock {
fn drop(&mut self) {
let _ = clear_refresh_lock_info(&mut self.file);
let _ = self.file.unlock();
}
}

/// Marks watch status as stopped when the foreground loop exits.
pub(crate) struct WatchStatusGuard {
pub(crate) root: PathBuf,
pub(crate) identity: WatchIdentity,
}

impl WatchStatusGuard {
/// Builds one status guard for a running watch loop.
pub(crate) fn new(root: PathBuf, identity: WatchIdentity) -> Self {
Self { root, identity }
}
}

impl Drop for WatchStatusGuard {
fn drop(&mut self) {
let _ = mark_watch_status_stopped_if_current(&self.root, &self.identity);
}
}

/// Represents filesystem watcher notifications consumed by the refresh loop.
#[cfg_attr(not(target_os = "macos"), allow(dead_code))]
pub(crate) enum WatchSignal {
Expand Down Expand Up @@ -356,14 +443,19 @@ pub(crate) fn run_refresh_watch(
);
}

let mut state = WatchState::default();
let watch_identity = WatchIdentity::current();
let mut state = WatchState {
watch_identity: Some(watch_identity.clone()),
..WatchState::default()
};
write_watch_status(
&request.root,
&state,
true,
"refresh-watch",
Some(&settings),
)?;
let _status_guard = WatchStatusGuard::new(request.root.clone(), watch_identity);
run_refresh_cycle(&request, &mut state, &settings, "initial")?;

let mut dirty_since: Option<Instant> = None;
Expand All @@ -372,8 +464,7 @@ pub(crate) fn run_refresh_watch(
let timeout = watch_loop_timeout(dirty_since, last_refresh_at, &settings);
match rx.recv_timeout(timeout) {
Ok(WatchSignal::Changed) => {
state.last_event_at = Some(current_utc_timestamp());
dirty_since.get_or_insert_with(Instant::now);
record_watch_change(&mut state, &mut dirty_since, Instant::now());
write_watch_status(
&request.root,
&state,
Expand Down Expand Up @@ -401,6 +492,16 @@ pub(crate) fn run_refresh_watch(
}
}

/// Records one filesystem change and restarts the debounce quiet period.
pub(crate) fn record_watch_change(
state: &mut WatchState,
dirty_since: &mut Option<Instant>,
now: Instant,
) {
state.last_event_at = Some(current_utc_timestamp());
*dirty_since = Some(now);
}

/// Runs one watched refresh cycle and records status without terminating on refresh failure.
pub(crate) fn run_refresh_cycle(
request: &RefreshRunRequest,
Expand Down Expand Up @@ -655,22 +756,87 @@ pub(crate) fn acquire_refresh_lock(root: &Path) -> Result<RefreshLock> {
fs::create_dir_all(&run_dir)
.with_context(|| format!("failed to create {}", run_dir.display()))?;
let lock_path = run_dir.join("refresh.lock");
let file = OpenOptions::new()
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&lock_path)
.with_context(|| format!("failed to open {}", lock_path.display()))?;
file.try_lock_exclusive().with_context(|| {
let holder = read_refresh_lock_info(&lock_path)
.ok()
.flatten()
.map(|info| format!(" by process {} since {}", info.pid, info.started_at))
.unwrap_or_default();
format!(
"another Darc refresh is already running ({})",
"another Darc refresh is already running{holder} ({})",
lock_path.display()
)
})?;
write_refresh_lock_info(&mut file, &RefreshLockInfo::current())?;
Ok(RefreshLock { file })
}

/// Inspects the shared refresh lock without taking ownership of it.
#[cfg(any(target_os = "macos", test))]
pub(crate) fn inspect_refresh_lock(root: &Path) -> Result<RefreshLockSnapshot> {
let lock_path = root.join("run/refresh.lock");
if !lock_path.exists() {
return Ok(RefreshLockSnapshot::Missing);
}
let file = OpenOptions::new()
.read(true)
.write(true)
.open(&lock_path)
.with_context(|| format!("failed to open {}", lock_path.display()))?;
let info = read_refresh_lock_info(&lock_path)?;
match file.try_lock_exclusive() {
Ok(()) => {
file.unlock()
.with_context(|| format!("failed to unlock {}", lock_path.display()))?;
Ok(RefreshLockSnapshot::Available { stale_info: info })
}
Err(error) if error.kind() == io::ErrorKind::WouldBlock => {
Ok(RefreshLockSnapshot::Held { holder: info })
}
Err(error) => Err(error)
.with_context(|| format!("failed to inspect refresh lock {}", lock_path.display())),
}
}

/// Writes refresh lock holder metadata into an acquired lock file.
pub(crate) fn write_refresh_lock_info(file: &mut File, info: &RefreshLockInfo) -> Result<()> {
file.set_len(0).context("failed to truncate refresh lock")?;
file.seek(SeekFrom::Start(0))
.context("failed to rewind refresh lock")?;
serde_json::to_writer_pretty(&mut *file, info).context("failed to serialize refresh lock")?;
file.write_all(b"\n")
.context("failed to write refresh lock newline")?;
file.flush().context("failed to flush refresh lock")
}

/// Reads refresh lock holder metadata when the lock file contains it.
pub(crate) fn read_refresh_lock_info(lock_path: &Path) -> Result<Option<RefreshLockInfo>> {
let content = fs::read_to_string(lock_path)
.with_context(|| format!("failed to read {}", lock_path.display()))?;
if content.trim().is_empty() {
return Ok(None);
}
let Ok(info) = serde_json::from_str::<RefreshLockInfo>(&content) else {
return Ok(None);
};
Ok((info.schema == REFRESH_LOCK_SCHEMA).then_some(info))
}

/// Clears refresh lock holder metadata before releasing the lock.
pub(crate) fn clear_refresh_lock_info(file: &mut File) -> Result<()> {
file.set_len(0).context("failed to clear refresh lock")?;
file.seek(SeekFrom::Start(0))
.context("failed to rewind refresh lock")?;
file.flush().context("failed to flush refresh lock")
}

/// Writes the current continuous refresh status JSON.
pub(crate) fn write_watch_status(
root: &Path,
Expand All @@ -682,12 +848,15 @@ pub(crate) fn write_watch_status(
let run_dir = root.join("run");
fs::create_dir_all(&run_dir)
.with_context(|| format!("failed to create {}", run_dir.display()))?;
let watch_identity = state.watch_identity.as_ref();
let status = WatchStatus {
schema: "darc.watch.status.v1",
generated_at: current_utc_timestamp(),
root: root.display().to_string(),
mode,
running,
watch_pid: watch_identity.map(|identity| identity.pid),
watch_token: watch_identity.map(|identity| identity.token.as_str()),
debounce: settings.map(|settings| format_duration(settings.debounce)),
min_interval: settings.map(|settings| format_duration(settings.min_interval)),
reconcile_interval: settings.map(|settings| format_duration(settings.reconcile_interval)),
Expand All @@ -705,6 +874,88 @@ pub(crate) fn write_watch_status(
.with_context(|| format!("failed to write {}", status_path.display()))
}

/// Marks an existing watch status file as stopped while preserving its last refresh details.
#[cfg(any(target_os = "macos", test))]
pub(crate) fn mark_watch_status_stopped(root: &Path) -> Result<()> {
mark_watch_status_stopped_matching(root, None)
}

/// Marks watch status stopped only when it still belongs to this watch instance.
pub(crate) fn mark_watch_status_stopped_if_current(
root: &Path,
identity: &WatchIdentity,
) -> Result<()> {
mark_watch_status_stopped_matching(root, Some(identity))
}

/// Marks watch status stopped after optionally checking status ownership.
pub(crate) fn mark_watch_status_stopped_matching(
root: &Path,
expected_identity: Option<&WatchIdentity>,
) -> Result<()> {
let run_dir = root.join("run");
fs::create_dir_all(&run_dir)
.with_context(|| format!("failed to create {}", run_dir.display()))?;
let status_path = run_dir.join("status.json");
let mut status = stopped_watch_status_value(root, &status_path)?;
if expected_identity.is_some_and(|identity| !identity.matches_status(&status)) {
return Ok(());
}
let object = status.as_object_mut().expect("stopped status is an object");
object.insert(
"generated_at".to_owned(),
JsonValue::String(current_utc_timestamp()),
);
object.insert("running".to_owned(), JsonValue::Bool(false));
let content = serde_json::to_vec_pretty(&status).context("failed to serialize watch status")?;
fs::write(&status_path, content)
.with_context(|| format!("failed to write {}", status_path.display()))
}

/// Returns the status object to update when marking a watch stopped.
pub(crate) fn stopped_watch_status_value(root: &Path, status_path: &Path) -> Result<JsonValue> {
let content = match fs::read_to_string(status_path) {
Ok(content) => content,
Err(error) if error.kind() == io::ErrorKind::NotFound => {
return Ok(minimal_stopped_watch_status(root));
}
Err(error) => {
return Err(error).with_context(|| format!("failed to read {}", status_path.display()));
}
};
let Ok(status) = serde_json::from_str::<JsonValue>(&content) else {
return Ok(minimal_stopped_watch_status(root));
};
if status.is_object() {
Ok(status)
} else {
Ok(minimal_stopped_watch_status(root))
}
}

/// Builds a minimal stopped watch status for missing or malformed status files.
pub(crate) fn minimal_stopped_watch_status(root: &Path) -> JsonValue {
serde_json::json!({
"schema": "darc.watch.status.v1",
"generated_at": current_utc_timestamp(),
"root": root.display().to_string(),
"mode": "refresh-watch",
"running": false,
"watch_pid": null,
"watch_token": null,
"debounce": null,
"min_interval": null,
"reconcile_interval": null,
"poll": null,
"last_event_at": null,
"last_refresh_reason": null,
"last_refresh_started_at": null,
"last_refresh_completed_at": null,
"last_refresh_succeeded": null,
"last_error": null,
})
}

/// Installs native macOS watchers for the selected paths.
#[cfg(target_os = "macos")]
pub(crate) fn install_native_watchers(
Expand Down
Loading
Loading