Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions src/lime/utils/ThreadSafeObjectPool.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package lime.utils;

#if target.threaded
import sys.thread.Mutex;

/**
A thread-safe variant of `ObjectPool`. Wraps every mutating method
(`add`, `clear`, `get`, `release`, `remove`, `size` setter) with an
internal `sys.thread.Mutex` so concurrent access from multiple threads
cannot drift the `activeObjects` / `inactiveObjects` counters or hand
out the same instance twice.

On threaded targets (`#if target.threaded`) the class extends
`ObjectPool` and routes mutating calls through the mutex. On
non-threaded targets (`js`, `flash`) this name resolves to a `typedef`
alias of `ObjectPool` — same compile-time API, zero runtime cost, and
no thread-safety code in the output.

Caveats:

- The supplied `create` and `clean` callbacks are invoked while the
internal mutex is held. They must not call back into the same pool,
or `Mutex.acquire()` will deadlock on non-reentrant platforms.
- Reading the `activeObjects`, `inactiveObjects`, or `size` properties
from a different thread than the one currently mutating the pool is
not memory-safe. Use external synchronization if a consistent
cross-thread snapshot is required.
**/
#if !lime_debug
@:fileXml('tags="haxe,release"')
@:noDebug
#end
#if !js @:generic #end class ThreadSafeObjectPool<T> extends ObjectPool<T>
{
@:noCompletion private final __mutex:Mutex = new Mutex();

/**
Creates a new ThreadSafeObjectPool instance.

@param create A function that creates a new instance of type T.
@param clean A function that cleans up an instance of type T before it is reused.
@param size The maximum size of the object pool.
**/
public function new(create:Void->T = null, clean:T->Void = null, size:Null<Int> = null)
{
super(create, clean, size);
}

override public function add(object:T):Void
{
__mutex.acquire();
super.add(object);
__mutex.release();
}

override public function clear():Void
{
__mutex.acquire();
super.clear();
__mutex.release();
}

override public function get():T
{
__mutex.acquire();
final object:T = super.get();
__mutex.release();
return object;
}

override public function release(object:T):Void
{
__mutex.acquire();
super.release(object);
__mutex.release();
}

override public function remove(object:T):Void
{
__mutex.acquire();
super.remove(object);
__mutex.release();
}

@:noCompletion override private function set_size(value:Null<Int>):Null<Int>
{
__mutex.acquire();
final result:Null<Int> = super.set_size(value);
__mutex.release();
return result;
}
}
#else
typedef ThreadSafeObjectPool<T> = ObjectPool<T>;
#end
4 changes: 4 additions & 0 deletions tests/unit/src/TestMain.hx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ class TestMain extends Application {
runner.addCase(new utils.UInt16ArrayTest());
runner.addCase(new utils.UInt32ArrayTest());
runner.addCase(new utils.DataViewTest());
runner.addCase(new utils.ObjectPoolTest());
#if target.threaded
runner.addCase(new utils.ThreadSafeObjectPoolTest());
#end
Report.create(runner);
runner.run();
}
Expand Down
84 changes: 84 additions & 0 deletions tests/unit/src/utils/ObjectPoolTest.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package utils;

import lime.utils.ObjectPool;
import utest.Assert;
import utest.Test;

private class Item {
public function new() {}
}

class ObjectPoolTest extends Test {
public function new() {
super();
}

public function testGetReturnsFromCreate():Void {
var pool = new ObjectPool<Item>(function() return new Item());
var a = pool.get();
Assert.notNull(a);
Assert.equals(1, pool.activeObjects);
Assert.equals(0, pool.inactiveObjects);
}

public function testReleaseReturnsObjectToPool():Void {
var pool = new ObjectPool<Item>(function() return new Item());
var a = pool.get();
pool.release(a);
Assert.equals(0, pool.activeObjects);
Assert.equals(1, pool.inactiveObjects);
}

public function testGetReusesReleasedObject():Void {
var pool = new ObjectPool<Item>(function() return new Item());
var a = pool.get();
pool.release(a);
var b = pool.get();
Assert.equals(a, b);
Assert.equals(1, pool.activeObjects);
Assert.equals(0, pool.inactiveObjects);
}

public function testSizeCapLimitsCreation():Void {
var pool = new ObjectPool<Item>(function() return new Item(), null, 2);
var a = pool.get();
var b = pool.get();
var c = pool.get();
Assert.notNull(a);
Assert.notNull(b);
Assert.isNull(c);
Assert.equals(2, pool.activeObjects);
}

public function testCleanIsCalledOnRelease():Void {
var cleaned = 0;
var pool = new ObjectPool<Item>(function() return new Item(), function(_) cleaned++);
var a = pool.get();
pool.release(a);
Assert.equals(1, cleaned);
}

public function testClearResetsCounters():Void {
var pool = new ObjectPool<Item>(function() return new Item());
pool.get();
pool.release(pool.get());
pool.clear();
Assert.equals(0, pool.activeObjects);
Assert.equals(0, pool.inactiveObjects);
}

public function testRemoveDecrementsCorrectCounter():Void {
var pool = new ObjectPool<Item>(function() return new Item());
var a = pool.get();
pool.remove(a);
Assert.equals(0, pool.activeObjects);
Assert.equals(0, pool.inactiveObjects);
}

public function testSetSizePrefillsInactive():Void {
var pool = new ObjectPool<Item>(function() return new Item());
pool.size = 3;
Assert.equals(0, pool.activeObjects);
Assert.equals(3, pool.inactiveObjects);
}
}
60 changes: 60 additions & 0 deletions tests/unit/src/utils/ThreadSafeObjectPoolTest.hx
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package utils;

#if target.threaded
import lime.utils.ThreadSafeObjectPool;
import sys.thread.Mutex;
import sys.thread.Thread;
import utest.Assert;
import utest.Test;

private class Item {
public var inUse:Bool = false;
public function new() {}
}

class ThreadSafeObjectPoolTest extends Test {
public function new() {
super();
}

public function testConcurrentGetReleaseDoesNotDriftCounters():Void {
final THREADS = 8;
final ITERS = 1000;
final POOL_SIZE = 4;
final TIMEOUT_SEC = 5.0;
var pool = new ThreadSafeObjectPool<Item>(function() return new Item(), null, POOL_SIZE);
var doneMutex = new Mutex();
var done = 0;
var duplicates = 0;
for (i in 0...THREADS) {
Thread.create(function() {
for (j in 0...ITERS) {
var a = pool.get();
if (a == null) continue;
if (a.inUse) duplicates++;
a.inUse = true;
a.inUse = false;
pool.release(a);
}
doneMutex.acquire();
done++;
doneMutex.release();
});
}
var deadline = haxe.Timer.stamp() + TIMEOUT_SEC;
var timedOut = false;
while (true) {
doneMutex.acquire();
var d = done;
doneMutex.release();
if (d >= THREADS) break;
if (haxe.Timer.stamp() > deadline) { timedOut = true; break; }
Sys.sleep(0.01);
}
Assert.isFalse(timedOut, "worker threads did not finish within " + TIMEOUT_SEC + "s — likely pool state corruption");
Assert.equals(0, duplicates);
Assert.equals(0, pool.activeObjects);
Assert.equals(POOL_SIZE, pool.inactiveObjects);
}
}
#end