From 29856a9e4616bfd75b5f447fd604897819beb64f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bahad=C4=B1r=20Arda?= Date: Mon, 25 May 2026 22:21:17 +0300 Subject: [PATCH] feat(a2a): cross-device relay ingress with pairing gate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second of the cross-device messaging PRs (after the pairing store in #84). Adds the receive side of the LAN relay: the endpoint a remote daemon POSTs to, gated by the operator's chosen trust model. - POST /v1/relay (circle-aware, same circleOrBearer gate as /v1/agents): a paired peer relays a message here over the LAN. The handler runs the first-contact pairing gate — an unknown sender is recorded pending (202 + a 4-digit code) and refused until the receiving operator approves; an approved sender's message is delivered. Not a code-execution surface (it enqueues into local inboxes), so circle auth + pairing approval are the trust boundary. - a2a.InstallID / InstallDisplayName: a stable per-install fingerprint (persisted UUID) the relay will present as the sender identity — peer_ids churn per session and can't anchor a pairing decision. - Registry.DeliverToLocal: enqueues a relayed message into every LOCAL agent's inbox (source != mdns), so it reaches whatever agent is live here without looping back out to other remote peers. - a2a.GlobalPairingStore: lazy process-wide store the handler reads; corrupt-file-safe (re-prompts rather than wedging delivery). Tests: relay first-contact→202+code (nothing delivered), approve→200 delivered into the local inbox, missing-field rejects; InstallID stability; DeliverToLocal skips mdns peers. Send side next: `peer send` to an mdns target relays to the remote's /v1/relay (instead of writing the local inbox) and surfaces the pairing-required response. --- internal/a2a/identity.go | 48 ++++++++++ internal/a2a/identity_test.go | 51 ++++++++++ internal/a2a/inbox.go | 31 ++++++ internal/a2a/pairing.go | 33 +++++++ internal/server/http.go | 7 ++ internal/server/relay_handler.go | 109 +++++++++++++++++++++ internal/server/relay_handler_test.go | 131 ++++++++++++++++++++++++++ 7 files changed, 410 insertions(+) create mode 100644 internal/a2a/identity.go create mode 100644 internal/a2a/identity_test.go create mode 100644 internal/server/relay_handler.go create mode 100644 internal/server/relay_handler_test.go diff --git a/internal/a2a/identity.go b/internal/a2a/identity.go new file mode 100644 index 0000000..62f84b3 --- /dev/null +++ b/internal/a2a/identity.go @@ -0,0 +1,48 @@ +// Package a2a — stable per-install identity. +// +// Cross-device pairing keys on a fingerprint that must survive daemon +// restarts: peer_ids are minted per session and churn (the heartbeat +// handler warns against caching them), so they can't anchor a trust +// decision. InstallID is the stable anchor — a UUID generated once on +// first use and persisted, presented by the relay as the sender's +// fingerprint so a device paired once stays trusted. +package a2a + +import ( + "os" + "path/filepath" + "strings" + + "github.com/cogitave/clawtool/internal/atomicfile" + "github.com/cogitave/clawtool/internal/xdg" + "github.com/google/uuid" +) + +func installIDPath() string { + return filepath.Join(xdg.ConfigDir(), "install-id") +} + +// InstallID returns this clawtool installation's stable identifier, +// minting + persisting one on first use. Best-effort persistence: if +// the write fails the freshly minted id is still returned so the +// current relay works; a later call mints a new one (the remote +// re-prompts pairing — mildly annoying, never broken). +func InstallID() string { + if b, err := os.ReadFile(installIDPath()); err == nil { + if id := strings.TrimSpace(string(b)); id != "" { + return id + } + } + id := uuid.NewString() + _ = atomicfile.WriteFileMkdir(installIDPath(), []byte(id+"\n"), 0o600, 0o700) + return id +} + +// InstallDisplayName is a human label for this device in pairing +// prompts — the OS hostname, or "clawtool" when unavailable. +func InstallDisplayName() string { + if h, err := os.Hostname(); err == nil && strings.TrimSpace(h) != "" { + return strings.TrimSpace(h) + } + return "clawtool" +} diff --git a/internal/a2a/identity_test.go b/internal/a2a/identity_test.go new file mode 100644 index 0000000..b66a655 --- /dev/null +++ b/internal/a2a/identity_test.go @@ -0,0 +1,51 @@ +package a2a + +import ( + "path/filepath" + "testing" +) + +func TestInstallID_StableAcrossCalls(t *testing.T) { + t.Setenv("XDG_CONFIG_HOME", t.TempDir()) + first := InstallID() + if first == "" { + t.Fatal("InstallID returned empty") + } + if second := InstallID(); second != first { + t.Errorf("InstallID not stable: %q != %q", first, second) + } +} + +func TestInstallDisplayName_NonEmpty(t *testing.T) { + if InstallDisplayName() == "" { + t.Error("InstallDisplayName returned empty") + } +} + +func TestDeliverToLocal_SkipsMdnsPeers(t *testing.T) { + reg := NewRegistry(filepath.Join(t.TempDir(), "peers.json")) + + local, err := reg.Register(RegisterInput{DisplayName: "local", Backend: "claude-code"}) + if err != nil { + t.Fatalf("register local: %v", err) + } + remote, err := reg.Register(RegisterInput{ + DisplayName: "remote", + Backend: "clawtool-mdns", + Metadata: map[string]string{"source": "mdns", "address": "192.168.1.9:8765"}, + }) + if err != nil { + t.Fatalf("register remote: %v", err) + } + + n := reg.DeliverToLocal(Message{Text: "hi", FromPeer: "sender"}) + if n != 1 { + t.Errorf("DeliverToLocal reached %d peers, want 1 (local only)", n) + } + if got := reg.DrainInbox(local.PeerID, true); len(got) != 1 { + t.Errorf("local peer inbox = %d, want 1", len(got)) + } + if got := reg.DrainInbox(remote.PeerID, true); len(got) != 0 { + t.Errorf("mdns peer inbox = %d, want 0 (must not loop back to remote)", len(got)) + } +} diff --git a/internal/a2a/inbox.go b/internal/a2a/inbox.go index 2f7754b..3ae671a 100644 --- a/internal/a2a/inbox.go +++ b/internal/a2a/inbox.go @@ -243,6 +243,37 @@ func (r *Registry) Broadcast(msg Message) int { return len(peerIDs) } +// DeliverToLocal enqueues msg into every LOCAL peer's inbox — peers +// whose metadata source != "mdns", i.e. agents running on THIS device +// rather than remote daemons discovered over the LAN. The cross-device +// relay (POST /v1/relay) uses it: a message arriving from another +// machine should reach whatever agent is live here, not loop back out +// to other remote peers. Each recipient gets its own ID + timestamp, +// mirroring Broadcast. Returns the recipient count — 0 means no local +// agent is registered, so the message is dropped (nobody here to read +// it yet); the relay surfaces that count so the sender knows. +func (r *Registry) DeliverToLocal(msg Message) int { + r.mu.RLock() + ids := make([]string, 0, len(r.peers)) + for id, p := range r.peers { + if p.Metadata["source"] == "mdns" { + continue + } + ids = append(ids, id) + } + r.mu.RUnlock() + sort.Strings(ids) + + for _, id := range ids { + cp := msg + cp.ToPeer = id + cp.ID = uuid.NewString() + cp.Timestamp = time.Now().UTC() + r.SendTo(id, cp) + } + return len(ids) +} + // DrainInbox returns the pending messages for peerID and clears // them (or peeks, leaving them queued). Non-existent peers return // an empty slice — the inbox is created lazily and an empty drain diff --git a/internal/a2a/pairing.go b/internal/a2a/pairing.go index fbf79c6..c189866 100644 --- a/internal/a2a/pairing.go +++ b/internal/a2a/pairing.go @@ -256,6 +256,39 @@ func cloneReq(r *PairingRequest) *PairingRequest { return &c } +// ── process-global pairing store ────────────────────────────────────── + +var ( + globalPairingMu sync.Mutex + globalPairing *PairingStore +) + +// GlobalPairingStore returns the process-wide pairing store, lazily +// loading it from disk on first access. A corrupt/unreadable file +// yields a fresh empty store rather than wedging the relay path — +// refusing all cross-device delivery because one JSON file went bad +// is worse than re-prompting pairing. Safe for concurrent use. +func GlobalPairingStore() *PairingStore { + globalPairingMu.Lock() + defer globalPairingMu.Unlock() + if globalPairing == nil { + s, err := LoadPairingStore() + if err != nil || s == nil { + s = &PairingStore{path: PairingStorePath(), requests: map[string]*PairingRequest{}} + } + globalPairing = s + } + return globalPairing +} + +// SetGlobalPairingStore overrides the global store. Tests inject a +// temp-dir-backed store with it; production never calls it. +func SetGlobalPairingStore(s *PairingStore) { + globalPairingMu.Lock() + defer globalPairingMu.Unlock() + globalPairing = s +} + // pairingCodeDigits is the length of the human-typed pairing code. // Four digits balances "easy to read aloud / type" against a 1-in- // 10000 guess for an attacker who already holds the circle key — the diff --git a/internal/server/http.go b/internal/server/http.go index e3a137d..c97df4d 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -163,6 +163,13 @@ func ServeHTTP(ctx context.Context, opts HTTPOptions) error { // Single mux entry routes all subpaths via the trailing slash. mux.Handle("/v1/peers", authed(http.HandlerFunc(handlePeers))) mux.Handle("/v1/peers/", authed(http.HandlerFunc(handlePeers))) + // /v1/relay — cross-device message ingress. Circle-aware like + // /v1/agents: a paired peer on the same circle POSTs a message + // here over the LAN. The handler enforces the operator's + // first-contact pairing gate before any delivery (it enqueues + // into local agents' inboxes, not code execution — so circle + // auth + the pairing approval are the trust boundary). + mux.Handle("/v1/relay", circleOrBearer(token)(http.HandlerFunc(handleRelay))) // /v1/biam/subscribe — SSE A2A async-push (ADR-024 Phase 4). // task-scoped, with Last-Event-ID replay against the per-task // ring buffer in internal/agents/biam.Events. diff --git a/internal/server/relay_handler.go b/internal/server/relay_handler.go new file mode 100644 index 0000000..d335bde --- /dev/null +++ b/internal/server/relay_handler.go @@ -0,0 +1,109 @@ +// Package server — POST /v1/relay, the cross-device message ingress. +// +// `clawtool peer send ` on one device relays the message +// to the target daemon's /v1/relay over the LAN, authenticated with +// the shared circle key (X-Clawtool-Circle, same gate /v1/agents +// uses). Here we enforce the operator's chosen trust model — "circle +// key + first-contact pairing approval": a sender we haven't paired +// with is recorded pending (with a short code) and the message is +// REFUSED (202, pairing_required) until the receiving operator +// approves it via `clawtool peer pair approve `. Once approved, +// the message is delivered into every local agent's inbox so whatever +// agent is live here surfaces it on its next prompt. +package server + +import ( + "encoding/json" + "net" + "net/http" + "strings" + + "github.com/cogitave/clawtool/internal/a2a" +) + +// relayInbound is the wire body a sending daemon POSTs to /v1/relay. +// FromFingerprint is the sender's stable install id (a2a.InstallID), +// not a session peer_id — that's what the pairing decision keys on. +type relayInbound struct { + FromFingerprint string `json:"from_fingerprint"` + FromDisplayName string `json:"from_display_name,omitempty"` + Text string `json:"text"` + Type a2a.MessageType `json:"type,omitempty"` + CorrelationID string `json:"correlation_id,omitempty"` +} + +func handleRelay(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.Header().Set("Allow", http.MethodPost) + writeJSON(w, http.StatusMethodNotAllowed, map[string]any{"error": "POST only"}) + return + } + reg := a2a.GetGlobal() + if reg == nil { + writeJSON(w, http.StatusServiceUnavailable, map[string]any{ + "error": "peer registry not initialised — was clawtool daemon started with --listen?", + }) + return + } + + var in relayInbound + if err := json.NewDecoder(r.Body).Decode(&in); err != nil { + writeJSON(w, http.StatusBadRequest, map[string]any{"error": "invalid JSON body: " + err.Error()}) + return + } + if strings.TrimSpace(in.FromFingerprint) == "" { + writeJSON(w, http.StatusBadRequest, map[string]any{"error": "from_fingerprint is required"}) + return + } + if strings.TrimSpace(in.Text) == "" { + writeJSON(w, http.StatusBadRequest, map[string]any{"error": "text is required"}) + return + } + + store := a2a.GlobalPairingStore() + req, approved, err := store.Observe(in.FromFingerprint, in.FromDisplayName, remoteHost(r)) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]any{"error": "pairing store: " + err.Error()}) + return + } + if !approved { + // First contact, still-pending, or denied. Refuse delivery and + // surface the code the receiving operator approves with. 202: + // the request was well-formed and accepted for a decision, but + // not delivered yet. + writeJSON(w, http.StatusAccepted, map[string]any{ + "pairing_required": true, + "state": req.State, + "code": req.Code, + "from": req.DisplayName, + "hint": "ask the receiving device's operator to run `clawtool peer pair approve " + req.Code + "`", + }) + return + } + + msgType := in.Type + if msgType == "" { + msgType = a2a.MsgNotification + } + delivered := reg.DeliverToLocal(a2a.Message{ + Type: msgType, + FromPeer: in.FromFingerprint, + Text: in.Text, + CorrelationID: in.CorrelationID, + }) + writeJSON(w, http.StatusOK, map[string]any{ + "delivered": true, + "delivered_to": delivered, + }) +} + +// remoteHost returns the sender's IP (host part of RemoteAddr) for the +// pairing record's advisory Address field. Best-effort — returns the +// raw RemoteAddr when it isn't host:port shaped. +func remoteHost(r *http.Request) string { + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + return r.RemoteAddr + } + return host +} diff --git a/internal/server/relay_handler_test.go b/internal/server/relay_handler_test.go new file mode 100644 index 0000000..51cc940 --- /dev/null +++ b/internal/server/relay_handler_test.go @@ -0,0 +1,131 @@ +package server + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "path/filepath" + "testing" + + "github.com/cogitave/clawtool/internal/a2a" +) + +// newRelayTestMux mounts /v1/relay on a fresh registry + a fresh +// temp-backed pairing store, and pre-seeds one LOCAL agent peer so +// DeliverToLocal has a recipient. Returns the server, the pairing +// store (so a test can approve), the local peer id, and cleanup. +func newRelayTestMux(t *testing.T) (*httptest.Server, *a2a.PairingStore, string, func()) { + t.Helper() + prevReg := a2a.GetGlobal() + reg := a2a.NewRegistry(filepath.Join(t.TempDir(), "peers.json")) + a2a.SetGlobal(reg) + + // A local agent peer (no source=mdns) — the relay delivers here. + local, err := reg.Register(a2a.RegisterInput{DisplayName: "local-claude", Backend: "claude-code"}) + if err != nil { + t.Fatalf("register local peer: %v", err) + } + + // Point the pairing store at a temp file so the test never touches + // the real ~/.config/clawtool/paired-peers.json. + t.Setenv("XDG_CONFIG_HOME", t.TempDir()) + store, err := a2a.LoadPairingStore() + if err != nil { + t.Fatalf("load store: %v", err) + } + a2a.SetGlobalPairingStore(store) + + mux := http.NewServeMux() + mux.Handle("/v1/relay", http.HandlerFunc(handleRelay)) + srv := httptest.NewServer(mux) + + cleanup := func() { + srv.Close() + reg.WaitForSaves() + a2a.SetGlobal(prevReg) + a2a.SetGlobalPairingStore(nil) + } + return srv, store, local.PeerID, cleanup +} + +func relayPost(t *testing.T, srv *httptest.Server, body map[string]any) (*http.Response, map[string]any) { + t.Helper() + b, _ := json.Marshal(body) + resp, err := http.Post(srv.URL+"/v1/relay", "application/json", bytes.NewReader(b)) + if err != nil { + t.Fatalf("relay POST: %v", err) + } + var out map[string]any + _ = json.NewDecoder(resp.Body).Decode(&out) + resp.Body.Close() + return resp, out +} + +func TestRelay_FirstContactRequiresPairing(t *testing.T) { + srv, _, localID, cleanup := newRelayTestMux(t) + defer cleanup() + + resp, out := relayPost(t, srv, map[string]any{ + "from_fingerprint": "install-namzu", + "from_display_name": "namzu ~", + "text": "test ediyoruz", + }) + if resp.StatusCode != http.StatusAccepted { + t.Fatalf("status = %d, want 202 (pairing required); body=%v", resp.StatusCode, out) + } + if out["pairing_required"] != true { + t.Errorf("expected pairing_required=true, got %v", out) + } + if code, _ := out["code"].(string); len(code) != 4 { + t.Errorf("expected 4-digit code, got %v", out["code"]) + } + // Nothing delivered to the local agent yet. + if msgs := a2a.GetGlobal().DrainInbox(localID, true); len(msgs) != 0 { + t.Errorf("message leaked into local inbox before approval: %d", len(msgs)) + } +} + +func TestRelay_DeliversAfterApproval(t *testing.T) { + srv, store, localID, cleanup := newRelayTestMux(t) + defer cleanup() + + // First contact → pending. + relayPost(t, srv, map[string]any{ + "from_fingerprint": "install-namzu", + "text": "ping 1", + }) + // Operator approves the sender. + if _, err := store.Approve("install-namzu"); err != nil { + t.Fatalf("approve: %v", err) + } + // Now a message goes through. + resp, out := relayPost(t, srv, map[string]any{ + "from_fingerprint": "install-namzu", + "text": "ping 2", + }) + if resp.StatusCode != http.StatusOK { + t.Fatalf("status = %d, want 200; body=%v", resp.StatusCode, out) + } + if out["delivered"] != true { + t.Errorf("expected delivered=true, got %v", out) + } + msgs := a2a.GetGlobal().DrainInbox(localID, false) + if len(msgs) != 1 || msgs[0].Text != "ping 2" { + t.Fatalf("local inbox = %+v, want one 'ping 2'", msgs) + } +} + +func TestRelay_RejectsMissingFields(t *testing.T) { + srv, _, _, cleanup := newRelayTestMux(t) + defer cleanup() + + resp, _ := relayPost(t, srv, map[string]any{"text": "no fingerprint"}) + if resp.StatusCode != http.StatusBadRequest { + t.Errorf("missing fingerprint: status = %d, want 400", resp.StatusCode) + } + resp, _ = relayPost(t, srv, map[string]any{"from_fingerprint": "fp", "text": " "}) + if resp.StatusCode != http.StatusBadRequest { + t.Errorf("blank text: status = %d, want 400", resp.StatusCode) + } +}