View Javadoc
1   package org.kuali.ole.batch.impl;
2   
3   import org.apache.commons.lang3.StringUtils;
4   import org.apache.log4j.Logger;
5   import org.kuali.ole.DataCarrierService;
6   import org.kuali.ole.OLEConstants;
7   import org.kuali.ole.batch.bo.OLEBatchProcessJobDetailsBo;
8   import org.kuali.ole.batch.bo.OLEBatchProcessProfileBo;
9   import org.kuali.ole.batch.controller.OLEBatchProcessJobDetailsController;
10  import org.kuali.ole.batch.document.OLEBatchProcessDefinitionDocument;
11  import org.kuali.ole.batch.helper.OLEBatchProcessDataHelper;
12  import org.kuali.rice.krad.service.BusinessObjectService;
13  import org.kuali.rice.core.api.resourceloader.GlobalResourceLoader;
14  import org.kuali.rice.krad.service.KRADServiceLocator;
15  
16  import java.sql.Timestamp;
17  import java.util.*;
18  
19  /**
20   * Created with IntelliJ IDEA.
21   * User: meenrajd
22   * Date: 7/8/13
23   * Time: 3:55 PM
24   * To change this template use File | Settings | File Templates.
25   */
26  public abstract class AbstractBatchProcess implements OLEBatchProcess {
27  
28      protected OLEBatchProcessJobDetailsBo job;
29      protected OLEBatchProcessDefinitionDocument processDef;
30      private OLEBatchProcessDataHelper oleBatchProcessDataHelper;
31      private static final Logger LOG = Logger.getLogger(OLEBatchProcessDataHelper.class);
32      private BusinessObjectService businessObjectService;
33      String jobStatus = null;
34      private OLEBatchProcessDataHelper getOLEBatchProcessDataHelper() {
35  
36          if (oleBatchProcessDataHelper == null) {
37              oleBatchProcessDataHelper = OLEBatchProcessDataHelper.getInstance();
38          }
39          return oleBatchProcessDataHelper;
40      }
41  
42      /**
43       * This method will be invoked for various processes implementing the OLEBatchProcess
44       * The method will perform following operation in sequence
45       * 1. loadProfile() - loads the profile of the process that has been invoked
46       * 2. prepareForRead() - performs the initial read or read support operation as implemented by the invoked process
47       * 3. prepareForWrite() - performs the initial write or write support operation as implemented by the invoked process
48       * 4. writeData() - performs the actual write operation intended and implemented by the process invoked
49       * 5. getNextBatch() - performs the read operation if there is more data to be read
50       * <p/>
51       * The writeData() and getNextBatch() will be executed till the job status is RUNNING. The implemented process will
52       * have to set the job status to STOPPED / COMPLETED etc. as per its discretion
53       * <p/>
54       * updateBatchProgress() - performs the job update after each of the above method complete operation.
55       *
56       * @param processDef
57       * @param jobBo
58       * @throws Exception
59       */
60      @Override
61      public final void process(OLEBatchProcessDefinitionDocument processDef, OLEBatchProcessJobDetailsBo jobBo) throws Exception {
62          this.job = jobBo;
63          if (job.getStatus() != null && job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_PAUSED)) {
64              job.setStatus(job.getStatus());
65          } else {
66              job.setStatus(OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING);//set job to running status
67          }
68          job.setStartTime(job.getStartTime() == null ? new Timestamp(new Date().getTime()) : job.getStartTime()); // set the start time of job
69          try {
70              updateJobProgress();   // update job
71              checkJobPauseStatus();
72              loadProfile(processDef); // load the profile for the given process id
73              checkJobPauseStatus();
74              updateJobProgress();  // update job
75              checkJobPauseStatus();
76              this.prepareForRead();  // read the data
77              checkJobPauseStatus();
78              updateJobProgress();  // update job
79              checkJobPauseStatus();
80              if (isJobRunning()) {     // check if the job is running
81                  checkJobPauseStatus();
82                  this.prepareForWrite(); // prepare for writing the data
83                  checkJobPauseStatus();
84                  updateJobProgress();  // update job
85                  while (isJobRunning()) {   // check if the job is running
86                      checkJobPauseStatus();
87                      this.processBatch();     // write data
88                      checkJobPauseStatus();
89                      updateJobProgress();  // update job
90                      if (isJobRunning()) {    // check if the job is running
91                          checkJobPauseStatus();
92                          this.getNextBatch();  // get the next batch of records
93                          checkJobPauseStatus();
94                          updateJobProgress();  // update job
95                      }
96                  }
97              }
98          } catch (Exception ex) {
99              LOG.error("Error while performing batch process for profile :: " + this.processDef.getBatchProcessProfileName(), ex);
100             job.setEndTime(new Timestamp(new Date().getTime())); // set the end time of the job
101             job.setStatus(OLEConstants.OLEBatchProcess.JOB_STATUS_STOPPED);
102             job.setStatusDesc("Batch process Failed for profile :: " + this.processDef.getBatchProcessProfileName());
103             updateJobProgress();
104             DataCarrierService dataCarrierService = GlobalResourceLoader.getService(OLEConstants.DATA_CARRIER_SERVICE);
105             List<String> reasonForFailure = (List<String>) dataCarrierService.getData("reasonForBibImportFailure");
106             StringBuffer failureBuffer = new StringBuffer();
107             if(reasonForFailure != null && reasonForFailure.size() > 0){
108                 for(int failureCount = 0;failureCount < reasonForFailure.size();failureCount++){
109                     failureBuffer.append(reasonForFailure.get(failureCount) + "\n");
110                 }
111                 createBatchErrorAttachmentFile(failureBuffer.toString());
112             }
113             dataCarrierService.addData("reasonForBibImportFailure",new ArrayList<>());
114             List<String> reasonForInvoiceImportFailure = (List<String>) dataCarrierService.getData("invoiceIngestFailureReason");
115             StringBuffer invoiceFailureBuffer = new StringBuffer();
116             if(reasonForInvoiceImportFailure != null && reasonForInvoiceImportFailure.size() > 0){
117                 for(int failureCount = 0;failureCount < reasonForInvoiceImportFailure.size();failureCount++){
118                     invoiceFailureBuffer.append(reasonForInvoiceImportFailure.get(failureCount) + "\n");
119                 }
120                 createBatchErrorAttachmentFile(invoiceFailureBuffer.toString());
121             }
122             dataCarrierService.addData("invoiceIngestFailureReason",new ArrayList<>());
123             throw new Exception("Batch process Failed", ex);
124         }
125 
126         if (job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_STOPPED)) {
127             if (jobBo != null) {
128                 jobBo.setStatus(job.getStatus());
129                 getBusinessObjectService().save(jobBo);
130                 job.setStatusDesc("Batch Operation Stopped");
131                 OLEBatchProcessJobDetailsController.removeStatusFromBatchProcess(jobBo.getJobId());
132             }
133         }
134         job.setEndTime(new Timestamp(new Date().getTime())); // set the end time of the job
135         if (StringUtils.isEmpty(job.getStatusDesc())) {
136             job.setStatusDesc("Batch Operation Completed");
137         }
138         updateJobProgress();  // update job
139     }
140 
141     protected BusinessObjectService getBusinessObjectService() {
142         if (businessObjectService == null)
143             businessObjectService = KRADServiceLocator.getBusinessObjectService();
144         return businessObjectService;
145     }
146 
147     /**
148      * First method to be called in the batch process will load the profile for the process id in the process defn document
149      *
150      * @param processDef
151      */
152     protected void loadProfile(OLEBatchProcessDefinitionDocument processDef) throws Exception {
153         this.processDef = processDef;
154         //load profile
155         try {
156             OLEBatchProcessProfileBo profileBo = KRADServiceLocator.getBusinessObjectService().findBySinglePrimaryKey(OLEBatchProcessProfileBo.class, processDef.getBatchProcessProfileId());
157             this.processDef.setOleBatchProcessProfileBo(profileBo);
158         } catch (Exception ex) {
159             LOG.error("Error while loading profile :: " + this.processDef.getBatchProcessProfileName(), ex);
160             throw ex;
161         }
162     }
163 
164     /**
165      * method to update the batch job progress will be called after every other method completes operation
166      */
167     public void updateJobProgress() throws Exception {
168             updatePercentCompleted();
169             updateTimeSpent();
170             KRADServiceLocator.getBusinessObjectService().save(job);
171 
172     }
173 
174     /**
175      * method to be implemented to read initial data for the various processes; will be called before prepareForWrite() method
176      *
177      * @throws Exception
178      */
179     protected abstract void prepareForRead() throws Exception;
180 
181     /**
182      * method to be implemented to prepare for writing the data; will be called before writeData()
183      *
184      * @throws Exception
185      */
186     protected abstract void prepareForWrite() throws Exception;
187 
188     /**
189      * method to be implemented to retreive the next batch of data to be processed
190      *
191      * @throws Exception
192      */
193     protected abstract void getNextBatch() throws Exception;
194 
195     /**
196      * method to be implemented for various write operation
197      */
198     protected abstract void processBatch() throws Exception;
199 
200     /**
201      * checks if the job is in the RUNNING state
202      *
203      * @return
204      */
205     protected boolean isJobRunning() {
206         if(job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_COMPLETED)) {
207             return false;
208         }
209         else {
210         if (OLEBatchProcessJobDetailsController.getBatchProcessJobStatusMap() != null && OLEBatchProcessJobDetailsController.getBatchProcessJobStatusMap().size() > 0) {
211             jobStatus = OLEBatchProcessJobDetailsController.getBatchProcessJobStatus(job.getJobId());
212             job.setStatus(jobStatus);
213         }
214 
215         if (jobStatus == null) {
216             Map<String, String> jobMap = new HashMap<String, String>();
217             jobMap.put("job_id", job.getJobId());
218             OLEBatchProcessJobDetailsBo jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
219             OLEBatchProcessJobDetailsController.setBatchProcessJobStatusMap(job.getJobId(),jobDetailsBo.getStatus());
220             if (jobDetailsBo.getStatus().equalsIgnoreCase(OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING)) {
221                 return true;
222             } else {
223                 return false;
224             }
225         } else if (jobStatus != null && OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING.equalsIgnoreCase(jobStatus)) {
226             return true;
227         } else {
228             return false;
229         }
230         }
231     }
232 
233     /**
234      * checks if the job is in the Pause state
235      *
236      * @return
237      */
238     protected void checkJobPauseStatus() throws Exception{
239         Map<String, String> jobMap = new HashMap<String, String>();
240         jobMap.put("job_id", job.getJobId());
241         OLEBatchProcessJobDetailsBo jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
242         while(jobDetailsBo.getStatus().equalsIgnoreCase(OLEConstants.OLEBatchProcess.JOB_STATUS_PAUSED)) {
243             Thread.sleep(100);
244             jobMap.put("job_id", job.getJobId());
245             jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
246         }
247     }
248 
249 
250     /**
251      * updates the time spent since the job started
252      */
253     private void updateTimeSpent() {
254         Timestamp startTime = job.getStartTime();
255         long diff = Calendar.getInstance().getTime().getTime() - startTime.getTime();
256         long diffSeconds = diff / 1000 % 60;
257         long diffMinutes = diff / (60 * 1000) % 60;
258         long diffHours = diff / (60 * 60 * 1000) % 24;
259         StringBuffer sb = new StringBuffer();
260         sb.append(diffHours + ":" + diffMinutes + ":" + diffSeconds);
261         job.setTimeSpent(sb.toString());
262     }
263 
264     /**
265      * updates the percentage completed based on the number of records processed and no of records in process
266      */
267     private void updatePercentCompleted() {
268         // String total = job.getNoOfRecords();
269         String total = job.getTotalNoOfRecords();
270         float totRec = Float.parseFloat(total == null ? "0" : total);
271         if (totRec == 0.0) return;
272         String noProcessed = job.getNoOfRecordsProcessed();
273         if (StringUtils.isEmpty(noProcessed)) return;
274         float perCompleted = (Float.valueOf(noProcessed) / Float.valueOf(total)) * 100;
275         job.setPerCompleted(String.format("%.2f", perCompleted) + PERCENT);
276     }
277 
278     /**
279      * create and return the dirctory based on batch process type in the server file system
280      */
281     protected String getBatchProcessFilePath(String batchProceesType) {
282 
283         String batchProcessLocation = getOLEBatchProcessDataHelper().getBatchProcessFilePath(batchProceesType);
284         return batchProcessLocation;
285     }
286 
287 
288     /**
289      * Create the success record file in the batch process type directory  for batch delete
290      * @param successRecordData
291      * @throws Exception
292      */
293     protected void createBatchSuccessFile(String successRecordData) throws Exception {
294 
295         getOLEBatchProcessDataHelper().createBatchSuccessFile(successRecordData, processDef.getBatchProcessType(), job.getJobId() + "_SuccessRecord" + "_" + job.getUploadFileName(), job.getJobId());
296     }
297 
298 
299     /**
300      * Create the failure report record file in the batch process type directory for batch delete
301      * @param failureReportData
302      * @throws Exception
303      */
304     protected void createBatchDeleteFailureReportFile(String failureReportData) throws Exception {
305 
306         getOLEBatchProcessDataHelper().createBatchDeleteFailureReportFile(failureReportData, processDef.getBatchProcessType(), job.getJobId() + "_FailureReport.txt", job.getJobId());
307     }
308 
309 
310     /**
311      * Create the failure record file in the batch process type directory
312      * @param failureRecordData
313      * @throws Exception
314      */
315     protected void createBatchFailureFile(String failureRecordData) throws Exception {
316 
317         getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + job.getUploadFileName(), job.getJobId());
318     }
319 
320     protected void createBatchErrorAttachmentFile(String failureRecordData) throws Exception {
321         String uploadFileName = job.getUploadFileName();
322         String errorFileName = null;
323         String[] fileNames = uploadFileName.split(",");
324         errorFileName = fileNames.length == 2 ? fileNames[0]:uploadFileName;
325         if(errorFileName.endsWith(".mrc")){
326             errorFileName = errorFileName.replace(".mrc",".txt");
327         }
328         else if(errorFileName.endsWith(".INV")){
329             errorFileName = errorFileName.replace(".INV",".txt");
330         }
331         else if(errorFileName.endsWith(".edi")){
332             errorFileName = errorFileName.replace(".edi",".txt");
333         }
334         getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + errorFileName, job.getJobId());
335     }
336 
337     protected void createFile(String[] content) throws Exception {
338 
339         getOLEBatchProcessDataHelper().createFile(content, processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.DELETED_BIB_IDS_FILE_NAME, job.getJobId());
340     }
341 
342     /**
343      * Delete the upload file in the batch process type dirctory
344      */
345     protected void deleteBatchFile() throws Exception {
346 
347         getOLEBatchProcessDataHelper().deleteBatchFile(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + job.getUploadFileName(), job.getJobId());
348     }
349 
350     /**
351      * Get the upload file content from the batch process type dirctory by using job id an upload file name
352      */
353     protected String getBatchProcessFileContent() throws Exception {
354         String fileContent = getOLEBatchProcessDataHelper().getBatchProcessFileContent(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + job.getUploadFileName(), job.getJobId());
355         return fileContent;
356     }
357 
358     protected String getBatchProcessFileContent(String fileName) throws Exception {
359         String fileContent = getOLEBatchProcessDataHelper().getBatchProcessFileContent(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + fileName, job.getJobId());
360         return fileContent;
361 
362 
363     }
364 
365     /**
366      * Delete the upload file in the batch process type dirctory
367      */
368     protected void deleteBatchFile(String fileName) throws Exception {
369 
370         getOLEBatchProcessDataHelper().deleteBatchFile(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + fileName, job.getJobId());
371     }
372 
373     /**
374      * Create the failure record file in the batch process type directory
375      * @param failureRecordData
376      * @throws Exception
377      */
378     protected void createBatchFailureFile(String failureRecordData, String fileName) throws Exception {
379 
380         getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + fileName, job.getJobId());
381     }
382 
383 
384 
385 }