diff --git a/Cargo.toml b/Cargo.toml index 4d81727..6b1d4db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,12 +9,10 @@ version = "0.1.0" crate-type = ["cdylib"] [dependencies] -crossbeam-queue = "0.3" -napi = { version = "3", features = ["async", "napi4"] } -napi-derive = "3" -once_cell = "1" -parking_lot = { version = "0.12", features = ["send_guard"] } -tokio = { version = "1", features = ["sync", "time", "macros", "rt"] } +napi = { version = "3", features = ["async", "napi4"] } +napi-derive = "3" +parking_lot = { version = "0.12", features = ["send_guard"] } +tokio = { version = "1", features = ["sync", "time", "macros", "rt"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio = { version = "1", features = ["rt-multi-thread"] } diff --git a/__test__/cjs/concurrency/async.test.cjs b/__test__/cjs/concurrency/async.test.cjs index 6e8908a..05372a8 100644 --- a/__test__/cjs/concurrency/async.test.cjs +++ b/__test__/cjs/concurrency/async.test.cjs @@ -106,7 +106,7 @@ describe('GenericObjectPool - Concurrency & Async', () => { const pool = new GenericObjectPool([resource]) try { - await pool.use(async (r) => { + await pool.use(async () => { throw new Error('fail') }) } catch (e) { diff --git a/__test__/cjs/features/dynamic-sizing.test.cjs b/__test__/cjs/features/dynamic-sizing.test.cjs new file mode 100644 index 0000000..ce19384 --- /dev/null +++ b/__test__/cjs/features/dynamic-sizing.test.cjs @@ -0,0 +1,743 @@ +const { strict: assert } = require('node:assert') +const { describe, test } = require('node:test') +const { GenericObjectPool } = require('../../../index.wrapper.cjs') + +describe('GenericObjectPool - Dynamic Sizing', () => { + describe('Configuration', () => { + test('should create pool with min, max, and initial size', () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 5, + resourceFactory: factory, + }) + + assert.strictEqual(pool.size, 5) + assert.strictEqual(pool.minSize, 2) + assert.strictEqual(pool.maxSize, 10) + assert.strictEqual(pool.available, 5) + + pool.destroy() + }) + + test('should use min as initial size if initial not provided', () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 3, + max: 10, + resourceFactory: factory, + }) + + assert.strictEqual(pool.size, 3) + assert.strictEqual(pool.minSize, 3) + + pool.destroy() + }) + + test('should throw if max < min', () => { + const factory = () => ({ id: 1 }) + + assert.throws( + () => { + GenericObjectPool.withDynamicSizing({ + min: 10, + max: 5, + resourceFactory: factory, + }) + }, + { + message: /max.*must be greater than or equal to.*min/i, + }, + ) + }) + + test('should throw if initial < min or initial > max', () => { + const factory = () => ({ id: 1 }) + + assert.throws( + () => { + GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 1, + resourceFactory: factory, + }) + }, + { + message: /initial.*must be between min and max/i, + }, + ) + + assert.throws( + () => { + GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 15, + resourceFactory: factory, + }) + }, + { + message: /initial.*must be between min and max/i, + }, + ) + }) + }) + + describe('Auto Scale-Up', () => { + test('should scale up when pending requests exceed threshold', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 2, + resourceFactory: factory, + scaleUpThreshold: 1, // Scale up when 1 or more requests pending + scaleUpIncrement: 2, // Add 2 resources at a time + }) + + assert.strictEqual(pool.size, 2) + + // Acquire all resources + const r1 = await pool.acquireAsync() + const r2 = await pool.acquireAsync() + + // Wait for any scheduled scale-up operations to settle + await new Promise((resolve) => setTimeout(resolve, 50)) + + // After acquiring all initial resources, available should be 0 (unless auto-scale already triggered) + const availableAfterAcquire = pool.available + const sizeAfterAcquire = pool.size + + // These resources are in use, so available should be less than or equal to size + assert.ok(availableAfterAcquire <= sizeAfterAcquire) + + // Create pending requests that should trigger scale-up + const promises = [] + for (let i = 0; i < 3; i++) { + promises.push(pool.acquireAsync()) + } + + // Wait for scale-up to happen + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Pool should have scaled up from the initial size + assert.ok(pool.size > 2, `Pool size should be > 2, got ${pool.size}`) + assert.ok(pool.size <= 10, `Pool size should be <= 10, got ${pool.size}`) + + // Release original resources + pool.release(r1) + pool.release(r2) + + // All pending requests should resolve + const resources = await Promise.all(promises) + assert.strictEqual(resources.length, 3) + + for (const r of resources) { + pool.release(r) + } + + pool.destroy() + }) + + test('should not scale beyond max size', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 5, + initial: 2, + resourceFactory: factory, + scaleUpThreshold: 1, + scaleUpIncrement: 10, // Try to add 10 but max is 5 + }) + + // Acquire all + const r1 = await pool.acquireAsync() + const r2 = await pool.acquireAsync() + + // Create many pending requests + const promises = [] + for (let i = 0; i < 10; i++) { + promises.push( + pool.acquireAsync().then((resource) => { + pool.release(resource) + return resource + }), + ) + } + + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Should not exceed max + assert.ok(pool.size <= 5, `Pool size should not exceed max 5, got ${pool.size}`) + + // Cleanup + pool.release(r1) + pool.release(r2) + await Promise.all(promises) + pool.destroy() + }) + + test('should scale up incrementally based on demand', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 1, + max: 20, + initial: 1, + resourceFactory: factory, + scaleUpThreshold: 2, + scaleUpIncrement: 1, + }) + + const initialSize = pool.size + + // Create concurrent demand to trigger scale-up + await Promise.all( + Array.from({ length: 5 }, () => + pool.acquireAsync(500).then((resource) => { + pool.release(resource) + return resource + }), + ), + ) + + // Pool should have grown + assert.ok(pool.size > initialSize, 'Pool should have grown') + + pool.destroy() + }) + }) + + describe('Auto Scale-Down', () => { + test('should scale down idle resources after timeout', async () => { + let counter = 0 + const factory = () => ({ id: counter++, createdAt: Date.now() }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 8, + resourceFactory: factory, + idleTimeoutMs: 100, // Resources idle for 100ms can be removed + scaleDownCheckIntervalMs: 50, // Check every 50ms + }) + + assert.strictEqual(pool.size, 8) + + // Wait for idle timeout + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Should have scaled down to min + assert.ok(pool.size <= 8, 'Pool should have scaled down') + assert.ok(pool.size >= 2, 'Pool should not go below min') + + pool.destroy() + }) + + test('should not scale below min size', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 5, + max: 10, + initial: 5, + resourceFactory: factory, + idleTimeoutMs: 50, + scaleDownCheckIntervalMs: 25, + }) + + // Wait for potential scale-down attempts + await new Promise((resolve) => setTimeout(resolve, 150)) + + // Should stay at min + assert.strictEqual(pool.size, 5) + + pool.destroy() + }) + + test('should keep frequently used resources', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 5, + resourceFactory: factory, + idleTimeoutMs: 200, + scaleDownCheckIntervalMs: 50, + }) + + const initialSize = pool.size + + // Continuously use resources to keep them active + const keepAlive = async () => { + for (let i = 0; i < 5; i++) { + const r = await pool.acquireAsync() + await new Promise((resolve) => setTimeout(resolve, 30)) + pool.release(r) + } + } + + await keepAlive() + + // Size should remain stable since resources are being used + assert.strictEqual(pool.size, initialSize) + + pool.destroy() + }) + }) + + describe('Resource Factory', () => { + test('should create resources using factory', async () => { + let createCount = 0 + const factory = () => { + createCount++ + return { id: createCount, value: `resource-${createCount}` } + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 5, + resourceFactory: factory, + }) + + assert.strictEqual(createCount, 2, 'Factory should be called for initial resources') + + const r1 = await pool.acquireAsync() + assert.ok(r1.id) + assert.ok(r1.value.startsWith('resource-')) + + pool.release(r1) + pool.destroy() + }) + + test('should handle async resource factory', async () => { + let createCount = 0 + const factory = async () => { + createCount++ + await new Promise((resolve) => setTimeout(resolve, 10)) + return { id: createCount, async: true } + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 5, + resourceFactory: factory, + }) + + // Wait for async initialization + await new Promise((resolve) => setTimeout(resolve, 50)) + + const r1 = await pool.acquireAsync() + assert.ok(r1.async) + + pool.release(r1) + pool.destroy() + }) + + test('should handle resource factory errors', async () => { + let attempts = 0 + const factory = () => { + attempts++ + if (attempts <= 2) { + throw new Error('Factory error') + } + return { id: attempts } + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 1, + max: 5, + resourceFactory: factory, + createRetries: 3, + }) + + // Should eventually succeed with retries + await new Promise((resolve) => setTimeout(resolve, 50)) + + assert.ok(pool.size >= 1, 'Pool should have at least 1 resource after retries') + + pool.destroy() + }) + }) + + describe('Resource Validation', () => { + test('should validate resources before returning them', async () => { + let counter = 0 + const factory = () => ({ id: counter++, valid: true }) + + let validateCount = 0 + const validator = (resource) => { + validateCount++ + return resource.valid === true + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 5, + resourceFactory: factory, + validateResource: validator, + }) + + const r1 = await pool.acquireAsync() + assert.ok(validateCount > 0, 'Validator should have been called') + assert.ok(r1.valid) + + pool.release(r1) + pool.destroy() + }) + + test('should recreate invalid resources', async () => { + let counter = 0 + const factory = () => ({ id: counter++, valid: true }) + + let firstValidation = true + const validator = (resource) => { + if (firstValidation && resource.id === 0) { + firstValidation = false + return false // First resource is invalid + } + return true + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 5, + resourceFactory: factory, + validateResource: validator, + validateOnAcquire: true, + }) + + await new Promise((resolve) => setTimeout(resolve, 50)) + + const r1 = await pool.acquireAsync() + + // Should get a valid resource (invalid one was replaced) + assert.ok(r1.valid) + + pool.release(r1) + pool.destroy() + }) + + test('should handle async validation', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const validator = async (resource) => { + await new Promise((resolve) => setTimeout(resolve, 5)) + return resource.id !== undefined + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 5, + resourceFactory: factory, + validateResource: validator, + }) + + const r1 = await pool.acquireAsync() + assert.ok(r1.id !== undefined) + + pool.release(r1) + pool.destroy() + }) + }) + + describe('Metrics and Observability', () => { + test('should track scale-up events', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 2, + resourceFactory: factory, + scaleUpThreshold: 1, + scaleUpIncrement: 2, + }) + + const initialMetrics = pool.getMetrics() + assert.strictEqual(initialMetrics.scaleUpEvents, 0) + + // Trigger scale-up + const r1 = await pool.acquireAsync() + const r2 = await pool.acquireAsync() + + const promises = [pool.acquireAsync(), pool.acquireAsync()] + + await new Promise((resolve) => setTimeout(resolve, 100)) + + const metrics = pool.getMetrics() + assert.ok(metrics.scaleUpEvents > 0, 'Should have scale-up events') + + pool.release(r1) + pool.release(r2) + const resources = await Promise.all(promises) + for (const r of resources) { + pool.release(r) + } + + pool.destroy() + }) + + test('should track scale-down events', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 6, + resourceFactory: factory, + idleTimeoutMs: 50, + scaleDownCheckIntervalMs: 25, + }) + + await new Promise((resolve) => setTimeout(resolve, 150)) + + const metrics = pool.getMetrics() + assert.ok(metrics.scaleDownEvents >= 0, 'Should have scale-down metrics') + + pool.destroy() + }) + + test('should provide comprehensive metrics', () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 5, + resourceFactory: factory, + }) + + const metrics = pool.getMetrics() + assert.ok('currentSize' in metrics) + assert.ok('minSize' in metrics) + assert.ok('maxSize' in metrics) + assert.ok('available' in metrics) + assert.ok('inUse' in metrics) + assert.ok('pending' in metrics) + assert.ok('scaleUpEvents' in metrics) + assert.ok('scaleDownEvents' in metrics) + assert.ok('resourcesCreated' in metrics) + assert.ok('resourcesDestroyed' in metrics) + + pool.destroy() + }) + }) + + describe('Advanced Scenarios', () => { + test('should handle mixed workload patterns', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 20, + initial: 2, + resourceFactory: factory, + scaleUpThreshold: 2, + scaleUpIncrement: 3, + idleTimeoutMs: 200, + scaleDownCheckIntervalMs: 50, + }) + + // Burst 1: High load + const burst1 = [] + for (let i = 0; i < 10; i++) { + burst1.push(pool.acquireAsync()) + } + const resources1 = await Promise.all(burst1) + + const sizeAfterBurst1 = pool.size + assert.ok(sizeAfterBurst1 > 2, 'Should scale up during burst') + + // Release all + for (const r of resources1) { + pool.release(r) + } + + // Idle period + await new Promise((resolve) => setTimeout(resolve, 300)) + + // Should have scaled down + assert.ok(pool.size < sizeAfterBurst1, 'Should scale down after idle') + + // Burst 2: Another high load + const burst2 = [] + for (let i = 0; i < 8; i++) { + burst2.push(pool.acquireAsync()) + } + const resources2 = await Promise.all(burst2) + + assert.ok(pool.size >= 8, 'Should scale up again for second burst') + + for (const r of resources2) { + pool.release(r) + } + + pool.destroy() + }) + + test('should handle resource destruction callbacks', async () => { + let counter = 0 + const destroyedIds = [] + + const factory = () => ({ id: counter++ }) + const destroyer = (resource) => { + destroyedIds.push(resource.id) + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 5, + resourceFactory: factory, + resourceDestroyer: destroyer, + idleTimeoutMs: 50, + scaleDownCheckIntervalMs: 25, + }) + + await new Promise((resolve) => setTimeout(resolve, 150)) + + // Some resources should have been destroyed + assert.ok(destroyedIds.length >= 0, 'Destroyer should track removed resources') + + pool.destroy() + + // All resources should be destroyed when pool is destroyed + await new Promise((resolve) => setTimeout(resolve, 50)) + }) + + test('should work correctly with use() pattern', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 2, + resourceFactory: factory, + scaleUpThreshold: 1, + scaleUpIncrement: 2, + }) + + const promises = [] + + for (let i = 0; i < 5; i++) { + promises.push( + pool.use(async (resource) => { + await new Promise((resolve) => setTimeout(resolve, 20)) + return { resourceId: resource.id, iteration: i } + }), + ) + } + + const outcomes = await Promise.all(promises) + assert.strictEqual(outcomes.length, 5) + + outcomes.forEach((outcome) => { + assert.ok(outcome.resourceId !== undefined) + assert.ok(outcome.iteration !== undefined) + }) + + pool.destroy() + }) + }) + + describe('Edge Cases', () => { + test('should handle rapid acquire/release cycles', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + resourceFactory: factory, + scaleUpThreshold: 2, + scaleUpIncrement: 1, + }) + + for (let i = 0; i < 50; i++) { + const r = await pool.acquireAsync() + pool.release(r) + } + + assert.ok(pool.size >= 2) + assert.ok(pool.size <= 10) + + pool.destroy() + }) + + test('should handle concurrent scale operations', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 1, + max: 20, + initial: 1, + resourceFactory: factory, + scaleUpThreshold: 1, + scaleUpIncrement: 3, + }) + + const promises = [] + for (let i = 0; i < 20; i++) { + promises.push( + (async () => { + const r = await pool.acquireAsync() + await new Promise((resolve) => setTimeout(resolve, 50)) + pool.release(r) + })(), + ) + } + + await Promise.all(promises) + + assert.ok(pool.size <= 20, 'Should not exceed max') + assert.ok(pool.available === pool.size, 'All resources should be available') + + pool.destroy() + }) + + test('should handle zero initial size', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 0, + max: 10, + initial: 0, + resourceFactory: factory, + scaleUpThreshold: 0, + scaleUpIncrement: 2, + }) + + assert.strictEqual(pool.size, 0) + + const r = await pool.acquireAsync() + assert.ok(r) + assert.ok(pool.size > 0) + + pool.release(r) + pool.destroy() + }) + }) +}) diff --git a/__test__/cjs/features/lifecycle.test.cjs b/__test__/cjs/features/lifecycle.test.cjs index 01ce6e6..b4d17bb 100644 --- a/__test__/cjs/features/lifecycle.test.cjs +++ b/__test__/cjs/features/lifecycle.test.cjs @@ -17,7 +17,7 @@ describe('GenericObjectPool - Lifecycle & Observability', () => { assert.strictEqual(pool.numUsed, 1) assert.strictEqual(pool.pendingCount, 0) - const r2 = await pool.acquireAsync() + await pool.acquireAsync() assert.strictEqual(pool.available, 0) assert.strictEqual(pool.numUsed, 2) diff --git a/__test__/cjs/features/resources.test.cjs b/__test__/cjs/features/resources.test.cjs index c5fb109..fe2ef1e 100644 --- a/__test__/cjs/features/resources.test.cjs +++ b/__test__/cjs/features/resources.test.cjs @@ -41,7 +41,7 @@ describe('GenericObjectPool - Resources', () => { const pool = new GenericObjectPool(resources) const r1 = pool.acquire() - const _ = pool.acquire() + pool.acquire() assert.strictEqual(pool.availableCount(), 0) const removed = pool.removeOne() diff --git a/__test__/cjs/features/use.test.cjs b/__test__/cjs/features/use.test.cjs index a80d1cb..d0e6343 100644 --- a/__test__/cjs/features/use.test.cjs +++ b/__test__/cjs/features/use.test.cjs @@ -27,12 +27,12 @@ describe('GenericObjectPool - Use Method', () => { test('should handle timeout in use()', async () => { const pool = new GenericObjectPool([{ id: 1 }]) - const r1 = pool.acquire() // Exhaust the pool + pool.acquire() // Exhaust the pool const start = Date.now() try { await pool.use( - async (resource) => { + async () => { assert.fail('Should not have acquired') }, { timeout: 100 }, diff --git a/__test__/cjs/memory_leak_check.test.cjs b/__test__/cjs/memory_leak_check.test.cjs index c339d88..2afb05a 100644 --- a/__test__/cjs/memory_leak_check.test.cjs +++ b/__test__/cjs/memory_leak_check.test.cjs @@ -7,8 +7,8 @@ */ const { strict: assert } = require('node:assert') -const { test } = require('node:test') const process = require('node:process') +const { test } = require('node:test') const { setTimeout } = require('node:timers/promises') const { GenericObjectPool } = require('../../index.wrapper.cjs') diff --git a/__test__/mjs/concurrency/async.test.mjs b/__test__/mjs/concurrency/async.test.mjs index fd4bc29..6d355fb 100644 --- a/__test__/mjs/concurrency/async.test.mjs +++ b/__test__/mjs/concurrency/async.test.mjs @@ -106,7 +106,7 @@ describe('GenericObjectPool - Concurrency & Async', () => { const pool = new GenericObjectPool([resource]) try { - await pool.use(async (r) => { + await pool.use(async () => { throw new Error('fail') }) } catch (e) { diff --git a/__test__/mjs/features/dynamic-sizing.test.mjs b/__test__/mjs/features/dynamic-sizing.test.mjs new file mode 100644 index 0000000..94cd9ec --- /dev/null +++ b/__test__/mjs/features/dynamic-sizing.test.mjs @@ -0,0 +1,743 @@ +import { strict as assert } from 'node:assert' +import { describe, test } from 'node:test' +import { GenericObjectPool } from '../../../index.wrapper.mjs' + +describe('GenericObjectPool - Dynamic Sizing', () => { + describe('Configuration', () => { + test('should create pool with min, max, and initial size', () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 5, + resourceFactory: factory, + }) + + assert.strictEqual(pool.size, 5) + assert.strictEqual(pool.minSize, 2) + assert.strictEqual(pool.maxSize, 10) + assert.strictEqual(pool.available, 5) + + pool.destroy() + }) + + test('should use min as initial size if initial not provided', () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 3, + max: 10, + resourceFactory: factory, + }) + + assert.strictEqual(pool.size, 3) + assert.strictEqual(pool.minSize, 3) + + pool.destroy() + }) + + test('should throw if max < min', () => { + const factory = () => ({ id: 1 }) + + assert.throws( + () => { + GenericObjectPool.withDynamicSizing({ + min: 10, + max: 5, + resourceFactory: factory, + }) + }, + { + message: /max.*must be greater than or equal to.*min/i, + }, + ) + }) + + test('should throw if initial < min or initial > max', () => { + const factory = () => ({ id: 1 }) + + assert.throws( + () => { + GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 1, + resourceFactory: factory, + }) + }, + { + message: /initial.*must be between min and max/i, + }, + ) + + assert.throws( + () => { + GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 15, + resourceFactory: factory, + }) + }, + { + message: /initial.*must be between min and max/i, + }, + ) + }) + }) + + describe('Auto Scale-Up', () => { + test('should scale up when pending requests exceed threshold', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 2, + resourceFactory: factory, + scaleUpThreshold: 1, // Scale up when 1 or more requests pending + scaleUpIncrement: 2, // Add 2 resources at a time + }) + + assert.strictEqual(pool.size, 2) + + // Acquire all resources + const r1 = await pool.acquireAsync() + const r2 = await pool.acquireAsync() + + // Wait for any scheduled scale-up operations to settle + await new Promise((resolve) => setTimeout(resolve, 50)) + + // After acquiring all initial resources, available should be 0 (unless auto-scale already triggered) + const availableAfterAcquire = pool.available + const sizeAfterAcquire = pool.size + + // These resources are in use, so available should be less than or equal to size + assert.ok(availableAfterAcquire <= sizeAfterAcquire) + + // Create pending requests that should trigger scale-up + const promises = [] + for (let i = 0; i < 3; i++) { + promises.push(pool.acquireAsync()) + } + + // Wait for scale-up to happen + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Pool should have scaled up from the initial size + assert.ok(pool.size > 2, `Pool size should be > 2, got ${pool.size}`) + assert.ok(pool.size <= 10, `Pool size should be <= 10, got ${pool.size}`) + + // Release original resources + pool.release(r1) + pool.release(r2) + + // All pending requests should resolve + const resources = await Promise.all(promises) + assert.strictEqual(resources.length, 3) + + for (const r of resources) { + pool.release(r) + } + + pool.destroy() + }) + + test('should not scale beyond max size', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 5, + initial: 2, + resourceFactory: factory, + scaleUpThreshold: 1, + scaleUpIncrement: 10, // Try to add 10 but max is 5 + }) + + // Acquire all + const r1 = await pool.acquireAsync() + const r2 = await pool.acquireAsync() + + // Create many pending requests + const promises = [] + for (let i = 0; i < 10; i++) { + promises.push( + pool.acquireAsync().then((resource) => { + pool.release(resource) + return resource + }), + ) + } + + await new Promise((resolve) => setTimeout(resolve, 100)) + + // Should not exceed max + assert.ok(pool.size <= 5, `Pool size should not exceed max 5, got ${pool.size}`) + + // Cleanup + pool.release(r1) + pool.release(r2) + await Promise.all(promises) + pool.destroy() + }) + + test('should scale up incrementally based on demand', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 1, + max: 20, + initial: 1, + resourceFactory: factory, + scaleUpThreshold: 2, + scaleUpIncrement: 1, + }) + + const initialSize = pool.size + + // Create concurrent demand to trigger scale-up + await Promise.all( + Array.from({ length: 5 }, () => + pool.acquireAsync(500).then((resource) => { + pool.release(resource) + return resource + }), + ), + ) + + // Pool should have grown + assert.ok(pool.size > initialSize, 'Pool should have grown') + + pool.destroy() + }) + }) + + describe('Auto Scale-Down', () => { + test('should scale down idle resources after timeout', async () => { + let counter = 0 + const factory = () => ({ id: counter++, createdAt: Date.now() }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 8, + resourceFactory: factory, + idleTimeoutMs: 100, // Resources idle for 100ms can be removed + scaleDownCheckIntervalMs: 50, // Check every 50ms + }) + + assert.strictEqual(pool.size, 8) + + // Wait for idle timeout + await new Promise((resolve) => setTimeout(resolve, 200)) + + // Should have scaled down to min + assert.ok(pool.size <= 8, 'Pool should have scaled down') + assert.ok(pool.size >= 2, 'Pool should not go below min') + + pool.destroy() + }) + + test('should not scale below min size', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 5, + max: 10, + initial: 5, + resourceFactory: factory, + idleTimeoutMs: 50, + scaleDownCheckIntervalMs: 25, + }) + + // Wait for potential scale-down attempts + await new Promise((resolve) => setTimeout(resolve, 150)) + + // Should stay at min + assert.strictEqual(pool.size, 5) + + pool.destroy() + }) + + test('should keep frequently used resources', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 5, + resourceFactory: factory, + idleTimeoutMs: 200, + scaleDownCheckIntervalMs: 50, + }) + + const initialSize = pool.size + + // Continuously use resources to keep them active + const keepAlive = async () => { + for (let i = 0; i < 5; i++) { + const r = await pool.acquireAsync() + await new Promise((resolve) => setTimeout(resolve, 30)) + pool.release(r) + } + } + + await keepAlive() + + // Size should remain stable since resources are being used + assert.strictEqual(pool.size, initialSize) + + pool.destroy() + }) + }) + + describe('Resource Factory', () => { + test('should create resources using factory', async () => { + let createCount = 0 + const factory = () => { + createCount++ + return { id: createCount, value: `resource-${createCount}` } + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 5, + resourceFactory: factory, + }) + + assert.strictEqual(createCount, 2, 'Factory should be called for initial resources') + + const r1 = await pool.acquireAsync() + assert.ok(r1.id) + assert.ok(r1.value.startsWith('resource-')) + + pool.release(r1) + pool.destroy() + }) + + test('should handle async resource factory', async () => { + let createCount = 0 + const factory = async () => { + createCount++ + await new Promise((resolve) => setTimeout(resolve, 10)) + return { id: createCount, async: true } + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 5, + resourceFactory: factory, + }) + + // Wait for async initialization + await new Promise((resolve) => setTimeout(resolve, 50)) + + const r1 = await pool.acquireAsync() + assert.ok(r1.async) + + pool.release(r1) + pool.destroy() + }) + + test('should handle resource factory errors', async () => { + let attempts = 0 + const factory = () => { + attempts++ + if (attempts <= 2) { + throw new Error('Factory error') + } + return { id: attempts } + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 1, + max: 5, + resourceFactory: factory, + createRetries: 3, + }) + + // Should eventually succeed with retries + await new Promise((resolve) => setTimeout(resolve, 50)) + + assert.ok(pool.size >= 1, 'Pool should have at least 1 resource after retries') + + pool.destroy() + }) + }) + + describe('Resource Validation', () => { + test('should validate resources before returning them', async () => { + let counter = 0 + const factory = () => ({ id: counter++, valid: true }) + + let validateCount = 0 + const validator = (resource) => { + validateCount++ + return resource.valid === true + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 5, + resourceFactory: factory, + validateResource: validator, + }) + + const r1 = await pool.acquireAsync() + assert.ok(validateCount > 0, 'Validator should have been called') + assert.ok(r1.valid) + + pool.release(r1) + pool.destroy() + }) + + test('should recreate invalid resources', async () => { + let counter = 0 + const factory = () => ({ id: counter++, valid: true }) + + let firstValidation = true + const validator = (resource) => { + if (firstValidation && resource.id === 0) { + firstValidation = false + return false // First resource is invalid + } + return true + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 5, + resourceFactory: factory, + validateResource: validator, + validateOnAcquire: true, + }) + + await new Promise((resolve) => setTimeout(resolve, 50)) + + const r1 = await pool.acquireAsync() + + // Should get a valid resource (invalid one was replaced) + assert.ok(r1.valid) + + pool.release(r1) + pool.destroy() + }) + + test('should handle async validation', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const validator = async (resource) => { + await new Promise((resolve) => setTimeout(resolve, 5)) + return resource.id !== undefined + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 5, + resourceFactory: factory, + validateResource: validator, + }) + + const r1 = await pool.acquireAsync() + assert.ok(r1.id !== undefined) + + pool.release(r1) + pool.destroy() + }) + }) + + describe('Metrics and Observability', () => { + test('should track scale-up events', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 2, + resourceFactory: factory, + scaleUpThreshold: 1, + scaleUpIncrement: 2, + }) + + const initialMetrics = pool.getMetrics() + assert.strictEqual(initialMetrics.scaleUpEvents, 0) + + // Trigger scale-up + const r1 = await pool.acquireAsync() + const r2 = await pool.acquireAsync() + + const promises = [pool.acquireAsync(), pool.acquireAsync()] + + await new Promise((resolve) => setTimeout(resolve, 100)) + + const metrics = pool.getMetrics() + assert.ok(metrics.scaleUpEvents > 0, 'Should have scale-up events') + + pool.release(r1) + pool.release(r2) + const resources = await Promise.all(promises) + for (const r of resources) { + pool.release(r) + } + + pool.destroy() + }) + + test('should track scale-down events', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 6, + resourceFactory: factory, + idleTimeoutMs: 50, + scaleDownCheckIntervalMs: 25, + }) + + await new Promise((resolve) => setTimeout(resolve, 150)) + + const metrics = pool.getMetrics() + assert.ok(metrics.scaleDownEvents >= 0, 'Should have scale-down metrics') + + pool.destroy() + }) + + test('should provide comprehensive metrics', () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 5, + resourceFactory: factory, + }) + + const metrics = pool.getMetrics() + assert.ok('currentSize' in metrics) + assert.ok('minSize' in metrics) + assert.ok('maxSize' in metrics) + assert.ok('available' in metrics) + assert.ok('inUse' in metrics) + assert.ok('pending' in metrics) + assert.ok('scaleUpEvents' in metrics) + assert.ok('scaleDownEvents' in metrics) + assert.ok('resourcesCreated' in metrics) + assert.ok('resourcesDestroyed' in metrics) + + pool.destroy() + }) + }) + + describe('Advanced Scenarios', () => { + test('should handle mixed workload patterns', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 20, + initial: 2, + resourceFactory: factory, + scaleUpThreshold: 2, + scaleUpIncrement: 3, + idleTimeoutMs: 200, + scaleDownCheckIntervalMs: 50, + }) + + // Burst 1: High load + const burst1 = [] + for (let i = 0; i < 10; i++) { + burst1.push(pool.acquireAsync()) + } + const resources1 = await Promise.all(burst1) + + const sizeAfterBurst1 = pool.size + assert.ok(sizeAfterBurst1 > 2, 'Should scale up during burst') + + // Release all + for (const r of resources1) { + pool.release(r) + } + + // Idle period + await new Promise((resolve) => setTimeout(resolve, 300)) + + // Should have scaled down + assert.ok(pool.size < sizeAfterBurst1, 'Should scale down after idle') + + // Burst 2: Another high load + const burst2 = [] + for (let i = 0; i < 8; i++) { + burst2.push(pool.acquireAsync()) + } + const resources2 = await Promise.all(burst2) + + assert.ok(pool.size >= 8, 'Should scale up again for second burst') + + for (const r of resources2) { + pool.release(r) + } + + pool.destroy() + }) + + test('should handle resource destruction callbacks', async () => { + let counter = 0 + const destroyedIds = [] + + const factory = () => ({ id: counter++ }) + const destroyer = (resource) => { + destroyedIds.push(resource.id) + } + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 5, + resourceFactory: factory, + resourceDestroyer: destroyer, + idleTimeoutMs: 50, + scaleDownCheckIntervalMs: 25, + }) + + await new Promise((resolve) => setTimeout(resolve, 150)) + + // Some resources should have been destroyed + assert.ok(destroyedIds.length >= 0, 'Destroyer should track removed resources') + + pool.destroy() + + // All resources should be destroyed when pool is destroyed + await new Promise((resolve) => setTimeout(resolve, 50)) + }) + + test('should work correctly with use() pattern', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + initial: 2, + resourceFactory: factory, + scaleUpThreshold: 1, + scaleUpIncrement: 2, + }) + + const promises = [] + + for (let i = 0; i < 5; i++) { + promises.push( + pool.use(async (resource) => { + await new Promise((resolve) => setTimeout(resolve, 20)) + return { resourceId: resource.id, iteration: i } + }), + ) + } + + const outcomes = await Promise.all(promises) + assert.strictEqual(outcomes.length, 5) + + outcomes.forEach((outcome) => { + assert.ok(outcome.resourceId !== undefined) + assert.ok(outcome.iteration !== undefined) + }) + + pool.destroy() + }) + }) + + describe('Edge Cases', () => { + test('should handle rapid acquire/release cycles', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 2, + max: 10, + resourceFactory: factory, + scaleUpThreshold: 2, + scaleUpIncrement: 1, + }) + + for (let i = 0; i < 50; i++) { + const r = await pool.acquireAsync() + pool.release(r) + } + + assert.ok(pool.size >= 2) + assert.ok(pool.size <= 10) + + pool.destroy() + }) + + test('should handle concurrent scale operations', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 1, + max: 20, + initial: 1, + resourceFactory: factory, + scaleUpThreshold: 1, + scaleUpIncrement: 3, + }) + + const promises = [] + for (let i = 0; i < 20; i++) { + promises.push( + (async () => { + const r = await pool.acquireAsync() + await new Promise((resolve) => setTimeout(resolve, 50)) + pool.release(r) + })(), + ) + } + + await Promise.all(promises) + + assert.ok(pool.size <= 20, 'Should not exceed max') + assert.ok(pool.available === pool.size, 'All resources should be available') + + pool.destroy() + }) + + test('should handle zero initial size', async () => { + let counter = 0 + const factory = () => ({ id: counter++ }) + + const pool = GenericObjectPool.withDynamicSizing({ + min: 0, + max: 10, + initial: 0, + resourceFactory: factory, + scaleUpThreshold: 0, + scaleUpIncrement: 2, + }) + + assert.strictEqual(pool.size, 0) + + const r = await pool.acquireAsync() + assert.ok(r) + assert.ok(pool.size > 0) + + pool.release(r) + pool.destroy() + }) + }) +}) diff --git a/__test__/mjs/features/lifecycle.test.mjs b/__test__/mjs/features/lifecycle.test.mjs index 9bc164d..8fa17ec 100644 --- a/__test__/mjs/features/lifecycle.test.mjs +++ b/__test__/mjs/features/lifecycle.test.mjs @@ -17,7 +17,7 @@ describe('GenericObjectPool - Lifecycle & Observability', () => { assert.strictEqual(pool.numUsed, 1) assert.strictEqual(pool.pendingCount, 0) - const r2 = await pool.acquireAsync() + await pool.acquireAsync() assert.strictEqual(pool.available, 0) assert.strictEqual(pool.numUsed, 2) diff --git a/__test__/mjs/features/resources.test.mjs b/__test__/mjs/features/resources.test.mjs index 274ea11..0e62f9b 100644 --- a/__test__/mjs/features/resources.test.mjs +++ b/__test__/mjs/features/resources.test.mjs @@ -41,7 +41,7 @@ describe('GenericObjectPool - Resources', () => { const pool = new GenericObjectPool(resources) const r1 = pool.acquire() - const _ = pool.acquire() + pool.acquire() assert.strictEqual(pool.availableCount(), 0) const removed = pool.removeOne() diff --git a/__test__/mjs/features/use.test.mjs b/__test__/mjs/features/use.test.mjs index ba23c56..58b0b55 100644 --- a/__test__/mjs/features/use.test.mjs +++ b/__test__/mjs/features/use.test.mjs @@ -27,12 +27,12 @@ describe('GenericObjectPool - Use Method', () => { test('should handle timeout in use()', async () => { const pool = new GenericObjectPool([{ id: 1 }]) - const r1 = pool.acquire() // Exhaust the pool + pool.acquire() // Exhaust the pool const start = Date.now() try { await pool.use( - async (resource) => { + async () => { assert.fail('Should not have acquired') }, { timeout: 100 }, diff --git a/__test__/mjs/memory_leak_check.test.mjs b/__test__/mjs/memory_leak_check.test.mjs index e79c63f..614dc29 100644 --- a/__test__/mjs/memory_leak_check.test.mjs +++ b/__test__/mjs/memory_leak_check.test.mjs @@ -7,8 +7,8 @@ */ import { strict as assert } from 'node:assert' -import { test } from 'node:test' import process from 'node:process' +import { test } from 'node:test' import { setTimeout } from 'node:timers/promises' import { GenericObjectPool } from '../../index.wrapper.mjs' diff --git a/benchmarks/competitors/generic-object-pool-dynamic-use.js b/benchmarks/competitors/generic-object-pool-dynamic-use.js new file mode 100644 index 0000000..f3eb22a --- /dev/null +++ b/benchmarks/competitors/generic-object-pool-dynamic-use.js @@ -0,0 +1,26 @@ +import { GenericObjectPool } from '@lojhan/resource-pool' + +export default { + name: 'GenericObjectPool (Dynamic) .use()', + + setup: async (poolSize) => { + return GenericObjectPool.dynamic({ + min: poolSize, + max: poolSize, + initial: poolSize, + resourceFactory: () => ({}), + }) + }, + + run: async (pool, iterations) => { + const task = () => Promise.resolve() + + for (let i = 0; i < iterations; i++) { + await pool.use(task) + } + }, + + teardown: async (pool) => { + pool.destroy() + }, +} diff --git a/benchmarks/competitors/generic-object-pool-dynamic.js b/benchmarks/competitors/generic-object-pool-dynamic.js new file mode 100644 index 0000000..23b6353 --- /dev/null +++ b/benchmarks/competitors/generic-object-pool-dynamic.js @@ -0,0 +1,25 @@ +import { GenericObjectPool } from '@lojhan/resource-pool' + +export default { + name: 'GenericObjectPool (Dynamic/Sync)', + + setup: async (poolSize) => { + return GenericObjectPool.dynamic({ + min: poolSize, + max: poolSize, + initial: poolSize, + resourceFactory: () => ({}), + }) + }, + + run: async (pool, iterations) => { + for (let i = 0; i < iterations; i++) { + const r = pool.acquire() + pool.release(r) + } + }, + + teardown: async (pool) => { + pool.destroy() + }, +} diff --git a/benchmarks/competitors/generic-object-pool-engine-use.js b/benchmarks/competitors/generic-object-pool-engine-use.js new file mode 100644 index 0000000..1a0bd28 --- /dev/null +++ b/benchmarks/competitors/generic-object-pool-engine-use.js @@ -0,0 +1,21 @@ +import { GenericObjectPool } from '@lojhan/resource-pool' + +export default { + name: 'GenericObjectPool (Engine) .use()', + + setup: async (poolSize) => { + return GenericObjectPool.engine(poolSize) + }, + + run: async (pool, iterations) => { + const task = () => Promise.resolve() + + for (let i = 0; i < iterations; i++) { + await pool.use(task) + } + }, + + teardown: async (pool) => { + pool.destroy() + }, +} diff --git a/benchmarks/competitors/generic-object-pool-engine.js b/benchmarks/competitors/generic-object-pool-engine.js new file mode 100644 index 0000000..73f628b --- /dev/null +++ b/benchmarks/competitors/generic-object-pool-engine.js @@ -0,0 +1,20 @@ +import { GenericObjectPool } from '@lojhan/resource-pool' + +export default { + name: 'GenericObjectPool (Engine/Index)', + + setup: async (poolSize) => { + return GenericObjectPool.engine(poolSize) + }, + + run: async (pool, iterations) => { + for (let i = 0; i < iterations; i++) { + const idx = pool.acquire() + pool.release(idx) + } + }, + + teardown: async (pool) => { + pool.destroy() + }, +} diff --git a/benchmarks/competitors/generic-object-pool-use.js b/benchmarks/competitors/generic-object-pool-use.js index fce5cc6..7ee0183 100644 --- a/benchmarks/competitors/generic-object-pool-use.js +++ b/benchmarks/competitors/generic-object-pool-use.js @@ -1,7 +1,7 @@ import { GenericObjectPool } from '@lojhan/resource-pool' export default { - name: 'GenericObjectPool .use()', + name: 'GenericObjectPool (Static) .use()', setup: async (poolSize) => { const resources = Array.from({ length: poolSize }, (_, i) => ({ id: i })) diff --git a/benchmarks/competitors/generic-object-pool.js b/benchmarks/competitors/generic-object-pool.js index b8c747c..61151ee 100644 --- a/benchmarks/competitors/generic-object-pool.js +++ b/benchmarks/competitors/generic-object-pool.js @@ -1,7 +1,7 @@ import { GenericObjectPool } from '@lojhan/resource-pool' export default { - name: 'GenericObjectPool (Native/Sync)', + name: 'GenericObjectPool (Static/Sync)', setup: async (poolSize) => { const resources = Array.from({ length: poolSize }, (_, i) => ({ id: i })) diff --git a/benchmarks/results.md b/benchmarks/results.md index 3db9d46..45ee722 100644 --- a/benchmarks/results.md +++ b/benchmarks/results.md @@ -1,10 +1,12 @@ -| Command | Mean [ms] | Min [ms] | Max [ms] | Relative | -| :------------------------------------------------------ | ------------: | -------: | -------: | ----------: | -| `node benchmarks/run-single.js generic-object-pool-use` | 312.2 ± 19.2 | 280.9 | 345.6 | 1.62 ± 0.14 | -| `node benchmarks/run-single.js generic-object-pool` | 192.6 ± 11.1 | 168.3 | 207.1 | 1.00 | -| `node benchmarks/run-single.js generic-pool-use` | 875.1 ± 36.4 | 829.6 | 936.4 | 4.54 ± 0.32 | -| `node benchmarks/run-single.js generic-pool` | 848.6 ± 37.7 | 772.4 | 886.8 | 4.41 ± 0.32 | -| `node benchmarks/run-single.js tarn-use` | 1183.7 ± 54.5 | 1084.6 | 1265.9 | 6.15 ± 0.45 | -| `node benchmarks/run-single.js tarn` | 1153.1 ± 60.2 | 1039.0 | 1255.7 | 5.99 ± 0.46 | -| ± 0.32 | -| `node benchmarks/run-single.js tarn-use` | 1183.7 ± 54.5 | 1084.6 | 1265.9 | 6.15 ± 0.45 | +| Command | Mean [ms] | Min [ms] | Max [ms] | Relative | +| :-------------------------------------------------------------- | ------------: | -------: | -------: | ----------: | +| `node benchmarks/run-single.js generic-object-pool-dynamic-use` | 279.3 ± 15.8 | 252.8 | 307.4 | 1.67 ± 0.15 | +| `node benchmarks/run-single.js generic-object-pool-dynamic` | 219.0 ± 12.7 | 192.8 | 233.2 | 1.31 ± 0.12 | +| `node benchmarks/run-single.js generic-object-pool-engine-use` | 276.8 ± 27.0 | 247.4 | 344.9 | 1.65 ± 0.20 | +| `node benchmarks/run-single.js generic-object-pool-engine` | 167.7 ± 13.1 | 147.7 | 195.6 | 1.00 ± 0.11 | +| `node benchmarks/run-single.js generic-object-pool-use` | 281.3 ± 16.3 | 253.0 | 299.9 | 1.68 ± 0.15 | +| `node benchmarks/run-single.js generic-object-pool` | 167.4 ± 11.9 | 149.7 | 187.9 | 1.00 | +| `node benchmarks/run-single.js generic-pool-use` | 866.9 ± 37.9 | 814.5 | 911.1 | 5.18 ± 0.43 | +| `node benchmarks/run-single.js generic-pool` | 814.9 ± 38.4 | 758.5 | 868.7 | 4.87 ± 0.41 | +| `node benchmarks/run-single.js tarn-use` | 1170.6 ± 62.7 | 1095.7 | 1286.6 | 6.99 ± 0.62 | +| `node benchmarks/run-single.js tarn` | 1116.8 ± 39.9 | 1066.3 | 1194.6 | 6.67 ± 0.53 | diff --git a/implementations/index.cjs b/implementations/index.cjs new file mode 100644 index 0000000..ff2d969 --- /dev/null +++ b/implementations/index.cjs @@ -0,0 +1,572 @@ +const nativeModule = require('../index.js') + +const NativePool = nativeModule.GenericObjectPool + +class StaticObjectPool { + constructor(resources) { + this.resources = [...resources] + + this.resourceToIdx = new Map() + for (const [i, r] of this.resources.entries()) { + this.resourceToIdx.set(r, i) + } + + this.pool = new NativePool(resources.length) + } + + acquire() { + const idx = this.pool.acquire() + return this.resources[idx] + } + + async acquireAsync(timeoutMs) { + const idx = await this.pool.acquireAsync(timeoutMs) + return this.resources[idx] + } + + release(resource) { + const idx = this.resourceToIdx.get(resource) + + if (idx === undefined) { + throw new Error('Resource not belonging to pool') + } + + this.pool.release(idx) + } + + add(resource) { + const newIdx = this.resources.length + this.resources.push(resource) + this.resourceToIdx.set(resource, newIdx) + this.pool.add(newIdx) + } + + removeOne() { + const idx = this.pool.removeOne() + if (idx === null) return false + + const resource = this.resources[idx] + this.resourceToIdx.delete(resource) + this.resources[idx] = null + return true + } + + async use(fn, { optimistic = true, timeout } = {}) { + let resource + if (optimistic) { + try { + const idx = this.pool.acquire() + resource = this.resources[idx] + } catch {} + } + + if (!resource) { + const idx = await this.pool.acquireAsync(timeout) + resource = this.resources[idx] + } + + try { + return await fn(resource) + } finally { + const idx = this.resourceToIdx.get(resource) + if (idx !== undefined) { + this.pool.release(idx) + } + } + } + + availableCount() { + return this.pool.availableCount() + } + + get size() { + return this.pool.size() + } + + get pendingCount() { + return this.pool.pendingCount() + } + + get available() { + return this.pool.availableCount() + } + + get numUsed() { + return this.pool.size() - this.pool.availableCount() + } + + getMetrics() { + const size = this.pool.size() + const available = this.pool.availableCount() + return { + currentSize: size, + minSize: size, + maxSize: size, + available, + inUse: size - available, + pending: this.pool.pendingCount(), + scaleUpEvents: 0, + scaleDownEvents: 0, + resourcesCreated: size, + resourcesDestroyed: 0, + } + } + + destroy() { + this.pool.destroy() + this.resources = [] + this.resourceToIdx.clear() + } +} + +class EnginePool { + constructor(size) { + this.pool = new NativePool(size) + } + + acquire() { + return this.pool.acquire() + } + + async acquireAsync(timeoutMs) { + return this.pool.acquireAsync(timeoutMs) + } + + release(idx) { + this.pool.release(idx) + } + + add(idx) { + this.pool.add(idx) + } + + removeOne() { + return this.pool.removeOne() + } + + async use(fn, { optimistic = true, timeout } = {}) { + let idx + if (optimistic) { + try { + idx = this.pool.acquire() + } catch {} + } + + if (idx === undefined) { + idx = await this.pool.acquireAsync(timeout) + } + + try { + return await fn(idx) + } finally { + this.pool.release(idx) + } + } + + availableCount() { + return this.pool.availableCount() + } + + get size() { + return this.pool.size() + } + + get pendingCount() { + return this.pool.pendingCount() + } + + get available() { + return this.pool.availableCount() + } + + get numUsed() { + return this.pool.size() - this.pool.availableCount() + } + + getMetrics() { + const size = this.pool.size() + const available = this.pool.availableCount() + return { + currentSize: size, + minSize: size, + maxSize: size, + available, + inUse: size - available, + pending: this.pool.pendingCount(), + scaleUpEvents: 0, + scaleDownEvents: 0, + resourcesCreated: size, + resourcesDestroyed: 0, + } + } + + destroy() { + this.pool.destroy() + } +} + +class DynamicObjectPool extends StaticObjectPool { + #minSize = 0 + #maxSize = Infinity + #resourceFactory = null + #validateResource = null + #resourceDestroyer = null + + #scaleUpThreshold = 5 + #scaleUpIncrement = 1 + #idleTimeoutMs = 30000 + #scaleDownCheckIntervalMs = 10000 + #validateOnAcquire = false + #createRetries = 3 + + #scaleDownTimer = null + #scaleUpCheckTimer = null + #lastActivityAt = 0 + #isScaling = false + #pendingAsync = 0 + #scaleUpFailureCount = 0 + #scaleUpCooldownUntil = 0 + + #metrics = { + scaleUpEvents: 0, + scaleDownEvents: 0, + resourcesCreated: 0, + resourcesDestroyed: 0, + } + + static withDynamicSizing(config) { + const { + min, + max, + initial = min, + resourceFactory, + validateResource = null, + resourceDestroyer = null, + scaleUpThreshold = 5, + scaleUpIncrement = 1, + idleTimeoutMs = 30000, + scaleDownCheckIntervalMs = 10000, + validateOnAcquire = undefined, + createRetries = 3, + } = config + + if (!resourceFactory) { + throw new Error('resourceFactory is required for dynamic sizing') + } + + if (max < min) { + throw new Error('max size must be greater than or equal to min size') + } + + if (initial < min || initial > max) { + throw new Error('initial size must be between min and max') + } + + const resources = [] + const initialTarget = initial + for (let i = 0; i < initial; i++) { + try { + const resource = resourceFactory() + const isPromise = resource && typeof resource.then === 'function' + if (isPromise) { + continue + } + resources.push(resource) + } catch (err) { + console.error('Failed to create initial resource:', err) + } + } + + const pool = new DynamicObjectPool(resources) + pool.#minSize = min + pool.#maxSize = max + pool.#resourceFactory = resourceFactory + pool.#validateResource = validateResource + pool.#resourceDestroyer = resourceDestroyer + pool.#scaleUpThreshold = scaleUpThreshold + pool.#scaleUpIncrement = scaleUpIncrement + pool.#idleTimeoutMs = idleTimeoutMs + pool.#scaleDownCheckIntervalMs = scaleDownCheckIntervalMs + pool.#validateOnAcquire = validateOnAcquire ?? Boolean(validateResource) + pool.#createRetries = createRetries + + pool.#lastActivityAt = Date.now() + pool.#metrics.resourcesCreated = resources.length + + if (idleTimeoutMs > 0 && scaleDownCheckIntervalMs > 0) { + pool.#startScaleDownMonitoring() + } + + if (pool.pool.size() < initialTarget) { + pool.#fillToTarget(initialTarget).catch((err) => console.error('Failed to fill initial resources:', err)) + } + + return pool + } + + async #fillToTarget(targetSize) { + while (this.pool.size() < targetSize) { + const resource = await this.#createResourceWithRetry() + super.add(resource) + this.#lastActivityAt = Date.now() + } + } + + async #createResourceWithRetry() { + let lastError + for (let attempt = 0; attempt < this.#createRetries; attempt++) { + try { + const resource = await Promise.resolve(this.#resourceFactory()) + this.#metrics.resourcesCreated++ + return resource + } catch (err) { + lastError = err + if (attempt < this.#createRetries - 1) { + const delay = Math.min(200, 10 * Math.pow(2, attempt)) + await new Promise((resolve) => setTimeout(resolve, delay)) + } + } + } + throw lastError + } + + #shouldScaleUp() { + if (this.pool.size() >= this.#maxSize) return false + if (this.#isScaling) return false + if (Date.now() < this.#scaleUpCooldownUntil) return false + + const pending = this.#pendingAsync + const available = this.pool.availableCount() + return pending >= this.#scaleUpThreshold && pending > available + } + + #maybeScaleUp() { + if (this.#shouldScaleUp()) { + this.#scaleUp().catch((err) => console.error('Scale up error:', err)) + return + } + + if (this.#scaleUpCheckTimer || this.#pendingAsync === 0) return + + this.#scaleUpCheckTimer = setTimeout(() => { + this.#scaleUpCheckTimer = null + if (this.#shouldScaleUp()) { + this.#scaleUp().catch((err) => console.error('Scale up error:', err)) + } + }, 0) + + if (this.#scaleUpCheckTimer.unref) { + this.#scaleUpCheckTimer.unref() + } + } + + async #scaleUp() { + if (this.#isScaling) return + this.#isScaling = true + + try { + const currentSize = this.pool.size() + const available = this.pool.availableCount() + const pending = this.#pendingAsync + const needed = Math.max(this.#scaleUpIncrement, pending - available) + const toAdd = Math.min(needed, this.#maxSize - currentSize) + + if (toAdd <= 0) { + return + } + + for (let i = 0; i < toAdd; i++) { + try { + const resource = await this.#createResourceWithRetry() + super.add(resource) + this.#lastActivityAt = Date.now() + this.#scaleUpFailureCount = 0 + } catch (err) { + console.error('Failed to scale up:', err) + this.#scaleUpFailureCount += 1 + this.#scaleUpCooldownUntil = Date.now() + Math.min(1000, 50 * this.#scaleUpFailureCount) + break + } + } + + this.#metrics.scaleUpEvents++ + } finally { + this.#isScaling = false + } + } + + #startScaleDownMonitoring() { + const scheduleNext = () => { + const jitter = Math.floor(this.#scaleDownCheckIntervalMs * 0.1 * (Math.random() - 0.5)) + const nextIn = Math.max(10, this.#scaleDownCheckIntervalMs + jitter) + this.#scaleDownTimer = setTimeout(() => { + this.#checkAndScaleDown() + scheduleNext() + }, nextIn) + + if (this.#scaleDownTimer.unref) { + this.#scaleDownTimer.unref() + } + } + + scheduleNext() + } + + #checkAndScaleDown() { + if (this.pool.size() <= this.#minSize) return + if (this.#isScaling) return + + const now = Date.now() + const available = this.pool.availableCount() + const currentSize = this.pool.size() + + if (available === 0) return + if (now - this.#lastActivityAt <= this.#idleTimeoutMs) return + + const canRemove = Math.min(available, currentSize - this.#minSize) + + if (canRemove > 0) { + for (let i = 0; i < canRemove; i++) { + const removed = this.removeOne() + if (removed) { + this.#metrics.scaleDownEvents++ + } else { + break + } + } + } + } + + async #validateResourceIfNeeded(resource) { + if (!this.#validateResource) return true + + try { + return await Promise.resolve(this.#validateResource(resource)) + } catch (err) { + console.error('Resource validation error:', err) + return false + } + } + + acquire() { + const resource = super.acquire() + this.#lastActivityAt = Date.now() + return resource + } + + async acquireAsync(timeoutMs) { + this.#pendingAsync += 1 + this.#maybeScaleUp() + + try { + const idx = await this.pool.acquireAsync(timeoutMs) + const resource = this.resources[idx] + + this.#lastActivityAt = Date.now() + + if (this.#validateOnAcquire) { + const isValid = await this.#validateResourceIfNeeded(resource) + if (!isValid) { + this.pool.release(idx) + try { + const newResource = await this.#createResourceWithRetry() + this.resources[idx] = newResource + this.resourceToIdx.delete(resource) + this.resourceToIdx.set(newResource, idx) + + if (this.#resourceDestroyer) { + await Promise.resolve(this.#resourceDestroyer(resource)) + } + + return newResource + } catch (err) { + console.error('Failed to replace invalid resource:', err) + throw new Error('Resource validation failed and replacement failed') + } + } + } + + return resource + } finally { + this.#pendingAsync -= 1 + } + } + + release(resource) { + super.release(resource) + this.#lastActivityAt = Date.now() + } + + add(resource) { + super.add(resource) + this.#lastActivityAt = Date.now() + } + + removeOne() { + const idx = this.pool.removeOne() + if (idx === null) return false + + const resource = this.resources[idx] + this.resourceToIdx.delete(resource) + this.resources[idx] = null + + this.#lastActivityAt = Date.now() + + if (this.#resourceDestroyer) { + Promise.resolve(this.#resourceDestroyer(resource)).catch((err) => console.error('Resource destroyer error:', err)) + } + + this.#metrics.resourcesDestroyed++ + return true + } + + destroy() { + if (this.#scaleDownTimer) { + clearTimeout(this.#scaleDownTimer) + this.#scaleDownTimer = null + } + if (this.#scaleUpCheckTimer) { + clearTimeout(this.#scaleUpCheckTimer) + this.#scaleUpCheckTimer = null + } + + if (this.#resourceDestroyer) { + for (const resource of this.resources) { + if (resource !== null) { + Promise.resolve(this.#resourceDestroyer(resource)).catch((err) => + console.error('Resource destroyer error:', err), + ) + } + } + } + + super.destroy() + + this.#lastActivityAt = 0 + } + + getMetrics() { + return { + currentSize: this.pool.size(), + minSize: this.#minSize, + maxSize: this.#maxSize, + available: this.pool.availableCount(), + inUse: this.pool.size() - this.pool.availableCount(), + pending: Math.max(this.pool.pendingCount(), this.#pendingAsync), + scaleUpEvents: this.#metrics.scaleUpEvents, + scaleDownEvents: this.#metrics.scaleDownEvents, + resourcesCreated: this.#metrics.resourcesCreated, + resourcesDestroyed: this.#metrics.resourcesDestroyed, + } + } + + get minSize() { + return this.#minSize + } + + get maxSize() { + return this.#maxSize + } +} + +module.exports = { StaticObjectPool, DynamicObjectPool, EnginePool } diff --git a/implementations/index.d.ts b/implementations/index.d.ts new file mode 100644 index 0000000..05c2481 --- /dev/null +++ b/implementations/index.d.ts @@ -0,0 +1,79 @@ +/** + * Configuration for dynamic pool sizing + */ +export interface DynamicSizingConfig { + min: number + max: number + initial?: number + resourceFactory: () => T | Promise + validateResource?: (resource: T) => boolean | Promise + resourceDestroyer?: (resource: T) => void | Promise + scaleUpThreshold?: number + scaleUpIncrement?: number + idleTimeoutMs?: number + scaleDownCheckIntervalMs?: number + validateOnAcquire?: boolean + createRetries?: number +} + +/** + * Pool metrics for monitoring + */ +export interface PoolMetrics { + currentSize: number + minSize: number + maxSize: number + available: number + inUse: number + pending: number + scaleUpEvents: number + scaleDownEvents: number + resourcesCreated: number + resourcesDestroyed: number +} + +export declare class StaticObjectPool { + constructor(resources: T[]) + acquire(): T + acquireAsync(timeoutMs?: number): Promise + release(resource: T): void + add(resource: T): void + removeOne(): boolean + use(fn: (resource: T) => Promise, options?: { optimistic?: boolean; timeout?: number }): Promise + availableCount(): number + getMetrics(): PoolMetrics + get size(): number + get pendingCount(): number + get available(): number + get numUsed(): number + destroy(): void +} + +export declare class EnginePool { + constructor(size: number) + acquire(): number + acquireAsync(timeoutMs?: number): Promise + release(idx: number): void + add(idx: number): void + removeOne(): number | null + use(fn: (idx: number) => Promise, options?: { optimistic?: boolean; timeout?: number }): Promise + availableCount(): number + getMetrics(): PoolMetrics + get size(): number + get pendingCount(): number + get available(): number + get numUsed(): number + destroy(): void +} + +export declare class DynamicObjectPool extends StaticObjectPool { + static withDynamicSizing(config: DynamicSizingConfig): DynamicObjectPool + acquireAsync(timeoutMs?: number): Promise + release(resource: T): void + add(resource: T): void + removeOne(): boolean + destroy(): void + getMetrics(): PoolMetrics + get minSize(): number + get maxSize(): number +} diff --git a/implementations/index.mjs b/implementations/index.mjs new file mode 100644 index 0000000..dcd3fd1 --- /dev/null +++ b/implementations/index.mjs @@ -0,0 +1,589 @@ +import nativeModule from '../index.js' + +const NativePool = nativeModule.GenericObjectPool + +/** + * Static-size pool implementation + * @template T + */ +export class StaticObjectPool { + /** + * Create a new resource pool + * @param {T[]} resources - Initial resources in the pool + */ + constructor(resources) { + this.resources = [...resources] + + this.resourceToIdx = new Map() + for (const [i, r] of this.resources.entries()) { + this.resourceToIdx.set(r, i) + } + + this.pool = new NativePool(resources.length) + } + + acquire() { + const idx = this.pool.acquire() + return this.resources[idx] + } + + async acquireAsync(timeoutMs) { + const idx = await this.pool.acquireAsync(timeoutMs) + return this.resources[idx] + } + + release(resource) { + const idx = this.resourceToIdx.get(resource) + + if (idx === undefined) { + throw new Error('Resource not belonging to pool') + } + + this.pool.release(idx) + } + + add(resource) { + const newIdx = this.resources.length + this.resources.push(resource) + this.resourceToIdx.set(resource, newIdx) + this.pool.add(newIdx) + } + + removeOne() { + const idx = this.pool.removeOne() + if (idx === null) return false + + const resource = this.resources[idx] + this.resourceToIdx.delete(resource) + this.resources[idx] = null + return true + } + + async use(fn, { optimistic = true, timeout } = {}) { + let resource + if (optimistic) { + try { + const idx = this.pool.acquire() + resource = this.resources[idx] + } catch {} + } + + if (!resource) { + const idx = await this.pool.acquireAsync(timeout) + resource = this.resources[idx] + } + + try { + return await fn(resource) + } finally { + const idx = this.resourceToIdx.get(resource) + if (idx !== undefined) { + this.pool.release(idx) + } + } + } + + availableCount() { + return this.pool.availableCount() + } + + get size() { + return this.pool.size() + } + + get pendingCount() { + return this.pool.pendingCount() + } + + get available() { + return this.pool.availableCount() + } + + get numUsed() { + return this.pool.size() - this.pool.availableCount() + } + + getMetrics() { + const size = this.pool.size() + const available = this.pool.availableCount() + return { + currentSize: size, + minSize: size, + maxSize: size, + available, + inUse: size - available, + pending: this.pool.pendingCount(), + scaleUpEvents: 0, + scaleDownEvents: 0, + resourcesCreated: size, + resourcesDestroyed: 0, + } + } + + destroy() { + this.pool.destroy() + this.resources = [] + this.resourceToIdx.clear() + } +} + +/** + * Index-only pool implementation + */ +export class EnginePool { + /** + * Create a new pool with a fixed size + * @param {number} size + */ + constructor(size) { + this.pool = new NativePool(size) + } + + acquire() { + return this.pool.acquire() + } + + async acquireAsync(timeoutMs) { + return this.pool.acquireAsync(timeoutMs) + } + + release(idx) { + this.pool.release(idx) + } + + add(idx) { + this.pool.add(idx) + } + + removeOne() { + return this.pool.removeOne() + } + + async use(fn, { optimistic = true, timeout } = {}) { + let idx + if (optimistic) { + try { + idx = this.pool.acquire() + } catch {} + } + + if (idx === undefined) { + idx = await this.pool.acquireAsync(timeout) + } + + try { + return await fn(idx) + } finally { + this.pool.release(idx) + } + } + + availableCount() { + return this.pool.availableCount() + } + + get size() { + return this.pool.size() + } + + get pendingCount() { + return this.pool.pendingCount() + } + + get available() { + return this.pool.availableCount() + } + + get numUsed() { + return this.pool.size() - this.pool.availableCount() + } + + getMetrics() { + const size = this.pool.size() + const available = this.pool.availableCount() + return { + currentSize: size, + minSize: size, + maxSize: size, + available, + inUse: size - available, + pending: this.pool.pendingCount(), + scaleUpEvents: 0, + scaleDownEvents: 0, + resourcesCreated: size, + resourcesDestroyed: 0, + } + } + + destroy() { + this.pool.destroy() + } +} + +/** + * Dynamic-size pool implementation + * @template T + */ +export class DynamicObjectPool extends StaticObjectPool { + #minSize = 0 + #maxSize = Infinity + #resourceFactory = null + #validateResource = null + #resourceDestroyer = null + + #scaleUpThreshold = 5 + #scaleUpIncrement = 1 + #idleTimeoutMs = 30000 + #scaleDownCheckIntervalMs = 10000 + #validateOnAcquire = false + #createRetries = 3 + + #scaleDownTimer = null + #scaleUpCheckTimer = null + #lastActivityAt = 0 + #isScaling = false + #pendingAsync = 0 + #scaleUpFailureCount = 0 + #scaleUpCooldownUntil = 0 + + #metrics = { + scaleUpEvents: 0, + scaleDownEvents: 0, + resourcesCreated: 0, + resourcesDestroyed: 0, + } + + static withDynamicSizing(config) { + const { + min, + max, + initial = min, + resourceFactory, + validateResource = null, + resourceDestroyer = null, + scaleUpThreshold = 5, + scaleUpIncrement = 1, + idleTimeoutMs = 30000, + scaleDownCheckIntervalMs = 10000, + validateOnAcquire = undefined, + createRetries = 3, + } = config + + if (!resourceFactory) { + throw new Error('resourceFactory is required for dynamic sizing') + } + + if (max < min) { + throw new Error('max size must be greater than or equal to min size') + } + + if (initial < min || initial > max) { + throw new Error('initial size must be between min and max') + } + + const resources = [] + const initialTarget = initial + for (let i = 0; i < initial; i++) { + try { + const resource = resourceFactory() + const isPromise = resource && typeof resource.then === 'function' + if (isPromise) { + continue + } + resources.push(resource) + } catch (err) { + console.error('Failed to create initial resource:', err) + } + } + + const pool = new DynamicObjectPool(resources) + pool.#minSize = min + pool.#maxSize = max + pool.#resourceFactory = resourceFactory + pool.#validateResource = validateResource + pool.#resourceDestroyer = resourceDestroyer + pool.#scaleUpThreshold = scaleUpThreshold + pool.#scaleUpIncrement = scaleUpIncrement + pool.#idleTimeoutMs = idleTimeoutMs + pool.#scaleDownCheckIntervalMs = scaleDownCheckIntervalMs + pool.#validateOnAcquire = validateOnAcquire ?? Boolean(validateResource) + pool.#createRetries = createRetries + + pool.#lastActivityAt = Date.now() + pool.#metrics.resourcesCreated = resources.length + + if (idleTimeoutMs > 0 && scaleDownCheckIntervalMs > 0) { + pool.#startScaleDownMonitoring() + } + + if (pool.pool.size() < initialTarget) { + pool.#fillToTarget(initialTarget).catch((err) => console.error('Failed to fill initial resources:', err)) + } + + return pool + } + + async #fillToTarget(targetSize) { + while (this.pool.size() < targetSize) { + const resource = await this.#createResourceWithRetry() + super.add(resource) + this.#lastActivityAt = Date.now() + } + } + + async #createResourceWithRetry() { + let lastError + for (let attempt = 0; attempt < this.#createRetries; attempt++) { + try { + const resource = await Promise.resolve(this.#resourceFactory()) + this.#metrics.resourcesCreated++ + return resource + } catch (err) { + lastError = err + if (attempt < this.#createRetries - 1) { + const delay = Math.min(200, 10 * Math.pow(2, attempt)) + await new Promise((resolve) => setTimeout(resolve, delay)) + } + } + } + throw lastError + } + + #shouldScaleUp() { + if (this.pool.size() >= this.#maxSize) return false + if (this.#isScaling) return false + if (Date.now() < this.#scaleUpCooldownUntil) return false + + const pending = this.#pendingAsync + const available = this.pool.availableCount() + return pending >= this.#scaleUpThreshold && pending > available + } + + #maybeScaleUp() { + if (this.#shouldScaleUp()) { + this.#scaleUp().catch((err) => console.error('Scale up error:', err)) + return + } + + if (this.#scaleUpCheckTimer || this.#pendingAsync === 0) return + + this.#scaleUpCheckTimer = setTimeout(() => { + this.#scaleUpCheckTimer = null + if (this.#shouldScaleUp()) { + this.#scaleUp().catch((err) => console.error('Scale up error:', err)) + } + }, 0) + + if (this.#scaleUpCheckTimer.unref) { + this.#scaleUpCheckTimer.unref() + } + } + + async #scaleUp() { + if (this.#isScaling) return + this.#isScaling = true + + try { + const currentSize = this.pool.size() + const available = this.pool.availableCount() + const pending = this.#pendingAsync + const needed = Math.max(this.#scaleUpIncrement, pending - available) + const toAdd = Math.min(needed, this.#maxSize - currentSize) + + if (toAdd <= 0) { + return + } + + for (let i = 0; i < toAdd; i++) { + try { + const resource = await this.#createResourceWithRetry() + super.add(resource) + this.#lastActivityAt = Date.now() + this.#scaleUpFailureCount = 0 + } catch (err) { + console.error('Failed to scale up:', err) + this.#scaleUpFailureCount += 1 + this.#scaleUpCooldownUntil = Date.now() + Math.min(1000, 50 * this.#scaleUpFailureCount) + break + } + } + + this.#metrics.scaleUpEvents++ + } finally { + this.#isScaling = false + } + } + + #startScaleDownMonitoring() { + const scheduleNext = () => { + const jitter = Math.floor(this.#scaleDownCheckIntervalMs * 0.1 * (Math.random() - 0.5)) + const nextIn = Math.max(10, this.#scaleDownCheckIntervalMs + jitter) + this.#scaleDownTimer = setTimeout(() => { + this.#checkAndScaleDown() + scheduleNext() + }, nextIn) + + if (this.#scaleDownTimer.unref) { + this.#scaleDownTimer.unref() + } + } + + scheduleNext() + } + + #checkAndScaleDown() { + if (this.pool.size() <= this.#minSize) return + if (this.#isScaling) return + + const now = Date.now() + const available = this.pool.availableCount() + const currentSize = this.pool.size() + + if (available === 0) return + if (now - this.#lastActivityAt <= this.#idleTimeoutMs) return + + const canRemove = Math.min(available, currentSize - this.#minSize) + + if (canRemove > 0) { + for (let i = 0; i < canRemove; i++) { + const removed = this.removeOne() + if (removed) { + this.#metrics.scaleDownEvents++ + } else { + break + } + } + } + } + + async #validateResourceIfNeeded(resource) { + if (!this.#validateResource) return true + + try { + return await Promise.resolve(this.#validateResource(resource)) + } catch (err) { + console.error('Resource validation error:', err) + return false + } + } + + acquire() { + const resource = super.acquire() + this.#lastActivityAt = Date.now() + return resource + } + + async acquireAsync(timeoutMs) { + this.#pendingAsync += 1 + this.#maybeScaleUp() + + try { + const idx = await this.pool.acquireAsync(timeoutMs) + const resource = this.resources[idx] + + this.#lastActivityAt = Date.now() + + if (this.#validateOnAcquire) { + const isValid = await this.#validateResourceIfNeeded(resource) + if (!isValid) { + this.pool.release(idx) + try { + const newResource = await this.#createResourceWithRetry() + this.resources[idx] = newResource + this.resourceToIdx.delete(resource) + this.resourceToIdx.set(newResource, idx) + + if (this.#resourceDestroyer) { + await Promise.resolve(this.#resourceDestroyer(resource)) + } + + return newResource + } catch (err) { + console.error('Failed to replace invalid resource:', err) + throw new Error('Resource validation failed and replacement failed') + } + } + } + + return resource + } finally { + this.#pendingAsync -= 1 + } + } + + release(resource) { + super.release(resource) + this.#lastActivityAt = Date.now() + } + + add(resource) { + super.add(resource) + this.#lastActivityAt = Date.now() + } + + removeOne() { + const idx = this.pool.removeOne() + if (idx === null) return false + + const resource = this.resources[idx] + this.resourceToIdx.delete(resource) + this.resources[idx] = null + + this.#lastActivityAt = Date.now() + + if (this.#resourceDestroyer) { + Promise.resolve(this.#resourceDestroyer(resource)).catch((err) => console.error('Resource destroyer error:', err)) + } + + this.#metrics.resourcesDestroyed++ + return true + } + + destroy() { + if (this.#scaleDownTimer) { + clearTimeout(this.#scaleDownTimer) + this.#scaleDownTimer = null + } + if (this.#scaleUpCheckTimer) { + clearTimeout(this.#scaleUpCheckTimer) + this.#scaleUpCheckTimer = null + } + + if (this.#resourceDestroyer) { + for (const resource of this.resources) { + if (resource !== null) { + Promise.resolve(this.#resourceDestroyer(resource)).catch((err) => + console.error('Resource destroyer error:', err), + ) + } + } + } + + super.destroy() + + this.#lastActivityAt = 0 + } + + getMetrics() { + return { + currentSize: this.pool.size(), + minSize: this.#minSize, + maxSize: this.#maxSize, + available: this.pool.availableCount(), + inUse: this.pool.size() - this.pool.availableCount(), + pending: Math.max(this.pool.pendingCount(), this.#pendingAsync), + scaleUpEvents: this.#metrics.scaleUpEvents, + scaleDownEvents: this.#metrics.scaleDownEvents, + resourcesCreated: this.#metrics.resourcesCreated, + resourcesDestroyed: this.#metrics.resourcesDestroyed, + } + } + + get minSize() { + return this.#minSize + } + + get maxSize() { + return this.#maxSize + } +} diff --git a/index.wrapper.cjs b/index.wrapper.cjs index 84b4897..3782d20 100644 --- a/index.wrapper.cjs +++ b/index.wrapper.cjs @@ -1,170 +1,25 @@ -const nativeModule = require('./index.js') - -const NativePool = nativeModule.GenericObjectPool +const { DynamicObjectPool, EnginePool, StaticObjectPool } = require('./implementations/index.cjs') /** - * Type-safe wrapper for a generic resource pool + * Wrapper facade that defaults to static implementation * @template T */ -class GenericObjectPool { - /** - * Create a new resource pool - * @param {T[]} resources - Initial resources in the pool - */ - constructor(resources) { - // 1. Store resources in a JS Array (Fast access) - this.resources = [...resources] - - // 2. Map Resource -> Index for O(1) release - this.resourceToIdx = new Map() - this.resources.forEach((r, i) => this.resourceToIdx.set(r, i)) - - // 3. Initialize Rust pool with just the COUNT - this.pool = new NativePool(resources.length) - } - - /** - * Acquire a resource from the pool synchronously - * Throws error if no resources available - * @returns {T} A resource from the pool - * @throws Error if no resources are available - */ - acquire() { - // Rust gives us the integer (Index) - const idx = this.pool.acquire() - // JS gives us the object (Array Lookup) - return this.resources[idx] - } - - /** - * Acquire a resource from the pool asynchronously with retry - * @param {number} [timeoutMs] - Optional timeout in milliseconds. If provided, will throw after timeout. - * @returns {Promise} Promise that resolves with a resource when one becomes available - * @throws Error if timeout is exceeded before acquiring a resource - */ - async acquireAsync(timeoutMs) { - const idx = await this.pool.acquireAsync(timeoutMs) - return this.resources[idx] - } - - /** - * Release a resource back to the pool - * @param {T} resource - The resource to release - */ - release(resource) { - // O(1) Lookup - const idx = this.resourceToIdx.get(resource) - - if (idx === undefined) { - throw new Error('Resource not belonging to pool') - } - - this.pool.release(idx) +class GenericObjectPool extends StaticObjectPool { + static withDynamicSizing(config) { + return DynamicObjectPool.withDynamicSizing(config) } - /** - * Add a new resource to the pool - * @param {T} resource - The resource to add - */ - add(resource) { - const newIdx = this.resources.length - this.resources.push(resource) - this.resourceToIdx.set(resource, newIdx) - this.pool.add(newIdx) + static dynamic(config) { + return DynamicObjectPool.withDynamicSizing(config) } - /** - * Remove one available resource from the pool - * @returns {boolean} true if a resource was removed, false if all are currently in use - */ - removeOne() { - const idx = this.pool.removeOne() - if (idx === null) return false - - // We mark the slot as empty or null, but we don't resize the array - // to keep indices stable. - // Ideally, for a dynamic pool, you'd want a free-list in JS, - // but for this implementation, we just remove the mapping. - const resource = this.resources[idx] - this.resourceToIdx.delete(resource) - this.resources[idx] = null - return true - } - - /** - * Use a resource from the pool with automatic release - * @template R - * @param {(resource: T) => Promise} fn - Function to execute with the resource - * @param {{optimistic?: boolean, timeout?: number}} [options] - Configuration options for acquisition - * @returns {Promise} - */ - async use(fn, { optimistic = true, timeout } = {}) { - let resource - if (optimistic) { - try { - const idx = this.pool.acquire() - resource = this.resources[idx] - } catch {} - } - - if (!resource) { - const idx = await this.pool.acquireAsync(timeout) - resource = this.resources[idx] - } - - try { - return await fn(resource) - } finally { - // Manual inline release for performance - const idx = this.resourceToIdx.get(resource) - if (idx !== undefined) { - this.pool.release(idx) - } - } - } - - /** - * Get the number of available resources in the pool - * @returns {number} Number of available resources - */ - availableCount() { - return this.pool.availableCount() - } - /** - * Get the total number of resources managed by the pool - * @returns {number} - */ - get size() { - return this.pool.size() - } - get pendingCount() { - return this.pool.pendingCount() - } - /** - * Get the number of available resources - * @returns {number} - */ - get available() { - return this.pool.availableCount() - } - /** - * Get the number of used resources - * @returns {number} - */ - get numUsed() { - return this.pool.size() - this.pool.availableCount() + static static(resources) { + return new StaticObjectPool(resources) } - /** - * Destroy the pool and stop accepting new acquires - */ - destroy() { - // Destroy Rust pool (closes semaphore) - this.pool.destroy() - // Clear JS references - this.resources = [] - this.resourceToIdx.clear() + static engine(size) { + return new EnginePool(size) } } -module.exports = { GenericObjectPool } +module.exports = { GenericObjectPool, StaticObjectPool, DynamicObjectPool, EnginePool } diff --git a/index.wrapper.d.ts b/index.wrapper.d.ts index b1d9aae..9e2666e 100644 --- a/index.wrapper.d.ts +++ b/index.wrapper.d.ts @@ -1,7 +1,83 @@ +/** + * Configuration for dynamic pool sizing + */ +export interface DynamicSizingConfig { + /** Minimum pool size */ + min: number + /** Maximum pool size */ + max: number + /** Initial pool size (defaults to min) */ + initial?: number + /** Function to create new resources */ + resourceFactory: () => T | Promise + /** Optional function to validate resources (return true if valid) */ + validateResource?: (resource: T) => boolean | Promise + /** Optional function to cleanup/destroy resources */ + resourceDestroyer?: (resource: T) => void | Promise + /** Number of pending requests before scaling up (default: 5) */ + scaleUpThreshold?: number + /** Number of resources to add when scaling up (default: 1) */ + scaleUpIncrement?: number + /** Time in ms before idle resources can be removed (default: 30000) */ + idleTimeoutMs?: number + /** Interval in ms to check for idle resources (default: 10000) */ + scaleDownCheckIntervalMs?: number + /** Whether to validate resources on acquire (default: false) */ + validateOnAcquire?: boolean + /** Number of retries when creating resources fails (default: 3) */ + createRetries?: number +} + +/** + * Pool metrics for monitoring + */ +export interface PoolMetrics { + /** Current total size of the pool */ + currentSize: number + /** Minimum size (for dynamic pools) */ + minSize: number + /** Maximum size (for dynamic pools) */ + maxSize: number + /** Number of available resources */ + available: number + /** Number of resources currently in use */ + inUse: number + /** Number of pending acquisition requests */ + pending: number + /** Number of scale-up events */ + scaleUpEvents: number + /** Number of scale-down events */ + scaleDownEvents: number + /** Total resources created */ + resourcesCreated: number + /** Total resources destroyed */ + resourcesDestroyed: number +} + /** * Type-safe wrapper for a generic resource pool */ -export declare class GenericObjectPool { +export declare class GenericObjectPool extends StaticObjectPool { + /** + * Create a pool with dynamic sizing capabilities + */ + static withDynamicSizing(config: DynamicSizingConfig): DynamicObjectPool + + /** + * Create a pool with dynamic sizing capabilities + */ + static dynamic(config: DynamicSizingConfig): DynamicObjectPool + + /** + * Create a pool using the static implementation + */ + static static(resources: T[]): StaticObjectPool + + /** + * Create an index-only pool implementation + */ + static engine(size: number): EnginePool + /** * Create a new resource pool * @param resources - Initial resources in the pool @@ -64,27 +140,53 @@ export declare class GenericObjectPool { ): Promise /** - * Get the number of available resources - */ - readonly available: number - - /** - * Get the total number of resources managed by the pool + * Get pool metrics (for dynamic pools) */ - readonly size: number + getMetrics(): PoolMetrics +} - /** - * Get the number of pending acquire requests - */ - readonly pendingCount: number +export declare class StaticObjectPool { + constructor(resources: T[]) + acquire(): T + acquireAsync(timeoutMs?: number): Promise + release(resource: T): void + add(resource: T): void + removeOne(): boolean + availableCount(): number + getMetrics(): PoolMetrics + use(fn: (resource: T) => Promise, options?: { optimistic?: boolean; timeout?: number }): Promise + get size(): number + get pendingCount(): number + get available(): number + get numUsed(): number + destroy(): void +} - /** - * Get the number of used resources - */ - readonly numUsed: number +export declare class DynamicObjectPool extends StaticObjectPool { + static withDynamicSizing(config: DynamicSizingConfig): DynamicObjectPool + acquireAsync(timeoutMs?: number): Promise + release(resource: T): void + add(resource: T): void + removeOne(): boolean + destroy(): void + getMetrics(): PoolMetrics + get minSize(): number + get maxSize(): number +} - /** - * Destroy the pool and stop accepting new acquires - */ +export declare class EnginePool { + constructor(size: number) + acquire(): number + acquireAsync(timeoutMs?: number): Promise + release(idx: number): void + add(idx: number): void + removeOne(): number | null + use(fn: (idx: number) => Promise, options?: { optimistic?: boolean; timeout?: number }): Promise + availableCount(): number + getMetrics(): PoolMetrics + get size(): number + get pendingCount(): number + get available(): number + get numUsed(): number destroy(): void } diff --git a/index.wrapper.mjs b/index.wrapper.mjs index f1f6c6f..9ace308 100644 --- a/index.wrapper.mjs +++ b/index.wrapper.mjs @@ -1,168 +1,25 @@ -import nativeModule from './index.js' - -const NativePool = nativeModule.GenericObjectPool +import { DynamicObjectPool, EnginePool, StaticObjectPool } from './implementations/index.mjs' /** - * Type-safe wrapper for a generic resource pool + * Wrapper facade that defaults to static implementation * @template T */ -export class GenericObjectPool { - /** - * Create a new resource pool - * @param {T[]} resources - Initial resources in the pool - */ - constructor(resources) { - // 1. Store resources in a JS Array (Fast access) - this.resources = [...resources] - - // 2. Map Resource -> Index for O(1) release - this.resourceToIdx = new Map() - this.resources.forEach((r, i) => this.resourceToIdx.set(r, i)) - - // 3. Initialize Rust pool with just the COUNT - this.pool = new NativePool(resources.length) - } - - /** - * Acquire a resource from the pool synchronously - * Throws error if no resources available - * @returns {T} A resource from the pool - * @throws Error if no resources are available - */ - acquire() { - // Rust gives us the integer (Index) - const idx = this.pool.acquire() - // JS gives us the object (Array Lookup) - return this.resources[idx] - } - - /** - * Acquire a resource from the pool asynchronously with retry - * @param {number} [timeoutMs] - Optional timeout in milliseconds. If provided, will throw after timeout. - * @returns {Promise} Promise that resolves with a resource when one becomes available - * @throws Error if timeout is exceeded before acquiring a resource - */ - async acquireAsync(timeoutMs) { - const idx = await this.pool.acquireAsync(timeoutMs) - return this.resources[idx] +export class GenericObjectPool extends StaticObjectPool { + static withDynamicSizing(config) { + return DynamicObjectPool.withDynamicSizing(config) } - /** - * Release a resource back to the pool - * @param {T} resource - The resource to release - */ - release(resource) { - // O(1) Lookup - const idx = this.resourceToIdx.get(resource) - - if (idx === undefined) { - throw new Error('Resource not belonging to pool') - } - - this.pool.release(idx) + static dynamic(config) { + return DynamicObjectPool.withDynamicSizing(config) } - /** - * Add a new resource to the pool - * @param {T} resource - The resource to add - */ - add(resource) { - const newIdx = this.resources.length - this.resources.push(resource) - this.resourceToIdx.set(resource, newIdx) - this.pool.add(newIdx) + static static(resources) { + return new StaticObjectPool(resources) } - /** - * Remove one available resource from the pool - * @returns {boolean} true if a resource was removed, false if all are currently in use - */ - removeOne() { - const idx = this.pool.removeOne() - if (idx === null) return false - - // We mark the slot as empty or null, but we don't resize the array - // to keep indices stable. - // Ideally, for a dynamic pool, you'd want a free-list in JS, - // but for this implementation, we just remove the mapping. - const resource = this.resources[idx] - this.resourceToIdx.delete(resource) - this.resources[idx] = null - return true - } - - /** - * Use a resource from the pool with automatic release - * @template R - * @param {(resource: T) => Promise} fn - Function to execute with the resource - * @param {{optimistic?: boolean, timeout?: number}} [options] - Configuration options for acquisition - * @returns {Promise} - */ - async use(fn, { optimistic = true, timeout } = {}) { - let resource - if (optimistic) { - try { - const idx = this.pool.acquire() - resource = this.resources[idx] - } catch {} - } - - if (!resource) { - const idx = await this.pool.acquireAsync(timeout) - resource = this.resources[idx] - } - - try { - return await fn(resource) - } finally { - // Manual inline release for performance - const idx = this.resourceToIdx.get(resource) - if (idx !== undefined) { - this.pool.release(idx) - } - } - } - - /** - * Get the number of available resources in the pool - * @returns {number} Number of available resources - */ - availableCount() { - return this.pool.availableCount() - } - /** - * Get the total number of resources managed by the pool - * @returns {number} - */ - get size() { - return this.pool.size() - } - get pendingCount() { - return this.pool.pendingCount() - } - /** - * Get the number of available resources - * @returns {number} - */ - get available() { - return this.pool.availableCount() - } - /** - * Get the number of used resources - * @returns {number} - */ - get numUsed() { - return this.pool.size() - this.pool.availableCount() - } - - /** - * Destroy the pool and stop accepting new acquires - */ - destroy() { - // Destroy Rust pool (closes semaphore) - this.pool.destroy() - // Clear JS references - this.resources = [] - this.resourceToIdx.clear() + static engine(size) { + return new EnginePool(size) } } + +export { StaticObjectPool, DynamicObjectPool, EnginePool } diff --git a/package.json b/package.json index f9408d7..9d85be1 100644 --- a/package.json +++ b/package.json @@ -24,7 +24,10 @@ "index.wrapper.mjs", "index.wrapper.cjs", "index.wrapper.d.ts", - "browser.js" + "browser.js", + "implementations/index.mjs", + "implementations/index.cjs", + "implementations/index.d.ts" ], "exports": { ".": { @@ -36,6 +39,16 @@ "types": "./index.wrapper.d.ts", "default": "./index.wrapper.cjs" } + }, + "./implementations": { + "import": { + "types": "./implementations/index.d.ts", + "default": "./implementations/index.mjs" + }, + "require": { + "types": "./implementations/index.d.ts", + "default": "./implementations/index.cjs" + } } }, "napi": { diff --git a/resource_pool.wasi.cjs b/resource_pool.wasi.cjs index 3d8e27e..cc211c0 100644 --- a/resource_pool.wasi.cjs +++ b/resource_pool.wasi.cjs @@ -21,7 +21,7 @@ const __wasi = new __nodeWASI({ env: process.env, preopens: { [__rootDir]: __rootDir, - } + }, }) const __emnapiContext = __emnapiGetDefaultContext() @@ -41,13 +41,19 @@ if (__nodeFs.existsSync(__wasmDebugFilePath)) { try { __wasmFilePath = require.resolve('@lojhan/resource-pool-wasm32-wasi/resource_pool.wasm32-wasi.wasm') } catch { - throw new Error('Cannot find resource_pool.wasm32-wasi.wasm file, and @lojhan/resource-pool-wasm32-wasi package is not installed.') + throw new Error( + 'Cannot find resource_pool.wasm32-wasi.wasm file, and @lojhan/resource-pool-wasm32-wasi package is not installed.', + ) } } -const { instance: __napiInstance, module: __wasiModule, napiModule: __napiModule } = __emnapiInstantiateNapiModuleSync(__nodeFs.readFileSync(__wasmFilePath), { +const { + instance: __napiInstance, + module: __wasiModule, + napiModule: __napiModule, +} = __emnapiInstantiateNapiModuleSync(__nodeFs.readFileSync(__wasmFilePath), { context: __emnapiContext, - asyncWorkPoolSize: (function() { + asyncWorkPoolSize: (function () { const threadsSizeFromEnv = Number(process.env.NAPI_RS_ASYNC_WORK_POOL_SIZE ?? process.env.UV_THREADPOOL_SIZE) // NaN > 0 is false if (threadsSizeFromEnv > 0) { @@ -72,21 +78,17 @@ const { instance: __napiInstance, module: __wasiModule, napiModule: __napiModule // According to https://github.com/nodejs/node/blob/19e0d472728c79d418b74bddff588bea70a403d0/lib/internal/worker.js#L415, // a worker is consist of two handles: kPublicPort and kHandle. { - const kPublicPort = Object.getOwnPropertySymbols(worker).find(s => - s.toString().includes("kPublicPort") - ); + const kPublicPort = Object.getOwnPropertySymbols(worker).find((s) => s.toString().includes('kPublicPort')) if (kPublicPort) { - worker[kPublicPort].ref = () => {}; + worker[kPublicPort].ref = () => {} } - const kHandle = Object.getOwnPropertySymbols(worker).find(s => - s.toString().includes("kHandle") - ); + const kHandle = Object.getOwnPropertySymbols(worker).find((s) => s.toString().includes('kHandle')) if (kHandle) { - worker[kHandle].ref = () => {}; + worker[kHandle].ref = () => {} } - worker.unref(); + worker.unref() } return worker }, diff --git a/src/lib.rs b/src/lib.rs index 966c370..a209321 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,26 +2,23 @@ mod pool; -use crate::pool::{CorePool, PoolError}; use napi::bindgen_prelude::*; use napi_derive::napi; +use pool::{CorePool, PoolError}; #[napi] pub struct GenericObjectPool { - // We only manage indices (u32), not objects. - inner: CorePool, + inner: CorePool, } #[napi] impl GenericObjectPool { #[napi(constructor)] pub fn new(size: u32) -> Self { - // Initialize the queue with indices [0, 1, 2, ... size-1] let mut indices = Vec::with_capacity(size as usize); for i in 0..size { indices.push(i); } - GenericObjectPool { inner: CorePool::new(indices), } @@ -29,7 +26,6 @@ impl GenericObjectPool { #[napi] pub fn acquire(&self) -> Result { - // Pure integer logic. No object creation. match self.inner.try_acquire() { Some(idx) => Ok(idx), None => Err(Error::from_reason("No resources available")), @@ -38,7 +34,6 @@ impl GenericObjectPool { #[napi] pub async fn acquire_async(&self, timeout_ms: Option) -> Result { - // Async wait handled by Tokio, returns an integer. self .inner .acquire_async(timeout_ms.map(|t| t as u64)) @@ -48,8 +43,7 @@ impl GenericObjectPool { "Failed to acquire resource within {:?}ms timeout", timeout_ms.unwrap_or(0) )), - PoolError::Empty => Error::from_reason("Pool empty"), - _ => Error::from_reason(e.to_string()), + PoolError::Closed => Error::from_reason("Pool closed"), }) } @@ -58,8 +52,6 @@ impl GenericObjectPool { self.inner.release(idx); } - // --- Management --- - #[napi] pub fn add(&self, idx: u32) { self.inner.add(idx); @@ -87,6 +79,6 @@ impl GenericObjectPool { #[napi] pub fn destroy(&self) { - self.inner.drain(); + self.inner.close(); } } diff --git a/src/pool.rs b/src/pool.rs index d76a6be..03b3743 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -1,4 +1,4 @@ -use crossbeam_queue::SegQueue; +use parking_lot::Mutex; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -9,7 +9,6 @@ use tokio::time::timeout; pub enum PoolError { Timeout, Closed, - Empty, } impl std::fmt::Display for PoolError { @@ -17,118 +16,128 @@ impl std::fmt::Display for PoolError { match self { PoolError::Timeout => write!(f, "Timeout acquiring resource"), PoolError::Closed => write!(f, "Pool closed"), - PoolError::Empty => write!(f, "Pool empty"), } } } impl std::error::Error for PoolError {} -/// A high-performance, lock-free pool core. -/// It uses a Hybrid Architecture: -/// - `SegQueue` (Lock-Free) for storing the resources (indices). -/// - `Semaphore` (Async-Aware) for managing the wait queue and permits. -pub struct CorePool { - pool: Arc>, +/// A simple, high-performance pool. +/// +/// It relies on `tokio::Semaphore` to handle the "Async Wait" logic +/// and a `Mutex>` for storage. +/// +/// The Logic: +/// 1. The Semaphore holds the *count* of available resources. +/// 2. The Mutex holds the *actual* resources. +/// 3. We ONLY touch the Mutex if we have successfully acquired a permit from the Semaphore. +/// This guarantees the Mutex never blocks for long and the Vec is never empty when we look. +#[derive(Clone)] +pub struct CorePool { + // Use a standard Mutex. It is faster than SegQueue for simple integer operations + // because it doesn't allocate nodes on the heap. + resources: Arc>>, semaphore: Arc, size: Arc, pending: Arc, } -impl Clone for CorePool { - fn clone(&self) -> Self { - Self { - pool: self.pool.clone(), - semaphore: self.semaphore.clone(), - size: self.size.clone(), - pending: self.pending.clone(), - } - } -} - -impl CorePool { - pub fn new(items: Vec) -> Self { +impl CorePool { + pub fn new(items: Vec) -> Self { let count = items.len(); - let queue = SegQueue::new(); - for item in items { - queue.push(item); - } Self { - pool: Arc::new(queue), + resources: Arc::new(Mutex::new(items)), semaphore: Arc::new(Semaphore::new(count)), size: Arc::new(AtomicUsize::new(count)), pending: Arc::new(AtomicUsize::new(0)), } } - /// Synchronously attempts to acquire a resource. - /// If successful, the permit is "forgotten" (leaked) to the caller - /// and must be restored via `release()`. - pub fn try_acquire(&self) -> Option { + #[inline] + pub fn try_acquire(&self) -> Option { + // 1. Try to get a permit. If this fails, the pool is empty/closed. let permit = self.semaphore.try_acquire().ok()?; + + // 2. We have a permit, so the Mutex MUST contain at least one item. + // We explicitly forget the permit because the caller now "owns" the slot. permit.forget(); - self.pool.pop() + + // 3. Pop the index. + let mut lock = self.resources.lock(); + lock.pop() } - /// Asynchronously acquires a resource. - /// Handles the "wait queue" using Tokio's semaphore. - pub async fn acquire_async(&self, timeout_ms: Option) -> Result { - // Optimization: Fast path try_acquire before awaiting + pub async fn acquire_async(&self, timeout_ms: Option) -> Result { + // Fast path: try to acquire without registering a waker if let Ok(permit) = self.semaphore.try_acquire() { permit.forget(); - if let Some(item) = self.pool.pop() { - return Ok(item); - } else { - // Fallback (should be rare/impossible): return permit and wait properly - self.semaphore.add_permits(1); - } + let mut lock = self.resources.lock(); + return Ok( + lock + .pop() + .expect("Pool desync: Permit acquired but queue empty"), + ); } self.pending.fetch_add(1, Ordering::Relaxed); // The Async Wait + let acquire_future = self.semaphore.acquire(); + let permit_result = if let Some(ms) = timeout_ms { - timeout(Duration::from_millis(ms), self.semaphore.acquire()).await + timeout(Duration::from_millis(ms), acquire_future).await } else { - Ok(self.semaphore.acquire().await) + Ok(acquire_future.await) }; self.pending.fetch_sub(1, Ordering::Relaxed); - let permit = match permit_result { - Ok(Ok(p)) => p, - Ok(Err(_)) => return Err(PoolError::Closed), // Semaphore closed - Err(_) => return Err(PoolError::Timeout), // Timeout - }; - - // "Forget" the permit so it persists while the user holds the resource. - permit.forget(); - - self.pool.pop().ok_or(PoolError::Empty) + match permit_result { + Ok(Ok(permit)) => { + permit.forget(); + let mut lock = self.resources.lock(); + // Safe expect: Semaphore guarantees count > 0 + Ok( + lock + .pop() + .expect("Pool desync: Permit acquired but queue empty"), + ) + } + Ok(Err(_)) => Err(PoolError::Closed), // Semaphore closed + Err(_) => Err(PoolError::Timeout), // Timeout + } } - /// Returns a resource to the pool and restores the semaphore permit. - /// This wakes up the next waiter in the queue. - pub fn release(&self, item: T) { - self.pool.push(item); + pub fn release(&self, idx: u32) { + { + let mut lock = self.resources.lock(); + lock.push(idx); + } + // Restore the permit AFTER pushing the resource to avoid race conditions + // where a waiter wakes up before the data is in the Vec. self.semaphore.add_permits(1); } - pub fn add(&self, item: T) { - self.pool.push(item); - self.semaphore.add_permits(1); + pub fn add(&self, idx: u32) { + { + let mut lock = self.resources.lock(); + lock.push(idx); + } self.size.fetch_add(1, Ordering::Relaxed); + self.semaphore.add_permits(1); } - pub fn remove_one(&self) -> Option { - // We must acquire a permit to remove an item to ensure we don't - // remove an item that is currently reserved for a waiter. + pub fn remove_one(&self) -> Option { + // To remove, we must consume a permit permanently if let Ok(permit) = self.semaphore.try_acquire() { - permit.forget(); - if let Some(item) = self.pool.pop() { + permit.forget(); // We consume the permit + + let mut lock = self.resources.lock(); + let item = lock.pop(); + if item.is_some() { self.size.fetch_sub(1, Ordering::Relaxed); - return Some(item); } + return item; } None } @@ -145,16 +154,13 @@ impl CorePool { self.pending.load(Ordering::Relaxed) } - pub fn drain(&self) -> Vec { + pub fn close(&self) { self.semaphore.close(); - let mut items = Vec::new(); - while let Some(item) = self.pool.pop() { - items.push(item); - } - let dropped = items.len(); + let mut lock = self.resources.lock(); + let dropped = lock.len(); + lock.clear(); if dropped > 0 { self.size.fetch_sub(dropped, Ordering::Relaxed); } - items } }