001 /** 002 * Copyright 2005-2013 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.kuali.rice.ksb.messaging.threadpool; 017 018 import org.apache.log4j.Logger; 019 import org.kuali.rice.core.api.config.CoreConfigHelper; 020 import org.kuali.rice.core.api.config.property.Config; 021 import org.kuali.rice.core.api.config.property.ConfigContext; 022 import org.kuali.rice.core.api.util.ClassLoaderUtils; 023 024 import java.util.concurrent.Executors; 025 import java.util.concurrent.PriorityBlockingQueue; 026 import java.util.concurrent.ThreadFactory; 027 import java.util.concurrent.ThreadPoolExecutor; 028 import java.util.concurrent.TimeUnit; 029 030 /** 031 * A Thread Pool implementation for the KSB which implements a thread pool backed by a configuration store. 032 * 033 * @author Kuali Rice Team (rice.collab@kuali.org) 034 */ 035 public class KSBThreadPoolImpl extends ThreadPoolExecutor implements KSBThreadPool { 036 037 private static final Logger LOG = Logger.getLogger(KSBThreadPoolImpl.class); 038 039 public static final int DEFAULT_POOL_SIZE = 5; 040 041 private boolean started; 042 private boolean poolSizeSet; 043 044 public KSBThreadPoolImpl() { 045 super(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 60, TimeUnit.SECONDS, new PriorityBlockingQueue(1, new PriorityBlockingQueuePersistedMessageComparator()), new KSBThreadFactory(ClassLoaderUtils.getDefaultClassLoader()), new ThreadPoolExecutor.AbortPolicy()); 046 } 047 048 public void setCorePoolSize(int corePoolSize) { 049 LOG.info("Setting core pool size to " + corePoolSize + " threads."); 050 super.setCorePoolSize(corePoolSize); 051 this.poolSizeSet = true; 052 } 053 054 public long getKeepAliveTime() { 055 return super.getKeepAliveTime(TimeUnit.MILLISECONDS); 056 } 057 058 public boolean isStarted() { 059 return this.started; 060 } 061 062 public void start() throws Exception { 063 LOG.info("Starting the KSB thread pool..."); 064 loadSettings(); 065 this.started = true; 066 LOG.info("...KSB thread pool successfully started."); 067 } 068 069 public void stop() throws Exception { 070 if (isStarted()) { 071 LOG.info("Shutting down KSB thread pool..."); 072 int pendingTasks = this.shutdownNow().size(); 073 LOG.info(pendingTasks + " pending tasks..."); 074 LOG.info("awaiting termination: " + this.awaitTermination(20, TimeUnit.SECONDS)); 075 LOG.info("...KSB thread pool successfully stopped, isShutdown=" + this.isShutdown() + ", isTerminated=" + this.isTerminated()); 076 this.started = false; 077 LOG.info("...KSB thread pool successfully shut down."); 078 } 079 } 080 081 /** 082 * Loads the thread pool settings from the DAO. 083 */ 084 protected void loadSettings() { 085 String threadPoolSizeStr = ConfigContext.getCurrentContextConfig().getProperty(Config.THREAD_POOL_SIZE); 086 if (!this.poolSizeSet) { 087 int poolSize = DEFAULT_POOL_SIZE; 088 try { 089 poolSize = new Integer(threadPoolSizeStr); 090 } catch (NumberFormatException nfe) { 091 LOG.error( "loadSettings(): Unable to parse the pool size: '"+threadPoolSizeStr+"'"); 092 } 093 setCorePoolSize(poolSize); 094 } 095 } 096 097 public Object getInstance() { 098 return this; 099 } 100 101 /** 102 * A simple ThreadFactory which names the thread as follows:<br> 103 * <br> 104 * 105 * <i>applicationId</i>/KSB-pool-<i>m</i>-thread-<i>n</i><br> 106 * <br> 107 * 108 * Where <i>applicationId</i> is the id of the application running the thread pool, <i>m</i> is the 109 * sequence number of the factory and <i>n</i> is the sequence number of the thread within the factory. 110 * 111 * @author Kuali Rice Team (rice.collab@kuali.org) 112 */ 113 private static class KSBThreadFactory implements ThreadFactory { 114 115 private static int factorySequence = 0; 116 117 private static int threadSequence = 0; 118 119 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory(); 120 121 private ClassLoader contextClassLoader; 122 123 public KSBThreadFactory(ClassLoader contextClassLoader) { 124 this.contextClassLoader = contextClassLoader; 125 factorySequence++; 126 } 127 128 public Thread newThread(Runnable runnable) { 129 threadSequence++; 130 Thread thread = this.defaultThreadFactory.newThread(runnable); 131 // if the thread ends up getting spawned by an action inside of a workflow plugin or something along those lines, it will inherit the plugin's 132 // classloader as it's ContextClassLoader. Let's make sure it's set to the same ClassLoader that loaded the KSBConfigurer 133 thread.setContextClassLoader(contextClassLoader); 134 thread.setName(CoreConfigHelper.getApplicationId() + "/KSB-pool-" + factorySequence + "-thread-" 135 + threadSequence); 136 return thread; 137 } 138 139 } 140 }