Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/gitlawb-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ async fn main() -> Result<()> {
sync::start(
Arc::clone(&state.db),
Arc::clone(&state.config),
Arc::clone(&state.node_keypair),
state.subscribe_shutdown(),
);
info!("auto-sync worker started");
Expand Down
116 changes: 113 additions & 3 deletions crates/gitlawb-node/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
//! 2. If the repo doesn't exist locally → `git clone --mirror`.
//! 3. If it exists → `git fetch --prune` from the origin.
//! 4. Mark done or failed.
//! 5. On success, register ourselves as a replica with the origin node so
//! its `replica_count` reflects reality (best-effort, idempotent).

use std::path::Path;
use std::sync::Arc;

use gitlawb_core::identity::Keypair;
use tracing::{info, warn};

use crate::config::Config;
Expand Down Expand Up @@ -50,20 +53,23 @@ fn classify_mirror(withheld: Option<Vec<String>>) -> MirrorMode {
pub fn start(
db: Arc<Db>,
config: Arc<Config>,
keypair: Arc<Keypair>,
mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
) {
tokio::spawn(async move {
run(db, config, &mut shutdown_rx).await;
run(db, config, keypair, &mut shutdown_rx).await;
});
}

async fn run(
db: Arc<Db>,
config: Arc<Config>,
keypair: Arc<Keypair>,
shutdown_rx: &mut tokio::sync::watch::Receiver<bool>,
) {
let machine_id = std::env::var("FLY_MACHINE_ID").ok();
// Bound each withheld-paths lookup so a stalled peer cannot hang the worker.
// Bound each peer HTTP call (withheld-paths lookup + replica registration)
// so a stalled peer cannot hang the worker.
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
Expand All @@ -73,7 +79,7 @@ async fn run(
loop {
tokio::select! {
_ = interval.tick() => {
process_batch(&db, &config, machine_id.as_deref(), &client).await;
process_batch(&db, &config, &keypair, machine_id.as_deref(), &client).await;
}
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
Expand All @@ -88,6 +94,7 @@ async fn run(
async fn process_batch(
db: &Db,
config: &Config,
keypair: &Keypair,
machine_id: Option<&str>,
client: &reqwest::Client,
) {
Expand Down Expand Up @@ -162,6 +169,19 @@ async fn process_batch(
.await;
let _ = db.mark_sync_done(&item.id).await;
crate::metrics::record_sync_processed("done");

// Tell the origin we now host a replica so its replica_count
// reflects reality. Best-effort: idempotent on the origin and
// never fails the sync.
register_replica_with_origin(
client,
keypair,
config.public_url.as_deref(),
&origin_url,
owner_short,
repo_name,
)
.await;
}
Err(e) => {
warn!(repo = %item.repo, origin = %origin_url, err = %e, "repo sync failed");
Expand Down Expand Up @@ -196,6 +216,67 @@ async fn fetch_withheld(
Some(globs)
}

/// Signed request path for replica registration on the origin node.
fn replica_registration_path(owner: &str, repo: &str) -> String {
format!("/api/v1/repos/{owner}/{repo}/replicas")
}

/// Best-effort `PUT /api/v1/repos/{owner}/{repo}/replicas` against the origin
/// node after a successful mirror, signed with our node keypair. The origin
/// records (our DID, our public URL) and exposes it via its public replica
/// list. PUT is idempotent there (201 on first registration, 200 after), so
/// re-registering on every successful sync is safe and self-healing.
///
/// Skipped when we have no public URL to advertise. Failures are logged and
/// never affect the sync result. Reuses the worker's shared `client` (30s
/// timeout) with a tighter per-request timeout.
async fn register_replica_with_origin(
client: &reqwest::Client,
keypair: &Keypair,
public_url: Option<&str>,
origin_url: &str,
owner: &str,
repo: &str,
) {
let self_url = match public_url {
Some(u) if !u.is_empty() => u,
_ => return,
};

let path = replica_registration_path(owner, repo);
let body = serde_json::json!({ "url": self_url });
let body_bytes = match serde_json::to_vec(&body) {
Ok(b) => b,
Err(e) => {
warn!(owner, repo, err = %e, "failed to serialize replica registration");
return;
}
};

let signed = gitlawb_core::http_sig::sign_request(keypair, "PUT", &path, &body_bytes);
match client
.put(format!("{origin_url}{path}"))
.header("Content-Type", "application/json")
.header("Content-Digest", signed.content_digest)
.header("Signature-Input", signed.signature_input)
.header("Signature", signed.signature)
.body(body_bytes)
.timeout(std::time::Duration::from_secs(10))
.send()
.await
{
Ok(r) if r.status().is_success() => {
info!(owner, repo, origin = %origin_url, "registered as replica with origin");
}
Ok(r) => {
warn!(owner, repo, origin = %origin_url, status = %r.status(), "replica registration rejected by origin");
}
Err(e) => {
warn!(owner, repo, origin = %origin_url, err = %e, "replica registration request failed");
}
}
}

/// Run a git subprocess, returning an error with stderr on non-zero exit.
async fn git_run(args: &[&str]) -> anyhow::Result<()> {
let out = tokio::process::Command::new("git")
Expand Down Expand Up @@ -465,4 +546,33 @@ mod tests {
assert_eq!(git_config(&dest, "remote.origin.promisor"), "");
assert_eq!(git_config(&dest, "remote.origin.partialclonefilter"), "");
}

#[test]
fn registration_path_matches_replicas_route() {
// Must stay in sync with the route in api/mod.rs:
// PUT /api/v1/repos/:owner/:repo/replicas
assert_eq!(
replica_registration_path("z6MkOwner", "my-repo"),
"/api/v1/repos/z6MkOwner/my-repo/replicas"
);
}

#[tokio::test]
async fn registration_skipped_without_public_url() {
// No public URL to advertise → must return without sending anything.
// An unroutable origin URL would otherwise surface as a warn + delay.
let client = reqwest::Client::new();
let keypair = Keypair::generate();
register_replica_with_origin(
&client,
&keypair,
None,
"http://127.0.0.1:1", // would fail instantly if contacted
"owner",
"repo",
)
.await;
register_replica_with_origin(&client, &keypair, Some(""), "http://127.0.0.1:1", "o", "r")
.await;
}
}
Loading