1 package org.kuali.ole.sys.batch;
2
3
4
5 import java.io.File;
6 import java.util.ArrayList;
7 import java.util.Date;
8 import java.util.HashMap;
9 import java.util.Iterator;
10 import java.util.List;
11 import java.util.Map;
12 import java.util.concurrent.Executor;
13 import java.util.concurrent.Executors;
14
15 import org.kuali.ole.sys.OLEConstants;
16 import org.kuali.ole.sys.context.BatchContainerDirectory;
17 import org.kuali.ole.sys.context.BatchLogger;
18 import org.kuali.ole.sys.context.BatchStepExecutor;
19 import org.kuali.ole.sys.context.BatchStepFileDescriptor;
20 import org.kuali.ole.sys.context.ContainerStepListener;
21 import org.kuali.rice.core.api.datetime.DateTimeService;
22 import org.kuali.rice.coreservice.framework.parameter.ParameterService;
23
24
25
26
27
28
29
30
31
32
33
34 public class BatchContainerStep extends AbstractStep implements ContainerStepListener {
35 static org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(BatchContainerStep.class);
36
37 protected String batchContainerDirectory;
38 protected Step batchContainerStopStep;
39
40 protected BatchContainerDirectory directory;
41
42 protected StringBuffer containerResults;
43 protected Map<String, BatchStepFileDescriptor> startedSteps;
44 protected List<BatchStepFileDescriptor[]> completedSteps;
45
46
47
48
49
50
51
52
53
54 @Override
55 public boolean execute(String jobName, Date jobRunDate) throws InterruptedException {
56 BatchLogger.addConsoleAppender(LOG);
57
58 LOG.info("Starting the batch container in Job: "+ jobName +" on "+ jobRunDate);
59
60 if (batchContainerDirectory == null) {
61 throw new RuntimeException("The batchContainerDirectory has not been specified.");
62 }
63 if (batchContainerStopStep == null) {
64 throw new RuntimeException("The batchContainerStopStep has not been specified.");
65 }
66
67 directory = new BatchContainerDirectory(batchContainerDirectory);
68
69 if (directory.isBatchContainerRunning()) {
70
71 LOG.error("The BatchContainer is already running");
72 throw new RuntimeException("The BatchContainer is already running.");
73 }
74
75 initContainerResults();
76
77 try {
78
79 directory.writeBatchContainerSemaphore(jobName, getName());
80 directory.addShutdownHook();
81 LOG.info("The BatchContainer is running");
82
83 ParameterService parameterService = getParameterService();
84 DateTimeService dateTimeService = getDateTimeService();
85
86 Executor executor = Executors.newCachedThreadPool();
87 while(true) {
88
89 if (LOG.isDebugEnabled()) {
90 LOG.debug("Looking for steps...");
91 }
92 File[] stepRunFiles = directory.getStepRunFiles();
93
94 while (stepRunFiles != null && stepRunFiles.length > 0) {
95 LOG.info("Found "+ stepRunFiles.length +" steps to execute");
96
97 for(File stepRunFile : stepRunFiles) {
98 BatchStepFileDescriptor batchStepFile = new BatchStepFileDescriptor(stepRunFile);
99
100 Step step = getStep(batchStepFile);
101 if (step == null) {
102 directory.removeBatchStepFileFromSystem(batchStepFile);
103 directory.writeBatchStepErrorResultFile(batchStepFile, new IllegalArgumentException("Unable to find bean for step: "+ batchStepFile.getStepName()));
104 }
105 else {
106
107 if (isStopBatchContainerTriggered(step)) {
108 directory.removeBatchStepFileFromSystem(batchStepFile);
109 directory.writeBatchStepSuccessfulResultFile(batchStepFile);
110
111
112 LOG.info("shutting down container");
113 return true;
114 }
115
116
117 int stepIndex = directory.getStepIndexFromFile(batchStepFile);
118
119 directory.removeBatchStepFileFromSystem(batchStepFile);
120
121 if (LOG.isDebugEnabled()) {
122 LOG.debug("Creating new thread to run "+ batchStepFile);
123 }
124 BatchStepExecutor batchStepExecutor = new BatchStepExecutor(parameterService, dateTimeService, directory, batchStepFile, step, stepIndex);
125 batchStepExecutor.addContainerStepListener(this);
126 executor.execute(batchStepExecutor);
127
128 }
129 }
130
131 if (LOG.isDebugEnabled()) {
132 LOG.debug("Looking for steps...");
133 }
134 stepRunFiles = directory.getStepRunFiles();
135
136 }
137
138 sleep();
139 if (!directory.isBatchContainerRunning()) {
140
141 LOG.error("The BatchContainer runlock file no longer exists - exiting");
142 return false;
143 }
144 }
145
146 } finally {
147
148 directory.removeBatchContainerSemaphore();
149 LOG.info("The BatchContainer has stopped running");
150
151 logContainerResultsSummary();
152 }
153 }
154
155
156
157
158 @Override
159 public void stepStarted(BatchStepFileDescriptor runFile, String logFileName) {
160 logStepStarted(runFile, logFileName);
161 }
162
163
164
165
166 @Override
167 public void stepFinished(BatchStepFileDescriptor resultFile, String logFileName) {
168 logStepFinished(resultFile, logFileName);
169 }
170
171
172
173
174
175
176
177 protected Step getStep(BatchStepFileDescriptor batchStepFile) {
178 if (LOG.isDebugEnabled()) {
179 LOG.debug("Converting step named in .run file into a Step class...");
180 }
181
182 Step step = null;
183 try {
184 step = BatchSpringContext.getStep(batchStepFile.getStepName());
185 } catch (RuntimeException runtimeException) {
186 LOG.error("Failed to getStep from spring context: ", runtimeException);
187 }
188 if (step == null) {
189 LOG.error("Unable to find bean for step: "+ batchStepFile.getStepName());
190 return null;
191 }
192
193 LOG.info("Found valid step: "+ step.getName());
194 return step;
195 }
196
197
198
199
200
201 protected boolean isStopBatchContainerTriggered(Step step) {
202 if (step.getName().equals(batchContainerStopStep.getName())) {
203 LOG.info("Received Step: "+ batchContainerStopStep.getName() +". Stop listening for steps.");
204 return true;
205 }
206 return false;
207 }
208
209
210
211
212 protected void sleep() {
213 try {
214 if (LOG.isDebugEnabled()) {
215 LOG.debug("Sleeping...");
216 }
217 Thread.sleep(getSemaphoreProcessingInterval());
218 }
219 catch (InterruptedException e) {
220 throw new RuntimeException("BatchContainerStep encountered interrupt exception while trying to wait for the specified semaphore processing interval", e);
221 }
222 }
223
224
225
226
227 protected long getSemaphoreProcessingInterval() {
228 return Long.parseLong(getParameterService().getParameterValueAsString(BatchContainerStep.class, OLEConstants.SystemGroupParameterNames.BATCH_CONTAINER_SEMAPHORE_PROCESSING_INTERVAL));
229 }
230
231
232
233
234 protected void initContainerResults() {
235 containerResults = new StringBuffer("Container Results:\n");
236 startedSteps = new HashMap<String, BatchStepFileDescriptor>();
237 completedSteps = new ArrayList<BatchStepFileDescriptor[]>();
238 }
239
240
241
242
243
244
245
246
247 protected void logStepStarted(BatchStepFileDescriptor runFile, String logFileName) {
248 if (LOG.isDebugEnabled()) {
249 LOG.debug("stepStarted: "+ runFile);
250 }
251
252 startedSteps.put(logFileName, runFile);
253
254 containerResults.append("STARTED "+ runFile
255 +" "+ runFile.getStartedDate()
256 +" LOGFILE="+ logFileName
257 +"\n");
258 }
259
260
261
262
263
264
265
266
267 protected void logStepFinished(BatchStepFileDescriptor resultFile, String logFileName) {
268 if (LOG.isDebugEnabled()) {
269 LOG.debug("stepFinished: "+ resultFile);
270 }
271
272 BatchStepFileDescriptor runFile = startedSteps.remove(logFileName);
273
274 containerResults.append("COMPLETED "+ resultFile
275 +" "+ resultFile.getCompletedDate()
276 +" LOGFILE="+ logFileName
277 +" STATUS="+ resultFile.getExtension()
278 +(resultFile.isStepFileAnErrorResultFile() ? " EXCEPTION:\n"+ directory.getExceptionFromFile(resultFile) : "")
279 +"\n");
280
281 BatchStepFileDescriptor[] files = {runFile, resultFile};
282 completedSteps.add(files);
283 }
284
285
286
287
288 protected void logContainerResultsSummary() {
289 LOG.info("Printing container results...");
290
291 containerResults.append("\n\nCompleted Steps: \n");
292 if (completedSteps.isEmpty()) { containerResults.append("None"); }
293
294 for(BatchStepFileDescriptor[] batchStepFile : completedSteps) {
295 String status = batchStepFile[1].getExtension();
296 Date startedDate = batchStepFile[0].getStartedDate();
297 Date completedDate = batchStepFile[1].getCompletedDate();
298
299 containerResults.append(batchStepFile[0] +"=" +status +"; S:"+ startedDate +" F:"+ completedDate +"\n");
300 }
301
302 containerResults.append("\n\nIncomplete Steps: \n");
303 if (startedSteps.isEmpty()) { containerResults.append("None"); }
304
305 for(Iterator<BatchStepFileDescriptor> iter = startedSteps.values().iterator(); iter.hasNext();) {
306 BatchStepFileDescriptor batchStepFile = iter.next();
307
308 Date startedDate = batchStepFile.getStartedDate();
309
310 containerResults.append(batchStepFile +"; S:"+ startedDate +"\n");
311 }
312
313 LOG.info(containerResults);
314 }
315
316
317
318
319
320
321 public void setBatchContainerDirectory(String batchContainerDirectory) {
322 this.batchContainerDirectory = batchContainerDirectory;
323 }
324
325
326
327
328
329
330 public void setBatchContainerStopStep(Step batchContainerStopStep) {
331 this.batchContainerStopStep = batchContainerStopStep;
332 }
333
334
335
336
337 public Step getBatchContainerStopStep() {
338 return this.batchContainerStopStep;
339 }
340 }