diff --git a/src/lime/utils/ThreadSafeObjectPool.hx b/src/lime/utils/ThreadSafeObjectPool.hx new file mode 100644 index 0000000000..7f248b4bdd --- /dev/null +++ b/src/lime/utils/ThreadSafeObjectPool.hx @@ -0,0 +1,95 @@ +package lime.utils; + +#if target.threaded +import sys.thread.Mutex; + +/** + A thread-safe variant of `ObjectPool`. Wraps every mutating method + (`add`, `clear`, `get`, `release`, `remove`, `size` setter) with an + internal `sys.thread.Mutex` so concurrent access from multiple threads + cannot drift the `activeObjects` / `inactiveObjects` counters or hand + out the same instance twice. + + On threaded targets (`#if target.threaded`) the class extends + `ObjectPool` and routes mutating calls through the mutex. On + non-threaded targets (`js`, `flash`) this name resolves to a `typedef` + alias of `ObjectPool` — same compile-time API, zero runtime cost, and + no thread-safety code in the output. + + Caveats: + + - The supplied `create` and `clean` callbacks are invoked while the + internal mutex is held. They must not call back into the same pool, + or `Mutex.acquire()` will deadlock on non-reentrant platforms. + - Reading the `activeObjects`, `inactiveObjects`, or `size` properties + from a different thread than the one currently mutating the pool is + not memory-safe. Use external synchronization if a consistent + cross-thread snapshot is required. +**/ +#if !lime_debug +@:fileXml('tags="haxe,release"') +@:noDebug +#end +#if !js @:generic #end class ThreadSafeObjectPool extends ObjectPool +{ + @:noCompletion private final __mutex:Mutex = new Mutex(); + + /** + Creates a new ThreadSafeObjectPool instance. + + @param create A function that creates a new instance of type T. + @param clean A function that cleans up an instance of type T before it is reused. + @param size The maximum size of the object pool. + **/ + public function new(create:Void->T = null, clean:T->Void = null, size:Null = null) + { + super(create, clean, size); + } + + override public function add(object:T):Void + { + __mutex.acquire(); + super.add(object); + __mutex.release(); + } + + override public function clear():Void + { + __mutex.acquire(); + super.clear(); + __mutex.release(); + } + + override public function get():T + { + __mutex.acquire(); + final object:T = super.get(); + __mutex.release(); + return object; + } + + override public function release(object:T):Void + { + __mutex.acquire(); + super.release(object); + __mutex.release(); + } + + override public function remove(object:T):Void + { + __mutex.acquire(); + super.remove(object); + __mutex.release(); + } + + @:noCompletion override private function set_size(value:Null):Null + { + __mutex.acquire(); + final result:Null = super.set_size(value); + __mutex.release(); + return result; + } +} +#else +typedef ThreadSafeObjectPool = ObjectPool; +#end diff --git a/tests/unit/src/TestMain.hx b/tests/unit/src/TestMain.hx index 6978eb8d6c..b06d8136fd 100644 --- a/tests/unit/src/TestMain.hx +++ b/tests/unit/src/TestMain.hx @@ -12,6 +12,10 @@ class TestMain extends Application { runner.addCase(new utils.UInt16ArrayTest()); runner.addCase(new utils.UInt32ArrayTest()); runner.addCase(new utils.DataViewTest()); + runner.addCase(new utils.ObjectPoolTest()); + #if target.threaded + runner.addCase(new utils.ThreadSafeObjectPoolTest()); + #end Report.create(runner); runner.run(); } diff --git a/tests/unit/src/utils/ObjectPoolTest.hx b/tests/unit/src/utils/ObjectPoolTest.hx new file mode 100644 index 0000000000..35693635c2 --- /dev/null +++ b/tests/unit/src/utils/ObjectPoolTest.hx @@ -0,0 +1,84 @@ +package utils; + +import lime.utils.ObjectPool; +import utest.Assert; +import utest.Test; + +private class Item { + public function new() {} +} + +class ObjectPoolTest extends Test { + public function new() { + super(); + } + + public function testGetReturnsFromCreate():Void { + var pool = new ObjectPool(function() return new Item()); + var a = pool.get(); + Assert.notNull(a); + Assert.equals(1, pool.activeObjects); + Assert.equals(0, pool.inactiveObjects); + } + + public function testReleaseReturnsObjectToPool():Void { + var pool = new ObjectPool(function() return new Item()); + var a = pool.get(); + pool.release(a); + Assert.equals(0, pool.activeObjects); + Assert.equals(1, pool.inactiveObjects); + } + + public function testGetReusesReleasedObject():Void { + var pool = new ObjectPool(function() return new Item()); + var a = pool.get(); + pool.release(a); + var b = pool.get(); + Assert.equals(a, b); + Assert.equals(1, pool.activeObjects); + Assert.equals(0, pool.inactiveObjects); + } + + public function testSizeCapLimitsCreation():Void { + var pool = new ObjectPool(function() return new Item(), null, 2); + var a = pool.get(); + var b = pool.get(); + var c = pool.get(); + Assert.notNull(a); + Assert.notNull(b); + Assert.isNull(c); + Assert.equals(2, pool.activeObjects); + } + + public function testCleanIsCalledOnRelease():Void { + var cleaned = 0; + var pool = new ObjectPool(function() return new Item(), function(_) cleaned++); + var a = pool.get(); + pool.release(a); + Assert.equals(1, cleaned); + } + + public function testClearResetsCounters():Void { + var pool = new ObjectPool(function() return new Item()); + pool.get(); + pool.release(pool.get()); + pool.clear(); + Assert.equals(0, pool.activeObjects); + Assert.equals(0, pool.inactiveObjects); + } + + public function testRemoveDecrementsCorrectCounter():Void { + var pool = new ObjectPool(function() return new Item()); + var a = pool.get(); + pool.remove(a); + Assert.equals(0, pool.activeObjects); + Assert.equals(0, pool.inactiveObjects); + } + + public function testSetSizePrefillsInactive():Void { + var pool = new ObjectPool(function() return new Item()); + pool.size = 3; + Assert.equals(0, pool.activeObjects); + Assert.equals(3, pool.inactiveObjects); + } +} diff --git a/tests/unit/src/utils/ThreadSafeObjectPoolTest.hx b/tests/unit/src/utils/ThreadSafeObjectPoolTest.hx new file mode 100644 index 0000000000..31299372c4 --- /dev/null +++ b/tests/unit/src/utils/ThreadSafeObjectPoolTest.hx @@ -0,0 +1,60 @@ +package utils; + +#if target.threaded +import lime.utils.ThreadSafeObjectPool; +import sys.thread.Mutex; +import sys.thread.Thread; +import utest.Assert; +import utest.Test; + +private class Item { + public var inUse:Bool = false; + public function new() {} +} + +class ThreadSafeObjectPoolTest extends Test { + public function new() { + super(); + } + + public function testConcurrentGetReleaseDoesNotDriftCounters():Void { + final THREADS = 8; + final ITERS = 1000; + final POOL_SIZE = 4; + final TIMEOUT_SEC = 5.0; + var pool = new ThreadSafeObjectPool(function() return new Item(), null, POOL_SIZE); + var doneMutex = new Mutex(); + var done = 0; + var duplicates = 0; + for (i in 0...THREADS) { + Thread.create(function() { + for (j in 0...ITERS) { + var a = pool.get(); + if (a == null) continue; + if (a.inUse) duplicates++; + a.inUse = true; + a.inUse = false; + pool.release(a); + } + doneMutex.acquire(); + done++; + doneMutex.release(); + }); + } + var deadline = haxe.Timer.stamp() + TIMEOUT_SEC; + var timedOut = false; + while (true) { + doneMutex.acquire(); + var d = done; + doneMutex.release(); + if (d >= THREADS) break; + if (haxe.Timer.stamp() > deadline) { timedOut = true; break; } + Sys.sleep(0.01); + } + Assert.isFalse(timedOut, "worker threads did not finish within " + TIMEOUT_SEC + "s — likely pool state corruption"); + Assert.equals(0, duplicates); + Assert.equals(0, pool.activeObjects); + Assert.equals(POOL_SIZE, pool.inactiveObjects); + } +} +#end