-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPool.java
More file actions
71 lines (61 loc) · 1.6 KB
/
Pool.java
File metadata and controls
71 lines (61 loc) · 1.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Fixed-size thread pool that blocks the submitter of tasks when it is at full capacity.
 *
 * <p>A simple way to throttle the producer and avoid wasting memory on an unbounded
 * backlog of queued tasks. Configure with {@code nThreads}, the number of worker
 * threads, and {@code nTasks}, the maximum number of tasks that may be submitted but
 * not yet finished (queued plus running) at any one time.
 *
 * <p>Capacity is tracked with a {@link Semaphore} holding {@code nTasks} permits: one
 * permit is taken per submitted task and handed back when that task finishes, whether
 * it returns normally or throws.
 */
public class Pool {
    private final int nThreads;
    private final int nTasks;
    private final Semaphore s;
    private final ExecutorService pool;
    /** Sequence number of the most recently accepted task; only used to label log lines. */
    private final AtomicInteger cnt = new AtomicInteger();

    /**
     * Creates the pool.
     *
     * @param nThreads number of worker threads
     * @param nTasks maximum number of unfinished tasks allowed at once; must be at least 1
     * @throws IllegalArgumentException if {@code nTasks} is less than 1 (no task could
     *     ever be submitted), or if the underlying executor rejects {@code nThreads}
     */
    public Pool(int nThreads, int nTasks) {
        if (nTasks < 1) {
            throw new IllegalArgumentException("nTasks must be >= 1, got " + nTasks);
        }
        this.nThreads = nThreads;
        this.nTasks = nTasks;
        this.s = new Semaphore(nTasks);
        this.pool = Executors.newFixedThreadPool(nThreads);
    }

    /**
     * Decorates a task so that its capacity permit is returned once the task has run.
     *
     * <p>NOTE(review): a task cancelled through its {@code Future} before it starts would
     * never execute this wrapper, so its permit would not be returned - confirm whether
     * callers ever cancel.
     */
    private class Wrap<T> implements Callable<T> {
        private final int id;
        private final Callable<T> fun;

        Wrap(int id, Callable<T> fun) {
            this.id = id;
            this.fun = fun;
        }

        @Override
        public T call() throws Exception {
            try {
                return fun.call();
            } finally {
                // Release even when the task throws; otherwise the permit is lost for
                // good and shutdown() would wait forever for it.
                s.release();
                Util.log("released " + id);
            }
        }
    }

    /**
     * Submits a task, blocking the caller (uninterruptibly) while {@code nTasks} tasks
     * are already unfinished.
     *
     * @param task the work to run
     * @param <T> the task's result type
     * @return a future for the task's result
     * @throws RejectedExecutionException if the underlying executor refuses the task,
     *     e.g. because {@link #shutdown()} has already been called
     */
    public <T> Future<T> submit(Callable<T> task) {
        s.acquireUninterruptibly(); // blocks
        final int id = cnt.incrementAndGet();
        Util.log("Acquired " + id);
        try {
            return pool.submit(new Wrap<T>(id, task));
        } catch (RuntimeException e) {
            // The task will never run, so nothing else would give this permit back.
            s.release();
            throw e;
        }
    }

    /**
     * Stops accepting new tasks and blocks until every previously submitted task has
     * finished.
     */
    public void shutdown() {
        pool.shutdown();
        // Holding all nTasks permits at once means no task is still queued or running.
        s.acquireUninterruptibly(nTasks);
        // Hand the permits back so that a repeated shutdown() returns immediately and a
        // late submit() fails fast with RejectedExecutionException instead of hanging.
        s.release(nTasks);
        Util.log("Shutdown");
    }
}
/**
 * Manual demo of {@link Pool}: pushes 100 slow tasks through a pool with 2 worker
 * threads and room for 2 unfinished tasks, so the submitting loop is throttled to the
 * pace of the workers.
 */
class TestPool {
    /**
     * Submits the demo tasks, then shuts the pool down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Pool p = new Pool(2, 2);
        try {
            for (int i = 0; i < 100; i++) {
                final int fi = i; // effectively-final copy for the anonymous class
                p.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        System.out.println("Start " + fi);
                        Thread.sleep(10 * 1000);
                        System.out.println("Stop " + fi);
                        return fi;
                    }
                });
            }
        } finally {
            // Without this the executor is never shut down and its worker threads
            // keep the JVM alive after the last task has finished.
            p.shutdown();
        }
    }
}