1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.ole.docstore.service;
17
18 import org.apache.commons.lang.time.StopWatch;
19 import org.kuali.ole.RepositoryManager;
20 import org.kuali.ole.docstore.model.enums.DocCategory;
21 import org.kuali.ole.docstore.model.enums.DocFormat;
22 import org.kuali.ole.docstore.model.enums.DocType;
23 import org.kuali.ole.docstore.model.xmlpojo.ingest.Request;
24 import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
25 import org.kuali.ole.docstore.model.xmlpojo.ingest.Response;
26 import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
27 import org.kuali.ole.docstore.model.xmlpojo.work.instance.oleml.Instance;
28 import org.kuali.ole.docstore.model.xmlpojo.work.instance.oleml.InstanceCollection;
29 import org.kuali.ole.docstore.model.xmlpojo.work.instance.oleml.Item;
30 import org.kuali.ole.docstore.model.xstream.ingest.RequestHandler;
31 import org.kuali.ole.docstore.model.xstream.ingest.ResponseHandler;
32 import org.kuali.ole.docstore.process.BulkIngestTimeManager;
33 import org.kuali.ole.docstore.process.ProcessParameters;
34 import org.kuali.ole.docstore.utility.BatchIngestStatistics;
35 import org.kuali.ole.docstore.utility.BulkIngestStatistics;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import org.springframework.beans.factory.annotation.Required;
39
40 import javax.jcr.Session;
41 import java.util.ArrayList;
42 import java.util.Iterator;
43 import java.util.List;
44
45
46
47
48
49
50
51
52
53 public class IngestNIndexHandlerService {
54
55 private static Logger logger = LoggerFactory.getLogger(IngestNIndexHandlerService.class);
56
57
58
59
60 private RequestHandler requestHandler;
61
62
63
64 private DocumentIngester documentIngester;
65
66
67
68 private DocumentIndexer documentIndexer;
69 private static long docCount = 0;
70 private BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
71 private static List<RequestDocument> prevRequestDocs = null;
72
73 @Required
74 public void setDocumentIngester(DocumentIngester documentIngester) {
75 this.documentIngester = documentIngester;
76 }
77
78 @Required
79 public void setDocumentIndexer(DocumentIndexer documentIndexer) {
80 this.documentIndexer = documentIndexer;
81 }
82
83 @Required
84 public void setRequestHandler(RequestHandler requestHandler) {
85 this.requestHandler = requestHandler;
86 }
87
88
89
90
91
92
93
94
95 public String ingestNIndexRequestDocuments(String xmlRequestString) throws Exception {
96 Request request = null;
97 request = requestHandler.toObject(xmlRequestString);
98 Response response = ingestNIndexRequestDocuments(request);
99 String xmlResponse = new ResponseHandler().toXML(response);
100 return xmlResponse;
101 }
102
103
104
105
106
107
108
109
110
111
112 public Response ingestNIndexRequestDocuments(Request request) throws Exception {
113
114 for (RequestDocument doc : request.getRequestDocuments()) {
115 doc.setUser(request.getUser());
116 }
117 Session session = null;
118 List<String> docUUIDs = new ArrayList<String>();
119 try {
120 session = RepositoryManager.getRepositoryManager().getSession(request.getUser(), request.getOperation());
121
122
123 for (RequestDocument reqDoc : request.getRequestDocuments()) {
124 if (DocCategory.WORK.isEqualTo(reqDoc.getCategory())) {
125 if (DocType.BIB.isEqualTo(reqDoc.getType())) {
126 if (DocFormat.MARC.isEqualTo(reqDoc.getFormat())
127 || DocFormat.DUBLIN_CORE.isEqualTo(reqDoc.getFormat()) || DocFormat.DUBLIN_UNQUALIFIED
128 .isEqualTo(reqDoc.getFormat())) {
129 docUUIDs.addAll(documentIngester.ingestBibNLinkedInstanceRequestDocuments(reqDoc, session));
130 documentIndexer.indexDocument(reqDoc);
131 }
132 else {
133 logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
134 throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
135 }
136 }
137 else if (DocType.INSTANCE.isEqualTo(reqDoc.getType())) {
138 if (DocFormat.OLEML.isEqualTo(reqDoc.getFormat())) {
139 documentIngester.ingestInstanceDocument(reqDoc, session, docUUIDs, null, null);
140 documentIndexer.indexDocument(reqDoc);
141 }
142 else {
143 logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
144 throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
145 }
146 }
147 else if (DocType.LICENSE.isEqualTo(reqDoc.getType())) {
148 if (DocFormat.ONIXPL.isEqualTo(reqDoc.getFormat())
149 || DocFormat.PDF.isEqualTo(reqDoc.getFormat())
150 || DocFormat.DOC.isEqualTo(reqDoc.getFormat()) || DocFormat.XSLT
151 .isEqualTo(reqDoc.getFormat())) {
152 documentIngester.ingestWorkLicenseOnixplRequestDocument(reqDoc, session, docUUIDs);
153 documentIndexer.indexDocument(reqDoc);
154 }
155 else {
156 logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
157 throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
158 }
159 }
160 else {
161 logger.error("Unsupported Document Type : " + reqDoc.getType() + " Called.");
162 throw new Exception("Unsupported Document Type : " + reqDoc.getType() + " Called.");
163 }
164 }
165 else if (DocCategory.SECURITY.isEqualTo(reqDoc.getCategory())) {
166 if (DocType.PATRON.isEqualTo(reqDoc.getType())) {
167 if (DocFormat.OLEML.isEqualTo(reqDoc.getFormat())) {
168 docUUIDs.addAll(documentIngester.ingestPatronRequestDocument(reqDoc, session, null));
169 documentIndexer.indexDocument(reqDoc);
170 }
171 else {
172 logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
173 throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
174 }
175 }
176 else {
177 logger.error("Unsupported Document Type : " + reqDoc.getType() + " Called.");
178 throw new Exception("Unsupported Document Type : " + reqDoc.getType() + " Called.");
179 }
180 }
181 else {
182 logger.error("Unsupported Category : " + reqDoc.getCategory() + " Called.");
183 throw new Exception("Unsupported Document Category : " + reqDoc.getCategory() + " Called.");
184 }
185 }
186
187
188 session.save();
189
190 }
191 catch (Exception e) {
192 logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
193 documentIngester.rollbackDocStoreIngestedData(session, request.getRequestDocuments());
194 documentIndexer.rollbackIndexedData(request.getRequestDocuments());
195 throw e;
196 }
197 finally {
198 if (session != null) {
199 RepositoryManager.getRepositoryManager().logout(session);
200 }
201 }
202 Response response = buildResponse(request);
203 return response;
204 }
205
206
207
208
209
210
211
212 public List<String> bulkIngestNIndex(Request request, Session session) {
213
214
215 BatchIngestStatistics batchStatistics = BulkIngestStatistics.getInstance().getCurrentBatch();
216 BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
217 long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
218 logger.debug("commitSize = " + commitSize);
219 logger.debug("bulkIngestNIndex(" + request.getRequestDocuments().size() + ") START");
220 logger.debug("BULK_INGEST_IS_LINKING_ENABLED=" + ProcessParameters.BULK_INGEST_IS_LINKING_ENABLED);
221
222 List<String> docUUIDs = new ArrayList<String>();
223 StopWatch ingestTimer = new StopWatch();
224 StopWatch indexTimer = new StopWatch();
225 StopWatch totalTimer = new StopWatch();
226 StopWatch createNodesTimer = new StopWatch();
227 StopWatch sessionSaveTimer = new StopWatch();
228 StopWatch solrOptimizeTimer = new StopWatch();
229 long recCount = request.getRequestDocuments().size();
230 boolean isCommit = false;
231 totalTimer.start();
232 try {
233 ingestTimer.start();
234 createNodesTimer.start();
235
236 List<RequestDocument> reqDocs = request.getRequestDocuments();
237 if (prevRequestDocs == null) {
238 prevRequestDocs = new ArrayList<RequestDocument>();
239 }
240 prevRequestDocs.addAll(request.getRequestDocuments());
241 logger.info("prevRequestDocs" + prevRequestDocs.size());
242 docUUIDs.addAll(documentIngester.ingestRequestDocumentsForBulk(reqDocs, session));
243
244
245 createNodesTimer.stop();
246 try {
247 ingestTimer.suspend();
248 indexTimer.start();
249 }
250 catch (Exception e2) {
251 }
252 bulkLoadStatistics.setCommitRecCount(bulkLoadStatistics.getCommitRecCount() + recCount);
253 if (bulkLoadStatistics.getCommitRecCount() == commitSize || bulkLoadStatistics.isLastBatch()) {
254 isCommit = true;
255 }
256 documentIndexer.indexDocumentsForBulk(reqDocs, isCommit);
257
258 try {
259 indexTimer.suspend();
260 ingestTimer.resume();
261 }
262 catch (Exception e2) {
263 }
264 if (isCommit) {
265 sessionSaveTimer.start();
266 logger.info("Bulk ingest: Repository commit started. Number of records being committed : "
267 + bulkLoadStatistics.getCommitRecCount());
268 session.save();
269 bulkLoadStatistics.setCommitRecCount(0);
270 prevRequestDocs = null;
271 sessionSaveTimer.stop();
272 }
273
274 try {
275 ingestTimer.stop();
276 }
277 catch (Exception e2) {
278 }
279
280 logger.debug("Documents processed:" + recCount);
281 bulkLoadStatistics.setFileRecCount(bulkLoadStatistics.getFileRecCount() + recCount);
282 logger.info("Bulk ingest: Records processed in the current file :" + bulkLoadStatistics.getFileRecCount());
283 }
284 catch (Exception e) {
285 bulkLoadStatistics.setCommitRecCount(0);
286 try {
287 ingestTimer.resume();
288 }
289 catch (Exception e2) {
290 }
291
292 documentIngester.rollbackDocStoreIngestedData(session, prevRequestDocs);
293 ingestTimer.stop();
294 try {
295 indexTimer.resume();
296 }
297 catch (Exception e2) {
298 }
299
300
301
302 documentIndexer.rollbackIndexedData(prevRequestDocs);
303 prevRequestDocs = null;
304 try {
305 indexTimer.stop();
306 }
307 catch (Exception e2) {
308 }
309 logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
310 try {
311 totalTimer.stop();
312 }
313 catch (Exception e2) {
314 }
315 logger.debug("Time Consumptions...:\tcreatingNodes(" + docUUIDs.size() + "):" + createNodesTimer
316 + "\tSessionSave(" + docUUIDs.size() + "):" + sessionSaveTimer + "\tIngest(" + docUUIDs.size()
317 + "):" + ingestTimer + "\tIndexing(" + docUUIDs.size() + "):" + indexTimer + "\tTotal Time: "
318 + totalTimer);
319 docUUIDs.clear();
320 }
321 finally {
322
323
324
325
326
327
328 }
329 try {
330 totalTimer.stop();
331 }
332 catch (Exception exe) {
333 }
334 logger.debug(
335 "Time Consumptions...:\tcreatingNodes(" + docUUIDs.size() + "):" + createNodesTimer + "\tSessionSave("
336 + docUUIDs.size() + "):" + sessionSaveTimer + "\tIngest(" + docUUIDs.size() + "):" + ingestTimer
337 + "\tIndexing(" + docUUIDs.size() + "):" + indexTimer + "\tTotal Time: " + totalTimer);
338 logger.debug("bulkIngestNIndex(" + request.getRequestDocuments().size() + ") END");
339 batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime());
340 batchStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime());
341 batchStatistics.setIngestingTime(ingestTimer.getTime());
342 batchStatistics.setIndexingTime(indexTimer.getTime());
343 batchStatistics.setIngestNIndexTotalTime(totalTimer.getTime());
344 updateProcessTimer(docUUIDs.size(), ingestTimer, indexTimer, totalTimer);
345 solrOptimizeTimer.start();
346 optimizeSolr(docUUIDs.size());
347 solrOptimizeTimer.stop();
348 batchStatistics.setTimeToSolrOptimize(solrOptimizeTimer.getTime());
349 return docUUIDs;
350 }
351
352 private void updateProcessTimer(int recordsProcessed, StopWatch ingest, StopWatch index, StopWatch total) {
353 BulkIngestTimeManager timer = ProcessParameters.BULK_PROCESSOR_TIME_MANAGER;
354 synchronized (timer) {
355 timer.setRecordsCount(timer.getRecordsCount() + recordsProcessed);
356 timer.setIngestingTimer(timer.getIngestingTimer() + ingest.getTime());
357 timer.setIndexingTimer(timer.getIndexingTimer() + index.getTime());
358 timer.setProcessTimer(timer.getProcessTimer() + total.getTime());
359 if (timer.getRecordsCount() >= ProcessParameters.BULK_PROCESSOR_TIMER_DISPLAY) {
360 logger.debug(
361 "----------------------------------------------------------------------------------------------------------------------");
362 logger.debug(timer.toString());
363 logger.debug(
364 "----------------------------------------------------------------------------------------------------------------------");
365 timer.reset();
366 }
367 }
368 }
369
370 private void optimizeSolr(long recordsProcessed) {
371 docCount += recordsProcessed;
372 logger.debug("BULK_INGEST_OPTIMIZE_SIZE=" + ProcessParameters.BULK_INGEST_OPTIMIZE_SIZE
373 + ". Records processed till now=" + docCount);
374 logger.info("Bulk ingest: Records processed in the bulk ingest " + docCount);
375 if (docCount >= ProcessParameters.BULK_INGEST_OPTIMIZE_SIZE) {
376 docCount = 0;
377 try {
378 logger.debug("Solr Optimization: START");
379 documentIndexer.optimizeSolr(false, false);
380 logger.debug("Solr Optimization: END");
381 }
382 catch (Exception e) {
383 logger.warn("Solr Optimization Failed: ", e);
384 }
385 }
386 }
387
388 public Response buildResponse(Request request) {
389 Response docStoreResponse = new Response();
390 docStoreResponse.setUser(request.getUser());
391 docStoreResponse.setOperation(request.getOperation());
392 docStoreResponse.setMessage("Documents ingested");
393 docStoreResponse.setStatus("Success");
394 docStoreResponse.setStatusMessage("Documents Ingested Successfully");
395 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
396 ResponseDocument linkedDocument = null;
397 ResponseDocument responseDocument = null;
398 ResponseDocument linkedInstanceDocument = null;
399 ResponseDocument linkedInstanceItemDocument = null;
400 ResponseDocument linkedInstanceSrHoldingDoc = null;
401
402 for (Iterator<RequestDocument> iterator = request.getRequestDocuments().iterator(); iterator.hasNext(); ) {
403 RequestDocument docStoreDocument = iterator.next();
404 docStoreDocument.getContent().setContent("");
405 responseDocument = new ResponseDocument();
406 setResponseParameters(responseDocument, docStoreDocument);
407 responseDocuments.add(responseDocument);
408 if (docStoreDocument.getLinkedRequestDocuments() != null
409 && docStoreDocument.getLinkedRequestDocuments().size() > 0 && request != null
410 && request.getOperation() != null && !request.getOperation().equalsIgnoreCase("checkIn")) {
411 List<ResponseDocument> linkResponseDos = new ArrayList<ResponseDocument>();
412
413 for (Iterator<RequestDocument> linkIterator = docStoreDocument.getLinkedRequestDocuments()
414 .iterator(); linkIterator.hasNext(); ) {
415 RequestDocument linkedRequestDocument = linkIterator.next();
416 linkedRequestDocument.getContent().setContent("");
417 linkedDocument = new ResponseDocument();
418 setResponseParameters(linkedDocument, linkedRequestDocument);
419 linkResponseDos.add(linkedDocument);
420 List<ResponseDocument> linkInstanceDocs = new ArrayList<ResponseDocument>();
421 InstanceCollection instanceCollection = (InstanceCollection) linkedRequestDocument.getContent()
422 .getContentObject();
423 for (Instance oleInstance : instanceCollection.getInstance()) {
424
425 linkedInstanceDocument = new ResponseDocument();
426 setResponseParameters(linkedInstanceDocument, linkedRequestDocument);
427 linkedInstanceDocument.setUuid(oleInstance.getOleHoldings().getHoldingsIdentifier());
428 linkedInstanceDocument.setType("holdings");
429 linkInstanceDocs.add(linkedInstanceDocument);
430
431
432 linkedInstanceSrHoldingDoc = new ResponseDocument();
433 setResponseParameters(linkedInstanceSrHoldingDoc, linkedRequestDocument);
434 if(oleInstance.getSourceHoldings() != null &&
435 oleInstance.getSourceHoldings().getHoldingsIdentifier() != null ){
436 linkedInstanceSrHoldingDoc.setUuid(oleInstance.getSourceHoldings().getHoldingsIdentifier());
437 linkedInstanceSrHoldingDoc.setType("sourceHoldings");
438 linkInstanceDocs.add(linkedInstanceSrHoldingDoc);
439 }
440
441
442
443 for (Iterator<Item> itemIterator = oleInstance.getItems().getItem().iterator(); itemIterator
444 .hasNext(); ) {
445 Item oleItem = itemIterator.next();
446 linkedInstanceItemDocument = new ResponseDocument();
447 setResponseParameters(linkedInstanceItemDocument, linkedRequestDocument);
448 linkedInstanceItemDocument.setUuid(oleItem.getItemIdentifier());
449 linkedInstanceItemDocument.setType("item");
450 linkInstanceDocs.add(linkedInstanceItemDocument);
451 }
452 }
453 responseDocument.setLinkedInstanceDocuments(linkInstanceDocs);
454 }
455 responseDocument.setLinkedDocuments(linkResponseDos);
456 }
457 }
458 docStoreResponse.setDocuments(responseDocuments);
459 return docStoreResponse;
460 }
461
462 private void setResponseParameters(ResponseDocument responseDocument, RequestDocument docStoreDocument) {
463 responseDocument.setId(docStoreDocument.getId());
464 responseDocument.setCategory(docStoreDocument.getCategory());
465 responseDocument.setType(docStoreDocument.getType());
466 responseDocument.setFormat(docStoreDocument.getFormat());
467 responseDocument.setContent(docStoreDocument.getContent());
468 responseDocument.setUuid(docStoreDocument.getUuid());
469 }
470
471 }