001package org.kuali.ole.batch.impl;
002
003import org.apache.commons.lang3.StringUtils;
004import org.apache.log4j.Logger;
005import org.kuali.ole.DataCarrierService;
006import org.kuali.ole.OLEConstants;
007import org.kuali.ole.batch.bo.OLEBatchProcessJobDetailsBo;
008import org.kuali.ole.batch.bo.OLEBatchProcessProfileBo;
009import org.kuali.ole.batch.controller.OLEBatchProcessJobDetailsController;
010import org.kuali.ole.batch.document.OLEBatchProcessDefinitionDocument;
011import org.kuali.ole.batch.helper.OLEBatchProcessDataHelper;
012import org.kuali.rice.krad.service.BusinessObjectService;
013import org.kuali.rice.core.api.resourceloader.GlobalResourceLoader;
014import org.kuali.rice.krad.service.KRADServiceLocator;
015
016import java.sql.Timestamp;
017import java.util.*;
018
019/**
020 * Created with IntelliJ IDEA.
021 * User: meenrajd
022 * Date: 7/8/13
023 * Time: 3:55 PM
024 * To change this template use File | Settings | File Templates.
025 */
026public abstract class AbstractBatchProcess implements OLEBatchProcess {
027
028    protected OLEBatchProcessJobDetailsBo job;
029    protected OLEBatchProcessDefinitionDocument processDef;
030    private OLEBatchProcessDataHelper oleBatchProcessDataHelper;
031    private static final Logger LOG = Logger.getLogger(OLEBatchProcessDataHelper.class);
032    private BusinessObjectService businessObjectService;
033    String jobStatus = null;
034    private OLEBatchProcessDataHelper getOLEBatchProcessDataHelper() {
035
036        if (oleBatchProcessDataHelper == null) {
037            oleBatchProcessDataHelper = OLEBatchProcessDataHelper.getInstance();
038        }
039        return oleBatchProcessDataHelper;
040    }
041
042    /**
043     * This method will be invoked for various processes implementing the OLEBatchProcess
044     * The method will perform following operation in sequence
045     * 1. loadProfile() - loads the profile of the process that has been invoked
046     * 2. prepareForRead() - performs the initial read or read support operation as implemented by the invoked process
047     * 3. prepareForWrite() - performs the initial write or write support operation as implemented by the invoked process
048     * 4. writeData() - performs the actual write operation intended and implemented by the process invoked
049     * 5. getNextBatch() - performs the read operation if there is more data to be read
050     * <p/>
051     * The writeData() and getNextBatch() will be executed till the job status is RUNNING. The implemented process will
052     * have to set the job status to STOPPED / COMPLETED etc. as per its discretion
053     * <p/>
054     * updateBatchProgress() - performs the job update after each of the above method complete operation.
055     *
056     * @param processDef
057     * @param jobBo
058     * @throws Exception
059     */
060    @Override
061    public final void process(OLEBatchProcessDefinitionDocument processDef, OLEBatchProcessJobDetailsBo jobBo) throws Exception {
062        this.job = jobBo;
063        if (job.getStatus() != null && job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_PAUSED)) {
064            job.setStatus(job.getStatus());
065        } else {
066            job.setStatus(OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING);//set job to running status
067        }
068        job.setStartTime(job.getStartTime() == null ? new Timestamp(new Date().getTime()) : job.getStartTime()); // set the start time of job
069        try {
070            updateJobProgress();   // update job
071            checkJobPauseStatus();
072            loadProfile(processDef); // load the profile for the given process id
073            checkJobPauseStatus();
074            updateJobProgress();  // update job
075            checkJobPauseStatus();
076            this.prepareForRead();  // read the data
077            checkJobPauseStatus();
078            updateJobProgress();  // update job
079            checkJobPauseStatus();
080            if (isJobRunning()) {     // check if the job is running
081                checkJobPauseStatus();
082                this.prepareForWrite(); // prepare for writing the data
083                checkJobPauseStatus();
084                updateJobProgress();  // update job
085                while (isJobRunning()) {   // check if the job is running
086                    checkJobPauseStatus();
087                    this.processBatch();     // write data
088                    checkJobPauseStatus();
089                    updateJobProgress();  // update job
090                    if (isJobRunning()) {    // check if the job is running
091                        checkJobPauseStatus();
092                        this.getNextBatch();  // get the next batch of records
093                        checkJobPauseStatus();
094                        updateJobProgress();  // update job
095                    }
096                }
097            }
098        } catch (Exception ex) {
099            LOG.error("Error while performing batch process for profile :: " + this.processDef.getBatchProcessProfileName(), ex);
100            job.setEndTime(new Timestamp(new Date().getTime())); // set the end time of the job
101            job.setStatus(OLEConstants.OLEBatchProcess.JOB_STATUS_STOPPED);
102            job.setStatusDesc("Batch process Failed for profile :: " + this.processDef.getBatchProcessProfileName());
103            updateJobProgress();
104            DataCarrierService dataCarrierService = GlobalResourceLoader.getService(OLEConstants.DATA_CARRIER_SERVICE);
105            List<String> reasonForFailure = (List<String>) dataCarrierService.getData("reasonForBibImportFailure");
106            StringBuffer failureBuffer = new StringBuffer();
107            if(reasonForFailure != null && reasonForFailure.size() > 0){
108                for(int failureCount = 0;failureCount < reasonForFailure.size();failureCount++){
109                    failureBuffer.append(reasonForFailure.get(failureCount) + "\n");
110                }
111                createBatchErrorAttachmentFile(failureBuffer.toString());
112            }
113            dataCarrierService.addData("reasonForBibImportFailure",new ArrayList<>());
114            List<String> reasonForInvoiceImportFailure = (List<String>) dataCarrierService.getData("invoiceIngestFailureReason");
115            StringBuffer invoiceFailureBuffer = new StringBuffer();
116            if(reasonForInvoiceImportFailure != null && reasonForInvoiceImportFailure.size() > 0){
117                for(int failureCount = 0;failureCount < reasonForInvoiceImportFailure.size();failureCount++){
118                    invoiceFailureBuffer.append(reasonForInvoiceImportFailure.get(failureCount) + "\n");
119                }
120                createBatchErrorAttachmentFile(invoiceFailureBuffer.toString());
121            }
122            dataCarrierService.addData("invoiceIngestFailureReason",new ArrayList<>());
123            throw new Exception("Batch process Failed", ex);
124        }
125
126        if (job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_STOPPED)) {
127            if (jobBo != null) {
128                jobBo.setStatus(job.getStatus());
129                getBusinessObjectService().save(jobBo);
130                job.setStatusDesc("Batch Operation Stopped");
131                OLEBatchProcessJobDetailsController.removeStatusFromBatchProcess(jobBo.getJobId());
132            }
133        }
134        job.setEndTime(new Timestamp(new Date().getTime())); // set the end time of the job
135        if (StringUtils.isEmpty(job.getStatusDesc())) {
136            job.setStatusDesc("Batch Operation Completed");
137        }
138        updateJobProgress();  // update job
139    }
140
141    protected BusinessObjectService getBusinessObjectService() {
142        if (businessObjectService == null)
143            businessObjectService = KRADServiceLocator.getBusinessObjectService();
144        return businessObjectService;
145    }
146
147    /**
148     * First method to be called in the batch process will load the profile for the process id in the process defn document
149     *
150     * @param processDef
151     */
152    protected void loadProfile(OLEBatchProcessDefinitionDocument processDef) throws Exception {
153        this.processDef = processDef;
154        //load profile
155        try {
156            OLEBatchProcessProfileBo profileBo = KRADServiceLocator.getBusinessObjectService().findBySinglePrimaryKey(OLEBatchProcessProfileBo.class, processDef.getBatchProcessProfileId());
157            this.processDef.setOleBatchProcessProfileBo(profileBo);
158        } catch (Exception ex) {
159            LOG.error("Error while loading profile :: " + this.processDef.getBatchProcessProfileName(), ex);
160            throw ex;
161        }
162    }
163
164    /**
165     * method to update the batch job progress will be called after every other method completes operation
166     */
167    public void updateJobProgress() throws Exception {
168            updatePercentCompleted();
169            updateTimeSpent();
170            KRADServiceLocator.getBusinessObjectService().save(job);
171
172    }
173
174    /**
175     * method to be implemented to read initial data for the various processes; will be called before prepareForWrite() method
176     *
177     * @throws Exception
178     */
179    protected abstract void prepareForRead() throws Exception;
180
181    /**
182     * method to be implemented to prepare for writing the data; will be called before writeData()
183     *
184     * @throws Exception
185     */
186    protected abstract void prepareForWrite() throws Exception;
187
188    /**
189     * method to be implemented to retreive the next batch of data to be processed
190     *
191     * @throws Exception
192     */
193    protected abstract void getNextBatch() throws Exception;
194
195    /**
196     * method to be implemented for various write operation
197     */
198    protected abstract void processBatch() throws Exception;
199
200    /**
201     * checks if the job is in the RUNNING state
202     *
203     * @return
204     */
205    protected boolean isJobRunning() {
206        if(job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_COMPLETED)) {
207            return false;
208        }
209        else {
210        if (OLEBatchProcessJobDetailsController.getBatchProcessJobStatusMap() != null && OLEBatchProcessJobDetailsController.getBatchProcessJobStatusMap().size() > 0) {
211            jobStatus = OLEBatchProcessJobDetailsController.getBatchProcessJobStatus(job.getJobId());
212            job.setStatus(jobStatus);
213        }
214
215        if (jobStatus == null) {
216            Map<String, String> jobMap = new HashMap<String, String>();
217            jobMap.put("job_id", job.getJobId());
218            OLEBatchProcessJobDetailsBo jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
219            OLEBatchProcessJobDetailsController.setBatchProcessJobStatusMap(job.getJobId(),jobDetailsBo.getStatus());
220            if (jobDetailsBo.getStatus().equalsIgnoreCase(OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING)) {
221                return true;
222            } else {
223                return false;
224            }
225        } else if (jobStatus != null && OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING.equalsIgnoreCase(jobStatus)) {
226            return true;
227        } else {
228            return false;
229        }
230        }
231    }
232
233    /**
234     * checks if the job is in the Pause state
235     *
236     * @return
237     */
238    protected void checkJobPauseStatus() throws Exception{
239        Map<String, String> jobMap = new HashMap<String, String>();
240        jobMap.put("job_id", job.getJobId());
241        OLEBatchProcessJobDetailsBo jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
242        while(jobDetailsBo.getStatus().equalsIgnoreCase(OLEConstants.OLEBatchProcess.JOB_STATUS_PAUSED)) {
243            Thread.sleep(100);
244            jobMap.put("job_id", job.getJobId());
245            jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
246        }
247    }
248
249
250    /**
251     * updates the time spent since the job started
252     */
253    private void updateTimeSpent() {
254        Timestamp startTime = job.getStartTime();
255        long diff = Calendar.getInstance().getTime().getTime() - startTime.getTime();
256        long diffSeconds = diff / 1000 % 60;
257        long diffMinutes = diff / (60 * 1000) % 60;
258        long diffHours = diff / (60 * 60 * 1000) % 24;
259        StringBuffer sb = new StringBuffer();
260        sb.append(diffHours + ":" + diffMinutes + ":" + diffSeconds);
261        job.setTimeSpent(sb.toString());
262    }
263
264    /**
265     * updates the percentage completed based on the number of records processed and no of records in process
266     */
267    private void updatePercentCompleted() {
268        // String total = job.getNoOfRecords();
269        String total = job.getTotalNoOfRecords();
270        float totRec = Float.parseFloat(total == null ? "0" : total);
271        if (totRec == 0.0) return;
272        String noProcessed = job.getNoOfRecordsProcessed();
273        if (StringUtils.isEmpty(noProcessed)) return;
274        float perCompleted = (Float.valueOf(noProcessed) / Float.valueOf(total)) * 100;
275        job.setPerCompleted(String.format("%.2f", perCompleted) + PERCENT);
276    }
277
278    /**
279     * create and return the dirctory based on batch process type in the server file system
280     */
281    protected String getBatchProcessFilePath(String batchProceesType) {
282
283        String batchProcessLocation = getOLEBatchProcessDataHelper().getBatchProcessFilePath(batchProceesType);
284        return batchProcessLocation;
285    }
286
287
288    /**
289     * Create the success record file in the batch process type directory  for batch delete
290     * @param successRecordData
291     * @throws Exception
292     */
293    protected void createBatchSuccessFile(String successRecordData) throws Exception {
294
295        getOLEBatchProcessDataHelper().createBatchSuccessFile(successRecordData, processDef.getBatchProcessType(), job.getJobId() + "_SuccessRecord" + "_" + job.getUploadFileName(), job.getJobId());
296    }
297
298
299    /**
300     * Create the failure report record file in the batch process type directory for batch delete
301     * @param failureReportData
302     * @throws Exception
303     */
304    protected void createBatchDeleteFailureReportFile(String failureReportData) throws Exception {
305
306        getOLEBatchProcessDataHelper().createBatchDeleteFailureReportFile(failureReportData, processDef.getBatchProcessType(), job.getJobId() + "_FailureReport.txt", job.getJobId());
307    }
308
309
310    /**
311     * Create the failure record file in the batch process type directory
312     * @param failureRecordData
313     * @throws Exception
314     */
315    protected void createBatchFailureFile(String failureRecordData) throws Exception {
316
317        getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + job.getUploadFileName(), job.getJobId());
318    }
319
320    protected void createBatchErrorAttachmentFile(String failureRecordData) throws Exception {
321        String uploadFileName = job.getUploadFileName();
322        String errorFileName = null;
323        String[] fileNames = uploadFileName.split(",");
324        errorFileName = fileNames.length == 2 ? fileNames[0]:uploadFileName;
325        if(errorFileName.endsWith(".mrc")){
326            errorFileName = errorFileName.replace(".mrc",".txt");
327        }
328        else if(errorFileName.endsWith(".INV")){
329            errorFileName = errorFileName.replace(".INV",".txt");
330        }
331        else if(errorFileName.endsWith(".edi")){
332            errorFileName = errorFileName.replace(".edi",".txt");
333        }
334        getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + errorFileName, job.getJobId());
335    }
336
337    protected void createFile(String[] content) throws Exception {
338
339        getOLEBatchProcessDataHelper().createFile(content, processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.DELETED_BIB_IDS_FILE_NAME, job.getJobId());
340    }
341
342    /**
343     * Delete the upload file in the batch process type dirctory
344     */
345    protected void deleteBatchFile() throws Exception {
346
347        getOLEBatchProcessDataHelper().deleteBatchFile(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + job.getUploadFileName(), job.getJobId());
348    }
349
350    /**
351     * Get the upload file content from the batch process type dirctory by using job id an upload file name
352     */
353    protected String getBatchProcessFileContent() throws Exception {
354        String fileContent = getOLEBatchProcessDataHelper().getBatchProcessFileContent(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + job.getUploadFileName(), job.getJobId());
355        return fileContent;
356    }
357
358    protected String getBatchProcessFileContent(String fileName) throws Exception {
359        String fileContent = getOLEBatchProcessDataHelper().getBatchProcessFileContent(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + fileName, job.getJobId());
360        return fileContent;
361
362
363    }
364
365    /**
366     * Delete the upload file in the batch process type dirctory
367     */
368    protected void deleteBatchFile(String fileName) throws Exception {
369
370        getOLEBatchProcessDataHelper().deleteBatchFile(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + fileName, job.getJobId());
371    }
372
373    /**
374     * Create the failure record file in the batch process type directory
375     * @param failureRecordData
376     * @throws Exception
377     */
378    protected void createBatchFailureFile(String failureRecordData, String fileName) throws Exception {
379
380        getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + fileName, job.getJobId());
381    }
382
383
384
385}