diff --git a/crates/gitlawb-node/src/main.rs b/crates/gitlawb-node/src/main.rs index 1946637..87f3432 100644 --- a/crates/gitlawb-node/src/main.rs +++ b/crates/gitlawb-node/src/main.rs @@ -330,6 +330,7 @@ async fn main() -> Result<()> { sync::start( Arc::clone(&state.db), Arc::clone(&state.config), + Arc::clone(&state.node_keypair), state.subscribe_shutdown(), ); info!("auto-sync worker started"); diff --git a/crates/gitlawb-node/src/sync.rs b/crates/gitlawb-node/src/sync.rs index df41470..f1ffecc 100644 --- a/crates/gitlawb-node/src/sync.rs +++ b/crates/gitlawb-node/src/sync.rs @@ -9,10 +9,13 @@ //! 2. If the repo doesn't exist locally → `git clone --mirror`. //! 3. If it exists → `git fetch --prune` from the origin. //! 4. Mark done or failed. +//! 5. On success, register ourselves as a replica with the origin node so +//! its `replica_count` reflects reality (best-effort, idempotent). use std::path::Path; use std::sync::Arc; +use gitlawb_core::identity::Keypair; use tracing::{info, warn}; use crate::config::Config; @@ -50,20 +53,23 @@ fn classify_mirror(withheld: Option>) -> MirrorMode { pub fn start( db: Arc, config: Arc, + keypair: Arc, mut shutdown_rx: tokio::sync::watch::Receiver, ) { tokio::spawn(async move { - run(db, config, &mut shutdown_rx).await; + run(db, config, keypair, &mut shutdown_rx).await; }); } async fn run( db: Arc, config: Arc, + keypair: Arc, shutdown_rx: &mut tokio::sync::watch::Receiver, ) { let machine_id = std::env::var("FLY_MACHINE_ID").ok(); - // Bound each withheld-paths lookup so a stalled peer cannot hang the worker. + // Bound each peer HTTP call (withheld-paths lookup + replica registration) + // so a stalled peer cannot hang the worker. let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(30)) .build() @@ -73,7 +79,7 @@ async fn run( loop { tokio::select! 
{ _ = interval.tick() => { - process_batch(&db, &config, machine_id.as_deref(), &client).await; + process_batch(&db, &config, &keypair, machine_id.as_deref(), &client).await; } _ = shutdown_rx.changed() => { if *shutdown_rx.borrow() { @@ -88,6 +94,7 @@ async fn run( async fn process_batch( db: &Db, config: &Config, + keypair: &Keypair, machine_id: Option<&str>, client: &reqwest::Client, ) { @@ -162,6 +169,19 @@ async fn process_batch( .await; let _ = db.mark_sync_done(&item.id).await; crate::metrics::record_sync_processed("done"); + + // Tell the origin we now host a replica so its replica_count + // reflects reality. Best-effort: idempotent on the origin and + // never fails the sync. + register_replica_with_origin( + client, + keypair, + config.public_url.as_deref(), + &origin_url, + owner_short, + repo_name, + ) + .await; } Err(e) => { warn!(repo = %item.repo, origin = %origin_url, err = %e, "repo sync failed"); @@ -196,6 +216,67 @@ async fn fetch_withheld( Some(globs) } +/// Signed request path for replica registration on the origin node. +fn replica_registration_path(owner: &str, repo: &str) -> String { + format!("/api/v1/repos/{owner}/{repo}/replicas") +} + +/// Best-effort `PUT /api/v1/repos/{owner}/{repo}/replicas` against the origin +/// node after a successful mirror, signed with our node keypair. The origin +/// records (our DID, our public URL) and exposes it via its public replica +/// list. PUT is idempotent there (201 on first registration, 200 after), so +/// re-registering on every successful sync is safe and self-healing. +/// +/// Skipped when we have no public URL to advertise. Failures are logged and +/// never affect the sync result. Reuses the worker's shared `client` (30s +/// timeout) with a tighter per-request timeout. 
+async fn register_replica_with_origin(
+    client: &reqwest::Client,
+    keypair: &Keypair,
+    public_url: Option<&str>,
+    origin_url: &str,
+    owner: &str,
+    repo: &str,
+) {
+    // A missing or blank (whitespace-only) public URL means nothing to advertise.
+    let self_url = match public_url.map(str::trim) {
+        Some(u) if !u.is_empty() => u,
+        _ => return,
+    };
+    // Strip a trailing `/` from the origin so the requested path equals the signed `path`.
+    let path = replica_registration_path(owner, repo);
+    let url = format!("{}{path}", origin_url.trim_end_matches('/'));
+    let body_bytes = match serde_json::to_vec(&serde_json::json!({ "url": self_url })) {
+        Ok(b) => b,
+        Err(e) => {
+            warn!(owner, repo, err = %e, "failed to serialize replica registration");
+            return;
+        }
+    };
+    let signed = gitlawb_core::http_sig::sign_request(keypair, "PUT", &path, &body_bytes);
+    match client
+        .put(url)
+        .header("Content-Type", "application/json")
+        .header("Content-Digest", signed.content_digest)
+        .header("Signature-Input", signed.signature_input)
+        .header("Signature", signed.signature)
+        .body(body_bytes)
+        .timeout(std::time::Duration::from_secs(10))
+        .send()
+        .await
+    {
+        Ok(r) if r.status().is_success() => {
+            info!(owner, repo, origin = %origin_url, "registered as replica with origin");
+        }
+        Ok(r) => {
+            warn!(owner, repo, origin = %origin_url, status = %r.status(), "replica registration rejected by origin");
+        }
+        Err(e) => {
+            warn!(owner, repo, origin = %origin_url, err = %e, "replica registration request failed");
+        }
+    }
+}
+
 /// Run a git subprocess, returning an error with stderr on non-zero exit.
async fn git_run(args: &[&str]) -> anyhow::Result<()> {
     let out = tokio::process::Command::new("git")
@@ -465,4 +546,35 @@ mod tests {
         assert_eq!(git_config(&dest, "remote.origin.promisor"), "");
         assert_eq!(git_config(&dest, "remote.origin.partialclonefilter"), "");
     }
+
+    #[test]
+    fn registration_path_matches_replicas_route() {
+        // Must stay in sync with the route in api/mod.rs:
+        // PUT /api/v1/repos/:owner/:repo/replicas
+        assert_eq!(
+            replica_registration_path("z6MkOwner", "my-repo"),
+            "/api/v1/repos/z6MkOwner/my-repo/replicas"
+        );
+    }
+
+    #[tokio::test]
+    async fn registration_skipped_without_public_url() {
+        // Stand in for the origin with a loopback listener so "nothing was
+        // sent" is actually observable: a refused connection would only be
+        // logged by the function and could never fail this test.
+        let origin = std::net::TcpListener::bind("127.0.0.1:0").expect("bind loopback");
+        origin.set_nonblocking(true).expect("set nonblocking");
+        let origin_url = format!("http://{}", origin.local_addr().expect("local addr"));
+        let client = reqwest::Client::new();
+        let keypair = Keypair::generate();
+
+        register_replica_with_origin(&client, &keypair, None, &origin_url, "owner", "repo").await;
+        register_replica_with_origin(&client, &keypair, Some(""), &origin_url, "o", "r").await;
+
+        // No pending connection means neither call reached out to the origin.
+        match origin.accept() {
+            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
+            other => panic!("origin was contacted: {other:?}"),
+        }
+    }
 }