From e96174b56123ca58cc9290ed0c2f912f74ab820d Mon Sep 17 00:00:00 2001 From: Myles Horton Date: Mon, 23 Feb 2026 12:43:27 -0700 Subject: [PATCH 1/7] Update clientcore/settings.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- clientcore/settings.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clientcore/settings.go b/clientcore/settings.go index 84d5427..005d47d 100644 --- a/clientcore/settings.go +++ b/clientcore/settings.go @@ -62,6 +62,8 @@ func NewDefaultEgressOptions() *EgressOptions { // ConnectionChangeFunc is a callback for consumer connection state changes. // state: 1 = connected, -1 = disconnected. +// When state == 1 (connected), addr is the IPv4 or IPv6 address of the new consumer. +// When state == -1 (disconnected), addr may be nil and should not be assumed to be non-nil. type ConnectionChangeFunc func(state int, workerIdx int, addr net.IP) type BroflakeOptions struct { From b35aefcc55140f3ed0a069aabf7c467dbb551b0a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Feb 2026 19:44:04 +0000 Subject: [PATCH 2/7] Initial plan From cab19142b16f81ada4e1934af96b8be7a4fd1355 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Feb 2026 19:46:58 +0000 Subject: [PATCH 3/7] Add OnConnectionChangeFunc field and callback to WASM UIImpl Co-authored-by: myleshorton <1143966+myleshorton@users.noreply.github.com> --- clientcore/ui_wasm_impl.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/clientcore/ui_wasm_impl.go b/clientcore/ui_wasm_impl.go index a70e683..a9fb7b8 100644 --- a/clientcore/ui_wasm_impl.go +++ b/clientcore/ui_wasm_impl.go @@ -13,8 +13,9 @@ import ( type UIImpl struct { UI - BroflakeEngine *BroflakeEngine - ID string + BroflakeEngine *BroflakeEngine + ID string + OnConnectionChangeFunc ConnectionChangeFunc } func (ui *UIImpl) Init(bf *BroflakeEngine) { @@ -93,6 +94,10 @@ func (ui UIImpl) OnDownstreamThroughput(bytesPerSec int) { // consumer (or a 0-length string indicating that address extraction failed); when state == -1, // addr == "" func (ui UIImpl) OnConsumerConnectionChange(state int, workerIdx int, addr net.IP) { + if ui.OnConnectionChangeFunc != nil { + ui.OnConnectionChangeFunc(state, workerIdx, addr) + } + addrString := "" if addr != nil { addrString = addr.String() From f18fe9e2d8dfde78bd90d31acec1cbf50254b29e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Feb 2026 19:49:09 +0000 Subject: [PATCH 4/7] Complete WASM UIImpl updates - all checks passed Co-authored-by: myleshorton <1143966+myleshorton@users.noreply.github.com> --- go.mod | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 1658180..6303afa 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,14 @@ replace github.com/enobufs/go-nats => github.com/noahlevenson/go-nats v0.0.0-202 replace github.com/quic-go/quic-go => github.com/getlantern/quic-go-unbounded-fork v0.51.3-unbounded require ( + github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 github.com/coder/websocket v1.8.12 github.com/elazarl/goproxy v1.7.2 github.com/enobufs/go-nats v0.0.1 github.com/getlantern/geo v0.0.0-20240108161311-50692a1b69a9 github.com/getlantern/telemetry v0.0.0-20250606052628-8960164ec1f5 github.com/google/uuid v1.6.0 + github.com/pion/transport/v3 v3.0.7 github.com/pion/webrtc/v4 v4.1.2 github.com/quic-go/quic-go v0.51.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 @@ -24,7 +26,6 @@ require ( require ( github.com/andybalholm/brotli v1.0.4 // indirect - github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect @@ -68,7 +69,6 @@ require ( github.com/pion/stun/v3 v3.0.0 // indirect github.com/pion/transport v0.14.1 // indirect github.com/pion/transport/v2 v2.2.10 // indirect - github.com/pion/transport/v3 v3.0.7 // indirect github.com/pion/turn v1.3.7 // indirect github.com/pion/turn/v4 v4.0.2 // indirect github.com/pkg/errors v0.8.1 // indirect From 0ca43d6a5a06a64c932a4a00f0f53879bcfe0744 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Mon, 23 Feb 2026 13:09:16 -0700 Subject: [PATCH 5/7] Add per-peer bandwidth attribution to egress server Previously, the egress server tracked ingress bytes as a single global counter. This change attributes bytes to individual peers so the data can be consumed by a future reward oracle. Changes: - Add PeerID field to EgressOptions (UUID by default) - Include peer ID in WebSocket subprotocol header alongside consumer session ID (backwards compatible: old clients fall back to CSID) - Replace global nIngressBytes atomic with per-peer sync.Map of atomic counters, shared across all connections from the same peer - Add "ingress-bytes-by-peer" OTel metric with peer_id attribute - Preserve existing "ingress-bytes" total metric for backward compat Co-Authored-By: Claude Opus 4.6 --- clientcore/jit_egress_consumer.go | 2 +- clientcore/settings.go | 2 ++ common/resource.go | 17 ++++++---- egress/egresslib.go | 56 ++++++++++++++++++++++++------- egress/websocket.go | 14 ++++---- 5 files changed, 66 insertions(+), 25 deletions(-) diff --git a/clientcore/jit_egress_consumer.go b/clientcore/jit_egress_consumer.go index bae9064..1d1e33c 100644 --- a/clientcore/jit_egress_consumer.go +++ b/clientcore/jit_egress_consumer.go @@ -69,7 +69,7 @@ func NewJITEgressConsumer(options *EgressOptions, wg *sync.WaitGroup) *WorkerFSM defer cancel() dialOpts := &websocket.DialOptions{ - Subprotocols: common.NewSubprotocolsRequest(consumerInfoMsg.SessionID, common.Version), + Subprotocols: common.NewSubprotocolsRequest(consumerInfoMsg.SessionID, options.PeerID, common.Version), } // TODO: WSS diff --git a/clientcore/settings.go b/clientcore/settings.go index 005d47d..b3da67b 100644 --- a/clientcore/settings.go +++ b/clientcore/settings.go @@ -49,6 +49,7 @@ type EgressOptions struct { Endpoint string ConnectTimeout time.Duration ErrorBackoff time.Duration + PeerID string } func NewDefaultEgressOptions() *EgressOptions { @@ -57,6 +58,7 @@ func NewDefaultEgressOptions() *EgressOptions { Endpoint: "/ws", ConnectTimeout: 5 * time.Second, ErrorBackoff: 5 * time.Second, + PeerID: uuid.NewString(), } } diff --git a/common/resource.go b/common/resource.go index 0ffb046..61c99c9 100644 --- a/common/resource.go +++ b/common/resource.go @@ -188,15 +188,20 @@ func DecodeSignalMsg(raw []byte) (string, interface{}, error) { // coder/websocket API, to pass arbitrary data. Note that a server receiving a populated // Sec-Websocket-Protocols header must reply with a reciprocal header containing some selected // protocol from the request. -func NewSubprotocolsRequest(csid, version string) []string { - return []string{subprotocolsMagicCookie, csid, version} +func NewSubprotocolsRequest(csid, peerID, version string) []string { + return []string{subprotocolsMagicCookie, csid, peerID, version} } -func ParseSubprotocolsRequest(s []string) (csid string, version string, ok bool) { - if len(s) != 3 { - return "", "", false +func ParseSubprotocolsRequest(s []string) (csid string, peerID string, version string, ok bool) { + switch len(s) { + case 4: + return s[1], s[2], s[3], true + case 3: + // Backwards compat: old clients don't send peerID + return s[1], "", s[2], true + default: + return "", "", "", false } - return s[1], s[2], true } func NewSubprotocolsResponse() []string { diff --git a/egress/egresslib.go b/egress/egresslib.go index e1f3612..3e6bb0b 100644 --- a/egress/egresslib.go +++ b/egress/egresslib.go @@ -7,6 +7,7 @@ import ( "net" "net/http" "strings" + "sync" "sync/atomic" "time" @@ -14,6 +15,7 @@ import ( "github.com/google/uuid" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "github.com/getlantern/broflake/common" @@ -34,8 +36,9 @@ var nClients uint64 // nQUICStreams is the number of open QUIC streams (not to be confused with QUIC connections) var nQUICStreams uint64 -// nIngressBytes is the number of bytes received over all WebSocket connections since the last otel measurement callback -var nIngressBytes uint64 +// peerIngressBytes tracks ingress bytes per peer ID since the last otel measurement callback. +// Keys are peer ID strings, values are *uint64 (atomic counters). +var peerIngressBytes sync.Map // Otel instruments var nClientsCounter metric.Int64UpDownCounter @@ -44,6 +47,7 @@ var nClientsCounter metric.Int64UpDownCounter var nQUICConnectionsCounter metric.Int64UpDownCounter var nQUICStreamsCounter metric.Int64UpDownCounter var nIngressBytesCounter metric.Int64ObservableUpDownCounter +var nIngressBytesByPeerCounter metric.Int64ObservableUpDownCounter type proxyListener struct { net.Listener @@ -85,7 +89,7 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) { } } - consumerSessionID, version, ok := common.ParseSubprotocolsRequest(subprotocols) + consumerSessionID, peerID, version, ok := common.ParseSubprotocolsRequest(subprotocols) if !ok { common.Debugf("Refused WebSocket connection, missing subprotocols") return @@ -111,6 +115,11 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) { return } + // Old clients don't send a peer ID; fall back to consumer session ID so bytes are still tracked + if peerID == "" { + peerID = consumerSessionID + } + c, err := websocket.Accept( w, r, @@ -131,12 +140,18 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) { return } + // Get or create the per-peer ingress byte counter + counterPtr := new(uint64) + actual, _ := peerIngressBytes.LoadOrStore(peerID, counterPtr) + wspconn := errorlessWebSocketPacketConn{ - w: c, - addr: common.DebugAddr(fmt.Sprintf("WebSocket connection %v", uuid.NewString())), - keepalive: websocketKeepalive, - tcpAddr: tcpAddr, - readError: make(chan error), + w: c, + addr: common.DebugAddr(fmt.Sprintf("WebSocket connection %v", uuid.NewString())), + keepalive: websocketKeepalive, + tcpAddr: tcpAddr, + readError: make(chan error), + peerID: peerID, + ingressBytes: actual.(*uint64), } defer wspconn.Close() @@ -246,15 +261,32 @@ func NewListener(ctx context.Context, ll net.Listener, tlsConfig *tls.Config) (n return nil, err } + nIngressBytesByPeerCounter, err = m.Int64ObservableUpDownCounter("ingress-bytes-by-peer") + if err != nil { + closeFuncMetric(ctx) + return nil, err + } + _, err = m.RegisterCallback( func(ctx context.Context, o metric.Observer) error { - b := atomic.LoadUint64(&nIngressBytes) - o.ObserveInt64(nIngressBytesCounter, int64(b)) - common.Debugf("Ingress bytes: %v", b) - atomic.StoreUint64(&nIngressBytes, uint64(0)) + var total int64 + peerIngressBytes.Range(func(key, value any) bool { + pid := key.(string) + ptr := value.(*uint64) + b := int64(atomic.SwapUint64(ptr, 0)) + if b > 0 { + o.ObserveInt64(nIngressBytesByPeerCounter, b, + metric.WithAttributes(attribute.String("peer_id", pid))) + } + total += b + return true + }) + o.ObserveInt64(nIngressBytesCounter, total) + common.Debugf("Ingress bytes: %v", total) return nil }, nIngressBytesCounter, + nIngressBytesByPeerCounter, ) if err != nil { closeFuncMetric(ctx) diff --git a/egress/websocket.go b/egress/websocket.go index 3b64de1..1ff639d 100644 --- a/egress/websocket.go +++ b/egress/websocket.go @@ -23,11 +23,13 @@ import ( // at some point in the future. Intercepted *read* errors are sent over the readError channel. // Currently, intercepted *write* errors are simply discarded. type errorlessWebSocketPacketConn struct { - w *websocket.Conn - addr net.Addr - keepalive time.Duration - tcpAddr *net.TCPAddr - readError chan error + w *websocket.Conn + addr net.Addr + keepalive time.Duration + tcpAddr *net.TCPAddr + readError chan error + peerID string + ingressBytes *uint64 // per-peer atomic counter, shared across all connections from the same peer } func (q errorlessWebSocketPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { @@ -84,7 +86,7 @@ func (q errorlessWebSocketPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, } copy(p, b) - atomic.AddUint64(&nIngressBytes, uint64(len(b))) + atomic.AddUint64(q.ingressBytes, uint64(len(b))) return len(b), q.tcpAddr, err } From b16c014e39270a7c49adac56f188c96a5931b00b Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Mon, 23 Feb 2026 13:56:01 -0700 Subject: [PATCH 6/7] Add persistent Ed25519 peer identity keys Add a PeerIdentity type wrapping an Ed25519 keypair for persistent peer identification in the LANTERN token compensation system. The public key serves as the PeerID for bandwidth attribution, and the keypair doubles as a Solana wallet (Ed25519 is Solana's native curve). Desktop clients load/generate identity from ~/.unbounded/identity.key. WASM clients expose generateIdentity() to JS and accept an optional private key arg, with localStorage persistence on the JS side. External callers continue getting UUID PeerIDs with no changes. Co-Authored-By: Claude Opus 4.6 --- clientcore/identity.go | 53 ++++++++++++++++ clientcore/identity_test.go | 110 ++++++++++++++++++++++++++++++++++ clientcore/settings.go | 8 +++ cmd/client_default_impl.go | 50 ++++++++++++++++ cmd/client_wasm_impl.go | 32 ++++++++++ ui/src/utils/wasmInterface.ts | 27 ++++++++- 6 files changed, 278 insertions(+), 2 deletions(-) create mode 100644 clientcore/identity.go create mode 100644 clientcore/identity_test.go diff --git a/clientcore/identity.go b/clientcore/identity.go new file mode 100644 index 0000000..7870ccc --- /dev/null +++ b/clientcore/identity.go @@ -0,0 +1,53 @@ +package clientcore + +import ( + "crypto/ed25519" + "crypto/rand" + "encoding/hex" + "fmt" +) + +// PeerIdentity wraps an Ed25519 keypair that serves as a peer's persistent +// cryptographic identity. The public key is used as the PeerID for bandwidth +// attribution, and the keypair doubles as a Solana wallet (Ed25519 is Solana's +// native curve). +type PeerIdentity struct { + privateKey ed25519.PrivateKey + publicKey ed25519.PublicKey +} + +// NewPeerIdentity generates a new random Ed25519 keypair. +func NewPeerIdentity() (*PeerIdentity, error) { + pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + return nil, fmt.Errorf("generating ed25519 keypair: %w", err) + } + return &PeerIdentity{privateKey: priv, publicKey: pub}, nil +} + +// PeerIdentityFromPrivateKeyHex reconstructs a PeerIdentity from a hex-encoded +// 64-byte Ed25519 private key (128 hex characters). +func PeerIdentityFromPrivateKeyHex(hexKey string) (*PeerIdentity, error) { + keyBytes, err := hex.DecodeString(hexKey) + if err != nil { + return nil, fmt.Errorf("decoding hex private key: %w", err) + } + if len(keyBytes) != ed25519.PrivateKeySize { + return nil, fmt.Errorf("invalid private key length: got %d bytes, want %d", len(keyBytes), ed25519.PrivateKeySize) + } + priv := ed25519.PrivateKey(keyBytes) + pub := priv.Public().(ed25519.PublicKey) + return &PeerIdentity{privateKey: priv, publicKey: pub}, nil +} + +// PeerID returns the hex-encoded 32-byte public key (64 hex characters), +// suitable for use as EgressOptions.PeerID. +func (id *PeerIdentity) PeerID() string { + return hex.EncodeToString(id.publicKey) +} + +// PrivateKeyHex returns the hex-encoded 64-byte private key (128 hex characters), +// the value to persist to disk or localStorage. +func (id *PeerIdentity) PrivateKeyHex() string { + return hex.EncodeToString(id.privateKey) +} diff --git a/clientcore/identity_test.go b/clientcore/identity_test.go new file mode 100644 index 0000000..30c32ea --- /dev/null +++ b/clientcore/identity_test.go @@ -0,0 +1,110 @@ +package clientcore + +import ( + "crypto/ed25519" + "encoding/hex" + "strings" + "testing" +) + +func TestNewPeerIdentity(t *testing.T) { + id, err := NewPeerIdentity() + if err != nil { + t.Fatalf("NewPeerIdentity() error: %v", err) + } + + // PeerID should be 64 hex chars (32 bytes) + peerID := id.PeerID() + if len(peerID) != 64 { + t.Errorf("PeerID length = %d, want 64", len(peerID)) + } + if _, err := hex.DecodeString(peerID); err != nil { + t.Errorf("PeerID is not valid hex: %v", err) + } + + // PrivateKeyHex should be 128 hex chars (64 bytes) + privHex := id.PrivateKeyHex() + if len(privHex) != 128 { + t.Errorf("PrivateKeyHex length = %d, want 128", len(privHex)) + } + if _, err := hex.DecodeString(privHex); err != nil { + t.Errorf("PrivateKeyHex is not valid hex: %v", err) + } +} + +func TestNewPeerIdentityUniqueness(t *testing.T) { + id1, err := NewPeerIdentity() + if err != nil { + t.Fatalf("NewPeerIdentity() error: %v", err) + } + id2, err := NewPeerIdentity() + if err != nil { + t.Fatalf("NewPeerIdentity() error: %v", err) + } + if id1.PeerID() == id2.PeerID() { + t.Error("two generated identities have the same PeerID") + } +} + +func TestPeerIdentityFromPrivateKeyHex_RoundTrip(t *testing.T) { + original, err := NewPeerIdentity() + if err != nil { + t.Fatalf("NewPeerIdentity() error: %v", err) + } + + restored, err := PeerIdentityFromPrivateKeyHex(original.PrivateKeyHex()) + if err != nil { + t.Fatalf("PeerIdentityFromPrivateKeyHex() error: %v", err) + } + + if original.PeerID() != restored.PeerID() { + t.Errorf("PeerID mismatch: original=%s, restored=%s", original.PeerID(), restored.PeerID()) + } + if original.PrivateKeyHex() != restored.PrivateKeyHex() { + t.Errorf("PrivateKeyHex mismatch after round-trip") + } +} + +func TestPeerIdentityFromPrivateKeyHex_SignVerify(t *testing.T) { + id, err := NewPeerIdentity() + if err != nil { + t.Fatalf("NewPeerIdentity() error: %v", err) + } + + restored, err := PeerIdentityFromPrivateKeyHex(id.PrivateKeyHex()) + if err != nil { + t.Fatalf("PeerIdentityFromPrivateKeyHex() error: %v", err) + } + + msg := []byte("test message for signing") + sig := ed25519.Sign(restored.privateKey, msg) + + pubBytes, _ := hex.DecodeString(id.PeerID()) + pub := ed25519.PublicKey(pubBytes) + if !ed25519.Verify(pub, msg, sig) { + t.Error("signature verification failed after round-trip") + } +} + +func TestPeerIdentityFromPrivateKeyHex_InvalidHex(t *testing.T) { + _, err := PeerIdentityFromPrivateKeyHex("not-valid-hex!") + if err == nil { + t.Error("expected error for invalid hex, got nil") + } +} + +func TestPeerIdentityFromPrivateKeyHex_WrongLength(t *testing.T) { + // 32 bytes (64 hex chars) instead of 64 bytes + shortKey := strings.Repeat("ab", 32) + _, err := PeerIdentityFromPrivateKeyHex(shortKey) + if err == nil { + t.Error("expected error for wrong key length, got nil") + } +} + +func TestPeerIdentityFromPrivateKeyHex_Empty(t *testing.T) { + _, err := PeerIdentityFromPrivateKeyHex("") + if err == nil { + t.Error("expected error for empty string, got nil") + } +} diff --git a/clientcore/settings.go b/clientcore/settings.go index b3da67b..2c1c504 100644 --- a/clientcore/settings.go +++ b/clientcore/settings.go @@ -50,6 +50,14 @@ type EgressOptions struct { ConnectTimeout time.Duration ErrorBackoff time.Duration PeerID string + Identity *PeerIdentity +} + +// SetIdentity sets the peer identity and updates PeerID to the identity's +// hex-encoded public key. +func (o *EgressOptions) SetIdentity(id *PeerIdentity) { + o.Identity = id + o.PeerID = id.PeerID() } func NewDefaultEgressOptions() *EgressOptions { diff --git a/cmd/client_default_impl.go b/cmd/client_default_impl.go index c8c01a3..70d7d68 100644 --- a/cmd/client_default_impl.go +++ b/cmd/client_default_impl.go @@ -8,6 +8,8 @@ import ( "net/http" _ "net/http/pprof" "os" + "path/filepath" + "strings" "github.com/getlantern/broflake/clientcore" "github.com/getlantern/broflake/common" @@ -61,6 +63,14 @@ func main() { egOpt.Addr = egress } + // Load or generate persistent peer identity + if id, err := loadOrGenerateIdentity(); err != nil { + common.Debugf("Warning: failed to load/generate peer identity, using UUID: %v", err) + } else { + egOpt.SetIdentity(id) + common.Debugf("PeerID (ed25519 public key): %v", egOpt.PeerID) + } + bfconn, _, err := clientcore.NewBroflake(bfOpt, rtcOpt, egOpt) if err != nil { log.Fatal(err) @@ -78,3 +88,43 @@ func main() { select {} } + +func identityFilePath() string { + if p := os.Getenv("IDENTITY_FILE"); p != "" { + return p + } + home, err := os.UserHomeDir() + if err != nil { + return filepath.Join(".", ".unbounded", "identity.key") + } + return filepath.Join(home, ".unbounded", "identity.key") +} + +func loadOrGenerateIdentity() (*clientcore.PeerIdentity, error) { + path := identityFilePath() + + data, err := os.ReadFile(path) + if err == nil { + hexKey := strings.TrimSpace(string(data)) + return clientcore.PeerIdentityFromPrivateKeyHex(hexKey) + } + + if !os.IsNotExist(err) { + return nil, err + } + + id, err := clientcore.NewPeerIdentity() + if err != nil { + return nil, err + } + + if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil { + return nil, err + } + if err := os.WriteFile(path, []byte(id.PrivateKeyHex()+"\n"), 0600); err != nil { + return nil, err + } + + common.Debugf("Generated new peer identity, saved to %v", path) + return id, nil +} diff --git a/cmd/client_wasm_impl.go b/cmd/client_wasm_impl.go index c1824cb..9364444 100644 --- a/cmd/client_wasm_impl.go +++ b/cmd/client_wasm_impl.go @@ -13,6 +13,24 @@ import ( func main() { common.Debugf("wasm client started...") + // generateIdentity generates a new Ed25519 keypair and returns it as a JS + // object with publicKeyHex and privateKeyHex fields. JS should call this on + // first run and persist privateKeyHex in localStorage. + js.Global().Set( + "generateIdentity", + js.FuncOf(func(this js.Value, args []js.Value) interface{} { + id, err := clientcore.NewPeerIdentity() + if err != nil { + common.Debugf("generateIdentity error: %v", err) + return nil + } + result := js.Global().Get("Object").New() + result.Set("publicKeyHex", id.PeerID()) + result.Set("privateKeyHex", id.PrivateKeyHex()) + return result + }), + ) + // A constructor is exposed to JS. Some (but not all) defaults are forcibly overridden by passing // args. You *must* pass valid values for all of these args: // @@ -28,6 +46,7 @@ func main() { // WebRTCOptions.Tag // EgressOptions.Addr // EgressOptions.Endpoint + // (optional) privateKeyHex — hex-encoded Ed25519 private key for persistent identity // ) // // Returns a reference to a Broflake JS API impl (defined in ui_wasm_impl.go) @@ -52,6 +71,19 @@ func main() { egOpt.Addr = args[9].String() egOpt.Endpoint = args[10].String() + // Optional 12th arg: hex-encoded Ed25519 private key for persistent identity + if len(args) > 11 && args[11].Type() == js.TypeString { + privKeyHex := args[11].String() + if privKeyHex != "" { + if id, err := clientcore.PeerIdentityFromPrivateKeyHex(privKeyHex); err != nil { + common.Debugf("Invalid identity key from JS, using UUID: %v", err) + } else { + egOpt.SetIdentity(id) + common.Debugf("PeerID (ed25519 public key): %v", egOpt.PeerID) + } + } + } + _, ui, err := clientcore.NewBroflake(&bfOpt, rtcOpt, egOpt) if err != nil { common.Debugf("newBroflake error: %v", err) diff --git a/ui/src/utils/wasmInterface.ts b/ui/src/utils/wasmInterface.ts index 39f5c3f..5acd912 100644 --- a/ui/src/utils/wasmInterface.ts +++ b/ui/src/utils/wasmInterface.ts @@ -70,7 +70,7 @@ export interface WasmClient extends EventTarget { } -// bind the client constructor +// bind the client constructor and identity generator declare global { function newBroflake( type: string, @@ -84,7 +84,10 @@ declare global { tag: string, egressAddr: string, egressEndpoint: string, + privateKeyHex?: string, ): WasmClient; + + function generateIdentity(): { publicKeyHex: string; privateKeyHex: string } | null; } interface Config { @@ -149,6 +152,7 @@ export class WasmInterface { if (mock) { // fake it till you make it this.wasmClient = new MockWasmClient(this) } else { + const privateKeyHex = this.loadOrGenerateIdentityKey() this.wasmClient = globalThis.newBroflake( WASM_CLIENT_CONFIG.type, WASM_CLIENT_CONFIG.cTableSz, @@ -160,11 +164,30 @@ export class WasmInterface { WASM_CLIENT_CONFIG.stunBatchSize, WASM_CLIENT_CONFIG.tag, WASM_CLIENT_CONFIG.egressAddr, - WASM_CLIENT_CONFIG.egressEndpoint + WASM_CLIENT_CONFIG.egressEndpoint, + privateKeyHex ) } } + private loadOrGenerateIdentityKey = (): string | undefined => { + const storageKey = 'unbounded_identity_key' + try { + const existing = localStorage.getItem(storageKey) + if (existing) return existing + + const identity = globalThis.generateIdentity() + if (identity) { + localStorage.setItem(storageKey, identity.privateKeyHex) + console.log('Generated new peer identity, public key:', identity.publicKeyHex) + return identity.privateKeyHex + } + } catch (e) { + console.warn('Failed to load/generate peer identity:', e) + } + return undefined + } + start = () => { if (!this.ready) return console.warn('Wasm client is not in ready state, aborting start') if (!this.wasmClient) return console.warn('Wasm client has not been initialized, aborting start.') From 85d2b885aac041fdb0c1ed3b1a083fecd7d3cb30 Mon Sep 17 00:00:00 2001 From: Adam Fisk Date: Tue, 24 Feb 2026 11:24:27 -0700 Subject: [PATCH 7/7] Add onBytesReceived callback hook to egress WebSocket listener Replace the tightly-coupled receipt infrastructure with a single optional callback so that external modules (lantern-token) can observe per-peer bandwidth without broflake depending on any LANTERN-specific crypto or receipt logic. Co-Authored-By: Claude Opus 4.6 --- egress/cmd/http/egress_http.go | 2 +- egress/cmd/socks5/egress_socks5.go | 2 +- egress/egresslib.go | 25 ++++++++++++++----------- egress/websocket.go | 8 ++++++-- 4 files changed, 22 insertions(+), 15 deletions(-) diff --git a/egress/cmd/http/egress_http.go b/egress/cmd/http/egress_http.go index f4051a8..b98f333 100644 --- a/egress/cmd/http/egress_http.go +++ b/egress/cmd/http/egress_http.go @@ -38,7 +38,7 @@ func main() { // And here's why it doesn't use secure TLS at the QUIC layer tlsConfig := egcmdcommon.GenerateSelfSignedTLSConfig(true) - ll, err := egress.NewListener(ctx, l, tlsConfig) + ll, err := egress.NewListener(ctx, l, tlsConfig, nil) if err != nil { panic(err) } diff --git a/egress/cmd/socks5/egress_socks5.go b/egress/cmd/socks5/egress_socks5.go index cd092d1..f8e9d37 100644 --- a/egress/cmd/socks5/egress_socks5.go +++ b/egress/cmd/socks5/egress_socks5.go @@ -37,7 +37,7 @@ func main() { // And here's why it doesn't use secure TLS at the QUIC layer tlsConfig := egcmdcommon.GenerateSelfSignedTLSConfig(true) - ll, err := egress.NewListener(ctx, l, tlsConfig) + ll, err := egress.NewListener(ctx, l, tlsConfig, nil) if err != nil { panic(err) } diff --git a/egress/egresslib.go b/egress/egresslib.go index 3e6bb0b..c5d0a85 100644 --- a/egress/egresslib.go +++ b/egress/egresslib.go @@ -52,9 +52,10 @@ var nIngressBytesByPeerCounter metric.Int64ObservableUpDownCounter type proxyListener struct { net.Listener *connectionManager - connections chan net.Conn - addr net.Addr - closeMetrics func(ctx context.Context) error + connections chan net.Conn + addr net.Addr + closeMetrics func(ctx context.Context) error + onBytesReceived func(peerID string, n int) } func (l proxyListener) Accept() (net.Conn, error) { @@ -145,13 +146,14 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) { actual, _ := peerIngressBytes.LoadOrStore(peerID, counterPtr) wspconn := errorlessWebSocketPacketConn{ - w: c, - addr: common.DebugAddr(fmt.Sprintf("WebSocket connection %v", uuid.NewString())), - keepalive: websocketKeepalive, - tcpAddr: tcpAddr, - readError: make(chan error), - peerID: peerID, - ingressBytes: actual.(*uint64), + w: c, + addr: common.DebugAddr(fmt.Sprintf("WebSocket connection %v", uuid.NewString())), + keepalive: websocketKeepalive, + tcpAddr: tcpAddr, + readError: make(chan error), + peerID: peerID, + ingressBytes: actual.(*uint64), + onBytesReceived: l.onBytesReceived, } defer wspconn.Close() @@ -233,7 +235,7 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) { } } -func NewListener(ctx context.Context, ll net.Listener, tlsConfig *tls.Config) (net.Listener, error) { +func NewListener(ctx context.Context, ll net.Listener, tlsConfig *tls.Config, onBytesReceived func(peerID string, n int)) (net.Listener, error) { closeFuncMetric := telemetry.EnableOTELMetrics(ctx) m := otel.Meter("github.com/getlantern/broflake/egress") var err error @@ -307,6 +309,7 @@ func NewListener(ctx context.Context, ll net.Listener, tlsConfig *tls.Config) (n connections: make(chan net.Conn, 2048), addr: ll.Addr(), closeMetrics: closeFuncMetric, + onBytesReceived: onBytesReceived, } srv := &http.Server{ diff --git a/egress/websocket.go b/egress/websocket.go index 1ff639d..623d5d1 100644 --- a/egress/websocket.go +++ b/egress/websocket.go @@ -28,8 +28,9 @@ type errorlessWebSocketPacketConn struct { keepalive time.Duration tcpAddr *net.TCPAddr readError chan error - peerID string - ingressBytes *uint64 // per-peer atomic counter, shared across all connections from the same peer + peerID string + ingressBytes *uint64 // per-peer atomic counter for OTEL metrics + onBytesReceived func(peerID string, n int) } func (q errorlessWebSocketPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { @@ -87,6 +88,9 @@ func (q errorlessWebSocketPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, copy(p, b) atomic.AddUint64(q.ingressBytes, uint64(len(b))) + if q.onBytesReceived != nil { + q.onBytesReceived(q.peerID, len(b)) + } return len(b), q.tcpAddr, err }