View Javadoc
1   package org.kuali.ole.docstore.process;
2   
3   import java.io.File;
4   import java.text.DateFormat;
5   import java.text.SimpleDateFormat;
6   import java.util.ArrayList;
7   import java.util.Date;
8   import java.util.List;
9   import javax.jcr.Session;
10  
11  import org.apache.camel.Exchange;
12  import org.apache.camel.Processor;
13  import org.apache.commons.lang.time.StopWatch;
14  import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
15  import org.kuali.ole.docstore.model.xstream.ingest.IngestDocumentHandler;
16  import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
17  import org.kuali.ole.docstore.service.BeanLocator;
18  import org.kuali.ole.docstore.service.IngestNIndexHandlerService;
19  import org.kuali.ole.docstore.utility.BatchIngestStatistics;
20  import org.kuali.ole.docstore.utility.BulkIngestStatistics;
21  import org.kuali.ole.docstore.utility.FileIngestStatistics;
22  import org.slf4j.Logger;
23  import org.slf4j.LoggerFactory;
24  
25  import javax.jcr.Session;
26  import java.io.File;
27  import java.text.DateFormat;
28  import java.text.SimpleDateFormat;
29  import java.util.ArrayList;
30  import java.util.Date;
31  import java.util.List;
32  
33  /**
34   * Class to Process IngestDocuments of Bulk.
35   *
36   * @author Rajesh Chowdary K
37   * @created Mar 15, 2012
38   */
39  public class BulkIngestNIndexProcessor
40          implements Processor {
41  
42      private static Logger logger = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
43      private String user;
44      private String action;
45      private Session session = null;
46      private BatchIngestStatistics batchStatistics = null;
47      /**
48       * Singleton instance of IngestNIndexHandlerService.
49       */
50      private IngestNIndexHandlerService ingestNIndexHandlerService = BeanLocator.getIngestNIndexHandlerService();
51      private BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
52      //public  FileIngestStatistics       fileIngestStatistics       = null;
53      private BulkProcessRequest bulkProcessRequest = null;
54  
55      public BulkIngestNIndexProcessor(String user, String action) {
56          this.user = user;
57          this.action = action;
58      }
59  
60      @Override
61      /*public void process(Exchange exchange) throws Exception {
62          IngestDocumentHandler ingestDocumentHandler = new IngestDocumentHandler();
63          String fileName = "";
64          String filePath = "";
65          BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
66          FileIngestStatistics fileIngestStatistics = bulkLoadStatistics.getFileIngestStatistics();
67          batchStatistics = fileIngestStatistics.startBatch();
68          filePath = exchange.getProperty("CamelFileExchangeFile").toString();
69          fileName = filePath.substring(filePath.lastIndexOf(File.separator), filePath.length());
70          fileName = fileName.replace(File.separator, "");
71          fileName = fileName.replace("]", "");
72  
73          if (bulkLoadStatistics.isFirstBatch()) {
74              fileIngestStatistics.setFileName(fileName);
75              fileIngestStatistics.setFileStatus("Started");
76              bulkLoadStatistics.setFirstBatch(false);
77          }
78          StopWatch batchIngestNIndexTimer = new StopWatch();
79          StopWatch convertInputToReqTimer = new StopWatch();
80          DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
81          Date date = new Date();
82          batchStatistics.setBatchStartTime(dateFormat.format(date));
83          batchIngestNIndexTimer.start();
84          logger.info("Bulk ingest: Batch Start time : \t" + dateFormat.format(date));
85          long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
86          try {
87              ArrayList<RequestDocument> ingestDocs = new ArrayList<RequestDocument>();
88              convertInputToReqTimer.start();
89              if (exchange.getIn().getBody() instanceof List) {
90                  for (String ingestDocXml : (List<String>) exchange.getIn().getBody()) {
91                      ingestDocs.add(ingestDocumentHandler.toObject(ingestDocXml));
92                  }
93              }
94              Request request = new Request();
95              request.setUser(user);
96              request.setOperation(action);
97              request.setRequestDocuments(ingestDocs);
98              convertInputToReqTimer.stop();
99              logger.debug("converting input xml to request object " + convertInputToReqTimer);
100             if (session == null) {
101                 session = RepositoryManager.getRepositoryManager()
102                                            .getSession(request.getUser(), request.getOperation());
103             }
104             List<String> ids = ingestNIndexHandlerService.bulkIngestNIndex(request, session);
105             logger.debug("Bulk Ingest Batch(UUIDs):" + ids);
106             batchIngestNIndexTimer.stop();
107             logger.debug("Bulk Ingest and Index Process Batch took " + batchIngestNIndexTimer + " time");
108             batchStatistics.setTimeToConvertStringToReqObj(convertInputToReqTimer.getTime());
109             batchStatistics.setBatchTime(batchIngestNIndexTimer.getTime());
110             date = new Date();
111             batchStatistics.setBatchEndTime(dateFormat.format(date));
112             logger.info("Bulk ingest: Batch metrics : " + "\n" + batchStatistics.toString());
113             if (bulkLoadStatistics.isLastBatch()) {
114                 fileIngestStatistics.setFileStatus("Done");
115                 bulkLoadStatistics.setLastBatch(false);
116                 logger.info("Bulk ingest: File metrics :  \n" + bulkLoadStatistics.toString());
117                 bulkLoadStatistics.setFileRecCount(0);
118             }
119             logger.info("Bulk ingest: Batch End time : \t" + dateFormat.format(date));
120         }
121         catch (Exception e) {
122             logger.error("Bulk Processor Failed @ Batch : " + exchange.getIn(), e);
123             exchange.setException(e);
124             throw e;
125         }
126         finally {
127             if (session != null) {
128                 try {
129                     if (bulkLoadStatistics.isLastBatch()) {
130                         RepositoryManager.getRepositoryManager().logout(session);
131                         session = null;
132                     }
133                 }
134                 catch (OleException e) {
135                 }
136             }
137 
138         }
139     }*/
140 
141     public void process(Exchange exchange) throws Exception {
142         IngestDocumentHandler ingestDocumentHandler = new IngestDocumentHandler();
143         String fileName = "";
144         String filePath = "";
145         BulkIngestStatistics bulkLoadStatistics = bulkProcessRequest.getBulkIngestStatistics();
146         FileIngestStatistics fileIngestStatistics = bulkLoadStatistics.getFileIngestStatistics();
147         batchStatistics = fileIngestStatistics.startBatch();
148         filePath = exchange.getProperty("CamelFileExchangeFile").toString();
149         fileName = filePath.substring(filePath.lastIndexOf(File.separator), filePath.length());
150         fileName = fileName.replace(File.separator, "");
151         fileName = fileName.replace("]", "");
152 
153         // record file level statistics
154         if (bulkProcessRequest.getBulkIngestStatistics().isFirstBatch()) {
155             fileIngestStatistics.setFileName(fileName);
156             fileIngestStatistics.setFileStatus("Started");
157             bulkProcessRequest.getBulkIngestStatistics().setFirstBatch(false);
158         }
159         StopWatch batchIngestNIndexTimer = new StopWatch();
160         StopWatch convertInputToReqTimer = new StopWatch();
161         DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
162         Date date = new Date();
163         batchStatistics.setBatchStartTime(dateFormat.format(date));
164         batchIngestNIndexTimer.start();
165         logger.info("Bulk ingest: Batch Start time : \t" + dateFormat.format(date));
166         long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
167         try {
168             ArrayList<RequestDocument> ingestDocs = new ArrayList<RequestDocument>();
169             convertInputToReqTimer.start();
170             // build input request documents for the current batch.
171             if (exchange.getIn().getBody() instanceof List) {
172                 for (String ingestDocXml : (List<String>) exchange.getIn().getBody()) {
173                     ingestDocs.add(ingestDocumentHandler.toObject(ingestDocXml));
174                 }
175             }
176             convertInputToReqTimer.stop();
177             // call the bulk ingest service
178             BeanLocator.getDocumentServiceImpl().bulkIngest(bulkProcessRequest, ingestDocs);
179             // get the document manager for the current batch of documents (belong to same cat-type-format)
180             //            DocumentManager documentManager = DocumentManagerFactory.getInstance()
181             //                                                                    .getDocumentManager(ingestDocs.get(0));
182             //            documentManager.bulkIngest(bulkProcessRequest, ingestDocs);
183             // record file/batch level statistics/metrics
184             batchIngestNIndexTimer.stop();
185             batchStatistics.setTimeToConvertStringToReqObj(convertInputToReqTimer.getTime());
186             batchStatistics.setBatchTime(batchIngestNIndexTimer.getTime());
187             date = new Date();
188             batchStatistics.setBatchEndTime(dateFormat.format(date));
189             logger.info("Bulk ingest: Batch metrics : " + "\n" + batchStatistics.toString());
190             if (bulkLoadStatistics.isLastBatch()) {
191                 fileIngestStatistics.setFileStatus("Done");
192                 bulkLoadStatistics.setLastBatch(false);
193                 logger.info("Bulk ingest: File metrics :  \n" + bulkLoadStatistics.toString());
194                 bulkLoadStatistics.setFileRecCount(0);
195             }
196             logger.info("Bulk ingest: Batch End time : \t" + dateFormat.format(date));
197         } catch (Exception e) {
198             logger.error("Bulk Processor Failed @ Batch : " + exchange.getIn(), e);
199             exchange.setException(e);
200             throw e;
201         } finally {
202 
203         }
204     }
205 
206     public BulkIngestStatistics getBulkLoadStatistics() {
207         return bulkLoadStatistics;
208     }
209 
210     public void setBulkLoadStatistics(BulkIngestStatistics bulkLoadStatistics) {
211         this.bulkLoadStatistics = bulkLoadStatistics;
212     }
213 
214     public BulkProcessRequest getBulkProcessRequest() {
215         return bulkProcessRequest;
216     }
217 
218     public void setBulkProcessRequest(BulkProcessRequest bulkProcessRequest) {
219         this.bulkProcessRequest = bulkProcessRequest;
220     }
221 }