1 package org.kuali.ole.docstore.process;
2
3 import java.io.File;
4 import java.text.DateFormat;
5 import java.text.SimpleDateFormat;
6 import java.util.ArrayList;
7 import java.util.Date;
8 import java.util.List;
9 import javax.jcr.Session;
10
11 import org.apache.camel.Exchange;
12 import org.apache.camel.Processor;
13 import org.apache.commons.lang.time.StopWatch;
14 import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
15 import org.kuali.ole.docstore.model.xstream.ingest.IngestDocumentHandler;
16 import org.kuali.ole.docstore.process.batch.BulkProcessRequest;
17 import org.kuali.ole.docstore.service.BeanLocator;
18 import org.kuali.ole.docstore.service.IngestNIndexHandlerService;
19 import org.kuali.ole.docstore.utility.BatchIngestStatistics;
20 import org.kuali.ole.docstore.utility.BulkIngestStatistics;
21 import org.kuali.ole.docstore.utility.FileIngestStatistics;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 import javax.jcr.Session;
26 import java.io.File;
27 import java.text.DateFormat;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Date;
31 import java.util.List;
32
33
34
35
36
37
38
39 public class BulkIngestNIndexProcessor
40 implements Processor {
41
42 private static Logger logger = LoggerFactory.getLogger(BulkIngestNIndexProcessor.class);
43 private String user;
44 private String action;
45 private Session session = null;
46 private BatchIngestStatistics batchStatistics = null;
47
48
49
50 private IngestNIndexHandlerService ingestNIndexHandlerService = BeanLocator.getIngestNIndexHandlerService();
51 private BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
52
53 private BulkProcessRequest bulkProcessRequest = null;
54
55 public BulkIngestNIndexProcessor(String user, String action) {
56 this.user = user;
57 this.action = action;
58 }
59
60 @Override
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141 public void process(Exchange exchange) throws Exception {
142 IngestDocumentHandler ingestDocumentHandler = new IngestDocumentHandler();
143 String fileName = "";
144 String filePath = "";
145 BulkIngestStatistics bulkLoadStatistics = bulkProcessRequest.getBulkIngestStatistics();
146 FileIngestStatistics fileIngestStatistics = bulkLoadStatistics.getFileIngestStatistics();
147 batchStatistics = fileIngestStatistics.startBatch();
148 filePath = exchange.getProperty("CamelFileExchangeFile").toString();
149 fileName = filePath.substring(filePath.lastIndexOf(File.separator), filePath.length());
150 fileName = fileName.replace(File.separator, "");
151 fileName = fileName.replace("]", "");
152
153
154 if (bulkProcessRequest.getBulkIngestStatistics().isFirstBatch()) {
155 fileIngestStatistics.setFileName(fileName);
156 fileIngestStatistics.setFileStatus("Started");
157 bulkProcessRequest.getBulkIngestStatistics().setFirstBatch(false);
158 }
159 StopWatch batchIngestNIndexTimer = new StopWatch();
160 StopWatch convertInputToReqTimer = new StopWatch();
161 DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
162 Date date = new Date();
163 batchStatistics.setBatchStartTime(dateFormat.format(date));
164 batchIngestNIndexTimer.start();
165 logger.info("Bulk ingest: Batch Start time : \t" + dateFormat.format(date));
166 long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
167 try {
168 ArrayList<RequestDocument> ingestDocs = new ArrayList<RequestDocument>();
169 convertInputToReqTimer.start();
170
171 if (exchange.getIn().getBody() instanceof List) {
172 for (String ingestDocXml : (List<String>) exchange.getIn().getBody()) {
173 ingestDocs.add(ingestDocumentHandler.toObject(ingestDocXml));
174 }
175 }
176 convertInputToReqTimer.stop();
177
178 BeanLocator.getDocumentServiceImpl().bulkIngest(bulkProcessRequest, ingestDocs);
179
180
181
182
183
184 batchIngestNIndexTimer.stop();
185 batchStatistics.setTimeToConvertStringToReqObj(convertInputToReqTimer.getTime());
186 batchStatistics.setBatchTime(batchIngestNIndexTimer.getTime());
187 date = new Date();
188 batchStatistics.setBatchEndTime(dateFormat.format(date));
189 logger.info("Bulk ingest: Batch metrics : " + "\n" + batchStatistics.toString());
190 if (bulkLoadStatistics.isLastBatch()) {
191 fileIngestStatistics.setFileStatus("Done");
192 bulkLoadStatistics.setLastBatch(false);
193 logger.info("Bulk ingest: File metrics : \n" + bulkLoadStatistics.toString());
194 bulkLoadStatistics.setFileRecCount(0);
195 }
196 logger.info("Bulk ingest: Batch End time : \t" + dateFormat.format(date));
197 } catch (Exception e) {
198 logger.error("Bulk Processor Failed @ Batch : " + exchange.getIn(), e);
199 exchange.setException(e);
200 throw e;
201 } finally {
202
203 }
204 }
205
206 public BulkIngestStatistics getBulkLoadStatistics() {
207 return bulkLoadStatistics;
208 }
209
210 public void setBulkLoadStatistics(BulkIngestStatistics bulkLoadStatistics) {
211 this.bulkLoadStatistics = bulkLoadStatistics;
212 }
213
214 public BulkProcessRequest getBulkProcessRequest() {
215 return bulkProcessRequest;
216 }
217
218 public void setBulkProcessRequest(BulkProcessRequest bulkProcessRequest) {
219 this.bulkProcessRequest = bulkProcessRequest;
220 }
221 }