1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.sf.asterisk.util;
18
19 import java.util.LinkedList;
20 import java.util.List;
21
22 /***
23 * A fixed sized thread pool.
24 *
25 * @author srt
26 * @version $Id: ThreadPool.java,v 1.6 2005/10/25 23:10:33 srt Exp $
27 */
28 public class ThreadPool
29 {
30 private final Log logger = LogFactory.getLog(getClass());
31 private boolean running;
32 private int numThreads;
33 private String name;
34 private List jobs;
35
36 /***
37 * Creates a new ThreadPool of numThreads size. These Threads are waiting
38 * for jobs to be added via the addJob method.
39 *
40 * @param name the name to use for the thread group and worker threads.
41 * @param numThreads the number of threads to create.
42 */
43 public ThreadPool(String name, int numThreads)
44 {
45 PoolThreadGroup group;
46
47 this.name = name;
48 this.numThreads = numThreads;
49 jobs = new LinkedList();
50 running = true;
51
52 group = new PoolThreadGroup(this.name);
53
54
55 for (int i = 0; i < this.numThreads; i++)
56 {
57 TaskThread thread;
58
59 thread = new TaskThread(group, this.name + "-TaskThread-" + i);
60 thread.start();
61 }
62 logger.debug("ThreadPool created with " + this.numThreads + " threads.");
63 }
64
65 /***
66 * Gets a job from the queue. If none is availble the calling thread is
67 * blocked until one is added.
68 *
69 * @return the next job to service, <code>null</code> if the worker thread
70 * should be shut down.
71 */
72 Runnable obtainJob()
73 {
74 Runnable job = null;
75
76 synchronized (jobs)
77 {
78 while (job == null && running)
79 {
80 try
81 {
82 if (jobs.size() == 0)
83 {
84 jobs.wait();
85 }
86 }
87 catch (InterruptedException ie)
88 {
89 }
90
91 if (jobs.size() > 0)
92 {
93 job = (Runnable) jobs.get(0);
94 jobs.remove(0);
95 }
96 }
97 }
98
99 if (running)
100 {
101
102 return job;
103 }
104 else
105 {
106
107 return null;
108 }
109 }
110
111 /***
112 * Adds a new job to the queue. This will be picked up by the next available
113 * active thread.
114 */
115 public void addJob(Runnable runnable)
116 {
117 synchronized (jobs)
118 {
119 jobs.add(runnable);
120 jobs.notifyAll();
121 }
122 }
123
124 /***
125 * Turn off the pool. Every thread, when finished with its current work,
126 * will realize that the pool is no longer running, and will exit.
127 */
128 public void shutdown()
129 {
130 running = false;
131 synchronized (jobs)
132 {
133 jobs.notifyAll();
134 }
135 logger.debug("ThreadPool shutting down.");
136 }
137
138 /***
139 * A TaskThread sits in a loop, asking the pool for a job, and servicing it.
140 */
141 class TaskThread extends Thread
142 {
143 public TaskThread(ThreadGroup group, String name)
144 {
145 super(group, name);
146 }
147
148 /***
149 * Get a job from the pool, run it, repeat. If the obtained job is null,
150 * we exit the loop and the thread.
151 */
152 public void run()
153 {
154 while (true)
155 {
156 Runnable job;
157
158 job = obtainJob();
159
160 if (job == null)
161 {
162
163
164 break;
165 }
166
167 job.run();
168 }
169 }
170 }
171
172 /***
173 * Provided the exception handler for all task threads.
174 */
175 class PoolThreadGroup extends ThreadGroup
176 {
177 public PoolThreadGroup(String name)
178 {
179 super(name);
180 }
181
182 /***
183 * Logs all exceptions that are not caught within the task threads.
184 */
185 public void uncaughtException(Thread t, Throwable e)
186 {
187 logger.warn("Uncaught exception in Thread " + t.getName(), e);
188 }
189 }
190 }