View Javadoc

1   package org.kuali.ole.docstore.process;
2   
3   import org.apache.camel.Exchange;
4   import org.apache.camel.Processor;
5   import org.apache.commons.lang.time.StopWatch;
6   import org.kuali.ole.docstore.model.xmlpojo.ingest.Request;
7   import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
8   import org.kuali.ole.docstore.model.xstream.ingest.IngestDocumentHandler;
9   import org.kuali.ole.docstore.service.BeanLocator;
10  import org.kuali.ole.docstore.service.IngestNIndexHandlerService;
11  import org.kuali.ole.docstore.utility.BatchIngestStatistics;
12  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
13  import org.slf4j.Logger;
14  import org.slf4j.LoggerFactory;
15  
16  import java.text.DateFormat;
17  import java.text.SimpleDateFormat;
18  import java.util.ArrayList;
19  import java.util.Date;
20  import java.util.List;
21  
22  /**
23   * 
24   * Class to Process IngestDocuments of Bulk.
25   * 
26   * @author Rajesh Chowdary K
27   * @created Mar 15, 2012
28   */
29  public class BulkIngestNIndexProcessor implements Processor {
30  
31      private static Logger logger = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
32      private String        user;
33      private String        action;
34      /**
35       * Singleton instance of IngestNIndexHandlerService.
36       */
37      private IngestNIndexHandlerService ingestNIndexHandlerService = BeanLocator.getIngestNIndexHandlerService();
38      private BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
39  
40      public BulkIngestNIndexProcessor(String user, String action) {
41          this.user = user;
42          this.action = action;
43      }
44  
45      @Override
46      public void process(Exchange exchange) throws Exception {
47          IngestDocumentHandler ingestDocumentHandler = new IngestDocumentHandler();
48          BatchIngestStatistics batchStatistics = bulkLoadStatistics.startBatch();
49          StopWatch batchIngestNIndexTimer = new StopWatch();
50          StopWatch convertInputToReqTimer = new StopWatch();
51          DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
52          Date date = new Date();
53          batchStatistics.setBatchStartTime(dateFormat.format(date));
54          batchIngestNIndexTimer.start();
55          logger.info("Bulk Ingest Process Batch: START TIME \t" + dateFormat.format(date));
56          try {
57              ArrayList<RequestDocument> ingestDocs = new ArrayList<RequestDocument>();
58              convertInputToReqTimer.start();
59              if (exchange.getIn().getBody() instanceof List) {
60                  for (String ingestDocXml : (List<String>) exchange.getIn().getBody()) {
61                      ingestDocs.add(ingestDocumentHandler.toObject(ingestDocXml));
62                  }
63              }
64              Request request = new Request();
65              request.setUser(user);
66              request.setOperation(action);
67              request.setRequestDocuments(ingestDocs);
68              convertInputToReqTimer.stop();
69              logger.debug("converting input xml to request object " + convertInputToReqTimer);
70              List<String> ids = ingestNIndexHandlerService.bulkIngestNIndex(request);
71              logger.debug("Bulk Ingest Batch(UUIDs):" + ids);
72              batchIngestNIndexTimer.stop();
73              logger.debug("Bulk Ingest and Index Process Batch took " + batchIngestNIndexTimer + " time");
74              batchStatistics.setTimeToConvertStringToReqObj(convertInputToReqTimer.getTime());
75              batchStatistics.setBatchTime(batchIngestNIndexTimer.getTime());
76              date = new Date();
77              batchStatistics.setBatchEndTime(dateFormat.format(date));
78              if(bulkLoadStatistics.isLastBatch()){
79                  logger.info("Bulk ingest of the file "+ exchange.getProperty("CamelFileExchangeFile") +" is done and the metrics is as follows. \n" + bulkLoadStatistics.toString());
80                  bulkLoadStatistics.startNewIngest();
81              }
82              logger.info("Bulk Ingest Process Batch: END TIME \t" + dateFormat.format(date));
83          }
84          catch (Exception e) {
85              logger.error("Bulk Processor Failed @ Batch : " + exchange.getIn(), e);
86              exchange.setException(e);
87              throw e;
88          }
89  
90      }
91  
92  }