001 /**
002 * Copyright 2005-2011 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.ksb.messaging.threadpool;
017
import org.apache.log4j.Logger;
import org.kuali.rice.core.api.config.CoreConfigHelper;
import org.kuali.rice.core.api.config.property.Config;
import org.kuali.rice.core.api.config.property.ConfigContext;
import org.kuali.rice.core.api.util.ClassLoaderUtils;

import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
029
030 /**
031 * A Thread Pool implementation for the KSB which implements a thread pool backed by a configuration store.
032 *
033 * @author Kuali Rice Team (rice.collab@kuali.org)
034 */
035 public class KSBThreadPoolImpl extends ThreadPoolExecutor implements KSBThreadPool {
036
037 private static final Logger LOG = Logger.getLogger(KSBThreadPoolImpl.class);
038
039 public static final int DEFAULT_POOL_SIZE = 5;
040
041 private boolean started;
042 private boolean poolSizeSet;
043
044 public KSBThreadPoolImpl() {
045 super(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 60, TimeUnit.SECONDS, new PriorityBlockingQueue(1, new PriorityBlockingQueuePersistedMessageComparator()), new KSBThreadFactory(ClassLoaderUtils.getDefaultClassLoader()), new ThreadPoolExecutor.AbortPolicy());
046 }
047
048 public void setCorePoolSize(int corePoolSize) {
049 LOG.info("Setting core pool size to " + corePoolSize + " threads.");
050 super.setCorePoolSize(corePoolSize);
051 this.poolSizeSet = true;
052 }
053
054 public long getKeepAliveTime() {
055 return super.getKeepAliveTime(TimeUnit.MILLISECONDS);
056 }
057
058 public boolean isStarted() {
059 return this.started;
060 }
061
062 public void start() throws Exception {
063 LOG.info("Starting the KSB thread pool...");
064 loadSettings();
065 this.started = true;
066 LOG.info("...KSB thread pool successfully started.");
067 }
068
069 public void stop() throws Exception {
070 if (isStarted()) {
071 LOG.info("Shutting down KSB thread pool...");
072 this.shutdownNow();
073 this.started = false;
074 LOG.info("...KSB thread pool successfully shut down.");
075 }
076 }
077
078 /**
079 * Loads the thread pool settings from the DAO.
080 */
081 protected void loadSettings() {
082 String threadPoolSizeStr = ConfigContext.getCurrentContextConfig().getProperty(Config.THREAD_POOL_SIZE);
083 if (!this.poolSizeSet) {
084 int poolSize = DEFAULT_POOL_SIZE;
085 try {
086 poolSize = new Integer(threadPoolSizeStr);
087 } catch (NumberFormatException nfe) {
088 LOG.error( "loadSettings(): Unable to parse the pool size: '"+threadPoolSizeStr+"'");
089 }
090 setCorePoolSize(poolSize);
091 }
092 }
093
094 public Object getInstance() {
095 return this;
096 }
097
098 /**
099 * A simple ThreadFactory which names the thread as follows:<br>
100 * <br>
101 *
102 * <i>applicationId</i>/KSB-pool-<i>m</i>-thread-<i>n</i><br>
103 * <br>
104 *
105 * Where <i>applicationId</i> is the id of the application running the thread pool, <i>m</i> is the
106 * sequence number of the factory and <i>n</i> is the sequence number of the thread within the factory.
107 *
108 * @author Kuali Rice Team (rice.collab@kuali.org)
109 */
110 private static class KSBThreadFactory implements ThreadFactory {
111
112 private static int factorySequence = 0;
113
114 private static int threadSequence = 0;
115
116 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
117
118 private ClassLoader contextClassLoader;
119
120 public KSBThreadFactory(ClassLoader contextClassLoader) {
121 this.contextClassLoader = contextClassLoader;
122 factorySequence++;
123 }
124
125 public Thread newThread(Runnable runnable) {
126 threadSequence++;
127 Thread thread = this.defaultThreadFactory.newThread(runnable);
128 // if the thread ends up getting spawned by an action inside of a workflow plugin or something along those lines, it will inherit the plugin's
129 // classloader as it's ContextClassLoader. Let's make sure it's set to the same ClassLoader that loaded the KSBConfigurer
130 thread.setContextClassLoader(contextClassLoader);
131 thread.setName(CoreConfigHelper.getApplicationId() + "/KSB-pool-" + factorySequence + "-thread-"
132 + threadSequence);
133 return thread;
134 }
135
136 }
137 }