1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.rice.ksb.messaging.threadpool;
17
18 import org.apache.log4j.Logger;
19 import org.kuali.rice.core.api.config.CoreConfigHelper;
20 import org.kuali.rice.core.api.config.property.Config;
21 import org.kuali.rice.core.api.config.property.ConfigContext;
22 import org.kuali.rice.core.api.util.ClassLoaderUtils;
23
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.PriorityBlockingQueue;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29
30
31
32
33
34
35 public class KSBThreadPoolImpl extends ThreadPoolExecutor implements KSBThreadPool {
36
37 private static final Logger LOG = Logger.getLogger(KSBThreadPoolImpl.class);
38
39 public static final int DEFAULT_POOL_SIZE = 5;
40
41 private boolean started;
42 private boolean poolSizeSet;
43
44 public KSBThreadPoolImpl() {
45 super(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 60, TimeUnit.SECONDS, new PriorityBlockingQueue(1, new PriorityBlockingQueuePersistedMessageComparator()), new KSBThreadFactory(ClassLoaderUtils.getDefaultClassLoader()), new ThreadPoolExecutor.AbortPolicy());
46 }
47
48 public void setCorePoolSize(int corePoolSize) {
49 LOG.info("Setting core pool size to " + corePoolSize + " threads.");
50 super.setCorePoolSize(corePoolSize);
51 this.poolSizeSet = true;
52 }
53
54 public long getKeepAliveTime() {
55 return super.getKeepAliveTime(TimeUnit.MILLISECONDS);
56 }
57
58 public boolean isStarted() {
59 return this.started;
60 }
61
62 public void start() throws Exception {
63 LOG.info("Starting the KSB thread pool...");
64 loadSettings();
65 this.started = true;
66 LOG.info("...KSB thread pool successfully started.");
67 }
68
69 public void stop() throws Exception {
70 if (isStarted()) {
71 LOG.info("Shutting down KSB thread pool...");
72 this.shutdownNow();
73 this.started = false;
74 LOG.info("...KSB thread pool successfully shut down.");
75 }
76 }
77
78
79
80
81 protected void loadSettings() {
82 String threadPoolSizeStr = ConfigContext.getCurrentContextConfig().getProperty(Config.THREAD_POOL_SIZE);
83 if (!this.poolSizeSet) {
84 int poolSize = DEFAULT_POOL_SIZE;
85 try {
86 poolSize = new Integer(threadPoolSizeStr);
87 } catch (NumberFormatException nfe) {
88 LOG.error( "loadSettings(): Unable to parse the pool size: '"+threadPoolSizeStr+"'");
89 }
90 setCorePoolSize(poolSize);
91 }
92 }
93
94 public Object getInstance() {
95 return this;
96 }
97
98
99
100
101
102
103
104
105
106
107
108
109
110 private static class KSBThreadFactory implements ThreadFactory {
111
112 private static int factorySequence = 0;
113
114 private static int threadSequence = 0;
115
116 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
117
118 private ClassLoader contextClassLoader;
119
120 public KSBThreadFactory(ClassLoader contextClassLoader) {
121 this.contextClassLoader = contextClassLoader;
122 factorySequence++;
123 }
124
125 public Thread newThread(Runnable runnable) {
126 threadSequence++;
127 Thread thread = this.defaultThreadFactory.newThread(runnable);
128
129
130 thread.setContextClassLoader(contextClassLoader);
131 thread.setName(CoreConfigHelper.getApplicationId() + "/KSB-pool-" + factorySequence + "-thread-"
132 + threadSequence);
133 return thread;
134 }
135
136 }
137 }