001package org.kuali.ole.sys.batch;
002
003
004
import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.kuali.ole.sys.OLEConstants;
import org.kuali.ole.sys.context.BatchContainerDirectory;
import org.kuali.ole.sys.context.BatchLogger;
import org.kuali.ole.sys.context.BatchStepExecutor;
import org.kuali.ole.sys.context.BatchStepFileDescriptor;
import org.kuali.ole.sys.context.ContainerStepListener;
import org.kuali.rice.core.api.datetime.DateTimeService;
import org.kuali.rice.coreservice.framework.parameter.ParameterService;
023
/**
 * BatchContainerStep looks for .run files.
 * When one is found, it deletes the file and hands a BatchStepExecutor to a thread pool; the executor runs the Step indicated by the .run file.
 * BatchContainerStep keeps looking for .run files until it finds the .run file of the configured stop step (batchContainerStopStep).
 * Before it starts, BatchContainerStep checks for an existing .runlock file and exits immediately if one is found;
 * otherwise it writes a .runlock file to indicate that the batch container is running.
 *
 * BatchContainerStep adds a ConsoleAppender to its Logger if one hasn't been configured.
 *
 */
034public class BatchContainerStep extends AbstractStep implements ContainerStepListener {
035    static org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(BatchContainerStep.class);
036
037    protected String batchContainerDirectory;
038    protected Step batchContainerStopStep;
039
040    protected BatchContainerDirectory directory;
041
042    protected StringBuffer containerResults;
043    protected Map<String, BatchStepFileDescriptor> startedSteps;
044    protected List<BatchStepFileDescriptor[]> completedSteps;
045
046    /**
047     * This method begins an infinite loop in order to process semaphore files written by BatchStepTrigger (called via the brte scripts).
048     * BatchStepTrigger writes .run files, BatchContainerStep reads .run files and calls BatchStepExecutor (its own thread) which will execute the
049     * Step and write either a .success or .error result file which is then read by BatchStepTrigger.
050     *
051     * This method exits gracefully when it receives a .run file for the batchContainerStopStep.
052     *
053     */
054        @Override
055    public boolean execute(String jobName, Date jobRunDate) throws InterruptedException {
056            BatchLogger.addConsoleAppender(LOG);
057
058                LOG.info("Starting the batch container in Job: "+ jobName +" on "+ jobRunDate);
059
060                if (batchContainerDirectory == null) {
061                        throw new RuntimeException("The batchContainerDirectory has not been specified.");
062                }
063                if (batchContainerStopStep == null) {
064                        throw new RuntimeException("The batchContainerStopStep has not been specified.");
065                }
066
067                directory = new BatchContainerDirectory(batchContainerDirectory);
068
069                if (directory.isBatchContainerRunning()) {
070                        //an instance of the batch container is already running - exit w/out trying to remove the batch container semaphore file
071                        LOG.error("The BatchContainer is already running");
072                        throw new RuntimeException("The BatchContainer is already running.");
073                }
074
075                initContainerResults();
076
077                try {
078                        //write batch container run lock file to indicate the batch container is running
079                        directory.writeBatchContainerSemaphore(jobName, getName());
080                        directory.addShutdownHook();
081                        LOG.info("The BatchContainer is running");
082
083                ParameterService parameterService = getParameterService();
084                DateTimeService dateTimeService = getDateTimeService();
085
086                        Executor executor = Executors.newCachedThreadPool();
087                while(true) {
088
089                    if (LOG.isDebugEnabled()) {
090                        LOG.debug("Looking for steps...");
091                    }
092                        File[] stepRunFiles = directory.getStepRunFiles();
093
094                        while (stepRunFiles != null && stepRunFiles.length > 0) {
095                                LOG.info("Found "+ stepRunFiles.length +" steps to execute");
096
097                                for(File stepRunFile : stepRunFiles) {
098                                        BatchStepFileDescriptor batchStepFile = new BatchStepFileDescriptor(stepRunFile);
099
100                                        Step step = getStep(batchStepFile);
101                                        if (step == null) {
102                                                directory.removeBatchStepFileFromSystem(batchStepFile);
103                                                directory.writeBatchStepErrorResultFile(batchStepFile, new IllegalArgumentException("Unable to find bean for step: "+ batchStepFile.getStepName()));
104                                        }
105                                        else {
106
107                                                if (isStopBatchContainerTriggered(step)) {
108                                                        directory.removeBatchStepFileFromSystem(batchStepFile);
109                                                        directory.writeBatchStepSuccessfulResultFile(batchStepFile);
110
111                                                        //Stop BatchContainer
112                                                        LOG.info("shutting down container");
113                                                        return true;
114                                                }
115
116                                                //retrieve the stepIndex before the file is removed
117                                                int stepIndex = directory.getStepIndexFromFile(batchStepFile);
118
119                                                directory.removeBatchStepFileFromSystem(batchStepFile);
120
121                                                if (LOG.isDebugEnabled()) {
122                                                    LOG.debug("Creating new thread to run "+ batchStepFile);
123                                                }
124                                                BatchStepExecutor batchStepExecutor = new BatchStepExecutor(parameterService, dateTimeService, directory, batchStepFile, step, stepIndex);
125                            batchStepExecutor.addContainerStepListener(this);
126                                                executor.execute(batchStepExecutor);
127
128                                        }
129                                }
130
131                                if (LOG.isDebugEnabled()) {
132                                    LOG.debug("Looking for steps...");
133                                }
134                        stepRunFiles = directory.getStepRunFiles();
135
136                        }
137
138                        sleep();
139                        if (!directory.isBatchContainerRunning()) {
140                                //the batch container's runlock file no longer exists - exit
141                                LOG.error("The BatchContainer runlock file no longer exists - exiting");
142                                return false;
143                        }
144                }
145
146                } finally {
147                        //remove batch container run lock file
148                        directory.removeBatchContainerSemaphore();
149                        LOG.info("The BatchContainer has stopped running");
150
151                        logContainerResultsSummary();
152                }
153        }
154
155        /**
156         * Notification that the Step started. Log the Step's information
157         */
158        @Override
159    public void stepStarted(BatchStepFileDescriptor runFile, String logFileName) {
160            logStepStarted(runFile, logFileName);
161        }
162
163        /**
164         * Notification that the Step finished. Log the Step's information
165         */
166        @Override
167    public void stepFinished(BatchStepFileDescriptor resultFile, String logFileName) {
168            logStepFinished(resultFile, logFileName);
169        }
170
171        /**
172         * Retrieves the Step bean from the SpringContext
173         *
174         * @param batchStepFile the file descriptor for the step to run
175         * @return the Step bean from the SpringContext
176         */
177        protected Step getStep(BatchStepFileDescriptor batchStepFile) {
178          if (LOG.isDebugEnabled()) {
179              LOG.debug("Converting step named in .run file into a Step class...");
180          }
181
182          Step step = null;
183          try {
184              step = BatchSpringContext.getStep(batchStepFile.getStepName());
185          } catch (RuntimeException runtimeException) {
186              LOG.error("Failed to getStep from spring context: ", runtimeException);
187          }
188      if (step == null) {
189        LOG.error("Unable to find bean for step: "+ batchStepFile.getStepName());
190        return null;
191      }
192
193      LOG.info("Found valid step: "+ step.getName());
194      return step;
195        }
196
197        /**
198         * @param step the Step specified by the .run file
199         * @return true if the Step received is the step to stop the batch container, false otherwise
200         */
201    protected boolean isStopBatchContainerTriggered(Step step) {
202        if (step.getName().equals(batchContainerStopStep.getName())) {
203                LOG.info("Received Step: "+ batchContainerStopStep.getName() +". Stop listening for steps.");
204                return true;
205        }
206        return false;
207        }
208
209    /**
210     * Sleep for a specified amount of time before looking for more semaphore files to process
211     */
212    protected void sleep() {
213        try {
214            if (LOG.isDebugEnabled()) {
215                LOG.debug("Sleeping...");
216            }
217            Thread.sleep(getSemaphoreProcessingInterval());
218        }
219        catch (InterruptedException e) {
220            throw new RuntimeException("BatchContainerStep encountered interrupt exception while trying to wait for the specified semaphore processing interval", e);
221        }
222    }
223
224    /**
225     * @return (in milliseconds) the amount of time to wait before looking for more semaphore files to process
226     */
227        protected long getSemaphoreProcessingInterval() {
228                return Long.parseLong(getParameterService().getParameterValueAsString(BatchContainerStep.class, OLEConstants.SystemGroupParameterNames.BATCH_CONTAINER_SEMAPHORE_PROCESSING_INTERVAL));
229        }
230
231        /**
232         * Initialize the structures necessary for logging the Steps' statistics
233         */
234    protected void initContainerResults() {
235        containerResults = new StringBuffer("Container Results:\n");
236        startedSteps = new HashMap<String, BatchStepFileDescriptor>();
237        completedSteps = new ArrayList<BatchStepFileDescriptor[]>();
238    }
239
240    /**
241     * Log the notification that the Step started to an internal buffer. Add the descriptor to the list of started steps.
242     * The logFileName is used as a unique identifier in the list of started steps in order to identify it in the list of started steps on completion.
243     *
244     * @param runFile the step's run file descriptor
245     * @param logFileName the name of the log created by the Step's executor.
246     */
247    protected void logStepStarted(BatchStepFileDescriptor runFile, String logFileName) {
248        if (LOG.isDebugEnabled()) {
249            LOG.debug("stepStarted: "+ runFile);
250        }
251
252        startedSteps.put(logFileName, runFile);
253
254        containerResults.append("STARTED "+ runFile
255                +" "+ runFile.getStartedDate()
256                +" LOGFILE="+ logFileName
257                +"\n");
258    }
259
260    /**
261     * Log the notification that the Step finished to an internal buffer. Remove the run descriptor from the list of started steps and add the run descriptor
262     * and the result descriptor to the list of completed steps. The logFileName is used to locate the run descriptor from the list of started steps.
263     *
264     * @param resultFile the step's result file descriptor
265     * @param logFileName the name of the log created by the Step's executor
266     */
267    protected void logStepFinished(BatchStepFileDescriptor resultFile, String logFileName) {
268        if (LOG.isDebugEnabled()) {
269            LOG.debug("stepFinished: "+ resultFile);
270        }
271
272        BatchStepFileDescriptor runFile = startedSteps.remove(logFileName);
273
274        containerResults.append("COMPLETED "+ resultFile
275                                +" "+ resultFile.getCompletedDate()
276                                +" LOGFILE="+ logFileName
277                                +" STATUS="+ resultFile.getExtension()
278                                +(resultFile.isStepFileAnErrorResultFile() ? " EXCEPTION:\n"+ directory.getExceptionFromFile(resultFile) : "")
279                                +"\n");
280
281        BatchStepFileDescriptor[] files = {runFile, resultFile};
282        completedSteps.add(files);
283    }
284
285    /**
286     * Print a summary of the steps that ran and the steps that haven't completed yet.
287     */
288        protected void logContainerResultsSummary() {
289            LOG.info("Printing container results...");
290
291            containerResults.append("\n\nCompleted Steps: \n");
292            if (completedSteps.isEmpty()) { containerResults.append("None"); }
293
294            for(BatchStepFileDescriptor[] batchStepFile : completedSteps) {
295                String status = batchStepFile[1].getExtension();
296                Date startedDate = batchStepFile[0].getStartedDate();
297                Date completedDate = batchStepFile[1].getCompletedDate();
298
299            containerResults.append(batchStepFile[0] +"=" +status +"; S:"+ startedDate +" F:"+ completedDate +"\n");
300            }
301
302            containerResults.append("\n\nIncomplete Steps: \n");
303            if (startedSteps.isEmpty()) { containerResults.append("None"); }
304
305            for(Iterator<BatchStepFileDescriptor> iter = startedSteps.values().iterator(); iter.hasNext();) {
306                BatchStepFileDescriptor batchStepFile = iter.next();
307
308            Date startedDate = batchStepFile.getStartedDate();
309
310            containerResults.append(batchStepFile +"; S:"+ startedDate +"\n");
311            }
312
313            LOG.info(containerResults);
314        }
315
316        /**
317         * The path to the directory in which BatchContainerStep, BatchStepExecutor, and BatchStepTrigger will read and write the step semaphore files.
318         *
319         * @param batchContainerDirectory
320         */
321        public void setBatchContainerDirectory(String batchContainerDirectory) {
322                this.batchContainerDirectory = batchContainerDirectory;
323        }
324
325        /**
326         * The Step which indicates to the Batch Container to shut itself down.
327         *
328         * @param batchContainerStopStep
329         */
330        public void setBatchContainerStopStep(Step batchContainerStopStep) {
331                this.batchContainerStopStep = batchContainerStopStep;
332        }
333
334        /**
335         * @return the batchContainerStopStep
336         */
337        public Step getBatchContainerStopStep() {
338            return this.batchContainerStopStep;
339        }
340}