diff --git a/.gitignore b/.gitignore index 23e2222f..95eeb9c3 100644 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,7 @@ ui/extension/dist ui/extension/packages ui/extension/.env ui/wp-plugin.zip +ui/public/widget.wasm # netstated globe web client and build netstate/d/webclients/globe/node_modules diff --git a/clientcore/broflake.go b/clientcore/broflake.go index deb97187..54299436 100644 --- a/clientcore/broflake.go +++ b/clientcore/broflake.go @@ -117,6 +117,10 @@ func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOpt egOpt = NewDefaultEgressOptions() } + // obtain the donor's team member ID if they are a donor seeking "credit" for their facilitated connections + // TODO this is a mock/placeholder, replace with real value + egOpt.TeamMemberID = fmt.Sprintf("MOCK-TEAM-ID-%v", time.Now().UTC().Format("2006-01-02T15:04:05")) + // The boot DAG: // build cTable/pTable -> build the Broflake struct -> run ui.Init -> set up the bus and bind // the upstream/downstream handlers -> build cRouter/pRouter -> start the bus, init the routers, diff --git a/clientcore/egress_consumer.go b/clientcore/egress_consumer.go index 1aa5be22..e4053ba3 100644 --- a/clientcore/egress_consumer.go +++ b/clientcore/egress_consumer.go @@ -6,6 +6,7 @@ package clientcore import ( "context" + "net/http" "sync" "time" @@ -42,7 +43,16 @@ func NewEgressConsumerWebSocket(options *EgressOptions, wg *sync.WaitGroup) *Wor // TODO: WSS - c, _, err := websocket.Dial(ctx, options.Addr+options.Endpoint, nil) + var opts websocket.DialOptions + if options.TeamMemberID != "" { + headers := http.Header{} + headers.Add("X-Unbounded", options.TeamMemberID) + opts = websocket.DialOptions{ + HTTPHeader: headers, + } + } + + c, _, err := websocket.Dial(ctx, options.Addr+options.Endpoint, &opts) if err != nil { common.Debugf("Couldn't connect to egress server at %v: %v", options.Addr, err) <-time.After(options.ErrorBackoff) diff --git a/clientcore/settings.go b/clientcore/settings.go index 7163939b..d4e71bb6 100644 --- a/clientcore/settings.go +++ b/clientcore/settings.go @@ -43,6 +43,7 @@ type EgressOptions struct { Endpoint string ConnectTimeout time.Duration ErrorBackoff time.Duration + TeamMemberID string // an optional team member ID recording donated connectivity } func NewDefaultEgressOptions() *EgressOptions { diff --git a/diagrams.md b/diagrams.md new file mode 100644 index 00000000..ce1c36b5 --- /dev/null +++ b/diagrams.md @@ -0,0 +1,29 @@ +# diagrams + +Temporary file for updated diagrams from the [README](./README.md) done natively in mermaid to allow version-controlled updates. + +```mermaid +flowchart LR +subgraph blocked-peer + client +end +subgraph unbounded.lantern.io + widget + leaderboard +end +subgraph matchmaking + freddie <--> widget +end +subgraph lantern-cloud + subgraph http-proxy + widget <==> |WebSocket| egress + end + egress-->redis[(redis)] + redis-.->api + api<-->db[(database)] +end +client <==> |proxy| widget +client <--> freddie +api --> leaderboard +internet((open internet)) <==> egress +``` diff --git a/egress/egresslib.go b/egress/egresslib.go index dc017eac..8acf9c46 100644 --- a/egress/egresslib.go +++ b/egress/egresslib.go @@ -57,6 +57,7 @@ type websocketPacketConn struct { addr net.Addr keepalive time.Duration tcpAddr *net.TCPAddr + foo string } func (q websocketPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { @@ -105,10 +106,12 @@ func (q websocketPacketConn) LocalAddr() net.Addr { type proxyListener struct { net.Listener - connections chan net.Conn - tlsConfig *tls.Config - addr net.Addr - closeMetrics func(ctx context.Context) error + connections chan net.Conn + tlsConfig *tls.Config + addr net.Addr + closeMetrics func(ctx context.Context) error + ReportConnection func(ctx context.Context) error // TODO make it make sense + ReportBytes func(ctx context.Context) error // TODO make it real } func (l proxyListener) Accept() (net.Conn, error) { @@ -191,6 +194,10 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) { common.Debugf("Accepted a new WebSocket connection! (%v total)", atomic.AddUint64(&nClients, 1)) nClientsCounter.Add(context.Background(), 1) + // check for optional tracking identifier for donors who wish to be credited the facilitated connections + unboundedID := r.Header.Get("X-Unbounded") + common.Debugf("X-Unbounded: %v", unboundedID) + listener, err := quic.Listen(wspconn, l.tlsConfig, &common.QUICCfg) if err != nil { common.Debugf("Error creating QUIC listener: %v", err) @@ -205,12 +212,16 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) { break } - nQUICConnectionsCounter.Add(context.Background(), 1) + nQUICConnectionsCounter.Add(conn.Context(), 1) common.Debugf("%v accepted a new QUIC connection!", wspconn.addr) + go func() { + common.Debugf("[placeholder] POST new connection: %v", unboundedID) + }() + go func() { for { - stream, err := conn.AcceptStream(context.Background()) + stream, err := conn.AcceptStream(conn.Context()) if err != nil { // We interpret an error while accepting a stream to indicate an unrecoverable error with @@ -223,13 +234,24 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) { } common.Debugf("Accepted a new QUIC stream! (%v total)", atomic.AddUint64(&nQUICStreams, 1)) - nQUICStreamsCounter.Add(context.Background(), 1) + // TODO does this even work? + err = l.ReportConnection(conn.Context()) + if err != nil { + common.Debugf("Error updating leaderboard metrics: %v", err) + } + + nQUICStreamsCounter.Add(conn.Context(), 1) l.connections <- common.QUICStreamNetConn{ Stream: stream, OnClose: func() { defer common.Debugf("Closed a QUIC stream! (%v total)", atomic.AddUint64(&nQUICStreams, ^uint64(0))) nQUICStreamsCounter.Add(context.Background(), -1) + // TODO (maybe) capture and report total transferred bytes + err := l.ReportBytes(conn.Context()) + if err != nil { + common.Debugf("Error updating leaderboard metrics: %w", err) + } }, AddrLocal: l.addr, AddrRemote: tcpAddr, @@ -240,9 +262,9 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) { } func NewListener(ctx context.Context, ll net.Listener, certPEM, keyPEM string) (net.Listener, error) { + var err error closeFuncMetric := telemetry.EnableOTELMetrics(ctx) m := otel.Meter("github.com/getlantern/broflake/egress") - var err error nClientsCounter, err = m.Int64UpDownCounter("concurrent-websockets") if err != nil { closeFuncMetric(ctx) diff --git a/spinup.sh b/spinup.sh new file mode 100755 index 00000000..1344b991 --- /dev/null +++ b/spinup.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash + +SESSION_NAME="unbounded-sandbox" +# FREDDIE_DEFAULT=9000 +# PORT_EGRESS_DEFAULT=8000 + +# if [ -z "$FREDDIE" ]; then +# echo "PORT_FREDDIE is not set. Defaulting to $FREDDIE_DEFAULT." +# exit 1 +# fi +# FREDDIE=9000 + +# if [ -z "$PORT_EGRESS" ]; then +# echo "PORT_EGRESS is not set. Defaulting to $PORT_EGRESS_DEFAULT." +# exit 1 +# fi + +create_tmux_session() { + local session_name=$1 + shift + local commands=("$@") + + tmux new-session -d -s "$session_name" + + # Split the window into four panes + tmux split-window -h + tmux split-window -v + tmux select-pane -t 0 + tmux split-window -v + + # split four panes + # TODO more? + # Send commands to each pane + for i in "${!commands[@]}"; do + tmux select-pane -t "$i" + tmux send-keys "${commands[$i]}" C-m + done + + # Attach to the tmux session + tmux attach-session -t "$session_name" +} + +commands=( + # build and start native binary desktop client + "cd cmd && ./build.sh desktop && cd dist/bin && FREDDIE=http://localhost:9000 EGRESS=http://localhost:8000 ./desktop" + + # build and start native binary widget + "cd cmd && ./build.sh widget && cd dist/bin && FREDDIE=http://localhost:9000 EGRESS=http://localhost:8000 ./widget" + + # build browser widget + # "cd cmd && ./build_web.sh" + + # start freddie + "cd freddie/cmd && PORT=9000 go run main.go" + + # start egress + "cd egress/cmd && PORT=8000 go run egress.go" +) + +create_tmux_session "$SESSION_NAME" "${commands[@]}" diff --git a/ui/public/widget.wasm b/ui/public/widget.wasm index 94f4c870..a70cff56 100755 Binary files a/ui/public/widget.wasm and b/ui/public/widget.wasm differ diff --git a/ui/src/utils/reporting.ts b/ui/src/utils/reporting.ts new file mode 100644 index 00000000..a6bce223 --- /dev/null +++ b/ui/src/utils/reporting.ts @@ -0,0 +1,18 @@ +export async function report() { + const headers = { + "X-Lantern-TeamID": 'fake-team-iden-tify', + "X-Lantern-UserID": 'fake-user-iden-tify', + }; + + fetch('http://localhost:8888', { + method: 'POST', + headers: headers, + body: JSON.stringify({}) + }) + .then(response => { + console.log('report sent, response:', response); + }) + .catch(error => { + console.error('report send failed:', error); + }); +} diff --git a/ui/src/utils/wasmInterface.ts b/ui/src/utils/wasmInterface.ts index 49c0794a..788daa4e 100644 --- a/ui/src/utils/wasmInterface.ts +++ b/ui/src/utils/wasmInterface.ts @@ -3,6 +3,7 @@ import {StateEmitter} from '../hooks/useStateEmitter' import MockWasmClient from '../mocks/mockWasmClient' import {MessageTypes, SIGNATURE, Targets, WASM_CLIENT_CONFIG} from '../constants' import {messageCheck} from './messages' +import {report} from './reporting' type WebAssemblyInstance = InstanceType @@ -233,9 +234,14 @@ export class WasmInterface { } this.connections = this.idxMapToArr(this.connectionMap) // emit state + const result = report() + console.log('report result:', result) + connectionsEmitter.update(this.connections) if (existingState === -1 && state === 1) { lifetimeConnectionsEmitter.update(lifetimeConnectionsEmitter.state + 1) + // const result = report() + // console.log('report result:', result) } }