diff --git a/src/server/nuclearnet/web_socket_proxy_nuclearnet_server.ts b/src/server/nuclearnet/web_socket_proxy_nuclearnet_server.ts index d29601f8..67386f3c 100644 --- a/src/server/nuclearnet/web_socket_proxy_nuclearnet_server.ts +++ b/src/server/nuclearnet/web_socket_proxy_nuclearnet_server.ts @@ -1,6 +1,7 @@ import { NUClearNetOptions } from 'nuclearnet.js' import { NUClearNetPeer } from 'nuclearnet.js' import { NUClearNetPacket } from 'nuclearnet.js' +import * as stream from 'stream' import { NUClearNetClient } from '../../shared/nuclearnet/nuclearnet_client' import { Clock } from '../../shared/time/clock' @@ -40,7 +41,7 @@ class WebSocketServerClient { private offJoin: () => void private offLeave: () => void private offListenMap: Map void> - private processors: Map + private processors: Map public constructor(private nuclearnetClient: NUClearNetClient, private socket: WebSocket) { this.connected = false @@ -61,7 +62,8 @@ class WebSocketServerClient { private onJoin = (peer: NUClearNetPeer) => { this.socket.send('nuclear_join', peer) - this.processors.set(peer, PacketProcessor.of(this.socket)) + // this.processors.set(peer, PacketProcessor.of(this.socket)) + this.processors.set(peer, PacketStream.of(this.socket)) } private onLeave = (peer: NUClearNetPeer) => { @@ -108,7 +110,8 @@ class WebSocketServerClient { if (peerKey) { const processor = this.processors.get(peerKey) if (processor) { - processor.onPacket(event, packet) + const streamPacket: StreamPacket = { event, packet } + processor.write(streamPacket) } } } @@ -123,6 +126,32 @@ class WebSocketServerClient { } } +type StreamPacket = { + event: string, + packet: NUClearNetPacket +} + +class PacketStream extends stream.Writable { + public constructor(private socket: WebSocket) { + super({ + objectMode: true, + }) + } + + public static of(socket: WebSocket) { + return new PacketStream(socket) + } + + public _write(streamPacket: StreamPacket) { + const { event, packet } = streamPacket + if (packet.reliable) { + this.socket.send(event, packet) + } else { + this.socket.sendVolatile(event, packet) + } + } +} + class PacketProcessor { private eventQueueSize: Map @@ -149,9 +178,10 @@ class PacketProcessor { this.sendReliablePacket(event, packet) } else if (this.isEventBelowLimit(event)) { this.sendUnreliablePacket(event, packet) - }/* else { - // This event is unreliable and already at the limit, simply drop the packet. - }*/ + } + /* else { + // This event is unreliable and already at the limit, simply drop the packet. + }*/ } private isEventBelowLimit(event: string) { diff --git a/src/server/nuclearnet/web_socket_server.ts b/src/server/nuclearnet/web_socket_server.ts index 35082c7c..ad112645 100644 --- a/src/server/nuclearnet/web_socket_server.ts +++ b/src/server/nuclearnet/web_socket_server.ts @@ -37,4 +37,8 @@ export class WebSocket { public send(event: string, ...args: any[]) { this.sioSocket.emit(event, ...args) } + + public sendVolatile(event: string, ...args: any[]) { + this.sioSocket.volatile.emit(event, ...args) + } }