001/* 002 * Copyright 2007 The Kuali Foundation 003 * 004 * Licensed under the Educational Community License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.opensource.org/licenses/ecl2.php 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.kuali.ole.sys.batch; 017 018import java.net.InetAddress; 019import java.net.UnknownHostException; 020import java.text.ParseException; 021import java.util.Calendar; 022import java.util.Collection; 023import java.util.Date; 024import java.util.Iterator; 025import java.util.List; 026 027import org.apache.commons.lang.StringUtils; 028import org.apache.log4j.Appender; 029import org.apache.log4j.Logger; 030import org.kuali.ole.OLEConstants; 031import org.kuali.ole.select.document.service.OleSelectDocumentService; 032import org.kuali.ole.sys.batch.service.SchedulerService; 033import org.kuali.ole.sys.context.ProxyUtils; 034import org.kuali.ole.sys.context.SpringContext; 035import org.kuali.ole.sys.service.impl.OleParameterConstants; 036import org.kuali.rice.core.api.datetime.DateTimeService; 037import org.kuali.rice.coreservice.framework.parameter.ParameterService; 038import org.kuali.rice.kew.api.exception.WorkflowException; 039import org.kuali.rice.krad.UserSession; 040import org.kuali.rice.krad.util.GlobalVariables; 041import org.quartz.InterruptableJob; 042import org.quartz.JobDataMap; 043import org.quartz.JobExecutionContext; 044import org.quartz.JobExecutionException; 045import org.quartz.StatefulJob; 046import org.quartz.UnableToInterruptJobException; 047import org.springframework.util.StopWatch; 048 049public class Job implements StatefulJob, InterruptableJob { 050 051 public static final String JOB_RUN_START_STEP = "JOB_RUN_START_STEP"; 052 public static final String JOB_RUN_END_STEP = "JOB_RUN_END_STEP"; 053 public static final String MASTER_JOB_NAME = "MASTER_JOB_NAME"; 054 public static final String STEP_RUN_PARM_NM = "RUN_IND"; 055 public static final String STEP_RUN_ON_DATE_PARM_NM = "RUN_DATE"; 056 public static final String STEP_USER_PARM_NM = "USER"; 057 public static final String RUN_DATE_CUTOFF_PARM_NM = "RUN_DATE_CUTOFF_TIME"; 058 private static final Logger LOG = Logger.getLogger(Job.class); 059 private SchedulerService schedulerService; 060 private ParameterService parameterService; 061 private DateTimeService dateTimeService; 062 private List<Step> steps; 063 private Step currentStep; 064 private Appender ndcAppender; 065 private boolean notRunnable; 066 private transient Thread workerThread; 067 private static OleSelectDocumentService oleSelectDocumentService; 068 069 /** 070 * @see org.quartz.Job#execute(org.quartz.JobExecutionContext) 071 */ 072 @Override 073 public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { 074 workerThread = Thread.currentThread(); 075 if (isNotRunnable()) { 076 if (LOG.isInfoEnabled()) { 077 LOG.info("Skipping job because doNotRun is true: " + jobExecutionContext.getJobDetail().getName()); 078 } 079 return; 080 } 081 int startStep = 0; 082 try { 083 startStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_START_STEP)); 084 } 085 catch (NumberFormatException ex) { 086 // not present, do nothing 087 } 088 int endStep = 0; 089 try { 090 endStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_END_STEP)); 091 } 092 catch (NumberFormatException ex) { 093 // not present, do nothing 094 } 095 Date jobRunDate = dateTimeService.getCurrentDate(); 096 int currentStepNumber = 0; 097 try { 098 LOG.info("Executing job: " + jobExecutionContext.getJobDetail() + " on machine " + getMachineName() + " scheduler instance id " + jobExecutionContext.getScheduler().getSchedulerInstanceId() + "\n" + jobDataMapToString(jobExecutionContext.getJobDetail().getJobDataMap())); 099 for (Step step : getSteps()) { 100 currentStepNumber++; 101 // prevent starting of the next step if the thread has an interrupted status 102 if (workerThread.isInterrupted()) { 103 LOG.warn("Aborting Job execution due to manual interruption"); 104 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE); 105 return; 106 } 107 if (startStep > 0 && currentStepNumber < startStep) { 108 if (LOG.isInfoEnabled()) { 109 LOG.info("Skipping step " + currentStepNumber + " - startStep=" + startStep); 110 } 111 continue; // skip to next step 112 } 113 else if (endStep > 0 && currentStepNumber > endStep) { 114 if (LOG.isInfoEnabled()) { 115 LOG.info("Ending step loop - currentStepNumber=" + currentStepNumber + " - endStep = " + endStep); 116 } 117 break; 118 } 119 step.setInterrupted(false); 120 try { 121 if (!runStep(parameterService, jobExecutionContext.getJobDetail().getFullName(), currentStepNumber, step, jobRunDate)) { 122 break; 123 } 124 } 125 catch (InterruptedException ex) { 126 LOG.warn("Stopping after step interruption"); 127 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE); 128 return; 129 } 130 if (step.isInterrupted()) { 131 LOG.warn("attempt to interrupt step failed, step continued to completion"); 132 LOG.warn("cancelling remainder of job due to step interruption"); 133 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE); 134 return; 135 } 136 } 137 } 138 catch (Exception e) { 139 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.FAILED_JOB_STATUS_CODE); 140 throw new JobExecutionException("Caught exception in " + jobExecutionContext.getJobDetail().getName(), e, false); 141 } 142 LOG.info("Finished executing job: " + jobExecutionContext.getJobDetail().getName()); 143 schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.SUCCEEDED_JOB_STATUS_CODE); 144 } 145 146 public static boolean runStep(ParameterService parameterService, String jobName, int currentStepNumber, Step step, Date jobRunDate) throws InterruptedException, WorkflowException { 147 boolean continueJob = true; 148 if (GlobalVariables.getUserSession() == null) { 149 LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user <unknown>")); 150 } 151 else { 152 LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user ").append(GlobalVariables.getUserSession().getPrincipalName())); 153 } 154 155 if (!skipStep(parameterService, step, jobRunDate)) { 156 157 Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step); 158 Class<?> stepClass = unProxiedStep.getClass(); 159 GlobalVariables.clear(); 160 161 String stepUserName = getOleSelectDocumentService().getSelectParameterValue(org.kuali.ole.sys.OLEConstants.SYSTEM_USER); 162 if (parameterService.parameterExists(stepClass, STEP_USER_PARM_NM)) { 163 stepUserName = parameterService.getParameterValueAsString(stepClass, STEP_USER_PARM_NM); 164 } 165 if (LOG.isInfoEnabled()) { 166 LOG.info(new StringBuffer("Creating user session for step: ").append(step.getName()).append("=").append(stepUserName)); 167 } 168 GlobalVariables.setUserSession(new UserSession(stepUserName)); 169 if (LOG.isInfoEnabled()) { 170 LOG.info(new StringBuffer("Executing step: ").append(step.getName()).append("=").append(stepClass)); 171 } 172 StopWatch stopWatch = new StopWatch(); 173 stopWatch.start(jobName); 174 try { 175 continueJob = step.execute(jobName, jobRunDate); 176 } 177 catch (InterruptedException e) { 178 LOG.error("Exception occured executing step", e); 179 throw e; 180 } 181 catch (RuntimeException e) { 182 LOG.error("Exception occured executing step", e); 183 throw e; 184 } 185 stopWatch.stop(); 186 LOG.info(new StringBuffer("Step ").append(step.getName()).append(" of ").append(jobName).append(" took ").append(stopWatch.getTotalTimeSeconds() / 60.0).append(" minutes to complete").toString()); 187 if (!continueJob) { 188 LOG.info("Stopping job after successful step execution"); 189 } 190 } 191 LOG.info(new StringBuffer("Finished processing step ").append(currentStepNumber).append(": ").append(step.getName())); 192 return continueJob; 193 } 194 195 196 /** 197 * This method determines whether the Job should not run the Step based on the RUN_IND and RUN_DATE Parameters. 198 * When RUN_IND exists and equals 'Y' it takes priority and does not consult RUN_DATE. 199 * If RUN_DATE exists, but contains an empty value the step will not be skipped. 200 */ 201 protected static boolean skipStep(ParameterService parameterService, Step step, Date jobRunDate) { 202 Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step); 203 Class<?> stepClass = unProxiedStep.getClass(); 204 205 //RUN_IND takes priority: when RUN_IND exists and RUN_IND=Y always run the Step 206 //RUN_DATE: when RUN_DATE exists, but the value is empty run the Step 207 208 final boolean runIndExists = parameterService.parameterExists(stepClass, STEP_RUN_PARM_NM); 209 if (runIndExists) { 210 final boolean runInd = parameterService.getParameterValueAsBoolean(stepClass, STEP_RUN_PARM_NM); 211 if (!runInd) { 212 if (LOG.isInfoEnabled()) { 213 LOG.info("Skipping step due to system parameter: " + STEP_RUN_PARM_NM +" for "+ stepClass.getName()); 214 } 215 return true; // RUN_IND is false - let's skip 216 } 217 } 218 219 final boolean runDateExists = parameterService.parameterExists(stepClass, STEP_RUN_ON_DATE_PARM_NM); 220 if (runDateExists) { 221 final boolean runDateIsEmpty = StringUtils.isEmpty(parameterService.getParameterValueAsString(stepClass, STEP_RUN_ON_DATE_PARM_NM)); 222 if (runDateIsEmpty) { 223 return false; // run date param is empty, so run the step 224 } 225 226 final DateTimeService dTService = SpringContext.getBean(DateTimeService.class); 227 228 final Collection<String> runDates = parameterService.getParameterValuesAsString(stepClass, STEP_RUN_ON_DATE_PARM_NM); 229 boolean matchedRunDate = false; 230 final String[] cutOffTime = parameterService.parameterExists(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM) ? 231 StringUtils.split(parameterService.getParameterValueAsString(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM), ':') : 232 new String[] { "00", "00", "00"}; // no cutoff time param? Then default to midnight of tomorrow 233 for (String runDate: runDates) { 234 try { 235 if (withinCutoffWindowForDate(jobRunDate, dTService.convertToDate(runDate), dTService, cutOffTime)) { 236 matchedRunDate = true; 237 } 238 } 239 catch (ParseException pe) { 240 LOG.error("ParseException occured parsing " + runDate, pe); 241 } 242 } 243 // did we fail to match a run date? then skip this step 244 if (!matchedRunDate) { 245 if (LOG.isInfoEnabled()) { 246 LOG.info("Skipping step due to system parameters: " + STEP_RUN_PARM_NM + ", " + STEP_RUN_ON_DATE_PARM_NM + " and " + RUN_DATE_CUTOFF_PARM_NM + " for "+ stepClass.getName()); 247 } 248 return true; 249 } 250 } 251 252 //run step 253 return false; 254 } 255 256 /** 257 * Checks if the current jobRunDate is within the cutoff window for the given run date from the RUN_DATE parameter. 258 * The window is defined as midnight of the date specified in the parameter to the RUN_DATE_CUTOFF_TIME of the next day. 259 * 260 * @param jobRunDate the time the job is attempting to start 261 * @param runDateToCheck the current member of the appropriate RUN_DATE to check 262 * @param dateTimeService an instance of the DateTimeService 263 * @return true if jobRunDate is within the current runDateToCheck window, false otherwise 264 */ 265 protected static boolean withinCutoffWindowForDate(Date jobRunDate, Date runDateToCheck, DateTimeService dateTimeService, String[] cutOffWindow) { 266 final Calendar jobRunCalendar = dateTimeService.getCalendar(jobRunDate); 267 final Calendar beginWindow = getCutoffWindowBeginning(runDateToCheck, dateTimeService); 268 final Calendar endWindow = getCutoffWindowEnding(runDateToCheck, dateTimeService, cutOffWindow); 269 return jobRunCalendar.after(beginWindow) && jobRunCalendar.before(endWindow); 270 } 271 272 /** 273 * Defines the beginning of the cut off window 274 * 275 * @param runDateToCheck the run date which defines the cut off window 276 * @param dateTimeService an implementation of the DateTimeService 277 * @return the begin date Calendar of the cutoff window 278 */ 279 protected static Calendar getCutoffWindowBeginning(Date runDateToCheck, DateTimeService dateTimeService) { 280 Calendar beginWindow = dateTimeService.getCalendar(runDateToCheck); 281 beginWindow.set(Calendar.HOUR_OF_DAY, 0); 282 beginWindow.set(Calendar.MINUTE, 0); 283 beginWindow.set(Calendar.SECOND, 0); 284 beginWindow.set(Calendar.MILLISECOND, 0); 285 return beginWindow; 286 } 287 288 /** 289 * Defines the end of the cut off window 290 * 291 * @param runDateToCheck the run date which defines the cut off window 292 * @param dateTimeService an implementation of the DateTimeService 293 * @param cutOffTime an Array in the form of [hour, minute, second] when the cutoff window ends 294 * @return the end date Calendar of the cutoff window 295 */ 296 protected static Calendar getCutoffWindowEnding(Date runDateToCheck, DateTimeService dateTimeService, String[] cutOffTime) { 297 Calendar endWindow = dateTimeService.getCalendar(runDateToCheck); 298 endWindow.add(Calendar.DAY_OF_YEAR, 1); 299 endWindow.set(Calendar.HOUR_OF_DAY, Integer.parseInt(cutOffTime[0])); 300 endWindow.set(Calendar.MINUTE, Integer.parseInt(cutOffTime[1])); 301 endWindow.set(Calendar.SECOND, Integer.parseInt(cutOffTime[2])); 302 return endWindow; 303 } 304 305 /* This code is likely no longer reference, but was not removed, due to the fact that institutions may be calling */ 306 /** 307 * @deprecated "Implementing institutions likely want to call Job#withinCutoffWindowForDate" 308 */ 309 public static boolean isPastCutoffWindow(Date date, Collection<String> runDates) { 310 DateTimeService dTService = SpringContext.getBean(DateTimeService.class); 311 ParameterService parameterService = SpringContext.getBean(ParameterService.class); 312 Calendar jobRunDate = dTService.getCalendar(date); 313 if (parameterService.parameterExists(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM)) { 314 String[] cutOffTime = StringUtils.split(parameterService.getParameterValueAsString(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM), ':'); 315 Calendar runDate = null; 316 for (String runDateStr : runDates) { 317 try { 318 runDate = dTService.getCalendar(dTService.convertToDate(runDateStr)); 319 runDate.add(Calendar.DAY_OF_YEAR, 1); 320 runDate.set(Calendar.HOUR_OF_DAY, Integer.parseInt(cutOffTime[0])); 321 runDate.set(Calendar.MINUTE, Integer.parseInt(cutOffTime[1])); 322 runDate.set(Calendar.SECOND, Integer.parseInt(cutOffTime[2])); 323 } 324 catch (ParseException e) { 325 LOG.error("ParseException occured parsing " + runDateStr, e); 326 } 327 if (jobRunDate.before(runDate)) { 328 return false; 329 } 330 } 331 } 332 return true; 333 } 334 335 /** 336 * @throws UnableToInterruptJobException 337 */ 338 @Override 339 public void interrupt() throws UnableToInterruptJobException { 340 // ask the step to interrupt 341 if (currentStep != null) { 342 currentStep.interrupt(); 343 } 344 // also attempt to interrupt the thread, to cause an InterruptedException if the step ever waits or sleeps 345 workerThread.interrupt(); 346 } 347 348 public void setParameterService(ParameterService parameterService) { 349 this.parameterService = parameterService; 350 } 351 352 public void setSteps(List<Step> steps) { 353 this.steps = steps; 354 } 355 356 public Appender getNdcAppender() { 357 return ndcAppender; 358 } 359 360 public void setNdcAppender(Appender ndcAppender) { 361 this.ndcAppender = ndcAppender; 362 } 363 364 public void setNotRunnable(boolean notRunnable) { 365 this.notRunnable = notRunnable; 366 } 367 368 protected boolean isNotRunnable() { 369 return notRunnable; 370 } 371 372 public ParameterService getParameterService() { 373 return parameterService; 374 } 375 376 public List<Step> getSteps() { 377 return steps; 378 } 379 380 public void setSchedulerService(SchedulerService schedulerService) { 381 this.schedulerService = schedulerService; 382 } 383 384 public void setDateTimeService(DateTimeService dateTimeService) { 385 this.dateTimeService = dateTimeService; 386 } 387 388 protected String jobDataMapToString(JobDataMap jobDataMap) { 389 StringBuilder buf = new StringBuilder(); 390 buf.append("{"); 391 Iterator keys = jobDataMap.keySet().iterator(); 392 boolean hasNext = keys.hasNext(); 393 while (hasNext) { 394 String key = (String) keys.next(); 395 Object value = jobDataMap.get(key); 396 buf.append(key).append("="); 397 if (value == jobDataMap) { 398 buf.append("(this map)"); 399 } 400 else { 401 buf.append(value); 402 } 403 hasNext = keys.hasNext(); 404 if (hasNext) { 405 buf.append(", "); 406 } 407 } 408 buf.append("}"); 409 return buf.toString(); 410 } 411 412 protected String getMachineName() { 413 try { 414 return InetAddress.getLocalHost().getHostName(); 415 } 416 catch (UnknownHostException e) { 417 return "Unknown"; 418 } 419 } 420 421 public static OleSelectDocumentService getOleSelectDocumentService() { 422 if(oleSelectDocumentService == null){ 423 oleSelectDocumentService = SpringContext.getBean(OleSelectDocumentService.class); 424 } 425 return oleSelectDocumentService; 426 } 427 428 public void setOleSelectDocumentService(OleSelectDocumentService oleSelectDocumentService) { 429 this.oleSelectDocumentService = oleSelectDocumentService; 430 } 431}