diff --git a/.changeset/app-worker-unit-tests.md b/.changeset/app-worker-unit-tests.md new file mode 100644 index 00000000..04e35275 --- /dev/null +++ b/.changeset/app-worker-unit-tests.md @@ -0,0 +1,5 @@ +--- +"nostream": patch +--- + +Add comprehensive unit tests for remaining app-level workers (AppWorker, App, StaticMirroringWorker) with 65+ test cases covering lifecycle, configuration, error handling, and dependency injection diff --git a/src/app/app.ts b/src/app/app.ts index 9ac4bd94..bdca812e 100644 --- a/src/app/app.ts +++ b/src/app/app.ts @@ -47,8 +47,8 @@ export class App implements IRunnable { ░ ░ ░ ░ ░ ░ ▒ ░ ░ ░ ░ ░░ ░ ░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░`) const width = 74 - const torHiddenServicePort = process.env.HIDDEN_SERVICE_PORT ? Number(process.env.HIDDEN_SERVICE_PORT) : 80 - const port = process.env.RELAY_PORT ? Number(process.env.RELAY_PORT) : 8008 + const torHiddenServicePort = this.process.env.HIDDEN_SERVICE_PORT ? Number(this.process.env.HIDDEN_SERVICE_PORT) : 80 + const port = this.process.env.RELAY_PORT ? Number(this.process.env.RELAY_PORT) : 8008 const logCentered = (input: string, width: number) => { const start = (width - input.length) >> 1 @@ -72,8 +72,8 @@ export class App implements IRunnable { this.process.exit(1) } - const workerCount = process.env.WORKER_COUNT - ? Number(process.env.WORKER_COUNT) + const workerCount = this.process.env.WORKER_COUNT + ? Number(this.process.env.WORKER_COUNT) : this.settings().workers?.count || cpus().length const createWorker = (env: Record) => { diff --git a/src/app/static-mirroring-worker.ts b/src/app/static-mirroring-worker.ts index 4ec79cdc..977349e5 100644 --- a/src/app/static-mirroring-worker.ts +++ b/src/app/static-mirroring-worker.ts @@ -55,7 +55,10 @@ export class StaticMirroringWorker implements IRunnable { logger.info('mirroring', currentSettings.mirroring) - this.config = path(['mirroring', 'static', process.env.MIRROR_INDEX], currentSettings) as Mirror + this.config = path(['mirroring', 'static', this.process.env.MIRROR_INDEX], currentSettings) as Mirror + if (!this.config) { + throw new Error(`Mirror configuration not found for index ${this.process.env.MIRROR_INDEX}`) + } let since = Math.floor(Date.now() / 1000) - 60 * 10 @@ -172,7 +175,7 @@ export class StaticMirroringWorker implements IRunnable { const eventLimits = this.settings().limits?.event ?? {} - const eventLimitOverrides = this.config.limits?.event ?? {} + const eventLimitOverrides = this.config?.limits?.event ?? {} const limits = mergeDeepRight(eventLimits, eventLimitOverrides) as EventLimits @@ -278,7 +281,7 @@ export class StaticMirroringWorker implements IRunnable { protected async isUserAdmitted(event: Event): Promise { const currentSettings = this.settings() - if (this.config.skipAdmissionCheck === true) { + if (this.config?.skipAdmissionCheck === true) { return true } diff --git a/test/unit/app/app.spec.ts b/test/unit/app/app.spec.ts new file mode 100644 index 00000000..99862775 --- /dev/null +++ b/test/unit/app/app.spec.ts @@ -0,0 +1,392 @@ +import EventEmitter from 'events' + +import chai from 'chai' +import Sinon from 'sinon' +import sinonChai from 'sinon-chai' + +import { App } from '../../../src/app/app' +import { Settings } from '../../../src/@types/settings' +import * as settingsUtils from '../../../src/utils/settings' +import * as torClient from '../../../src/tor/client' + +chai.use(sinonChai) + +const { expect } = chai + +describe('App', () => { + let sandbox: Sinon.SinonSandbox + let app: App + let fakeProcess: NodeJS.Process & { exit: Sinon.SinonStub } + let fakeCluster: any + let settingsStub: Sinon.SinonStub + let watchSettingsStub: Sinon.SinonStub + let addOnionStub: Sinon.SinonStub + let settingsState: Partial + + const defaultSettings = (): Partial => ({ + workers: { count: 2 }, + mirroring: { + static: [], + }, + info: { + relay_url: 'wss://relay.example.com', + name: 'test', + description: 'test relay', + pubkey: 'a'.repeat(64), + contact: 'test@example.com', + } as any, + }) + + let workerIdCounter = 0 + + const createFakeWorker = (): any => { + const id = ++workerIdCounter + return { + id, + process: { pid: id + 100000 }, + send: sandbox.stub(), + } + } + + beforeEach(() => { + sandbox = Sinon.createSandbox() + workerIdCounter = 0 + + fakeProcess = Object.assign(new EventEmitter(), { + exit: sandbox.stub(), + env: { RELAY_PORT: '8008' }, + }) as any + + const fakeWorker1 = createFakeWorker() + const fakeWorker2 = createFakeWorker() + + fakeCluster = Object.assign(new EventEmitter(), { + workers: { + [fakeWorker1.id]: fakeWorker1, + [fakeWorker2.id]: fakeWorker2, + }, + fork: sandbox.stub().callsFake((env: Record) => { + const newWorker = createFakeWorker() + fakeCluster.workers[newWorker.id] = newWorker + return newWorker + }), + }) + + settingsState = defaultSettings() + settingsStub = sandbox.stub().callsFake(() => settingsState) + + const fakeWatcher = { close: sandbox.stub() } as any + watchSettingsStub = sandbox.stub(settingsUtils.SettingsStatic, 'watchSettings').returns([fakeWatcher] as any) + + addOnionStub = sandbox.stub(torClient, 'addOnion').resolves('onion-address.onion') + }) + + afterEach(() => { + sandbox.restore() + }) + + describe('constructor', () => { + it('initializes the app with process and cluster', () => { + app = new App(fakeProcess, fakeCluster, settingsStub) + + expect(fakeCluster.listenerCount('message')).to.equal(1) + expect(fakeCluster.listenerCount('exit')).to.equal(1) + expect(fakeProcess.listenerCount('SIGTERM')).to.equal(1) + }) + + it('creates a WeakMap for tracking workers', () => { + app = new App(fakeProcess, fakeCluster, settingsStub) + + expect(app).to.be.an('object') + }) + }) + + describe('run', () => { + beforeEach(() => { + fakeCluster.fork.resetHistory() + fakeCluster.workers = {} + app = new App(fakeProcess, fakeCluster, settingsStub) + }) + + it('watches settings on startup', () => { + app.run() + + expect(watchSettingsStub).to.have.been.calledOnce + }) + + it('forks worker processes based on configured count', () => { + settingsState.workers = { count: 3 } + + app.run() + + // Should fork 3 client workers + 1 maintenance worker + expect(fakeCluster.fork.callCount).to.be.at.least(4) + }) + + it('uses CPU count as default worker count when not configured', () => { + settingsState.workers = undefined + + app.run() + + expect(fakeCluster.fork.callCount).to.be.greaterThan(0) + }) + + it('respects WORKER_COUNT environment variable', () => { + fakeCluster.fork.resetHistory() + fakeProcess.env.WORKER_COUNT = '2' + settingsState.workers = { count: 10 } + + const appInstance = new App(fakeProcess, fakeCluster, settingsStub) + appInstance.run() + + // WORKER_COUNT overrides settings, so should fork 2 + 1 maintenance + expect(fakeCluster.fork.callCount).to.equal(3) + }) + + it('forks one maintenance worker', () => { + settingsState.workers = { count: 2 } + + app.run() + + const maintenanceCall = Array.from((fakeCluster.fork as any).getCalls()).find( + (call: any) => call.args?.[0]?.WORKER_TYPE === 'maintenance', + ) + + expect(maintenanceCall).to.exist + }) + + it('forks static-mirroring workers when mirroring is configured', () => { + settingsState.workers = { count: 1 } + settingsState.mirroring = { + static: [ + { address: 'ws://mirror1.com', filters: [] } as any, + { address: 'ws://mirror2.com', filters: [] } as any, + ], + } + + app.run() + + const mirrorCalls = Array.from((fakeCluster.fork as any).getCalls()).filter( + (call: any) => call.args?.[0]?.WORKER_TYPE === 'static-mirroring', + ) + + expect(mirrorCalls).to.have.lengthOf(2) + }) + + it('assigns MIRROR_INDEX to mirroring workers', () => { + settingsState.workers = { count: 1 } + settingsState.mirroring = { + static: [{ address: 'ws://mirror.com', filters: [] } as any], + } + + app.run() + + const mirrorCall = Array.from((fakeCluster.fork as any).getCalls()).find( + (call: any) => call.args?.[0]?.WORKER_TYPE === 'static-mirroring', + ) + + expect((mirrorCall as any)?.args?.[0]?.MIRROR_INDEX).to.equal('0') + }) + + it('assigns WORKER_INDEX to client workers', () => { + settingsState.workers = { count: 2 } + + app.run() + + const workerCalls = Array.from((fakeCluster.fork as any).getCalls()).filter( + (call: any) => call.args?.[0]?.WORKER_TYPE === 'worker', + ) + + expect((workerCalls[0] as any)?.args?.[0]?.WORKER_INDEX).to.equal('0') + expect((workerCalls[1] as any)?.args?.[0]?.WORKER_INDEX).to.equal('1') + }) + + it('attempts to add Tor hidden service', () => { + fakeProcess.env.HIDDEN_SERVICE_PORT = '80' + fakeProcess.env.RELAY_PORT = '8008' + + app.run() + + expect(addOnionStub).to.have.been.called + }) + + it('handles Tor hidden service setup failure gracefully', async () => { + addOnionStub.rejects(new Error('Tor unavailable')) + + app.run() + + // Should not throw + expect(app).to.exist + }) + + it('exits when SECRET is missing but payments are enabled', () => { + settingsState.payments = { enabled: true } as any + fakeProcess.env.SECRET = '' + + app.run() + + expect(fakeProcess.exit).to.have.been.calledWith(1) + }) + + it('exits when SECRET is default and payments are enabled', () => { + settingsState.payments = { enabled: true } as any + fakeProcess.env.SECRET = 'changeme' + + app.run() + + expect(fakeProcess.exit).to.have.been.calledWith(1) + }) + + it('does not exit when SECRET is valid and payments are enabled', () => { + settingsState.payments = { enabled: true } as any + fakeProcess.env.SECRET = 'secure-secret-key' + + app.run() + + expect(fakeProcess.exit).not.to.have.been.called + }) + + it('does not require SECRET when payments are disabled', () => { + settingsState.payments = { enabled: false } as any + fakeProcess.env.SECRET = '' + + app.run() + + expect(fakeProcess.exit).not.to.have.been.called + }) + }) + + describe('onClusterMessage', () => { + let worker1: any + let worker2: any + + beforeEach(() => { + worker1 = createFakeWorker() + worker2 = createFakeWorker() + + fakeCluster.workers = { + [worker1.id]: worker1, + [worker2.id]: worker2, + } + + app = new App(fakeProcess, fakeCluster, settingsStub) + }) + + it('broadcasts message to all workers except sender', () => { + const message = { eventName: 'test', event: {} } + + fakeCluster.emit('message', worker1, message) + + expect(worker2.send).to.have.been.calledWith(message) + expect(worker1.send).not.to.have.been.called + }) + + it('handles messages from multiple sources', () => { + const message1 = { eventName: 'event1', event: {} } + const message2 = { eventName: 'event2', event: {} } + + fakeCluster.emit('message', worker1, message1) + fakeCluster.emit('message', worker2, message2) + + expect(worker2.send).to.have.been.calledWith(message1) + expect(worker1.send).to.have.been.calledWith(message2) + }) + }) + + describe('onClusterExit', () => { + let worker: any + let deadWorker: any + let clock: Sinon.SinonFakeTimers + + beforeEach(() => { + clock = sandbox.useFakeTimers() + + worker = createFakeWorker() + deadWorker = createFakeWorker() + + fakeCluster.workers = { + [worker.id]: worker, + [deadWorker.id]: deadWorker, + } + + app = new App(fakeProcess, fakeCluster, settingsStub) + }) + + it('does not restart worker on clean exit (code 0)', () => { + fakeCluster.emit('exit', deadWorker, 0, '') + + // No restart scheduled + expect(fakeCluster.fork).not.to.have.been.called + }) + + it('does not restart worker on SIGINT signal', () => { + fakeCluster.emit('exit', deadWorker, null, 'SIGINT') + + expect(fakeCluster.fork).not.to.have.been.called + }) + + it('schedules worker restart on unexpected exit', () => { + settingsState.workers = { count: 1 } + settingsState.mirroring = { static: [] } + + app.run() + + const registeredWorker = (fakeCluster.fork as Sinon.SinonStub).firstCall.returnValue + fakeCluster.fork.resetHistory() + + fakeCluster.emit('exit', registeredWorker, 1, '') + clock.tick(10001) + + expect(fakeCluster.fork).to.have.been.called + }) + }) + + describe('onExit', () => { + beforeEach(() => { + app = new App(fakeProcess, fakeCluster, settingsStub) + }) + + it('closes watchers and exits process with code 0', () => { + app.run() + fakeProcess.emit('SIGTERM') + + expect(fakeProcess.exit).to.have.been.calledOnceWithExactly(0) + }) + }) + + describe('close', () => { + beforeEach(() => { + app = new App(fakeProcess, fakeCluster, settingsStub) + }) + + it('closes all file watchers', () => { + const fakeWatcher1 = { close: sandbox.stub() } + const fakeWatcher2 = { close: sandbox.stub() } + watchSettingsStub.returns([fakeWatcher1, fakeWatcher2]) + + app.run() + app.close() + + expect(fakeWatcher1.close).to.have.been.called + expect(fakeWatcher2.close).to.have.been.called + }) + + it('invokes the callback', () => { + const callback = sandbox.stub() + + app.close(callback) + + expect(callback).to.have.been.calledOnce + }) + + it('does not throw when called without watchers', () => { + watchSettingsStub.returns([]) + + expect(() => app.close()).not.to.throw() + }) + + it('handles undefined watchers gracefully', () => { + expect(() => app.close()).not.to.throw() + }) + }) +}) diff --git a/test/unit/app/static-mirroring-worker.spec.ts b/test/unit/app/static-mirroring-worker.spec.ts new file mode 100644 index 00000000..8041b582 --- /dev/null +++ b/test/unit/app/static-mirroring-worker.spec.ts @@ -0,0 +1,462 @@ +import EventEmitter from 'events' + +import chai from 'chai' +import Sinon from 'sinon' +import sinonChai from 'sinon-chai' + +import { StaticMirroringWorker } from '../../../src/app/static-mirroring-worker' +import { Event } from '../../../src/@types/event' +import { Settings } from '../../../src/@types/settings' +import { IEventRepository, IUserRepository } from '../../../src/@types/repositories' +import { getPublicKey, getRelayPrivateKey } from '../../../src/utils/event' +import { WebSocketServerAdapterEvent } from '../../../src/constants/adapter' + +chai.use(sinonChai) + +const { expect } = chai + +describe('StaticMirroringWorker', () => { + let sandbox: Sinon.SinonSandbox + let worker: StaticMirroringWorker + let fakeProcess: EventEmitter & { exit: Sinon.SinonStub; env: Record } + let eventRepository: Sinon.SinonStubbedInstance + let userRepository: Sinon.SinonStubbedInstance + let settingsStub: Sinon.SinonStub + let settingsState: Partial + + const defaultSettings = (): Partial => ({ + mirroring: { + static: [ + { + address: 'ws://source-relay.com', + filters: [{ kinds: [1, 2] }], + limits: { event: { content: { maxLength: 10000 } } }, + } as any, + ], + }, + info: { + relay_url: 'wss://relay.example.com', + name: 'test', + description: 'test', + pubkey: 'a'.repeat(64), + contact: 'test@example.com', + } as any, + limits: { event: { content: { maxLength: 20000 } } }, + payments: { enabled: false } as any, + }) + + const createEvent = (overrides: Partial = {}): Event => ({ + id: 'a'.repeat(64), + pubkey: 'b'.repeat(64), + created_at: Math.floor(Date.now() / 1000), + kind: 1, + tags: [], + content: 'test event', + sig: 'c'.repeat(128), + ...overrides, + }) + + let savedSecret: string | undefined + + beforeEach(() => { + sandbox = Sinon.createSandbox() + savedSecret = process.env.SECRET + process.env.SECRET = 'test-secret-for-unit-tests' + + fakeProcess = Object.assign(new EventEmitter(), { + exit: sandbox.stub(), + send: sandbox.stub(), + env: { MIRROR_INDEX: '0' }, + }) as EventEmitter & { exit: Sinon.SinonStub; env: Record; send: Sinon.SinonStub } + + eventRepository = { + create: sandbox.stub().resolves(true), + } as any + + userRepository = { + findByPubkey: sandbox.stub().resolves(null), + } as any + + settingsState = defaultSettings() + settingsStub = sandbox.stub().callsFake(() => settingsState) + + worker = new StaticMirroringWorker( + eventRepository, + userRepository, + fakeProcess as any, + settingsStub, + ) + }) + + afterEach(() => { + if (savedSecret === undefined) delete process.env.SECRET + else process.env.SECRET = savedSecret + sandbox.restore() + }) + + describe('constructor', () => { + it('registers SIGINT, SIGHUP, and SIGTERM handlers', () => { + expect(fakeProcess.listenerCount('SIGINT')).to.equal(1) + expect(fakeProcess.listenerCount('SIGHUP')).to.equal(1) + expect(fakeProcess.listenerCount('SIGTERM')).to.equal(1) + }) + + it('registers uncaughtException and unhandledRejection handlers', () => { + expect(fakeProcess.listenerCount('uncaughtException')).to.equal(1) + expect(fakeProcess.listenerCount('unhandledRejection')).to.equal(1) + }) + + it('registers message handler', () => { + expect(fakeProcess.listenerCount('message')).to.equal(1) + }) + }) + + describe('run', () => { + it('initializes the worker with mirror config from settings', () => { + // We can't fully test WebSocket creation, but we verify settings are accessed + worker.run() + + expect(settingsStub).to.have.been.called + }) + + it('uses MIRROR_INDEX from environment', () => { + fakeProcess.env.MIRROR_INDEX = '0' + + worker.run() + + expect(settingsStub).to.have.been.called + }) + }) + + describe('canAcceptEvent', () => { + it('rejects events from the relay itself', () => { + const relayPrivkey = getRelayPrivateKey(settingsState.info!.relay_url) + const relayPubkey = getPublicKey(relayPrivkey) + + const event = createEvent({ pubkey: relayPubkey }) + const result = (worker as any).canAcceptEvent(event) + + expect(result).to.equal(false) + }) + + it('accepts valid events within limits', () => { + const event = createEvent({ pubkey: 'd'.repeat(64) }) + const result = (worker as any).canAcceptEvent(event) + + expect(result).to.equal(true) + }) + + it('rejects events with content exceeding limits', () => { + settingsState.limits = { + event: { content: { maxLength: 10 } }, + } + + const event = createEvent({ content: 'this is a very long content' }) + const result = (worker as any).canAcceptEvent(event) + + expect(result).to.equal(false) + }) + + it('rejects events with created_at too far in the future', () => { + const now = Math.floor(Date.now() / 1000) + settingsState.limits = { + event: { createdAt: { maxPositiveDelta: 60 } }, + } + + const event = createEvent({ created_at: now + 3600 }) + const result = (worker as any).canAcceptEvent(event) + + expect(result).to.equal(false) + }) + + it('rejects events with created_at too far in the past', () => { + const now = Math.floor(Date.now() / 1000) + settingsState.limits = { + event: { createdAt: { maxNegativeDelta: 60 } }, + } + + const event = createEvent({ created_at: now - 3600 }) + const result = (worker as any).canAcceptEvent(event) + + expect(result).to.equal(false) + }) + + it('accepts events within pubkey whitelist', () => { + const pubkey = 'e'.repeat(64) + settingsState.limits = { + event: { pubkey: { whitelist: [pubkey] } as any }, + } + + const event = createEvent({ pubkey }) + const result = (worker as any).canAcceptEvent(event) + + expect(result).to.equal(true) + }) + + it('rejects events outside pubkey whitelist', () => { + settingsState.limits = { + event: { pubkey: { whitelist: ['f'.repeat(64)] } as any }, + } + + const event = createEvent({ pubkey: 'e'.repeat(64) }) + const result = (worker as any).canAcceptEvent(event) + + expect(result).to.equal(false) + }) + + it('rejects events in pubkey blacklist', () => { + const pubkey = 'e'.repeat(64) + settingsState.limits = { + event: { pubkey: { blacklist: [pubkey] } as any }, + } + + const event = createEvent({ pubkey }) + const result = (worker as any).canAcceptEvent(event) + + expect(result).to.equal(false) + }) + + it('accepts events not in pubkey blacklist', () => { + settingsState.limits = { + event: { pubkey: { blacklist: ['f'.repeat(64)] } as any }, + } + + const event = createEvent({ pubkey: 'e'.repeat(64) }) + const result = (worker as any).canAcceptEvent(event) + + expect(result).to.equal(true) + }) + + it('accepts events in kind whitelist', () => { + settingsState.limits = { + event: { kind: { whitelist: [1, 2, 3] } as any }, + } + + const event = createEvent({ kind: 1 }) + const result = (worker as any).canAcceptEvent(event) + + expect(result).to.equal(true) + }) + + it('rejects events outside kind whitelist', () => { + settingsState.limits = { + event: { kind: { whitelist: [1, 2, 3] } as any }, + } + + const event = createEvent({ kind: 5 }) + const result = (worker as any).canAcceptEvent(event) + + expect(result).to.equal(false) + }) + + it('rejects events in kind blacklist', () => { + settingsState.limits = { + event: { kind: { blacklist: [1, 2] } as any }, + } + + const event = createEvent({ kind: 1 }) + const result = (worker as any).canAcceptEvent(event) + + expect(result).to.equal(false) + }) + + it('applies mirror-specific limits over global limits', () => { + settingsState.limits = { + event: { content: { maxLength: 5000 } }, + } + settingsState.mirroring = { + static: [ + { + address: 'ws://source-relay.com', + filters: [], + limits: { event: { content: { maxLength: 1000 } } }, + }, + ], + } + fakeProcess.env.MIRROR_INDEX = '0' + + worker.run() + + const event = createEvent({ content: 'x'.repeat(2000) }) + const result = (worker as any).canAcceptEvent(event) + + expect(result).to.equal(false) + }) + }) + + describe('isUserAdmitted', () => { + it('admits users when payments are disabled', async () => { + settingsState.payments = { enabled: false } as any + + const event = createEvent() + const result = await (worker as any).isUserAdmitted(event) + + expect(result).to.equal(true) + }) + + it('admits users when skipAdmissionCheck is true', async () => { + settingsState.payments = { enabled: true } as any + settingsState.mirroring = { + static: [ + { + address: 'ws://source-relay.com', + filters: [], + skipAdmissionCheck: true, + } as any, + ], + } + fakeProcess.env.MIRROR_INDEX = '0' + + worker.run() + + const event = createEvent() + const result = await (worker as any).isUserAdmitted(event) + + expect(result).to.equal(true) + }) + + it('rejects users not admitted when payments required', async () => { + settingsState.payments = { + enabled: true, + feeSchedules: { + admission: [{ enabled: true } as any], + }, + } as any + userRepository.findByPubkey.resolves({ isAdmitted: false, balance: 0 } as any) + + const event = createEvent() + const result = await (worker as any).isUserAdmitted(event) + + expect(result).to.equal(false) + }) + + it('checks user balance against minimum requirement', async () => { + settingsState.payments = { + enabled: true, + feeSchedules: { + admission: [{ enabled: true } as any], + }, + } as any + settingsState.limits = { + event: { pubkey: { minBalance: 1000 } as any }, + } + userRepository.findByPubkey.resolves({ isAdmitted: true, balance: 500 } as any) + + const event = createEvent() + const result = await (worker as any).isUserAdmitted(event) + + expect(result).to.equal(false) + }) + + it('admits users with sufficient balance', async () => { + settingsState.payments = { + enabled: true, + feeSchedules: { + admission: [{ enabled: true } as any], + }, + } as any + settingsState.limits = { + event: { pubkey: { minBalance: 1000 } as any }, + } + userRepository.findByPubkey.resolves({ isAdmitted: true, balance: 2000 } as any) + + const event = createEvent() + const result = await (worker as any).isUserAdmitted(event) + + expect(result).to.equal(true) + }) + }) + + describe('onMessage', () => { + let mockClient: { send: Sinon.SinonStub; readyState: number } + + beforeEach(() => { + mockClient = { send: sandbox.stub(), readyState: 1 /* WebSocket.OPEN */ } + ;(worker as any).config = settingsState.mirroring!.static![0] + ;(worker as any).client = mockClient + }) + + it('relays broadcast messages to connected mirror', () => { + fakeProcess.emit('message', { + eventName: WebSocketServerAdapterEvent.Broadcast, + event: createEvent(), + source: 'local', + }) + + expect(mockClient.send).to.have.been.called + }) + + it('ignores messages from the same source as the mirror', () => { + fakeProcess.emit('message', { + eventName: WebSocketServerAdapterEvent.Broadcast, + event: createEvent(), + source: 'ws://source-relay.com', + }) + + expect(mockClient.send).not.to.have.been.called + }) + + it('ignores non-broadcast messages', () => { + fakeProcess.emit('message', { + eventName: 'other-event', + event: createEvent(), + source: 'local', + }) + + expect(mockClient.send).not.to.have.been.called + }) + }) + + describe('onError', () => { + it('throws the error received from the process', () => { + const error = new Error('connection error') + + expect(() => { + fakeProcess.emit('uncaughtException', error) + }).to.throw('connection error') + }) + }) + + describe('onExit', () => { + it('closes the worker and exits the process with code 0', () => { + fakeProcess.emit('SIGTERM') + + expect(fakeProcess.exit).to.have.been.calledOnceWithExactly(0) + }) + + it('handles SIGINT', () => { + fakeProcess.emit('SIGINT') + + expect(fakeProcess.exit).to.have.been.calledOnceWithExactly(0) + }) + + it('handles SIGHUP', () => { + fakeProcess.emit('SIGHUP') + + expect(fakeProcess.exit).to.have.been.calledOnceWithExactly(0) + }) + }) + + describe('close', () => { + it('terminates the WebSocket client', () => { + const mockClient = { terminate: sandbox.stub() } + ;(worker as any).client = mockClient + + worker.close() + + expect(mockClient.terminate).to.have.been.called + }) + + it('invokes the callback when provided', () => { + const callback = sandbox.stub() + + worker.close(callback) + + expect(callback).to.have.been.calledOnce + }) + + it('does not throw when called without a callback', () => { + expect(() => worker.close()).not.to.throw() + }) + }) +}) diff --git a/test/unit/app/worker.spec.ts b/test/unit/app/worker.spec.ts new file mode 100644 index 00000000..457aaf80 --- /dev/null +++ b/test/unit/app/worker.spec.ts @@ -0,0 +1,245 @@ +import EventEmitter from 'events' + +import chai from 'chai' +import Sinon from 'sinon' +import sinonChai from 'sinon-chai' + +import { AppWorker } from '../../../src/app/worker' +import * as settingsUtils from '../../../src/utils/settings' + +chai.use(sinonChai) + +const { expect } = chai + +describe('AppWorker', () => { + let sandbox: Sinon.SinonSandbox + let worker: AppWorker + let fakeProcess: EventEmitter & { exit: Sinon.SinonStub; env: Record } + let fakeAdapter: any + let watchSettingsStub: Sinon.SinonStub + + let savedEnv: { PORT: string | undefined; RELAY_PORT: string | undefined } + + beforeEach(() => { + sandbox = Sinon.createSandbox() + + savedEnv = { PORT: process.env.PORT, RELAY_PORT: process.env.RELAY_PORT } + + fakeProcess = Object.assign(new EventEmitter(), { + exit: sandbox.stub(), + env: process.env, + }) as EventEmitter & { exit: Sinon.SinonStub; env: Record } + + const fakeWatcher = { + close: sandbox.stub(), + } as any + + watchSettingsStub = sandbox.stub(settingsUtils.SettingsStatic, 'watchSettings').returns([fakeWatcher] as any) + + fakeAdapter = { + listen: sandbox.stub(), + emit: sandbox.stub(), + close: sandbox.stub().callsFake((callback: Function) => { + if (typeof callback === 'function') { + callback() + } + }), + } + + worker = new AppWorker(fakeProcess as any, fakeAdapter) + }) + + afterEach(() => { + if (savedEnv.PORT === undefined) delete process.env.PORT + else process.env.PORT = savedEnv.PORT + if (savedEnv.RELAY_PORT === undefined) delete process.env.RELAY_PORT + else process.env.RELAY_PORT = savedEnv.RELAY_PORT + sandbox.restore() + }) + + describe('constructor', () => { + it('registers SIGINT, SIGHUP, and SIGTERM handlers', () => { + expect(fakeProcess.listenerCount('SIGINT')).to.equal(1) + expect(fakeProcess.listenerCount('SIGHUP')).to.equal(1) + expect(fakeProcess.listenerCount('SIGTERM')).to.equal(1) + }) + + it('registers uncaughtException and unhandledRejection handlers', () => { + expect(fakeProcess.listenerCount('uncaughtException')).to.equal(1) + expect(fakeProcess.listenerCount('unhandledRejection')).to.equal(1) + }) + + it('registers message handler', () => { + expect(fakeProcess.listenerCount('message')).to.equal(1) + }) + }) + + describe('run', () => { + beforeEach(() => { + fakeAdapter.listen.resetHistory() + watchSettingsStub.resetHistory() + }) + + it('watches settings on startup', () => { + delete process.env.PORT + delete process.env.RELAY_PORT + worker.run() + + expect(watchSettingsStub).to.have.been.calledOnce + }) + + it('listens on default port 8008 when PORT and RELAY_PORT env vars are not set', () => { + delete process.env.PORT + delete process.env.RELAY_PORT + worker.run() + + expect(fakeAdapter.listen).to.have.been.calledOnceWith(8008) + }) + + it('uses PORT env var if set', () => { + delete process.env.RELAY_PORT + process.env.PORT = '9000' + worker.run() + + expect(fakeAdapter.listen).to.have.been.calledOnceWith(9000) + }) + + it('uses RELAY_PORT env var as fallback', () => { + delete process.env.PORT + process.env.RELAY_PORT = '9001' + worker.run() + + expect(fakeAdapter.listen).to.have.been.calledOnceWith(9001) + }) + + it('prefers PORT over RELAY_PORT', () => { + process.env.PORT = '9000' + process.env.RELAY_PORT = '9001' + worker.run() + + expect(fakeAdapter.listen).to.have.been.calledOnceWith(9000) + }) + + it('converts string port to number', () => { + delete process.env.PORT + process.env.RELAY_PORT = '3000' + worker.run() + + expect(fakeAdapter.listen).to.have.been.calledOnceWith(3000) + }) + }) + + describe('onMessage', () => { + it('emits the eventName and event to the adapter', () => { + const message = { eventName: 'test_event', event: { id: '123' } } + + fakeProcess.emit('message', message) + + expect(fakeAdapter.emit).to.have.been.calledOnceWith('test_event', { id: '123' }) + }) + + it('handles multiple messages', () => { + fakeProcess.emit('message', { eventName: 'event1', event: { data: 'first' } }) + fakeProcess.emit('message', { eventName: 'event2', event: { data: 'second' } }) + + expect(fakeAdapter.emit).to.have.been.calledTwice + expect(fakeAdapter.emit.firstCall).to.have.been.calledWith('event1', { data: 'first' }) + expect(fakeAdapter.emit.secondCall).to.have.been.calledWith('event2', { data: 'second' }) + }) + }) + + describe('onError', () => { + it('handles TypeError about database connection without throwing', () => { + const error = new TypeError("Cannot read properties of undefined (reading '__knexUid')") + + expect(() => fakeProcess.emit('uncaughtException', error)).not.to.throw() + }) + + it('logs other errors without throwing', () => { + const error = new Error('test error') + + expect(() => fakeProcess.emit('uncaughtException', error)).not.to.throw() + }) + }) + + describe('onExit', () => { + it('closes the worker on SIGTERM', () => { + fakeProcess.emit('SIGTERM') + + expect(fakeAdapter.close).to.have.been.called + }) + + it('handles SIGINT', () => { + fakeProcess.emit('SIGINT') + + expect(fakeAdapter.close).to.have.been.called + }) + + it('handles SIGHUP', () => { + fakeProcess.emit('SIGHUP') + + expect(fakeAdapter.close).to.have.been.called + }) + + it('calls process.exit in the close callback', (done) => { + fakeAdapter.close.callsFake((callback: Function) => { + callback() + }) + + fakeProcess.emit('SIGTERM') + + setImmediate(() => { + expect(fakeProcess.exit).to.have.been.calledOnceWithExactly(0) + done() + }) + }) + }) + + describe('close', () => { + it('closes the adapter', () => { + worker.close() + + expect(fakeAdapter.close).to.have.been.calledOnce + }) + + it('invokes the callback when adapter is closed', (done) => { + const callback = sandbox.stub() + + fakeAdapter.close.callsFake((cb: Function) => { + cb() + }) + + worker.close(callback) + + setImmediate(() => { + expect(callback).to.have.been.calledOnce + done() + }) + }) + + it('does not throw when called without a callback', () => { + expect(() => worker.close()).not.to.throw() + }) + + it('closes watchers when present', () => { + const fakeWatcher1 = { close: sandbox.stub() } as any + const fakeWatcher2 = { close: sandbox.stub() } as any + watchSettingsStub.returns([fakeWatcher1, fakeWatcher2] as any) + + worker.run() + worker.close() + + expect(fakeAdapter.close).to.have.been.called + expect(fakeWatcher1.close).to.have.been.called + expect(fakeWatcher2.close).to.have.been.called + }) + + it('handles no watchers gracefully', () => { + watchSettingsStub.returns(undefined as any) + + worker.close() + + expect(fakeAdapter.close).to.have.been.calledOnce + }) + }) +}) diff --git a/tsconfig.json b/tsconfig.json index 255bd9a5..a552e082 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -27,5 +27,11 @@ ], "exclude": [ "node_modules" - ] + ], + "ts-node": { + "compilerOptions": { + "module": "CommonJS" + }, + "transpileOnly": false + } }