001 /**
002 * Copyright 2005-2013 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.rice.ksb.messaging.threadpool;
017
018 import org.apache.log4j.Logger;
019 import org.kuali.rice.core.api.config.CoreConfigHelper;
020 import org.kuali.rice.core.api.config.property.Config;
021 import org.kuali.rice.core.api.config.property.ConfigContext;
022 import org.kuali.rice.core.api.util.ClassLoaderUtils;
023
024 import java.util.concurrent.Executors;
025 import java.util.concurrent.PriorityBlockingQueue;
026 import java.util.concurrent.ThreadFactory;
027 import java.util.concurrent.ThreadPoolExecutor;
028 import java.util.concurrent.TimeUnit;
029
030 /**
031 * A Thread Pool implementation for the KSB which implements a thread pool backed by a configuration store.
032 *
033 * @author Kuali Rice Team (rice.collab@kuali.org)
034 */
035 public class KSBThreadPoolImpl extends ThreadPoolExecutor implements KSBThreadPool {
036
037 private static final Logger LOG = Logger.getLogger(KSBThreadPoolImpl.class);
038
039 public static final int DEFAULT_POOL_SIZE = 5;
040
041 private boolean started;
042 private boolean poolSizeSet;
043
044 public KSBThreadPoolImpl() {
045 super(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 60, TimeUnit.SECONDS, new PriorityBlockingQueue(1, new PriorityBlockingQueuePersistedMessageComparator()), new KSBThreadFactory(ClassLoaderUtils.getDefaultClassLoader()), new ThreadPoolExecutor.AbortPolicy());
046 }
047
048 public void setCorePoolSize(int corePoolSize) {
049 LOG.info("Setting core pool size to " + corePoolSize + " threads.");
050 super.setCorePoolSize(corePoolSize);
051 this.poolSizeSet = true;
052 }
053
054 public long getKeepAliveTime() {
055 return super.getKeepAliveTime(TimeUnit.MILLISECONDS);
056 }
057
058 public boolean isStarted() {
059 return this.started;
060 }
061
062 public void start() throws Exception {
063 LOG.info("Starting the KSB thread pool...");
064 loadSettings();
065 this.started = true;
066 LOG.info("...KSB thread pool successfully started.");
067 }
068
069 public void stop() throws Exception {
070 if (isStarted()) {
071 LOG.info("Shutting down KSB thread pool...");
072 int pendingTasks = this.shutdownNow().size();
073 LOG.info(pendingTasks + " pending tasks...");
074 LOG.info("awaiting termination: " + this.awaitTermination(20, TimeUnit.SECONDS));
075 LOG.info("...KSB thread pool successfully stopped, isShutdown=" + this.isShutdown() + ", isTerminated=" + this.isTerminated());
076 this.started = false;
077 LOG.info("...KSB thread pool successfully shut down.");
078 }
079 }
080
081 /**
082 * Loads the thread pool settings from the DAO.
083 */
084 protected void loadSettings() {
085 String threadPoolSizeStr = ConfigContext.getCurrentContextConfig().getProperty(Config.THREAD_POOL_SIZE);
086 if (!this.poolSizeSet) {
087 int poolSize = DEFAULT_POOL_SIZE;
088 try {
089 poolSize = new Integer(threadPoolSizeStr);
090 } catch (NumberFormatException nfe) {
091 LOG.error( "loadSettings(): Unable to parse the pool size: '"+threadPoolSizeStr+"'");
092 }
093 setCorePoolSize(poolSize);
094 }
095 }
096
097 public Object getInstance() {
098 return this;
099 }
100
101 /**
102 * A simple ThreadFactory which names the thread as follows:<br>
103 * <br>
104 *
105 * <i>applicationId</i>/KSB-pool-<i>m</i>-thread-<i>n</i><br>
106 * <br>
107 *
108 * Where <i>applicationId</i> is the id of the application running the thread pool, <i>m</i> is the
109 * sequence number of the factory and <i>n</i> is the sequence number of the thread within the factory.
110 *
111 * @author Kuali Rice Team (rice.collab@kuali.org)
112 */
113 private static class KSBThreadFactory implements ThreadFactory {
114
115 private static int factorySequence = 0;
116
117 private static int threadSequence = 0;
118
119 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
120
121 private ClassLoader contextClassLoader;
122
123 public KSBThreadFactory(ClassLoader contextClassLoader) {
124 this.contextClassLoader = contextClassLoader;
125 factorySequence++;
126 }
127
128 public Thread newThread(Runnable runnable) {
129 threadSequence++;
130 Thread thread = this.defaultThreadFactory.newThread(runnable);
131 // if the thread ends up getting spawned by an action inside of a workflow plugin or something along those lines, it will inherit the plugin's
132 // classloader as it's ContextClassLoader. Let's make sure it's set to the same ClassLoader that loaded the KSBConfigurer
133 thread.setContextClassLoader(contextClassLoader);
134 thread.setName(CoreConfigHelper.getApplicationId() + "/KSB-pool-" + factorySequence + "-thread-"
135 + threadSequence);
136 return thread;
137 }
138
139 }
140 }