001package org.kuali.ole.batch.impl; 002 003import org.apache.commons.lang3.StringUtils; 004import org.apache.log4j.Logger; 005import org.kuali.ole.DataCarrierService; 006import org.kuali.ole.OLEConstants; 007import org.kuali.ole.batch.bo.OLEBatchProcessJobDetailsBo; 008import org.kuali.ole.batch.bo.OLEBatchProcessProfileBo; 009import org.kuali.ole.batch.controller.OLEBatchProcessJobDetailsController; 010import org.kuali.ole.batch.document.OLEBatchProcessDefinitionDocument; 011import org.kuali.ole.batch.helper.OLEBatchProcessDataHelper; 012import org.kuali.rice.krad.service.BusinessObjectService; 013import org.kuali.rice.core.api.resourceloader.GlobalResourceLoader; 014import org.kuali.rice.krad.service.KRADServiceLocator; 015 016import java.sql.Timestamp; 017import java.util.*; 018 019/** 020 * Created with IntelliJ IDEA. 021 * User: meenrajd 022 * Date: 7/8/13 023 * Time: 3:55 PM 024 * To change this template use File | Settings | File Templates. 025 */ 026public abstract class AbstractBatchProcess implements OLEBatchProcess { 027 028 protected OLEBatchProcessJobDetailsBo job; 029 protected OLEBatchProcessDefinitionDocument processDef; 030 private OLEBatchProcessDataHelper oleBatchProcessDataHelper; 031 private static final Logger LOG = Logger.getLogger(OLEBatchProcessDataHelper.class); 032 private BusinessObjectService businessObjectService; 033 String jobStatus = null; 034 private OLEBatchProcessDataHelper getOLEBatchProcessDataHelper() { 035 036 if (oleBatchProcessDataHelper == null) { 037 oleBatchProcessDataHelper = OLEBatchProcessDataHelper.getInstance(); 038 } 039 return oleBatchProcessDataHelper; 040 } 041 042 /** 043 * This method will be invoked for various processes implementing the OLEBatchProcess 044 * The method will perform following operation in sequence 045 * 1. loadProfile() - loads the profile of the process that has been invoked 046 * 2. prepareForRead() - performs the initial read or read support operation as implemented by the invoked process 047 * 3. prepareForWrite() - performs the initial write or write support operation as implemented by the invoked process 048 * 4. writeData() - performs the actual write operation intended and implemented by the process invoked 049 * 5. getNextBatch() - performs the read operation if there is more data to be read 050 * <p/> 051 * The writeData() and getNextBatch() will be executed till the job status is RUNNING. The implemented process will 052 * have to set the job status to STOPPED / COMPLETED etc. as per its discretion 053 * <p/> 054 * updateBatchProgress() - performs the job update after each of the above method complete operation. 055 * 056 * @param processDef 057 * @param jobBo 058 * @throws Exception 059 */ 060 @Override 061 public final void process(OLEBatchProcessDefinitionDocument processDef, OLEBatchProcessJobDetailsBo jobBo) throws Exception { 062 this.job = jobBo; 063 if (job.getStatus() != null && job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_PAUSED)) { 064 job.setStatus(job.getStatus()); 065 } else { 066 job.setStatus(OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING);//set job to running status 067 } 068 job.setStartTime(job.getStartTime() == null ? new Timestamp(new Date().getTime()) : job.getStartTime()); // set the start time of job 069 try { 070 updateJobProgress(); // update job 071 checkJobPauseStatus(); 072 loadProfile(processDef); // load the profile for the given process id 073 checkJobPauseStatus(); 074 updateJobProgress(); // update job 075 checkJobPauseStatus(); 076 this.prepareForRead(); // read the data 077 checkJobPauseStatus(); 078 updateJobProgress(); // update job 079 checkJobPauseStatus(); 080 if (isJobRunning()) { // check if the job is running 081 checkJobPauseStatus(); 082 this.prepareForWrite(); // prepare for writing the data 083 checkJobPauseStatus(); 084 updateJobProgress(); // update job 085 while (isJobRunning()) { // check if the job is running 086 checkJobPauseStatus(); 087 this.processBatch(); // write data 088 checkJobPauseStatus(); 089 updateJobProgress(); // update job 090 if (isJobRunning()) { // check if the job is running 091 checkJobPauseStatus(); 092 this.getNextBatch(); // get the next batch of records 093 checkJobPauseStatus(); 094 updateJobProgress(); // update job 095 } 096 } 097 } 098 } catch (Exception ex) { 099 LOG.error("Error while performing batch process for profile :: " + this.processDef.getBatchProcessProfileName(), ex); 100 job.setEndTime(new Timestamp(new Date().getTime())); // set the end time of the job 101 job.setStatus(OLEConstants.OLEBatchProcess.JOB_STATUS_STOPPED); 102 job.setStatusDesc("Batch process Failed for profile :: " + this.processDef.getBatchProcessProfileName()); 103 updateJobProgress(); 104 DataCarrierService dataCarrierService = GlobalResourceLoader.getService(OLEConstants.DATA_CARRIER_SERVICE); 105 List<String> reasonForFailure = (List<String>) dataCarrierService.getData("reasonForBibImportFailure"); 106 StringBuffer failureBuffer = new StringBuffer(); 107 if(reasonForFailure != null && reasonForFailure.size() > 0){ 108 for(int failureCount = 0;failureCount < reasonForFailure.size();failureCount++){ 109 failureBuffer.append(reasonForFailure.get(failureCount) + "\n"); 110 } 111 createBatchErrorAttachmentFile(failureBuffer.toString()); 112 } 113 dataCarrierService.addData("reasonForBibImportFailure",new ArrayList<>()); 114 List<String> reasonForInvoiceImportFailure = (List<String>) dataCarrierService.getData("invoiceIngestFailureReason"); 115 StringBuffer invoiceFailureBuffer = new StringBuffer(); 116 if(reasonForInvoiceImportFailure != null && reasonForInvoiceImportFailure.size() > 0){ 117 for(int failureCount = 0;failureCount < reasonForInvoiceImportFailure.size();failureCount++){ 118 invoiceFailureBuffer.append(reasonForInvoiceImportFailure.get(failureCount) + "\n"); 119 } 120 createBatchErrorAttachmentFile(invoiceFailureBuffer.toString()); 121 } 122 dataCarrierService.addData("invoiceIngestFailureReason",new ArrayList<>()); 123 throw new Exception("Batch process Failed", ex); 124 } 125 126 if (job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_STOPPED)) { 127 if (jobBo != null) { 128 jobBo.setStatus(job.getStatus()); 129 getBusinessObjectService().save(jobBo); 130 job.setStatusDesc("Batch Operation Stopped"); 131 OLEBatchProcessJobDetailsController.removeStatusFromBatchProcess(jobBo.getJobId()); 132 } 133 } 134 job.setEndTime(new Timestamp(new Date().getTime())); // set the end time of the job 135 if (StringUtils.isEmpty(job.getStatusDesc())) { 136 job.setStatusDesc("Batch Operation Completed"); 137 } 138 updateJobProgress(); // update job 139 } 140 141 protected BusinessObjectService getBusinessObjectService() { 142 if (businessObjectService == null) 143 businessObjectService = KRADServiceLocator.getBusinessObjectService(); 144 return businessObjectService; 145 } 146 147 /** 148 * First method to be called in the batch process will load the profile for the process id in the process defn document 149 * 150 * @param processDef 151 */ 152 protected void loadProfile(OLEBatchProcessDefinitionDocument processDef) throws Exception { 153 this.processDef = processDef; 154 //load profile 155 try { 156 OLEBatchProcessProfileBo profileBo = KRADServiceLocator.getBusinessObjectService().findBySinglePrimaryKey(OLEBatchProcessProfileBo.class, processDef.getBatchProcessProfileId()); 157 this.processDef.setOleBatchProcessProfileBo(profileBo); 158 } catch (Exception ex) { 159 LOG.error("Error while loading profile :: " + this.processDef.getBatchProcessProfileName(), ex); 160 throw ex; 161 } 162 } 163 164 /** 165 * method to update the batch job progress will be called after every other method completes operation 166 */ 167 public void updateJobProgress() throws Exception { 168 updatePercentCompleted(); 169 updateTimeSpent(); 170 KRADServiceLocator.getBusinessObjectService().save(job); 171 172 } 173 174 /** 175 * method to be implemented to read initial data for the various processes; will be called before prepareForWrite() method 176 * 177 * @throws Exception 178 */ 179 protected abstract void prepareForRead() throws Exception; 180 181 /** 182 * method to be implemented to prepare for writing the data; will be called before writeData() 183 * 184 * @throws Exception 185 */ 186 protected abstract void prepareForWrite() throws Exception; 187 188 /** 189 * method to be implemented to retreive the next batch of data to be processed 190 * 191 * @throws Exception 192 */ 193 protected abstract void getNextBatch() throws Exception; 194 195 /** 196 * method to be implemented for various write operation 197 */ 198 protected abstract void processBatch() throws Exception; 199 200 /** 201 * checks if the job is in the RUNNING state 202 * 203 * @return 204 */ 205 protected boolean isJobRunning() { 206 if(job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_COMPLETED)) { 207 return false; 208 } 209 else { 210 if (OLEBatchProcessJobDetailsController.getBatchProcessJobStatusMap() != null && OLEBatchProcessJobDetailsController.getBatchProcessJobStatusMap().size() > 0) { 211 jobStatus = OLEBatchProcessJobDetailsController.getBatchProcessJobStatus(job.getJobId()); 212 job.setStatus(jobStatus); 213 } 214 215 if (jobStatus == null) { 216 Map<String, String> jobMap = new HashMap<String, String>(); 217 jobMap.put("job_id", job.getJobId()); 218 OLEBatchProcessJobDetailsBo jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap); 219 OLEBatchProcessJobDetailsController.setBatchProcessJobStatusMap(job.getJobId(),jobDetailsBo.getStatus()); 220 if (jobDetailsBo.getStatus().equalsIgnoreCase(OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING)) { 221 return true; 222 } else { 223 return false; 224 } 225 } else if (jobStatus != null && OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING.equalsIgnoreCase(jobStatus)) { 226 return true; 227 } else { 228 return false; 229 } 230 } 231 } 232 233 /** 234 * checks if the job is in the Pause state 235 * 236 * @return 237 */ 238 protected void checkJobPauseStatus() throws Exception{ 239 Map<String, String> jobMap = new HashMap<String, String>(); 240 jobMap.put("job_id", job.getJobId()); 241 OLEBatchProcessJobDetailsBo jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap); 242 while(jobDetailsBo.getStatus().equalsIgnoreCase(OLEConstants.OLEBatchProcess.JOB_STATUS_PAUSED)) { 243 Thread.sleep(100); 244 jobMap.put("job_id", job.getJobId()); 245 jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap); 246 } 247 } 248 249 250 /** 251 * updates the time spent since the job started 252 */ 253 private void updateTimeSpent() { 254 Timestamp startTime = job.getStartTime(); 255 long diff = Calendar.getInstance().getTime().getTime() - startTime.getTime(); 256 long diffSeconds = diff / 1000 % 60; 257 long diffMinutes = diff / (60 * 1000) % 60; 258 long diffHours = diff / (60 * 60 * 1000) % 24; 259 StringBuffer sb = new StringBuffer(); 260 sb.append(diffHours + ":" + diffMinutes + ":" + diffSeconds); 261 job.setTimeSpent(sb.toString()); 262 } 263 264 /** 265 * updates the percentage completed based on the number of records processed and no of records in process 266 */ 267 private void updatePercentCompleted() { 268 // String total = job.getNoOfRecords(); 269 String total = job.getTotalNoOfRecords(); 270 float totRec = Float.parseFloat(total == null ? "0" : total); 271 if (totRec == 0.0) return; 272 String noProcessed = job.getNoOfRecordsProcessed(); 273 if (StringUtils.isEmpty(noProcessed)) return; 274 float perCompleted = (Float.valueOf(noProcessed) / Float.valueOf(total)) * 100; 275 job.setPerCompleted(String.format("%.2f", perCompleted) + PERCENT); 276 } 277 278 /** 279 * create and return the dirctory based on batch process type in the server file system 280 */ 281 protected String getBatchProcessFilePath(String batchProceesType) { 282 283 String batchProcessLocation = getOLEBatchProcessDataHelper().getBatchProcessFilePath(batchProceesType); 284 return batchProcessLocation; 285 } 286 287 288 /** 289 * Create the success record file in the batch process type directory for batch delete 290 * @param successRecordData 291 * @throws Exception 292 */ 293 protected void createBatchSuccessFile(String successRecordData) throws Exception { 294 295 getOLEBatchProcessDataHelper().createBatchSuccessFile(successRecordData, processDef.getBatchProcessType(), job.getJobId() + "_SuccessRecord" + "_" + job.getUploadFileName(), job.getJobId()); 296 } 297 298 299 /** 300 * Create the failure report record file in the batch process type directory for batch delete 301 * @param failureReportData 302 * @throws Exception 303 */ 304 protected void createBatchDeleteFailureReportFile(String failureReportData) throws Exception { 305 306 getOLEBatchProcessDataHelper().createBatchDeleteFailureReportFile(failureReportData, processDef.getBatchProcessType(), job.getJobId() + "_FailureReport.txt", job.getJobId()); 307 } 308 309 310 /** 311 * Create the failure record file in the batch process type directory 312 * @param failureRecordData 313 * @throws Exception 314 */ 315 protected void createBatchFailureFile(String failureRecordData) throws Exception { 316 317 getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + job.getUploadFileName(), job.getJobId()); 318 } 319 320 protected void createBatchErrorAttachmentFile(String failureRecordData) throws Exception { 321 String uploadFileName = job.getUploadFileName(); 322 String errorFileName = null; 323 String[] fileNames = uploadFileName.split(","); 324 errorFileName = fileNames.length == 2 ? fileNames[0]:uploadFileName; 325 if(errorFileName.endsWith(".mrc")){ 326 errorFileName = errorFileName.replace(".mrc",".txt"); 327 } 328 else if(errorFileName.endsWith(".INV")){ 329 errorFileName = errorFileName.replace(".INV",".txt"); 330 } 331 else if(errorFileName.endsWith(".edi")){ 332 errorFileName = errorFileName.replace(".edi",".txt"); 333 } 334 getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + errorFileName, job.getJobId()); 335 } 336 337 protected void createFile(String[] content) throws Exception { 338 339 getOLEBatchProcessDataHelper().createFile(content, processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.DELETED_BIB_IDS_FILE_NAME, job.getJobId()); 340 } 341 342 /** 343 * Delete the upload file in the batch process type dirctory 344 */ 345 protected void deleteBatchFile() throws Exception { 346 347 getOLEBatchProcessDataHelper().deleteBatchFile(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + job.getUploadFileName(), job.getJobId()); 348 } 349 350 /** 351 * Get the upload file content from the batch process type dirctory by using job id an upload file name 352 */ 353 protected String getBatchProcessFileContent() throws Exception { 354 String fileContent = getOLEBatchProcessDataHelper().getBatchProcessFileContent(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + job.getUploadFileName(), job.getJobId()); 355 return fileContent; 356 } 357 358 protected String getBatchProcessFileContent(String fileName) throws Exception { 359 String fileContent = getOLEBatchProcessDataHelper().getBatchProcessFileContent(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + fileName, job.getJobId()); 360 return fileContent; 361 362 363 } 364 365 /** 366 * Delete the upload file in the batch process type dirctory 367 */ 368 protected void deleteBatchFile(String fileName) throws Exception { 369 370 getOLEBatchProcessDataHelper().deleteBatchFile(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + fileName, job.getJobId()); 371 } 372 373 /** 374 * Create the failure record file in the batch process type directory 375 * @param failureRecordData 376 * @throws Exception 377 */ 378 protected void createBatchFailureFile(String failureRecordData, String fileName) throws Exception { 379 380 getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + fileName, job.getJobId()); 381 } 382 383 384 385}