Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ ui/extension/dist
ui/extension/packages
ui/extension/.env
ui/wp-plugin.zip
ui/public/widget.wasm

# netstated globe web client and build
netstate/d/webclients/globe/node_modules
Expand Down
4 changes: 4 additions & 0 deletions clientcore/broflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOpt
egOpt = NewDefaultEgressOptions()
}

// obtain the donor's team member ID if they are a donor seeking "credit" for their facilitated connections
// TODO this is a mock/placeholder, replace with real value
egOpt.TeamMemberID = fmt.Sprintf("MOCK-TEAM-ID-%v", time.Now().UTC().Format("2006-01-02T15:04:05"))

// The boot DAG:
// build cTable/pTable -> build the Broflake struct -> run ui.Init -> set up the bus and bind
// the upstream/downstream handlers -> build cRouter/pRouter -> start the bus, init the routers,
Expand Down
12 changes: 11 additions & 1 deletion clientcore/egress_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package clientcore

import (
"context"
"net/http"
"sync"
"time"

Expand Down Expand Up @@ -42,7 +43,16 @@ func NewEgressConsumerWebSocket(options *EgressOptions, wg *sync.WaitGroup) *Wor

// TODO: WSS

c, _, err := websocket.Dial(ctx, options.Addr+options.Endpoint, nil)
var opts websocket.DialOptions
if options.TeamMemberID != "" {
headers := http.Header{}
headers.Add("X-Unbounded", options.TeamMemberID)
opts = websocket.DialOptions{
HTTPHeader: headers,
}
}

c, _, err := websocket.Dial(ctx, options.Addr+options.Endpoint, &opts)
if err != nil {
common.Debugf("Couldn't connect to egress server at %v: %v", options.Addr, err)
<-time.After(options.ErrorBackoff)
Expand Down
1 change: 1 addition & 0 deletions clientcore/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type EgressOptions struct {
Endpoint string
ConnectTimeout time.Duration
ErrorBackoff time.Duration
TeamMemberID string // an optional team member ID recording donated connectivity
}

func NewDefaultEgressOptions() *EgressOptions {
Expand Down
29 changes: 29 additions & 0 deletions diagrams.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diagrams

Temporary file for updated diagrams from the [README](./README.md) done natively in mermaid to allow version-controlled updates.

```mermaid
flowchart LR
subgraph blocked-peer
client
end
subgraph unbounded.lantern.io
widget
leaderboard
end
subgraph matchmaking
freddie <--> widget
end
subgraph lantern-cloud
subgraph http-proxy
widget <==> |WebSocket| egress
end
egress-->redis[(redis)]
redis-.->api
api<-->db[(database)]
end
client <==> |proxy| widget
client <--> freddie
api --> leaderboard
internet((open internet)) <==> egress
```
38 changes: 30 additions & 8 deletions egress/egresslib.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type websocketPacketConn struct {
addr net.Addr
keepalive time.Duration
tcpAddr *net.TCPAddr
foo string
}

func (q websocketPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
Expand Down Expand Up @@ -105,10 +106,12 @@ func (q websocketPacketConn) LocalAddr() net.Addr {

type proxyListener struct {
net.Listener
connections chan net.Conn
tlsConfig *tls.Config
addr net.Addr
closeMetrics func(ctx context.Context) error
connections chan net.Conn
tlsConfig *tls.Config
addr net.Addr
closeMetrics func(ctx context.Context) error
ReportConnection func(ctx context.Context) error // TODO make it make sense
ReportBytes func(ctx context.Context) error // TODO make it real
}

func (l proxyListener) Accept() (net.Conn, error) {
Expand Down Expand Up @@ -191,6 +194,10 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
common.Debugf("Accepted a new WebSocket connection! (%v total)", atomic.AddUint64(&nClients, 1))
nClientsCounter.Add(context.Background(), 1)

// check for optional tracking identifier for donors who wish to be credited the facilitated connections
unboundedID := r.Header.Get("X-Unbounded")
common.Debugf("X-Unbounded: %v", unboundedID)

listener, err := quic.Listen(wspconn, l.tlsConfig, &common.QUICCfg)
if err != nil {
common.Debugf("Error creating QUIC listener: %v", err)
Expand All @@ -205,12 +212,16 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
break
}

nQUICConnectionsCounter.Add(context.Background(), 1)
nQUICConnectionsCounter.Add(conn.Context(), 1)
common.Debugf("%v accepted a new QUIC connection!", wspconn.addr)

go func() {
common.Debugf("[placeholder] POST new connection: %v", unboundedID)
}()

go func() {
for {
stream, err := conn.AcceptStream(context.Background())
stream, err := conn.AcceptStream(conn.Context())

if err != nil {
// We interpret an error while accepting a stream to indicate an unrecoverable error with
Expand All @@ -223,13 +234,24 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
}

common.Debugf("Accepted a new QUIC stream! (%v total)", atomic.AddUint64(&nQUICStreams, 1))
nQUICStreamsCounter.Add(context.Background(), 1)
// TODO does this even work?
err = l.ReportConnection(conn.Context())
if err != nil {
common.Debugf("Error updating leaderboard metrics: %v", err)
}

nQUICStreamsCounter.Add(conn.Context(), 1)

l.connections <- common.QUICStreamNetConn{
Stream: stream,
OnClose: func() {
defer common.Debugf("Closed a QUIC stream! (%v total)", atomic.AddUint64(&nQUICStreams, ^uint64(0)))
nQUICStreamsCounter.Add(context.Background(), -1)
// TODO (maybe) capture and report total transferred bytes
err := l.ReportBytes(conn.Context())
if err != nil {
common.Debugf("Error updating leaderboard metrics: %w", err)
}
},
AddrLocal: l.addr,
AddrRemote: tcpAddr,
Expand All @@ -240,9 +262,9 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
}

func NewListener(ctx context.Context, ll net.Listener, certPEM, keyPEM string) (net.Listener, error) {
var err error
closeFuncMetric := telemetry.EnableOTELMetrics(ctx)
m := otel.Meter("github.com/getlantern/broflake/egress")
var err error
nClientsCounter, err = m.Int64UpDownCounter("concurrent-websockets")
if err != nil {
closeFuncMetric(ctx)
Expand Down
60 changes: 60 additions & 0 deletions spinup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/usr/bin/env bash

SESSION_NAME="unbounded-sandbox"
# FREDDIE_DEFAULT=9000
# PORT_EGRESS_DEFAULT=8000

# if [ -z "$FREDDIE" ]; then
# echo "PORT_FREDDIE is not set. Defaulting to $FREDDIE_DEFAULT."
# exit 1
# fi
# FREDDIE=9000

# if [ -z "$PORT_EGRESS" ]; then
# echo "PORT_EGRESS is not set. Defaulting to $PORT_EGRESS_DEFAULT."
# exit 1
# fi

create_tmux_session() {
local session_name=$1
shift
local commands=("$@")

tmux new-session -d -s "$session_name"

# Split the window into four panes
tmux split-window -h
tmux split-window -v
tmux select-pane -t 0
tmux split-window -v

# split four panes
# TODO more?
# Send commands to each pane
for i in "${!commands[@]}"; do
tmux select-pane -t "$i"
tmux send-keys "${commands[$i]}" C-m
done

# Attach to the tmux session
tmux attach-session -t "$session_name"
}

commands=(
# build and start native binary desktop client
"cd cmd && ./build.sh desktop && cd dist/bin && FREDDIE=http://localhost:9000 EGRESS=http://localhost:8000 ./desktop"

# build and start native binary widget
"cd cmd && ./build.sh widget && cd dist/bin && FREDDIE=http://localhost:9000 EGRESS=http://localhost:8000 ./widget"

# build browser widget
# "cd cmd && ./build_web.sh"

# start freddie
"cd freddie/cmd && PORT=9000 go run main.go"

# start egress
"cd egress/cmd && PORT=8000 go run egress.go"
)

create_tmux_session "$SESSION_NAME" "${commands[@]}"
Binary file modified ui/public/widget.wasm
Binary file not shown.
18 changes: 18 additions & 0 deletions ui/src/utils/reporting.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
export async function report() {
const headers = {
"X-Lantern-TeamID": 'fake-team-iden-tify',
"X-Lantern-UserID": 'fake-user-iden-tify',
};

fetch('http://localhost:8888', {
method: 'POST',
headers: headers,
body: JSON.stringify({})
})
.then(response => {
console.log('report sent, response:', response);
})
.catch(error => {
console.error('report send failed:', error);
});
}
6 changes: 6 additions & 0 deletions ui/src/utils/wasmInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {StateEmitter} from '../hooks/useStateEmitter'
import MockWasmClient from '../mocks/mockWasmClient'
import {MessageTypes, SIGNATURE, Targets, WASM_CLIENT_CONFIG} from '../constants'
import {messageCheck} from './messages'
import {report} from './reporting'

type WebAssemblyInstance = InstanceType<typeof WebAssembly.Instance>

Expand Down Expand Up @@ -233,9 +234,14 @@ export class WasmInterface {
}
this.connections = this.idxMapToArr(this.connectionMap)
// emit state
const result = report()
console.log('report result:', result)

connectionsEmitter.update(this.connections)
if (existingState === -1 && state === 1) {
lifetimeConnectionsEmitter.update(lifetimeConnectionsEmitter.state + 1)
// const result = report()
// console.log('report result:', result)
}
}

Expand Down