001/*
002 * Copyright 2007 The Kuali Foundation
003 * 
004 * Licensed under the Educational Community License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * 
008 * http://www.opensource.org/licenses/ecl2.php
009 * 
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.kuali.ole.sys.batch;
017
018import java.net.InetAddress;
019import java.net.UnknownHostException;
020import java.text.ParseException;
021import java.util.Calendar;
022import java.util.Collection;
023import java.util.Date;
024import java.util.Iterator;
025import java.util.List;
026
027import org.apache.commons.lang.StringUtils;
028import org.apache.log4j.Appender;
029import org.apache.log4j.Logger;
030import org.kuali.ole.OLEConstants;
031import org.kuali.ole.select.document.service.OleSelectDocumentService;
032import org.kuali.ole.sys.batch.service.SchedulerService;
033import org.kuali.ole.sys.context.ProxyUtils;
034import org.kuali.ole.sys.context.SpringContext;
035import org.kuali.ole.sys.service.impl.OleParameterConstants;
036import org.kuali.rice.core.api.datetime.DateTimeService;
037import org.kuali.rice.coreservice.framework.parameter.ParameterService;
038import org.kuali.rice.kew.api.exception.WorkflowException;
039import org.kuali.rice.krad.UserSession;
040import org.kuali.rice.krad.util.GlobalVariables;
041import org.quartz.InterruptableJob;
042import org.quartz.JobDataMap;
043import org.quartz.JobExecutionContext;
044import org.quartz.JobExecutionException;
045import org.quartz.StatefulJob;
046import org.quartz.UnableToInterruptJobException;
047import org.springframework.util.StopWatch;
048
049public class Job implements StatefulJob, InterruptableJob {
050
051    public static final String JOB_RUN_START_STEP = "JOB_RUN_START_STEP";
052    public static final String JOB_RUN_END_STEP = "JOB_RUN_END_STEP";
053    public static final String MASTER_JOB_NAME = "MASTER_JOB_NAME";
054    public static final String STEP_RUN_PARM_NM = "RUN_IND";
055    public static final String STEP_RUN_ON_DATE_PARM_NM = "RUN_DATE";
056    public static final String STEP_USER_PARM_NM = "USER";
057    public static final String RUN_DATE_CUTOFF_PARM_NM = "RUN_DATE_CUTOFF_TIME";
058    private static final Logger LOG = Logger.getLogger(Job.class);
059    private SchedulerService schedulerService;
060    private ParameterService parameterService;
061    private DateTimeService dateTimeService;
062    private List<Step> steps;
063    private Step currentStep;
064    private Appender ndcAppender;
065    private boolean notRunnable;
066    private transient Thread workerThread;
067    private static OleSelectDocumentService oleSelectDocumentService;
068
069    /**
070     * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
071     */
072    @Override
073    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
074        workerThread = Thread.currentThread();
075        if (isNotRunnable()) {
076            if (LOG.isInfoEnabled()) {
077                LOG.info("Skipping job because doNotRun is true: " + jobExecutionContext.getJobDetail().getName());
078            }
079            return;
080        }
081        int startStep = 0;
082        try {
083            startStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_START_STEP));
084        }
085        catch (NumberFormatException ex) {
086            // not present, do nothing
087        }
088        int endStep = 0;
089        try {
090            endStep = Integer.parseInt(jobExecutionContext.getMergedJobDataMap().getString(JOB_RUN_END_STEP));
091        }
092        catch (NumberFormatException ex) {
093            // not present, do nothing
094        }
095        Date jobRunDate = dateTimeService.getCurrentDate();
096        int currentStepNumber = 0;
097        try {
098            LOG.info("Executing job: " + jobExecutionContext.getJobDetail() + " on machine " + getMachineName() + " scheduler instance id " + jobExecutionContext.getScheduler().getSchedulerInstanceId() + "\n" + jobDataMapToString(jobExecutionContext.getJobDetail().getJobDataMap()));
099            for (Step step : getSteps()) {
100                currentStepNumber++;
101                // prevent starting of the next step if the thread has an interrupted status
102                if (workerThread.isInterrupted()) {
103                    LOG.warn("Aborting Job execution due to manual interruption");
104                    schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
105                    return;
106                }
107                if (startStep > 0 && currentStepNumber < startStep) {
108                    if (LOG.isInfoEnabled()) {
109                        LOG.info("Skipping step " + currentStepNumber + " - startStep=" + startStep);
110                    }
111                    continue; // skip to next step
112                }
113                else if (endStep > 0 && currentStepNumber > endStep) {
114                    if (LOG.isInfoEnabled()) {
115                        LOG.info("Ending step loop - currentStepNumber=" + currentStepNumber + " - endStep = " + endStep);
116                    }
117                    break;
118                }
119                step.setInterrupted(false);
120                try {
121                    if (!runStep(parameterService, jobExecutionContext.getJobDetail().getFullName(), currentStepNumber, step, jobRunDate)) {
122                        break;
123                    }
124                }
125                catch (InterruptedException ex) {
126                    LOG.warn("Stopping after step interruption");
127                    schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
128                    return;
129                }
130                if (step.isInterrupted()) {
131                    LOG.warn("attempt to interrupt step failed, step continued to completion");
132                    LOG.warn("cancelling remainder of job due to step interruption");
133                    schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.CANCELLED_JOB_STATUS_CODE);
134                    return;
135                }
136            }
137        }
138        catch (Exception e) {
139            schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.FAILED_JOB_STATUS_CODE);
140            throw new JobExecutionException("Caught exception in " + jobExecutionContext.getJobDetail().getName(), e, false);
141        }
142        LOG.info("Finished executing job: " + jobExecutionContext.getJobDetail().getName());
143        schedulerService.updateStatus(jobExecutionContext.getJobDetail(), SchedulerService.SUCCEEDED_JOB_STATUS_CODE);
144    }
145
146    public static boolean runStep(ParameterService parameterService, String jobName, int currentStepNumber, Step step, Date jobRunDate) throws InterruptedException, WorkflowException {
147        boolean continueJob = true;
148        if (GlobalVariables.getUserSession() == null) {
149            LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user <unknown>"));
150        }
151        else {
152            LOG.info(new StringBuffer("Started processing step: ").append(currentStepNumber).append("=").append(step.getName()).append(" for user ").append(GlobalVariables.getUserSession().getPrincipalName()));
153        }
154        
155        if (!skipStep(parameterService, step, jobRunDate)) {
156            
157            Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
158            Class<?> stepClass = unProxiedStep.getClass();
159            GlobalVariables.clear();
160            
161            String stepUserName = getOleSelectDocumentService().getSelectParameterValue(org.kuali.ole.sys.OLEConstants.SYSTEM_USER);
162            if (parameterService.parameterExists(stepClass, STEP_USER_PARM_NM)) {
163                stepUserName = parameterService.getParameterValueAsString(stepClass, STEP_USER_PARM_NM);
164            }
165            if (LOG.isInfoEnabled()) {
166                LOG.info(new StringBuffer("Creating user session for step: ").append(step.getName()).append("=").append(stepUserName));
167            }
168            GlobalVariables.setUserSession(new UserSession(stepUserName));
169            if (LOG.isInfoEnabled()) {
170                LOG.info(new StringBuffer("Executing step: ").append(step.getName()).append("=").append(stepClass));
171            }
172            StopWatch stopWatch = new StopWatch();
173            stopWatch.start(jobName);
174            try {
175                continueJob = step.execute(jobName, jobRunDate);
176            }
177            catch (InterruptedException e) {
178                LOG.error("Exception occured executing step", e);
179                throw e;
180            }
181            catch (RuntimeException e) {
182                LOG.error("Exception occured executing step", e);
183                throw e;
184            }
185            stopWatch.stop();
186            LOG.info(new StringBuffer("Step ").append(step.getName()).append(" of ").append(jobName).append(" took ").append(stopWatch.getTotalTimeSeconds() / 60.0).append(" minutes to complete").toString());
187            if (!continueJob) {
188                LOG.info("Stopping job after successful step execution");
189            }
190        }
191        LOG.info(new StringBuffer("Finished processing step ").append(currentStepNumber).append(": ").append(step.getName()));
192        return continueJob;
193    }
194
195    
196    /**
197     * This method determines whether the Job should not run the Step based on the RUN_IND and RUN_DATE Parameters.
198     * When RUN_IND exists and equals 'Y' it takes priority and does not consult RUN_DATE. 
199     * If RUN_DATE exists, but contains an empty value the step will not be skipped.
200     */
201    protected static boolean skipStep(ParameterService parameterService, Step step, Date jobRunDate) {
202        Step unProxiedStep = (Step) ProxyUtils.getTargetIfProxied(step);
203        Class<?> stepClass = unProxiedStep.getClass();
204        
205        //RUN_IND takes priority: when RUN_IND exists and RUN_IND=Y always run the Step
206        //RUN_DATE: when RUN_DATE exists, but the value is empty run the Step
207        
208        final boolean runIndExists = parameterService.parameterExists(stepClass, STEP_RUN_PARM_NM);
209        if (runIndExists) {
210            final boolean runInd = parameterService.getParameterValueAsBoolean(stepClass, STEP_RUN_PARM_NM);
211            if (!runInd) {
212            if (LOG.isInfoEnabled()) {
213                LOG.info("Skipping step due to system parameter: " + STEP_RUN_PARM_NM +" for "+ stepClass.getName());
214            }            
215                return true; // RUN_IND is false - let's skip
216        }
217            }
218
219        final boolean runDateExists = parameterService.parameterExists(stepClass, STEP_RUN_ON_DATE_PARM_NM);
220        if (runDateExists) {
221            final boolean runDateIsEmpty = StringUtils.isEmpty(parameterService.getParameterValueAsString(stepClass, STEP_RUN_ON_DATE_PARM_NM));
222            if (runDateIsEmpty) {
223                return false; // run date param is empty, so run the step
224            }
225        
226            final DateTimeService dTService = SpringContext.getBean(DateTimeService.class);
227    
228            final Collection<String> runDates = parameterService.getParameterValuesAsString(stepClass, STEP_RUN_ON_DATE_PARM_NM);
229            boolean matchedRunDate = false;
230            final String[] cutOffTime = parameterService.parameterExists(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM) ?
231                    StringUtils.split(parameterService.getParameterValueAsString(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM), ':') :
232                    new String[] { "00", "00", "00"}; // no cutoff time param?  Then default to midnight of tomorrow
233            for (String runDate: runDates) {
234                try {
235                    if (withinCutoffWindowForDate(jobRunDate, dTService.convertToDate(runDate), dTService, cutOffTime)) {
236                        matchedRunDate = true;
237                    }
238                }
239                catch (ParseException pe) {
240                    LOG.error("ParseException occured parsing " + runDate, pe);
241        }
242            }
243            // did we fail to match a run date?  then skip this step
244            if (!matchedRunDate) {
245            if (LOG.isInfoEnabled()) {
246                    LOG.info("Skipping step due to system parameters: " + STEP_RUN_PARM_NM + ", " + STEP_RUN_ON_DATE_PARM_NM + " and " + RUN_DATE_CUTOFF_PARM_NM + " for "+ stepClass.getName());
247            }
248            return true;
249        }
250        }
251
252        //run step
253        return false;
254    }
255    
256    /**
257     * Checks if the current jobRunDate is within the cutoff window for the given run date from the RUN_DATE parameter.
258     * The window is defined as midnight of the date specified in the parameter to the RUN_DATE_CUTOFF_TIME of the next day.
259     * 
260     * @param jobRunDate the time the job is attempting to start
261     * @param runDateToCheck the current member of the appropriate RUN_DATE to check
262     * @param dateTimeService an instance of the DateTimeService
263     * @return true if jobRunDate is within the current runDateToCheck window, false otherwise
264     */
265    protected static boolean withinCutoffWindowForDate(Date jobRunDate, Date runDateToCheck, DateTimeService dateTimeService, String[] cutOffWindow) {
266        final Calendar jobRunCalendar = dateTimeService.getCalendar(jobRunDate);
267        final Calendar beginWindow = getCutoffWindowBeginning(runDateToCheck, dateTimeService);
268        final Calendar endWindow = getCutoffWindowEnding(runDateToCheck, dateTimeService, cutOffWindow);
269        return jobRunCalendar.after(beginWindow) && jobRunCalendar.before(endWindow);
270    }
271    
272    /**
273     * Defines the beginning of the cut off window
274     * 
275     * @param runDateToCheck the run date which defines the cut off window
276     * @param dateTimeService an implementation of the DateTimeService
277     * @return the begin date Calendar of the cutoff window
278     */
279    protected static Calendar getCutoffWindowBeginning(Date runDateToCheck, DateTimeService dateTimeService) {
280        Calendar beginWindow = dateTimeService.getCalendar(runDateToCheck);
281        beginWindow.set(Calendar.HOUR_OF_DAY, 0);
282        beginWindow.set(Calendar.MINUTE, 0);
283        beginWindow.set(Calendar.SECOND, 0);
284        beginWindow.set(Calendar.MILLISECOND, 0);
285        return beginWindow;
286    }
287    
288    /**
289     * Defines the end of the cut off window
290     * 
291     * @param runDateToCheck the run date which defines the cut off window
292     * @param dateTimeService an implementation of the DateTimeService
293     * @param cutOffTime an Array in the form of [hour, minute, second] when the cutoff window ends
294     * @return the end date Calendar of the cutoff window
295     */
296    protected static Calendar getCutoffWindowEnding(Date runDateToCheck, DateTimeService dateTimeService, String[] cutOffTime) {
297        Calendar endWindow = dateTimeService.getCalendar(runDateToCheck);
298        endWindow.add(Calendar.DAY_OF_YEAR, 1);
299        endWindow.set(Calendar.HOUR_OF_DAY, Integer.parseInt(cutOffTime[0]));
300        endWindow.set(Calendar.MINUTE, Integer.parseInt(cutOffTime[1]));
301        endWindow.set(Calendar.SECOND, Integer.parseInt(cutOffTime[2]));
302        return endWindow;
303    }
304
305    /* This code is likely no longer reference, but was not removed, due to the fact that institutions may be calling */
306    /**
307     * @deprecated "Implementing institutions likely want to call Job#withinCutoffWindowForDate"
308     */
309    public static boolean isPastCutoffWindow(Date date, Collection<String> runDates) {
310        DateTimeService dTService = SpringContext.getBean(DateTimeService.class);
311        ParameterService parameterService = SpringContext.getBean(ParameterService.class);
312        Calendar jobRunDate = dTService.getCalendar(date);
313        if (parameterService.parameterExists(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM)) {
314            String[] cutOffTime = StringUtils.split(parameterService.getParameterValueAsString(OleParameterConstants.FINANCIAL_SYSTEM_BATCH.class, RUN_DATE_CUTOFF_PARM_NM), ':');
315            Calendar runDate = null;
316            for (String runDateStr : runDates) {
317                try {
318                    runDate = dTService.getCalendar(dTService.convertToDate(runDateStr));
319                    runDate.add(Calendar.DAY_OF_YEAR, 1);
320                    runDate.set(Calendar.HOUR_OF_DAY, Integer.parseInt(cutOffTime[0]));
321                    runDate.set(Calendar.MINUTE, Integer.parseInt(cutOffTime[1]));
322                    runDate.set(Calendar.SECOND, Integer.parseInt(cutOffTime[2]));
323                }
324                catch (ParseException e) {
325                    LOG.error("ParseException occured parsing " + runDateStr, e);
326                }
327                if (jobRunDate.before(runDate)) {
328            return false;
329        }
330    }
331        }
332        return true;
333    }
334
335    /**
336     * @throws UnableToInterruptJobException
337     */
338    @Override
339    public void interrupt() throws UnableToInterruptJobException {
340        // ask the step to interrupt
341        if (currentStep != null) {
342            currentStep.interrupt();
343        }
344        // also attempt to interrupt the thread, to cause an InterruptedException if the step ever waits or sleeps
345        workerThread.interrupt();
346    }
347
348    public void setParameterService(ParameterService parameterService) {
349        this.parameterService = parameterService;
350    }
351
352    public void setSteps(List<Step> steps) {
353        this.steps = steps;
354    }
355
356    public Appender getNdcAppender() {
357        return ndcAppender;
358    }
359
360    public void setNdcAppender(Appender ndcAppender) {
361        this.ndcAppender = ndcAppender;
362    }
363
364    public void setNotRunnable(boolean notRunnable) {
365        this.notRunnable = notRunnable;
366    }
367
368    protected boolean isNotRunnable() {
369        return notRunnable;
370    }
371
372    public ParameterService getParameterService() {
373        return parameterService;
374    }
375
376    public List<Step> getSteps() {
377        return steps;
378    }
379
380    public void setSchedulerService(SchedulerService schedulerService) {
381        this.schedulerService = schedulerService;
382    }
383
384    public void setDateTimeService(DateTimeService dateTimeService) {
385        this.dateTimeService = dateTimeService;
386    }
387    
388    protected String jobDataMapToString(JobDataMap jobDataMap) {
389        StringBuilder buf = new StringBuilder();
390        buf.append("{");
391        Iterator keys = jobDataMap.keySet().iterator();
392        boolean hasNext = keys.hasNext();
393        while (hasNext) {
394            String key = (String) keys.next();
395            Object value = jobDataMap.get(key);
396            buf.append(key).append("=");
397            if (value == jobDataMap) {
398                buf.append("(this map)");
399            }
400            else {
401                buf.append(value);
402            }
403            hasNext = keys.hasNext();
404            if (hasNext) {
405                buf.append(", ");
406            }
407        }
408        buf.append("}");
409        return buf.toString();
410    }
411    
412    protected String getMachineName() {
413        try {
414            return InetAddress.getLocalHost().getHostName();
415        }
416        catch (UnknownHostException e) {
417            return "Unknown";
418        }
419    }
420
421    public static OleSelectDocumentService getOleSelectDocumentService() {
422        if(oleSelectDocumentService == null){
423            oleSelectDocumentService = SpringContext.getBean(OleSelectDocumentService.class);
424        }
425        return oleSelectDocumentService;
426    }
427
428    public void setOleSelectDocumentService(OleSelectDocumentService oleSelectDocumentService) {
429        this.oleSelectDocumentService = oleSelectDocumentService;
430    }
431}