001 /*
002 * Copyright 2006-2011 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package org.kuali.rice.ksb.messaging.threadpool;
018
import org.apache.log4j.Logger;
import org.kuali.rice.core.api.config.CoreConfigHelper;
import org.kuali.rice.core.api.config.property.Config;
import org.kuali.rice.core.api.config.property.ConfigContext;
import org.kuali.rice.core.util.ClassLoaderUtils;

import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
030
031 /**
032 * A Thread Pool implementation for the KSB which implements a thread pool backed by a configuration store.
033 *
034 * @author Kuali Rice Team (rice.collab@kuali.org)
035 */
036 public class KSBThreadPoolImpl extends ThreadPoolExecutor implements KSBThreadPool {
037
038 private static final Logger LOG = Logger.getLogger(KSBThreadPoolImpl.class);
039
040 public static final int DEFAULT_POOL_SIZE = 5;
041
042 private boolean started;
043 private boolean poolSizeSet;
044
045 public KSBThreadPoolImpl() {
046 super(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 60, TimeUnit.SECONDS, new PriorityBlockingQueue(1, new PriorityBlockingQueuePersistedMessageComparator()), new KSBThreadFactory(ClassLoaderUtils.getDefaultClassLoader()), new ThreadPoolExecutor.AbortPolicy());
047 }
048
049 public void setCorePoolSize(int corePoolSize) {
050 LOG.info("Setting core pool size to " + corePoolSize + " threads.");
051 super.setCorePoolSize(corePoolSize);
052 this.poolSizeSet = true;
053 }
054
055 public long getKeepAliveTime() {
056 return super.getKeepAliveTime(TimeUnit.MILLISECONDS);
057 }
058
059 public boolean isStarted() {
060 return this.started;
061 }
062
063 public void start() throws Exception {
064 LOG.info("Starting the KSB thread pool...");
065 loadSettings();
066 this.started = true;
067 LOG.info("...KSB thread pool successfully started.");
068 }
069
070 public void stop() throws Exception {
071 if (isStarted()) {
072 LOG.info("Shutting down KSB thread pool...");
073 this.shutdownNow();
074 this.started = false;
075 LOG.info("...KSB thread pool successfully shut down.");
076 }
077 }
078
079 /**
080 * Loads the thread pool settings from the DAO.
081 */
082 protected void loadSettings() {
083 String threadPoolSizeStr = ConfigContext.getCurrentContextConfig().getProperty(Config.THREAD_POOL_SIZE);
084 if (!this.poolSizeSet) {
085 int poolSize = DEFAULT_POOL_SIZE;
086 try {
087 poolSize = new Integer(threadPoolSizeStr);
088 } catch (NumberFormatException nfe) {
089 LOG.error( "loadSettings(): Unable to parse the pool size: '"+threadPoolSizeStr+"'");
090 }
091 setCorePoolSize(poolSize);
092 }
093 }
094
095 public Object getInstance() {
096 return this;
097 }
098
099 /**
100 * A simple ThreadFactory which names the thread as follows:<br>
101 * <br>
102 *
103 * <i>applicationId</i>/KSB-pool-<i>m</i>-thread-<i>n</i><br>
104 * <br>
105 *
106 * Where <i>applicationId</i> is the id of the application running the thread pool, <i>m</i> is the
107 * sequence number of the factory and <i>n</i> is the sequence number of the thread within the factory.
108 *
109 * @author Kuali Rice Team (rice.collab@kuali.org)
110 */
111 private static class KSBThreadFactory implements ThreadFactory {
112
113 private static int factorySequence = 0;
114
115 private static int threadSequence = 0;
116
117 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
118
119 private ClassLoader contextClassLoader;
120
121 public KSBThreadFactory(ClassLoader contextClassLoader) {
122 this.contextClassLoader = contextClassLoader;
123 factorySequence++;
124 }
125
126 public Thread newThread(Runnable runnable) {
127 threadSequence++;
128 Thread thread = this.defaultThreadFactory.newThread(runnable);
129 // if the thread ends up getting spawned by an action inside of a workflow plugin or something along those lines, it will inherit the plugin's
130 // classloader as it's ContextClassLoader. Let's make sure it's set to the same ClassLoader that loaded the KSBConfigurer
131 thread.setContextClassLoader(contextClassLoader);
132 thread.setName(CoreConfigHelper.getApplicationId() + "/KSB-pool-" + factorySequence + "-thread-"
133 + threadSequence);
134 return thread;
135 }
136
137 }
138 }