From 938f5d57a82aa940ad0c0fbd855b87977dc78983 Mon Sep 17 00:00:00 2001 From: Alec Gard Date: Thu, 19 Mar 2026 11:20:49 +0000 Subject: [PATCH 1/2] Tests to repro issue with case sensitivity around EA params for Streaming transports --- test/transports/websocket.test.ts | 186 ++++++++++++++++++++++++++++++ 1 file changed, 186 insertions(+) diff --git a/test/transports/websocket.test.ts b/test/transports/websocket.test.ts index e8b53cd9..940df73b 100644 --- a/test/transports/websocket.test.ts +++ b/test/transports/websocket.test.ts @@ -1711,3 +1711,189 @@ test.serial( await t.context.clock.runToLastAsync() }, ) + +test.serial( + 'does not unnecessarily unsubscribe when two requests differ only in casing for the same pair', + async (t) => { + // Regression test for the case-sensitivity mismatch between the subscription set + // (keyed by lowercased cache key) and StreamingTransport's local subscription diff + // (which uses JSON.stringify, preserving original casing). + // + // Scenario: + // 1. Request A { base: 'USDe', quote: 'USD' } → subscribes; localSubscriptions=['USDe/USD'] + // 2. Request B { base: 'usde', quote: 'usd' } → same cache key, hits cache, + // but overwrites the subscription-set value with the lowercase variant + // 3. Next background execute: desiredSubs=['usde/usd'] vs localSubscriptions=['USDe/USD'] + // → JSON.stringify mismatch → unnecessary unsubscribe + resubscribe + + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) + let subscribeCount = 0 + let unsubscribeCount = 0 + + mockWsServer.on('connection', (socket) => { + socket.on('message', (rawMsg) => { + const msg = rawMsg.toString() + if (msg.startsWith('S:')) { + subscribeCount++ + const pair = msg.slice(2) + socket.send(JSON.stringify({ pair, value: price })) + } else { + try { + const parsed = JSON.parse(msg) + if (parsed.request === 'unsubscribe') { + unsubscribeCount++ + } + } catch { + // Ignore non-JSON messages + } + } + }) + }) + + const adapter = createAdapter({}) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + // First request with mixed-case base — triggers subscribe and populates cache + await testAdapter.startBackgroundExecuteThenGetResponse(t, { + requestData: { base: 'USDe', quote: 'USD' }, + expectedResponse: { + data: { result: price }, + result: price, + statusCode: 200, + }, + }) + + // Second request with all-lowercase — same cache key, should be a cache hit, + // but overwrites the subscription set value with the lowercase variant + const response = await testAdapter.request({ base: 'usde', quote: 'usd' }) + t.is(response.statusCode, 200) + + // Advance clock to trigger another background execute cycle + await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS + 100) + + // Capture the counters before cleanup so assertions run after cleanup. + // Closing the API first (without await) signals the background executor to shut + // down via fake-timer-driven setImmediate; runToLastAsync then fires all pending + // fake timers (Fastify's close, bg executor sleep) so the executor exits cleanly. + const capturedSubscribeCount = subscribeCount + const capturedUnsubscribeCount = unsubscribeCount + + testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() + + // With the bug: subscribeCount === 2, unsubscribeCount === 1 (unneccesary unsub+resub + // caused by case mismatch between desiredSubs and localSubscriptions) + // After the fix: subscribeCount === 1, unsubscribeCount === 0 + t.is(capturedSubscribeCount, 1) + t.is(capturedUnsubscribeCount, 0) + }, +) + +test.serial( + 'both request variants continue receiving data with case-insensitive provider', + async (t) => { + // Regression test (user-visible impact). With a case-insensitive streaming provider, + // two requests that differ only in casing should both continue receiving data. + // + // The bug: + // 1. Request A { base: 'USDe' } subscribes; localSubscriptions=['USDe/USD'] + // 2. Request B { base: 'usde' } overwrites the subscription-set value with lowercase + // 3. Next bg execute: desiredSubs=['usde/usd'] ≠ localSubscriptions=['USDe/USD'] + // → sendMessages sends subscribes first, then unsubscribes: + // subscribe usde/usd → provider (case-insensitive) starts/restarts feed + // unsubscribe USDe/USD → provider treats as the same feed and kills it + // 4. After the cycle: localSubscriptions=desiredSubs=['usde/usd'] → no diff on + // the next execute → feed is permanently dead, cache expires → 504 + // + // CACHE_MAX_AGE is reduced so the test can observe the expiry without waiting the + // full default 90 s. After the fix: no unnecessary sub/unsub, feed stays alive, + // both variants return 200. + + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL, { mock: false }) + + // Simulate a case-insensitive streaming provider: sends data on subscribe and + // pushes periodic updates; unsubscribe kills the feed. + let feedActive = false + let activePair = '' + let intervalTimer: ReturnType | null = null + + mockWsServer.on('connection', (socket) => { + socket.on('message', (rawMsg) => { + const msg = rawMsg.toString() + if (msg.startsWith('S:')) { + feedActive = true + activePair = msg.slice(2) + socket.send(JSON.stringify({ pair: activePair, value: price })) + // Periodic pushes simulate a streaming provider keeping the cache warm. + if (intervalTimer) { + clearInterval(intervalTimer) + } + const sendPeriodic = () => { + if (feedActive) { + socket.send(JSON.stringify({ pair: activePair, value: price })) + } + } + intervalTimer = setInterval(sendPeriodic, BACKGROUND_EXECUTE_MS_WS) + } else { + try { + const parsed = JSON.parse(msg) + if (parsed.request === 'unsubscribe') { + feedActive = false + if (intervalTimer) { + clearInterval(intervalTimer) + intervalTimer = null + } + } + } catch { + // Ignore non-JSON messages + } + } + }) + socket.on('close', () => { + if (intervalTimer) { + clearInterval(intervalTimer) + } + }) + }) + + // Reduced CACHE_MAX_AGE so expiry is observable within the test without waiting + // the full default 90 s. Must be > BACKGROUND_EXECUTE_MS_WS so periodic pushes + // keep the cache warm when the feed is healthy (no bug). + const cacheMaxAge = Math.round(1.5 * BACKGROUND_EXECUTE_MS_WS) // 7500ms + const adapter = createAdapter({ + CACHE_MAX_AGE: cacheMaxAge, + }) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + // First request (mixed case) — subscribes to provider, starts periodic pushes. + await testAdapter.startBackgroundExecuteThenGetResponse(t, { + requestData: { base: 'USDe', quote: 'USD' }, + expectedResponse: { data: { result: price }, result: price, statusCode: 200 }, + }) + + // Second request (lowercase) — same cache key, gets a hit. But it also overwrites + // the subscription set value, setting up the unnecessary unsub/resub cycle. + const hit = await testAdapter.request({ base: 'usde', quote: 'usd' }) + t.is(hit.statusCode, 200) + + // Advance past two bg-execute cycles and one full cacheMaxAge window. + // With bug: feed is permanently dead after cycle 1 (~5000ms); cache expires at + // ~5000ms + 7500ms = ~12500ms → both variants return 504 by assertion time. + // Without bug: periodic pushes keep refreshing the cache → both variants return 200. + await runAllUntilTime(t.context.clock, 2 * BACKGROUND_EXECUTE_MS_WS + cacheMaxAge + 100) + const response1 = await testAdapter.request({ base: 'USDe', quote: 'USD' }) + t.is(response1.statusCode, 200) + + const response2 = await testAdapter.request({ base: 'usde', quote: 'usd' }) + t.is(response2.statusCode, 200) + + testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() + }, +) From 261b937beb1b3f299b813e4cc0acea54e18101fc Mon Sep 17 00:00:00 2001 From: Alec Gard Date: Thu, 19 Mar 2026 11:45:32 +0000 Subject: [PATCH 2/2] fix: preserve subscription casing on duplicate cache key to prevent feed death MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When two requests share the same normalised cache key but differ in casing (e.g. USDe/USD and usde/usd), ExpiringSortedSet.add() was overwriting the stored params with the new casing. This caused StreamingTransport's JSON.stringify diff to see a spurious change on the next background execute, triggering an unnecessary unsubscribe then resubscribe. Because sendMessages sends subscribes before unsubscribes, a case-insensitive provider would start the feed then immediately kill it — and because localSubscriptions was then set to desiredSubs, no diff would fire again, leaving the feed permanently dead and both request variants returning 504 after cache expiry. Fix: when a key already exists in ExpiringSortedSet, only refresh the TTL and keep the original value (first-writer-wins). Emit a warning when the incoming value differs from the stored one, as this indicates inconsistent request casing that may cause issues with case-sensitive providers. --- .../subscription-set/expiring-sorted-set.ts | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/util/subscription-set/expiring-sorted-set.ts b/src/util/subscription-set/expiring-sorted-set.ts index 6ad1ec3a..fecaac7e 100644 --- a/src/util/subscription-set/expiring-sorted-set.ts +++ b/src/util/subscription-set/expiring-sorted-set.ts @@ -32,8 +32,23 @@ export class ExpiringSortedSet implements SubscriptionSet { add(value: T, ttl: number, key: string) { let node = this.map.get(key) if (node) { + if (JSON.stringify(value) !== JSON.stringify(node.data.value)) { + logger.warn( + `Subscription set received a value for key "${key}" that differs from the stored value. ` + + `Keeping the original value to avoid unnecessary subscription churn. ` + + `This indicates requests are using inconsistent parameter casing - ` + + `stored: ${JSON.stringify(node.data.value)}, incoming: ${JSON.stringify(value)}`, + ) + } node.data = { - value, + // Preserve the existing value rather than overwriting it. The key is the + // normalised cache key (e.g. lowercased), so two entries that share a key + // represent the same logical subscription. Overwriting the value with a + // differently-cased variant would cause the streaming transport's + // JSON.stringify-based diff to see a change, triggering an + // unnecessary unsubscribe + resubscribe cycle that can permanently kill + // the provider feed. Only the TTL needs refreshing here. + value: node.data.value, expirationTimestamp: Date.now() + ttl, } this.moveToTail(node)