View Javadoc
1   /*
2    * Copyright 2007 The Kuali Foundation
3    * 
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.opensource.org/licenses/ecl2.php
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.ole.sys.batch;
17  
18  import java.net.InetAddress;
19  import java.net.UnknownHostException;
20  import java.text.ParseException;
21  import java.util.Calendar;
22  import java.util.Collection;
23  import java.util.Date;
24  import java.util.Iterator;
25  import java.util.List;
26  
27  import org.apache.commons.lang.StringUtils;
28  import org.apache.log4j.Appender;
29  import org.apache.log4j.Logger;
30  import org.kuali.ole.sys.OLEConstants;
31  import org.kuali.ole.sys.batch.service.SchedulerService;
32  import org.kuali.ole.sys.context.ProxyUtils;
33  import org.kuali.ole.sys.context.SpringContext;
34  import org.kuali.ole.sys.service.impl.OleParameterConstants;
35  import org.kuali.rice.core.api.CoreConstants;
36  import org.kuali.rice.core.api.datetime.DateTimeService;
37  import org.kuali.rice.coreservice.framework.parameter.ParameterService;
38  import org.kuali.rice.kew.api.exception.WorkflowException;
39  import org.kuali.rice.krad.UserSession;
40  import org.kuali.rice.krad.util.GlobalVariables;
41  import org.kuali.rice.krad.util.KRADConstants;
42  import org.quartz.InterruptableJob;
43  import org.quartz.JobDataMap;
44  import org.quartz.JobExecutionContext;
45  import org.quartz.JobExecutionException;
46  import org.quartz.StatefulJob;
47  import org.quartz.UnableToInterruptJobException;
48  import org.springframework.util.StopWatch;
49  
50  public class Job implements StatefulJob, InterruptableJob {
51  
    // Keys read from the merged JobDataMap by execute(); their values restrict the run to a range of
    // step numbers (steps are counted from 1, and a value of 0 or less leaves that boundary unset).
    public static final String JOB_RUN_START_STEP = "JOB_RUN_START_STEP";
    public static final String JOB_RUN_END_STEP = "JOB_RUN_END_STEP";
    // Not referenced inside this class; presumably a job data map key used by callers - TODO confirm.
    public static final String MASTER_JOB_NAME = "MASTER_JOB_NAME";
    // Name of the per-step parameter that skipStep() reads as a boolean; a false value skips the step.
    public static final String STEP_RUN_PARM_NM = "RUN_IND";
    // Name of the per-step parameter holding the run dates that skipStep() compares against the job run date.
    public static final String STEP_RUN_ON_DATE_PARM_NM = "RUN_DATE";
    // Name of the per-step parameter that runStep() reads to pick the principal for the step's user session.
    public static final String STEP_USER_PARM_NM = "USER";
    // Name of the batch-level parameter that is split on ':' and used as the end time of a run date window.
    public static final String RUN_DATE_CUTOFF_PARM_NM = "RUN_DATE_CUTOFF_TIME";
    private static final Logger LOG = Logger.getLogger(Job.class);
    // Receives the job status updates (cancelled / failed / succeeded) issued by execute().
    private SchedulerService schedulerService;
    // Passed to runStep() for the step-level parameter lookups.
    private ParameterService parameterService;
    // Supplies the job run date at the start of execute().
    private DateTimeService dateTimeService;
    // Steps executed in list order by execute().
    private List<Step> steps;
    // Step that interrupt() forwards the interruption request to when it is non-null.
    private Step currentStep;
    // Only exposed through its getter and setter in this class.
    private Appender ndcAppender;
    // When true, execute() logs a message and returns without running any step.
    private boolean notRunnable;
    // Thread that most recently entered execute(); interrupt() calls Thread#interrupt() on it.
    private transient Thread workerThread;
68  
69      /**
70       * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
71       */
72      @Override
73      public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
74          workerThread = Thread.currentThread();
75          if (isNotRunnable()) {
76              if (LOG.isInfoEnabled()) {
77                  LOG.info("Skipping job because doNotRun is true: " + jobExecutionContext.getJobDetail().getName());
78              }
79              return;
80          }
81          int startStep = 0;
82          try {
83              startStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_START_STEP));
84          }
85          catch (NumberFormatException ex) {
86              // not present, do nothing
87          }
88          int endStep = 0;
89          try {
90              endStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_END_STEP));
91          }
92          catch (NumberFormatException ex) {
93              // not present, do nothing
94          }
95          Date jobRunDate = dateTimeService.getCurrentDate();
96          int currentStepNumber = 0;
97          try {
98              LOG.info("Executing job: " + jobExecutionContext.getJobDetail() + " on machine " + getMachineName() + " scheduler instance id " + jobExecutionContext.getScheduler().getSchedulerInstanceId() + "\n" + jobDataMapToString(jobExecutionContext.getJobDetail().getJobDataMap()));
99              for (Step step : getSteps()) {
100                 currentStepNumber++;
101                 // prevent starting of the next step if the thread has an interrupted status
102                 if (workerThread.isInterrupted()) {
103                     LOG.warn("Aborting Job execution due to manual interruption");
104                     schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
105                     return;
106                 }
107                 if (startStep > 0 && currentStepNumber < startStep) {
108                     if (LOG.isInfoEnabled()) {
109                         LOG.info("Skipping step " + currentStepNumber + " - startStep=" + startStep);
110                     }
111                     continue; // skip to next step
112                 }
113                 else if (endStep > 0 && currentStepNumber > endStep) {
114                     if (LOG.isInfoEnabled()) {
115                         LOG.info("Ending step loop - currentStepNumber=" + currentStepNumber + " - endStep = " + endStep);
116                     }
117                     break;
118                 }
119                 step.setInterrupted(false);
120                 try {
121                     if (!runStep(parameterService, jobExecutionContext.getJobDetail().getFullName(), currentStepNumber, step, jobRunDate)) {
122                         break;
123                     }
124                 }
125                 catch (InterruptedException ex) {
126                     LOG.warn("Stopping after step interruption");
127                     schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
128                     return;
129                 }
130                 if (step.isInterrupted()) {
131                     LOG.warn("attempt to interrupt step failed, step continued to completion");
132                     LOG.warn("cancelling remainder of job due to step interruption");
133                     schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
134                     return;
135                 }
136             }
137         }
138         catch (Exception e) {
139             schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.FAILED_JOB_STATUS_CODE);
140             throw new JobExecutionException("Caught exception in " + jobExecutionContext.getJobDetail().getName(), e, false);
141         }
142         LOG.info("Finished executing job: " + jobExecutionContext.getJobDetail().getName());
143         schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.SUCCEEDED_JOB_STATUS_CODE);
144     }
145 
146     public static boolean runStep(ParameterService parameterService, String jobName, int currentStepNumber, Step step, Date jobRunDate) throws InterruptedException, WorkflowException {
147         boolean continueJob = true;
148         if (GlobalVariables.getUserSession() == null) {
149             LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user <unknown>"));
150         }
151         else {
152             LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user ").append(GlobalVariables.getUserSession().getPrincipalName()));
153         }
154         
155         if (!skipStep(parameterService, step, jobRunDate)) {
156             
157             Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
158             Class<?> stepClass = unProxiedStep.getClass();
159             GlobalVariables.clear();
160             
161             String stepUserName = OLEConstants.SYSTEM_USER;
162             if (parameterService.parameterExists(stepClass, STEP_USER_PARM_NM)) {
163                 stepUserName = parameterService.getParameterValueAsString(stepClass, STEP_USER_PARM_NM);
164             }
165             if (LOG.isInfoEnabled()) {
166                 LOG.info(new StringBuffer("Creating user session for step: ").append(step.getName()).append("=").append(stepUserName));
167             }
168             GlobalVariables.setUserSession(new UserSession(stepUserName));
169             if (LOG.isInfoEnabled()) {
170                 LOG.info(new StringBuffer("Executing step: ").append(step.getName()).append("=").append(stepClass));
171             }
172             StopWatch stopWatch = new StopWatch();
173             stopWatch.start(jobName);
174             try {
175                 continueJob = step.execute(jobName, jobRunDate);
176             }
177             catch (InterruptedException e) {
178                 LOG.error("Exception occured executing step", e);
179                 throw e;
180             }
181             catch (RuntimeException e) {
182                 LOG.error("Exception occured executing step", e);
183                 throw e;
184             }
185             stopWatch.stop();
186             LOG.info(new StringBuffer("Step ").append(step.getName()).append(" of ").append(jobName).append(" took ").append(stopWatch.getTotalTimeSeconds() / 60.0).append(" minutes to complete").toString());
187             if (!continueJob) {
188                 LOG.info("Stopping job after successful step execution");
189             }
190         }
191         LOG.info(new StringBuffer("Finished processing step ").append(currentStepNumber).append(": ").append(step.getName()));
192         return continueJob;
193     }
194 
195     
196     /**
197      * This method determines whether the Job should not run the Step based on the RUN_IND and RUN_DATE Parameters.
198      * When RUN_IND exists and equals 'Y' it takes priority and does not consult RUN_DATE. 
199      * If RUN_DATE exists, but contains an empty value the step will not be skipped.
200      */
201     protected static boolean skipStep(ParameterService parameterService, Step step, Date jobRunDate) {
202         Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
203         Class<?> stepClass = unProxiedStep.getClass();
204         
205         //RUN_IND takes priority: when RUN_IND exists and RUN_IND=Y always run the Step
206         //RUN_DATE: when RUN_DATE exists, but the value is empty run the Step
207         
208         final boolean runIndExists = parameterService.parameterExists(stepClass, STEP_RUN_PARM_NM);
209         if (runIndExists) {
210             final boolean runInd = parameterService.getParameterValueAsBoolean(stepClass, STEP_RUN_PARM_NM);
211             if (!runInd) {
212             if (LOG.isInfoEnabled()) {
213                 LOG.info("Skipping step due to system parameter: " + STEP_RUN_PARM_NM +" for "+ stepClass.getName());
214             }            
215                 return true; // RUN_IND is false - let's skip
216         }
217             }
218 
219         final boolean runDateExists = parameterService.parameterExists(stepClass, STEP_RUN_ON_DATE_PARM_NM);
220         if (runDateExists) {
221             final boolean runDateIsEmpty = StringUtils.isEmpty(parameterService.getParameterValueAsString(stepClass, STEP_RUN_ON_DATE_PARM_NM));
222             if (runDateIsEmpty) {
223                 return false; // run date param is empty, so run the step
224             }
225         
226             final DateTimeService dTService = SpringContext.getBean(DateTimeService.class);
227     
228             final Collection<String> runDates = parameterService.getParameterValuesAsString(stepClass, STEP_RUN_ON_DATE_PARM_NM);
229             boolean matchedRunDate = false;
230             final String[] cutOffTime = parameterService.parameterExists(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM) ?
231                     StringUtils.split(parameterService.getParameterValueAsString(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM), ':') :
232                     new String[] { "00", "00", "00"}; // no cutoff time param?  Then default to midnight of tomorrow
233             for (String runDate: runDates) {
234                 try {
235                     if (withinCutoffWindowForDate(jobRunDate, dTService.convertToDate(runDate), dTService, cutOffTime)) {
236                         matchedRunDate = true;
237                     }
238                 }
239                 catch (ParseException pe) {
240                     LOG.error("ParseException occured parsing " + runDate, pe);
241         }
242             }
243             // did we fail to match a run date?  then skip this step
244             if (!matchedRunDate) {
245             if (LOG.isInfoEnabled()) {
246                     LOG.info("Skipping step due to system parameters: " + STEP_RUN_PARM_NM + ", " + STEP_RUN_ON_DATE_PARM_NM + " and " + RUN_DATE_CUTOFF_PARM_NM + " for "+ stepClass.getName());
247             }
248             return true;
249         }
250         }
251 
252         //run step
253         return false;
254     }
255     
256     /**
257      * Checks if the current jobRunDate is within the cutoff window for the given run date from the RUN_DATE parameter.
258      * The window is defined as midnight of the date specified in the parameter to the RUN_DATE_CUTOFF_TIME of the next day.
259      * 
260      * @param jobRunDate the time the job is attempting to start
261      * @param runDateToCheck the current member of the appropriate RUN_DATE to check
262      * @param dateTimeService an instance of the DateTimeService
263      * @return true if jobRunDate is within the current runDateToCheck window, false otherwise
264      */
265     protected static boolean withinCutoffWindowForDate(Date jobRunDate, Date runDateToCheck, DateTimeService dateTimeService, String[] cutOffWindow) {
266         final Calendar jobRunCalendar = dateTimeService.getCalendar(jobRunDate);
267         final Calendar beginWindow = getCutoffWindowBeginning(runDateToCheck, dateTimeService);
268         final Calendar endWindow = getCutoffWindowEnding(runDateToCheck, dateTimeService, cutOffWindow);
269         return jobRunCalendar.after(beginWindow) && jobRunCalendar.before(endWindow);
270     }
271     
272     /**
273      * Defines the beginning of the cut off window
274      * 
275      * @param runDateToCheck the run date which defines the cut off window
276      * @param dateTimeService an implementation of the DateTimeService
277      * @return the begin date Calendar of the cutoff window
278      */
279     protected static Calendar getCutoffWindowBeginning(Date runDateToCheck, DateTimeService dateTimeService) {
280         Calendar beginWindow = dateTimeService.getCalendar(runDateToCheck);
281         beginWindow.set(Calendar.HOUR_OF_DAY, 0);
282         beginWindow.set(Calendar.MINUTE, 0);
283         beginWindow.set(Calendar.SECOND, 0);
284         beginWindow.set(Calendar.MILLISECOND, 0);
285         return beginWindow;
286     }
287     
288     /**
289      * Defines the end of the cut off window
290      * 
291      * @param runDateToCheck the run date which defines the cut off window
292      * @param dateTimeService an implementation of the DateTimeService
293      * @param cutOffTime an Array in the form of [hour, minute, second] when the cutoff window ends
294      * @return the end date Calendar of the cutoff window
295      */
296     protected static Calendar getCutoffWindowEnding(Date runDateToCheck, DateTimeService dateTimeService, String[] cutOffTime) {
297         Calendar endWindow = dateTimeService.getCalendar(runDateToCheck);
298         endWindow.add(Calendar.DAY_OF_YEAR, 1);
299         endWindow.set(Calendar.HOUR_OF_DAY, Integer.parseInt(cutOffTime[0]));
300         endWindow.set(Calendar.MINUTE, Integer.parseInt(cutOffTime[1]));
301         endWindow.set(Calendar.SECOND, Integer.parseInt(cutOffTime[2]));
302         return endWindow;
303     }
304 
305     /* This code is likely no longer reference, but was not removed, due to the fact that institutions may be calling */
306     /**
307      * @deprecated "Implementing institutions likely want to call Job#withinCutoffWindowForDate"
308      */
309     public static boolean isPastCutoffWindow(Date date, Collection<String> runDates) {
310         DateTimeService dTService = SpringContext.getBean(DateTimeService.class);
311         ParameterService parameterService = SpringContext.getBean(ParameterService.class);
312         Calendar jobRunDate = dTService.getCalendar(date);
313         if (parameterService.parameterExists(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM)) {
314             String[] cutOffTime = StringUtils.split(parameterService.getParameterValueAsString(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM), ':');
315             Calendar runDate = null;
316             for (String runDateStr : runDates) {
317                 try {
318                     runDate = dTService.getCalendar(dTService.convertToDate(runDateStr));
319                     runDate.add(Calendar.DAY_OF_YEAR, 1);
320                     runDate.set(Calendar.HOUR_OF_DAY, Integer.parseInt(cutOffTime[0]));
321                     runDate.set(Calendar.MINUTE, Integer.parseInt(cutOffTime[1]));
322                     runDate.set(Calendar.SECOND, Integer.parseInt(cutOffTime[2]));
323                 }
324                 catch (ParseException e) {
325                     LOG.error("ParseException occured parsing " + runDateStr, e);
326                 }
327                 if (jobRunDate.before(runDate)) {
328             return false;
329         }
330     }
331         }
332         return true;
333     }
334 
335     /**
336      * @throws UnableToInterruptJobException
337      */
338     @Override
339     public void interrupt() throws UnableToInterruptJobException {
340         // ask the step to interrupt
341         if (currentStep != null) {
342             currentStep.interrupt();
343         }
344         // also attempt to interrupt the thread, to cause an InterruptedException if the step ever waits or sleeps
345         workerThread.interrupt();
346     }
347 
    /**
     * Sets the parameter service that execute() hands to runStep() for parameter lookups.
     *
     * @param parameterService the parameter service to use
     */
    public void setParameterService(ParameterService parameterService) {
        this.parameterService = parameterService;
    }

    /**
     * Sets the steps of this job; execute() runs them in list order.
     *
     * @param steps the steps to run
     */
    public void setSteps(List<Step> steps) {
        this.steps = steps;
    }

    /**
     * @return the appender previously set through {@link #setNdcAppender(Appender)}
     */
    public Appender getNdcAppender() {
        return ndcAppender;
    }

    /**
     * Stores an appender on this job; this class only keeps the reference and exposes it through the getter.
     *
     * @param ndcAppender the appender to store
     */
    public void setNdcAppender(Appender ndcAppender) {
        this.ndcAppender = ndcAppender;
    }

    /**
     * Marks this job as not runnable; when true, execute() returns without running any step.
     *
     * @param notRunnable true to prevent the job from running
     */
    public void setNotRunnable(boolean notRunnable) {
        this.notRunnable = notRunnable;
    }

    /**
     * @return true if execute() should return without running any step
     */
    protected boolean isNotRunnable() {
        return notRunnable;
    }

    /**
     * @return the parameter service set through {@link #setParameterService(ParameterService)}
     */
    public ParameterService getParameterService() {
        return parameterService;
    }

    /**
     * @return the steps of this job, in execution order
     */
    public List<Step> getSteps() {
        return steps;
    }

    /**
     * Sets the scheduler service through which execute() reports the job status.
     *
     * @param schedulerService the scheduler service to use
     */
    public void setSchedulerService(SchedulerService schedulerService) {
        this.schedulerService = schedulerService;
    }

    /**
     * Sets the date time service from which execute() obtains the job run date.
     *
     * @param dateTimeService the date time service to use
     */
    public void setDateTimeService(DateTimeService dateTimeService) {
        this.dateTimeService = dateTimeService;
    }
387     
388     protected String jobDataMapToString(JobDataMap jobDataMap) {
389         StringBuilder buf = new StringBuilder();
390         buf.append("{");
391         Iterator keys = jobDataMap.keySet().iterator();
392         boolean hasNext = keys.hasNext();
393         while (hasNext) {
394             String key = (String) keys.next();
395             Object value = jobDataMap.get(key);
396             buf.append(key).append("=");
397             if (value == jobDataMap) {
398                 buf.append("(this map)");
399             }
400             else {
401                 buf.append(value);
402             }
403             hasNext = keys.hasNext();
404             if (hasNext) {
405                 buf.append(", ");
406             }
407         }
408         buf.append("}");
409         return buf.toString();
410     }
411     
412     protected String getMachineName() {
413         try {
414             return InetAddress.getLocalHost().getHostName();
415         }
416         catch (UnknownHostException e) {
417             return "Unknown";
418         }
419     }
420 }