View Javadoc

1   package org.kuali.ole.docstore.process;
2   
3   import java.io.File;
4   import java.text.DateFormat;
5   import java.text.SimpleDateFormat;
6   import java.util.ArrayList;
7   import java.util.Date;
8   import java.util.List;
9   import javax.jcr.Session;
10  
11  import org.apache.camel.Exchange;
12  import org.apache.camel.Processor;
13  import org.apache.commons.lang.time.StopWatch;
14  import org.kuali.ole.RepositoryManager;
15  import org.kuali.ole.docstore.model.xmlpojo.ingest.Request;
16  import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
17  import org.kuali.ole.docstore.model.xstream.ingest.IngestDocumentHandler;
18  import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
19  import org.kuali.ole.docstore.service.BeanLocator;
20  import org.kuali.ole.docstore.service.IngestNIndexHandlerService;
21  import org.kuali.ole.docstore.utility.BatchIngestStatistics;
22  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
23  import org.kuali.ole.docstore.utility.FileIngestStatistics;
24  import org.kuali.ole.pojo.OleException;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  /**
29   * Class to Process IngestDocuments of Bulk.
30   *
31   * @author Rajesh Chowdary K
32   * @created Mar 15, 2012
33   */
34  public class BulkIngestNIndexProcessor
35          implements Processor {
36  
37      private static Logger logger = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
38      private String user;
39      private String action;
40      private Session                    session                    = null;
41      private BatchIngestStatistics      batchStatistics            = null;
42      /**
43       * Singleton instance of IngestNIndexHandlerService.
44       */
45      private IngestNIndexHandlerService ingestNIndexHandlerService = BeanLocator.getIngestNIndexHandlerService();
46      private BulkIngestStatistics       bulkLoadStatistics         = BulkIngestStatistics.getInstance();
47      //public  FileIngestStatistics       fileIngestStatistics       = null;
48      private BulkProcessRequest         bulkProcessRequest         = null;
49  
50      public BulkIngestNIndexProcessor(String user, String action) {
51          this.user = user;
52          this.action = action;
53      }
54  
55      @Override
56      public void process(Exchange exchange) throws Exception {
57          IngestDocumentHandler ingestDocumentHandler = new IngestDocumentHandler();
58          String fileName = "";
59          String filePath = "";
60          BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
61          FileIngestStatistics fileIngestStatistics = bulkLoadStatistics.getFileIngestStatistics();
62          batchStatistics = fileIngestStatistics.startBatch();
63          filePath = exchange.getProperty("CamelFileExchangeFile").toString();
64          fileName = filePath.substring(filePath.lastIndexOf(File.separator), filePath.length());
65          fileName = fileName.replace(File.separator, "");
66          fileName = fileName.replace("]", "");
67  
68          if (bulkLoadStatistics.isFirstBatch()) {
69              fileIngestStatistics.setFileName(fileName);
70              fileIngestStatistics.setFileStatus("Started");
71              bulkLoadStatistics.setFirstBatch(false);
72          }
73          StopWatch batchIngestNIndexTimer = new StopWatch();
74          StopWatch convertInputToReqTimer = new StopWatch();
75          DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
76          Date date = new Date();
77          batchStatistics.setBatchStartTime(dateFormat.format(date));
78          batchIngestNIndexTimer.start();
79          logger.info("Bulk ingest: Batch Start time : \t" + dateFormat.format(date));
80          long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
81          try {
82              ArrayList<RequestDocument> ingestDocs = new ArrayList<RequestDocument>();
83              convertInputToReqTimer.start();
84              if (exchange.getIn().getBody() instanceof List) {
85                  for (String ingestDocXml : (List<String>) exchange.getIn().getBody()) {
86                      ingestDocs.add(ingestDocumentHandler.toObject(ingestDocXml));
87                  }
88              }
89              Request request = new Request();
90              request.setUser(user);
91              request.setOperation(action);
92              request.setRequestDocuments(ingestDocs);
93              convertInputToReqTimer.stop();
94              logger.debug("converting input xml to request object " + convertInputToReqTimer);
95              if (session == null) {
96                  session = RepositoryManager.getRepositoryManager()
97                                             .getSession(request.getUser(), request.getOperation());
98              }
99              List<String> ids = ingestNIndexHandlerService.bulkIngestNIndex(request, session);
100             logger.debug("Bulk Ingest Batch(UUIDs):" + ids);
101             batchIngestNIndexTimer.stop();
102             logger.debug("Bulk Ingest and Index Process Batch took " + batchIngestNIndexTimer + " time");
103             batchStatistics.setTimeToConvertStringToReqObj(convertInputToReqTimer.getTime());
104             batchStatistics.setBatchTime(batchIngestNIndexTimer.getTime());
105             date = new Date();
106             batchStatistics.setBatchEndTime(dateFormat.format(date));
107             logger.info("Bulk ingest: Batch metrics : " + "\n" + batchStatistics.toString());
108             if (bulkLoadStatistics.isLastBatch()) {
109                 fileIngestStatistics.setFileStatus("Done");
110                 bulkLoadStatistics.setLastBatch(false);
111                 logger.info("Bulk ingest: File metrics :  \n" + bulkLoadStatistics.toString());
112                 bulkLoadStatistics.setFileRecCount(0);
113             }
114             logger.info("Bulk ingest: Batch End time : \t" + dateFormat.format(date));
115         }
116         catch (Exception e) {
117             logger.error("Bulk Processor Failed @ Batch : " + exchange.getIn(), e);
118             exchange.setException(e);
119             throw e;
120         }
121         finally {
122             if (session != null) {
123                 try {
124                     if (bulkLoadStatistics.isLastBatch()) {
125                         RepositoryManager.getRepositoryManager().logout(session);
126                         session = null;
127                     }
128                 }
129                 catch (OleException e) {
130                 }
131             }
132 
133         }
134     }
135 
136     public void processNew(Exchange exchange) throws Exception {
137         IngestDocumentHandler ingestDocumentHandler = new IngestDocumentHandler();
138         String fileName = "";
139         String filePath = "";
140         BulkIngestStatistics bulkLoadStatistics = bulkProcessRequest.getBulkIngestStatistics();
141         FileIngestStatistics fileIngestStatistics = bulkLoadStatistics.getFileIngestStatistics();
142         batchStatistics = fileIngestStatistics.startBatch();
143         filePath = exchange.getProperty("CamelFileExchangeFile").toString();
144         fileName = filePath.substring(filePath.lastIndexOf(File.separator), filePath.length());
145         fileName = fileName.replace(File.separator, "");
146         fileName = fileName.replace("]", "");
147 
148         // record file level statistics
149         if (bulkProcessRequest.getBulkIngestStatistics().isFirstBatch()) {
150             fileIngestStatistics.setFileName(fileName);
151             fileIngestStatistics.setFileStatus("Started");
152             bulkProcessRequest.getBulkIngestStatistics().setFirstBatch(false);
153         }
154         StopWatch batchIngestNIndexTimer = new StopWatch();
155         StopWatch convertInputToReqTimer = new StopWatch();
156         DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
157         Date date = new Date();
158         batchStatistics.setBatchStartTime(dateFormat.format(date));
159         batchIngestNIndexTimer.start();
160         logger.info("Bulk ingest: Batch Start time : \t" + dateFormat.format(date));
161         long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
162         try {
163             ArrayList<RequestDocument> ingestDocs = new ArrayList<RequestDocument>();
164             convertInputToReqTimer.start();
165             // build input request documents for the current batch.
166             if (exchange.getIn().getBody() instanceof List) {
167                 for (String ingestDocXml : (List<String>) exchange.getIn().getBody()) {
168                     ingestDocs.add(ingestDocumentHandler.toObject(ingestDocXml));
169                 }
170             }
171             convertInputToReqTimer.stop();
172             // call the bulk ingest service
173             BeanLocator.getDocumentServiceImpl().bulkIngest(bulkProcessRequest, ingestDocs);
174             // get the document manager for the current batch of documents (belong to same cat-type-format)
175             //            DocumentManager documentManager = DocumentManagerFactory.getInstance()
176             //                                                                    .getDocumentManager(ingestDocs.get(0));
177             //            documentManager.bulkIngest(bulkProcessRequest, ingestDocs);
178             // record file/batch level statistics/metrics
179             batchIngestNIndexTimer.stop();
180             batchStatistics.setTimeToConvertStringToReqObj(convertInputToReqTimer.getTime());
181             batchStatistics.setBatchTime(batchIngestNIndexTimer.getTime());
182             date = new Date();
183             batchStatistics.setBatchEndTime(dateFormat.format(date));
184             logger.info("Bulk ingest: Batch metrics : " + "\n" + batchStatistics.toString());
185             if (bulkLoadStatistics.isLastBatch()) {
186                 fileIngestStatistics.setFileStatus("Done");
187                 bulkLoadStatistics.setLastBatch(false);
188                 logger.info("Bulk ingest: File metrics :  \n" + bulkLoadStatistics.toString());
189                 bulkLoadStatistics.setFileRecCount(0);
190             }
191             logger.info("Bulk ingest: Batch End time : \t" + dateFormat.format(date));
192         }
193         catch (Exception e) {
194             logger.error("Bulk Processor Failed @ Batch : " + exchange.getIn(), e);
195             exchange.setException(e);
196             throw e;
197         }
198         finally {
199 
200         }
201     }
202 
203     public BulkIngestStatistics getBulkLoadStatistics() {
204         return bulkLoadStatistics;
205     }
206 
207     public void setBulkLoadStatistics(BulkIngestStatistics bulkLoadStatistics) {
208         this.bulkLoadStatistics = bulkLoadStatistics;
209     }
210 
211     public BulkProcessRequest getBulkProcessRequest() {
212         return bulkProcessRequest;
213     }
214 
215     public void setBulkProcessRequest(BulkProcessRequest bulkProcessRequest) {
216         this.bulkProcessRequest = bulkProcessRequest;
217     }
218 }