1 package org.kuali.ole.docstore.process;
2
3 import java.io.File;
4 import java.text.DateFormat;
5 import java.text.SimpleDateFormat;
6 import java.util.ArrayList;
7 import java.util.Date;
8 import java.util.List;
9 import javax.jcr.Session;
10
11 import org.apache.camel.Exchange;
12 import org.apache.camel.Processor;
13 import org.apache.commons.lang.time.StopWatch;
14 import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
15 import org.kuali.ole.docstore.model.xstream.ingest.IngestDocumentHandler;
16 import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
17 import org.kuali.ole.docstore.service.BeanLocator;
18 import org.kuali.ole.docstore.service.IngestNIndexHandlerService;
19 import org.kuali.ole.docstore.utility.BatchIngestStatistics;
20 import org.kuali.ole.docstore.utility.BulkIngestStatistics;
21 import org.kuali.ole.docstore.utility.FileIngestStatistics;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 import javax.jcr.Session;
26 import java.io.File;
27 import java.text.DateFormat;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Date;
31 import java.util.List;
32
33
34
35
36
37
38
39 public class BulkIngestNIndexProcessor
40 implements Processor {
41
42 private static Logger logger = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
43 private String user;
44 private String action;
45 private Session session = null;
46 private BatchIngestStatistics batchStatistics = null;
47
48
49
50 private IngestNIndexHandlerService ingestNIndexHandlerService = BeanLocator.getIngestNIndexHandlerService();
51 private BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
52
53 private BulkProcessRequest bulkProcessRequest = null;
54
55 public BulkIngestNIndexProcessor(String user, String action) {
56 this.user = user;
57 this.action = action;
58 }
59
60 @Override
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141 public void process(Exchange exchange) throws Exception {
142 IngestDocumentHandler ingestDocumentHandler = new IngestDocumentHandler();
143 String fileName = "";
144 String filePath = "";
145 BulkIngestStatistics bulkLoadStatistics = bulkProcessRequest.getBulkIngestStatistics();
146 FileIngestStatistics fileIngestStatistics = bulkLoadStatistics.getFileIngestStatistics();
147 batchStatistics = fileIngestStatistics.startBatch();
148 filePath = exchange.getProperty("CamelFileExchangeFile").toString();
149 fileName = filePath.substring(filePath.lastIndexOf(File.separator), filePath.length());
150 fileName = fileName.replace(File.separator, "");
151 fileName = fileName.replace("]", "");
152
153
154 if (bulkProcessRequest.getBulkIngestStatistics().isFirstBatch()) {
155 fileIngestStatistics.setFileName(fileName);
156 fileIngestStatistics.setFileStatus("Started");
157 bulkProcessRequest.getBulkIngestStatistics().setFirstBatch(false);
158 }
159 StopWatch batchIngestNIndexTimer = new StopWatch();
160 StopWatch convertInputToReqTimer = new StopWatch();
161 DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
162 Date date = new Date();
163 batchStatistics.setBatchStartTime(dateFormat.format(date));
164 batchIngestNIndexTimer.start();
165 logger.info("Bulk ingest: Batch Start time : \t" + dateFormat.format(date));
166 long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
167 try {
168 ArrayList<RequestDocument> ingestDocs = new ArrayList<RequestDocument>();
169 convertInputToReqTimer.start();
170
171 if (exchange.getIn().getBody() instanceof List) {
172 for (String ingestDocXml : (List<String>) exchange.getIn().getBody()) {
173 ingestDocs.add(ingestDocumentHandler.toObject(ingestDocXml));
174 }
175 }
176 convertInputToReqTimer.stop();
177
178 BeanLocator.getDocumentServiceImpl().bulkIngest(bulkProcessRequest, ingestDocs);
179
180
181
182
183
184 batchIngestNIndexTimer.stop();
185 batchStatistics.setTimeToConvertStringToReqObj(convertInputToReqTimer.getTime());
186 batchStatistics.setBatchTime(batchIngestNIndexTimer.getTime());
187 date = new Date();
188 batchStatistics.setBatchEndTime(dateFormat.format(date));
189 logger.info("Bulk ingest: Batch metrics : " + "\n" + batchStatistics.toString());
190 if (bulkLoadStatistics.isLastBatch()) {
191 fileIngestStatistics.setFileStatus("Done");
192 bulkLoadStatistics.setLastBatch(false);
193 logger.info("Bulk ingest: File metrics : \n" + bulkLoadStatistics.toString());
194 bulkLoadStatistics.setFileRecCount(0);
195 }
196 logger.info("Bulk ingest: Batch End time : \t" + dateFormat.format(date));
197 }
198 catch (Exception e) {
199 logger.error("Bulk Processor Failed @ Batch : " + exchange.getIn(), e);
200 exchange.setException(e);
201 throw e;
202 }
203 finally {
204
205 }
206 }
207
208 public BulkIngestStatistics getBulkLoadStatistics() {
209 return bulkLoadStatistics;
210 }
211
212 public void setBulkLoadStatistics(BulkIngestStatistics bulkLoadStatistics) {
213 this.bulkLoadStatistics = bulkLoadStatistics;
214 }
215
216 public BulkProcessRequest getBulkProcessRequest() {
217 return bulkProcessRequest;
218 }
219
220 public void setBulkProcessRequest(BulkProcessRequest bulkProcessRequest) {
221 this.bulkProcessRequest = bulkProcessRequest;
222 }
223 }