From 41c2cdf8cb992808bbc4cfd83819c8459cd49481 Mon Sep 17 00:00:00 2001 From: iceteaSA <171169159+iceteaSA@users.noreply.github.com> Date: Sun, 17 May 2026 11:20:10 +0200 Subject: [PATCH] feat(core): add QuotaManager for unified in-memory quota cache All quota consumers (routing, commands, background timers) can share a single QuotaManager instance with one in-memory cache. This eliminates: - Redundant API calls via inflight request deduplication - 429 rate-limit cascades via global 60s backoff - Stale quota reads across consumers QuotaManager is purely in-memory with no file I/O. FallbackAccountManager accepts an optional quotaManager in its constructor and syncs writes to it after each refresh. Seeding from persisted account.quota prevents unnecessary API calls when on-disk snapshots are still fresh. Non-breaking: quotaManager is optional in AccountManagerOptions. Consumers that don't use it get the existing behavior unchanged. --- packages/core/src/accounts.ts | 50 ++++- packages/core/src/index.ts | 1 + packages/core/src/quota-manager.ts | 303 +++++++++++++++++++++++++++++ 3 files changed, 352 insertions(+), 2 deletions(-) create mode 100644 packages/core/src/quota-manager.ts diff --git a/packages/core/src/accounts.ts b/packages/core/src/accounts.ts index 48e62d4..3bb11cd 100644 --- a/packages/core/src/accounts.ts +++ b/packages/core/src/accounts.ts @@ -100,6 +100,7 @@ export type AccountManagerOptions = { now?: () => number fetchImpl?: typeof fetch configPath?: string + quotaManager?: import('./quota-manager.ts').QuotaManager } export type AccountRefreshError = { @@ -567,11 +568,37 @@ export class FallbackAccountManager { private readonly refreshPromises = new Map>() private refreshTimer: ReturnType | null = null private quotaTimer: ReturnType | null = null + readonly quotaManager: import('./quota-manager.ts').QuotaManager | null constructor(options: AccountManagerOptions = {}) { this.now = options.now ?? Date.now this.fetchImpl = options.fetchImpl ?? fetch this.configPath = options.configPath ?? getAccountStoragePath() + this.quotaManager = options.quotaManager ?? null + } + + /** + * Seed QuotaManager from persisted account.quota if no cache entry exists + * yet. Prevents unnecessary API calls when the on-disk snapshot is fresh. + */ + private seedFallbackQuota( + account: OAuthAccount, + storage: AccountStorage, + ): void { + if (!this.quotaManager) return + if (this.quotaManager.getFallback(account.id)) return + if (!account.quota) return + const checkedAt = Math.max( + account.quota.five_hour?.checkedAt ?? 0, + account.quota.seven_day?.checkedAt ?? 0, + ) + if (checkedAt <= 0) return + const checkInterval = getQuotaCheckIntervalMs(storage) + this.quotaManager.setFallback(account.id, { + quota: account.quota, + refreshAfter: checkedAt + checkInterval, + checkedAt, + }) } async load() { @@ -620,7 +647,11 @@ export class FallbackAccountManager { next = await this.refreshAccount(next, storage) changed = true } - if (quotaIsStale(next, storage, this.now())) { + this.seedFallbackQuota(next, storage) + const stale = this.quotaManager + ? this.quotaManager.isFallbackStale(next.id) + : quotaIsStale(next, storage, this.now()) + if (stale) { next = await this.refreshAccountQuota(next, storage) changed = true } @@ -684,7 +715,13 @@ export class FallbackAccountManager { next = await this.refreshAccount(next, storage) changed = true } - if (!quotaIsStale(next, storage, this.now())) continue + this.seedFallbackQuota(next, storage) + // Use QuotaManager staleness when available (shared cache); + // fall back to per-account on-disk staleness otherwise. + const stale = this.quotaManager + ? this.quotaManager.isFallbackStale(next.id) + : quotaIsStale(next, storage, this.now()) + if (!stale) continue await this.refreshAccountQuota(next, storage) changed = true } catch (error) { @@ -810,6 +847,15 @@ export class FallbackAccountManager { } target.lastQuotaRefreshError = undefined updateStoredAccount(storage, target) + // Sync to shared QuotaManager so all consumers see the same cache + if (this.quotaManager && target.quota) { + const now = this.now() + this.quotaManager.setFallback(target.id, { + quota: target.quota, + refreshAfter: now + getQuotaCheckIntervalMs(storage), + checkedAt: now, + }) + } return target } } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 9e73ac7..1d03880 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -8,5 +8,6 @@ export * from './dump.ts' export * from './fast.ts' export * from './logger.ts' export * from './pkce.ts' +export * from './quota-manager.ts' export * from './quotas.ts' export * from './relay.ts' diff --git a/packages/core/src/quota-manager.ts b/packages/core/src/quota-manager.ts new file mode 100644 index 0000000..8ea0739 --- /dev/null +++ b/packages/core/src/quota-manager.ts @@ -0,0 +1,303 @@ +/** + * Unified quota cache and API gateway. + * + * Single source of truth for main + fallback quota state. All consumers + * share one QuotaManager instance so they see the same in-memory cache. + * Handles deduplication, rate-limiting (429 backoff), and staleness. + */ + +import type { + AccountStorage, + OAuthAccount, + OAuthQuotaSnapshot, +} from './accounts.ts' +import { + fetchOAuthQuotaSnapshot, + getQuotaCheckIntervalMs, + getQuotaNextRefreshAt, +} from './accounts.ts' + +/** + * Read the optional refreshEveryNRequests config value. + * Returns 0 (disabled) if not configured or invalid. + */ +function getQuotaRefreshEveryNRequests(storage: AccountStorage | null): number { + const n = (storage?.quota as Record | undefined) + ?.refreshEveryNRequests + return typeof n === 'number' && Number.isFinite(n) && n > 0 + ? Math.floor(n) + : 0 +} + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export type QuotaEntry = { + quota: OAuthQuotaSnapshot + refreshAfter: number // Unix ms — earliest next refresh + checkedAt: number // when snapshot was fetched +} + +export type QuotaManagerOptions = { + storage: AccountStorage | null + fetchImpl?: typeof fetch + now?: () => number +} + +// --------------------------------------------------------------------------- +// Class +// --------------------------------------------------------------------------- + +export class QuotaManager { + // --- State --- + private main: QuotaEntry | null = null + private mainAccessToken: string | null = null + private fallbacks = new Map() + + // --- Inflight deduplication --- + private inflightMain: Promise | null = null + private inflightFallbacks = new Map>() + + // --- Rate-limiting --- + private apiBackoffUntil = 0 + private static readonly BACKOFF_MS = 60_000 + + // --- Config --- + private storage: AccountStorage | null + private readonly fetchImpl: typeof fetch + private readonly now: () => number + + constructor(opts: QuotaManagerOptions) { + this.storage = opts.storage + this.fetchImpl = opts.fetchImpl ?? fetch + this.now = opts.now ?? Date.now + } + + // ========================================================================= + // Get (synchronous, from cache) + // ========================================================================= + + getMain(): QuotaEntry | null { + return this.main + } + + getFallback(accountId: string): QuotaEntry | null { + return this.fallbacks.get(accountId) ?? null + } + + getAllFallbacks(): Map { + return this.fallbacks + } + + // ========================================================================= + // Set (manual inject — seeding from persisted account.quota on boot) + // ========================================================================= + + setMain(accessToken: string, entry: QuotaEntry): void { + this.mainAccessToken = accessToken + this.main = entry + } + + setFallback(accountId: string, entry: QuotaEntry): void { + this.fallbacks.set(accountId, entry) + } + + // ========================================================================= + // Refresh (async, deduplicated, rate-limited) + // ========================================================================= + + async refreshMain(accessToken: string): Promise { + // If token changed, invalidate cache + if (this.mainAccessToken && this.mainAccessToken !== accessToken) { + this.main = null + this.mainAccessToken = null + } + + // Deduplicate — return in-flight promise if already fetching + if (this.inflightMain) return this.inflightMain + + // Rate-limit — if API recently 429'd, return stale or throw + if (this.now() < this.apiBackoffUntil) { + if (this.main) return this.main.quota + throw new Error('Quota API rate-limited — try again later') + } + + this.inflightMain = this._fetchMain(accessToken) + return this.inflightMain + } + + async refreshFallback( + accountId: string, + accessToken: string, + ): Promise { + // Deduplicate + const inflight = this.inflightFallbacks.get(accountId) + if (inflight) return inflight + + // Rate-limit + if (this.now() < this.apiBackoffUntil) { + const cached = this.fallbacks.get(accountId) + if (cached) return cached.quota + throw new Error('Quota API rate-limited — try again later') + } + + const promise = this._fetchFallback(accountId, accessToken) + this.inflightFallbacks.set(accountId, promise) + return promise + } + + async refreshAllFallbacks(accounts: OAuthAccount[]): Promise { + const now = this.now() + + for (const account of accounts) { + if (account.enabled === false) continue + if (!account.access) continue + + const cached = this.fallbacks.get(account.id) + if (cached && now < cached.refreshAfter) continue + + try { + await this.refreshFallback(account.id, account.access) + } catch { + // Best-effort — keep stale cache entry if fetch fails + } + } + } + + /** + * Fire-and-forget refresh. Does not await, swallows errors. + */ + refreshMainInBackground(accessToken: string): void { + if (this.inflightMain) return + if (this.now() < this.apiBackoffUntil) return + void this.refreshMain(accessToken).catch(() => {}) + } + + // ========================================================================= + // Staleness queries + // ========================================================================= + + isMainStale(): boolean { + if (!this.main) return true + return this.now() >= this.main.refreshAfter + } + + isFallbackStale(accountId: string): boolean { + const entry = this.fallbacks.get(accountId) + if (!entry) return true + return this.now() >= entry.refreshAfter + } + + shouldRefreshOnRequestCount(requestCount: number): boolean { + const everyN = getQuotaRefreshEveryNRequests(this.storage) + if (everyN <= 0) return false + return requestCount > 0 && requestCount % everyN === 0 + } + + /** + * Combined check: should a refresh happen right now? + * True if main is stale by time OR triggered by request count. + */ + needsRefresh(requestCount: number): boolean { + return this.isMainStale() || this.shouldRefreshOnRequestCount(requestCount) + } + + // ========================================================================= + // Config + // ========================================================================= + + updateStorage(storage: AccountStorage | null): void { + this.storage = storage + } + + /** + * Seed fallback cache entries from persisted account.quota data. + * Only seeds accounts that don't already have a cache entry. + * Prevents unnecessary API calls when persisted quota is still fresh. + */ + seedFallbacksFromAccounts(accounts: OAuthAccount[]): void { + const checkInterval = getQuotaCheckIntervalMs(this.storage) + for (const account of accounts) { + if (account.enabled === false) continue + if (this.fallbacks.has(account.id)) continue + if (!account.quota) continue + const checkedAt = Math.max( + account.quota.five_hour?.checkedAt ?? 0, + account.quota.seven_day?.checkedAt ?? 0, + ) + if (checkedAt <= 0) continue + this.fallbacks.set(account.id, { + quota: account.quota, + refreshAfter: checkedAt + checkInterval, + checkedAt, + }) + } + } + + /** + * Whether the API is currently in backoff due to a recent 429. + */ + isBackedOff(): boolean { + return this.now() < this.apiBackoffUntil + } + + // ========================================================================= + // Private + // ========================================================================= + + private async _fetchMain(accessToken: string): Promise { + try { + const quota = await fetchOAuthQuotaSnapshot({ + accessToken, + fetchImpl: this.fetchImpl, + now: this.now, + }) + const now = this.now() + this.mainAccessToken = accessToken + this.main = { + quota, + refreshAfter: getQuotaNextRefreshAt(quota, this.storage, now), + checkedAt: now, + } + return quota + } catch (error) { + this._handleFetchError(error) + throw error + } finally { + this.inflightMain = null + } + } + + private async _fetchFallback( + accountId: string, + accessToken: string, + ): Promise { + try { + const quota = await fetchOAuthQuotaSnapshot({ + accessToken, + fetchImpl: this.fetchImpl, + now: this.now, + }) + const now = this.now() + this.fallbacks.set(accountId, { + quota, + refreshAfter: now + getQuotaCheckIntervalMs(this.storage), + checkedAt: now, + }) + return quota + } catch (error) { + this._handleFetchError(error) + throw error + } finally { + this.inflightFallbacks.delete(accountId) + } + } + + private _handleFetchError(error: unknown): void { + const msg = error instanceof Error ? error.message : String(error) + if (msg.includes('429')) { + this.apiBackoffUntil = this.now() + QuotaManager.BACKOFF_MS + } + } +}