1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.hr.time.batch;
17
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.TimeUnit;
21
22 import org.apache.commons.lang.StringUtils;
23 import org.apache.log4j.Logger;
24 import org.kuali.hr.time.util.TKUtils;
25 import org.kuali.rice.core.api.config.property.ConfigContext;
26 import org.springframework.beans.factory.DisposableBean;
27 import org.springframework.beans.factory.InitializingBean;
28
29
30
31
32
33
34 public class TkBatchManager implements InitializingBean, DisposableBean {
35 public static final String MESSAGE_QUEUE_CHECKER_IP_PARAM = "tk.batch.master.ip";
36 public static final String WORKER_THREAD_POOL_SIZE_PARAM = "tk.batch.threadpool.size";
37 public static final String BATCH_POLLER_SLEEP_PARAM = "tk.batch.poller.seconds.sleep";
38 public static final String BATCH_MASTER_POLL_SLEEP_PARAM = "tk.batch.job.poller.seconds.sleep";
39 public static final String BATCH_DAY_WINDOW_FOR_POLLING_PARAM = "tk.batch.job.days.polling.window";
40 public static final String BATCH_WORK_STARUP_SLEEP = "tk.batch.thread.startup.sleep";
41
42 ExecutorService pool;
43 BatchJobEntryPoller poller;
44 private static final Logger LOG = Logger.getLogger(TkBatchManager.class);
45
46
47 int masterJobPollSleep = 60;
48 int workerEntryPollSleep = 60;
49 int threadPoolSize = 5;
50 int daysToPollWindow = 30;
51 int startupSleep = 30;
52
53 @Override
54 public void destroy() throws Exception {
55 pool.shutdown();
56 try {
57 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
58 pool.shutdownNow();
59 }
60 } catch (InterruptedException ie) {
61 pool.shutdownNow();
62 Thread.currentThread().interrupt();
63 }
64 }
65
66 @Override
67 public void afterPropertiesSet() throws Exception {
68 parseConfiguration();
69
70
71 pool = Executors.newFixedThreadPool(this.threadPoolSize);
72 poller = new BatchJobEntryPoller(this, this.workerEntryPollSleep, this.startupSleep);
73 poller.start();
74 if (isMasterBatchNode()) {
75 BatchJobManagerThread batchJobManagerThread = new BatchJobManagerThread(this.masterJobPollSleep, this.daysToPollWindow, this.startupSleep);
76 batchJobManagerThread.start();
77 }
78 }
79
80
81
82
83
84
85
86
87 public boolean isMasterBatchNode() {
88 return StringUtils.equals(ConfigContext.getCurrentContextConfig().getProperty(MESSAGE_QUEUE_CHECKER_IP_PARAM), TKUtils.getIPNumber());
89 }
90
91
92
93
94 private void parseConfiguration() {
95 String poolSizeString = ConfigContext.getCurrentContextConfig().getProperty(WORKER_THREAD_POOL_SIZE_PARAM);
96 String masterJobSleep = ConfigContext.getCurrentContextConfig().getProperty(BATCH_MASTER_POLL_SLEEP_PARAM);
97 String workerPollSleep = ConfigContext.getCurrentContextConfig().getProperty(BATCH_POLLER_SLEEP_PARAM);
98 String daysString = ConfigContext.getCurrentContextConfig().getProperty(BATCH_DAY_WINDOW_FOR_POLLING_PARAM);
99 String workStartup = ConfigContext.getCurrentContextConfig().getProperty(BATCH_WORK_STARUP_SLEEP);
100
101 try {
102 this.threadPoolSize = Integer.parseInt(poolSizeString);
103 this.masterJobPollSleep = Integer.parseInt(masterJobSleep);
104 this.daysToPollWindow = Integer.parseInt(daysString);
105 this.workerEntryPollSleep = Integer.parseInt(workerPollSleep);
106 this.startupSleep = Integer.parseInt(workStartup);
107 } catch (NumberFormatException nfe) {
108 LOG.warn("Bad parameter when setting up Batch Configuration. using defaults");
109 }
110 }
111 }