1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging.threadpool;
17
import org.apache.log4j.Logger;
import org.kuali.rice.core.api.config.CoreConfigHelper;
import org.kuali.rice.core.api.config.property.Config;
import org.kuali.rice.core.api.config.property.ConfigContext;
import org.kuali.rice.core.api.util.ClassLoaderUtils;

import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
29
30
31
32
33
34
35 public class KSBThreadPoolImpl extends ThreadPoolExecutor implements KSBThreadPool {
36
37 private static final Logger LOG = Logger.getLogger(KSBThreadPoolImpl.class);
38
39 public static final int DEFAULT_POOL_SIZE = 5;
40
41 private boolean started;
42 private boolean poolSizeSet;
43
44 public KSBThreadPoolImpl() {
45 super(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 60, TimeUnit.SECONDS, new PriorityBlockingQueue(1, new PriorityBlockingQueuePersistedMessageComparator()), new KSBThreadFactory(ClassLoaderUtils.getDefaultClassLoader()), new ThreadPoolExecutor.AbortPolicy());
46 }
47
48 public void setCorePoolSize(int corePoolSize) {
49 LOG.info("Setting core pool size to " + corePoolSize + " threads.");
50 super.setCorePoolSize(corePoolSize);
51 this.poolSizeSet = true;
52 }
53
54 public long getKeepAliveTime() {
55 return super.getKeepAliveTime(TimeUnit.MILLISECONDS);
56 }
57
58 public boolean isStarted() {
59 return this.started;
60 }
61
62 public void start() throws Exception {
63 LOG.info("Starting the KSB thread pool...");
64 loadSettings();
65 this.started = true;
66 LOG.info("...KSB thread pool successfully started.");
67 }
68
69 public void stop() throws Exception {
70 if (isStarted()) {
71 LOG.info("Shutting down KSB thread pool...");
72 int pendingTasks = this.shutdownNow().size();
73 LOG.info(pendingTasks + " pending tasks...");
74 LOG.info("awaiting termination: " + this.awaitTermination(20, TimeUnit.SECONDS));
75 LOG.info("...KSB thread pool successfully stopped, isShutdown=" + this.isShutdown() + ", isTerminated=" + this.isTerminated());
76 this.started = false;
77 LOG.info("...KSB thread pool successfully shut down.");
78 }
79 }
80
81
82
83
84 protected void loadSettings() {
85 String threadPoolSizeStr = ConfigContext.getCurrentContextConfig().getProperty(Config.THREAD_POOL_SIZE);
86 if (!this.poolSizeSet) {
87 int poolSize = DEFAULT_POOL_SIZE;
88 try {
89 poolSize = new Integer(threadPoolSizeStr);
90 } catch (NumberFormatException nfe) {
91 LOG.error( "loadSettings(): Unable to parse the pool size: '"+threadPoolSizeStr+"'");
92 }
93 setCorePoolSize(poolSize);
94 }
95 }
96
97 public Object getInstance() {
98 return this;
99 }
100
101
102
103
104
105
106
107
108
109
110
111
112
113 private static class KSBThreadFactory implements ThreadFactory {
114
115 private static int factorySequence = 0;
116
117 private static int threadSequence = 0;
118
119 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
120
121 private ClassLoader contextClassLoader;
122
123 public KSBThreadFactory(ClassLoader contextClassLoader) {
124 this.contextClassLoader = contextClassLoader;
125 factorySequence++;
126 }
127
128 public Thread newThread(Runnable runnable) {
129 threadSequence++;
130 Thread thread = this.defaultThreadFactory.newThread(runnable);
131
132
133 thread.setContextClassLoader(contextClassLoader);
134 thread.setName(CoreConfigHelper.getApplicationId() + "/KSB-pool-" + factorySequence + "-thread-"
135 + threadSequence);
136 return thread;
137 }
138
139 }
140 }