package org.kuali.ole.sys.batch;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.kuali.ole.sys.OLEConstants;
import org.kuali.ole.sys.context.BatchContainerDirectory;
import org.kuali.ole.sys.context.BatchLogger;
import org.kuali.ole.sys.context.BatchStepExecutor;
import org.kuali.ole.sys.context.BatchStepFileDescriptor;
import org.kuali.ole.sys.context.ContainerStepListener;
import org.kuali.rice.core.api.datetime.DateTimeService;
import org.kuali.rice.coreservice.framework.parameter.ParameterService;

/**
 * BatchContainerStep looks for .run files.
 * When one is found it deletes the file and hands a BatchStepExecutor to a thread pool, which executes the Step
 * indicated by the .run file.
 * BatchContainerStep continues looking for .run files until it finds a .run file for the configured stop step.
 * When BatchContainerStep begins it writes a .runlock file to indicate that the batch container is running, but will
 * first look for an existing .runlock file and if one is found it will exit immediately.
 *
 * BatchContainerStep adds a ConsoleAppender to its Logger if one hasn't been configured.
 *
 * The step bookkeeping collections ({@link #startedSteps}, {@link #completedSteps}) are synchronized wrappers because
 * the listener callbacks are presumably invoked from the executor threads (BatchStepExecutor is not visible from this
 * file - confirm against its implementation).
 */
public class BatchContainerStep extends AbstractStep implements ContainerStepListener {
    static org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(BatchContainerStep.class);

    /** Path of the directory in which the semaphore (.run / result / .runlock) files are exchanged. */
    protected String batchContainerDirectory;
    /** The Step whose .run file tells the container to shut down. */
    protected Step batchContainerStopStep;

    /** Wrapper around {@link #batchContainerDirectory}; created at the start of {@link #execute(String, Date)}. */
    protected BatchContainerDirectory directory;

    /** Running textual log of step start/finish notifications; printed when the container stops. */
    protected StringBuffer containerResults;
    /** Steps that have started but not yet finished, keyed by the executor's log file name. */
    protected Map<String, BatchStepFileDescriptor> startedSteps;
    /** Finished steps as {run descriptor, result descriptor} pairs; the run descriptor may be null. */
    protected List<BatchStepFileDescriptor[]> completedSteps;

