Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions packages/rate-limiter/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),

### Added

- Added support for the `fixed-window` rate-limiter algorithm via the `createFixedWindow` function. The function can be used standalone or integrated with the centralized `createRateLimiter` function. [#185](https://github.com/aura-stack-ts/auth/pull/185)
- Added support for the `leaky-bucket` rate-limiter algorithm via the `createLeakyBucketAlgorithm` function. The function can be used standalone or integrated with the centralized `createRateLimiter` function. [#186](https://github.com/aura-stack-ts/auth/pull/186)

- Introduced `createRateLimiter` function to create a rate limiter. Currently, the only supported algorithm is `token-bucket`, implemented by the `createTokenBucket` function. [#131](https://github.com/aura-stack-ts/auth/pull/131)
- Added support for the `fixed-window` rate-limiter algorithm via the `createFixedWindowAlgorithm` function. The function can be used standalone or integrated with the centralized `createRateLimiter` function. [#185](https://github.com/aura-stack-ts/auth/pull/185)

- Introduced `createRateLimiter` function to create a rate limiter. Currently, the only supported algorithm is `token-bucket`, implemented by the `createTokenBucketAlgorithm` function. [#131](https://github.com/aura-stack-ts/auth/pull/131)
1 change: 1 addition & 0 deletions packages/rate-limiter/src/algorithms/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { createTokenBucketAlgorithm } from "@/algorithms/token-bucket.ts"
export { createFixedWindowAlgorithm } from "@/algorithms/fixed-window.ts"
export { createLeakyBucketAlgorithm } from "@/algorithms/leaky-bucket.ts"
98 changes: 98 additions & 0 deletions packages/rate-limiter/src/algorithms/leaky-bucket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { toContent } from "@/utils.ts"
import { createMemoryStorage } from "@/memory.ts"
import type { LeakyBucketRule, RateLimiterAlgorithm, RateLimitResult, StorageEntry } from "@/types.ts"

interface BucketState {
level: number
lastLeakAt: number
}

/**
* Leaky Bucket
*
* Models a bucket that accumulates requests and drains at a constant rate.
* When full, new requests are dropped immediately. Unlike token bucket, there
* is no burst tolerance — the output rate is always exactly `leakRatePerMs`.
*
* currentLevel = max(0, storedLevel - elapsed * leakRatePerMs)
* newLevel = currentLevel + 1
* ok = newLevel <= capacity
*
* Rejected requests do NOT raise the level — a hammering client cannot
* permanently block legitimate traffic.
*
* Recommended for: outbound webhook dispatch, downstream call smoothing.
*/
export const createLeakyBucketAlgorithm = <RequestInit = Request>(
rule: LeakyBucketRule<RequestInit>
): RateLimiterAlgorithm<RequestInit> => {
const { capacity, leakRatePerMs, storage = createMemoryStorage() } = rule

const levelKey = (key: string) => `${key}:lb:level`
const lastLeakKey = (key: string) => `${key}:lb:lastLeak`
const fullDrainMs = () => Math.ceil(capacity / leakRatePerMs)

const getCurrentLevel = (stored: BucketState, now: number): number => {
return Math.max(0, stored.level - Math.max(0, now - stored.lastLeakAt) * leakRatePerMs)
}

const readBucket = async (key: string, now: number): Promise<BucketState> => {
const [levelEntry, leakEntry] = await Promise.all([storage.get(levelKey(key)), storage.get(lastLeakKey(key))])

if (!levelEntry || !leakEntry) return { level: 0, lastLeakAt: now }
return { level: levelEntry.value, lastLeakAt: leakEntry.value }
}

const writeBucket = async (key: string, state: BucketState): Promise<void> => {
const ttlMs = fullDrainMs() * 2
const expiresAt = Date.now() + ttlMs

await Promise.all([
storage.set(levelKey(key), { value: state.level, expiresAt } satisfies StorageEntry, ttlMs),
storage.set(lastLeakKey(key), { value: state.lastLeakAt, expiresAt } satisfies StorageEntry, ttlMs),
])
}

const check = async (request: RequestInit): Promise<RateLimitResult> => {
const now = Date.now()
const key = rule.keyGenerator(request)
const stored = await readBucket(key, now)
const level = getCurrentLevel(stored, now)
const newLevel = level + 1
const ok = newLevel <= capacity
const msPerRequest = 1 / leakRatePerMs

await writeBucket(key, {
level: ok ? newLevel : level,
lastLeakAt: now,
})

return toContent({
ok,
limit: capacity,
remaining: Math.max(0, capacity - (ok ? newLevel : level)),
resetAt: now + Math.ceil(newLevel * msPerRequest),
retryAfter: Math.ceil(msPerRequest),
})
}

const peek = async (request: RequestInit): Promise<RateLimitResult> => {
const now = Date.now()
const key = rule.keyGenerator(request)
const stored = await readBucket(key, now)
const level = getCurrentLevel(stored, now)
const nextLevel = level + 1
const ok = nextLevel <= capacity
const msPerRequest = 1 / leakRatePerMs

return toContent({
ok,
limit: capacity,
remaining: Math.max(0, capacity - (ok ? nextLevel : level)),
resetAt: now + Math.ceil(nextLevel * msPerRequest),
retryAfter: Math.ceil(msPerRequest),
})
}

return { check, peek }
}
10 changes: 9 additions & 1 deletion packages/rate-limiter/src/rate-limiter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createMemoryStorage } from "@/memory.ts"
import { createTokenBucketAlgorithm } from "@/algorithms/token-bucket.ts"
import { createTokenBucketAlgorithm, createFixedWindowAlgorithm, createLeakyBucketAlgorithm } from "@/algorithms/index.ts"
import type { InferRules, RateLimiter, RateLimiterAlgorithm, RateLimiterConfig, RateLimiterRule } from "@/types.ts"

/**
Expand All @@ -10,6 +10,10 @@ const buildAlgorithm = <RequestInit = Request>(rule: RateLimiterRule<RequestInit
switch (rule.algorithm) {
case "token-bucket":
return createTokenBucketAlgorithm(rule)
case "fixed-window":
return createFixedWindowAlgorithm(rule)
case "leaky-bucket":
return createLeakyBucketAlgorithm(rule)
default: {
throw new Error(`[rate-limiter] Unknown algorithm: "${String((rule as { algorithm?: string }).algorithm)}"`)
}
Expand All @@ -20,6 +24,10 @@ const resetKeys = (rule: RateLimiterRule, key: string): string[] => {
switch (rule.algorithm) {
case "token-bucket":
return [`${key}:tb:tokens`, `${key}:tb:lastRefill`]
case "fixed-window":
return [`${key}:fw`]
case "leaky-bucket":
return [`${key}:lb:tokens`, `${key}:lb:lastLeak`]
}
}

Expand Down
34 changes: 24 additions & 10 deletions packages/rate-limiter/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export interface RateLimiterAlgorithm<RequestInit = Request> {
check(request: RequestInit): Promise<RateLimitResult>
}

export type AlgorithmType = "token-bucket"
export type AlgorithmType = "token-bucket" | "fixed-window" | "leaky-bucket"

interface BaseRule<RequestInit = Request> {
algorithm: AlgorithmType
Expand All @@ -69,6 +69,10 @@ interface BaseRule<RequestInit = Request> {
* @example (req) => `${req.ip}:${req.path}`
*/
keyGenerator: (request: RequestInit) => string
/**
* Optional storage instance specific to this rule.
*/
storage?: RateLimiterStorage
}

export type TokenBucketRule<RequestInit = Request> = BaseRule<RequestInit> & {
Expand All @@ -77,25 +81,35 @@ export type TokenBucketRule<RequestInit = Request> = BaseRule<RequestInit> & {
capacity: number
/** Tokens added per millisecond. */
refillRate: number
/**
* Optional storage instance specific to this rule.
*/
storage?: RateLimiterStorage
}

export type FixedWindowRule<RequestInit = Request> = {
export type FixedWindowRule<RequestInit = Request> = BaseRule<RequestInit> & {
algorithm: "fixed-window"
/** Maximum requests allowed per window. */
limit: number
/** Window duration in milliseconds. Hard resets at each boundary. */
windowMs: number
}

export type LeakyBucketRule<RequestInit = Request> = BaseRule<RequestInit> & {
algorithm: "leaky-bucket"
/**
* Optional storage instance specific to this rule.
* The maximum queue size (burst capacity). When the bucket is full,
* additional requests are rejected until space is available.
*/
storage?: RateLimiterStorage
} & Omit<BaseRule<RequestInit>, "algorithm">
capacity: number
/**
* The rate at which the bucket leaks (processes requests) in tokens per
* millisecond. This controls how quickly the bucket can empty and accept
* new requests after reaching capacity.
*/
leakRatePerMs: number
}

export type RateLimiterRule<RequestInit = Request> = TokenBucketRule<RequestInit> | FixedWindowRule<RequestInit>
export type RateLimiterRule<RequestInit = Request> =
| TokenBucketRule<RequestInit>
| FixedWindowRule<RequestInit>
| LeakyBucketRule<RequestInit>

export interface RateLimiterConfig<Rules extends Record<string, RateLimiterRule>> {
storage?: RateLimiterStorage
Expand Down
134 changes: 134 additions & 0 deletions packages/rate-limiter/test/algorithms/leaky-bucket.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"
import { createMemoryStorage } from "@/memory.ts"
import { createLeakyBucketAlgorithm } from "@/algorithms/index.ts"

interface TestRequest {
key: string
}

const request = (key: string): TestRequest => ({ key })

const createAlgorithm = (capacity = 2, leakRatePerMs = 1) => {
return createLeakyBucketAlgorithm<TestRequest>({
algorithm: "leaky-bucket",
capacity,
leakRatePerMs,
storage: createMemoryStorage(),
keyGenerator: (req) => req.key,
})
}

beforeEach(() => {
vi.useFakeTimers()
vi.setSystemTime(0)
})

afterEach(() => {
vi.useRealTimers()
})

describe("LeakyBucketAlgorithm", () => {
test("allows requests up to capacity and then blocks", async () => {
const algorithm = createAlgorithm(2, 1)
const key = request("ip:1")

const first = await algorithm.check(key)
const second = await algorithm.check(key)
const third = await algorithm.check(key)

expect(first.ok).toBe(true)
expect(first.remaining).toBe(1)

expect(second.ok).toBe(true)
expect(second.remaining).toBe(0)

expect(third.ok).toBe(false)
expect(third.remaining).toBe(0)
expect(third.retryAfter).toBe(1)
})

test("drains over time and accepts new requests as capacity frees up", async () => {
const algorithm = createAlgorithm(2, 1)
const key = request("ip:2")

await algorithm.check(key)
await algorithm.check(key)

vi.advanceTimersByTime(1)

const afterOneMs = await algorithm.check(key)

expect(afterOneMs.ok).toBe(true)
expect(afterOneMs.remaining).toBe(0)
expect(afterOneMs.resetAt).toBe(3)
})

test("rejected requests do not increase the stored level", async () => {
const algorithm = createAlgorithm(1, 0.5)
const key = request("ip:3")

const first = await algorithm.check(key)
const second = await algorithm.check(key)

vi.advanceTimersByTime(2)

const afterDrain = await algorithm.check(key)

expect(first.ok).toBe(true)
expect(second.ok).toBe(false)
expect(second.remaining).toBe(0)
expect(second.retryAfter).toBe(2)
expect(afterDrain.ok).toBe(true)
expect(afterDrain.remaining).toBe(0)
})

test("peek reports state without consuming a queued slot", async () => {
const algorithm = createAlgorithm(3, 1)
const key = request("ip:4")

const consumed = await algorithm.check(key)
const preview = await algorithm.peek(key)
const next = await algorithm.check(key)

expect(consumed.ok).toBe(true)
expect(consumed.remaining).toBe(2)

expect(preview.ok).toBe(true)
expect(preview.remaining).toBe(1)
expect(preview.resetAt).toBe(2)
expect(preview.retryAfter).toBe(1)

expect(next.ok).toBe(true)
expect(next.remaining).toBe(1)
})

test("keeps bucket state isolated per key", async () => {
const algorithm = createAlgorithm(1, 1)
const keyA = request("ip:A")
const keyB = request("ip:B")

const firstA = await algorithm.check(keyA)
const secondA = await algorithm.check(keyA)
const firstB = await algorithm.check(keyB)

expect(firstA.ok).toBe(true)
expect(secondA.ok).toBe(false)
expect(firstB.ok).toBe(true)
})

test("falls back to a fresh bucket after storage entries expire", async () => {
const algorithm = createAlgorithm(2, 1)
const key = request("ip:5")

await algorithm.check(key)
await algorithm.check(key)

vi.advanceTimersByTime(5000)

const afterExpiry = await algorithm.check(key)

expect(afterExpiry.ok).toBe(true)
expect(afterExpiry.remaining).toBe(1)
expect(afterExpiry.resetAt).toBe(5001)
})
})
Loading