From fcd906acca1425b58d2d40d76881a5b387977b19 Mon Sep 17 00:00:00 2001 From: Hernan Alvarado Date: Wed, 10 Jun 2026 15:29:45 -0500 Subject: [PATCH] feat(rate-limiter): implement leaky bucket rate limiting algorithm --- packages/rate-limiter/CHANGELOG.md | 6 +- packages/rate-limiter/src/algorithms/index.ts | 1 + .../src/algorithms/leaky-bucket.ts | 98 +++++++++++++ packages/rate-limiter/src/rate-limiter.ts | 10 +- packages/rate-limiter/src/types.ts | 34 +++-- .../test/algorithms/leaky-bucket.test.ts | 134 ++++++++++++++++++ 6 files changed, 270 insertions(+), 13 deletions(-) create mode 100644 packages/rate-limiter/src/algorithms/leaky-bucket.ts create mode 100644 packages/rate-limiter/test/algorithms/leaky-bucket.test.ts diff --git a/packages/rate-limiter/CHANGELOG.md b/packages/rate-limiter/CHANGELOG.md index 851f4763..1e968577 100644 --- a/packages/rate-limiter/CHANGELOG.md +++ b/packages/rate-limiter/CHANGELOG.md @@ -10,6 +10,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), ### Added -- Added support for the `fixed-window` rate-limiter algorithm via the `createFixedWindow` function. The function can be used standalone or integrated with the centralized `createRateLimiter` function. [#185](https://github.com/aura-stack-ts/auth/pull/185) +- Added support for the `leaky-bucket` rate-limiter algorithm via the `createLeakyBucketAlgorithm` function. The function can be used standalone or integrated with the centralized `createRateLimiter` function. [#186](https://github.com/aura-stack-ts/auth/pull/186) -- Introduced `createRateLimiter` function to create a rate limiter. Currently, the only supported algorithm is `token-bucket`, implemented by the `createTokenBucket` function. [#131](https://github.com/aura-stack-ts/auth/pull/131) +- Added support for the `fixed-window` rate-limiter algorithm via the `createFixedWindowAlgorithm` function. The function can be used standalone or integrated with the centralized `createRateLimiter` function. [#185](https://github.com/aura-stack-ts/auth/pull/185) + +- Introduced `createRateLimiter` function to create a rate limiter. Currently, the only supported algorithm is `token-bucket`, implemented by the `createTokenBucketAlgorithm` function. [#131](https://github.com/aura-stack-ts/auth/pull/131) diff --git a/packages/rate-limiter/src/algorithms/index.ts b/packages/rate-limiter/src/algorithms/index.ts index b72ab9b2..11f9c759 100644 --- a/packages/rate-limiter/src/algorithms/index.ts +++ b/packages/rate-limiter/src/algorithms/index.ts @@ -1,2 +1,3 @@ export { createTokenBucketAlgorithm } from "@/algorithms/token-bucket.ts" export { createFixedWindowAlgorithm } from "@/algorithms/fixed-window.ts" +export { createLeakyBucketAlgorithm } from "@/algorithms/leaky-bucket.ts" diff --git a/packages/rate-limiter/src/algorithms/leaky-bucket.ts b/packages/rate-limiter/src/algorithms/leaky-bucket.ts new file mode 100644 index 00000000..e3002561 --- /dev/null +++ b/packages/rate-limiter/src/algorithms/leaky-bucket.ts @@ -0,0 +1,98 @@ +import { toContent } from "@/utils.ts" +import { createMemoryStorage } from "@/memory.ts" +import type { LeakyBucketRule, RateLimiterAlgorithm, RateLimitResult, StorageEntry } from "@/types.ts" + +interface BucketState { + level: number + lastLeakAt: number +} + +/** + * Leaky Bucket + * + * Models a bucket that accumulates requests and drains at a constant rate. + * When full, new requests are dropped immediately. Unlike token bucket, there + * is no burst tolerance — the output rate is always exactly `leakRatePerMs`. + * + * currentLevel = max(0, storedLevel - elapsed * leakRatePerMs) + * newLevel = currentLevel + 1 + * ok = newLevel <= capacity + * + * Rejected requests do NOT raise the level — a hammering client cannot + * permanently block legitimate traffic. + * + * Recommended for: outbound webhook dispatch, downstream call smoothing. + */ +export const createLeakyBucketAlgorithm = ( + rule: LeakyBucketRule +): RateLimiterAlgorithm => { + const { capacity, leakRatePerMs, storage = createMemoryStorage() } = rule + + const levelKey = (key: string) => `${key}:lb:level` + const lastLeakKey = (key: string) => `${key}:lb:lastLeak` + const fullDrainMs = () => Math.ceil(capacity / leakRatePerMs) + + const getCurrentLevel = (stored: BucketState, now: number): number => { + return Math.max(0, stored.level - Math.max(0, now - stored.lastLeakAt) * leakRatePerMs) + } + + const readBucket = async (key: string, now: number): Promise => { + const [levelEntry, leakEntry] = await Promise.all([storage.get(levelKey(key)), storage.get(lastLeakKey(key))]) + + if (!levelEntry || !leakEntry) return { level: 0, lastLeakAt: now } + return { level: levelEntry.value, lastLeakAt: leakEntry.value } + } + + const writeBucket = async (key: string, state: BucketState): Promise => { + const ttlMs = fullDrainMs() * 2 + const expiresAt = Date.now() + ttlMs + + await Promise.all([ + storage.set(levelKey(key), { value: state.level, expiresAt } satisfies StorageEntry, ttlMs), + storage.set(lastLeakKey(key), { value: state.lastLeakAt, expiresAt } satisfies StorageEntry, ttlMs), + ]) + } + + const check = async (request: RequestInit): Promise => { + const now = Date.now() + const key = rule.keyGenerator(request) + const stored = await readBucket(key, now) + const level = getCurrentLevel(stored, now) + const newLevel = level + 1 + const ok = newLevel <= capacity + const msPerRequest = 1 / leakRatePerMs + + await writeBucket(key, { + level: ok ? newLevel : level, + lastLeakAt: now, + }) + + return toContent({ + ok, + limit: capacity, + remaining: Math.max(0, capacity - (ok ? newLevel : level)), + resetAt: now + Math.ceil(newLevel * msPerRequest), + retryAfter: Math.ceil(msPerRequest), + }) + } + + const peek = async (request: RequestInit): Promise => { + const now = Date.now() + const key = rule.keyGenerator(request) + const stored = await readBucket(key, now) + const level = getCurrentLevel(stored, now) + const nextLevel = level + 1 + const ok = nextLevel <= capacity + const msPerRequest = 1 / leakRatePerMs + + return toContent({ + ok, + limit: capacity, + remaining: Math.max(0, capacity - (ok ? nextLevel : level)), + resetAt: now + Math.ceil(nextLevel * msPerRequest), + retryAfter: Math.ceil(msPerRequest), + }) + } + + return { check, peek } +} diff --git a/packages/rate-limiter/src/rate-limiter.ts b/packages/rate-limiter/src/rate-limiter.ts index fd4d47c6..6a77619e 100644 --- a/packages/rate-limiter/src/rate-limiter.ts +++ b/packages/rate-limiter/src/rate-limiter.ts @@ -1,5 +1,5 @@ import { createMemoryStorage } from "@/memory.ts" -import { createTokenBucketAlgorithm } from "@/algorithms/token-bucket.ts" +import { createTokenBucketAlgorithm, createFixedWindowAlgorithm, createLeakyBucketAlgorithm } from "@/algorithms/index.ts" import type { InferRules, RateLimiter, RateLimiterAlgorithm, RateLimiterConfig, RateLimiterRule } from "@/types.ts" /** @@ -10,6 +10,10 @@ const buildAlgorithm = (rule: RateLimiterRule { switch (rule.algorithm) { case "token-bucket": return [`${key}:tb:tokens`, `${key}:tb:lastRefill`] + case "fixed-window": + return [`${key}:fw`] + case "leaky-bucket": + return [`${key}:lb:tokens`, `${key}:lb:lastLeak`] } } diff --git a/packages/rate-limiter/src/types.ts b/packages/rate-limiter/src/types.ts index 48bbb254..d44a25ca 100644 --- a/packages/rate-limiter/src/types.ts +++ b/packages/rate-limiter/src/types.ts @@ -58,7 +58,7 @@ export interface RateLimiterAlgorithm { check(request: RequestInit): Promise } -export type AlgorithmType = "token-bucket" +export type AlgorithmType = "token-bucket" | "fixed-window" | "leaky-bucket" interface BaseRule { algorithm: AlgorithmType @@ -69,6 +69,10 @@ interface BaseRule { * @example (req) => `${req.ip}:${req.path}` */ keyGenerator: (request: RequestInit) => string + /** + * Optional storage instance specific to this rule. + */ + storage?: RateLimiterStorage } export type TokenBucketRule = BaseRule & { @@ -77,25 +81,35 @@ export type TokenBucketRule = BaseRule & { capacity: number /** Tokens added per millisecond. */ refillRate: number - /** - * Optional storage instance specific to this rule. - */ - storage?: RateLimiterStorage } -export type FixedWindowRule = { +export type FixedWindowRule = BaseRule & { algorithm: "fixed-window" /** Maximum requests allowed per window. */ limit: number /** Window duration in milliseconds. Hard resets at each boundary. */ windowMs: number +} + +export type LeakyBucketRule = BaseRule & { + algorithm: "leaky-bucket" /** - * Optional storage instance specific to this rule. + * The maximum queue size (burst capacity). When the bucket is full, + * additional requests are rejected until space is available. */ - storage?: RateLimiterStorage -} & Omit, "algorithm"> + capacity: number + /** + * The rate at which the bucket leaks (processes requests) in tokens per + * millisecond. This controls how quickly the bucket can empty and accept + * new requests after reaching capacity. + */ + leakRatePerMs: number +} -export type RateLimiterRule = TokenBucketRule | FixedWindowRule +export type RateLimiterRule = + | TokenBucketRule + | FixedWindowRule + | LeakyBucketRule export interface RateLimiterConfig> { storage?: RateLimiterStorage diff --git a/packages/rate-limiter/test/algorithms/leaky-bucket.test.ts b/packages/rate-limiter/test/algorithms/leaky-bucket.test.ts new file mode 100644 index 00000000..6432a228 --- /dev/null +++ b/packages/rate-limiter/test/algorithms/leaky-bucket.test.ts @@ -0,0 +1,134 @@ +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest" +import { createMemoryStorage } from "@/memory.ts" +import { createLeakyBucketAlgorithm } from "@/algorithms/index.ts" + +interface TestRequest { + key: string +} + +const request = (key: string): TestRequest => ({ key }) + +const createAlgorithm = (capacity = 2, leakRatePerMs = 1) => { + return createLeakyBucketAlgorithm({ + algorithm: "leaky-bucket", + capacity, + leakRatePerMs, + storage: createMemoryStorage(), + keyGenerator: (req) => req.key, + }) +} + +beforeEach(() => { + vi.useFakeTimers() + vi.setSystemTime(0) +}) + +afterEach(() => { + vi.useRealTimers() +}) + +describe("LeakyBucketAlgorithm", () => { + test("allows requests up to capacity and then blocks", async () => { + const algorithm = createAlgorithm(2, 1) + const key = request("ip:1") + + const first = await algorithm.check(key) + const second = await algorithm.check(key) + const third = await algorithm.check(key) + + expect(first.ok).toBe(true) + expect(first.remaining).toBe(1) + + expect(second.ok).toBe(true) + expect(second.remaining).toBe(0) + + expect(third.ok).toBe(false) + expect(third.remaining).toBe(0) + expect(third.retryAfter).toBe(1) + }) + + test("drains over time and accepts new requests as capacity frees up", async () => { + const algorithm = createAlgorithm(2, 1) + const key = request("ip:2") + + await algorithm.check(key) + await algorithm.check(key) + + vi.advanceTimersByTime(1) + + const afterOneMs = await algorithm.check(key) + + expect(afterOneMs.ok).toBe(true) + expect(afterOneMs.remaining).toBe(0) + expect(afterOneMs.resetAt).toBe(3) + }) + + test("rejected requests do not increase the stored level", async () => { + const algorithm = createAlgorithm(1, 0.5) + const key = request("ip:3") + + const first = await algorithm.check(key) + const second = await algorithm.check(key) + + vi.advanceTimersByTime(2) + + const afterDrain = await algorithm.check(key) + + expect(first.ok).toBe(true) + expect(second.ok).toBe(false) + expect(second.remaining).toBe(0) + expect(second.retryAfter).toBe(2) + expect(afterDrain.ok).toBe(true) + expect(afterDrain.remaining).toBe(0) + }) + + test("peek reports state without consuming a queued slot", async () => { + const algorithm = createAlgorithm(3, 1) + const key = request("ip:4") + + const consumed = await algorithm.check(key) + const preview = await algorithm.peek(key) + const next = await algorithm.check(key) + + expect(consumed.ok).toBe(true) + expect(consumed.remaining).toBe(2) + + expect(preview.ok).toBe(true) + expect(preview.remaining).toBe(1) + expect(preview.resetAt).toBe(2) + expect(preview.retryAfter).toBe(1) + + expect(next.ok).toBe(true) + expect(next.remaining).toBe(1) + }) + + test("keeps bucket state isolated per key", async () => { + const algorithm = createAlgorithm(1, 1) + const keyA = request("ip:A") + const keyB = request("ip:B") + + const firstA = await algorithm.check(keyA) + const secondA = await algorithm.check(keyA) + const firstB = await algorithm.check(keyB) + + expect(firstA.ok).toBe(true) + expect(secondA.ok).toBe(false) + expect(firstB.ok).toBe(true) + }) + + test("falls back to a fresh bucket after storage entries expire", async () => { + const algorithm = createAlgorithm(2, 1) + const key = request("ip:5") + + await algorithm.check(key) + await algorithm.check(key) + + vi.advanceTimersByTime(5000) + + const afterExpiry = await algorithm.check(key) + + expect(afterExpiry.ok).toBe(true) + expect(afterExpiry.remaining).toBe(1) + expect(afterExpiry.resetAt).toBe(5001) + }) +})