1 package org.kuali.ole.docstore.process;
2
3 import java.io.File;
4 import java.text.DateFormat;
5 import java.text.SimpleDateFormat;
6 import java.util.ArrayList;
7 import java.util.Date;
8 import java.util.List;
9 import javax.jcr.Session;
10
11 import org.apache.camel.Exchange;
12 import org.apache.camel.Processor;
13 import org.apache.commons.lang.time.StopWatch;
14 import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
15 import org.kuali.ole.docstore.model.xstream.ingest.IngestDocumentHandler;
16 import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
17 import org.kuali.ole.docstore.service.BeanLocator;
18 import org.kuali.ole.docstore.service.IngestNIndexHandlerService;
19 import org.kuali.ole.docstore.utility.BatchIngestStatistics;
20 import org.kuali.ole.docstore.utility.BulkIngestStatistics;
21 import org.kuali.ole.docstore.utility.FileIngestStatistics;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 import javax.jcr.Session;
26 import java.io.File;
27 import java.text.DateFormat;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Date;
31 import java.util.List;
32
33 /**
34 * Class to Process IngestDocuments of Bulk.
35 *
36 * @author Rajesh Chowdary K
37 * @created Mar 15, 2012
38 */
39 public class BulkIngestNIndexProcessor
40 implements Processor {
41
42 private static Logger logger = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
43 private String user;
44 private String action;
45 private Session session = null;
46 private BatchIngestStatistics batchStatistics = null;
47 /**
48 * Singleton instance of IngestNIndexHandlerService.
49 */
50 private IngestNIndexHandlerService ingestNIndexHandlerService = BeanLocator.getIngestNIndexHandlerService();
51 private BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
52 //public FileIngestStatistics fileIngestStatistics = null;
53 private BulkProcessRequest bulkProcessRequest = null;
54
55 public BulkIngestNIndexProcessor(String user, String action) {
56 this.user = user;
57 this.action = action;
58 }
59
60 @Override
61 /*public void process(Exchange exchange) throws Exception {
62 IngestDocumentHandler ingestDocumentHandler = new IngestDocumentHandler();
63 String fileName = "";
64 String filePath = "";
65 BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
66 FileIngestStatistics fileIngestStatistics = bulkLoadStatistics.getFileIngestStatistics();
67 batchStatistics = fileIngestStatistics.startBatch();
68 filePath = exchange.getProperty("CamelFileExchangeFile").toString();
69 fileName = filePath.substring(filePath.lastIndexOf(File.separator), filePath.length());
70 fileName = fileName.replace(File.separator, "");
71 fileName = fileName.replace("]", "");
72
73 if (bulkLoadStatistics.isFirstBatch()) {
74 fileIngestStatistics.setFileName(fileName);
75 fileIngestStatistics.setFileStatus("Started");
76 bulkLoadStatistics.setFirstBatch(false);
77 }
78 StopWatch batchIngestNIndexTimer = new StopWatch();
79 StopWatch convertInputToReqTimer = new StopWatch();
80 DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
81 Date date = new Date();
82 batchStatistics.setBatchStartTime(dateFormat.format(date));
83 batchIngestNIndexTimer.start();
84 logger.info("Bulk ingest: Batch Start time : \t" + dateFormat.format(date));
85 long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
86 try {
87 ArrayList<RequestDocument> ingestDocs = new ArrayList<RequestDocument>();
88 convertInputToReqTimer.start();
89 if (exchange.getIn().getBody() instanceof List) {
90 for (String ingestDocXml : (List<String>) exchange.getIn().getBody()) {
91 ingestDocs.add(ingestDocumentHandler.toObject(ingestDocXml));
92 }
93 }
94 Request request = new Request();
95 request.setUser(user);
96 request.setOperation(action);
97 request.setRequestDocuments(ingestDocs);
98 convertInputToReqTimer.stop();
99 logger.debug("converting input xml to request object " + convertInputToReqTimer);
100 if (session == null) {
101 session = RepositoryManager.getRepositoryManager()
102 .getSession(request.getUser(), request.getOperation());
103 }
104 List<String> ids = ingestNIndexHandlerService.bulkIngestNIndex(request, session);
105 logger.debug("Bulk Ingest Batch(UUIDs):" + ids);
106 batchIngestNIndexTimer.stop();
107 logger.debug("Bulk Ingest and Index Process Batch took " + batchIngestNIndexTimer + " time");
108 batchStatistics.setTimeToConvertStringToReqObj(convertInputToReqTimer.getTime());
109 batchStatistics.setBatchTime(batchIngestNIndexTimer.getTime());
110 date = new Date();
111 batchStatistics.setBatchEndTime(dateFormat.format(date));
112 logger.info("Bulk ingest: Batch metrics : " + "\n" + batchStatistics.toString());
113 if (bulkLoadStatistics.isLastBatch()) {
114 fileIngestStatistics.setFileStatus("Done");
115 bulkLoadStatistics.setLastBatch(false);
116 logger.info("Bulk ingest: File metrics : \n" + bulkLoadStatistics.toString());
117 bulkLoadStatistics.setFileRecCount(0);
118 }
119 logger.info("Bulk ingest: Batch End time : \t" + dateFormat.format(date));
120 }
121 catch (Exception e) {
122 logger.error("Bulk Processor Failed @ Batch : " + exchange.getIn(), e);
123 exchange.setException(e);
124 throw e;
125 }
126 finally {
127 if (session != null) {
128 try {
129 if (bulkLoadStatistics.isLastBatch()) {
130 RepositoryManager.getRepositoryManager().logout(session);
131 session = null;
132 }
133 }
134 catch (OleException e) {
135 }
136 }
137
138 }
139 }*/
140
141 public void process(Exchange exchange) throws Exception {
142 IngestDocumentHandler ingestDocumentHandler = new IngestDocumentHandler();
143 String fileName = "";
144 String filePath = "";
145 BulkIngestStatistics bulkLoadStatistics = bulkProcessRequest.getBulkIngestStatistics();
146 FileIngestStatistics fileIngestStatistics = bulkLoadStatistics.getFileIngestStatistics();
147 batchStatistics = fileIngestStatistics.startBatch();
148 filePath = exchange.getProperty("CamelFileExchangeFile").toString();
149 fileName = filePath.substring(filePath.lastIndexOf(File.separator), filePath.length());
150 fileName = fileName.replace(File.separator, "");
151 fileName = fileName.replace("]", "");
152
153 // record file level statistics
154 if (bulkProcessRequest.getBulkIngestStatistics().isFirstBatch()) {
155 fileIngestStatistics.setFileName(fileName);
156 fileIngestStatistics.setFileStatus("Started");
157 bulkProcessRequest.getBulkIngestStatistics().setFirstBatch(false);
158 }
159 StopWatch batchIngestNIndexTimer = new StopWatch();
160 StopWatch convertInputToReqTimer = new StopWatch();
161 DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
162 Date date = new Date();
163 batchStatistics.setBatchStartTime(dateFormat.format(date));
164 batchIngestNIndexTimer.start();
165 logger.info("Bulk ingest: Batch Start time : \t" + dateFormat.format(date));
166 long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
167 try {
168 ArrayList<RequestDocument> ingestDocs = new ArrayList<RequestDocument>();
169 convertInputToReqTimer.start();
170 // build input request documents for the current batch.
171 if (exchange.getIn().getBody() instanceof List) {
172 for (String ingestDocXml : (List<String>) exchange.getIn().getBody()) {
173 ingestDocs.add(ingestDocumentHandler.toObject(ingestDocXml));
174 }
175 }
176 convertInputToReqTimer.stop();
177 // call the bulk ingest service
178 BeanLocator.getDocumentServiceImpl().bulkIngest(bulkProcessRequest, ingestDocs);
179 // get the document manager for the current batch of documents (belong to same cat-type-format)
180 // DocumentManager documentManager = DocumentManagerFactory.getInstance()
181 // .getDocumentManager(ingestDocs.get(0));
182 // documentManager.bulkIngest(bulkProcessRequest, ingestDocs);
183 // record file/batch level statistics/metrics
184 batchIngestNIndexTimer.stop();
185 batchStatistics.setTimeToConvertStringToReqObj(convertInputToReqTimer.getTime());
186 batchStatistics.setBatchTime(batchIngestNIndexTimer.getTime());
187 date = new Date();
188 batchStatistics.setBatchEndTime(dateFormat.format(date));
189 logger.info("Bulk ingest: Batch metrics : " + "\n" + batchStatistics.toString());
190 if (bulkLoadStatistics.isLastBatch()) {
191 fileIngestStatistics.setFileStatus("Done");
192 bulkLoadStatistics.setLastBatch(false);
193 logger.info("Bulk ingest: File metrics : \n" + bulkLoadStatistics.toString());
194 bulkLoadStatistics.setFileRecCount(0);
195 }
196 logger.info("Bulk ingest: Batch End time : \t" + dateFormat.format(date));
197 } catch (Exception e) {
198 logger.error("Bulk Processor Failed @ Batch : " + exchange.getIn(), e);
199 exchange.setException(e);
200 throw e;
201 } finally {
202
203 }
204 }
205
206 public BulkIngestStatistics getBulkLoadStatistics() {
207 return bulkLoadStatistics;
208 }
209
210 public void setBulkLoadStatistics(BulkIngestStatistics bulkLoadStatistics) {
211 this.bulkLoadStatistics = bulkLoadStatistics;
212 }
213
214 public BulkProcessRequest getBulkProcessRequest() {
215 return bulkProcessRequest;
216 }
217
218 public void setBulkProcessRequest(BulkProcessRequest bulkProcessRequest) {
219 this.bulkProcessRequest = bulkProcessRequest;
220 }
221 }