View Javadoc

1   /**
2    * Copyright 2004-2012 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.hr.time.batch;
17  
18  import java.util.concurrent.ExecutorService;
19  import java.util.concurrent.Executors;
20  import java.util.concurrent.TimeUnit;
21  
22  import org.apache.commons.lang.StringUtils;
23  import org.apache.log4j.Logger;
24  import org.kuali.hr.time.util.TKUtils;
25  import org.kuali.rice.core.api.config.property.ConfigContext;
26  import org.springframework.beans.factory.DisposableBean;
27  import org.springframework.beans.factory.InitializingBean;
28  
29  /**
30   * Spring Bean to fire upon initialization.
31   *
32   * Runs on each worker node, manages runables with the ExecutorService.
33   */
34  public class TkBatchManager implements InitializingBean, DisposableBean {
35      public static final String MESSAGE_QUEUE_CHECKER_IP_PARAM = "tk.batch.master.ip";
36      public static final String WORKER_THREAD_POOL_SIZE_PARAM = "tk.batch.threadpool.size";
37      public static final String BATCH_POLLER_SLEEP_PARAM = "tk.batch.poller.seconds.sleep";
38      public static final String BATCH_MASTER_POLL_SLEEP_PARAM = "tk.batch.job.poller.seconds.sleep";
39      public static final String BATCH_DAY_WINDOW_FOR_POLLING_PARAM = "tk.batch.job.days.polling.window";
40      public static final String BATCH_WORK_STARUP_SLEEP = "tk.batch.thread.startup.sleep";
41  
42      ExecutorService pool;
43      BatchJobEntryPoller poller;
44      private static final Logger LOG = Logger.getLogger(TkBatchManager.class);
45  
46      // Some defaults for setting up our threads.
47      int masterJobPollSleep = 60;
48      int workerEntryPollSleep = 60;
49      int threadPoolSize = 5;
50      int daysToPollWindow = 30;
51      int startupSleep = 30;
52  
53      @Override
54      public void destroy() throws Exception {
55          pool.shutdown();
56          try {
57              if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
58                  pool.shutdownNow(); // Cancel currently executing tasks
59              }
60          } catch (InterruptedException ie) {
61              pool.shutdownNow();
62              Thread.currentThread().interrupt();
63          }
64      }
65  
66      @Override
67      public void afterPropertiesSet() throws Exception {
68          parseConfiguration();
69  
70          // create thread pool
71          pool = Executors.newFixedThreadPool(this.threadPoolSize);
72          poller = new BatchJobEntryPoller(this, this.workerEntryPollSleep, this.startupSleep);
73          poller.start();
74          if (isMasterBatchNode()) {
75              BatchJobManagerThread batchJobManagerThread = new BatchJobManagerThread(this.masterJobPollSleep, this.daysToPollWindow, this.startupSleep);
76              batchJobManagerThread.start();
77          }
78      }
79  
80      /**
81       * Helper method to determine whether the current machine is the master node,
82       * namely, the node that will create all BatchJobEntries to run on the
83       * worker nodes.
84       *
85       * @return true if this node is the master node.
86       */
87      public boolean isMasterBatchNode() {
88          return StringUtils.equals(ConfigContext.getCurrentContextConfig().getProperty(MESSAGE_QUEUE_CHECKER_IP_PARAM), TKUtils.getIPNumber());
89      }
90  
91      /**
92       * Helper method to load configuration values, cast to ints, and report errors.
93       */
94      private void parseConfiguration() {
95          String poolSizeString = ConfigContext.getCurrentContextConfig().getProperty(WORKER_THREAD_POOL_SIZE_PARAM);
96          String masterJobSleep = ConfigContext.getCurrentContextConfig().getProperty(BATCH_MASTER_POLL_SLEEP_PARAM);
97          String workerPollSleep = ConfigContext.getCurrentContextConfig().getProperty(BATCH_POLLER_SLEEP_PARAM);
98          String daysString = ConfigContext.getCurrentContextConfig().getProperty(BATCH_DAY_WINDOW_FOR_POLLING_PARAM);
99          String workStartup = ConfigContext.getCurrentContextConfig().getProperty(BATCH_WORK_STARUP_SLEEP);
100 
101         try {
102             this.threadPoolSize = Integer.parseInt(poolSizeString);
103             this.masterJobPollSleep = Integer.parseInt(masterJobSleep);
104             this.daysToPollWindow = Integer.parseInt(daysString);
105             this.workerEntryPollSleep = Integer.parseInt(workerPollSleep);
106             this.startupSleep = Integer.parseInt(workStartup);
107         } catch (NumberFormatException nfe) {
108             LOG.warn("Bad parameter when setting up Batch Configuration. using defaults");
109         }
110     }
111 }