package org.kuali.ole.batch.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.kuali.ole.DataCarrierService;
import org.kuali.ole.OLEConstants;
import org.kuali.ole.batch.bo.OLEBatchProcessJobDetailsBo;
import org.kuali.ole.batch.bo.OLEBatchProcessProfileBo;
import org.kuali.ole.batch.controller.OLEBatchProcessJobDetailsController;
import org.kuali.ole.batch.document.OLEBatchProcessDefinitionDocument;
import org.kuali.ole.batch.helper.OLEBatchProcessDataHelper;
import org.kuali.rice.krad.service.BusinessObjectService;
import org.kuali.rice.core.api.resourceloader.GlobalResourceLoader;
import org.kuali.rice.krad.service.KRADServiceLocator;
import java.sql.Timestamp;
import java.util.*;
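/**
 * Skeleton shared by the OLE batch processes. process() drives a job through profile loading,
 * read/write preparation and the batch loop, saving the job's progress after each step, while
 * subclasses supply the concrete read and write operations.
 */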
public abstract class AbstractBatchProcess implements OLEBatchProcess {
/** Job currently being executed; assigned at the start of process(). */
protected OLEBatchProcessJobDetailsBo job;
/** Process definition of the running job; assigned by loadProfile(). */
protected OLEBatchProcessDefinitionDocument processDef;
/** Lazily obtained file helper; always access it through getOLEBatchProcessDataHelper(). */
private OLEBatchProcessDataHelper oleBatchProcessDataHelper;
// Log under this class's own category instead of the data helper's.
private static final Logger LOG = Logger.getLogger(AbstractBatchProcess.class);
/** Lazily obtained persistence service; always access it through getBusinessObjectService(). */
private BusinessObjectService businessObjectService;
/** Last status read from the controller's job status map in isJobRunning(); null until one has been read. */
String jobStatus = null;
/**
 * Returns the batch data helper, fetching the instance on first use and caching it in this object.
 *
 * @return the cached OLEBatchProcessDataHelper instance
 */
private OLEBatchProcessDataHelper getOLEBatchProcessDataHelper() {
    if (oleBatchProcessDataHelper != null) {
        return oleBatchProcessDataHelper;
    }
    oleBatchProcessDataHelper = OLEBatchProcessDataHelper.getInstance();
    return oleBatchProcessDataHelper;
}
/**
 * This method will be invoked for various processes implementing the OLEBatchProcess.
 * The method will perform the following operations in sequence:
 * 1. loadProfile() - loads the profile of the process that has been invoked
 * 2. prepareForRead() - performs the initial read or read support operation as implemented by the invoked process
 * 3. prepareForWrite() - performs the initial write or write support operation as implemented by the invoked process
 * 4. processBatch() - performs the actual write operation intended and implemented by the process invoked
 * 5. getNextBatch() - performs the read operation if there is more data to be read
 * <p/>
 * Steps 3 to 5 only run while isJobRunning() reports the job as running; processBatch() and getNextBatch()
 * are repeated until it no longer does. The implemented process has to set the job status to
 * STOPPED / COMPLETED etc. as per its discretion.
 * <p/>
 * updateJobProgress() saves the job after each of the above steps, and checkJobPauseStatus() is called
 * around each step and blocks while the persisted job is paused.
 * <p/>
 * If any step throws, the job is saved as stopped with a failure description, the failure reasons held by
 * the data carrier service are written to error attachment files and reset, and the exception is rethrown
 * wrapped in a new Exception.
 *
 * @param processDef the process definition to run; passed on to loadProfile()
 * @param jobBo      the job details record that is updated and saved while the process runs
 * @throws Exception wrapping the original cause when any step of the process fails
 */
@Override
public final void process(OLEBatchProcessDefinitionDocument processDef, OLEBatchProcessJobDetailsBo jobBo) throws Exception {
    this.job = jobBo;
    // A job that is already PAUSED keeps that status; every other job is switched to RUNNING.
    if (!OLEConstants.OLEBatchProcess.JOB_STATUS_PAUSED.equals(job.getStatus())) {
        job.setStatus(OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING);
    }
    // Keep an existing start time so that a job which already has one does not restart its clock.
    if (job.getStartTime() == null) {
        job.setStartTime(new Timestamp(System.currentTimeMillis()));
    }
    try {
        updateJobProgress();   // update job
        checkJobPauseStatus();
        loadProfile(processDef); // load the profile for the given process id
        checkJobPauseStatus();
        updateJobProgress();  // update job
        checkJobPauseStatus();
        this.prepareForRead();  // read the data
        checkJobPauseStatus();
        updateJobProgress();  // update job
        checkJobPauseStatus();
        if (isJobRunning()) {     // check if the job is running
            checkJobPauseStatus();
            this.prepareForWrite(); // prepare for writing the data
            checkJobPauseStatus();
            updateJobProgress();  // update job
            while (isJobRunning()) {   // check if the job is running
                checkJobPauseStatus();
                this.processBatch();     // write data
                checkJobPauseStatus();
                updateJobProgress();  // update job
                if (isJobRunning()) {    // check if the job is running
                    checkJobPauseStatus();
                    this.getNextBatch();  // get the next batch of records
                    checkJobPauseStatus();
                    updateJobProgress();  // update job
                }
            }
        }
    } catch (Exception ex) {
        // The processDef field is only assigned inside loadProfile(); resolving the name null-safely keeps a
        // failure that happens before that point from being masked by a NullPointerException in this handler.
        String profileName = resolveProfileNameForFailure(processDef);
        LOG.error("Error while performing batch process for profile :: " + profileName, ex);
        job.setEndTime(new Timestamp(System.currentTimeMillis())); // set the end time of the job
        job.setStatus(OLEConstants.OLEBatchProcess.JOB_STATUS_STOPPED);
        job.setStatusDesc("Batch process Failed for profile :: " + profileName);
        updateJobProgress();
        DataCarrierService dataCarrierService = GlobalResourceLoader.getService(OLEConstants.DATA_CARRIER_SERVICE);
        writeAndResetFailureReasons(dataCarrierService, "reasonForBibImportFailure");
        writeAndResetFailureReasons(dataCarrierService, "invoiceIngestFailureReason");
        throw new Exception("Batch process Failed", ex);
    }
    // Constant-first comparison: a job whose status ended up null must not fail here with a NullPointerException.
    if (OLEConstants.OLEBatchProcess.JOB_STATUS_STOPPED.equals(job.getStatus())) {
        jobBo.setStatus(job.getStatus());
        getBusinessObjectService().save(jobBo);
        job.setStatusDesc("Batch Operation Stopped");
        OLEBatchProcessJobDetailsController.removeStatusFromBatchProcess(jobBo.getJobId());
    }
    job.setEndTime(new Timestamp(System.currentTimeMillis())); // set the end time of the job
    if (StringUtils.isEmpty(job.getStatusDesc())) {
        job.setStatusDesc("Batch Operation Completed");
    }
    updateJobProgress();  // update job
}

/**
 * Resolves the profile name used in failure messages, preferring the processDef field and falling back to
 * the definition passed to process() when the field has not been assigned yet.
 */
private String resolveProfileNameForFailure(OLEBatchProcessDefinitionDocument fallback) {
    OLEBatchProcessDefinitionDocument definition = this.processDef != null ? this.processDef : fallback;
    return definition == null ? null : String.valueOf(definition.getBatchProcessProfileName());
}

/**
 * Writes the failure reasons stored under the given data carrier key to an error attachment file, one
 * reason per line, when there are any, and then replaces the stored list with an empty one.
 */
@SuppressWarnings("unchecked")
private void writeAndResetFailureReasons(DataCarrierService dataCarrierService, String dataKey) throws Exception {
    List<String> reasons = (List<String>) dataCarrierService.getData(dataKey);
    if (reasons != null && reasons.size() > 0) {
        StringBuilder report = new StringBuilder();
        for (int failureCount = 0; failureCount < reasons.size(); failureCount++) {
            report.append(reasons.get(failureCount)).append("\n");
        }
        createBatchErrorAttachmentFile(report.toString());
    }
    dataCarrierService.addData(dataKey, new ArrayList<String>());
}
/**
 * Returns the business object service, looking it up through KRADServiceLocator on first use and caching it.
 *
 * @return the cached BusinessObjectService
 */
protected BusinessObjectService getBusinessObjectService() {
    if (businessObjectService != null) {
        return businessObjectService;
    }
    businessObjectService = KRADServiceLocator.getBusinessObjectService();
    return businessObjectService;
}
/**
 * First step of process(): stores the given process definition on this instance, looks up the profile
 * by the definition's profile id and attaches it to the definition.
 *
 * @param processDef the process definition whose profile is to be loaded
 * @throws Exception any failure of the lookup, logged here and rethrown unchanged
 */
protected void loadProfile(OLEBatchProcessDefinitionDocument processDef) throws Exception {
    this.processDef = processDef;
    try {
        BusinessObjectService lookupService = KRADServiceLocator.getBusinessObjectService();
        OLEBatchProcessProfileBo loadedProfile = lookupService.findBySinglePrimaryKey(OLEBatchProcessProfileBo.class, processDef.getBatchProcessProfileId());
        this.processDef.setOleBatchProcessProfileBo(loadedProfile);
    } catch (Exception loadFailure) {
        LOG.error("Error while loading profile :: " + this.processDef.getBatchProcessProfileName(), loadFailure);
        throw loadFailure;
    }
}
/**
 * Refreshes the percentage completed and the time spent on the current job and then saves the job.
 * process() calls this after each of its steps.
 *
 * @throws Exception if the update or the save fails
 */
public void updateJobProgress() throws Exception {
    updatePercentCompleted();
    updateTimeSpent();
    BusinessObjectService persistenceService = KRADServiceLocator.getBusinessObjectService();
    persistenceService.save(job);
}
/**
 * Method to be implemented to read the initial data for the various processes.
 * <p/>
 * process() calls it after loadProfile() and before prepareForWrite().
 *
 * @throws Exception if the read fails; process() then marks the job as stopped and rethrows the failure wrapped
 */
protected abstract void prepareForRead() throws Exception;
/**
* method to be implemented to prepare for writing the data; will be called before writeData()
*
* @throws Exception
*/
186    protected abstract void prepareForWrite() throws Exception;
188    /**
189     * method to be implemented to retreive the next batch of data to be processed
190     *
191     * @throws Exception
192     */
193    protected abstract void getNextBatch() throws Exception;
195    /**
196     * method to be implemented for various write operation
197     */
198    protected abstract void processBatch() throws Exception;
200    /**
201     * checks if the job is in the RUNNING state
202     *
203     * @return
204     */
205    protected boolean isJobRunning() {
206        if(job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_COMPLETED)) {
207            return false;
208        }
209        else {
210        if (OLEBatchProcessJobDetailsController.getBatchProcessJobStatusMap() != null && OLEBatchProcessJobDetailsController.getBatchProcessJobStatusMap().size() > 0) {
211            jobStatus = OLEBatchProcessJobDetailsController.getBatchProcessJobStatus(job.getJobId());
212            job.setStatus(jobStatus);
213        }
215        if (jobStatus == null) {
216            Map<String, String> jobMap = new HashMap<String, String>();
217            jobMap.put("job_id", job.getJobId());
218            OLEBatchProcessJobDetailsBo jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
219            OLEBatchProcessJobDetailsController.setBatchProcessJobStatusMap(job.getJobId(),jobDetailsBo.getStatus());
220            if (jobDetailsBo.getStatus().equalsIgnoreCase(OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING)) {
221                return true;
222            } else {
223                return false;
224            }
225        } else if (jobStatus != null && OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING.equalsIgnoreCase(jobStatus)) {
226            return true;
227        } else {
228            return false;
229        }
230        }
231    }
233    /**
234     * checks if the job is in the Pause state
235     *
236     * @return
237     */
238    protected void checkJobPauseStatus() throws Exception{
239        Map<String, String> jobMap = new HashMap<String, String>();
240        jobMap.put("job_id", job.getJobId());
241        OLEBatchProcessJobDetailsBo jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
242        while(jobDetailsBo.getStatus().equalsIgnoreCase(OLEConstants.OLEBatchProcess.JOB_STATUS_PAUSED)) {
243            Thread.sleep(100);
244            jobMap.put("job_id", job.getJobId());
245            jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
246        }
247    }
250    /**
251     * updates the time spent since the job started
252     */
253    private void updateTimeSpent() {
254        Timestamp startTime = job.getStartTime();
255        long diff = Calendar.getInstance().getTime().getTime() - startTime.getTime();
256        long diffSeconds = diff / 1000 % 60;
257        long diffMinutes = diff / (60 * 1000) % 60;
258        long diffHours = diff / (60 * 60 * 1000) % 24;
259        StringBuffer sb = new StringBuffer();
260        sb.append(diffHours + ":" + diffMinutes + ":" + diffSeconds);
261        job.setTimeSpent(sb.toString());
262    }
264    /**
265     * updates the percentage completed based on the number of records processed and no of records in process
266     */
267    private void updatePercentCompleted() {
268        // String total = job.getNoOfRecords();
269        String total = job.getTotalNoOfRecords();
270        float totRec = Float.parseFloat(total == null ? "0" : total);
271        if (totRec == 0.0) return;
272        String noProcessed = job.getNoOfRecordsProcessed();
273        if (StringUtils.isEmpty(noProcessed)) return;
274        float perCompleted = (Float.valueOf(noProcessed) / Float.valueOf(total)) * 100;
275        job.setPerCompleted(String.format("%.2f", perCompleted) + PERCENT);
276    }
278    /**
279     * create and return the dirctory based on batch process type in the server file system
280     */
281    protected String getBatchProcessFilePath(String batchProceesType) {
283        String batchProcessLocation = getOLEBatchProcessDataHelper().getBatchProcessFilePath(batchProceesType);
284        return batchProcessLocation;
285    }
288    /**
289     * Create the success record file in the batch process type directory  for batch delete
290     * @param successRecordData
291     * @throws Exception
292     */
293    protected void createBatchSuccessFile(String successRecordData) throws Exception {
295        getOLEBatchProcessDataHelper().createBatchSuccessFile(successRecordData, processDef.getBatchProcessType(), job.getJobId() + "_SuccessRecord" + "_" + job.getUploadFileName(), job.getJobId());
296    }
299    /**
300     * Create the failure report record file in the batch process type directory for batch delete
301     * @param failureReportData
302     * @throws Exception
303     */
304    protected void createBatchDeleteFailureReportFile(String failureReportData) throws Exception {
306        getOLEBatchProcessDataHelper().createBatchDeleteFailureReportFile(failureReportData, processDef.getBatchProcessType(), job.getJobId() + "_FailureReport.txt", job.getJobId());
307    }
310    /**
311     * Create the failure record file in the batch process type directory
312     * @param failureRecordData
313     * @throws Exception
314     */
315    protected void createBatchFailureFile(String failureRecordData) throws Exception {
317        getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + job.getUploadFileName(), job.getJobId());
318    }
320    protected void createBatchErrorAttachmentFile(String failureRecordData) throws Exception {
321        String uploadFileName = job.getUploadFileName();
322        String errorFileName = null;
323        String[] fileNames = uploadFileName.split(",");
324        errorFileName = fileNames.length == 2 ? fileNames[0]:uploadFileName;
325        if(errorFileName.endsWith(".mrc")){
326            errorFileName = errorFileName.replace(".mrc",".txt");
327        }
328        else if(errorFileName.endsWith(".INV")){
329            errorFileName = errorFileName.replace(".INV",".txt");
330        }
331        else if(errorFileName.endsWith(".edi")){
332            errorFileName = errorFileName.replace(".edi",".txt");
333        }
334        getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + errorFileName, job.getJobId());
335    }
337    protected void createFile(String[] content) throws Exception {
339        getOLEBatchProcessDataHelper().createFile(content, processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.DELETED_BIB_IDS_FILE_NAME, job.getJobId());
340    }
342    /**
343     * Delete the upload file in the batch process type dirctory
344     */
345    protected void deleteBatchFile() throws Exception {
347        getOLEBatchProcessDataHelper().deleteBatchFile(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + job.getUploadFileName(), job.getJobId());
348    }
350    /**
351     * Get the upload file content from the batch process type dirctory by using job id an upload file name
352     */
353    protected String getBatchProcessFileContent() throws Exception {
354        String fileContent = getOLEBatchProcessDataHelper().getBatchProcessFileContent(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + job.getUploadFileName(), job.getJobId());
355        return fileContent;
356    }
358    protected String getBatchProcessFileContent(String fileName) throws Exception {
359        String fileContent = getOLEBatchProcessDataHelper().getBatchProcessFileContent(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + fileName, job.getJobId());
360        return fileContent;
363    }
365    /**
366     * Delete the upload file in the batch process type dirctory
367     */
368    protected void deleteBatchFile(String fileName) throws Exception {
370        getOLEBatchProcessDataHelper().deleteBatchFile(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + fileName, job.getJobId());
371    }
373    /**
374     * Create the failure record file in the batch process type directory
375     * @param failureRecordData
376     * @throws Exception
377     */
378    protected void createBatchFailureFile(String failureRecordData, String fileName) throws Exception {
380        getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + fileName, job.getJobId());
381    }