1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.kuali.kfs.sys.batch;
20
21 import java.net.InetAddress;
22 import java.net.UnknownHostException;
23 import java.text.ParseException;
24 import java.util.Calendar;
25 import java.util.Collection;
26 import java.util.Date;
27 import java.util.Iterator;
28 import java.util.List;
29
30 import org.apache.commons.lang.StringUtils;
31 import org.apache.log4j.Appender;
32 import org.apache.log4j.Logger;
33 import org.kuali.kfs.sys.KFSConstants;
34 import org.kuali.kfs.sys.batch.service.SchedulerService;
35 import org.kuali.kfs.sys.context.ProxyUtils;
36 import org.kuali.kfs.sys.context.SpringContext;
37 import org.kuali.kfs.sys.service.impl.KfsParameterConstants;
38 import org.kuali.rice.core.api.CoreConstants;
39 import org.kuali.rice.core.api.datetime.DateTimeService;
40 import org.kuali.rice.coreservice.framework.parameter.ParameterService;
41 import org.kuali.rice.kew.api.exception.WorkflowException;
42 import org.kuali.rice.krad.UserSession;
43 import org.kuali.rice.krad.util.GlobalVariables;
44 import org.kuali.rice.krad.util.KRADConstants;
45 import org.quartz.InterruptableJob;
46 import org.quartz.JobDataMap;
47 import org.quartz.JobExecutionContext;
48 import org.quartz.JobExecutionException;
49 import org.quartz.StatefulJob;
50 import org.quartz.UnableToInterruptJobException;
51 import org.springframework.util.StopWatch;
52
53 public class Job implements StatefulJob, InterruptableJob {
54
55 public static final String JOB_RUN_START_STEP = "JOB_RUN_START_STEP";
56 public static final String JOB_RUN_END_STEP = "JOB_RUN_END_STEP";
57 public static final String MASTER_JOB_NAME = "MASTER_JOB_NAME";
58 public static final String STEP_RUN_PARM_NM = "RUN_IND";
59 public static final String STEP_RUN_ON_DATE_PARM_NM = "RUN_DATE";
60 public static final String STEP_USER_PARM_NM = "USER";
61 public static final String RUN_DATE_CUTOFF_PARM_NM = "RUN_DATE_CUTOFF_TIME";
62 private static final Logger LOG = Logger.getLogger(Job.class);
63 private SchedulerService schedulerService;
64 private ParameterService parameterService;
65 private DateTimeService dateTimeService;
66 private List<Step> steps;
67 private Step currentStep;
68 private Appender ndcAppender;
69 private boolean notRunnable;
70 private transient Thread workerThread;
71
72
73
74
75 @Override
76 public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
77 workerThread = Thread.currentThread();
78 if (isNotRunnable()) {
79 if (LOG.isInfoEnabled()) {
80 LOG.info("Skipping job because doNotRun is true: " + jobExecutionContext.getJobDetail().getName());
81 }
82 return;
83 }
84 int startStep = 0;
85 try {
86 startStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_START_STEP));
87 }
88 catch (NumberFormatException ex) {
89
90 }
91 int endStep = 0;
92 try {
93 endStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_END_STEP));
94 }
95 catch (NumberFormatException ex) {
96
97 }
98 Date jobRunDate = dateTimeService.getCurrentDate();
99 int currentStepNumber = 0;
100 try {
101 LOG.info("Executing job: " + jobExecutionContext.getJobDetail() + " on machine " + getMachineName() + " scheduler instance id " + jobExecutionContext.getScheduler().getSchedulerInstanceId() + "\n" + jobDataMapToString(jobExecutionContext.getJobDetail().getJobDataMap()));
102 for (Step step : getSteps()) {
103 currentStepNumber++;
104
105 if (workerThread.isInterrupted()) {
106 LOG.warn("Aborting Job execution due to manual interruption");
107 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
108 return;
109 }
110 if (startStep > 0 && currentStepNumber < startStep) {
111 if (LOG.isInfoEnabled()) {
112 LOG.info("Skipping step " + currentStepNumber + " - startStep=" + startStep);
113 }
114 continue;
115 }
116 else if (endStep > 0 && currentStepNumber > endStep) {
117 if (LOG.isInfoEnabled()) {
118 LOG.info("Ending step loop - currentStepNumber=" + currentStepNumber + " - endStep = " + endStep);
119 }
120 break;
121 }
122 step.setInterrupted(false);
123 try {
124 if (!runStep(parameterService, jobExecutionContext.getJobDetail().getFullName(), currentStepNumber, step, jobRunDate)) {
125 break;
126 }
127 }
128 catch (InterruptedException ex) {
129 LOG.warn("Stopping after step interruption");
130 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
131 return;
132 }
133 if (step.isInterrupted()) {
134 LOG.warn("attempt to interrupt step failed, step continued to completion");
135 LOG.warn("cancelling remainder of job due to step interruption");
136 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
137 return;
138 }
139 }
140 }
141 catch (Exception e) {
142 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.FAILED_JOB_STATUS_CODE);
143 throw new JobExecutionException("Caught exception in " + jobExecutionContext.getJobDetail().getName(), e, false);
144 }
145 LOG.info("Finished executing job: " + jobExecutionContext.getJobDetail().getName());
146 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.SUCCEEDED_JOB_STATUS_CODE);
147 }
148
149 public static boolean runStep(ParameterService parameterService, String jobName, int currentStepNumber, Step step, Date jobRunDate) throws InterruptedException, WorkflowException {
150 boolean continueJob = true;
151 if (GlobalVariables.getUserSession() == null) {
152 LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user <unknown>"));
153 }
154 else {
155 LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user ").append(GlobalVariables.getUserSession().getPrincipalName()));
156 }
157
158 if (!skipStep(parameterService, step, jobRunDate)) {
159
160 Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
161 Class<?> stepClass = unProxiedStep.getClass();
162 GlobalVariables.clear();
163
164 String stepUserName = KFSConstants.SYSTEM_USER;
165 if (parameterService.parameterExists(stepClass, STEP_USER_PARM_NM)) {
166 stepUserName = parameterService.getParameterValueAsString(stepClass, STEP_USER_PARM_NM);
167 }
168 if (LOG.isInfoEnabled()) {
169 LOG.info(new StringBuffer("Creating user session for step: ").append(step.getName()).append("=").append(stepUserName));
170 }
171 GlobalVariables.setUserSession(new UserSession(stepUserName));
172 if (LOG.isInfoEnabled()) {
173 LOG.info(new StringBuffer("Executing step: ").append(step.getName()).append("=").append(stepClass));
174 }
175 StopWatch stopWatch = new StopWatch();
176 stopWatch.start(jobName);
177 try {
178 continueJob = step.execute(jobName, jobRunDate);
179 }
180 catch (InterruptedException e) {
181 LOG.error("Exception occured executing step", e);
182 throw e;
183 }
184 catch (RuntimeException e) {
185 LOG.error("Exception occured executing step", e);
186 throw e;
187 }
188 stopWatch.stop();
189 LOG.info(new StringBuffer("Step ").append(step.getName()).append(" of ").append(jobName).append(" took ").append(stopWatch.getTotalTimeSeconds() / 60.0).append(" minutes to complete").toString());
190 if (!continueJob) {
191 LOG.info("Stopping job after successful step execution");
192 }
193 }
194 LOG.info(new StringBuffer("Finished processing step ").append(currentStepNumber).append(": ").append(step.getName()));
195 return continueJob;
196 }
197
198
199
200
201
202
203
204 protected static boolean skipStep(ParameterService parameterService, Step step, Date jobRunDate) {
205 Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
206 Class<?> stepClass = unProxiedStep.getClass();
207
208
209
210
211 final boolean runIndExists = parameterService.parameterExists(stepClass, STEP_RUN_PARM_NM);
212 if (runIndExists) {
213 final boolean runInd = parameterService.getParameterValueAsBoolean(stepClass, STEP_RUN_PARM_NM);
214 if (!runInd) {
215 if (LOG.isInfoEnabled()) {
216 LOG.info("Skipping step due to system parameter: " + STEP_RUN_PARM_NM +" for "+ stepClass.getName());
217 }
218 return true;
219 }
220 }
221
222 final boolean runDateExists = parameterService.parameterExists(stepClass, STEP_RUN_ON_DATE_PARM_NM);
223 if (runDateExists) {
224 final boolean runDateIsEmpty = StringUtils.isEmpty(parameterService.getParameterValueAsString(stepClass, STEP_RUN_ON_DATE_PARM_NM));
225 if (runDateIsEmpty) {
226 return false;
227 }
228
229 final DateTimeService dTService = SpringContext.getBean(DateTimeService.class);
230
231 final Collection<String> runDates = parameterService.getParameterValuesAsString(stepClass, STEP_RUN_ON_DATE_PARM_NM);
232 boolean matchedRunDate = false;
233 final String[] cutOffTime = parameterService.parameterExists(KfsParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM) ?
234 StringUtils.split(parameterService.getParameterValueAsString(KfsParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM), ':') :
235 new String[] { "00", "00", "00"};
236 for (String runDate: runDates) {
237 try {
238 if (withinCutoffWindowForDate(jobRunDate, dTService.convertToDate(runDate), dTService, cutOffTime)) {
239 matchedRunDate = true;
240 }
241 }
242 catch (ParseException pe) {
243 LOG.error("ParseException occured parsing " + runDate, pe);
244 }
245 }
246
247 if (!matchedRunDate) {
248 if (LOG.isInfoEnabled()) {
249 LOG.info("Skipping step due to system parameters: " + STEP_RUN_PARM_NM + ", " + STEP_RUN_ON_DATE_PARM_NM + " and " + RUN_DATE_CUTOFF_PARM_NM + " for "+ stepClass.getName());
250 }
251 return true;
252 }
253 }
254
255
256 return false;
257 }
258
259
260
261
262
263
264
265
266
267
268 protected static boolean withinCutoffWindowForDate(Date jobRunDate, Date runDateToCheck, DateTimeService dateTimeService, String[] cutOffWindow) {
269 final Calendar jobRunCalendar = dateTimeService.getCalendar(jobRunDate);
270 final Calendar beginWindow = getCutoffWindowBeginning(runDateToCheck, dateTimeService);
271 final Calendar endWindow = getCutoffWindowEnding(runDateToCheck, dateTimeService, cutOffWindow);
272 return jobRunCalendar.after(beginWindow) && jobRunCalendar.before(endWindow);
273 }
274
275
276
277
278
279
280
281
282 protected static Calendar getCutoffWindowBeginning(Date runDateToCheck, DateTimeService dateTimeService) {
283 Calendar beginWindow = dateTimeService.getCalendar(runDateToCheck);
284 beginWindow.set(Calendar.HOUR_OF_DAY, 0);
285 beginWindow.set(Calendar.MINUTE, 0);
286 beginWindow.set(Calendar.SECOND, 0);
287 beginWindow.set(Calendar.MILLISECOND, 0);
288 return beginWindow;
289 }
290
291
292
293
294
295
296
297
298
299 protected static Calendar getCutoffWindowEnding(Date runDateToCheck, DateTimeService dateTimeService, String[] cutOffTime) {
300 Calendar endWindow = dateTimeService.getCalendar(runDateToCheck);
301 endWindow.add(Calendar.DAY_OF_YEAR, 1);
302 endWindow.set(Calendar.HOUR_OF_DAY, Integer.parseInt(cutOffTime[0]));
303 endWindow.set(Calendar.MINUTE, Integer.parseInt(cutOffTime[1]));
304 endWindow.set(Calendar.SECOND, Integer.parseInt(cutOffTime[2]));
305 return endWindow;
306 }
307
308
309
310
311
312 public static boolean isPastCutoffWindow(Date date, Collection<String> runDates) {
313 DateTimeService dTService = SpringContext.getBean(DateTimeService.class);
314 ParameterService parameterService = SpringContext.getBean(ParameterService.class);
315 Calendar jobRunDate = dTService.getCalendar(date);
316 if (parameterService.parameterExists(KfsParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM)) {
317 String[] cutOffTime = StringUtils.split(parameterService.getParameterValueAsString(KfsParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM), ':');
318 Calendar runDate = null;
319 for (String runDateStr : runDates) {
320 try {
321 runDate = dTService.getCalendar(dTService.convertToDate(runDateStr));
322 runDate.add(Calendar.DAY_OF_YEAR, 1);
323 runDate.set(Calendar.HOUR_OF_DAY, Integer.parseInt(cutOffTime[0]));
324 runDate.set(Calendar.MINUTE, Integer.parseInt(cutOffTime[1]));
325 runDate.set(Calendar.SECOND, Integer.parseInt(cutOffTime[2]));
326 }
327 catch (ParseException e) {
328 LOG.error("ParseException occured parsing " + runDateStr, e);
329 }
330 if (jobRunDate.before(runDate)) {
331 return false;
332 }
333 }
334 }
335 return true;
336 }
337
338
339
340
341 @Override
342 public void interrupt() throws UnableToInterruptJobException {
343
344 if (currentStep != null) {
345 currentStep.interrupt();
346 }
347
348 workerThread.interrupt();
349 }
350
351 public void setParameterService(ParameterService parameterService) {
352 this.parameterService = parameterService;
353 }
354
355 public void setSteps(List<Step> steps) {
356 this.steps = steps;
357 }
358
359 public Appender getNdcAppender() {
360 return ndcAppender;
361 }
362
363 public void setNdcAppender(Appender ndcAppender) {
364 this.ndcAppender = ndcAppender;
365 }
366
367 public void setNotRunnable(boolean notRunnable) {
368 this.notRunnable = notRunnable;
369 }
370
371 protected boolean isNotRunnable() {
372 return notRunnable;
373 }
374
375 public ParameterService getParameterService() {
376 return parameterService;
377 }
378
379 public List<Step> getSteps() {
380 return steps;
381 }
382
383 public void setSchedulerService(SchedulerService schedulerService) {
384 this.schedulerService = schedulerService;
385 }
386
387 public void setDateTimeService(DateTimeService dateTimeService) {
388 this.dateTimeService = dateTimeService;
389 }
390
391 protected String jobDataMapToString(JobDataMap jobDataMap) {
392 StringBuilder buf = new StringBuilder();
393 buf.append("{");
394 Iterator keys = jobDataMap.keySet().iterator();
395 boolean hasNext = keys.hasNext();
396 while (hasNext) {
397 String key = (String) keys.next();
398 Object value = jobDataMap.get(key);
399 buf.append(key).append("=");
400 if (value == jobDataMap) {
401 buf.append("(this map)");
402 }
403 else {
404 buf.append(value);
405 }
406 hasNext = keys.hasNext();
407 if (hasNext) {
408 buf.append(", ");
409 }
410 }
411 buf.append("}");
412 return buf.toString();
413 }
414
415 protected String getMachineName() {
416 try {
417 return InetAddress.getLocalHost().getHostName();
418 }
419 catch (UnknownHostException e) {
420 return "Unknown";
421 }
422 }
423 }