1 package org.kuali.ole.docstore.process;
2
3 import java.io.File;
4 import java.text.DateFormat;
5 import java.text.SimpleDateFormat;
6 import java.util.ArrayList;
7 import java.util.Date;
8 import java.util.List;
9 import javax.jcr.Session;
10
11 import org.apache.camel.Exchange;
12 import org.apache.camel.Processor;
13 import org.apache.commons.lang.time.StopWatch;
14 import org.kuali.ole.RepositoryManager;
15 import org.kuali.ole.docstore.model.xmlpojo.ingest.Request;
16 import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
17 import org.kuali.ole.docstore.model.xstream.ingest.IngestDocumentHandler;
18 import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
19 import org.kuali.ole.docstore.service.BeanLocator;
20 import org.kuali.ole.docstore.service.IngestNIndexHandlerService;
21 import org.kuali.ole.docstore.utility.BatchIngestStatistics;
22 import org.kuali.ole.docstore.utility.BulkIngestStatistics;
23 import org.kuali.ole.docstore.utility.FileIngestStatistics;
24 import org.kuali.ole.pojo.OleException;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28
29
30
31
32
33
34 public class BulkIngestNIndexProcessor
35 implements Processor {
36
37 private static Logger logger = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
38 private String user;
39 private String action;
40 private Session session = null;
41 private BatchIngestStatistics batchStatistics = null;
42
43
44
45 private IngestNIndexHandlerService ingestNIndexHandlerService = BeanLocator.getIngestNIndexHandlerService();
46 private BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
47
48 private BulkProcessRequest bulkProcessRequest = null;
49
50 public BulkIngestNIndexProcessor(String user, String action) {
51 this.user = user;
52 this.action = action;
53 }
54
55 @Override
56 public void process(Exchange exchange) throws Exception {
57 IngestDocumentHandler ingestDocumentHandler = new IngestDocumentHandler();
58 String fileName = "";
59 String filePath = "";
60 BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
61 FileIngestStatistics fileIngestStatistics = bulkLoadStatistics.getFileIngestStatistics();
62 batchStatistics = fileIngestStatistics.startBatch();
63 filePath = exchange.getProperty("CamelFileExchangeFile").toString();
64 fileName = filePath.substring(filePath.lastIndexOf(File.separator), filePath.length());
65 fileName = fileName.replace(File.separator, "");
66 fileName = fileName.replace("]", "");
67
68 if (bulkLoadStatistics.isFirstBatch()) {
69 fileIngestStatistics.setFileName(fileName);
70 fileIngestStatistics.setFileStatus("Started");
71 bulkLoadStatistics.setFirstBatch(false);
72 }
73 StopWatch batchIngestNIndexTimer = new StopWatch();
74 StopWatch convertInputToReqTimer = new StopWatch();
75 DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
76 Date date = new Date();
77 batchStatistics.setBatchStartTime(dateFormat.format(date));
78 batchIngestNIndexTimer.start();
79 logger.info("Bulk ingest: Batch Start time : \t" + dateFormat.format(date));
80 long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
81 try {
82 ArrayList<RequestDocument> ingestDocs = new ArrayList<RequestDocument>();
83 convertInputToReqTimer.start();
84 if (exchange.getIn().getBody() instanceof List) {
85 for (String ingestDocXml : (List<String>) exchange.getIn().getBody()) {
86 ingestDocs.add(ingestDocumentHandler.toObject(ingestDocXml));
87 }
88 }
89 Request request = new Request();
90 request.setUser(user);
91 request.setOperation(action);
92 request.setRequestDocuments(ingestDocs);
93 convertInputToReqTimer.stop();
94 logger.debug("converting input xml to request object " + convertInputToReqTimer);
95 if (session == null) {
96 session = RepositoryManager.getRepositoryManager()
97 .getSession(request.getUser(), request.getOperation());
98 }
99 List<String> ids = ingestNIndexHandlerService.bulkIngestNIndex(request, session);
100 logger.debug("Bulk Ingest Batch(UUIDs):" + ids);
101 batchIngestNIndexTimer.stop();
102 logger.debug("Bulk Ingest and Index Process Batch took " + batchIngestNIndexTimer + " time");
103 batchStatistics.setTimeToConvertStringToReqObj(convertInputToReqTimer.getTime());
104 batchStatistics.setBatchTime(batchIngestNIndexTimer.getTime());
105 date = new Date();
106 batchStatistics.setBatchEndTime(dateFormat.format(date));
107 logger.info("Bulk ingest: Batch metrics : " + "\n" + batchStatistics.toString());
108 if (bulkLoadStatistics.isLastBatch()) {
109 fileIngestStatistics.setFileStatus("Done");
110 bulkLoadStatistics.setLastBatch(false);
111 logger.info("Bulk ingest: File metrics : \n" + bulkLoadStatistics.toString());
112 bulkLoadStatistics.setFileRecCount(0);
113 }
114 logger.info("Bulk ingest: Batch End time : \t" + dateFormat.format(date));
115 }
116 catch (Exception e) {
117 logger.error("Bulk Processor Failed @ Batch : " + exchange.getIn(), e);
118 exchange.setException(e);
119 throw e;
120 }
121 finally {
122 if (session != null) {
123 try {
124 if (bulkLoadStatistics.isLastBatch()) {
125 RepositoryManager.getRepositoryManager().logout(session);
126 session = null;
127 }
128 }
129 catch (OleException e) {
130 }
131 }
132
133 }
134 }
135
136 public void processNew(Exchange exchange) throws Exception {
137 IngestDocumentHandler ingestDocumentHandler = new IngestDocumentHandler();
138 String fileName = "";
139 String filePath = "";
140 BulkIngestStatistics bulkLoadStatistics = bulkProcessRequest.getBulkIngestStatistics();
141 FileIngestStatistics fileIngestStatistics = bulkLoadStatistics.getFileIngestStatistics();
142 batchStatistics = fileIngestStatistics.startBatch();
143 filePath = exchange.getProperty("CamelFileExchangeFile").toString();
144 fileName = filePath.substring(filePath.lastIndexOf(File.separator), filePath.length());
145 fileName = fileName.replace(File.separator, "");
146 fileName = fileName.replace("]", "");
147
148
149 if (bulkProcessRequest.getBulkIngestStatistics().isFirstBatch()) {
150 fileIngestStatistics.setFileName(fileName);
151 fileIngestStatistics.setFileStatus("Started");
152 bulkProcessRequest.getBulkIngestStatistics().setFirstBatch(false);
153 }
154 StopWatch batchIngestNIndexTimer = new StopWatch();
155 StopWatch convertInputToReqTimer = new StopWatch();
156 DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
157 Date date = new Date();
158 batchStatistics.setBatchStartTime(dateFormat.format(date));
159 batchIngestNIndexTimer.start();
160 logger.info("Bulk ingest: Batch Start time : \t" + dateFormat.format(date));
161 long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
162 try {
163 ArrayList<RequestDocument> ingestDocs = new ArrayList<RequestDocument>();
164 convertInputToReqTimer.start();
165
166 if (exchange.getIn().getBody() instanceof List) {
167 for (String ingestDocXml : (List<String>) exchange.getIn().getBody()) {
168 ingestDocs.add(ingestDocumentHandler.toObject(ingestDocXml));
169 }
170 }
171 convertInputToReqTimer.stop();
172
173 BeanLocator.getDocumentServiceImpl().bulkIngest(bulkProcessRequest, ingestDocs);
174
175
176
177
178
179 batchIngestNIndexTimer.stop();
180 batchStatistics.setTimeToConvertStringToReqObj(convertInputToReqTimer.getTime());
181 batchStatistics.setBatchTime(batchIngestNIndexTimer.getTime());
182 date = new Date();
183 batchStatistics.setBatchEndTime(dateFormat.format(date));
184 logger.info("Bulk ingest: Batch metrics : " + "\n" + batchStatistics.toString());
185 if (bulkLoadStatistics.isLastBatch()) {
186 fileIngestStatistics.setFileStatus("Done");
187 bulkLoadStatistics.setLastBatch(false);
188 logger.info("Bulk ingest: File metrics : \n" + bulkLoadStatistics.toString());
189 bulkLoadStatistics.setFileRecCount(0);
190 }
191 logger.info("Bulk ingest: Batch End time : \t" + dateFormat.format(date));
192 }
193 catch (Exception e) {
194 logger.error("Bulk Processor Failed @ Batch : " + exchange.getIn(), e);
195 exchange.setException(e);
196 throw e;
197 }
198 finally {
199
200 }
201 }
202
203 public BulkIngestStatistics getBulkLoadStatistics() {
204 return bulkLoadStatistics;
205 }
206
207 public void setBulkLoadStatistics(BulkIngestStatistics bulkLoadStatistics) {
208 this.bulkLoadStatistics = bulkLoadStatistics;
209 }
210
211 public BulkProcessRequest getBulkProcessRequest() {
212 return bulkProcessRequest;
213 }
214
215 public void setBulkProcessRequest(BulkProcessRequest bulkProcessRequest) {
216 this.bulkProcessRequest = bulkProcessRequest;
217 }
218 }