001    /**
002     * Copyright 2004-2013 The Kuali Foundation
003     *
004     * Licensed under the Educational Community License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.opensource.org/licenses/ecl2.php
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.kuali.hr.time.batch;
017    
018    import java.util.concurrent.ExecutorService;
019    import java.util.concurrent.Executors;
020    import java.util.concurrent.TimeUnit;
021    
022    import org.apache.commons.lang.StringUtils;
023    import org.apache.log4j.Logger;
024    import org.kuali.hr.time.util.TKUtils;
025    import org.kuali.rice.core.api.config.property.ConfigContext;
026    import org.springframework.beans.factory.DisposableBean;
027    import org.springframework.beans.factory.InitializingBean;
028    
029    /**
030     * Spring Bean to fire upon initialization.
031     *
032     * Runs on each worker node and manages runnables with the ExecutorService.
033     */
034    public class TkBatchManager implements InitializingBean, DisposableBean {
035        public static final String MESSAGE_QUEUE_CHECKER_IP_PARAM = "tk.batch.master.ip";
036        public static final String WORKER_THREAD_POOL_SIZE_PARAM = "tk.batch.threadpool.size";
037        public static final String BATCH_POLLER_SLEEP_PARAM = "tk.batch.poller.seconds.sleep";
038        public static final String BATCH_MASTER_POLL_SLEEP_PARAM = "tk.batch.job.poller.seconds.sleep";
039        public static final String BATCH_DAY_WINDOW_FOR_POLLING_PARAM = "tk.batch.job.days.polling.window";
040        public static final String BATCH_WORK_STARUP_SLEEP = "tk.batch.thread.startup.sleep";
041    
042        ExecutorService pool;
043        BatchJobEntryPoller poller;
044        private static final Logger LOG = Logger.getLogger(TkBatchManager.class);
045    
046        // Some defaults for setting up our threads.
047        int masterJobPollSleep = 60;
048        int workerEntryPollSleep = 60;
049        int threadPoolSize = 5;
050        int daysToPollWindow = 30;
051        int startupSleep = 30;
052    
053        @Override
054        public void destroy() throws Exception {
055            pool.shutdown();
056            try {
057                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
058                    pool.shutdownNow(); // Cancel currently executing tasks
059                }
060            } catch (InterruptedException ie) {
061                pool.shutdownNow();
062                Thread.currentThread().interrupt();
063            }
064        }
065    
066        @Override
067        public void afterPropertiesSet() throws Exception {
068            parseConfiguration();
069    
070            // create thread pool
071            pool = Executors.newFixedThreadPool(this.threadPoolSize);
072            poller = new BatchJobEntryPoller(this, this.workerEntryPollSleep, this.startupSleep);
073            poller.start();
074            if (isMasterBatchNode()) {
075                BatchJobManagerThread batchJobManagerThread = new BatchJobManagerThread(this.masterJobPollSleep, this.daysToPollWindow, this.startupSleep);
076                batchJobManagerThread.start();
077            }
078        }
079    
080        /**
081         * Helper method to determine whether the current machine is the master node,
082         * namely, the node that will create all BatchJobEntries to run on the
083         * worker nodes.
084         *
085         * @return true if this node is the master node.
086         */
087        public boolean isMasterBatchNode() {
088            return StringUtils.equals(ConfigContext.getCurrentContextConfig().getProperty(MESSAGE_QUEUE_CHECKER_IP_PARAM), TKUtils.getIPNumber());
089        }
090    
091        /**
092         * Helper method to load configuration values, cast to ints, and report errors.
093         */
094        private void parseConfiguration() {
095            String poolSizeString = ConfigContext.getCurrentContextConfig().getProperty(WORKER_THREAD_POOL_SIZE_PARAM);
096            String masterJobSleep = ConfigContext.getCurrentContextConfig().getProperty(BATCH_MASTER_POLL_SLEEP_PARAM);
097            String workerPollSleep = ConfigContext.getCurrentContextConfig().getProperty(BATCH_POLLER_SLEEP_PARAM);
098            String daysString = ConfigContext.getCurrentContextConfig().getProperty(BATCH_DAY_WINDOW_FOR_POLLING_PARAM);
099            String workStartup = ConfigContext.getCurrentContextConfig().getProperty(BATCH_WORK_STARUP_SLEEP);
100    
101            try {
102                this.threadPoolSize = Integer.parseInt(poolSizeString);
103                this.masterJobPollSleep = Integer.parseInt(masterJobSleep);
104                this.daysToPollWindow = Integer.parseInt(daysString);
105                this.workerEntryPollSleep = Integer.parseInt(workerPollSleep);
106                this.startupSleep = Integer.parseInt(workStartup);
107            } catch (NumberFormatException nfe) {
108                LOG.warn("Bad parameter when setting up Batch Configuration. using defaults");
109            }
110        }
111    }