diff --git a/src/client/nuclearnet/tests/web_socket_proxy_nuclearnet_client.tests.ts b/src/client/nuclearnet/tests/web_socket_proxy_nuclearnet_client.tests.ts index a8615ffd..0f8553e3 100644 --- a/src/client/nuclearnet/tests/web_socket_proxy_nuclearnet_client.tests.ts +++ b/src/client/nuclearnet/tests/web_socket_proxy_nuclearnet_client.tests.ts @@ -1,4 +1,5 @@ import { createMockInstance } from '../../../shared/base/testing/create_mock_instance' +import { FakeClock } from '../../../shared/time/fake_clock' import { DirectWebSocketClient } from '../direct_web_socket_client' import { WebSocketProxyNUClearNetClient } from '../web_socket_proxy_nuclearnet_client' import Mocked = jest.Mocked @@ -9,7 +10,7 @@ describe('WebSocketProxyNUClearNetClient', () => { beforeEach(() => { mockWebSocket = createMockInstance(DirectWebSocketClient) - client = new WebSocketProxyNUClearNetClient(mockWebSocket) + client = new WebSocketProxyNUClearNetClient(mockWebSocket, FakeClock.of()) }) it('forwards connect calls to socket', () => { diff --git a/src/client/nuclearnet/web_socket_proxy_nuclearnet_client.ts b/src/client/nuclearnet/web_socket_proxy_nuclearnet_client.ts index ded36621..bb4029a0 100644 --- a/src/client/nuclearnet/web_socket_proxy_nuclearnet_client.ts +++ b/src/client/nuclearnet/web_socket_proxy_nuclearnet_client.ts @@ -5,6 +5,8 @@ import { NUClearNetPacket } from 'nuclearnet.js' import { NUClearPacketListener } from '../../shared/nuclearnet/nuclearnet_client' import { NUClearEventListener } from '../../shared/nuclearnet/nuclearnet_client' import { NUClearNetClient } from '../../shared/nuclearnet/nuclearnet_client' +import { Clock } from '../../shared/time/clock' +import { BrowserSystemClock } from '../time/browser_clock' import { WebWorkerWebSocketClient } from './webworker_web_socket_client' import { WebSocketClient } from './web_socket_client' @@ -24,7 +26,7 @@ export class WebSocketProxyNUClearNetClient implements NUClearNetClient { private leaveListeners: Set private packetListeners: Map> - constructor(private socket: WebSocketClient) { + constructor(private socket: WebSocketClient, private readonly clock: Clock) { this.nextRequestToken = 0 this.joinListeners = new Set() this.leaveListeners = new Set() @@ -38,6 +40,7 @@ export class WebSocketProxyNUClearNetClient implements NUClearNetClient { upgrade: false, transports: ['websocket'], } as any), + BrowserSystemClock, ) } @@ -87,9 +90,7 @@ export class WebSocketProxyNUClearNetClient implements NUClearNetClient { this.socket.send('listen', event, requestToken) const listener = (packet: NUClearNetPacket, ack?: () => void) => { cb(packet) - if (ack) { - ack() - } + ack?.() } this.socket.on(event, listener) diff --git a/src/server/nuclearnet/web_socket_proxy_nuclearnet_server.ts b/src/server/nuclearnet/web_socket_proxy_nuclearnet_server.ts index a920ac45..ec62e046 100644 --- a/src/server/nuclearnet/web_socket_proxy_nuclearnet_server.ts +++ b/src/server/nuclearnet/web_socket_proxy_nuclearnet_server.ts @@ -158,11 +158,24 @@ class WebSocketServerClient { } } -class PacketProcessor { - private outgoingPackets: number = 0 +type NetStats = { + // The total number of packets that are out on the wire. + active: number + // The total number of payload bytes sent to the client over the lifetime of the connection. + bytesSent: number + // The total number of payload bytes acknowledged from the client over the lifetime of the connection. + bytesAcked: number + // The timestamp of when the last packet was sent. + lastSent: number +} - // The maximum number of packets to send before receiving acknowledgements. - private readonly outgoingLimit: number +class PacketProcessor { + private readonly stats: NetStats = { + active: 0, + bytesSent: 0, + bytesAcked: 0, + lastSent: 0, + } // The number of seconds before giving up on an acknowledge private readonly timeout: number @@ -171,9 +184,8 @@ class PacketProcessor { private socket: WebSocket, private clock: Clock, private queue: LruPriorityQueue, - { outgoingLimit, timeout }: { outgoingLimit: number; timeout: number }, + { timeout }: { timeout: number }, ) { - this.outgoingLimit = outgoingLimit this.timeout = timeout this.queue = queue } @@ -183,7 +195,7 @@ class PacketProcessor { socket, NodeSystemClock, new LruPriorityQueue({ capacityPerKey: 2 }), - { outgoingLimit: 10, timeout: 5 }, + { timeout: 5 }, ) } @@ -200,22 +212,42 @@ class PacketProcessor { } private maybeSendNextPacket() { - if (this.outgoingPackets < this.outgoingLimit) { - const next = this.queue.pop() - if (next) { + const next = this.queue.pop() + if (next) { + if (this.canSendEvent()) { const { event, packet } = next let isDone = false + this.stats.active++ + this.stats.bytesSent += packet.payload.length const done = () => { if (!isDone) { - this.outgoingPackets-- + // Update our performance tracking information + this.stats.active -= 1 + this.stats.bytesAcked += packet.payload.length isDone = true this.maybeSendNextPacket() } } - this.outgoingPackets++ this.socket.volatileSend(event, packet, done) + this.stats.lastSent = this.clock.performanceNow() this.clock.setTimeout(done, this.timeout) } } } + + private canSendEvent(): boolean { + const bytesBehind = this.stats.bytesSent - this.stats.bytesAcked + const bytesLow = 1024 * 1024 + const bytesHigh = 1024 * 1024 * 10 + const behindRatio = lerp(bytesBehind, bytesLow, bytesHigh) + return behindRatio < Math.random() + } +} + +function lerp(x: number, min: number, max: number): number { + return clamp((x - min) / (max - min), 0, 1) +} + +function clamp(x: number, min: number, max: number) { + return Math.min(Math.max(x, min), max) }