1 package org.kuali.ole.batch.impl;
2
3 import org.apache.commons.lang3.StringUtils;
4 import org.apache.log4j.Logger;
5 import org.kuali.ole.DataCarrierService;
6 import org.kuali.ole.OLEConstants;
7 import org.kuali.ole.batch.bo.OLEBatchProcessJobDetailsBo;
8 import org.kuali.ole.batch.bo.OLEBatchProcessProfileBo;
9 import org.kuali.ole.batch.controller.OLEBatchProcessJobDetailsController;
10 import org.kuali.ole.batch.document.OLEBatchProcessDefinitionDocument;
11 import org.kuali.ole.batch.helper.OLEBatchProcessDataHelper;
12 import org.kuali.rice.krad.service.BusinessObjectService;
13 import org.kuali.rice.core.api.resourceloader.GlobalResourceLoader;
14 import org.kuali.rice.krad.service.KRADServiceLocator;
15
16 import java.sql.Timestamp;
17 import java.util.*;
18
19
20
21
22
23
24
25
26 public abstract class AbstractBatchProcess implements OLEBatchProcess {
27
28 protected OLEBatchProcessJobDetailsBo job;
29 protected OLEBatchProcessDefinitionDocument processDef;
30 private OLEBatchProcessDataHelper oleBatchProcessDataHelper;
31 private static final Logger LOG = Logger.getLogger(OLEBatchProcessDataHelper.class);
32 private BusinessObjectService businessObjectService;
33 String jobStatus = null;
34 private OLEBatchProcessDataHelper getOLEBatchProcessDataHelper() {
35
36 if (oleBatchProcessDataHelper == null) {
37 oleBatchProcessDataHelper = OLEBatchProcessDataHelper.getInstance();
38 }
39 return oleBatchProcessDataHelper;
40 }
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 @Override
61 public final void process(OLEBatchProcessDefinitionDocument processDef, OLEBatchProcessJobDetailsBo jobBo) throws Exception {
62 this.job = jobBo;
63 if (job.getStatus() != null && job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_PAUSED)) {
64 job.setStatus(job.getStatus());
65 } else {
66 job.setStatus(OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING);
67 }
68 job.setStartTime(job.getStartTime() == null ? new Timestamp(new Date().getTime()) : job.getStartTime());
69 try {
70 updateJobProgress();
71 checkJobPauseStatus();
72 loadProfile(processDef);
73 checkJobPauseStatus();
74 updateJobProgress();
75 checkJobPauseStatus();
76 this.prepareForRead();
77 checkJobPauseStatus();
78 updateJobProgress();
79 checkJobPauseStatus();
80 if (isJobRunning()) {
81 checkJobPauseStatus();
82 this.prepareForWrite();
83 checkJobPauseStatus();
84 updateJobProgress();
85 while (isJobRunning()) {
86 checkJobPauseStatus();
87 this.processBatch();
88 checkJobPauseStatus();
89 updateJobProgress();
90 if (isJobRunning()) {
91 checkJobPauseStatus();
92 this.getNextBatch();
93 checkJobPauseStatus();
94 updateJobProgress();
95 }
96 }
97 }
98 } catch (Exception ex) {
99 LOG.error("Error while performing batch process for profile :: " + this.processDef.getBatchProcessProfileName(), ex);
100 job.setEndTime(new Timestamp(new Date().getTime()));
101 job.setStatus(OLEConstants.OLEBatchProcess.JOB_STATUS_STOPPED);
102 job.setStatusDesc("Batch process Failed for profile :: " + this.processDef.getBatchProcessProfileName());
103 updateJobProgress();
104 DataCarrierService dataCarrierService = GlobalResourceLoader.getService(OLEConstants.DATA_CARRIER_SERVICE);
105 List<String> reasonForFailure = (List<String>) dataCarrierService.getData("reasonForBibImportFailure");
106 StringBuffer failureBuffer = new StringBuffer();
107 if(reasonForFailure != null && reasonForFailure.size() > 0){
108 for(int failureCount = 0;failureCount < reasonForFailure.size();failureCount++){
109 failureBuffer.append(reasonForFailure.get(failureCount) + "\n");
110 }
111 createBatchErrorAttachmentFile(failureBuffer.toString());
112 }
113 dataCarrierService.addData("reasonForBibImportFailure",new ArrayList<>());
114 List<String> reasonForInvoiceImportFailure = (List<String>) dataCarrierService.getData("invoiceIngestFailureReason");
115 StringBuffer invoiceFailureBuffer = new StringBuffer();
116 if(reasonForInvoiceImportFailure != null && reasonForInvoiceImportFailure.size() > 0){
117 for(int failureCount = 0;failureCount < reasonForInvoiceImportFailure.size();failureCount++){
118 invoiceFailureBuffer.append(reasonForInvoiceImportFailure.get(failureCount) + "\n");
119 }
120 createBatchErrorAttachmentFile(invoiceFailureBuffer.toString());
121 }
122 dataCarrierService.addData("invoiceIngestFailureReason",new ArrayList<>());
123 throw new Exception("Batch process Failed", ex);
124 }
125
126 if (job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_STOPPED)) {
127 if (jobBo != null) {
128 jobBo.setStatus(job.getStatus());
129 getBusinessObjectService().save(jobBo);
130 job.setStatusDesc("Batch Operation Stopped");
131 OLEBatchProcessJobDetailsController.removeStatusFromBatchProcess(jobBo.getJobId());
132 }
133 }
134 job.setEndTime(new Timestamp(new Date().getTime()));
135 if (StringUtils.isEmpty(job.getStatusDesc())) {
136 job.setStatusDesc("Batch Operation Completed");
137 }
138 updateJobProgress();
139 }
140
141 protected BusinessObjectService getBusinessObjectService() {
142 if (businessObjectService == null)
143 businessObjectService = KRADServiceLocator.getBusinessObjectService();
144 return businessObjectService;
145 }
146
147
148
149
150
151
152 protected void loadProfile(OLEBatchProcessDefinitionDocument processDef) throws Exception {
153 this.processDef = processDef;
154
155 try {
156 OLEBatchProcessProfileBo profileBo = KRADServiceLocator.getBusinessObjectService().findBySinglePrimaryKey(OLEBatchProcessProfileBo.class, processDef.getBatchProcessProfileId());
157 this.processDef.setOleBatchProcessProfileBo(profileBo);
158 } catch (Exception ex) {
159 LOG.error("Error while loading profile :: " + this.processDef.getBatchProcessProfileName(), ex);
160 throw ex;
161 }
162 }
163
164
165
166
167 public void updateJobProgress() throws Exception {
168 updatePercentCompleted();
169 updateTimeSpent();
170 KRADServiceLocator.getBusinessObjectService().save(job);
171
172 }
173
174
175
176
177
178
179 protected abstract void prepareForRead() throws Exception;
180
181
182
183
184
185
186 protected abstract void prepareForWrite() throws Exception;
187
188
189
190
191
192
193 protected abstract void getNextBatch() throws Exception;
194
195
196
197
198 protected abstract void processBatch() throws Exception;
199
200
201
202
203
204
205 protected boolean isJobRunning() {
206 if(job.getStatus().equals(OLEConstants.OLEBatchProcess.JOB_STATUS_COMPLETED)) {
207 return false;
208 }
209 else {
210 if (OLEBatchProcessJobDetailsController.getBatchProcessJobStatusMap() != null && OLEBatchProcessJobDetailsController.getBatchProcessJobStatusMap().size() > 0) {
211 jobStatus = OLEBatchProcessJobDetailsController.getBatchProcessJobStatus(job.getJobId());
212 job.setStatus(jobStatus);
213 }
214
215 if (jobStatus == null) {
216 Map<String, String> jobMap = new HashMap<String, String>();
217 jobMap.put("job_id", job.getJobId());
218 OLEBatchProcessJobDetailsBo jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
219 OLEBatchProcessJobDetailsController.setBatchProcessJobStatusMap(job.getJobId(),jobDetailsBo.getStatus());
220 if (jobDetailsBo.getStatus().equalsIgnoreCase(OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING)) {
221 return true;
222 } else {
223 return false;
224 }
225 } else if (jobStatus != null && OLEConstants.OLEBatchProcess.JOB_STATUS_RUNNING.equalsIgnoreCase(jobStatus)) {
226 return true;
227 } else {
228 return false;
229 }
230 }
231 }
232
233
234
235
236
237
238 protected void checkJobPauseStatus() throws Exception{
239 Map<String, String> jobMap = new HashMap<String, String>();
240 jobMap.put("job_id", job.getJobId());
241 OLEBatchProcessJobDetailsBo jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
242 while(jobDetailsBo.getStatus().equalsIgnoreCase(OLEConstants.OLEBatchProcess.JOB_STATUS_PAUSED)) {
243 Thread.sleep(100);
244 jobMap.put("job_id", job.getJobId());
245 jobDetailsBo = KRADServiceLocator.getBusinessObjectService().findByPrimaryKey(OLEBatchProcessJobDetailsBo.class, jobMap);
246 }
247 }
248
249
250
251
252
253 private void updateTimeSpent() {
254 Timestamp startTime = job.getStartTime();
255 long diff = Calendar.getInstance().getTime().getTime() - startTime.getTime();
256 long diffSeconds = diff / 1000 % 60;
257 long diffMinutes = diff / (60 * 1000) % 60;
258 long diffHours = diff / (60 * 60 * 1000) % 24;
259 StringBuffer sb = new StringBuffer();
260 sb.append(diffHours + ":" + diffMinutes + ":" + diffSeconds);
261 job.setTimeSpent(sb.toString());
262 }
263
264
265
266
267 private void updatePercentCompleted() {
268
269 String total = job.getTotalNoOfRecords();
270 float totRec = Float.parseFloat(total == null ? "0" : total);
271 if (totRec == 0.0) return;
272 String noProcessed = job.getNoOfRecordsProcessed();
273 if (StringUtils.isEmpty(noProcessed)) return;
274 float perCompleted = (Float.valueOf(noProcessed) / Float.valueOf(total)) * 100;
275 job.setPerCompleted(String.format("%.2f", perCompleted) + PERCENT);
276 }
277
278
279
280
281 protected String getBatchProcessFilePath(String batchProceesType) {
282
283 String batchProcessLocation = getOLEBatchProcessDataHelper().getBatchProcessFilePath(batchProceesType);
284 return batchProcessLocation;
285 }
286
287
288
289
290
291
292
293 protected void createBatchSuccessFile(String successRecordData) throws Exception {
294
295 getOLEBatchProcessDataHelper().createBatchSuccessFile(successRecordData, processDef.getBatchProcessType(), job.getJobId() + "_SuccessRecord" + "_" + job.getUploadFileName(), job.getJobId());
296 }
297
298
299
300
301
302
303
304 protected void createBatchDeleteFailureReportFile(String failureReportData) throws Exception {
305
306 getOLEBatchProcessDataHelper().createBatchDeleteFailureReportFile(failureReportData, processDef.getBatchProcessType(), job.getJobId() + "_FailureReport.txt", job.getJobId());
307 }
308
309
310
311
312
313
314
315 protected void createBatchFailureFile(String failureRecordData) throws Exception {
316
317 getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + job.getUploadFileName(), job.getJobId());
318 }
319
320 protected void createBatchErrorAttachmentFile(String failureRecordData) throws Exception {
321 String uploadFileName = job.getUploadFileName();
322 String errorFileName = null;
323 String[] fileNames = uploadFileName.split(",");
324 errorFileName = fileNames.length == 2 ? fileNames[0]:uploadFileName;
325 if(errorFileName.endsWith(".mrc")){
326 errorFileName = errorFileName.replace(".mrc",".txt");
327 }
328 else if(errorFileName.endsWith(".INV")){
329 errorFileName = errorFileName.replace(".INV",".txt");
330 }
331 else if(errorFileName.endsWith(".edi")){
332 errorFileName = errorFileName.replace(".edi",".txt");
333 }
334 getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + errorFileName, job.getJobId());
335 }
336
337 protected void createFile(String[] content) throws Exception {
338
339 getOLEBatchProcessDataHelper().createFile(content, processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.DELETED_BIB_IDS_FILE_NAME, job.getJobId());
340 }
341
342
343
344
345 protected void deleteBatchFile() throws Exception {
346
347 getOLEBatchProcessDataHelper().deleteBatchFile(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + job.getUploadFileName(), job.getJobId());
348 }
349
350
351
352
353 protected String getBatchProcessFileContent() throws Exception {
354 String fileContent = getOLEBatchProcessDataHelper().getBatchProcessFileContent(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + job.getUploadFileName(), job.getJobId());
355 return fileContent;
356 }
357
358 protected String getBatchProcessFileContent(String fileName) throws Exception {
359 String fileContent = getOLEBatchProcessDataHelper().getBatchProcessFileContent(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + fileName, job.getJobId());
360 return fileContent;
361
362
363 }
364
365
366
367
368 protected void deleteBatchFile(String fileName) throws Exception {
369
370 getOLEBatchProcessDataHelper().deleteBatchFile(processDef.getBatchProcessType(), job.getJobId() + OLEConstants.OLEBatchProcess.PROFILE_JOB + "_" + fileName, job.getJobId());
371 }
372
373
374
375
376
377
378 protected void createBatchFailureFile(String failureRecordData, String fileName) throws Exception {
379
380 getOLEBatchProcessDataHelper().createBatchFailureFile(failureRecordData, processDef.getBatchProcessType(), job.getJobId() + "_FailureRecord" + "_" + fileName, job.getJobId());
381 }
382
383
384
385 }