Skip to content
This repository was archived by the owner on Jul 3, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { createMockInstance } from '../../../shared/base/testing/create_mock_instance'
import { FakeClock } from '../../../shared/time/fake_clock'
import { DirectWebSocketClient } from '../direct_web_socket_client'
import { WebSocketProxyNUClearNetClient } from '../web_socket_proxy_nuclearnet_client'
import Mocked = jest.Mocked
Expand All @@ -9,7 +10,7 @@ describe('WebSocketProxyNUClearNetClient', () => {

beforeEach(() => {
mockWebSocket = createMockInstance(DirectWebSocketClient)
client = new WebSocketProxyNUClearNetClient(mockWebSocket)
client = new WebSocketProxyNUClearNetClient(mockWebSocket, FakeClock.of())
})

it('forwards connect calls to socket', () => {
Expand Down
9 changes: 5 additions & 4 deletions src/client/nuclearnet/web_socket_proxy_nuclearnet_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { NUClearNetPacket } from 'nuclearnet.js'
import { NUClearPacketListener } from '../../shared/nuclearnet/nuclearnet_client'
import { NUClearEventListener } from '../../shared/nuclearnet/nuclearnet_client'
import { NUClearNetClient } from '../../shared/nuclearnet/nuclearnet_client'
import { Clock } from '../../shared/time/clock'
import { BrowserSystemClock } from '../time/browser_clock'

import { WebWorkerWebSocketClient } from './webworker_web_socket_client'
import { WebSocketClient } from './web_socket_client'
Expand All @@ -24,7 +26,7 @@ export class WebSocketProxyNUClearNetClient implements NUClearNetClient {
private leaveListeners: Set<NUClearEventListener>
private packetListeners: Map<string, Set<{ requestToken: string; listener: PacketListener }>>

constructor(private socket: WebSocketClient) {
constructor(private socket: WebSocketClient, private readonly clock: Clock) {
this.nextRequestToken = 0
this.joinListeners = new Set()
this.leaveListeners = new Set()
Expand All @@ -38,6 +40,7 @@ export class WebSocketProxyNUClearNetClient implements NUClearNetClient {
upgrade: false,
transports: ['websocket'],
} as any),
BrowserSystemClock,
)
}

Expand Down Expand Up @@ -87,9 +90,7 @@ export class WebSocketProxyNUClearNetClient implements NUClearNetClient {
this.socket.send('listen', event, requestToken)
const listener = (packet: NUClearNetPacket, ack?: () => void) => {
cb(packet)
if (ack) {
ack()
}
ack?.()
}
this.socket.on(event, listener)

Expand Down
56 changes: 44 additions & 12 deletions src/server/nuclearnet/web_socket_proxy_nuclearnet_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,24 @@ class WebSocketServerClient {
}
}

class PacketProcessor {
private outgoingPackets: number = 0
type NetStats = {
// The total number of packets that are out on the wire.
active: number
// The total number of payload bytes sent to the client over the lifetime of the connection.
bytesSent: number
// The total number of payload bytes acknowledged from the client over the lifetime of the connection.
bytesAcked: number
// The timestamp of when the last packet was sent.
lastSent: number
}

// The maximum number of packets to send before receiving acknowledgements.
private readonly outgoingLimit: number
class PacketProcessor {
private readonly stats: NetStats = {
active: 0,
bytesSent: 0,
bytesAcked: 0,
lastSent: 0,
}

// The number of seconds before giving up on an acknowledge
private readonly timeout: number
Expand All @@ -171,9 +184,8 @@ class PacketProcessor {
private socket: WebSocket,
private clock: Clock,
private queue: LruPriorityQueue<string, { event: string; packet: NUClearNetPacket }>,
{ outgoingLimit, timeout }: { outgoingLimit: number; timeout: number },
{ timeout }: { timeout: number },
) {
this.outgoingLimit = outgoingLimit
this.timeout = timeout
this.queue = queue
}
Expand All @@ -183,7 +195,7 @@ class PacketProcessor {
socket,
NodeSystemClock,
new LruPriorityQueue({ capacityPerKey: 2 }),
{ outgoingLimit: 10, timeout: 5 },
{ timeout: 5 },
)
}

Expand All @@ -200,22 +212,42 @@ class PacketProcessor {
}

private maybeSendNextPacket() {
if (this.outgoingPackets < this.outgoingLimit) {
const next = this.queue.pop()
if (next) {
const next = this.queue.pop()
if (next) {
if (this.canSendEvent()) {
const { event, packet } = next
let isDone = false
this.stats.active++
this.stats.bytesSent += packet.payload.length
const done = () => {
if (!isDone) {
this.outgoingPackets--
// Update our performance tracking information
this.stats.active -= 1
this.stats.bytesAcked += packet.payload.length
isDone = true
this.maybeSendNextPacket()
}
}
this.outgoingPackets++
this.socket.volatileSend(event, packet, done)
this.stats.lastSent = this.clock.performanceNow()
this.clock.setTimeout(done, this.timeout)
}
}
}

private canSendEvent(): boolean {
const bytesBehind = this.stats.bytesSent - this.stats.bytesAcked
const bytesLow = 1024 * 1024
const bytesHigh = 1024 * 1024 * 10
const behindRatio = lerp(bytesBehind, bytesLow, bytesHigh)
return behindRatio < Math.random()
}
}

function lerp(x: number, min: number, max: number): number {
return clamp((x - min) / (max - min), 0, 1)
}

function clamp(x: number, min: number, max: number) {
return Math.min(Math.max(x, min), max)
}