1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.kuali.rice.ksb.messaging.threadpool;
18
import org.apache.log4j.Logger;
import org.kuali.rice.core.api.config.CoreConfigHelper;
import org.kuali.rice.core.api.config.property.Config;
import org.kuali.rice.core.api.config.property.ConfigContext;
import org.kuali.rice.core.util.ClassLoaderUtils;

import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
30
31
32
33
34
35
36 public class KSBThreadPoolImpl extends ThreadPoolExecutor implements KSBThreadPool {
37
38 private static final Logger LOG = Logger.getLogger(KSBThreadPoolImpl.class);
39
40 public static final int DEFAULT_POOL_SIZE = 5;
41
42 private boolean started;
43 private boolean poolSizeSet;
44
45 public KSBThreadPoolImpl() {
46 super(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 60, TimeUnit.SECONDS, new PriorityBlockingQueue(1, new PriorityBlockingQueuePersistedMessageComparator()), new KSBThreadFactory(ClassLoaderUtils.getDefaultClassLoader()), new ThreadPoolExecutor.AbortPolicy());
47 }
48
49 public void setCorePoolSize(int corePoolSize) {
50 LOG.info("Setting core pool size to " + corePoolSize + " threads.");
51 super.setCorePoolSize(corePoolSize);
52 this.poolSizeSet = true;
53 }
54
55 public long getKeepAliveTime() {
56 return super.getKeepAliveTime(TimeUnit.MILLISECONDS);
57 }
58
59 public boolean isStarted() {
60 return this.started;
61 }
62
63 public void start() throws Exception {
64 LOG.info("Starting the KSB thread pool...");
65 loadSettings();
66 this.started = true;
67 LOG.info("...KSB thread pool successfully started.");
68 }
69
70 public void stop() throws Exception {
71 if (isStarted()) {
72 LOG.info("Shutting down KSB thread pool...");
73 this.shutdownNow();
74 this.started = false;
75 LOG.info("...KSB thread pool successfully shut down.");
76 }
77 }
78
79
80
81
82 protected void loadSettings() {
83 String threadPoolSizeStr = ConfigContext.getCurrentContextConfig().getProperty(Config.THREAD_POOL_SIZE);
84 if (!this.poolSizeSet) {
85 int poolSize = DEFAULT_POOL_SIZE;
86 try {
87 poolSize = new Integer(threadPoolSizeStr);
88 } catch (NumberFormatException nfe) {
89 LOG.error( "loadSettings(): Unable to parse the pool size: '"+threadPoolSizeStr+"'");
90 }
91 setCorePoolSize(poolSize);
92 }
93 }
94
95 public Object getInstance() {
96 return this;
97 }
98
99
100
101
102
103
104
105
106
107
108
109
110
111 private static class KSBThreadFactory implements ThreadFactory {
112
113 private static int factorySequence = 0;
114
115 private static int threadSequence = 0;
116
117 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
118
119 private ClassLoader contextClassLoader;
120
121 public KSBThreadFactory(ClassLoader contextClassLoader) {
122 this.contextClassLoader = contextClassLoader;
123 factorySequence++;
124 }
125
126 public Thread newThread(Runnable runnable) {
127 threadSequence++;
128 Thread thread = this.defaultThreadFactory.newThread(runnable);
129
130
131 thread.setContextClassLoader(contextClassLoader);
132 thread.setName(CoreConfigHelper.getApplicationId() + "/KSB-pool-" + factorySequence + "-thread-"
133 + threadSequence);
134 return thread;
135 }
136
137 }
138 }