1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.ole.docstore.service;
17
18 import org.apache.commons.lang.time.StopWatch;
19 import org.kuali.ole.RepositoryManager;
20 import org.kuali.ole.docstore.common.document.content.instance.Instance;
21 import org.kuali.ole.docstore.common.document.content.instance.InstanceCollection;
22 import org.kuali.ole.docstore.common.document.content.instance.Item;
23 import org.kuali.ole.docstore.model.enums.DocCategory;
24 import org.kuali.ole.docstore.model.enums.DocFormat;
25 import org.kuali.ole.docstore.model.enums.DocType;
26 import org.kuali.ole.docstore.model.xmlpojo.ingest.Request;
27 import org.kuali.ole.docstore.model.xmlpojo.ingest.RequestDocument;
28 import org.kuali.ole.docstore.model.xmlpojo.ingest.Response;
29 import org.kuali.ole.docstore.model.xmlpojo.ingest.ResponseDocument;
30 import org.kuali.ole.docstore.model.xstream.ingest.RequestHandler;
31 import org.kuali.ole.docstore.model.xstream.ingest.ResponseHandler;
32 import org.kuali.ole.docstore.process.BulkIngestTimeManager;
33 import org.kuali.ole.docstore.process.ProcessParameters;
34 import org.kuali.ole.docstore.utility.BatchIngestStatistics;
35 import org.kuali.ole.docstore.utility.BulkIngestStatistics;
36 import org.kuali.ole.pojo.OleException;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.springframework.beans.factory.annotation.Required;
40
41 import javax.jcr.Session;
42 import java.util.ArrayList;
43 import java.util.Iterator;
44 import java.util.List;
45
46
47
48
49
50
51
52
53
54 public class IngestNIndexHandlerService {
55
56 private static Logger logger = LoggerFactory.getLogger(IngestNIndexHandlerService.class);
57
58
59
60
61 private RequestHandler requestHandler;
62
63
64
65 private DocumentIngester documentIngester;
66
67
68
69 private DocumentIndexer documentIndexer;
70 private static long docCount = 0;
71 private BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
72 private static List<RequestDocument> prevRequestDocs = null;
73 private RepositoryManager repositoryManager;
74
75 @Required
76 public void setDocumentIngester(DocumentIngester documentIngester) {
77 this.documentIngester = documentIngester;
78 }
79
80 @Required
81 public void setDocumentIndexer(DocumentIndexer documentIndexer) {
82 this.documentIndexer = documentIndexer;
83 }
84
85 @Required
86 public void setRequestHandler(RequestHandler requestHandler) {
87 this.requestHandler = requestHandler;
88 }
89
90
91
92
93
94
95
96
97 public String ingestNIndexRequestDocuments(String xmlRequestString) throws Exception {
98 Request request = null;
99 request = requestHandler.toObject(xmlRequestString);
100 Response response = ingestNIndexRequestDocuments(request);
101 String xmlResponse = new ResponseHandler().toXML(response);
102 return xmlResponse;
103 }
104
105
106
107
108
109
110
111
112
113
114 public Response ingestNIndexRequestDocuments(Request request) throws Exception {
115
116 for (RequestDocument doc : request.getRequestDocuments()) {
117 doc.setUser(request.getUser());
118 }
119 Session session = null;
120 List<String> docUUIDs = new ArrayList<String>();
121 try {
122 session = getRepositoryManager().getSession(request.getUser(), request.getOperation());
123
124
125 for (RequestDocument reqDoc : request.getRequestDocuments()) {
126 if (DocCategory.WORK.isEqualTo(reqDoc.getCategory())) {
127 if (DocType.BIB.isEqualTo(reqDoc.getType())) {
128 if (DocFormat.MARC.isEqualTo(reqDoc.getFormat())
129 || DocFormat.DUBLIN_CORE.isEqualTo(reqDoc.getFormat()) || DocFormat.DUBLIN_UNQUALIFIED
130 .isEqualTo(reqDoc.getFormat())) {
131 docUUIDs.addAll(documentIngester.ingestBibNLinkedInstanceRequestDocuments(reqDoc, session));
132 documentIndexer.indexDocument(reqDoc);
133 } else {
134 logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
135 throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
136 }
137 } else if (DocType.INSTANCE.isEqualTo(reqDoc.getType())) {
138 if (DocFormat.OLEML.isEqualTo(reqDoc.getFormat())) {
139 documentIngester.ingestInstanceDocument(reqDoc, session, docUUIDs, null, null);
140 documentIndexer.indexDocument(reqDoc);
141 } else {
142 logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
143 throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
144 }
145 } else if (DocType.LICENSE.isEqualTo(reqDoc.getType())) {
146 if (DocFormat.ONIXPL.isEqualTo(reqDoc.getFormat())
147 || DocFormat.PDF.isEqualTo(reqDoc.getFormat())
148 || DocFormat.DOC.isEqualTo(reqDoc.getFormat()) || DocFormat.XSLT
149 .isEqualTo(reqDoc.getFormat())) {
150 documentIngester.ingestWorkLicenseOnixplRequestDocument(reqDoc, session, docUUIDs);
151 documentIndexer.indexDocument(reqDoc);
152 } else {
153 logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
154 throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
155 }
156 } else {
157 logger.error("Unsupported Document Type : " + reqDoc.getType() + " Called.");
158 throw new Exception("Unsupported Document Type : " + reqDoc.getType() + " Called.");
159 }
160 } else if (DocCategory.SECURITY.isEqualTo(reqDoc.getCategory())) {
161 if (DocType.PATRON.isEqualTo(reqDoc.getType())) {
162 if (DocFormat.OLEML.isEqualTo(reqDoc.getFormat())) {
163 docUUIDs.addAll(documentIngester.ingestPatronRequestDocument(reqDoc, session, null));
164 documentIndexer.indexDocument(reqDoc);
165 } else {
166 logger.error("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
167 throw new Exception("Unsupported Document Format : " + reqDoc.getFormat() + " Called.");
168 }
169 } else {
170 logger.error("Unsupported Document Type : " + reqDoc.getType() + " Called.");
171 throw new Exception("Unsupported Document Type : " + reqDoc.getType() + " Called.");
172 }
173 } else {
174 logger.error("Unsupported Category : " + reqDoc.getCategory() + " Called.");
175 throw new Exception("Unsupported Document Category : " + reqDoc.getCategory() + " Called.");
176 }
177 }
178
179
180 session.save();
181
182 } catch (Exception e) {
183 logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
184 documentIngester.rollbackDocStoreIngestedData(session, request.getRequestDocuments());
185 documentIndexer.rollbackIndexedData(request.getRequestDocuments());
186 throw e;
187 } finally {
188 if (session != null) {
189 getRepositoryManager().logout(session);
190 }
191 }
192 Response response = buildResponse(request);
193 return response;
194 }
195
196 private RepositoryManager getRepositoryManager() throws OleException {
197 if (null == repositoryManager) {
198 repositoryManager = RepositoryManager.getRepositoryManager();
199 }
200 return repositoryManager;
201 }
202
203
204
205
206
207
208
209 public List<String> bulkIngestNIndex(Request request, Session session) {
210
211
212 BatchIngestStatistics batchStatistics = BulkIngestStatistics.getInstance().getCurrentBatch();
213 BulkIngestStatistics bulkLoadStatistics = BulkIngestStatistics.getInstance();
214 long commitSize = ProcessParameters.BULK_INGEST_COMMIT_SIZE;
215 logger.debug("commitSize = " + commitSize);
216 logger.debug("bulkIngestNIndex(" + request.getRequestDocuments().size() + ") START");
217 logger.debug("BULK_INGEST_IS_LINKING_ENABLED=" + ProcessParameters.BULK_INGEST_IS_LINKING_ENABLED);
218
219 List<String> docUUIDs = new ArrayList<String>();
220 StopWatch ingestTimer = new StopWatch();
221 StopWatch indexTimer = new StopWatch();
222 StopWatch totalTimer = new StopWatch();
223 StopWatch createNodesTimer = new StopWatch();
224 StopWatch sessionSaveTimer = new StopWatch();
225 StopWatch solrOptimizeTimer = new StopWatch();
226 long recCount = request.getRequestDocuments().size();
227 boolean isCommit = false;
228 totalTimer.start();
229 try {
230 ingestTimer.start();
231 createNodesTimer.start();
232
233 List<RequestDocument> reqDocs = request.getRequestDocuments();
234 if (prevRequestDocs == null) {
235 prevRequestDocs = new ArrayList<RequestDocument>();
236 }
237 prevRequestDocs.addAll(request.getRequestDocuments());
238 logger.info("prevRequestDocs" + prevRequestDocs.size());
239 docUUIDs.addAll(documentIngester.ingestRequestDocumentsForBulk(reqDocs, session));
240
241
242 createNodesTimer.stop();
243 try {
244 ingestTimer.suspend();
245 indexTimer.start();
246 } catch (Exception e2) {
247 logger.error(e2.getMessage() , e2 );
248 }
249 bulkLoadStatistics.setCommitRecCount(bulkLoadStatistics.getCommitRecCount() + recCount);
250 if (bulkLoadStatistics.getCommitRecCount() == commitSize || bulkLoadStatistics.isLastBatch()) {
251 isCommit = true;
252 }
253 documentIndexer.indexDocumentsForBulk(reqDocs, isCommit);
254
255 try {
256 indexTimer.suspend();
257 ingestTimer.resume();
258 } catch (Exception e2) {
259 logger.error(e2.getMessage() , e2 );
260 }
261 if (isCommit) {
262 sessionSaveTimer.start();
263 logger.info("Bulk ingest: Repository commit started. Number of records being committed : "
264 + bulkLoadStatistics.getCommitRecCount());
265 session.save();
266 bulkLoadStatistics.setCommitRecCount(0);
267 prevRequestDocs = null;
268 sessionSaveTimer.stop();
269 }
270
271 try {
272 ingestTimer.stop();
273 } catch (Exception e2) {
274 logger.error(e2.getMessage() , e2 );
275 }
276
277 logger.debug("Documents processed:" + recCount);
278 bulkLoadStatistics.setFileRecCount(bulkLoadStatistics.getFileRecCount() + recCount);
279 logger.info("Bulk ingest: Records processed in the current file :" + bulkLoadStatistics.getFileRecCount());
280 } catch (Exception e) {
281 bulkLoadStatistics.setCommitRecCount(0);
282 try {
283 ingestTimer.resume();
284 } catch (Exception e2) {
285 logger.error(e2.getMessage() , e2 );
286 }
287
288 documentIngester.rollbackDocStoreIngestedData(session, prevRequestDocs);
289 ingestTimer.stop();
290 try {
291 indexTimer.resume();
292 } catch (Exception e2) {
293 logger.error(e2.getMessage() , e2 );
294 }
295
296
297
298 documentIndexer.rollbackIndexedData(prevRequestDocs);
299 prevRequestDocs = null;
300 try {
301 indexTimer.stop();
302 } catch (Exception e2) {
303 logger.error(e2.getMessage() , e2 );
304 }
305 logger.error("Document Ingest & Index Failed, Cause: " + e.getMessage(), e);
306 try {
307 totalTimer.stop();
308 } catch (Exception e2) {
309 logger.error(e2.getMessage() , e2 );
310 }
311 logger.debug("Time Consumptions...:\tcreatingNodes(" + docUUIDs.size() + "):" + createNodesTimer
312 + "\tSessionSave(" + docUUIDs.size() + "):" + sessionSaveTimer + "\tIngest(" + docUUIDs.size()
313 + "):" + ingestTimer + "\tIndexing(" + docUUIDs.size() + "):" + indexTimer + "\tTotal Time: "
314 + totalTimer);
315 docUUIDs.clear();
316 } finally {
317
318
319
320
321
322
323 }
324 try {
325 totalTimer.stop();
326 } catch (Exception exe) {
327 logger.error(exe.getMessage() , exe );
328 }
329 logger.debug(
330 "Time Consumptions...:\tcreatingNodes(" + docUUIDs.size() + "):" + createNodesTimer + "\tSessionSave("
331 + docUUIDs.size() + "):" + sessionSaveTimer + "\tIngest(" + docUUIDs.size() + "):" + ingestTimer
332 + "\tIndexing(" + docUUIDs.size() + "):" + indexTimer + "\tTotal Time: " + totalTimer);
333 logger.debug("bulkIngestNIndex(" + request.getRequestDocuments().size() + ") END");
334 batchStatistics.setTimeToCreateNodesInJcr(createNodesTimer.getTime());
335 batchStatistics.setTimeToSaveJcrSession(sessionSaveTimer.getTime());
336 batchStatistics.setIngestingTime(ingestTimer.getTime());
337 batchStatistics.setIndexingTime(indexTimer.getTime());
338 batchStatistics.setIngestNIndexTotalTime(totalTimer.getTime());
339 updateProcessTimer(docUUIDs.size(), ingestTimer, indexTimer, totalTimer);
340 solrOptimizeTimer.start();
341 optimizeSolr(docUUIDs.size());
342 solrOptimizeTimer.stop();
343 batchStatistics.setTimeToSolrOptimize(solrOptimizeTimer.getTime());
344 return docUUIDs;
345 }
346
347 private void updateProcessTimer(int recordsProcessed, StopWatch ingest, StopWatch index, StopWatch total) {
348 BulkIngestTimeManager timer = ProcessParameters.BULK_PROCESSOR_TIME_MANAGER;
349 synchronized (timer) {
350 timer.setRecordsCount(timer.getRecordsCount() + recordsProcessed);
351 timer.setIngestingTimer(timer.getIngestingTimer() + ingest.getTime());
352 timer.setIndexingTimer(timer.getIndexingTimer() + index.getTime());
353 timer.setProcessTimer(timer.getProcessTimer() + total.getTime());
354 if (timer.getRecordsCount() >= ProcessParameters.BULK_PROCESSOR_TIMER_DISPLAY) {
355 logger.debug(
356 "----------------------------------------------------------------------------------------------------------------------");
357 logger.debug(timer.toString());
358 logger.debug(
359 "----------------------------------------------------------------------------------------------------------------------");
360 timer.reset();
361 }
362 }
363 }
364
365 private void optimizeSolr(long recordsProcessed) {
366 docCount += recordsProcessed;
367 logger.debug("BULK_INGEST_OPTIMIZE_SIZE=" + ProcessParameters.BULK_INGEST_OPTIMIZE_SIZE
368 + ". Records processed till now=" + docCount);
369 logger.info("Bulk ingest: Records processed in the bulk ingest " + docCount);
370 if (docCount >= ProcessParameters.BULK_INGEST_OPTIMIZE_SIZE) {
371 docCount = 0;
372 try {
373 logger.debug("Solr Optimization: START");
374 documentIndexer.optimizeSolr(false, false);
375 logger.debug("Solr Optimization: END");
376 } catch (Exception e) {
377 logger.warn("Solr Optimization Failed: ", e);
378 }
379 }
380 }
381
382 public Response buildResponse(Request request) {
383 Response docStoreResponse = new Response();
384 docStoreResponse.setUser(request.getUser());
385 docStoreResponse.setOperation(request.getOperation());
386 docStoreResponse.setMessage("Documents ingested");
387 docStoreResponse.setStatus("Success");
388 docStoreResponse.setStatusMessage("Documents Ingested Successfully");
389 List<ResponseDocument> responseDocuments = new ArrayList<ResponseDocument>();
390 ResponseDocument linkedDocument = null;
391 ResponseDocument responseDocument = null;
392 ResponseDocument linkedInstanceDocument = null;
393 ResponseDocument linkedInstanceItemDocument = null;
394 ResponseDocument linkedInstanceSrHoldingDoc = null;
395
396 for (Iterator<RequestDocument> iterator = request.getRequestDocuments().iterator(); iterator.hasNext(); ) {
397 RequestDocument docStoreDocument = iterator.next();
398 docStoreDocument.getContent().setContent("");
399 responseDocument = new ResponseDocument();
400 setResponseParameters(responseDocument, docStoreDocument);
401 responseDocuments.add(responseDocument);
402 if (docStoreDocument.getLinkedRequestDocuments() != null
403 && docStoreDocument.getLinkedRequestDocuments().size() > 0 && request != null
404 && request.getOperation() != null && !request.getOperation().equalsIgnoreCase("checkIn")) {
405 List<ResponseDocument> linkResponseDos = new ArrayList<ResponseDocument>();
406
407 for (Iterator<RequestDocument> linkIterator = docStoreDocument.getLinkedRequestDocuments()
408 .iterator(); linkIterator.hasNext(); ) {
409 RequestDocument linkedRequestDocument = linkIterator.next();
410 linkedRequestDocument.getContent().setContent("");
411 linkedDocument = new ResponseDocument();
412 setResponseParameters(linkedDocument, linkedRequestDocument);
413 linkResponseDos.add(linkedDocument);
414 List<ResponseDocument> linkInstanceDocs = new ArrayList<ResponseDocument>();
415 InstanceCollection instanceCollection = (InstanceCollection) linkedRequestDocument.getContent()
416 .getContentObject();
417 for (Instance oleInstance : instanceCollection.getInstance()) {
418
419 linkedInstanceDocument = new ResponseDocument();
420 setResponseParameters(linkedInstanceDocument, linkedRequestDocument);
421 linkedInstanceDocument.setUuid(oleInstance.getOleHoldings().getHoldingsIdentifier());
422 linkedInstanceDocument.setType("holdings");
423 linkInstanceDocs.add(linkedInstanceDocument);
424
425
426 linkedInstanceSrHoldingDoc = new ResponseDocument();
427 setResponseParameters(linkedInstanceSrHoldingDoc, linkedRequestDocument);
428 if (oleInstance.getSourceHoldings() != null &&
429 oleInstance.getSourceHoldings().getHoldingsIdentifier() != null) {
430 linkedInstanceSrHoldingDoc.setUuid(oleInstance.getSourceHoldings().getHoldingsIdentifier());
431 linkedInstanceSrHoldingDoc.setType("sourceHoldings");
432 linkInstanceDocs.add(linkedInstanceSrHoldingDoc);
433 }
434
435
436
437 for (Iterator<Item> itemIterator = oleInstance.getItems().getItem().iterator(); itemIterator
438 .hasNext(); ) {
439 Item oleItem = itemIterator.next();
440 linkedInstanceItemDocument = new ResponseDocument();
441 setResponseParameters(linkedInstanceItemDocument, linkedRequestDocument);
442 linkedInstanceItemDocument.setUuid(oleItem.getItemIdentifier());
443 linkedInstanceItemDocument.setType("item");
444 linkInstanceDocs.add(linkedInstanceItemDocument);
445 }
446 }
447 responseDocument.setLinkedInstanceDocuments(linkInstanceDocs);
448 }
449 responseDocument.setLinkedDocuments(linkResponseDos);
450 }
451 }
452 docStoreResponse.setDocuments(responseDocuments);
453 return docStoreResponse;
454 }
455
456 private void setResponseParameters(ResponseDocument responseDocument, RequestDocument docStoreDocument) {
457 responseDocument.setId(docStoreDocument.getId());
458 responseDocument.setCategory(docStoreDocument.getCategory());
459 responseDocument.setType(docStoreDocument.getType());
460 responseDocument.setFormat(docStoreDocument.getFormat());
461 responseDocument.setContent(docStoreDocument.getContent());
462 responseDocument.setUuid(docStoreDocument.getUuid());
463 }
464
465 public void setRepositoryManager(RepositoryManager repositoryManager) {
466 this.repositoryManager = repositoryManager;
467 }
468
469 public DocumentIngester getDocumentIngester() {
470 return documentIngester;
471 }
472 }