    /**
     * This method begins an infinite loop in order to process semaphore files written by BatchStepTrigger (called via
     * the brte scripts). BatchStepTrigger writes .run files, BatchContainerStep reads .run files and calls
     * BatchStepExecutor (its own thread) which will execute the Step and write either a .success or .error result file
     * which is then read by BatchStepTrigger.
     *
     * This method exits gracefully when it receives a .run file for the batchContainerStopStep.
     *
     * @param jobName the name of the job this step runs in (used for logging and the runlock file)
     * @param jobRunDate the date the job was started (used for logging)
     * @return true when the container was stopped by the stop step, false when the runlock file disappeared
     * @throws RuntimeException if the container is misconfigured, already running, or interrupted while sleeping
     * @throws InterruptedException declared by the signature; the visible code reports interrupts raised while
     *         sleeping as a RuntimeException instead
     */
    @Override
    public boolean execute(String jobName, Date jobRunDate) throws InterruptedException {
        BatchLogger.addConsoleAppender(LOG);

        LOG.info("Starting the batch container in Job: "+ jobName +" on "+ jobRunDate);

        if (batchContainerDirectory == null) {
            throw new RuntimeException("The batchContainerDirectory has not been specified.");
        }
        if (batchContainerStopStep == null) {
            throw new RuntimeException("The batchContainerStopStep has not been specified.");
        }

        directory = new BatchContainerDirectory(batchContainerDirectory);

        if (directory.isBatchContainerRunning()) {
            //an instance of the batch container is already running - exit w/out trying to remove the batch container semaphore file
            LOG.error("The BatchContainer is already running");
            throw new RuntimeException("The BatchContainer is already running.");
        }

        initContainerResults();

        //a cached pool creates no threads until the first step is submitted
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            //write batch container run lock file to indicate the batch container is running
            directory.writeBatchContainerSemaphore(jobName, getName());
            directory.addShutdownHook();
            LOG.info("The BatchContainer is running");

            ParameterService parameterService = getParameterService();
            DateTimeService dateTimeService = getDateTimeService();

            while (true) {

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Looking for steps...");
                }
                File[] stepRunFiles = directory.getStepRunFiles();

                //keep draining until a scan comes back empty, then sleep
                while (stepRunFiles != null && stepRunFiles.length > 0) {
                    LOG.info("Found "+ stepRunFiles.length +" steps to execute");

                    if (processStepRunFiles(stepRunFiles, executor, parameterService, dateTimeService)) {
                        //Stop BatchContainer
                        return true;
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Looking for steps...");
                    }
                    stepRunFiles = directory.getStepRunFiles();
                }

                sleep();
                if (!directory.isBatchContainerRunning()) {
                    //the batch container's runlock file no longer exists - exit
                    LOG.error("The BatchContainer runlock file no longer exists - exiting");
                    return false;
                }
            }

        } finally {
            //stop accepting new steps and let idle pool threads terminate; steps already submitted still run to completion
            executor.shutdown();

            //remove batch container run lock file
            directory.removeBatchContainerSemaphore();
            LOG.info("The BatchContainer has stopped running");

            logContainerResultsSummary();
        }
    }

    /**
     * Processes one batch of .run files: unknown steps get an error result file, the stop step gets a success result
     * file and ends processing, and every other step is handed to the executor.
     *
     * @param stepRunFiles the .run files found in the container directory
     * @param executor the pool that runs each BatchStepExecutor
     * @param parameterService passed through to each BatchStepExecutor
     * @param dateTimeService passed through to each BatchStepExecutor
     * @return true if the stop step was encountered (remaining files in the batch are left untouched), false otherwise
     */
    private boolean processStepRunFiles(File[] stepRunFiles, Executor executor, ParameterService parameterService, DateTimeService dateTimeService) {
        for (File stepRunFile : stepRunFiles) {
            BatchStepFileDescriptor batchStepFile = new BatchStepFileDescriptor(stepRunFile);

            Step step = getStep(batchStepFile);
            if (step == null) {
                directory.removeBatchStepFileFromSystem(batchStepFile);
                directory.writeBatchStepErrorResultFile(batchStepFile, new IllegalArgumentException("Unable to find bean for step: "+ batchStepFile.getStepName()));
                continue;
            }

            if (isStopBatchContainerTriggered(step)) {
                directory.removeBatchStepFileFromSystem(batchStepFile);
                directory.writeBatchStepSuccessfulResultFile(batchStepFile);

                LOG.info("shutting down container");
                return true;
            }

            //retrieve the stepIndex before the file is removed
            int stepIndex = directory.getStepIndexFromFile(batchStepFile);

            directory.removeBatchStepFileFromSystem(batchStepFile);

            if (LOG.isDebugEnabled()) {
                LOG.debug("Creating new thread to run "+ batchStepFile);
            }
            BatchStepExecutor batchStepExecutor = new BatchStepExecutor(parameterService, dateTimeService, directory, batchStepFile, step, stepIndex);
            batchStepExecutor.addContainerStepListener(this);
            executor.execute(batchStepExecutor);
        }
        return false;
    }

    /**
     * Notification that the Step started. Log the Step's information
     */
    @Override
    public void stepStarted(BatchStepFileDescriptor runFile, String logFileName) {
        logStepStarted(runFile, logFileName);
    }

    /**
     * Notification that the Step finished. Log the Step's information
     */
    @Override
    public void stepFinished(BatchStepFileDescriptor resultFile, String logFileName) {
        logStepFinished(resultFile, logFileName);
    }

    /**
     * Retrieves the Step bean from the SpringContext
     *
     * @param batchStepFile the file descriptor for the step to run
     * @return the Step bean from the SpringContext, or null if the lookup failed or returned nothing
     */
    protected Step getStep(BatchStepFileDescriptor batchStepFile) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Converting step named in .run file into a Step class...");
        }

        Step step = null;
        try {
            step = BatchSpringContext.getStep(batchStepFile.getStepName());
        } catch (RuntimeException runtimeException) {
            //a failed lookup is reported to the caller as "no such step" (null) rather than aborting the container
            LOG.error("Failed to getStep from spring context: ", runtimeException);
        }
        if (step == null) {
            LOG.error("Unable to find bean for step: "+ batchStepFile.getStepName());
            return null;
        }

        LOG.info("Found valid step: "+ step.getName());
        return step;
    }

    /**
     * @param step the Step specified by the .run file
     * @return true if the Step received is the step to stop the batch container, false otherwise
     */
    protected boolean isStopBatchContainerTriggered(Step step) {
        if (step.getName().equals(batchContainerStopStep.getName())) {
            LOG.info("Received Step: "+ batchContainerStopStep.getName() +". Stop listening for steps.");
            return true;
        }
        return false;
    }

    /**
     * Sleep for a specified amount of time before looking for more semaphore files to process
     *
     * @throws RuntimeException if the thread is interrupted while sleeping; the thread's interrupt status is restored
     *         before the exception is thrown
     */
    protected void sleep() {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sleeping...");
            }
            Thread.sleep(getSemaphoreProcessingInterval());
        }
        catch (InterruptedException e) {
            //Thread.sleep clears the interrupt flag when it throws; restore it so callers further up can still observe the interrupt
            Thread.currentThread().interrupt();
            throw new RuntimeException("BatchContainerStep encountered interrupt exception while trying to wait for the specified semaphore processing interval", e);
        }
    }

    /**
     * @return (in milliseconds) the amount of time to wait before looking for more semaphore files to process
     * @throws NumberFormatException if the parameter value cannot be parsed as a long
     */
    protected long getSemaphoreProcessingInterval() {
        return Long.parseLong(getParameterService().getParameterValueAsString(BatchContainerStep.class, OLEConstants.SystemGroupParameterNames.BATCH_CONTAINER_SEMAPHORE_PROCESSING_INTERVAL));
    }

    /**
     * Initialize the structures necessary for logging the Steps' statistics.
     * The collections are synchronized wrappers so that notifications arriving from several threads cannot corrupt them.
     */
    protected void initContainerResults() {
        containerResults = new StringBuffer("Container Results:\n");
        startedSteps = Collections.synchronizedMap(new HashMap<String, BatchStepFileDescriptor>());
        completedSteps = Collections.synchronizedList(new ArrayList<BatchStepFileDescriptor[]>());
    }

    /**
     * Log the notification that the Step started to an internal buffer. Add the descriptor to the list of started steps.
     * The logFileName is used as a unique identifier in the list of started steps in order to identify it in the list of started steps on completion.
     *
     * @param runFile the step's run file descriptor
     * @param logFileName the name of the log created by the Step's executor.
     */
    protected void logStepStarted(BatchStepFileDescriptor runFile, String logFileName) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("stepStarted: "+ runFile);
        }

        startedSteps.put(logFileName, runFile);

        //one append per record keeps each entry contiguous in the shared buffer
        containerResults.append("STARTED "+ runFile
                +" "+ runFile.getStartedDate()
                +" LOGFILE="+ logFileName
                +"\n");
    }

    /**
     * Log the notification that the Step finished to an internal buffer. Remove the run descriptor from the list of started steps and add the run descriptor
     * and the result descriptor to the list of completed steps. The logFileName is used to locate the run descriptor from the list of started steps.
     * If no run descriptor was recorded for the logFileName a warning is logged and the completed entry carries a null run descriptor.
     *
     * @param resultFile the step's result file descriptor
     * @param logFileName the name of the log created by the Step's executor
     */
    protected void logStepFinished(BatchStepFileDescriptor resultFile, String logFileName) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("stepFinished: "+ resultFile);
        }

        BatchStepFileDescriptor runFile = startedSteps.remove(logFileName);
        if (runFile == null) {
            LOG.warn("No started step was recorded for LOGFILE="+ logFileName +"; result: "+ resultFile);
        }

        //one append per record keeps each entry contiguous in the shared buffer
        containerResults.append("COMPLETED "+ resultFile
                +" "+ resultFile.getCompletedDate()
                +" LOGFILE="+ logFileName
                +" STATUS="+ resultFile.getExtension()
                +(resultFile.isStepFileAnErrorResultFile() ? " EXCEPTION:\n"+ directory.getExceptionFromFile(resultFile) : "")
                +"\n");

        BatchStepFileDescriptor[] files = {runFile, resultFile};
        completedSteps.add(files);
    }

    /**
     * Print a summary of the steps that ran and the steps that haven't completed yet.
     * Each collection is locked while it is iterated because steps may still be reporting from other threads.
     */
    protected void logContainerResultsSummary() {
        LOG.info("Printing container results...");

        containerResults.append("\n\nCompleted Steps: \n");
        synchronized (completedSteps) {
            if (completedSteps.isEmpty()) {
                containerResults.append("None");
            }

            for (BatchStepFileDescriptor[] batchStepFile : completedSteps) {
                BatchStepFileDescriptor runFile = batchStepFile[0];
                BatchStepFileDescriptor resultFile = batchStepFile[1];

                //the run descriptor is null when a finish notification had no matching start; fall back to the result descriptor
                Object stepLabel = (runFile != null) ? runFile : resultFile;
                Date startedDate = (runFile != null) ? runFile.getStartedDate() : null;

                String status = resultFile.getExtension();
                Date completedDate = resultFile.getCompletedDate();

                containerResults.append(stepLabel +"=" +status +"; S:"+ startedDate +" F:"+ completedDate +"\n");
            }
        }

        containerResults.append("\n\nIncomplete Steps: \n");
        synchronized (startedSteps) {
            if (startedSteps.isEmpty()) {
                containerResults.append("None");
            }

            for (BatchStepFileDescriptor batchStepFile : startedSteps.values()) {
                Date startedDate = batchStepFile.getStartedDate();

                containerResults.append(batchStepFile +"; S:"+ startedDate +"\n");
            }
        }

        LOG.info(containerResults);
    }

    /**
     * The path to the directory in which BatchContainerStep, BatchStepExecutor, and BatchStepTrigger will read and write the step semaphore files.
     *
     * @param batchContainerDirectory the path to the semaphore file directory
     */
    public void setBatchContainerDirectory(String batchContainerDirectory) {
        this.batchContainerDirectory = batchContainerDirectory;
    }

    /**
     * The Step which indicates to the Batch Container to shut itself down.
     *
     * @param batchContainerStopStep the Step whose .run file stops the container
     */
    public void setBatchContainerStopStep(Step batchContainerStopStep) {
        this.batchContainerStopStep = batchContainerStopStep;
    }

    /**
     * @return the batchContainerStopStep
     */
    public Step getBatchContainerStopStep() {
        return this.batchContainerStopStep;
    }
}