001 /* 002 * Copyright 2006-2011 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package org.kuali.rice.ksb.messaging.threadpool; 018 019 import org.apache.log4j.Logger; 020 import org.kuali.rice.core.api.config.CoreConfigHelper; 021 import org.kuali.rice.core.api.config.property.Config; 022 import org.kuali.rice.core.api.config.property.ConfigContext; 023 import org.kuali.rice.core.util.ClassLoaderUtils; 024 025 import java.util.concurrent.Executors; 026 import java.util.concurrent.PriorityBlockingQueue; 027 import java.util.concurrent.ThreadFactory; 028 import java.util.concurrent.ThreadPoolExecutor; 029 import java.util.concurrent.TimeUnit; 030 031 /** 032 * A Thread Pool implementation for the KSB which implements a thread pool backed by a configuration store. 033 * 034 * @author Kuali Rice Team (rice.collab@kuali.org) 035 */ 036 public class KSBThreadPoolImpl extends ThreadPoolExecutor implements KSBThreadPool { 037 038 private static final Logger LOG = Logger.getLogger(KSBThreadPoolImpl.class); 039 040 public static final int DEFAULT_POOL_SIZE = 5; 041 042 private boolean started; 043 private boolean poolSizeSet; 044 045 public KSBThreadPoolImpl() { 046 super(DEFAULT_POOL_SIZE, DEFAULT_POOL_SIZE, 60, TimeUnit.SECONDS, new PriorityBlockingQueue(1, new PriorityBlockingQueuePersistedMessageComparator()), new KSBThreadFactory(ClassLoaderUtils.getDefaultClassLoader()), new ThreadPoolExecutor.AbortPolicy()); 047 } 048 049 public void setCorePoolSize(int corePoolSize) { 050 LOG.info("Setting core pool size to " + corePoolSize + " threads."); 051 super.setCorePoolSize(corePoolSize); 052 this.poolSizeSet = true; 053 } 054 055 public long getKeepAliveTime() { 056 return super.getKeepAliveTime(TimeUnit.MILLISECONDS); 057 } 058 059 public boolean isStarted() { 060 return this.started; 061 } 062 063 public void start() throws Exception { 064 LOG.info("Starting the KSB thread pool..."); 065 loadSettings(); 066 this.started = true; 067 LOG.info("...KSB thread pool successfully started."); 068 } 069 070 public void stop() throws Exception { 071 if (isStarted()) { 072 LOG.info("Shutting down KSB thread pool..."); 073 this.shutdownNow(); 074 this.started = false; 075 LOG.info("...KSB thread pool successfully shut down."); 076 } 077 } 078 079 /** 080 * Loads the thread pool settings from the DAO. 081 */ 082 protected void loadSettings() { 083 String threadPoolSizeStr = ConfigContext.getCurrentContextConfig().getProperty(Config.THREAD_POOL_SIZE); 084 if (!this.poolSizeSet) { 085 int poolSize = DEFAULT_POOL_SIZE; 086 try { 087 poolSize = new Integer(threadPoolSizeStr); 088 } catch (NumberFormatException nfe) { 089 LOG.error( "loadSettings(): Unable to parse the pool size: '"+threadPoolSizeStr+"'"); 090 } 091 setCorePoolSize(poolSize); 092 } 093 } 094 095 public Object getInstance() { 096 return this; 097 } 098 099 /** 100 * A simple ThreadFactory which names the thread as follows:<br> 101 * <br> 102 * 103 * <i>applicationId</i>/KSB-pool-<i>m</i>-thread-<i>n</i><br> 104 * <br> 105 * 106 * Where <i>applicationId</i> is the id of the application running the thread pool, <i>m</i> is the 107 * sequence number of the factory and <i>n</i> is the sequence number of the thread within the factory. 108 * 109 * @author Kuali Rice Team (rice.collab@kuali.org) 110 */ 111 private static class KSBThreadFactory implements ThreadFactory { 112 113 private static int factorySequence = 0; 114 115 private static int threadSequence = 0; 116 117 private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory(); 118 119 private ClassLoader contextClassLoader; 120 121 public KSBThreadFactory(ClassLoader contextClassLoader) { 122 this.contextClassLoader = contextClassLoader; 123 factorySequence++; 124 } 125 126 public Thread newThread(Runnable runnable) { 127 threadSequence++; 128 Thread thread = this.defaultThreadFactory.newThread(runnable); 129 // if the thread ends up getting spawned by an action inside of a workflow plugin or something along those lines, it will inherit the plugin's 130 // classloader as it's ContextClassLoader. Let's make sure it's set to the same ClassLoader that loaded the KSBConfigurer 131 thread.setContextClassLoader(contextClassLoader); 132 thread.setName(CoreConfigHelper.getApplicationId() + "/KSB-pool-" + factorySequence + "-thread-" 133 + threadSequence); 134 return thread; 135 } 136 137 } 138 }