View Javadoc
1   package org.kuali.ole.sys.batch;
2   
3   
4   
5   import java.io.File;
6   import java.util.ArrayList;
7   import java.util.Date;
8   import java.util.HashMap;
9   import java.util.Iterator;
10  import java.util.List;
11  import java.util.Map;
12  import java.util.concurrent.Executor;
13  import java.util.concurrent.Executors;
14  
15  import org.kuali.ole.sys.OLEConstants;
16  import org.kuali.ole.sys.context.BatchContainerDirectory;
17  import org.kuali.ole.sys.context.BatchLogger;
18  import org.kuali.ole.sys.context.BatchStepExecutor;
19  import org.kuali.ole.sys.context.BatchStepFileDescriptor;
20  import org.kuali.ole.sys.context.ContainerStepListener;
21  import org.kuali.rice.core.api.datetime.DateTimeService;
22  import org.kuali.rice.coreservice.framework.parameter.ParameterService;
23  
24  /**
25   * BatchContainerStep looks for .run files.
26   * When one is found it deletes the file and creates a new thread (BatchStepExecutor) which executes the Step indicated by the .run file.
27   * BatchContainerStep continues looking for .run files until it finds a stopBatchContainerStep.run file.
28   * When BatchContainerStep begins it writes a .runlock file to indicate that the batch container is running, but will first look for an existing .runlock file
29   * and if one is found it will exit immediately.
30   *
31   * BatchContainerStep adds a ConsoleAppender to its Logger if one hasn't been configured.
32   *
33   */
34  public class BatchContainerStep extends AbstractStep implements ContainerStepListener {
35      static org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(BatchContainerStep.class);
36  
37      protected String batchContainerDirectory;
38      protected Step batchContainerStopStep;
39  
40      protected BatchContainerDirectory directory;
41  
42      protected StringBuffer containerResults;
43      protected Map<String, BatchStepFileDescriptor> startedSteps;
44      protected List<BatchStepFileDescriptor[]> completedSteps;
45  
46      /**
47       * This method begins an infinite loop in order to process semaphore files written by BatchStepTrigger (called via the brte scripts).
48       * BatchStepTrigger writes .run files, BatchContainerStep reads .run files and calls BatchStepExecutor (its own thread) which will execute the
49       * Step and write either a .success or .error result file which is then read by BatchStepTrigger.
50       *
51       * This method exits gracefully when it receives a .run file for the batchContainerStopStep.
52       *
53       */
54  	@Override
55      public boolean execute(String jobName, Date jobRunDate) throws InterruptedException {
56  	    BatchLogger.addConsoleAppender(LOG);
57  
58  		LOG.info("Starting the batch container in Job: "+ jobName +" on "+ jobRunDate);
59  
60  		if (batchContainerDirectory == null) {
61  			throw new RuntimeException("The batchContainerDirectory has not been specified.");
62  		}
63  		if (batchContainerStopStep == null) {
64  			throw new RuntimeException("The batchContainerStopStep has not been specified.");
65  		}
66  
67  		directory = new BatchContainerDirectory(batchContainerDirectory);
68  
69  		if (directory.isBatchContainerRunning()) {
70  			//an instance of the batch container is already running - exit w/out trying to remove the batch container semaphore file
71  			LOG.error("The BatchContainer is already running");
72  			throw new RuntimeException("The BatchContainer is already running.");
73  		}
74  
75  		initContainerResults();
76  
77  		try {
78  			//write batch container run lock file to indicate the batch container is running
79  			directory.writeBatchContainerSemaphore(jobName, getName());
80  			directory.addShutdownHook();
81  			LOG.info("The BatchContainer is running");
82  
83  	        ParameterService parameterService = getParameterService();
84  	        DateTimeService dateTimeService = getDateTimeService();
85  
86  			Executor executor = Executors.newCachedThreadPool();
87  	        while(true) {
88  
89  	            if (LOG.isDebugEnabled()) {
90  	                LOG.debug("Looking for steps...");
91  	            }
92  	        	File[] stepRunFiles = directory.getStepRunFiles();
93  
94  	        	while (stepRunFiles != null && stepRunFiles.length > 0) {
95  	        		LOG.info("Found "+ stepRunFiles.length +" steps to execute");
96  
97  	        		for(File stepRunFile : stepRunFiles) {
98  	        			BatchStepFileDescriptor batchStepFile = new BatchStepFileDescriptor(stepRunFile);
99  
100 	        			Step step = getStep(batchStepFile);
101 	        			if (step == null) {
102 	        				directory.removeBatchStepFileFromSystem(batchStepFile);
103 	        				directory.writeBatchStepErrorResultFile(batchStepFile, new IllegalArgumentException("Unable to find bean for step: "+ batchStepFile.getStepName()));
104 	        			}
105 	        			else {
106 
107 	        				if (isStopBatchContainerTriggered(step)) {
108 	        					directory.removeBatchStepFileFromSystem(batchStepFile);
109 	        					directory.writeBatchStepSuccessfulResultFile(batchStepFile);
110 
111 	        					//Stop BatchContainer
112 	        					LOG.info("shutting down container");
113 	        					return true;
114 	        				}
115 
116 	        				//retrieve the stepIndex before the file is removed
117 	        				int stepIndex = directory.getStepIndexFromFile(batchStepFile);
118 
119 	        				directory.removeBatchStepFileFromSystem(batchStepFile);
120 
121 	        				if (LOG.isDebugEnabled()) {
122 	        				    LOG.debug("Creating new thread to run "+ batchStepFile);
123 	        				}
124 	        				BatchStepExecutor batchStepExecutor = new BatchStepExecutor(parameterService, dateTimeService, directory, batchStepFile, step, stepIndex);
125                             batchStepExecutor.addContainerStepListener(this);
126 	        				executor.execute(batchStepExecutor);
127 
128 	        			}
129 	        		}
130 
131 	        		if (LOG.isDebugEnabled()) {
132 	        		    LOG.debug("Looking for steps...");
133 	        		}
134 	            	stepRunFiles = directory.getStepRunFiles();
135 
136 	        	}
137 
138 	        	sleep();
139 	    		if (!directory.isBatchContainerRunning()) {
140 	    			//the batch container's runlock file no longer exists - exit
141 	    			LOG.error("The BatchContainer runlock file no longer exists - exiting");
142 	    			return false;
143 	    		}
144 	        }
145 
146 		} finally {
147 			//remove batch container run lock file
148 			directory.removeBatchContainerSemaphore();
149 			LOG.info("The BatchContainer has stopped running");
150 
151 			logContainerResultsSummary();
152 		}
153 	}
154 
155 	/**
156 	 * Notification that the Step started. Log the Step's information
157 	 */
158 	@Override
159     public void stepStarted(BatchStepFileDescriptor runFile, String logFileName) {
160 	    logStepStarted(runFile, logFileName);
161 	}
162 
163 	/**
164 	 * Notification that the Step finished. Log the Step's information
165 	 */
166 	@Override
167     public void stepFinished(BatchStepFileDescriptor resultFile, String logFileName) {
168 	    logStepFinished(resultFile, logFileName);
169 	}
170 
171 	/**
172 	 * Retrieves the Step bean from the SpringContext
173 	 *
174 	 * @param batchStepFile the file descriptor for the step to run
175 	 * @return the Step bean from the SpringContext
176 	 */
177 	protected Step getStep(BatchStepFileDescriptor batchStepFile) {
178 	  if (LOG.isDebugEnabled()) {
179 	      LOG.debug("Converting step named in .run file into a Step class...");
180 	  }
181 
182 	  Step step = null;
183 	  try {
184 	      step = BatchSpringContext.getStep(batchStepFile.getStepName());
185 	  } catch (RuntimeException runtimeException) {
186 	      LOG.error("Failed to getStep from spring context: ", runtimeException);
187 	  }
188       if (step == null) {
189       	LOG.error("Unable to find bean for step: "+ batchStepFile.getStepName());
190       	return null;
191       }
192 
193       LOG.info("Found valid step: "+ step.getName());
194       return step;
195 	}
196 
197 	/**
198 	 * @param step the Step specified by the .run file
199 	 * @return true if the Step received is the step to stop the batch container, false otherwise
200 	 */
201     protected boolean isStopBatchContainerTriggered(Step step) {
202     	if (step.getName().equals(batchContainerStopStep.getName())) {
203     		LOG.info("Received Step: "+ batchContainerStopStep.getName() +". Stop listening for steps.");
204     		return true;
205     	}
206     	return false;
207  	}
208 
209     /**
210      * Sleep for a specified amount of time before looking for more semaphore files to process
211      */
212     protected void sleep() {
213         try {
214             if (LOG.isDebugEnabled()) {
215                 LOG.debug("Sleeping...");
216             }
217             Thread.sleep(getSemaphoreProcessingInterval());
218         }
219         catch (InterruptedException e) {
220             throw new RuntimeException("BatchContainerStep encountered interrupt exception while trying to wait for the specified semaphore processing interval", e);
221         }
222     }
223 
224     /**
225      * @return (in milliseconds) the amount of time to wait before looking for more semaphore files to process
226      */
227 	protected long getSemaphoreProcessingInterval() {
228 		return Long.parseLong(getParameterService().getParameterValueAsString(BatchContainerStep.class, OLEConstants.SystemGroupParameterNames.BATCH_CONTAINER_SEMAPHORE_PROCESSING_INTERVAL));
229 	}
230 
231 	/**
232 	 * Initialize the structures necessary for logging the Steps' statistics
233 	 */
234     protected void initContainerResults() {
235         containerResults = new StringBuffer("Container Results:\n");
236         startedSteps = new HashMap<String, BatchStepFileDescriptor>();
237         completedSteps = new ArrayList<BatchStepFileDescriptor[]>();
238     }
239 
240     /**
241      * Log the notification that the Step started to an internal buffer. Add the descriptor to the list of started steps.
242      * The logFileName is used as a unique identifier in the list of started steps in order to identify it in the list of started steps on completion.
243      *
244      * @param runFile the step's run file descriptor
245      * @param logFileName the name of the log created by the Step's executor.
246      */
247     protected void logStepStarted(BatchStepFileDescriptor runFile, String logFileName) {
248         if (LOG.isDebugEnabled()) {
249             LOG.debug("stepStarted: "+ runFile);
250         }
251 
252         startedSteps.put(logFileName, runFile);
253 
254         containerResults.append("STARTED "+ runFile
255                 +" "+ runFile.getStartedDate()
256                 +" LOGFILE="+ logFileName
257                 +"\n");
258     }
259 
260     /**
261      * Log the notification that the Step finished to an internal buffer. Remove the run descriptor from the list of started steps and add the run descriptor
262      * and the result descriptor to the list of completed steps. The logFileName is used to locate the run descriptor from the list of started steps.
263      *
264      * @param resultFile the step's result file descriptor
265      * @param logFileName the name of the log created by the Step's executor
266      */
267     protected void logStepFinished(BatchStepFileDescriptor resultFile, String logFileName) {
268         if (LOG.isDebugEnabled()) {
269             LOG.debug("stepFinished: "+ resultFile);
270         }
271 
272         BatchStepFileDescriptor runFile = startedSteps.remove(logFileName);
273 
274         containerResults.append("COMPLETED "+ resultFile
275                                 +" "+ resultFile.getCompletedDate()
276                                 +" LOGFILE="+ logFileName
277                                 +" STATUS="+ resultFile.getExtension()
278                                 +(resultFile.isStepFileAnErrorResultFile() ? " EXCEPTION:\n"+ directory.getExceptionFromFile(resultFile) : "")
279                                 +"\n");
280 
281         BatchStepFileDescriptor[] files = {runFile, resultFile};
282         completedSteps.add(files);
283     }
284 
285     /**
286      * Print a summary of the steps that ran and the steps that haven't completed yet.
287      */
288 	protected void logContainerResultsSummary() {
289 	    LOG.info("Printing container results...");
290 
291 	    containerResults.append("\n\nCompleted Steps: \n");
292 	    if (completedSteps.isEmpty()) { containerResults.append("None"); }
293 
294 	    for(BatchStepFileDescriptor[] batchStepFile : completedSteps) {
295 	        String status = batchStepFile[1].getExtension();
296 	        Date startedDate = batchStepFile[0].getStartedDate();
297 	        Date completedDate = batchStepFile[1].getCompletedDate();
298 
299             containerResults.append(batchStepFile[0] +"=" +status +"; S:"+ startedDate +" F:"+ completedDate +"\n");
300 	    }
301 
302 	    containerResults.append("\n\nIncomplete Steps: \n");
303 	    if (startedSteps.isEmpty()) { containerResults.append("None"); }
304 
305 	    for(Iterator<BatchStepFileDescriptor> iter = startedSteps.values().iterator(); iter.hasNext();) {
306 	        BatchStepFileDescriptor batchStepFile = iter.next();
307 
308             Date startedDate = batchStepFile.getStartedDate();
309 
310             containerResults.append(batchStepFile +"; S:"+ startedDate +"\n");
311 	    }
312 
313 	    LOG.info(containerResults);
314 	}
315 
316 	/**
317 	 * The path to the directory in which BatchContainerStep, BatchStepExecutor, and BatchStepTrigger will read and write the step semaphore files.
318 	 *
319 	 * @param batchContainerDirectory
320 	 */
321 	public void setBatchContainerDirectory(String batchContainerDirectory) {
322 		this.batchContainerDirectory = batchContainerDirectory;
323 	}
324 
325 	/**
326 	 * The Step which indicates to the Batch Container to shut itself down.
327 	 *
328 	 * @param batchContainerStopStep
329 	 */
330 	public void setBatchContainerStopStep(Step batchContainerStopStep) {
331 		this.batchContainerStopStep = batchContainerStopStep;
332 	}
333 
334 	/**
335 	 * @return the batchContainerStopStep
336 	 */
337 	public Step getBatchContainerStopStep() {
338 	    return this.batchContainerStopStep;
339 	}
340 }