diff --git a/core/src/ndk/index.ts b/core/src/ndk/index.ts index ed5c34c8b..20a2e2e85 100644 --- a/core/src/ndk/index.ts +++ b/core/src/ndk/index.ts @@ -7,6 +7,8 @@ import type { NDKCacheAdapter } from "../cache/index.js"; import dedupEvent from "../events/dedup.js"; import { NDKEvent } from "../events/index.js"; import { signatureVerificationInit } from "../events/signature.js"; +import { NIP66LivenessFilter } from "../outbox/nip66.js"; +import { ThompsonSampler } from "../outbox/thompson.js"; import { OutboxTracker } from "../outbox/tracker.js"; import type { NDKAuthPolicy } from "../relay/auth-policies.js"; import { NDKRelay } from "../relay/index.js"; @@ -232,6 +234,47 @@ export interface NDKConstructorParams { */ aiGuardrails?: boolean | { skip?: Set }; + /** + * Relay URLs to fetch NIP-66 monitor data from. + * When set, dead relays will be filtered from outbox candidate sets. + * Requires monitor relays that serve kind 30166 events. + * + * @example + * ```typescript + * const ndk = new NDK({ + * nip66MonitorRelays: ['wss://relay.nostr.watch'], + * }); + * ``` + */ + nip66MonitorRelays?: string[]; + + /** + * Enable Thompson Sampling for outbox relay selection. + * When enabled, relays are scored using Bayesian learning from delivery outcomes. + * @default false + */ + enableThompsonSampling?: boolean; + + /** + * Maximum number of outbox relays to connect to. + * Works independently of Thompson Sampling. + * @default undefined (no cap) + */ + maxOutboxRelays?: number; + + /** + * Enable CG3 (Coverage Guarantee v3) for sole-source authors. + * Only effective when Thompson Sampling is enabled. + * @default true (when Thompson is enabled) + */ + enableCoverageGuarantee?: boolean; + + /** + * Fraction of maxOutboxRelays budget reserved for CG3 sole-source relays. + * @default 0.5 + */ + cgBudgetFraction?: number; + /** * Optional grace period (in seconds) for future timestamps. 
* @@ -357,6 +400,11 @@ export class NDK extends EventEmitter<{ public subManager: NDKSubscriptionManager; public aiGuardrails: AIGuardrails; public futureTimestampGrace?: number; + public nip66Filter?: NIP66LivenessFilter; + public thompsonSampler?: ThompsonSampler; + public maxOutboxRelays?: number; + public enableCoverageGuarantee?: boolean; + public cgBudgetFraction?: number; /** * Private storage for the signature verification function @@ -503,6 +551,20 @@ export class NDK extends EventEmitter<{ this.aiGuardrails = new AIGuardrails(opts.aiGuardrails || false); this.futureTimestampGrace = opts.futureTimestampGrace; + if (opts.nip66MonitorRelays?.length) { + this.nip66Filter = new NIP66LivenessFilter(this, { + monitorRelays: opts.nip66MonitorRelays, + }); + } + + if (opts.enableThompsonSampling) { + this.thompsonSampler = new ThompsonSampler(); + } + + this.maxOutboxRelays = opts.maxOutboxRelays; + this.enableCoverageGuarantee = opts.enableCoverageGuarantee; + this.cgBudgetFraction = opts.cgBudgetFraction; + // Trigger guardrails hook for NDK instantiation this.aiGuardrails.ndkInstantiated(this); diff --git a/core/src/outbox/coverage-guarantee.test.ts b/core/src/outbox/coverage-guarantee.test.ts new file mode 100644 index 000000000..a7bd8a350 --- /dev/null +++ b/core/src/outbox/coverage-guarantee.test.ts @@ -0,0 +1,119 @@ +import { describe, expect, it } from "vitest"; +import { applyCoverageGuarantee } from "./coverage-guarantee.js"; + +describe("applyCoverageGuarantee", () => { + it("force-selects relays for sole-source authors", () => { + const pubkeysToRelays = new Map([ + ["alice", new Set(["wss://relay1.com/"])], // sole-source + ["bob", new Set(["wss://relay1.com/", "wss://relay2.com/"])], // multi-relay + ["carol", new Set(["wss://relay3.com/"])], // sole-source + ]); + + const result = applyCoverageGuarantee(pubkeysToRelays, 20, 0.5); + + expect(result.skipped).toBe(false); + expect(result.forcedRelays.size).toBe(2); + 
expect(result.forcedRelays.get("wss://relay1.com/")!.has("alice")).toBe(true); + expect(result.forcedRelays.get("wss://relay3.com/")!.has("carol")).toBe(true); + // Bob is not sole-source, so shouldn't be in forced relays + expect(result.forcedRelays.get("wss://relay1.com/")!.has("bob")).toBe(false); + }); + + it("skips when sole-source relays exceed budget", () => { + const pubkeysToRelays = new Map>(); + // Create 11 sole-source authors on different relays + for (let i = 0; i < 11; i++) { + pubkeysToRelays.set(`author${i}`, new Set([`wss://relay${i}.com/`])); + } + + // maxConnections=20, budgetFraction=0.5 → budget=10 + // 11 sole-source relays >= 10 → should skip + const result = applyCoverageGuarantee(pubkeysToRelays, 20, 0.5); + + expect(result.skipped).toBe(true); + expect(result.forcedRelays.size).toBe(0); + }); + + it("respects budget cap", () => { + const pubkeysToRelays = new Map>(); + // Create 3 sole-source authors on different relays + for (let i = 0; i < 3; i++) { + pubkeysToRelays.set(`author${i}`, new Set([`wss://relay${i}.com/`])); + } + // Add multi-relay authors to avoid triggering conditional skip + pubkeysToRelays.set("multi1", new Set(["wss://relay0.com/", "wss://relay1.com/"])); + + // maxConnections=8, budgetFraction=0.5 → budget=4 + // 3 sole-source relays < 4 → should NOT skip, but return at most 3 (all fit) + const result = applyCoverageGuarantee(pubkeysToRelays, 8, 0.5); + + expect(result.skipped).toBe(false); + expect(result.forcedRelays.size).toBe(3); + + // Now test actual capping: 5 sole-source relays with budget of 6 + const pubkeysToRelays2 = new Map>(); + for (let i = 0; i < 5; i++) { + pubkeysToRelays2.set(`author${i}`, new Set([`wss://relay${i}.com/`])); + } + // maxConnections=12, budgetFraction=0.5 → budget=6, 5 sole-source < 6 → no skip + const result2 = applyCoverageGuarantee(pubkeysToRelays2, 12, 0.5); + expect(result2.skipped).toBe(false); + expect(result2.forcedRelays.size).toBe(5); + }); + + it("sorts by coverage value 
— relay with more sole-source authors selected first", () => { + const pubkeysToRelays = new Map([ + ["alice", new Set(["wss://popular.com/"])], // sole-source on popular + ["bob", new Set(["wss://popular.com/"])], // sole-source on popular + ["carol", new Set(["wss://popular.com/"])], // sole-source on popular + ["dave", new Set(["wss://unpopular.com/"])], // sole-source on unpopular + ]); + + // maxConnections=6, budgetFraction=0.5 → budget=3 + // 2 sole-source relays (popular + unpopular) < 3 → no skip + // Budget cap of 3 means both fit, but let's test with budget=1 (maxConn=2, fraction=1.0) + // Actually budget=floor(2*1.0)=2, so 2 sole-source relays >= 2 → skip. + // Use maxConn=10, fraction=0.3 → budget=3, 2 < 3 → no skip, both fit. + // To actually test ordering with a cap, we need 3+ sole-source relays with budget 2. + const pubkeysToRelays2 = new Map([ + ["a1", new Set(["wss://relay-a.com/"])], // sole on relay-a + ["a2", new Set(["wss://relay-a.com/"])], // sole on relay-a + ["a3", new Set(["wss://relay-a.com/"])], // sole on relay-a + ["b1", new Set(["wss://relay-b.com/"])], // sole on relay-b + ["b2", new Set(["wss://relay-b.com/"])], // sole on relay-b + ["c1", new Set(["wss://relay-c.com/"])], // sole on relay-c + ]); + // 3 sole-source relays. maxConn=8, fraction=0.5 → budget=4. 3 < 4 → no skip. + // But we want to test ordering, so cap at budget=2: maxConn=4, fraction=0.5 → budget=2, 3 >= 2 → skip. 
+ // Try maxConn=8, fraction=0.5 → budget=4, 3<4 → no skip, all 3 fit (tests ordering but not cap) + + const result = applyCoverageGuarantee(pubkeysToRelays2, 8, 0.5); + expect(result.skipped).toBe(false); + expect(result.forcedRelays.size).toBe(3); + + // Verify ordering: first relay (relay-a with 3 authors) should be first + const entries = Array.from(result.forcedRelays.entries()); + expect(entries[0][0]).toBe("wss://relay-a.com/"); + expect(entries[0][1].size).toBe(3); + }); + + it("returns empty when no sole-source authors exist", () => { + const pubkeysToRelays = new Map([ + ["alice", new Set(["wss://relay1.com/", "wss://relay2.com/"])], + ["bob", new Set(["wss://relay2.com/", "wss://relay3.com/"])], + ]); + + const result = applyCoverageGuarantee(pubkeysToRelays, 20, 0.5); + + expect(result.skipped).toBe(false); + expect(result.forcedRelays.size).toBe(0); + }); + + it("handles empty input", () => { + const pubkeysToRelays = new Map>(); + const result = applyCoverageGuarantee(pubkeysToRelays, 20, 0.5); + + expect(result.skipped).toBe(false); + expect(result.forcedRelays.size).toBe(0); + }); +}); diff --git a/core/src/outbox/coverage-guarantee.ts b/core/src/outbox/coverage-guarantee.ts new file mode 100644 index 000000000..250702b7a --- /dev/null +++ b/core/src/outbox/coverage-guarantee.ts @@ -0,0 +1,82 @@ +import createDebug from "debug"; +import type { Hexpubkey } from "../user/index.js"; + +const d = createDebug("ndk:coverage-guarantee"); + +export interface CoverageGuaranteeResult { + /** Relays force-selected for sole-source authors: relay → sole-source pubkeys */ + forcedRelays: Map>; + /** True if CG3 was skipped (too many sole-source relays to fit in budget) */ + skipped: boolean; +} + +/** + * Apply Coverage Guarantee v3 (CG3) for sole-source authors. + * + * Protects authors who have only one write relay. Without CG3, + * Thompson Sampling may deprioritize their sole relay if it scored + * poorly on other authors. + * + * **Algorithm:** + * 1. 
Scan pubkeysToRelays for authors with exactly 1 relay + * 2. Group by relay: Map> + * 3. If unique sole-source relays >= budget, skip (conditional skip) + * 4. Sort sole-source relays by coverage value (most pubkeys first) + * 5. Return top relays up to budget cap + * + * @param pubkeysToRelays - Map of author → their write relays + * @param maxConnections - Maximum outbox relay connections + * @param budgetFraction - Fraction of maxConnections reserved for CG3 (default 0.5) + */ +export function applyCoverageGuarantee( + pubkeysToRelays: Map>, + maxConnections: number, + budgetFraction: number, +): CoverageGuaranteeResult { + const budget = Math.floor(maxConnections * budgetFraction); + + // Step 1-2: Find sole-source authors and group by relay + const soleSourceRelays = new Map>(); + + for (const [author, relays] of pubkeysToRelays) { + if (relays.size === 1) { + const relay = relays.values().next().value!; + const pubkeys = soleSourceRelays.get(relay) ?? new Set(); + pubkeys.add(author); + soleSourceRelays.set(relay, pubkeys); + } + } + + // Step 3: Conditional skip + if (soleSourceRelays.size >= budget) { + d( + "CG3 skipped: %d sole-source relays >= budget %d (maxConn=%d × fraction=%f)", + soleSourceRelays.size, + budget, + maxConnections, + budgetFraction, + ); + return { forcedRelays: new Map(), skipped: true }; + } + + // Step 4: Sort by coverage value (most sole-source pubkeys first) + const sorted = Array.from(soleSourceRelays.entries()).sort( + (a, b) => b[1].size - a[1].size, + ); + + // Step 5: Take top relays up to budget + const forcedRelays = new Map>(); + for (const [relay, pubkeys] of sorted) { + if (forcedRelays.size >= budget) break; + forcedRelays.set(relay, pubkeys); + } + + if (forcedRelays.size > 0) { + d("CG3 force-selected %d relays for %d sole-source authors", + forcedRelays.size, + Array.from(forcedRelays.values()).reduce((sum, s) => sum + s.size, 0), + ); + } + + return { forcedRelays, skipped: false }; +} diff --git 
a/core/src/outbox/index.ts b/core/src/outbox/index.ts index 3bc466418..6c2b3b215 100644 --- a/core/src/outbox/index.ts +++ b/core/src/outbox/index.ts @@ -1,6 +1,7 @@ import type { NDK } from "../ndk"; import type { NDKRelay } from "../relay"; import type { Hexpubkey } from "../user"; +import { applyCoverageGuarantee } from "./coverage-guarantee"; import { getTopRelaysForAuthors } from "./relay-ranking"; import { getRelaysForSync } from "./write"; @@ -20,9 +21,23 @@ export function getAllRelaysForAllPubkeys( const pubkeysToRelays = new Map>(); const authorsMissingRelays = new Set(); + // Trigger NIP-66 refresh (non-blocking, skips if data is fresh) + if (ndk.nip66Filter) { + ndk.nip66Filter.refresh().catch(() => {}); + } + pubkeys.forEach((pubkey) => { - const relays = getRelaysForSync(ndk, pubkey, type); + let relays = getRelaysForSync(ndk, pubkey, type); if (relays && relays.size > 0) { + // Apply NIP-66 liveness filtering if available + if (ndk.nip66Filter) { + const filtered = ndk.nip66Filter.filterAlive(relays); + // If filtering empties the set, preserve the original (don't orphan the author) + if (filtered.size > 0) { + relays = filtered; + } + } + relays.forEach((relay) => { const pubkeysInRelay = pubkeysToRelays.get(relay) || new Set(); pubkeysInRelay.add(pubkey); @@ -65,14 +80,41 @@ export function chooseRelayCombinationForPubkeys( const sortedRelays = getTopRelaysForAuthors(ndk, pubkeys); - const addAuthorToRelay = (author: Hexpubkey, relay: WebSocket["url"]) => { + // Track unique relays for maxOutboxRelays enforcement (P12) + const maxRelays = ndk.maxOutboxRelays; + const selectedRelays = new Set(); + + const addAuthorToRelay = (author: Hexpubkey, relay: WebSocket["url"]): boolean => { + // Enforce maxOutboxRelays connection cap + if (maxRelays && !selectedRelays.has(relay) && selectedRelays.size >= maxRelays) { + return false; // would exceed connection cap + } const authorsInRelay = relayToAuthorsMap.get(relay) || []; authorsInRelay.push(author); 
relayToAuthorsMap.set(relay, authorsInRelay); + selectedRelays.add(relay); + return true; }; + // CG3: Force-select sole-source relays before the main loop + const cgAssignedAuthors = new Set(); + if (ndk.thompsonSampler && ndk.enableCoverageGuarantee !== false) { + const maxConn = maxRelays ?? 20; + const cg = applyCoverageGuarantee(pubkeysToRelays, maxConn, ndk.cgBudgetFraction ?? 0.5); + if (!cg.skipped) { + for (const [relay, pubkeys] of cg.forcedRelays) { + for (const pk of pubkeys) { + addAuthorToRelay(pk, relay); + cgAssignedAuthors.add(pk); + } + } + } + } + // Go through the pubkeys that have relays for (const [author, authorRelays] of pubkeysToRelays.entries()) { + // Skip authors already fully assigned by CG3 + if (cgAssignedAuthors.has(author)) continue; let missingRelayCount = count; const addedRelaysForAuthor = new Set(); @@ -80,9 +122,10 @@ export function chooseRelayCombinationForPubkeys( // If we are already connected to some of this user's relays, add those first for (const relay of connectedRelays) { if (authorRelays.has(relay.url)) { - addAuthorToRelay(author, relay.url); - addedRelaysForAuthor.add(relay.url); - missingRelayCount--; + if (addAuthorToRelay(author, relay.url)) { + addedRelaysForAuthor.add(relay.url); + missingRelayCount--; + } } } @@ -91,9 +134,10 @@ export function chooseRelayCombinationForPubkeys( if (addedRelaysForAuthor.has(authorRelay)) continue; if (relayToAuthorsMap.has(authorRelay)) { - addAuthorToRelay(author, authorRelay); - addedRelaysForAuthor.add(authorRelay); - missingRelayCount--; + if (addAuthorToRelay(author, authorRelay)) { + addedRelaysForAuthor.add(authorRelay); + missingRelayCount--; + } } } @@ -108,9 +152,10 @@ export function chooseRelayCombinationForPubkeys( if (addedRelaysForAuthor.has(relay)) continue; if (authorRelays.has(relay)) { - addAuthorToRelay(author, relay); - addedRelaysForAuthor.add(relay); - missingRelayCount--; + if (addAuthorToRelay(author, relay)) { + addedRelaysForAuthor.add(relay); + 
missingRelayCount--; + } } } } @@ -118,9 +163,7 @@ export function chooseRelayCombinationForPubkeys( // For the pubkey that are missing relays, pool's relays for (const author of authorsMissingRelays) { pool.permanentAndConnectedRelays().forEach((relay: NDKRelay) => { - const authorsInRelay = relayToAuthorsMap.get(relay.url) || []; - authorsInRelay.push(author); - relayToAuthorsMap.set(relay.url, authorsInRelay); + addAuthorToRelay(author, relay.url); }); } diff --git a/core/src/outbox/integration.test.ts b/core/src/outbox/integration.test.ts new file mode 100644 index 000000000..3a3b36cd9 --- /dev/null +++ b/core/src/outbox/integration.test.ts @@ -0,0 +1,250 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { NDK } from "../ndk/index.js"; +import { NDKSubscription } from "../subscription/index.js"; +import { chooseRelayCombinationForPubkeys } from "./index.js"; +import { NIP66LivenessFilter } from "./nip66.js"; + +/** + * Mock getRelaysForSync to return controlled relay sets per author. + * This simulates outbox tracker data without needing real relay connections. + */ +const mockRelayData = new Map>(); + +vi.mock("./write.js", () => ({ + getRelaysForSync: vi.fn((ndk: any, pubkey: string) => { + return mockRelayData.get(pubkey) ?? null; + }), + getWriteRelaysFor: vi.fn(async (ndk: any, pubkey: string) => { + return mockRelayData.get(pubkey) ?? 
null; + }), +})); + +function setupMockRelayData(data: Record) { + mockRelayData.clear(); + for (const [pubkey, relays] of Object.entries(data)) { + mockRelayData.set(pubkey, new Set(relays)); + } +} + +describe("Integration: maxOutboxRelays", () => { + let ndk: NDK; + + beforeEach(() => { + ndk = new NDK({ + explicitRelayUrls: ["wss://explicit.com/"], + enableOutboxModel: false, + }); + }); + + it("maxOutboxRelays: 5 returns ≤5 unique relay URLs", () => { + ndk.maxOutboxRelays = 5; + + // Set up 10 authors, each on 2-3 unique relays (many more than 5 total) + setupMockRelayData({ + author1: ["wss://relay1.com/", "wss://relay2.com/"], + author2: ["wss://relay2.com/", "wss://relay3.com/"], + author3: ["wss://relay3.com/", "wss://relay4.com/"], + author4: ["wss://relay5.com/", "wss://relay6.com/"], + author5: ["wss://relay7.com/", "wss://relay8.com/"], + author6: ["wss://relay9.com/", "wss://relay10.com/"], + author7: ["wss://relay1.com/", "wss://relay11.com/"], + author8: ["wss://relay12.com/", "wss://relay13.com/"], + author9: ["wss://relay14.com/", "wss://relay15.com/"], + author10: ["wss://relay16.com/", "wss://relay17.com/"], + }); + + const pubkeys = Array.from({ length: 10 }, (_, i) => `author${i + 1}`); + const result = chooseRelayCombinationForPubkeys(ndk, pubkeys, "write"); + + const uniqueRelays = new Set(result.keys()); + expect(uniqueRelays.size).toBeLessThanOrEqual(5); + }); + + it("returns more relays when maxOutboxRelays is not set", () => { + // No cap + ndk.maxOutboxRelays = undefined; + + setupMockRelayData({ + author1: ["wss://relay1.com/", "wss://relay2.com/"], + author2: ["wss://relay3.com/", "wss://relay4.com/"], + author3: ["wss://relay5.com/", "wss://relay6.com/"], + author4: ["wss://relay7.com/", "wss://relay8.com/"], + }); + + const pubkeys = ["author1", "author2", "author3", "author4"]; + const result = chooseRelayCombinationForPubkeys(ndk, pubkeys, "write"); + + const uniqueRelays = new Set(result.keys()); + // Without a cap, should use 
more than 5 relays for 4 authors × 2 relays each + expect(uniqueRelays.size).toBeGreaterThan(4); + }); +}); + +describe("Integration: Thompson priors populate after EOSE", () => { + // Valid 64-char hex pubkeys for filter validation + const ALICE = "a".repeat(64); + const BOB = "b".repeat(64); + const SOLE = "c".repeat(64); + const MULTI = "d".repeat(64); + + it("observeDelivery updates Thompson priors from relay attribution data", () => { + const ndk = new NDK({ + explicitRelayUrls: ["wss://relay.com/"], + enableOutboxModel: false, + enableThompsonSampling: true, + }); + + expect(ndk.thompsonSampler).toBeDefined(); + expect(ndk.thompsonSampler!.relayScores.size).toBe(0); + + // Create a subscription with closeOnEose + const sub = new NDKSubscription( + ndk, + [{ kinds: [1], authors: [ALICE, BOB] }], + { closeOnEose: true }, + ); + + // Simulate what startWithRelays would set up + sub.authorRelayAssignments = new Map([ + ["wss://relay-a.com/", [ALICE, BOB]], + ["wss://relay-b.com/", [BOB]], + ]); + sub.pubkeysToRelays = new Map([ + [ALICE, new Set(["wss://relay-a.com/"])], + [BOB, new Set(["wss://relay-a.com/", "wss://relay-b.com/"])], + ]); + + // Simulate relay attribution tracking (what eventReceived would do) + // relay-a delivered events for ALICE and BOB + // relay-b delivered nothing for BOB + (sub as any)._receivedAuthorsByRelay = new Map([ + [ALICE, new Set(["wss://relay-a.com/"])], + [BOB, new Set(["wss://relay-a.com/"])], + ]); + + // Set up relayFilters so eoseReceived can compute hasSeenAllEoses + sub.relayFilters = new Map([ + ["wss://relay-a.com/", [{ kinds: [1], authors: [ALICE, BOB] }]], + ["wss://relay-b.com/", [{ kinds: [1], authors: [BOB] }]], + ]); + + // Mock the pool so eoseReceived doesn't fail + const mockRelay = (url: string) => ({ url, status: 1 } as any); + sub.eosesSeen.add(mockRelay("wss://relay-a.com/")); + sub.eosesSeen.add(mockRelay("wss://relay-b.com/")); + + // Trigger observeDelivery directly (normally called from performEose) + 
(sub as any).observeDelivery(); + + // Verify Thompson priors were updated + const scores = ndk.thompsonSampler!.relayScores; + expect(scores.size).toBeGreaterThan(0); + + // relay-a delivered for both alice and bob → should have successes + const relayA = scores.get("wss://relay-a.com/"); + expect(relayA).toBeDefined(); + expect(relayA!.alpha).toBeGreaterThan(1); // had deliveries + + // relay-b was assigned bob but didn't deliver → should have failure + const relayB = scores.get("wss://relay-b.com/"); + expect(relayB).toBeDefined(); + expect(relayB!.beta).toBeGreaterThan(1); // had a miss + }); + + it("skips observation on non-closeOnEose subscriptions", () => { + const ndk = new NDK({ + explicitRelayUrls: ["wss://relay.com/"], + enableOutboxModel: false, + enableThompsonSampling: true, + }); + + // closeOnEose: false (long-running subscription) + const sub = new NDKSubscription( + ndk, + [{ kinds: [1], authors: [ALICE] }], + { closeOnEose: false }, + ); + + sub.authorRelayAssignments = new Map([ + ["wss://relay-a.com/", [ALICE]], + ]); + (sub as any)._receivedAuthorsByRelay = new Map([ + [ALICE, new Set(["wss://relay-a.com/"])], + ]); + + (sub as any).observeDelivery(); + + // Should NOT have updated priors — P2 says only observe finite subs + expect(ndk.thompsonSampler!.relayScores.size).toBe(0); + }); + + it("applies 0.3x weight for sole-source authors", () => { + const ndk = new NDK({ + explicitRelayUrls: ["wss://relay.com/"], + enableOutboxModel: false, + enableThompsonSampling: true, + }); + + const sub = new NDKSubscription( + ndk, + [{ kinds: [1], authors: [SOLE, MULTI] }], + { closeOnEose: true }, + ); + + sub.authorRelayAssignments = new Map([ + ["wss://relay-a.com/", [SOLE, MULTI]], + ]); + sub.pubkeysToRelays = new Map([ + [SOLE, new Set(["wss://relay-a.com/"])], // sole-source + [MULTI, new Set(["wss://relay-a.com/", "wss://other.com/"])], // multi-relay + ]); + + (sub as any)._receivedAuthorsByRelay = new Map([ + [SOLE, new 
Set(["wss://relay-a.com/"])], + [MULTI, new Set(["wss://relay-a.com/"])], + ]); + + (sub as any).observeDelivery(); + + const relayA = ndk.thompsonSampler!.relayScores.get("wss://relay-a.com/")!; + // sole delivered with weight 0.3, multi delivered with weight 1.0 + // Before decay: alpha = 1 + 0.3 + 1.0 = 2.3 + // After decay (0.95): alpha = 1 + (2.3 - 1) * 0.95 = 2.235 + expect(relayA.alpha).toBeCloseTo(2.235, 5); + }); +}); + +describe("Integration: NIP-66 passes through on empty/stale monitor data", () => { + it("filterAlive returns input unchanged when no refresh has been called", () => { + const ndk = new NDK({ enableOutboxModel: false }); + const filter = new NIP66LivenessFilter(ndk, { + monitorRelays: ["wss://monitor.com/"], + }); + + const relays = new Set(["wss://maybe-dead.com/", "wss://alive.com/"]); + const result = filter.filterAlive(relays); + + // No data fetched → pass-through + expect(result).toEqual(relays); + }); + + it("NIP-66 integrated into outbox selection preserves authors on empty data", () => { + const ndk = new NDK({ + explicitRelayUrls: ["wss://explicit.com/"], + enableOutboxModel: false, + nip66MonitorRelays: ["wss://monitor.com/"], + }); + + expect(ndk.nip66Filter).toBeDefined(); + + // NIP-66 has no data (refresh not called) → filtering should pass through + setupMockRelayData({ + author1: ["wss://relay1.com/", "wss://relay2.com/"], + }); + + const result = chooseRelayCombinationForPubkeys(ndk, ["author1"], "write"); + + // Author should still have relay assignments despite NIP-66 being configured + expect(result.size).toBeGreaterThan(0); + }); +}); diff --git a/core/src/outbox/nip66.test.ts b/core/src/outbox/nip66.test.ts new file mode 100644 index 000000000..13d7195c0 --- /dev/null +++ b/core/src/outbox/nip66.test.ts @@ -0,0 +1,167 @@ +import { describe, expect, it, vi } from "vitest"; +import { NIP66LivenessFilter } from "./nip66.js"; + +// Minimal mock of NDK for NIP66LivenessFilter +function mockNdk(events: Array<{ tags: 
string[][] }> = []) { + return { + fetchEvents: vi.fn().mockResolvedValue(new Set(events)), + } as any; +} + +function makeMonitorEvent(relayUrl: string) { + return { tags: [["d", relayUrl]] }; +} + +describe("NIP66LivenessFilter", () => { + it("returns input unchanged when no data has been fetched", () => { + const ndk = mockNdk(); + const filter = new NIP66LivenessFilter(ndk, { + monitorRelays: ["wss://monitor.example.com"], + }); + + const input = new Set(["wss://relay1.com/", "wss://relay2.com/"]); + const result = filter.filterAlive(input); + expect(result).toEqual(input); + }); + + it("filters dead relays after successful refresh", async () => { + const events = [ + makeMonitorEvent("wss://relay1.com"), + makeMonitorEvent("wss://relay3.com"), + ]; + // Need >= 100 relays to pass the minAliveThreshold + for (let i = 0; i < 100; i++) { + events.push(makeMonitorEvent(`wss://filler${i}.com`)); + } + + const ndk = mockNdk(events); + const filter = new NIP66LivenessFilter(ndk, { + monitorRelays: ["wss://monitor.example.com"], + }); + + await filter.refresh(); + + const result = filter.filterAlive( + new Set(["wss://relay1.com/", "wss://relay2.com/", "wss://relay3.com/"]), + ); + + expect(result.has("wss://relay1.com/")).toBe(true); + expect(result.has("wss://relay3.com/")).toBe(true); + expect(result.has("wss://relay2.com/")).toBe(false); + }); + + it("passes through .onion relays even when they are not in alive set", async () => { + const events: Array<{ tags: string[][] }> = []; + for (let i = 0; i < 110; i++) { + events.push(makeMonitorEvent(`wss://filler${i}.com`)); + } + + const ndk = mockNdk(events); + const filter = new NIP66LivenessFilter(ndk, { + monitorRelays: ["wss://monitor.example.com"], + }); + + await filter.refresh(); + + const result = filter.filterAlive( + new Set(["wss://abc.onion/", "wss://filler0.com/"]), + ); + expect(result.has("wss://abc.onion/")).toBe(true); + expect(result.has("wss://filler0.com/")).toBe(true); + }); + + it("returns input 
unchanged when alive set is below minAliveThreshold", async () => { + // Only 5 relays — below default threshold of 100 + const events = [ + makeMonitorEvent("wss://relay1.com"), + makeMonitorEvent("wss://relay2.com"), + makeMonitorEvent("wss://relay3.com"), + makeMonitorEvent("wss://relay4.com"), + makeMonitorEvent("wss://relay5.com"), + ]; + + const ndk = mockNdk(events); + const filter = new NIP66LivenessFilter(ndk, { + monitorRelays: ["wss://monitor.example.com"], + }); + + await filter.refresh(); + + const input = new Set(["wss://dead.com/"]); + const result = filter.filterAlive(input); + // Should pass through since data is insufficient + expect(result).toEqual(input); + }); + + it("returns input unchanged when data is stale", async () => { + const events: Array<{ tags: string[][] }> = []; + for (let i = 0; i < 110; i++) { + events.push(makeMonitorEvent(`wss://filler${i}.com`)); + } + + const ndk = mockNdk(events); + const filter = new NIP66LivenessFilter(ndk, { + monitorRelays: ["wss://monitor.example.com"], + maxAge: 1, // 1ms — will be stale immediately + }); + + await filter.refresh(); + // Wait for data to become stale + await new Promise((r) => setTimeout(r, 5)); + + const input = new Set(["wss://dead.com/"]); + const result = filter.filterAlive(input); + expect(result).toEqual(input); + }); + + it("handles refresh failure gracefully", async () => { + const ndk = { + fetchEvents: vi.fn().mockRejectedValue(new Error("network error")), + } as any; + + const filter = new NIP66LivenessFilter(ndk, { + monitorRelays: ["wss://monitor.example.com"], + }); + + // Should not throw + await filter.refresh(); + + const input = new Set(["wss://relay1.com/"]); + const result = filter.filterAlive(input); + expect(result).toEqual(input); + }); + + it("deduplicates concurrent refresh calls", async () => { + const events: Array<{ tags: string[][] }> = []; + for (let i = 0; i < 110; i++) { + events.push(makeMonitorEvent(`wss://filler${i}.com`)); + } + const ndk = 
mockNdk(events); + const filter = new NIP66LivenessFilter(ndk, { + monitorRelays: ["wss://monitor.example.com"], + }); + + // Call refresh twice concurrently + await Promise.all([filter.refresh(), filter.refresh()]); + + // fetchEvents should only be called once + expect(ndk.fetchEvents).toHaveBeenCalledTimes(1); + }); + + it("skips refresh when data is still fresh", async () => { + const events: Array<{ tags: string[][] }> = []; + for (let i = 0; i < 110; i++) { + events.push(makeMonitorEvent(`wss://filler${i}.com`)); + } + const ndk = mockNdk(events); + const filter = new NIP66LivenessFilter(ndk, { + monitorRelays: ["wss://monitor.example.com"], + maxAge: 60_000, + }); + + await filter.refresh(); + await filter.refresh(); // should skip + + expect(ndk.fetchEvents).toHaveBeenCalledTimes(1); + }); +}); diff --git a/core/src/outbox/nip66.ts b/core/src/outbox/nip66.ts new file mode 100644 index 000000000..9748531ab --- /dev/null +++ b/core/src/outbox/nip66.ts @@ -0,0 +1,138 @@ +import createDebug from "debug"; +import type { NDK } from "../ndk/index.js"; +import { normalizeRelayUrl } from "../utils/normalize-url.js"; + +const d = createDebug("ndk:nip66"); + +export interface NIP66Options { + /** Relay URLs to fetch NIP-66 monitor data from */ + monitorRelays: string[]; + /** Maximum age of cached monitor data in ms. Default: 14400000 (4 hours) */ + maxAge?: number; + /** Minimum alive relay count to trust the data. Default: 100 */ + minAliveThreshold?: number; +} + +/** + * NIP-66 relay liveness filter. + * + * Fetches relay monitor data (kind 30166) and filters dead relays + * from candidate sets before outbox relay selection. + * + * **Graceful degradation (P7):** + * - If alive set has (); + private fetchedAt = 0; + private refreshPromise: Promise | null = null; + private readonly maxAge: number; + private readonly minAliveThreshold: number; + + constructor( + private ndk: NDK, + private options: NIP66Options, + ) { + this.maxAge = options.maxAge ?? 
14_400_000; // 4 hours + this.minAliveThreshold = options.minAliveThreshold ?? 100; + } + + /** + * Fetch/refresh monitor data (kind 30166 events). + * Safe to call frequently — skips if data is still fresh. + */ + async refresh(): Promise { + // Skip if data is fresh + if (this.fetchedAt > 0 && Date.now() - this.fetchedAt < this.maxAge) { + return; + } + + // Deduplicate concurrent refresh calls + if (this.refreshPromise) return this.refreshPromise; + + this.refreshPromise = this._doRefresh(); + try { + await this.refreshPromise; + } finally { + this.refreshPromise = null; + } + } + + private async _doRefresh(): Promise { + try { + const events = await this.ndk.fetchEvents( + { kinds: [30166 as any] }, + { + closeOnEose: true, + groupable: false, + relayUrls: this.options.monitorRelays, + }, + ); + + const alive = new Set(); + for (const event of events) { + // kind 30166: the "d" tag contains the relay URL + const dTag = event.tags.find((t) => t[0] === "d"); + if (dTag?.[1]) { + try { + alive.add(normalizeRelayUrl(dTag[1])); + } catch { + // skip malformed URLs + } + } + } + + if (alive.size > 0) { + this.aliveRelays = alive; + this.fetchedAt = Date.now(); + d("Refreshed NIP-66 data: %d alive relays", alive.size); + } else { + d("NIP-66 refresh returned 0 relays, keeping stale data"); + } + } catch (err) { + d("NIP-66 refresh failed: %O", err); + // Keep stale data; if no data at all, pass-through in filterAlive() + } + } + + /** + * Whether the current data is considered valid for filtering. + */ + private isDataValid(): boolean { + if (this.aliveRelays.size < this.minAliveThreshold) return false; + if (this.fetchedAt === 0) return false; + if (Date.now() - this.fetchedAt > this.maxAge) return false; + return true; + } + + /** + * Filter a set of relay URLs, removing dead relays. + * + * Returns input unchanged if data is stale/insufficient (P7 graceful degradation). + * .onion relays always pass through. 
+ */ + filterAlive(relayUrls: Iterable): Set { + const input = new Set(relayUrls); + + if (!this.isDataValid()) { + return input; + } + + const result = new Set(); + for (const url of input) { + // .onion relays always pass through — monitors can't reach them + if (url.includes(".onion")) { + result.add(url); + continue; + } + if (this.aliveRelays.has(url)) { + result.add(url); + } + } + + return result; + } +} diff --git a/core/src/outbox/relay-ranking.ts b/core/src/outbox/relay-ranking.ts index bd85de007..423128e40 100644 --- a/core/src/outbox/relay-ranking.ts +++ b/core/src/outbox/relay-ranking.ts @@ -15,12 +15,15 @@ export function getTopRelaysForAuthors(ndk: NDK, authors: Hexpubkey[]): WebSocke } }); - /** - * TODO: Here we are sorting the relays just by number of authors that write to them. - * Here is the place where the relay scoring can be used to modify the weights of the relays. - */ + if (ndk.thompsonSampler) { + // Sample once per relay to ensure stable, transitive sort order + const scored = Array.from(relaysWithCount.entries()).map( + ([url, count]) => [url, ndk.thompsonSampler!.weightedScore(url, count)] as const, + ); + return scored.sort((a, b) => b[1] - a[1]).map((e) => e[0]); + } - // Sort the relays by the number of authors that write to them + // Fallback: existing popularity sort (unchanged default behavior) const sortedRelays = Array.from(relaysWithCount.entries()).sort((a, b) => b[1] - a[1]); return sortedRelays.map((entry) => entry[0]); diff --git a/core/src/outbox/thompson.test.ts b/core/src/outbox/thompson.test.ts new file mode 100644 index 000000000..8e48561d4 --- /dev/null +++ b/core/src/outbox/thompson.test.ts @@ -0,0 +1,169 @@ +import { describe, expect, it } from "vitest"; +import { ThompsonSampler } from "./thompson.js"; + +// Deterministic RNG — always returns 0.5 +const fixedRng = () => 0.5; + +// Incrementing RNG for differentiation tests +function seqRng() { + let i = 0; + return () => { + i = (i + 1) % 1000; + return i / 1000; 
+ }; +} + +describe("ThompsonSampler", () => { + describe("weightedScore", () => { + it("returns weight × rng for unknown relays (uniform prior)", () => { + const sampler = new ThompsonSampler({ rng: fixedRng }); + // authorCount = 1 → weight = 1 + ln(1) = 1 + // rng() = 0.5 → score = 1 * 0.5 = 0.5 + expect(sampler.weightedScore("wss://unknown.com/", 1)).toBeCloseTo(0.5, 5); + }); + + it("scores higher for relays with more authors", () => { + const sampler = new ThompsonSampler({ rng: fixedRng }); + const score1 = sampler.weightedScore("wss://relay.com/", 1); + const score10 = sampler.weightedScore("wss://relay.com/", 10); + expect(score10).toBeGreaterThan(score1); + }); + + it("uses Beta priors when available", () => { + const sampler = new ThompsonSampler({ rng: fixedRng }); + // Set up a relay with strong success record + sampler.relayScores.set("wss://good.com/", { alpha: 10, beta: 2 }); + // Set up a relay with poor record + sampler.relayScores.set("wss://bad.com/", { alpha: 2, beta: 10 }); + + // With fixed rng, the sampling should reflect the priors + // Run many times to test the relative ordering + let goodWins = 0; + const rng = seqRng(); + const s = new ThompsonSampler({ rng }); + s.relayScores.set("wss://good.com/", { alpha: 10, beta: 2 }); + s.relayScores.set("wss://bad.com/", { alpha: 2, beta: 10 }); + + for (let i = 0; i < 100; i++) { + const good = s.weightedScore("wss://good.com/", 5); + const bad = s.weightedScore("wss://bad.com/", 5); + if (good > bad) goodWins++; + } + + // Good relay should win most of the time + expect(goodWins).toBeGreaterThan(60); + }); + }); + + describe("observe", () => { + it("updates priors on delivery", () => { + const sampler = new ThompsonSampler(); + sampler.observe("wss://relay.com/", "pubkey1", true); + + const prior = sampler.relayScores.get("wss://relay.com/")!; + expect(prior.alpha).toBe(2); // 1 + 1 + expect(prior.beta).toBe(1); // unchanged + }); + + it("updates priors on miss", () => { + const sampler = new 
ThompsonSampler(); + sampler.observe("wss://relay.com/", "pubkey1", false); + + const prior = sampler.relayScores.get("wss://relay.com/")!; + expect(prior.alpha).toBe(1); // unchanged + expect(prior.beta).toBe(2); // 1 + 1 + }); + + it("deduplicates observations within a round (P8)", () => { + const sampler = new ThompsonSampler(); + sampler.observe("wss://relay.com/", "pubkey1", true); + sampler.observe("wss://relay.com/", "pubkey1", true); // duplicate + sampler.observe("wss://relay.com/", "pubkey1", false); // duplicate, different result + + const prior = sampler.relayScores.get("wss://relay.com/")!; + // Only first observation counts + expect(prior.alpha).toBe(2); + expect(prior.beta).toBe(1); + }); + + it("allows same relay-author pair in new round after decay", () => { + const sampler = new ThompsonSampler(); + sampler.observe("wss://relay.com/", "pubkey1", true); + sampler.decay(); // resets dedup set + sampler.observe("wss://relay.com/", "pubkey1", true); + + const prior = sampler.relayScores.get("wss://relay.com/")!; + // After decay: alpha was 2 → 1 + (2-1)*0.95 = 1.95 + // After second observe: 1.95 + 1 = 2.95 + expect(prior.alpha).toBeCloseTo(2.95, 5); + }); + + it("supports weighted observations", () => { + const sampler = new ThompsonSampler(); + sampler.observe("wss://relay.com/", "pubkey1", true, 0.3); + + const prior = sampler.relayScores.get("wss://relay.com/")!; + expect(prior.alpha).toBeCloseTo(1.3, 5); + expect(prior.beta).toBe(1); + }); + }); + + describe("decay", () => { + it("decays priors toward uniform (1,1)", () => { + const sampler = new ThompsonSampler({ decayFactor: 0.5 }); + sampler.relayScores.set("wss://relay.com/", { alpha: 5, beta: 3 }); + sampler.decay(); + + const prior = sampler.relayScores.get("wss://relay.com/")!; + // alpha: 1 + (5-1) * 0.5 = 3 + // beta: 1 + (3-1) * 0.5 = 2 + expect(prior.alpha).toBe(3); + expect(prior.beta).toBe(2); + }); + + it("prunes near-uniform priors", () => { + const sampler = new ThompsonSampler({ 
decayFactor: 0.001 }); + sampler.relayScores.set("wss://relay.com/", { alpha: 1.005, beta: 1.005 }); + sampler.decay(); + + // Should be pruned since decayed values are nearly 1 + expect(sampler.relayScores.has("wss://relay.com/")).toBe(false); + }); + }); + + describe("export/import", () => { + it("round-trips priors correctly", () => { + const sampler = new ThompsonSampler(); + sampler.relayScores.set("wss://relay1.com/", { alpha: 5, beta: 3 }); + sampler.relayScores.set("wss://relay2.com/", { alpha: 2, beta: 8 }); + + const exported = sampler.exportPriors(); + const sampler2 = new ThompsonSampler(); + sampler2.importPriors(exported); + + expect(sampler2.relayScores.get("wss://relay1.com/")).toEqual({ alpha: 5, beta: 3 }); + expect(sampler2.relayScores.get("wss://relay2.com/")).toEqual({ alpha: 2, beta: 8 }); + }); + + it("clamps imported values to minimum of 1", () => { + const sampler = new ThompsonSampler(); + sampler.importPriors({ + "wss://relay.com/": { alpha: 0.5, beta: -1 }, + }); + + const prior = sampler.relayScores.get("wss://relay.com/")!; + expect(prior.alpha).toBe(1); + expect(prior.beta).toBe(1); + }); + + it("exports as JSON-serializable", () => { + const sampler = new ThompsonSampler(); + sampler.relayScores.set("wss://relay.com/", { alpha: 3, beta: 7 }); + + const json = JSON.stringify(sampler.exportPriors()); + const parsed = JSON.parse(json); + + expect(parsed["wss://relay.com/"]).toEqual({ alpha: 3, beta: 7 }); + }); + }); +}); diff --git a/core/src/outbox/thompson.ts b/core/src/outbox/thompson.ts new file mode 100644 index 000000000..a1e4919b9 --- /dev/null +++ b/core/src/outbox/thompson.ts @@ -0,0 +1,135 @@ +import createDebug from "debug"; +import { sampleBeta } from "../utils/sample-beta.js"; + +const d = createDebug("ndk:thompson"); + +export interface RelayPrior { + alpha: number; // successes + 1 + beta: number; // failures + 1 +} + +export interface ThompsonSamplerOptions { + /** Decay factor applied each observation round. 
Default: 0.95 */ + decayFactor?: number; + /** Injectable RNG for deterministic tests. Default: Math.random */ + rng?: () => number; +} + +/** + * Thompson Sampling relay scorer. + * + * Maintains Beta(α,β) priors per relay, updated from binary delivery + * observations after each subscription EOSE. Uses Bayesian sampling + * to balance exploration vs exploitation when ranking relays. + * + * **Key design decisions:** + * - Binary signal only (P1): hit/miss per (relay, author) pair + * - Per-round dedup (P8): multiple subs querying same relay-author + * only generate one observation per round + * - No NDKRelayScore type change (P6): uses separate RelayPrior type + * - Persistence via export/import (P9): plain JSON-serializable + */ +export class ThompsonSampler { + public relayScores = new Map(); + private observedThisRound = new Set(); // "relay|author" dedup [P8] + private readonly decayFactor: number; + private readonly rng: () => number; + private readonly debug: ReturnType; + + constructor(options?: ThompsonSamplerOptions) { + this.decayFactor = options?.decayFactor ?? 0.95; + this.rng = options?.rng ?? Math.random; + this.debug = d; + } + + /** + * Weighted Thompson score: (1 + ln(authorCount)) × sampleBeta(α, β) + * + * The log-weighted author count gives relays serving more authors a + * higher baseline, while the Beta sample introduces Bayesian exploration. + * + * P16: On invalid priors, returns fallback of (1 + ln(authorCount)) × 0.5. + */ + weightedScore(relayUrl: string, authorCount: number): number { + const weight = 1 + Math.log(Math.max(1, authorCount)); + const prior = this.relayScores.get(relayUrl); + + if (!prior) { + // No data — uniform prior + return weight * this.rng(); + } + + const sample = sampleBeta(prior.alpha, prior.beta, this.rng); + return weight * sample; + } + + /** + * Observe delivery outcome for a (relay, author) pair. + * + * P8: Deduplicates — skips if this (relay, author) pair was already + * observed this round. 
First observer wins. + * + * P4: Caller must skip inactive authors (authors for which no relay + * delivered events). + * + * @param weight - Observation weight (default 1.0). Use 0.3 for sole-source relays. + */ + observe(relayUrl: string, authorPubkey: string, delivered: boolean, weight = 1.0): void { + const key = `${relayUrl}|${authorPubkey}`; + if (this.observedThisRound.has(key)) return; // P8 dedup + this.observedThisRound.add(key); + + const prior = this.relayScores.get(relayUrl) ?? { alpha: 1, beta: 1 }; + + if (delivered) { + prior.alpha += weight; + } else { + prior.beta += weight; + } + + this.relayScores.set(relayUrl, prior); + } + + /** + * Apply decay and reset dedup set. + * Call once per observation round (after processing all observations for a subscription). + */ + decay(): void { + for (const [url, prior] of this.relayScores) { + prior.alpha = 1 + (prior.alpha - 1) * this.decayFactor; + prior.beta = 1 + (prior.beta - 1) * this.decayFactor; + + // Prune relays that have decayed back to near-uniform + if (Math.abs(prior.alpha - 1) < 0.01 && Math.abs(prior.beta - 1) < 0.01) { + this.relayScores.delete(url); + } + } + + this.observedThisRound.clear(); + } + + /** + * Export priors for persistence (P9). + * Returns a plain JSON-serializable object. + */ + exportPriors(): Record { + const result: Record = {}; + for (const [url, prior] of this.relayScores) { + result[url] = { alpha: prior.alpha, beta: prior.beta }; + } + return result; + } + + /** + * Import priors from persistence (P9). + * Clamps all α,β to Math.max(1, value) to prevent corrupt data. 
+ */ + importPriors(priors: Record): void { + for (const [url, prior] of Object.entries(priors)) { + this.relayScores.set(url, { + alpha: Math.max(1, prior.alpha), + beta: Math.max(1, prior.beta), + }); + } + } +} diff --git a/core/src/subscription/index.ts b/core/src/subscription/index.ts index 382dff286..8a49a33f7 100644 --- a/core/src/subscription/index.ts +++ b/core/src/subscription/index.ts @@ -6,11 +6,13 @@ import type { NDKKind } from "../events/kinds/index.js"; import { verifiedSignatures } from "../events/validation.js"; import { wrapEvent } from "../events/wrap.js"; import type { NDK } from "../ndk/index.js"; +import { getAllRelaysForAllPubkeys } from "../outbox/index.js"; import type { NDKRelay } from "../relay"; import type { NDKPool } from "../relay/pool/index.js"; import { calculateRelaySetsFromFilters } from "../relay/sets/calculate"; import { NDKRelaySet } from "../relay/sets/index.js"; import { NDKFilterValidationMode, processFilters } from "../utils/filter-validation.js"; +import type { Hexpubkey } from "../user/index.js"; import { queryFullyFilled } from "./utils.js"; export type NDKSubscriptionInternalId = string; @@ -431,6 +433,18 @@ export class NDKSubscription extends EventEmitter<{ */ public cacheUnconstrainFilter?: Array; + /** + * Relay → authors assignment map for Thompson observation (P5). + * Set by the outbox relay selection pipeline. + */ + public authorRelayAssignments?: Map; + + /** + * Author → relays map for detecting sole-source authors during observation. + * Set by the outbox relay selection pipeline. 
+ */ + public pubkeysToRelays?: Map>; + public constructor(ndk: NDK, filters: NDKFilter | NDKFilter[], opts?: NDKSubscriptionOptions, subId?: string) { super(); this.ndk = ndk; @@ -772,6 +786,40 @@ export class NDKSubscription extends EventEmitter<{ } } + // Build authorRelayAssignments for Thompson observation if enabled + if (this.ndk.thompsonSampler && this.relayFilters) { + this.authorRelayAssignments = new Map(); + for (const [relayUrl, filters] of this.relayFilters) { + const authors: Hexpubkey[] = []; + for (const filter of filters) { + if (filter.authors) { + for (const author of filter.authors) { + if (!authors.includes(author)) authors.push(author); + } + } + } + if (authors.length > 0) { + this.authorRelayAssignments.set(relayUrl, authors); + } + } + + // Build pubkeysToRelays for sole-source detection + const allAuthors = new Set(); + for (const filter of this.filters) { + if (filter.authors) { + for (const a of filter.authors) allAuthors.add(a); + } + } + if (allAuthors.size > 0) { + const { pubkeysToRelays } = getAllRelaysForAllPubkeys( + this.ndk, + Array.from(allAuthors), + "write", + ); + this.pubkeysToRelays = pubkeysToRelays; + } + } + // iterate through the this.relayFilters for (const [relayUrl, filters] of this.relayFilters) { const relay = this.pool.getRelay(relayUrl, true, true, filters); @@ -910,6 +958,11 @@ export class NDKSubscription extends EventEmitter<{ return; } + // Track relay attribution for Thompson observation + if (relay && !fromCache && !optimisticPublish && ndkEvent.pubkey) { + this.trackRelayAttribution(ndkEvent.pubkey, relay.url); + } + // emit it if (!optimisticPublish || this.skipOptimisticPublishEvent !== true) { this.emitEvent(this.opts?.wrap ?? 
false, ndkEvent, relay, fromCache, optimisticPublish); @@ -917,6 +970,12 @@ export class NDKSubscription extends EventEmitter<{ this.eventFirstSeen.set(eventId, Date.now()); } } else { + // Track relay attribution for duplicates too — the relay did deliver + // Use event.pubkey (not ndkEvent) since ndkEvent may not be initialized for raw NostrEvents + if (relay && !fromCache && !optimisticPublish && event.pubkey) { + this.trackRelayAttribution(event.pubkey, relay.url); + } + const timeSinceFirstSeen = Date.now() - (this.eventFirstSeen.get(eventId) || 0); this.emit("event:dup", event, relay, timeSinceFirstSeen, this, fromCache, optimisticPublish); @@ -979,6 +1038,111 @@ export class NDKSubscription extends EventEmitter<{ } } + /** + * Observe delivery outcomes for Thompson Sampling. + * + * Called at subscription-level EOSE. Only observes on finite + * subscriptions (closeOnEose: true) [P2]. + * + * For each (relay, author) assignment: + * - Skip if author is inactive (no relay delivered events for them) [P4] + * - Observe hit if this relay delivered ≥1 event for this author + * - Observe miss otherwise + * + * Uses an optimized lookup via Map> [P1, P8]. + */ + private observeDelivery(): void { + if (!this.ndk.thompsonSampler || !this.authorRelayAssignments) return; + if (!this.opts?.closeOnEose) return; // P2: only finite subscriptions + + const sampler = this.ndk.thompsonSampler; + + // Build optimized lookup: which relays delivered events for each author + const authorToRelays = new Map>(); + for (const [eventId] of this.eventFirstSeen) { + // eventFirstSeen only has IDs — we need to look at relayFilters events + // Instead, iterate the events we know about through the subscription manager + } + + // Use relayFilters to find events: check each relay's event delivery + // We'll build from the eventFirstSeen map — but we need relay info. 
+ // The most reliable approach: scan eventFirstSeen and use ndk.subManager's seenEvents + // However, the simplest correct approach is to collect events from the 'event' emissions. + + // Since eventFirstSeen only tracks event IDs and NDKSubscription doesn't maintain + // a received events array by default, we'll use a simpler approach: + // Check if any event was seen from each relay for each author. + // We can reconstruct this from the relayFilters (which relays we queried) + // and cross-reference with events in the subscription manager. + + // Actually, the subscription manager's seenEvents has the events, but they are + // not directly accessible. The practical approach is to use the event:dup tracking + // that already exists. But the simplest correct implementation that works with the + // existing NDK architecture: + + // We need to track events with relay attribution. Let's use a lighter approach: + // check per-relay EOSE status — if a relay didn't connect, it's inconclusive. + // For connected relays, if the relay has entries in eventFirstSeen that match + // an author, it delivered. + + // The cleanest approach: we build authorToRelays from scratch. + // We already have this.eosesSeen (relays that completed) and this.relayFilters. + // We need event→relay mapping which is on NDKEvent.relay. + + // Unfortunately, NDKSubscription doesn't store received events. + // We'll need to collect them. Let's add collection in eventReceived. + // But we can't modify eventReceived extensively without risking existing behavior. + + // SIMPLEST CORRECT APPROACH: Use the subscription's event tracking. + // The subscription emits events, but doesn't store them by relay. + // We'll use the receivedEventsByRelay map we'll add as a lightweight tracker. 
+ if (!this._receivedAuthorsByRelay) return; + + for (const [relayUrl, authors] of this.authorRelayAssignments) { + for (const author of authors) { + // P4: Skip inactive authors (no relay delivered events for them) + if (!this._receivedAuthorsByRelay.has(author)) continue; + + const delivered = this._receivedAuthorsByRelay.get(author)!.has(relayUrl); + + // Determine weight: sole-source authors get 0.3x weight + let weight = 1.0; + if (this.pubkeysToRelays) { + const authorRelays = this.pubkeysToRelays.get(author); + if (authorRelays && authorRelays.size === 1) { + weight = 0.3; + } + } + + sampler.observe(relayUrl, author, delivered, weight); + } + } + + sampler.decay(); + } + + /** + * Tracks which relays delivered events for which authors. + * Built lazily by eventReceived when Thompson sampling is enabled. + */ + private _receivedAuthorsByRelay?: Map>; + + /** + * Record relay attribution for Thompson observation. + * Called from eventReceived for new (non-duplicate) events. + */ + private trackRelayAttribution(pubkey: Hexpubkey, relayUrl: string): void { + if (!this.ndk.thompsonSampler || !this.authorRelayAssignments) return; + + if (!this._receivedAuthorsByRelay) { + this._receivedAuthorsByRelay = new Map(); + } + + const relays = this._receivedAuthorsByRelay.get(pubkey) ?? 
new Set(); + relays.add(relayUrl); + this._receivedAuthorsByRelay.set(pubkey, relays); + } + public closedReceived(relay: NDKRelay, reason: string): void { this.emit("closed", relay, reason); } @@ -998,6 +1162,10 @@ export class NDKSubscription extends EventEmitter<{ const performEose = (reason: string) => { if (this.eosed) return; if (this.eoseTimeout) clearTimeout(this.eoseTimeout); + + // Observe delivery outcomes before emitting EOSE (non-blocking, P10) + this.observeDelivery(); + this.emit("eose", this); this.eosed = true; diff --git a/core/src/utils/sample-beta.test.ts b/core/src/utils/sample-beta.test.ts new file mode 100644 index 000000000..d176d3617 --- /dev/null +++ b/core/src/utils/sample-beta.test.ts @@ -0,0 +1,89 @@ +import { describe, expect, it } from "vitest"; +import { sampleBeta } from "./sample-beta.js"; + +// Seeded PRNG for deterministic tests (mulberry32) +function mulberry32(seed: number): () => number { + return () => { + seed |= 0; + seed = (seed + 0x6d2b79f5) | 0; + let t = Math.imul(seed ^ (seed >>> 15), 1 | seed); + t = (t + Math.imul(t ^ (t >>> 7), 61 | t)) ^ t; + return ((t ^ (t >>> 14)) >>> 0) / 4294967296; + }; +} + +describe("sampleBeta", () => { + it("uniform prior (1,1) returns values in [0,1]", () => { + const rng = mulberry32(42); + for (let i = 0; i < 100; i++) { + const v = sampleBeta(1, 1, rng); + expect(v).toBeGreaterThanOrEqual(0); + expect(v).toBeLessThanOrEqual(1); + } + }); + + it("Beta(2,5) mean ≈ 0.286", () => { + const rng = mulberry32(123); + const N = 10000; + let sum = 0; + for (let i = 0; i < N; i++) { + sum += sampleBeta(2, 5, rng); + } + const mean = sum / N; + // Expected mean = alpha / (alpha + beta) = 2/7 ≈ 0.286 + expect(mean).toBeCloseTo(2 / 7, 1); + }); + + it("Beta(10,10) mean ≈ 0.5", () => { + const rng = mulberry32(456); + const N = 10000; + let sum = 0; + for (let i = 0; i < N; i++) { + sum += sampleBeta(10, 10, rng); + } + const mean = sum / N; + expect(mean).toBeCloseTo(0.5, 1); + }); + + 
it("Beta(0.5, 0.5) uses Jöhnk's algorithm, mean ≈ 0.5", () => { + const rng = mulberry32(789); + const N = 10000; + let sum = 0; + for (let i = 0; i < N; i++) { + sum += sampleBeta(0.5, 0.5, rng); + } + const mean = sum / N; + expect(mean).toBeCloseTo(0.5, 1); + }); + + it("returns 0.5 on invalid inputs", () => { + const rng = mulberry32(1); + expect(sampleBeta(0, 1, rng)).toBe(0.5); + expect(sampleBeta(1, 0, rng)).toBe(0.5); + expect(sampleBeta(-1, 1, rng)).toBe(0.5); + expect(sampleBeta(1, -1, rng)).toBe(0.5); + expect(sampleBeta(Infinity, 1, rng)).toBe(0.5); + expect(sampleBeta(1, Infinity, rng)).toBe(0.5); + expect(sampleBeta(NaN, 1, rng)).toBe(0.5); + expect(sampleBeta(1, NaN, rng)).toBe(0.5); + }); + + it("is deterministic with seeded RNG", () => { + const results1: number[] = []; + const results2: number[] = []; + for (let i = 0; i < 10; i++) { + results1.push(sampleBeta(3, 7, mulberry32(999))); + } + for (let i = 0; i < 10; i++) { + results2.push(sampleBeta(3, 7, mulberry32(999))); + } + // Each call with the same seed should produce the same first sample + expect(results1[0]).toBe(results2[0]); + }); + + it("defaults to Math.random when no rng provided", () => { + const v = sampleBeta(2, 2); + expect(v).toBeGreaterThanOrEqual(0); + expect(v).toBeLessThanOrEqual(1); + }); +}); diff --git a/core/src/utils/sample-beta.ts b/core/src/utils/sample-beta.ts new file mode 100644 index 000000000..40b19b512 --- /dev/null +++ b/core/src/utils/sample-beta.ts @@ -0,0 +1,84 @@ +import createDebug from "debug"; + +const d = createDebug("ndk:sample-beta"); + +/** Clamp rng() to avoid 0 (which breaks Math.log / Math.pow). */ +const EPS = Number.MIN_VALUE; +function rngPos(rng: () => number): number { + return Math.max(rng(), EPS); +} + +/** + * Sample from a Beta(alpha, beta) distribution. + * Returns a value in [0, 1]. + * + * Uses Jöhnk's algorithm for small alpha,beta and + * gamma sampling (Marsaglia & Tsang) for larger values. 
+ * + * **Production safety (P16):** Returns 0.5 on invalid inputs + * (non-finite, zero, negative) instead of throwing. + * + * @param alpha - Shape parameter (successes + 1). Must be > 0. + * @param beta - Shape parameter (failures + 1). Must be > 0. + * @param rng - Random number generator. Default: Math.random. + */ +export function sampleBeta(alpha: number, beta: number, rng: () => number = Math.random): number { + if (!Number.isFinite(alpha) || !Number.isFinite(beta) || alpha <= 0 || beta <= 0) { + d("Invalid parameters alpha=%d beta=%d, returning 0.5", alpha, beta); + return 0.5; + } + + // For alpha=1, beta=1 (uniform prior), just return rng() + if (alpha === 1 && beta === 1) return rng(); + + // Jöhnk's algorithm for alpha < 1 and beta < 1 + if (alpha < 1 && beta < 1) { + while (true) { + const u = rngPos(rng); + const v = rngPos(rng); + const x = Math.pow(u, 1 / alpha); + const y = Math.pow(v, 1 / beta); + if (x + y <= 1) { + if (x + y > 0) return x / (x + y); + // Handle underflow by taking logs + const logX = Math.log(u) / alpha; + const logY = Math.log(v) / beta; + const logM = logX > logY ? logX : logY; + return Math.exp(logX - logM) / (Math.exp(logX - logM) + Math.exp(logY - logM)); + } + } + } + + // For larger alpha/beta, use gamma sampling approach + const x = sampleGamma(alpha, rng); + const y = sampleGamma(beta, rng); + return x / (x + y); +} + +/** + * Sample from a Gamma(shape, 1) distribution using Marsaglia and Tsang's method. 
+ */
+function sampleGamma(shape: number, rng: () => number): number {
+    if (shape < 1) {
+        // Boost: Gamma(shape) = Gamma(shape+1) * U^(1/shape)
+        // Standard identity that lifts shape into the >= 1 range the main
+        // algorithm requires; rngPos keeps U strictly positive for the pow.
+        return sampleGamma(shape + 1, rng) * Math.pow(rngPos(rng), 1 / shape);
+    }
+
+    // Marsaglia–Tsang constants: d = shape - 1/3, c = 1 / sqrt(9d)
+    const dd = shape - 1 / 3;
+    const c = 1 / Math.sqrt(9 * dd);
+
+    // Rejection loop: propose v = (1 + c·x)^3 from a standard normal draw x,
+    // accept via the cheap squeeze test or the exact log-space test.
+    while (true) {
+        let x: number;
+        let v: number;
+        do {
+            // Box-Muller for normal sample
+            // (rngPos clamps rng() away from 0 so Math.log never sees 0)
+            x = Math.sqrt(-2 * Math.log(rngPos(rng))) * Math.cos(2 * Math.PI * rng());
+            v = 1 + c * x;
+        } while (v <= 0); // v must be positive before cubing
+
+        v = v * v * v;
+        const u = rng();
+        // Squeeze test: cheap polynomial bound, accepts most proposals without logs
+        if (u < 1 - 0.0331 * (x * x) * (x * x)) return dd * v;
+        // Exact acceptance test in log space
+        if (Math.log(u) < 0.5 * x * x + dd * (1 - v + Math.log(v))) return dd * v;
+    }
+}