/**
 * Copyright 2004-2012 The Kuali Foundation
 *
 * Licensed under the Educational Community License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.opensource.org/licenses/ecl2.php
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.kuali.hr.time.batch;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.kuali.hr.time.util.TKUtils;
import org.kuali.rice.core.api.config.property.ConfigContext;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/**
 * Spring Bean to fire upon initialization.
 *
 * Runs on each worker node, manages runnables with the ExecutorService.
 */
public class TkBatchManager implements InitializingBean, DisposableBean {
    /** Config property compared against this machine's IP to decide whether it is the master node. */
    public static final String MESSAGE_QUEUE_CHECKER_IP_PARAM = "tk.batch.master.ip";
    /** Config property holding the size of the worker thread pool. */
    public static final String WORKER_THREAD_POOL_SIZE_PARAM = "tk.batch.threadpool.size";
    /** Config property holding the sleep value handed to the {@link BatchJobEntryPoller}. */
    public static final String BATCH_POLLER_SLEEP_PARAM = "tk.batch.poller.seconds.sleep";
    /** Config property holding the sleep value handed to the {@link BatchJobManagerThread}. */
    public static final String BATCH_MASTER_POLL_SLEEP_PARAM = "tk.batch.job.poller.seconds.sleep";
    /** Config property holding the polling window (in days, per the property name) for the master thread. */
    public static final String BATCH_DAY_WINDOW_FOR_POLLING_PARAM = "tk.batch.job.days.polling.window";
    /** Config property holding the startup sleep value handed to both background threads. */
    public static final String BATCH_WORK_STARUP_SLEEP = "tk.batch.thread.startup.sleep";

    ExecutorService pool;
    BatchJobEntryPoller poller;
    private static final Logger LOG = Logger.getLogger(TkBatchManager.class);

    // Some defaults for setting up our threads. Each one is kept whenever the
    // matching configuration property is missing or invalid.
    int masterJobPollSleep = 60;
    int workerEntryPollSleep = 60;
    int threadPoolSize = 5;
    int daysToPollWindow = 30;
    int startupSleep = 30;

    /**
     * Shuts the worker pool down: first an orderly shutdown, then a forced one
     * if running tasks have not finished within 60 seconds.
     *
     * NOTE(review): the poller and manager threads started in
     * {@link #afterPropertiesSet()} are not stopped here -- confirm that they
     * terminate on their own (or are daemon threads).
     */
    @Override
    public void destroy() throws Exception {
        // The pool only exists once afterPropertiesSet() has got far enough to
        // create it; without this guard a failed startup would end in an NPE.
        if (pool == null) {
            return;
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
            }
        } catch (InterruptedException ie) {
            pool.shutdownNow();
            // Preserve the interrupt status for whoever is shutting us down.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Loads the configuration, creates the worker thread pool and starts the
     * entry poller. On the master node the job manager thread is started too.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        parseConfiguration();

        // create thread pool
        pool = Executors.newFixedThreadPool(this.threadPoolSize);
        poller = new BatchJobEntryPoller(this, this.workerEntryPollSleep, this.startupSleep);
        poller.start();
        if (isMasterBatchNode()) {
            BatchJobManagerThread batchJobManagerThread = new BatchJobManagerThread(this.masterJobPollSleep, this.daysToPollWindow, this.startupSleep);
            batchJobManagerThread.start();
        }
    }

    /**
     * Helper method to determine whether the current machine is the master node,
     * namely, the node that will create all BatchJobEntries to run on the
     * worker nodes.
     *
     * @return true if this node is the master node.
     */
    public boolean isMasterBatchNode() {
        return StringUtils.equals(ConfigContext.getCurrentContextConfig().getProperty(MESSAGE_QUEUE_CHECKER_IP_PARAM), TKUtils.getIPNumber());
    }

    /**
     * Helper method to load configuration values, cast to ints, and report errors.
     *
     * Every property is parsed on its own, so one missing or malformed value
     * only affects its own setting instead of aborting the remaining ones.
     */
    private void parseConfiguration() {
        int configuredPoolSize = readIntProperty(WORKER_THREAD_POOL_SIZE_PARAM, this.threadPoolSize);
        if (configuredPoolSize > 0) {
            this.threadPoolSize = configuredPoolSize;
        } else {
            // Executors.newFixedThreadPool rejects non-positive sizes, so keep the default.
            LOG.warn("Bad parameter when setting up Batch Configuration. '" + WORKER_THREAD_POOL_SIZE_PARAM
                    + "' must be positive but was " + configuredPoolSize + "; using default " + this.threadPoolSize);
        }

        this.masterJobPollSleep = readIntProperty(BATCH_MASTER_POLL_SLEEP_PARAM, this.masterJobPollSleep);
        this.daysToPollWindow = readIntProperty(BATCH_DAY_WINDOW_FOR_POLLING_PARAM, this.daysToPollWindow);
        this.workerEntryPollSleep = readIntProperty(BATCH_POLLER_SLEEP_PARAM, this.workerEntryPollSleep);
        this.startupSleep = readIntProperty(BATCH_WORK_STARUP_SLEEP, this.startupSleep);
    }

    /**
     * Reads one integer configuration property.
     *
     * @param propertyName name of the configuration property to read
     * @param defaultValue value to return when the property is missing or is not a valid integer
     * @return the parsed property value, or {@code defaultValue} if it could not be parsed
     */
    private int readIntProperty(String propertyName, int defaultValue) {
        String rawValue = ConfigContext.getCurrentContextConfig().getProperty(propertyName);
        try {
            // A missing property yields null, which parseInt reports as a NumberFormatException.
            return Integer.parseInt(StringUtils.trim(rawValue));
        } catch (NumberFormatException nfe) {
            LOG.warn("Bad parameter when setting up Batch Configuration. Using default " + defaultValue
                    + " for '" + propertyName + "' (configured value: '" + rawValue + "')");
            return defaultValue;
        }
    }
}