View Javadoc
1   /*
2    * The Kuali Financial System, a comprehensive financial management system for higher education.
3    * 
4    * Copyright 2005-2014 The Kuali Foundation
5    * 
6    * This program is free software: you can redistribute it and/or modify
7    * it under the terms of the GNU Affero General Public License as
8    * published by the Free Software Foundation, either version 3 of the
9    * License, or (at your option) any later version.
10   * 
11   * This program is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14   * GNU Affero General Public License for more details.
15   * 
16   * You should have received a copy of the GNU Affero General Public License
17   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18   */
19  package org.kuali.kfs.sys.batch;
20  
21  import java.net.InetAddress;
22  import java.net.UnknownHostException;
23  import java.text.ParseException;
24  import java.util.Calendar;
25  import java.util.Collection;
26  import java.util.Date;
27  import java.util.Iterator;
28  import java.util.List;
29  
30  import org.apache.commons.lang.StringUtils;
31  import org.apache.log4j.Appender;
32  import org.apache.log4j.Logger;
33  import org.kuali.kfs.sys.KFSConstants;
34  import org.kuali.kfs.sys.batch.service.SchedulerService;
35  import org.kuali.kfs.sys.context.ProxyUtils;
36  import org.kuali.kfs.sys.context.SpringContext;
37  import org.kuali.kfs.sys.service.impl.KfsParameterConstants;
38  import org.kuali.rice.core.api.CoreConstants;
39  import org.kuali.rice.core.api.datetime.DateTimeService;
40  import org.kuali.rice.coreservice.framework.parameter.ParameterService;
41  import org.kuali.rice.kew.api.exception.WorkflowException;
42  import org.kuali.rice.krad.UserSession;
43  import org.kuali.rice.krad.util.GlobalVariables;
44  import org.kuali.rice.krad.util.KRADConstants;
45  import org.quartz.InterruptableJob;
46  import org.quartz.JobDataMap;
47  import org.quartz.JobExecutionContext;
48  import org.quartz.JobExecutionException;
49  import org.quartz.StatefulJob;
50  import org.quartz.UnableToInterruptJobException;
51  import org.springframework.util.StopWatch;
52  
53  public class Job implements StatefulJob, InterruptableJob {
54  
55      public static final String JOB_RUN_START_STEP = "JOB_RUN_START_STEP";
56      public static final String JOB_RUN_END_STEP = "JOB_RUN_END_STEP";
57      public static final String MASTER_JOB_NAME = "MASTER_JOB_NAME";
58      public static final String STEP_RUN_PARM_NM = "RUN_IND";
59      public static final String STEP_RUN_ON_DATE_PARM_NM = "RUN_DATE";
60      public static final String STEP_USER_PARM_NM = "USER";
61      public static final String RUN_DATE_CUTOFF_PARM_NM = "RUN_DATE_CUTOFF_TIME";
62      private static final Logger LOG = Logger.getLogger(Job.class);
63      private SchedulerService schedulerService;
64      private ParameterService parameterService;
65      private DateTimeService dateTimeService;
66      private List<Step> steps;
67      private Step currentStep;
68      private Appender ndcAppender;
69      private boolean notRunnable;
70      private transient Thread workerThread;
71  
72      /**
73       * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
74       */
75      @Override
76      public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
77          workerThread = Thread.currentThread();
78          if (isNotRunnable()) {
79              if (LOG.isInfoEnabled()) {
80                  LOG.info("Skipping job because doNotRun is true: " + jobExecutionContext.getJobDetail().getName());
81              }
82              return;
83          }
84          int startStep = 0;
85          try {
86              startStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_START_STEP));
87          }
88          catch (NumberFormatException ex) {
89              // not present, do nothing
90          }
91          int endStep = 0;
92          try {
93              endStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_END_STEP));
94          }
95          catch (NumberFormatException ex) {
96              // not present, do nothing
97          }
98          Date jobRunDate = dateTimeService.getCurrentDate();
99          int currentStepNumber = 0;
100         try {
101             LOG.info("Executing job: " + jobExecutionContext.getJobDetail() + " on machine " + getMachineName() + " scheduler instance id " + jobExecutionContext.getScheduler().getSchedulerInstanceId() + "\n" + jobDataMapToString(jobExecutionContext.getJobDetail().getJobDataMap()));
102             for (Step step : getSteps()) {
103                 currentStepNumber++;
104                 // prevent starting of the next step if the thread has an interrupted status
105                 if (workerThread.isInterrupted()) {
106                     LOG.warn("Aborting Job execution due to manual interruption");
107                     schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
108                     return;
109                 }
110                 if (startStep > 0 && currentStepNumber < startStep) {
111                     if (LOG.isInfoEnabled()) {
112                         LOG.info("Skipping step " + currentStepNumber + " - startStep=" + startStep);
113                     }
114                     continue; // skip to next step
115                 }
116                 else if (endStep > 0 && currentStepNumber > endStep) {
117                     if (LOG.isInfoEnabled()) {
118                         LOG.info("Ending step loop - currentStepNumber=" + currentStepNumber + " - endStep = " + endStep);
119                     }
120                     break;
121                 }
122                 step.setInterrupted(false);
123                 try {
124                     if (!runStep(parameterService, jobExecutionContext.getJobDetail().getFullName(), currentStepNumber, step, jobRunDate)) {
125                         break;
126                     }
127                 }
128                 catch (InterruptedException ex) {
129                     LOG.warn("Stopping after step interruption");
130                     schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
131                     return;
132                 }
133                 if (step.isInterrupted()) {
134                     LOG.warn("attempt to interrupt step failed, step continued to completion");
135                     LOG.warn("cancelling remainder of job due to step interruption");
136                     schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
137                     return;
138                 }
139             }
140         }
141         catch (Exception e) {
142             schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.FAILED_JOB_STATUS_CODE);
143             throw new JobExecutionException("Caught exception in " + jobExecutionContext.getJobDetail().getName(), e, false);
144         }
145         LOG.info("Finished executing job: " + jobExecutionContext.getJobDetail().getName());
146         schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.SUCCEEDED_JOB_STATUS_CODE);
147     }
148 
149     public static boolean runStep(ParameterService parameterService, String jobName, int currentStepNumber, Step step, Date jobRunDate) throws InterruptedException, WorkflowException {
150         boolean continueJob = true;
151         if (GlobalVariables.getUserSession() == null) {
152             LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user <unknown>"));
153         }
154         else {
155             LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user ").append(GlobalVariables.getUserSession().getPrincipalName()));
156         }
157 
158         if (!skipStep(parameterService, step, jobRunDate)) {
159 
160             Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
161             Class<?> stepClass = unProxiedStep.getClass();
162             GlobalVariables.clear();
163 
164             String stepUserName = KFSConstants.SYSTEM_USER;
165             if (parameterService.parameterExists(stepClass, STEP_USER_PARM_NM)) {
166                 stepUserName = parameterService.getParameterValueAsString(stepClass, STEP_USER_PARM_NM);
167             }
168             if (LOG.isInfoEnabled()) {
169                 LOG.info(new StringBuffer("Creating user session for step: ").append(step.getName()).append("=").append(stepUserName));
170             }
171             GlobalVariables.setUserSession(new UserSession(stepUserName));
172             if (LOG.isInfoEnabled()) {
173                 LOG.info(new StringBuffer("Executing step: ").append(step.getName()).append("=").append(stepClass));
174             }
175             StopWatch stopWatch = new StopWatch();
176             stopWatch.start(jobName);
177             try {
178                 continueJob = step.execute(jobName, jobRunDate);
179             }
180             catch (InterruptedException e) {
181                 LOG.error("Exception occured executing step", e);
182                 throw e;
183             }
184             catch (RuntimeException e) {
185                 LOG.error("Exception occured executing step", e);
186                 throw e;
187             }
188             stopWatch.stop();
189             LOG.info(new StringBuffer("Step ").append(step.getName()).append(" of ").append(jobName).append(" took ").append(stopWatch.getTotalTimeSeconds() / 60.0).append(" minutes to complete").toString());
190             if (!continueJob) {
191                 LOG.info("Stopping job after successful step execution");
192             }
193         }
194         LOG.info(new StringBuffer("Finished processing step ").append(currentStepNumber).append(": ").append(step.getName()));
195         return continueJob;
196     }
197 
198 
199     /**
200      * This method determines whether the Job should not run the Step based on the RUN_IND and RUN_DATE Parameters.
201      * When RUN_IND exists and equals 'Y' it takes priority and does not consult RUN_DATE.
202      * If RUN_DATE exists, but contains an empty value the step will not be skipped.
203      */
204     protected static boolean skipStep(ParameterService parameterService, Step step, Date jobRunDate) {
205         Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
206         Class<?> stepClass = unProxiedStep.getClass();
207 
208         //RUN_IND takes priority: when RUN_IND exists and RUN_IND=Y always run the Step
209         //RUN_DATE: when RUN_DATE exists, but the value is empty run the Step
210 
211         final boolean runIndExists = parameterService.parameterExists(stepClass, STEP_RUN_PARM_NM);
212         if (runIndExists) {
213             final boolean runInd = parameterService.getParameterValueAsBoolean(stepClass, STEP_RUN_PARM_NM);
214             if (!runInd) {
215                 if (LOG.isInfoEnabled()) {
216                     LOG.info("Skipping step due to system parameter: " + STEP_RUN_PARM_NM +" for "+ stepClass.getName());
217                 }
218                 return true; // RUN_IND is false - let's skip
219             }
220         }
221 
222         final boolean runDateExists = parameterService.parameterExists(stepClass, STEP_RUN_ON_DATE_PARM_NM);
223         if (runDateExists) {
224             final boolean runDateIsEmpty = StringUtils.isEmpty(parameterService.getParameterValueAsString(stepClass, STEP_RUN_ON_DATE_PARM_NM));
225             if (runDateIsEmpty) {
226                 return false; // run date param is empty, so run the step
227             }
228         
229             final DateTimeService dTService = SpringContext.getBean(DateTimeService.class);
230     
231             final Collection<String> runDates = parameterService.getParameterValuesAsString(stepClass, STEP_RUN_ON_DATE_PARM_NM);
232             boolean matchedRunDate = false;
233             final String[] cutOffTime = parameterService.parameterExists(KfsParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM) ?
234                     StringUtils.split(parameterService.getParameterValueAsString(KfsParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM), ':') :
235                     new String[] { "00", "00", "00"}; // no cutoff time param?  Then default to midnight of tomorrow
236             for (String runDate: runDates) {
237                 try {
238                     if (withinCutoffWindowForDate(jobRunDate, dTService.convertToDate(runDate), dTService, cutOffTime)) {
239                         matchedRunDate = true;
240                     }
241                 }
242                 catch (ParseException pe) {
243                     LOG.error("ParseException occured parsing " + runDate, pe);
244                 }
245             }
246             // did we fail to match a run date?  then skip this step
247             if (!matchedRunDate) {
248                 if (LOG.isInfoEnabled()) {
249                     LOG.info("Skipping step due to system parameters: " + STEP_RUN_PARM_NM + ", " + STEP_RUN_ON_DATE_PARM_NM + " and " + RUN_DATE_CUTOFF_PARM_NM + " for "+ stepClass.getName());
250                 }
251                 return true;
252             }
253         }
254 
255         //run step
256         return false;
257     }
258     
259     /**
260      * Checks if the current jobRunDate is within the cutoff window for the given run date from the RUN_DATE parameter.
261      * The window is defined as midnight of the date specified in the parameter to the RUN_DATE_CUTOFF_TIME of the next day.
262      * 
263      * @param jobRunDate the time the job is attempting to start
264      * @param runDateToCheck the current member of the appropriate RUN_DATE to check
265      * @param dateTimeService an instance of the DateTimeService
266      * @return true if jobRunDate is within the current runDateToCheck window, false otherwise
267      */
268     protected static boolean withinCutoffWindowForDate(Date jobRunDate, Date runDateToCheck, DateTimeService dateTimeService, String[] cutOffWindow) {
269         final Calendar jobRunCalendar = dateTimeService.getCalendar(jobRunDate);
270         final Calendar beginWindow = getCutoffWindowBeginning(runDateToCheck, dateTimeService);
271         final Calendar endWindow = getCutoffWindowEnding(runDateToCheck, dateTimeService, cutOffWindow);
272         return jobRunCalendar.after(beginWindow) && jobRunCalendar.before(endWindow);
273     }
274     
275     /**
276      * Defines the beginning of the cut off window
277      * 
278      * @param runDateToCheck the run date which defines the cut off window
279      * @param dateTimeService an implementation of the DateTimeService
280      * @return the begin date Calendar of the cutoff window
281      */
282     protected static Calendar getCutoffWindowBeginning(Date runDateToCheck, DateTimeService dateTimeService) {
283         Calendar beginWindow = dateTimeService.getCalendar(runDateToCheck);
284         beginWindow.set(Calendar.HOUR_OF_DAY, 0);
285         beginWindow.set(Calendar.MINUTE, 0);
286         beginWindow.set(Calendar.SECOND, 0);
287         beginWindow.set(Calendar.MILLISECOND, 0);
288         return beginWindow;
289     }
290     
291     /**
292      * Defines the end of the cut off window
293      * 
294      * @param runDateToCheck the run date which defines the cut off window
295      * @param dateTimeService an implementation of the DateTimeService
296      * @param cutOffTime an Array in the form of [hour, minute, second] when the cutoff window ends
297      * @return the end date Calendar of the cutoff window
298      */
299     protected static Calendar getCutoffWindowEnding(Date runDateToCheck, DateTimeService dateTimeService, String[] cutOffTime) {
300         Calendar endWindow = dateTimeService.getCalendar(runDateToCheck);
301         endWindow.add(Calendar.DAY_OF_YEAR, 1);
302         endWindow.set(Calendar.HOUR_OF_DAY, Integer.parseInt(cutOffTime[0]));
303         endWindow.set(Calendar.MINUTE, Integer.parseInt(cutOffTime[1]));
304         endWindow.set(Calendar.SECOND, Integer.parseInt(cutOffTime[2]));
305         return endWindow;
306     }
307 
308     /* This code is likely no longer reference, but was not removed, due to the fact that institutions may be calling */
309     /**
310      * @deprecated "Implementing institutions likely want to call Job#withinCutoffWindowForDate"
311      */
312     public static boolean isPastCutoffWindow(Date date, Collection<String> runDates) {
313         DateTimeService dTService = SpringContext.getBean(DateTimeService.class);
314         ParameterService parameterService = SpringContext.getBean(ParameterService.class);
315         Calendar jobRunDate = dTService.getCalendar(date);
316         if (parameterService.parameterExists(KfsParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM)) {
317             String[] cutOffTime = StringUtils.split(parameterService.getParameterValueAsString(KfsParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM), ':');
318             Calendar runDate = null;
319             for (String runDateStr : runDates) {
320                 try {
321                     runDate = dTService.getCalendar(dTService.convertToDate(runDateStr));
322                     runDate.add(Calendar.DAY_OF_YEAR, 1);
323                     runDate.set(Calendar.HOUR_OF_DAY, Integer.parseInt(cutOffTime[0]));
324                     runDate.set(Calendar.MINUTE, Integer.parseInt(cutOffTime[1]));
325                     runDate.set(Calendar.SECOND, Integer.parseInt(cutOffTime[2]));
326                 }
327                 catch (ParseException e) {
328                     LOG.error("ParseException occured parsing " + runDateStr, e);
329                 }
330                 if (jobRunDate.before(runDate)) {
331                     return false;
332                 }
333             }
334         }
335         return true;
336     }
337 
338     /**
339      * @throws UnableToInterruptJobException
340      */
341     @Override
342     public void interrupt() throws UnableToInterruptJobException {
343         // ask the step to interrupt
344         if (currentStep != null) {
345             currentStep.interrupt();
346         }
347         // also attempt to interrupt the thread, to cause an InterruptedException if the step ever waits or sleeps
348         workerThread.interrupt();
349     }
350 
351     public void setParameterService(ParameterService parameterService) {
352         this.parameterService = parameterService;
353     }
354 
355     public void setSteps(List<Step> steps) {
356         this.steps = steps;
357     }
358 
359     public Appender getNdcAppender() {
360         return ndcAppender;
361     }
362 
363     public void setNdcAppender(Appender ndcAppender) {
364         this.ndcAppender = ndcAppender;
365     }
366 
367     public void setNotRunnable(boolean notRunnable) {
368         this.notRunnable = notRunnable;
369     }
370 
371     protected boolean isNotRunnable() {
372         return notRunnable;
373     }
374 
375     public ParameterService getParameterService() {
376         return parameterService;
377     }
378 
379     public List<Step> getSteps() {
380         return steps;
381     }
382 
383     public void setSchedulerService(SchedulerService schedulerService) {
384         this.schedulerService = schedulerService;
385     }
386 
387     public void setDateTimeService(DateTimeService dateTimeService) {
388         this.dateTimeService = dateTimeService;
389     }
390 
391     protected String jobDataMapToString(JobDataMap jobDataMap) {
392         StringBuilder buf = new StringBuilder();
393         buf.append("{");
394         Iterator keys = jobDataMap.keySet().iterator();
395         boolean hasNext = keys.hasNext();
396         while (hasNext) {
397             String key = (String) keys.next();
398             Object value = jobDataMap.get(key);
399             buf.append(key).append("=");
400             if (value == jobDataMap) {
401                 buf.append("(this map)");
402             }
403             else {
404                 buf.append(value);
405             }
406             hasNext = keys.hasNext();
407             if (hasNext) {
408                 buf.append(", ");
409             }
410         }
411         buf.append("}");
412         return buf.toString();
413     }
414 
415     protected String getMachineName() {
416         try {
417             return InetAddress.getLocalHost().getHostName();
418         }
419         catch (UnknownHostException e) {
420             return "Unknown";
421         }
422     }
423 }