1 package org.kuali.ole.docstore.process;
2
3 import org.apache.camel.Exchange;
4 import org.apache.camel.Processor;
5 import org.apache.commons.lang.time.StopWatch;
6 import org.kuali.ole.docstore.model.xmlpojo.ingest.Request;
7 import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
8 import org.kuali.ole.docstore.model.xstream.ingest.IngestDocumentHandler;
9 import org.kuali.ole.docstore.service.BeanLocator;
10 import org.kuali.ole.docstore.service.IngestNIndexHandlerService;
11 import org.kuali.ole.docstore.utility.BatchIngestStatistics;
12 import org.kuali.ole.docstore.utility.BulkIngestStatistics;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15
16 import java.text.DateFormat;
17 import java.text.SimpleDateFormat;
18 import java.util.ArrayList;
19 import java.util.Date;
20 import java.util.List;
21
22
23
24
25
26
27
28
29 public class BulkIngestNIndexProcessor implements Processor {
30
31 private static Logger logger = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
32 private String user;
33 private String action;
34
35
36
37 private IngestNIndexHandlerService ingestNIndexHandlerService = BeanLocator.getIngestNIndexHandlerService();
38 private BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
39
40 public BulkIngestNIndexProcessor(String user, String action) {
41 this.user = user;
42 this.action = action;
43 }
44
45 @Override
46 public void process(Exchange exchange) throws Exception {
47 IngestDocumentHandler ingestDocumentHandler = new IngestDocumentHandler();
48 BatchIngestStatistics batchStatistics = bulkLoadStatistics.startBatch();
49 StopWatch batchIngestNIndexTimer = new StopWatch();
50 StopWatch convertInputToReqTimer = new StopWatch();
51 DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
52 Date date = new Date();
53 batchStatistics.setBatchStartTime(dateFormat.format(date));
54 batchIngestNIndexTimer.start();
55 logger.info("Bulk Ingest Process Batch: START TIME \t" + dateFormat.format(date));
56 try {
57 ArrayList<RequestDocument> ingestDocs = new ArrayList<RequestDocument>();
58 convertInputToReqTimer.start();
59 if (exchange.getIn().getBody() instanceof List) {
60 for (String ingestDocXml : (List<String>) exchange.getIn().getBody()) {
61 ingestDocs.add(ingestDocumentHandler.toObject(ingestDocXml));
62 }
63 }
64 Request request = new Request();
65 request.setUser(user);
66 request.setOperation(action);
67 request.setRequestDocuments(ingestDocs);
68 convertInputToReqTimer.stop();
69 logger.debug("converting input xml to request object " + convertInputToReqTimer);
70 List<String> ids = ingestNIndexHandlerService.bulkIngestNIndex(request);
71 logger.debug("Bulk Ingest Batch(UUIDs):" + ids);
72 batchIngestNIndexTimer.stop();
73 logger.debug("Bulk Ingest and Index Process Batch took " + batchIngestNIndexTimer + " time");
74 batchStatistics.setTimeToConvertStringToReqObj(convertInputToReqTimer.getTime());
75 batchStatistics.setBatchTime(batchIngestNIndexTimer.getTime());
76 date = new Date();
77 batchStatistics.setBatchEndTime(dateFormat.format(date));
78 if(bulkLoadStatistics.isLastBatch()){
79 logger.info("Bulk ingest of the file "+ exchange.getProperty("CamelFileExchangeFile") +" is done and the metrics is as follows. \n" + bulkLoadStatistics.toString());
80 bulkLoadStatistics.startNewIngest();
81 }
82 logger.info("Bulk Ingest Process Batch: END TIME \t" + dateFormat.format(date));
83 }
84 catch (Exception e) {
85 logger.error("Bulk Processor Failed @ Batch : " + exchange.getIn(), e);
86 exchange.setException(e);
87 throw e;
88 }
89
90 }
91
92 }