001 /**
002 * Copyright 2004-2012 The Kuali Foundation
003 *
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.opensource.org/licenses/ecl2.php
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.kuali.hr.time.batch;
017
018 import java.util.concurrent.ExecutorService;
019 import java.util.concurrent.Executors;
020 import java.util.concurrent.TimeUnit;
021
022 import org.apache.commons.lang.StringUtils;
023 import org.apache.log4j.Logger;
024 import org.kuali.hr.time.util.TKUtils;
025 import org.kuali.rice.core.api.config.property.ConfigContext;
026 import org.springframework.beans.factory.DisposableBean;
027 import org.springframework.beans.factory.InitializingBean;
028
029 /**
030 * Spring Bean to fire upon initialization.
031 *
032 * Runs on each worker node, manages runables with the ExecutorService.
033 */
034 public class TkBatchManager implements InitializingBean, DisposableBean {
035 public static final String MESSAGE_QUEUE_CHECKER_IP_PARAM = "tk.batch.master.ip";
036 public static final String WORKER_THREAD_POOL_SIZE_PARAM = "tk.batch.threadpool.size";
037 public static final String BATCH_POLLER_SLEEP_PARAM = "tk.batch.poller.seconds.sleep";
038 public static final String BATCH_MASTER_POLL_SLEEP_PARAM = "tk.batch.job.poller.seconds.sleep";
039 public static final String BATCH_DAY_WINDOW_FOR_POLLING_PARAM = "tk.batch.job.days.polling.window";
040 public static final String BATCH_WORK_STARUP_SLEEP = "tk.batch.thread.startup.sleep";
041
042 ExecutorService pool;
043 BatchJobEntryPoller poller;
044 private static final Logger LOG = Logger.getLogger(TkBatchManager.class);
045
046 // Some defaults for setting up our threads.
047 int masterJobPollSleep = 60;
048 int workerEntryPollSleep = 60;
049 int threadPoolSize = 5;
050 int daysToPollWindow = 30;
051 int startupSleep = 30;
052
053 @Override
054 public void destroy() throws Exception {
055 pool.shutdown();
056 try {
057 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
058 pool.shutdownNow(); // Cancel currently executing tasks
059 }
060 } catch (InterruptedException ie) {
061 pool.shutdownNow();
062 Thread.currentThread().interrupt();
063 }
064 }
065
066 @Override
067 public void afterPropertiesSet() throws Exception {
068 parseConfiguration();
069
070 // create thread pool
071 pool = Executors.newFixedThreadPool(this.threadPoolSize);
072 poller = new BatchJobEntryPoller(this, this.workerEntryPollSleep, this.startupSleep);
073 poller.start();
074 if (isMasterBatchNode()) {
075 BatchJobManagerThread batchJobManagerThread = new BatchJobManagerThread(this.masterJobPollSleep, this.daysToPollWindow, this.startupSleep);
076 batchJobManagerThread.start();
077 }
078 }
079
080 /**
081 * Helper method to determine whether the current machine is the master node,
082 * namely, the node that will create all BatchJobEntries to run on the
083 * worker nodes.
084 *
085 * @return true if this node is the master node.
086 */
087 public boolean isMasterBatchNode() {
088 return StringUtils.equals(ConfigContext.getCurrentContextConfig().getProperty(MESSAGE_QUEUE_CHECKER_IP_PARAM), TKUtils.getIPNumber());
089 }
090
091 /**
092 * Helper method to load configuration values, cast to ints, and report errors.
093 */
094 private void parseConfiguration() {
095 String poolSizeString = ConfigContext.getCurrentContextConfig().getProperty(WORKER_THREAD_POOL_SIZE_PARAM);
096 String masterJobSleep = ConfigContext.getCurrentContextConfig().getProperty(BATCH_MASTER_POLL_SLEEP_PARAM);
097 String workerPollSleep = ConfigContext.getCurrentContextConfig().getProperty(BATCH_POLLER_SLEEP_PARAM);
098 String daysString = ConfigContext.getCurrentContextConfig().getProperty(BATCH_DAY_WINDOW_FOR_POLLING_PARAM);
099 String workStartup = ConfigContext.getCurrentContextConfig().getProperty(BATCH_WORK_STARUP_SLEEP);
100
101 try {
102 this.threadPoolSize = Integer.parseInt(poolSizeString);
103 this.masterJobPollSleep = Integer.parseInt(masterJobSleep);
104 this.daysToPollWindow = Integer.parseInt(daysString);
105 this.workerEntryPollSleep = Integer.parseInt(workerPollSleep);
106 this.startupSleep = Integer.parseInt(workStartup);
107 } catch (NumberFormatException nfe) {
108 LOG.warn("Bad parameter when setting up Batch Configuration. using defaults");
109 }
110 }
111 }