1   package org.kuali.ole.batch.impl;
2   
3   import org.apache.commons.lang3.StringUtils;
4   import org.apache.log4j.Logger;
5   import org.kuali.ole.DataCarrierService;
6   import org.kuali.ole.OLEConstants;
7   import org.kuali.ole.batch.bo.OLEBatchProcessJobDetailsBo;
8   import org.kuali.ole.batch.bo.OLEBatchProcessProfileBo;
9   import org.kuali.ole.batch.controller.OLEBatchProcessJobDetailsController;
10  import org.kuali.ole.batch.document.OLEBatchProcessDefinitionDocument;
11  import org.kuali.ole.batch.helper.OLEBatchProcessDataHelper;
12  import org.kuali.rice.krad.service.BusinessObjectService;
13  import org.kuali.rice.core.api.resourceloader.GlobalResourceLoader;
14  import org.kuali.rice.krad.service.KRADServiceLocator;
15  
16  import java.sql.Timestamp;
17  import java.util.*;
18  
19  
20  
21  
22  
23  
24  
25  
26  public abstract class AbstractBatchProcess implements OLEBatchProcess {
27  
28      protected OLEBatchProcessJobDetailsBo job;
29      protected OLEBatchProcessDefinitionDocument processDef;
30      private OLEBatchProcessDataHelper oleBatchProcessDataHelper;
31      private static final Logger LOG = Logger.getLogger(OLEBatchProcessDataHelper.class);
32      private BusinessObjectService businessObjectService;
33      String jobStatus = null;
34      private OLEBatchProcessDataHelper getOLEBatchProcessDataHelper() {
35  
36          if (oleBatchProcessDataHelper == null) {
37              oleBatchProcessDataHelper = OLEBatchProcessDataHelper.getInstance();
38          }
39          return oleBatchProcessDataHelper;
40      }
41  
42      
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  
56  
57  
58  
59  
60      @Override
61      public final void process(OLEBatchProcessDefinitionDocument processDef, OLEBatchProcessJobDetailsBo jobBo) throws Exception {
62          this.job = jobBo;
63          if (job.getStatus() != null && job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_PAUSED)) {
64              job.setStatus(job.getStatus());
65          } else {
66              job.setStatus(OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING);
67          }
68          job.setStartTime(job.getStartTime() == null ? new Timestamp(new Date().getTime()) : job.getStartTime()); 
69          try {
70              updateJobProgress();   
71              checkJobPauseStatus();
72              loadProfile(processDef); 
73              checkJobPauseStatus();
74              updateJobProgress();  
75              checkJobPauseStatus();
76              this.prepareForRead();  
77              checkJobPauseStatus();
78              updateJobProgress();  
79              checkJobPauseStatus();
80              if (isJobRunning()) {     
81                  checkJobPauseStatus();
82                  this.prepareForWrite(); 
83                  checkJobPauseStatus();
84                  updateJobProgress();  
85                  while (isJobRunning()) {   
86                      checkJobPauseStatus();
87                      this.processBatch();     
88                      checkJobPauseStatus();
89                      updateJobProgress();  
90                      if (isJobRunning()) {    
91                          checkJobPauseStatus();
92                          this.getNextBatch();  
93                          checkJobPauseStatus();
94                          updateJobProgress();  
95                      }
96                  }
97              }
98          } catch (Exception ex) {
99              LOG.error("Error while performing batch process for profile :: " + this.processDef.getBatchProcessProfileName(), ex);
100             job.setEndTime(new Timestamp(new Date().getTime())); 
101             job.setStatus(OLEConstants.OLEBatchProcess.JOB_STATUS_STOPPED);
102             job.setStatusDesc("Batch process Failed for profile :: " + this.processDef.getBatchProcessProfileName());
103             updateJobProgress();
104             DataCarrierService dataCarrierService = GlobalResourceLoader.getService(OLEConstants.DATA_CARRIER_SERVICE);
105             List<String> reasonForFailure = (List<String>) dataCarrierService.getData("reasonForBibImportFailure");
106             StringBuffer failureBuffer = new StringBuffer();
107             if(reasonForFailure != null && reasonForFailure.size() > 0){
108                 for(int failureCount = 0;failureCount < reasonForFailure.size();failureCount++){
109                     failureBuffer.append(reasonForFailure.get(failureCount) + "\n");
110                 }
111                 createBatchErrorAttachmentFile(failureBuffer.toString());
112             }
113             dataCarrierService.addData("reasonForBibImportFailure",new ArrayList<>());
114             List<String> reasonForInvoiceImportFailure = (List<String>) dataCarrierService.getData("invoiceIngestFailureReason");
115             StringBuffer invoiceFailureBuffer = new StringBuffer();
116             if(reasonForInvoiceImportFailure != null && reasonForInvoiceImportFailure.size() > 0){
117                 for(int failureCount = 0;failureCount < reasonForInvoiceImportFailure.size();failureCount++){
118                     invoiceFailureBuffer.append(reasonForInvoiceImportFailure.get(failureCount) + "\n");
119                 }
120                 createBatchErrorAttachmentFile(invoiceFailureBuffer.toString());
121             }
122             dataCarrierService.addData("invoiceIngestFailureReason",new ArrayList<>());
123             throw new Exception("Batch process Failed", ex);
124         }
125 
126         if (job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_STOPPED)) {
127             if (jobBo != null) {
128                 jobBo.setStatus(job.getStatus());
129                 getBusinessObjectService().save(jobBo);
130                 job.setStatusDesc("Batch Operation Stopped");
131                 OLEBatchProcessJobDetailsController.removeStatusFromBatchProcess(jobBo.getJobId());
132             }
133         }
134         job.setEndTime(new Timestamp(new Date().getTime())); 
135         if (StringUtils.isEmpty(job.getStatusDesc())) {
136             job.setStatusDesc("Batch Operation Completed");
137         }
138         updateJobProgress();  
139     }
140 
141     protected BusinessObjectService getBusinessObjectService() {
142         if (businessObjectService == null)
143             businessObjectService = KRADServiceLocator.getBusinessObjectService();
144         return businessObjectService;
145     }
146 
147     
148 
149 
150 
151 
152     protected void loadProfile(OLEBatchProcessDefinitionDocument processDef) throws Exception {
153         this.processDef = processDef;
154         
155         try {
156             OLEBatchProcessProfileBo profileBo = KRADServiceLocator.getBusinessObjectService().findBySinglePrimaryKey(OLEBatchProcessProfileBo.class, processDef.getBatchProcessProfileId());
157             this.processDef.setOleBatchProcessProfileBo(profileBo);
158         } catch (Exception ex) {
159             LOG.error("Error while loading profile :: " + this.processDef.getBatchProcessProfileName(), ex);
160             throw ex;
161         }
162     }
163 
164     
165 
166 
167     public void updateJobProgress() throws Exception {
168             updatePercentCompleted();
169             updateTimeSpent();
170             KRADServiceLocator.getBusinessObjectService().save(job);
171 
172     }
173 
174     
175 
176 
177 
178 
179     protected abstract void prepareForRead() throws Exception;
180 
181     
182 
183 
184 
185 
186     protected abstract void prepareForWrite() throws Exception;
187 
188     
189 
190 
191 
192 
193     protected abstract void getNextBatch() throws Exception;
194 
195     
196 
197 
198     protected abstract void processBatch() throws Exception;
199 
200     
201 
202 
203 
204 
205     protected boolean isJobRunning() {
206         if(job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_COMPLETED)) {
207             return false;
208         }
209         else {
210         if (OLEBatchProcessJobDetailsController.getBatchProcessJobStatusMap() != null && OLEBatchProcessJobDetailsController.getBatchProcessJobStatusMap().size() > 0) {
211             jobStatus = OLEBatchProcessJobDetailsController.getBatchProcessJobStatus(job.getJobId());
212             job.setStatus(jobStatus);
213         }
214 
215         if (jobStatus == null) {
216             Map<String, String> jobMap = new HashMap<String, String>();
217             jobMap.put("job_id", job.getJobId());
218             OLEBatchProcessJobDetailsBo jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
219             OLEBatchProcessJobDetailsController.setBatchProcessJobStatusMap(job.getJobId(),jobDetailsBo.getStatus());
220             if (jobDetailsBo.getStatus().equalsIgnoreCase(OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING)) {
221                 return true;
222             } else {
223                 return false;
224             }
225         } else if (jobStatus != null && OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING.equalsIgnoreCase(jobStatus)) {
226             return true;
227         } else {
228             return false;
229         }
230         }
231     }
232 
233     
234 
235 
236 
237 
238     protected void checkJobPauseStatus() throws Exception{
239         Map<String, String> jobMap = new HashMap<String, String>();
240         jobMap.put("job_id", job.getJobId());
241         OLEBatchProcessJobDetailsBo jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
242         while(jobDetailsBo.getStatus().equalsIgnoreCase(OLEConstants.OLEBatchProcess.JOB_STATUS_PAUSED)) {
243             Thread.sleep(100);
244             jobMap.put("job_id", job.getJobId());
245             jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
246         }
247     }
248 
249 
250     
251 
252 
253     private void updateTimeSpent() {
254         Timestamp startTime = job.getStartTime();
255         long diff = Calendar.getInstance().getTime().getTime() - startTime.getTime();
256         long diffSeconds = diff / 1000 % 60;
257         long diffMinutes = diff / (60 * 1000) % 60;
258         long diffHours = diff / (60 * 60 * 1000) % 24;
259         StringBuffer sb = new StringBuffer();
260         sb.append(diffHours + ":" + diffMinutes + ":" + diffSeconds);
261         job.setTimeSpent(sb.toString());
262     }
263 
264     
265 
266 
267     private void updatePercentCompleted() {
268         
269         String total = job.getTotalNoOfRecords();
270         float totRec = Float.parseFloat(total == null ? "0" : total);
271         if (totRec == 0.0) return;
272         String noProcessed = job.getNoOfRecordsProcessed();
273         if (StringUtils.isEmpty(noProcessed)) return;
274         float perCompleted = (Float.valueOf(noProcessed) / Float.valueOf(total)) * 100;
275         job.setPerCompleted(String.format("%.2f", perCompleted) + PERCENT);
276     }
277 
278     
279 
280 
281     protected String getBatchProcessFilePath(String batchProceesType) {
282 
283         String batchProcessLocation = getOLEBatchProcessDataHelper().getBatchProcessFilePath(batchProceesType);
284         return batchProcessLocation;
285     }
286 
287 
288     
289 
290 
291 
292 
293     protected void createBatchSuccessFile(String successRecordData) throws Exception {
294 
295         getOLEBatchProcessDataHelper().createBatchSuccessFile(successRecordData, processDef.getBatchProcessType(), job.getJobId() + "_SuccessRecord" + "_" + job.getUploadFileName(), job.getJobId());
296     }
297 
298 
299     
300 
301 
302 
303 
304     protected void createBatchDeleteFailureReportFile(String failureReportData) throws Exception {
305 
306         getOLEBatchProcessDataHelper().createBatchDeleteFailureReportFile(failureReportData, processDef.getBatchProcessType(), job.getJobId() + "_FailureReport.txt", job.getJobId());
307     }
308 
309 
310     
311 
312 
313 
314 
315     protected void createBatchFailureFile(String failureRecordData) throws Exception {
316 
317         getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + job.getUploadFileName(), job.getJobId());
318     }
319 
320     protected void createBatchErrorAttachmentFile(String failureRecordData) throws Exception {
321         String uploadFileName = job.getUploadFileName();
322         String errorFileName = null;
323         String[] fileNames = uploadFileName.split(",");
324         errorFileName = fileNames.length == 2 ? fileNames[0]:uploadFileName;
325         if(errorFileName.endsWith(".mrc")){
326             errorFileName = errorFileName.replace(".mrc",".txt");
327         }
328         else if(errorFileName.endsWith(".INV")){
329             errorFileName = errorFileName.replace(".INV",".txt");
330         }
331         else if(errorFileName.endsWith(".edi")){
332             errorFileName = errorFileName.replace(".edi",".txt");
333         }
334         getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + errorFileName, job.getJobId());
335     }
336 
337     protected void createFile(String[] content) throws Exception {
338 
339         getOLEBatchProcessDataHelper().createFile(content, processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.DELETED_BIB_IDS_FILE_NAME, job.getJobId());
340     }
341 
342     
343 
344 
345     protected void deleteBatchFile() throws Exception {
346 
347         getOLEBatchProcessDataHelper().deleteBatchFile(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + job.getUploadFileName(), job.getJobId());
348     }
349 
350     
351 
352 
353     protected String getBatchProcessFileContent() throws Exception {
354         String fileContent = getOLEBatchProcessDataHelper().getBatchProcessFileContent(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + job.getUploadFileName(), job.getJobId());
355         return fileContent;
356     }
357 
358     protected String getBatchProcessFileContent(String fileName) throws Exception {
359         String fileContent = getOLEBatchProcessDataHelper().getBatchProcessFileContent(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + fileName, job.getJobId());
360         return fileContent;
361 
362 
363     }
364 
365     
366 
367 
368     protected void deleteBatchFile(String fileName) throws Exception {
369 
370         getOLEBatchProcessDataHelper().deleteBatchFile(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + fileName, job.getJobId());
371     }
372 
373     
374 
375 
376 
377 
378     protected void createBatchFailureFile(String failureRecordData, String fileName) throws Exception {
379 
380         getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + fileName, job.getJobId());
381     }
382 
383 
384 
385 }