1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.ole.sys.batch;
17
18 import java.net.InetAddress;
19 import java.net.UnknownHostException;
20 import java.text.ParseException;
21 import java.util.Calendar;
22 import java.util.Collection;
23 import java.util.Date;
24 import java.util.Iterator;
25 import java.util.List;
26
27 import org.apache.commons.lang.StringUtils;
28 import org.apache.log4j.Appender;
29 import org.apache.log4j.Logger;
30 import org.kuali.ole.sys.OLEConstants;
31 import org.kuali.ole.sys.batch.service.SchedulerService;
32 import org.kuali.ole.sys.context.ProxyUtils;
33 import org.kuali.ole.sys.context.SpringContext;
34 import org.kuali.ole.sys.service.impl.OleParameterConstants;
35 import org.kuali.rice.core.api.CoreConstants;
36 import org.kuali.rice.core.api.datetime.DateTimeService;
37 import org.kuali.rice.coreservice.framework.parameter.ParameterService;
38 import org.kuali.rice.kew.api.exception.WorkflowException;
39 import org.kuali.rice.krad.UserSession;
40 import org.kuali.rice.krad.util.GlobalVariables;
41 import org.kuali.rice.krad.util.KRADConstants;
42 import org.quartz.InterruptableJob;
43 import org.quartz.JobDataMap;
44 import org.quartz.JobExecutionContext;
45 import org.quartz.JobExecutionException;
46 import org.quartz.StatefulJob;
47 import org.quartz.UnableToInterruptJobException;
48 import org.springframework.util.StopWatch;
49
50 public class Job implements StatefulJob, InterruptableJob {
51
52 public static final String JOB_RUN_START_STEP = "JOB_RUN_START_STEP";
53 public static final String JOB_RUN_END_STEP = "JOB_RUN_END_STEP";
54 public static final String MASTER_JOB_NAME = "MASTER_JOB_NAME";
55 public static final String STEP_RUN_PARM_NM = "RUN_IND";
56 public static final String STEP_RUN_ON_DATE_PARM_NM = "RUN_DATE";
57 public static final String STEP_USER_PARM_NM = "USER";
58 public static final String RUN_DATE_CUTOFF_PARM_NM = "RUN_DATE_CUTOFF_TIME";
59 private static final Logger LOG = Logger.getLogger(Job.class);
60 private SchedulerService schedulerService;
61 private ParameterService parameterService;
62 private DateTimeService dateTimeService;
63 private List<Step> steps;
64 private Step currentStep;
65 private Appender ndcAppender;
66 private boolean notRunnable;
67 private transient Thread workerThread;
68
69
70
71
72 @Override
73 public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
74 workerThread = Thread.currentThread();
75 if (isNotRunnable()) {
76 if (LOG.isInfoEnabled()) {
77 LOG.info("Skipping job because doNotRun is true: " + jobExecutionContext.getJobDetail().getName());
78 }
79 return;
80 }
81 int startStep = 0;
82 try {
83 startStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_START_STEP));
84 }
85 catch (NumberFormatException ex) {
86
87 }
88 int endStep = 0;
89 try {
90 endStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_END_STEP));
91 }
92 catch (NumberFormatException ex) {
93
94 }
95 Date jobRunDate = dateTimeService.getCurrentDate();
96 int currentStepNumber = 0;
97 try {
98 LOG.info("Executing job: " + jobExecutionContext.getJobDetail() + " on machine " + getMachineName() + " scheduler instance id " + jobExecutionContext.getScheduler().getSchedulerInstanceId() + "\n" + jobDataMapToString(jobExecutionContext.getJobDetail().getJobDataMap()));
99 for (Step step : getSteps()) {
100 currentStepNumber++;
101
102 if (workerThread.isInterrupted()) {
103 LOG.warn("Aborting Job execution due to manual interruption");
104 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
105 return;
106 }
107 if (startStep > 0 && currentStepNumber < startStep) {
108 if (LOG.isInfoEnabled()) {
109 LOG.info("Skipping step " + currentStepNumber + " - startStep=" + startStep);
110 }
111 continue;
112 }
113 else if (endStep > 0 && currentStepNumber > endStep) {
114 if (LOG.isInfoEnabled()) {
115 LOG.info("Ending step loop - currentStepNumber=" + currentStepNumber + " - endStep = " + endStep);
116 }
117 break;
118 }
119 step.setInterrupted(false);
120 try {
121 if (!runStep(parameterService, jobExecutionContext.getJobDetail().getFullName(), currentStepNumber, step, jobRunDate)) {
122 break;
123 }
124 }
125 catch (InterruptedException ex) {
126 LOG.warn("Stopping after step interruption");
127 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
128 return;
129 }
130 if (step.isInterrupted()) {
131 LOG.warn("attempt to interrupt step failed, step continued to completion");
132 LOG.warn("cancelling remainder of job due to step interruption");
133 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
134 return;
135 }
136 }
137 }
138 catch (Exception e) {
139 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.FAILED_JOB_STATUS_CODE);
140 throw new JobExecutionException("Caught exception in " + jobExecutionContext.getJobDetail().getName(), e, false);
141 }
142 LOG.info("Finished executing job: " + jobExecutionContext.getJobDetail().getName());
143 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.SUCCEEDED_JOB_STATUS_CODE);
144 }
145
146 public static boolean runStep(ParameterService parameterService, String jobName, int currentStepNumber, Step step, Date jobRunDate) throws InterruptedException, WorkflowException {
147 boolean continueJob = true;
148 if (GlobalVariables.getUserSession() == null) {
149 LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user <unknown>"));
150 }
151 else {
152 LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user ").append(GlobalVariables.getUserSession().getPrincipalName()));
153 }
154
155 if (!skipStep(parameterService, step, jobRunDate)) {
156
157 Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
158 Class<?> stepClass = unProxiedStep.getClass();
159 GlobalVariables.clear();
160
161 String stepUserName = OLEConstants.SYSTEM_USER;
162 if (parameterService.parameterExists(stepClass, STEP_USER_PARM_NM)) {
163 stepUserName = parameterService.getParameterValueAsString(stepClass, STEP_USER_PARM_NM);
164 }
165 if (LOG.isInfoEnabled()) {
166 LOG.info(new StringBuffer("Creating user session for step: ").append(step.getName()).append("=").append(stepUserName));
167 }
168 GlobalVariables.setUserSession(new UserSession(stepUserName));
169 if (LOG.isInfoEnabled()) {
170 LOG.info(new StringBuffer("Executing step: ").append(step.getName()).append("=").append(stepClass));
171 }
172 StopWatch stopWatch = new StopWatch();
173 stopWatch.start(jobName);
174 try {
175 continueJob = step.execute(jobName, jobRunDate);
176 }
177 catch (InterruptedException e) {
178 LOG.error("Exception occured executing step", e);
179 throw e;
180 }
181 catch (RuntimeException e) {
182 LOG.error("Exception occured executing step", e);
183 throw e;
184 }
185 stopWatch.stop();
186 LOG.info(new StringBuffer("Step ").append(step.getName()).append(" of ").append(jobName).append(" took ").append(stopWatch.getTotalTimeSeconds() / 60.0).append(" minutes to complete").toString());
187 if (!continueJob) {
188 LOG.info("Stopping job after successful step execution");
189 }
190 }
191 LOG.info(new StringBuffer("Finished processing step ").append(currentStepNumber).append(": ").append(step.getName()));
192 return continueJob;
193 }
194
195
196
197
198
199
200
201 protected static boolean skipStep(ParameterService parameterService, Step step, Date jobRunDate) {
202 Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
203 Class<?> stepClass = unProxiedStep.getClass();
204
205
206
207
208 final boolean runIndExists = parameterService.parameterExists(stepClass, STEP_RUN_PARM_NM);
209 if (runIndExists) {
210 final boolean runInd = parameterService.getParameterValueAsBoolean(stepClass, STEP_RUN_PARM_NM);
211 if (!runInd) {
212 if (LOG.isInfoEnabled()) {
213 LOG.info("Skipping step due to system parameter: " + STEP_RUN_PARM_NM +" for "+ stepClass.getName());
214 }
215 return true;
216 }
217 }
218
219 final boolean runDateExists = parameterService.parameterExists(stepClass, STEP_RUN_ON_DATE_PARM_NM);
220 if (runDateExists) {
221 final boolean runDateIsEmpty = StringUtils.isEmpty(parameterService.getParameterValueAsString(stepClass, STEP_RUN_ON_DATE_PARM_NM));
222 if (runDateIsEmpty) {
223 return false;
224 }
225
226 final DateTimeService dTService = SpringContext.getBean(DateTimeService.class);
227
228 final Collection<String> runDates = parameterService.getParameterValuesAsString(stepClass, STEP_RUN_ON_DATE_PARM_NM);
229 boolean matchedRunDate = false;
230 final String[] cutOffTime = parameterService.parameterExists(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM) ?
231 StringUtils.split(parameterService.getParameterValueAsString(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM), ':') :
232 new String[] { "00", "00", "00"};
233 for (String runDate: runDates) {
234 try {
235 if (withinCutoffWindowForDate(jobRunDate, dTService.convertToDate(runDate), dTService, cutOffTime)) {
236 matchedRunDate = true;
237 }
238 }
239 catch (ParseException pe) {
240 LOG.error("ParseException occured parsing " + runDate, pe);
241 }
242 }
243
244 if (!matchedRunDate) {
245 if (LOG.isInfoEnabled()) {
246 LOG.info("Skipping step due to system parameters: " + STEP_RUN_PARM_NM + ", " + STEP_RUN_ON_DATE_PARM_NM + " and " + RUN_DATE_CUTOFF_PARM_NM + " for "+ stepClass.getName());
247 }
248 return true;
249 }
250 }
251
252
253 return false;
254 }
255
256
257
258
259
260
261
262
263
264
265 protected static boolean withinCutoffWindowForDate(Date jobRunDate, Date runDateToCheck, DateTimeService dateTimeService, String[] cutOffWindow) {
266 final Calendar jobRunCalendar = dateTimeService.getCalendar(jobRunDate);
267 final Calendar beginWindow = getCutoffWindowBeginning(runDateToCheck, dateTimeService);
268 final Calendar endWindow = getCutoffWindowEnding(runDateToCheck, dateTimeService, cutOffWindow);
269 return jobRunCalendar.after(beginWindow) && jobRunCalendar.before(endWindow);
270 }
271
272
273
274
275
276
277
278
279 protected static Calendar getCutoffWindowBeginning(Date runDateToCheck, DateTimeService dateTimeService) {
280 Calendar beginWindow = dateTimeService.getCalendar(runDateToCheck);
281 beginWindow.set(Calendar.HOUR_OF_DAY, 0);
282 beginWindow.set(Calendar.MINUTE, 0);
283 beginWindow.set(Calendar.SECOND, 0);
284 beginWindow.set(Calendar.MILLISECOND, 0);
285 return beginWindow;
286 }
287
288
289
290
291
292
293
294
295
296 protected static Calendar getCutoffWindowEnding(Date runDateToCheck, DateTimeService dateTimeService, String[] cutOffTime) {
297 Calendar endWindow = dateTimeService.getCalendar(runDateToCheck);
298 endWindow.add(Calendar.DAY_OF_YEAR, 1);
299 endWindow.set(Calendar.HOUR_OF_DAY, Integer.parseInt(cutOffTime[0]));
300 endWindow.set(Calendar.MINUTE, Integer.parseInt(cutOffTime[1]));
301 endWindow.set(Calendar.SECOND, Integer.parseInt(cutOffTime[2]));
302 return endWindow;
303 }
304
305
306
307
308
309 public static boolean isPastCutoffWindow(Date date, Collection<String> runDates) {
310 DateTimeService dTService = SpringContext.getBean(DateTimeService.class);
311 ParameterService parameterService = SpringContext.getBean(ParameterService.class);
312 Calendar jobRunDate = dTService.getCalendar(date);
313 if (parameterService.parameterExists(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM)) {
314 String[] cutOffTime = StringUtils.split(parameterService.getParameterValueAsString(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM), ':');
315 Calendar runDate = null;
316 for (String runDateStr : runDates) {
317 try {
318 runDate = dTService.getCalendar(dTService.convertToDate(runDateStr));
319 runDate.add(Calendar.DAY_OF_YEAR, 1);
320 runDate.set(Calendar.HOUR_OF_DAY, Integer.parseInt(cutOffTime[0]));
321 runDate.set(Calendar.MINUTE, Integer.parseInt(cutOffTime[1]));
322 runDate.set(Calendar.SECOND, Integer.parseInt(cutOffTime[2]));
323 }
324 catch (ParseException e) {
325 LOG.error("ParseException occured parsing " + runDateStr, e);
326 }
327 if (jobRunDate.before(runDate)) {
328 return false;
329 }
330 }
331 }
332 return true;
333 }
334
335
336
337
338 @Override
339 public void interrupt() throws UnableToInterruptJobException {
340
341 if (currentStep != null) {
342 currentStep.interrupt();
343 }
344
345 workerThread.interrupt();
346 }
347
348 public void setParameterService(ParameterService parameterService) {
349 this.parameterService = parameterService;
350 }
351
352 public void setSteps(List<Step> steps) {
353 this.steps = steps;
354 }
355
356 public Appender getNdcAppender() {
357 return ndcAppender;
358 }
359
360 public void setNdcAppender(Appender ndcAppender) {
361 this.ndcAppender = ndcAppender;
362 }
363
364 public void setNotRunnable(boolean notRunnable) {
365 this.notRunnable = notRunnable;
366 }
367
368 protected boolean isNotRunnable() {
369 return notRunnable;
370 }
371
372 public ParameterService getParameterService() {
373 return parameterService;
374 }
375
376 public List<Step> getSteps() {
377 return steps;
378 }
379
380 public void setSchedulerService(SchedulerService schedulerService) {
381 this.schedulerService = schedulerService;
382 }
383
384 public void setDateTimeService(DateTimeService dateTimeService) {
385 this.dateTimeService = dateTimeService;
386 }
387
388 protected String jobDataMapToString(JobDataMap jobDataMap) {
389 StringBuilder buf = new StringBuilder();
390 buf.append("{");
391 Iterator keys = jobDataMap.keySet().iterator();
392 boolean hasNext = keys.hasNext();
393 while (hasNext) {
394 String key = (String) keys.next();
395 Object value = jobDataMap.get(key);
396 buf.append(key).append("=");
397 if (value == jobDataMap) {
398 buf.append("(this map)");
399 }
400 else {
401 buf.append(value);
402 }
403 hasNext = keys.hasNext();
404 if (hasNext) {
405 buf.append(", ");
406 }
407 }
408 buf.append("}");
409 return buf.toString();
410 }
411
412 protected String getMachineName() {
413 try {
414 return InetAddress.getLocalHost().getHostName();
415 }
416 catch (UnknownHostException e) {
417 return "Unknown";
418 }
419 }
420 }