View Javadoc

1   /*
2    * Copyright  2004-2005 Stefan Reuter
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   *
16   */
17  package net.sf.asterisk.util;
18  
19  import java.util.LinkedList;
20  import java.util.List;
21  
22  /***
23   * A fixed sized thread pool.
24   * 
25   * @author srt
26   * @version $Id: ThreadPool.java,v 1.6 2005/10/25 23:10:33 srt Exp $
27   */
28  public class ThreadPool
29  {
30      private final Log logger = LogFactory.getLog(getClass());
31      private boolean running;
32      private int numThreads;
33      private String name;
34      private List jobs;
35  
36      /***
37       * Creates a new ThreadPool of numThreads size. These Threads are waiting
38       * for jobs to be added via the addJob method.
39       * 
40       * @param name the name to use for the thread group and worker threads.
41       * @param numThreads the number of threads to create.
42       */
43      public ThreadPool(String name, int numThreads)
44      {
45          PoolThreadGroup group;
46  
47          this.name = name;
48          this.numThreads = numThreads;
49          jobs = new LinkedList();
50          running = true;
51  
52          group = new PoolThreadGroup(this.name);
53  
54          // create and start the threads
55          for (int i = 0; i < this.numThreads; i++)
56          {
57              TaskThread thread;
58  
59              thread = new TaskThread(group, this.name + "-TaskThread-" + i);
60              thread.start();
61          }
62          logger.debug("ThreadPool created with " + this.numThreads + " threads.");
63      }
64  
65      /***
66       * Gets a job from the queue. If none is availble the calling thread is
67       * blocked until one is added.
68       * 
69       * @return the next job to service, <code>null</code> if the worker thread
70       *         should be shut down.
71       */
72      Runnable obtainJob()
73      {
74          Runnable job = null;
75  
76          synchronized (jobs)
77          {
78              while (job == null && running)
79              {
80                  try
81                  {
82                      if (jobs.size() == 0)
83                      {
84                          jobs.wait();
85                      }
86                  }
87                  catch (InterruptedException ie)
88                  {
89                  }
90  
91                  if (jobs.size() > 0)
92                  {
93                      job = (Runnable) jobs.get(0);
94                      jobs.remove(0);
95                  }
96              }
97          }
98  
99          if (running)
100         {
101             // Got a job from the queue
102             return job;
103         }
104         else
105         {
106             // Shut down the pool
107             return null;
108         }
109     }
110 
111     /***
112      * Adds a new job to the queue. This will be picked up by the next available
113      * active thread.
114      */
115     public void addJob(Runnable runnable)
116     {
117         synchronized (jobs)
118         {
119             jobs.add(runnable);
120             jobs.notifyAll();
121         }
122     }
123 
124     /***
125      * Turn off the pool. Every thread, when finished with its current work,
126      * will realize that the pool is no longer running, and will exit.
127      */
128     public void shutdown()
129     {
130         running = false;
131         synchronized (jobs)
132         {
133             jobs.notifyAll();
134         }
135         logger.debug("ThreadPool shutting down.");
136     }
137 
138     /***
139      * A TaskThread sits in a loop, asking the pool for a job, and servicing it.
140      */
141     class TaskThread extends Thread
142     {
143         public TaskThread(ThreadGroup group, String name)
144         {
145             super(group, name);
146         }
147 
148         /***
149          * Get a job from the pool, run it, repeat. If the obtained job is null,
150          * we exit the loop and the thread.
151          */
152         public void run()
153         {
154             while (true)
155             {
156                 Runnable job;
157 
158                 job = obtainJob();
159 
160                 if (job == null)
161                 {
162                     // no more jobs available as ThreadPool has been closed? ->
163                     // end this Thread
164                     break;
165                 }
166 
167                 job.run();
168             }
169         }
170     }
171 
172     /***
173      * Provided the exception handler for all task threads.
174      */
175     class PoolThreadGroup extends ThreadGroup
176     {
177         public PoolThreadGroup(String name)
178         {
179             super(name);
180         }
181 
182         /***
183          * Logs all exceptions that are not caught within the task threads.
184          */
185         public void uncaughtException(Thread t, Throwable e)
186         {
187             logger.warn("Uncaught exception in Thread " + t.getName(), e);
188         }
189     }
190 }